From 2cfc0503913aecc3676b1058413736f81c002458 Mon Sep 17 00:00:00 2001 From: cgeek Date: Sun, 20 Sep 2020 10:58:20 +0200 Subject: [PATCH 1/4] mailing: added queue --- src/lib/dwatch.ts | 2 + src/lib/mail.ts | 100 ++++++++++++++------ src/lib/watchers/abstract/url-watcher.ts | 6 +- src/lib/watchers/webdiff/webdiff-watcher.ts | 6 +- src/lib/watchers/ws2p/ws2p-watcher.ts | 6 +- 5 files changed, 83 insertions(+), 37 deletions(-) diff --git a/src/lib/dwatch.ts b/src/lib/dwatch.ts index 7d971f6..6efb535 100644 --- a/src/lib/dwatch.ts +++ b/src/lib/dwatch.ts @@ -8,11 +8,13 @@ import {webDiffWatcher} from "./watchers/webdiff/webdiff-watcher"; import {Watcher} from "./types/state"; import {headWatcher} from "./watchers/bma/head-watcher"; import {jsonWatcher} from "./watchers/wotwizard/json-watcher"; +import {initConfMail} from './mail' export async function dwatch(confFile: string) { const yml = fs.readFileSync(confFile, 'utf8') const conf = yaml.load(yml) as Conf + initConfMail(conf.mail) const watchers: Watcher[] = []; (await Promise.all((conf.ws2pServers || []).map(ws2pWatcher(conf)))).forEach(w => watchers.push(w)); (await Promise.all((conf.bmaServers || []).map(bmaWatcher(conf)))).forEach(w => watchers.push(w)); diff --git a/src/lib/mail.ts b/src/lib/mail.ts index fb78075..60578bb 100644 --- a/src/lib/mail.ts +++ b/src/lib/mail.ts @@ -4,47 +4,85 @@ import {ConfMail} from './types/conf' import * as nodemailer from 'nodemailer' import * as os from 'os' -export async function sendMail(conf: ConfMail, subject: string, html: string, cc?: string) { +// TODO: in confMail + +const QUEUE_PERIOD = 5000 +const queue: EmailContent[] = [] +let conf: ConfMail|undefined + +export function initConfMail(confMail: ConfMail) { + conf = confMail + + ;(async () => { + while (true) { + await consumeQueue() + await new Promise((res) => setTimeout(res, QUEUE_PERIOD)) + } + })() +} + +export async function queueEmail(subject: string, html: string, cc?: string) { + queue.push({ subject, body: html, cc }) +} + +async function consumeQueue() { + while (queue.length) { + const mail = queue.shift() + if (mail) { + await sendEmail(mail.subject, mail.body, mail.cc) + } + } +} + +async function sendEmail(subject: string, html: string, cc?: string) { + + if (!conf) { + console.error(`Mail is not properly configured.`) + return + } console.log(`[mail] Subject: ${subject}`) console.log(`[mail] Body: ${html}`) - if (conf.enabled) { - let transporter = nodemailer.createTransport({ - host: conf.host, - port: conf.port, - secure: false, // true for 465, false for other ports - auth: { - user: conf.username, // generated ethereal user - pass: conf.apikey // generated ethereal password - }, - authMethod: conf.auth, - requireTLS: true, - }); - -// send mail with defined transport object - let info = await transporter.sendMail({ - from: conf.from, - to: conf.to, - cc: (conf.cc ? [conf.cc] : []).concat(cc ? [cc] : []).join(','), - subject, - html - }) + if (!conf.enabled) { + console.warn(`Mail is disabled.`) + return } + + let transporter = nodemailer.createTransport({ + host: conf.host, + port: conf.port, + secure: false, // true for 465, false for other ports + auth: { + user: conf.username, // generated ethereal user + pass: conf.apikey // generated ethereal password + }, + authMethod: conf.auth, + requireTLS: true, + }); + + // send mail with defined transport object + let info = await transporter.sendMail({ + from: conf.from, + to: conf.to, + cc: (conf.cc ? [conf.cc] : []).concat(cc ? [cc] : []).join(','), + subject, + html + }) } export const mail = { - onEstablished: (conf: Conf, target: string, message = `Connection established for ${target}`, getHtml: () => string = () => ` + onEstablished: (target: string, message = `Connection established for ${target}`, getHtml: () => string = () => `

Connection from [${os.hostname}] to ${target} established on ${moment().format('DD-MM-YYYY HH:mm:ss')}.

`) => { return async (cc?: string) => { - await sendMail(conf.mail, `[dw] [${os.hostname}] ${message}`, getHtml(), cc) + await queueEmail(`[dw] [${os.hostname}] ${message}`, getHtml(), cc) } }, - onDisconnect: (conf: Conf, target: string, message = `Connection closed for ${target}`, getErrorMessage: () => string = () => '', getHtml: (waitingDelay: number, recallDelay: number) => string = (waitingDelay: number, recallDelay: number) => ` + onDisconnect: (target: string, message = `Connection closed for ${target}`, getErrorMessage: () => string = () => '', getHtml: (waitingDelay: number, recallDelay: number) => string = (waitingDelay: number, recallDelay: number) => `

Connection from [${os.hostname}] to ${target} was lost on ${moment().format('dd-MM-YYYY HH:mm:ss')}.

@@ -55,18 +93,24 @@ export const mail = { `) => { return async (waitingDelay: number, recallDelay: number, cc?: string) => { console.log('Waiting %s seconds...', (waitingDelay / 1000).toFixed(0)) - await sendMail(conf.mail, `[dw] [${os.hostname}] ${message}`, getHtml(waitingDelay, recallDelay), cc) + await queueEmail(`[dw] [${os.hostname}] ${message}`, getHtml(waitingDelay, recallDelay), cc) } }, - onRestartSuccess: (conf: Conf, target: string, message = `Connection recovered for ${target}`, getHtml: () => string = () => ` + onRestartSuccess: (target: string, message = `Connection recovered for ${target}`, getHtml: () => string = () => `

Connection from [${os.hostname}] to ${target} was recovered on ${moment().format('dd-MM-YYYY HH:mm:ss')}.

`) => { return async (cc?: string) => { console.log(`${message}`) - await sendMail(conf.mail, `[dw] [${os.hostname}] ${message}`, getHtml(), cc) + await queueEmail(`[dw] [${os.hostname}] ${message}`, getHtml(), cc) } }, } + +interface EmailContent { + subject: string + body: string + cc?: string +} \ No newline at end of file diff --git a/src/lib/watchers/abstract/url-watcher.ts b/src/lib/watchers/abstract/url-watcher.ts index d956783..ea12d00 100644 --- a/src/lib/watchers/abstract/url-watcher.ts +++ b/src/lib/watchers/abstract/url-watcher.ts @@ -47,7 +47,7 @@ export function urlWatcher(conf: Conf, checkValidity: (data: any) => Promise { @@ -57,14 +57,14 @@ export function urlWatcher(conf: Conf, checkValidity: (data: any) => Promise `

${error.errorMessage}

` } - return mail.onDisconnect(conf, urlConf.address, koTitle, koMessage)(waitingDelay, recallDelay) + return mail.onDisconnect(urlConf.address, koTitle, koMessage)(waitingDelay, recallDelay) }, async () => { console.log('Trying to connect to %s', urlConf.address) }, - mail.onRestartSuccess(conf, urlConf.address, getRecoveredTitle()), + mail.onRestartSuccess(urlConf.address, getRecoveredTitle()), ) } diff --git a/src/lib/watchers/webdiff/webdiff-watcher.ts b/src/lib/watchers/webdiff/webdiff-watcher.ts index fe7da3a..e092e8f 100644 --- a/src/lib/watchers/webdiff/webdiff-watcher.ts +++ b/src/lib/watchers/webdiff/webdiff-watcher.ts @@ -64,10 +64,10 @@ export function webDiffWatcher(conf: Conf) { conf.waitingDelay, conf.recallDelay, - () => mail.onEstablished(conf, target, 'webdiff successfully started')(webDiffConf.cc), + () => mail.onEstablished(target, 'webdiff successfully started')(webDiffConf.cc), // When a disconnection is detected - (waitingDelay: number, recallDelay: number) => mail.onDisconnect(conf, target, 'Diff detected', undefined, (waitingDelay: number) => ` + (waitingDelay: number, recallDelay: number) => mail.onDisconnect(target, 'Diff detected', undefined, (waitingDelay: number) => ` ${htmlDiff}

Waiting ${(waitingDelay / 1000).toFixed(0)} seconds before trying to reconnect. @@ -78,7 +78,7 @@ export function webDiffWatcher(conf: Conf) { console.log('Trying to connect to %s', target) }, - () => mail.onRestartSuccess(conf, target)(webDiffConf.cc), + () => mail.onRestartSuccess(target)(webDiffConf.cc), ) } diff --git a/src/lib/watchers/ws2p/ws2p-watcher.ts b/src/lib/watchers/ws2p/ws2p-watcher.ts index 96bc697..aa156cd 100644 --- a/src/lib/watchers/ws2p/ws2p-watcher.ts +++ b/src/lib/watchers/ws2p/ws2p-watcher.ts @@ -44,16 +44,16 @@ export function ws2pWatcher(conf: Conf) { conf.waitingDelay, conf.recallDelay, - mail.onEstablished(conf, target, `State OK WS2P on ${wserver.address}`), + mail.onEstablished(target, `State OK WS2P on ${wserver.address}`), // When a disconnection is detected - mail.onDisconnect(conf, target, `State FAILURE WS2P on ${wserver.address}`), + mail.onDisconnect(target, `State FAILURE WS2P on ${wserver.address}`), async () => { console.log('Trying to connect to %s', target) }, - mail.onRestartSuccess(conf, target, `State RECOVERED WS2P on ${wserver.address}`), + mail.onRestartSuccess(target, `State RECOVERED WS2P on ${wserver.address}`), ) } From 356a07e39e82c0116f058df0cdd6ada78adca699 Mon Sep 17 00:00:00 2001 From: cgeek Date: Sun, 20 Sep 2020 11:08:00 +0200 Subject: [PATCH 2/4] mailing: frequency consumption set by conf --- app.yml | 1 + src/lib/mail.ts | 18 ++++++++++-------- src/lib/types/conf.ts | 1 + 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/app.yml b/app.yml index f3ac6fa..4fd55f9 100644 --- a/app.yml +++ b/app.yml @@ -38,6 +38,7 @@ wwMeta: mail: enabled: false + frequency: 10000 # 10" host: smtp.sparkpostmail.com port: 587 auth: LOGIN diff --git a/src/lib/mail.ts b/src/lib/mail.ts index 60578bb..d978439 100644 --- a/src/lib/mail.ts +++ b/src/lib/mail.ts @@ -1,31 +1,33 @@ import {moment} from "duniter/app/lib/common-libs/moment"; -import {Conf} from "./types/conf"; -import {ConfMail} from './types/conf' +import {ConfMail} from "./types/conf"; import * as nodemailer from 'nodemailer' import * as os from 'os' -// TODO: in confMail - -const QUEUE_PERIOD = 5000 +const DEFAULT_FREQUENCY = 5 * 60 * 1000 // 5' const queue: EmailContent[] = [] let conf: ConfMail|undefined export function initConfMail(confMail: ConfMail) { conf = confMail + const freq = conf && conf.frequency || DEFAULT_FREQUENCY + const freqS = freq / 1000 + console.log(`[conf] mails are sent every ${freqS.toFixed(0)}s`) ;(async () => { while (true) { await consumeQueue() - await new Promise((res) => setTimeout(res, QUEUE_PERIOD)) + await new Promise((res) => setTimeout(res, freq)) } })() } export async function queueEmail(subject: string, html: string, cc?: string) { queue.push({ subject, body: html, cc }) + console.log(`[mail] added 1 mail to queue`) } async function consumeQueue() { + console.log(`[mail] consuming mailing queue: ${queue.length} messages to be sent`) while (queue.length) { const mail = queue.shift() if (mail) { @@ -41,8 +43,8 @@ async function sendEmail(subject: string, html: string, cc?: string) { return } - console.log(`[mail] Subject: ${subject}`) - console.log(`[mail] Body: ${html}`) + console.debug(`[mail] Subject: ${subject}`) + console.debug(`[mail] Body: ${html}`) if (!conf.enabled) { console.warn(`Mail is disabled.`) return diff --git a/src/lib/types/conf.ts b/src/lib/types/conf.ts index 99b8a80..ef1c7e5 100644 --- a/src/lib/types/conf.ts +++ b/src/lib/types/conf.ts @@ -51,6 +51,7 @@ export interface ConfDprobeHeartbeat extends ConfURL{ export interface ConfMail { enabled: boolean + frequency: number host: string port: number auth: string From 2f36cea1b058796da180b821cd5670ba90dab741 Mon Sep 17 00:00:00 2001 From: cgeek Date: Sun, 20 Sep 2020 11:36:44 +0200 Subject: [PATCH 3/4] mailing: grouped emails --- app.yml | 2 +- src/lib/mail.ts | 44 ++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/app.yml b/app.yml index 4fd55f9..9da37b8 100644 --- a/app.yml +++ b/app.yml @@ -38,7 +38,7 @@ wwMeta: mail: enabled: false - frequency: 10000 # 10" + frequency: 3000 # 10" host: smtp.sparkpostmail.com port: 587 auth: LOGIN diff --git a/src/lib/mail.ts b/src/lib/mail.ts index d978439..739ba2b 100644 --- a/src/lib/mail.ts +++ b/src/lib/mail.ts @@ -4,7 +4,7 @@ import * as nodemailer from 'nodemailer' import * as os from 'os' const DEFAULT_FREQUENCY = 5 * 60 * 1000 // 5' -const queue: EmailContent[] = [] +const queue: MailContent[] = [] let conf: ConfMail|undefined export function initConfMail(confMail: ConfMail) { @@ -26,16 +26,52 @@ export async function queueEmail(subject: string, html: string, cc?: string) { console.log(`[mail] added 1 mail to queue`) } +function consumeMessages(cc: string): MailContent[] { + const mails: MailContent[] = [] + for (let i = 0; i < queue.length; i++) { + const mail = queue[i] + if (mail.cc === (cc || undefined)) { + mails.push(queue[i]) + queue.splice(i, 1) + i-- + } + } + return mails +} + async function consumeQueue() { console.log(`[mail] consuming mailing queue: ${queue.length} messages to be sent`) while (queue.length) { - const mail = queue.shift() - if (mail) { + if (queue.length > 1) { + const ccs = queue.reduce((ccs, mail) => { + const cc = getGroup(mail) + if (ccs.indexOf(cc) === -1) { + ccs.push(cc) + } + return ccs + }, [] as string[]) + console.log(`[mail] grouping messages into ${ccs.length} group(s)`) + for (const cc of ccs) { + const messages: MailContent[] = consumeMessages(cc) + const subject = `[dw] [${os.hostname}] ${messages.length} notifications` + let body = '' + for (const message of messages) { + body += `

${message.subject}

\n` + body += `
${message.body}
\n` + } + await sendEmail(subject, body, cc || undefined) + } + } else { + const mail = queue.shift() as MailContent await sendEmail(mail.subject, mail.body, mail.cc) } } } +function getGroup(mail: MailContent) { + return mail.cc || '' +} + async function sendEmail(subject: string, html: string, cc?: string) { if (!conf) { @@ -111,7 +147,7 @@ export const mail = { }, } -interface EmailContent { +interface MailContent { subject: string body: string cc?: string From 27b0a67f687300c654d8af356e0d5afb6c6f8204 Mon Sep 17 00:00:00 2001 From: cgeek Date: Sun, 20 Sep 2020 11:37:10 +0200 Subject: [PATCH 4/4] mailing: optimize imports --- src/lib/watchers/webdiff/webdiff-watcher.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/lib/watchers/webdiff/webdiff-watcher.ts b/src/lib/watchers/webdiff/webdiff-watcher.ts index e092e8f..6e7d84b 100644 --- a/src/lib/watchers/webdiff/webdiff-watcher.ts +++ b/src/lib/watchers/webdiff/webdiff-watcher.ts @@ -1,11 +1,8 @@ import {watcherLoop} from "../../watcherLoop"; -import {Conf, ConfBMA, ConfWebDiff} from "../../types/conf"; +import {Conf, ConfWebDiff} from "../../types/conf"; import Axios from "axios"; import {mail} from "../../mail"; import {diffChars} from "diff"; -import * as fs from "fs"; -import * as path from "path"; -import {moment} from "duniter/app/lib/common-libs/moment"; export function webDiffWatcher(conf: Conf) {