Merge branch 'feature/mailing-queue'

This commit is contained in:
2020-09-20 11:37:18 +02:00
7 changed files with 127 additions and 44 deletions

View File

@@ -38,6 +38,7 @@ wwMeta:
mail:
enabled: false
frequency: 3000 # 10"
host: smtp.sparkpostmail.com
port: 587
auth: LOGIN

View File

@@ -8,11 +8,13 @@ import {webDiffWatcher} from "./watchers/webdiff/webdiff-watcher";
import {Watcher} from "./types/state";
import {headWatcher} from "./watchers/bma/head-watcher";
import {jsonWatcher} from "./watchers/wotwizard/json-watcher";
import {initConfMail} from './mail'
export async function dwatch(confFile: string) {
const yml = fs.readFileSync(confFile, 'utf8')
const conf = yaml.load(yml) as Conf
initConfMail(conf.mail)
const watchers: Watcher[] = [];
(await Promise.all((conf.ws2pServers || []).map(ws2pWatcher(conf)))).forEach(w => watchers.push(w));
(await Promise.all((conf.bmaServers || []).map(bmaWatcher(conf)))).forEach(w => watchers.push(w));

View File

@@ -1,14 +1,91 @@
import {moment} from "duniter/app/lib/common-libs/moment";
import {Conf} from "./types/conf";
import {ConfMail} from './types/conf'
import {ConfMail} from "./types/conf";
import * as nodemailer from 'nodemailer'
import * as os from 'os'
export async function sendMail(conf: ConfMail, subject: string, html: string, cc?: string) {
const DEFAULT_FREQUENCY = 5 * 60 * 1000 // 5'
const queue: MailContent[] = []
let conf: ConfMail|undefined
export function initConfMail(confMail: ConfMail) {
conf = confMail
const freq = conf && conf.frequency || DEFAULT_FREQUENCY
const freqS = freq / 1000
console.log(`[conf] mails are sent every ${freqS.toFixed(0)}s`)
;(async () => {
while (true) {
await consumeQueue()
await new Promise((res) => setTimeout(res, freq))
}
})()
}
export async function queueEmail(subject: string, html: string, cc?: string) {
queue.push({ subject, body: html, cc })
console.log(`[mail] added 1 mail to queue`)
}
function consumeMessages(cc: string): MailContent[] {
const mails: MailContent[] = []
for (let i = 0; i < queue.length; i++) {
const mail = queue[i]
if (mail.cc === (cc || undefined)) {
mails.push(queue[i])
queue.splice(i, 1)
i--
}
}
return mails
}
async function consumeQueue() {
console.log(`[mail] consuming mailing queue: ${queue.length} messages to be sent`)
while (queue.length) {
if (queue.length > 1) {
const ccs = queue.reduce((ccs, mail) => {
const cc = getGroup(mail)
if (ccs.indexOf(cc) === -1) {
ccs.push(cc)
}
return ccs
}, [] as string[])
console.log(`[mail] grouping messages into ${ccs.length} group(s)`)
for (const cc of ccs) {
const messages: MailContent[] = consumeMessages(cc)
const subject = `[dw] [${os.hostname}] ${messages.length} notifications`
let body = ''
for (const message of messages) {
body += `<h1>${message.subject}</h1>\n`
body += `<div>${message.body}</div>\n`
}
await sendEmail(subject, body, cc || undefined)
}
} else {
const mail = queue.shift() as MailContent
await sendEmail(mail.subject, mail.body, mail.cc)
}
}
}
function getGroup(mail: MailContent) {
return mail.cc || ''
}
async function sendEmail(subject: string, html: string, cc?: string) {
if (!conf) {
console.error(`Mail is not properly configured.`)
return
}
console.debug(`[mail] Subject: ${subject}`)
console.debug(`[mail] Body: ${html}`)
if (!conf.enabled) {
console.warn(`Mail is disabled.`)
return
}
console.log(`[mail] Subject: ${subject}`)
console.log(`[mail] Body: ${html}`)
if (conf.enabled) {
let transporter = nodemailer.createTransport({
host: conf.host,
port: conf.port,
@@ -30,21 +107,20 @@ export async function sendMail(conf: ConfMail, subject: string, html: string, cc
html
})
}
}
export const mail = {
onEstablished: (conf: Conf, target: string, message = `Connection established for ${target}`, getHtml: () => string = () => `
onEstablished: (target: string, message = `Connection established for ${target}`, getHtml: () => string = () => `
<p>
Connection from [${os.hostname}] to ${target} established on ${moment().format('DD-MM-YYYY HH:mm:ss')}.
</p>
`) => {
return async (cc?: string) => {
await sendMail(conf.mail, `[dw] [${os.hostname}] ${message}`, getHtml(), cc)
await queueEmail(`[dw] [${os.hostname}] ${message}`, getHtml(), cc)
}
},
onDisconnect: (conf: Conf, target: string, message = `Connection closed for ${target}`, getErrorMessage: () => string = () => '', getHtml: (waitingDelay: number, recallDelay: number) => string = (waitingDelay: number, recallDelay: number) => `
onDisconnect: (target: string, message = `Connection closed for ${target}`, getErrorMessage: () => string = () => '', getHtml: (waitingDelay: number, recallDelay: number) => string = (waitingDelay: number, recallDelay: number) => `
<p>
Connection from [${os.hostname}] to ${target} was lost on ${moment().format('dd-MM-YYYY HH:mm:ss')}.
</p>
@@ -55,18 +131,24 @@ export const mail = {
`) => {
return async (waitingDelay: number, recallDelay: number, cc?: string) => {
console.log('Waiting %s seconds...', (waitingDelay / 1000).toFixed(0))
await sendMail(conf.mail, `[dw] [${os.hostname}] ${message}`, getHtml(waitingDelay, recallDelay), cc)
await queueEmail(`[dw] [${os.hostname}] ${message}`, getHtml(waitingDelay, recallDelay), cc)
}
},
onRestartSuccess: (conf: Conf, target: string, message = `Connection recovered for ${target}`, getHtml: () => string = () => `
onRestartSuccess: (target: string, message = `Connection recovered for ${target}`, getHtml: () => string = () => `
<p>
Connection from [${os.hostname}] to ${target} was recovered on ${moment().format('dd-MM-YYYY HH:mm:ss')}.
</p>
`) => {
return async (cc?: string) => {
console.log(`${message}`)
await sendMail(conf.mail, `[dw] [${os.hostname}] ${message}`, getHtml(), cc)
await queueEmail(`[dw] [${os.hostname}] ${message}`, getHtml(), cc)
}
},
}
interface MailContent {
subject: string
body: string
cc?: string
}

View File

@@ -51,6 +51,7 @@ export interface ConfDprobeHeartbeat extends ConfURL{
export interface ConfMail {
enabled: boolean
frequency: number
host: string
port: number
auth: string

View File

@@ -47,7 +47,7 @@ export function urlWatcher(conf: Conf, checkValidity: (data: any) => Promise<Url
conf.waitingDelay,
conf.recallDelay,
mail.onEstablished(conf, urlConf.address, getOkTitle()),
mail.onEstablished(urlConf.address, getOkTitle()),
// When a disconnection is detected
(waitingDelay: number, recallDelay, error?: any) => {
@@ -57,14 +57,14 @@ export function urlWatcher(conf: Conf, checkValidity: (data: any) => Promise<Url
koTitle = getKoTitle()
koMessage = () => `<p>${error.errorMessage}</p>`
}
return mail.onDisconnect(conf, urlConf.address, koTitle, koMessage)(waitingDelay, recallDelay)
return mail.onDisconnect(urlConf.address, koTitle, koMessage)(waitingDelay, recallDelay)
},
async () => {
console.log('Trying to connect to %s', urlConf.address)
},
mail.onRestartSuccess(conf, urlConf.address, getRecoveredTitle()),
mail.onRestartSuccess(urlConf.address, getRecoveredTitle()),
)
}

View File

@@ -1,11 +1,8 @@
import {watcherLoop} from "../../watcherLoop";
import {Conf, ConfBMA, ConfWebDiff} from "../../types/conf";
import {Conf, ConfWebDiff} from "../../types/conf";
import Axios from "axios";
import {mail} from "../../mail";
import {diffChars} from "diff";
import * as fs from "fs";
import * as path from "path";
import {moment} from "duniter/app/lib/common-libs/moment";
export function webDiffWatcher(conf: Conf) {
@@ -64,10 +61,10 @@ export function webDiffWatcher(conf: Conf) {
conf.waitingDelay,
conf.recallDelay,
() => mail.onEstablished(conf, target, 'webdiff successfully started')(webDiffConf.cc),
() => mail.onEstablished(target, 'webdiff successfully started')(webDiffConf.cc),
// When a disconnection is detected
(waitingDelay: number, recallDelay: number) => mail.onDisconnect(conf, target, 'Diff detected', undefined, (waitingDelay: number) => `
(waitingDelay: number, recallDelay: number) => mail.onDisconnect(target, 'Diff detected', undefined, (waitingDelay: number) => `
${htmlDiff}
<p>
Waiting ${(waitingDelay / 1000).toFixed(0)} seconds before trying to reconnect.
@@ -78,7 +75,7 @@ export function webDiffWatcher(conf: Conf) {
console.log('Trying to connect to %s', target)
},
() => mail.onRestartSuccess(conf, target)(webDiffConf.cc),
() => mail.onRestartSuccess(target)(webDiffConf.cc),
)
}

View File

@@ -44,16 +44,16 @@ export function ws2pWatcher(conf: Conf) {
conf.waitingDelay,
conf.recallDelay,
mail.onEstablished(conf, target, `State OK WS2P on ${wserver.address}`),
mail.onEstablished(target, `State OK WS2P on ${wserver.address}`),
// When a disconnection is detected
mail.onDisconnect(conf, target, `State FAILURE WS2P on ${wserver.address}`),
mail.onDisconnect(target, `State FAILURE WS2P on ${wserver.address}`),
async () => {
console.log('Trying to connect to %s', target)
},
mail.onRestartSuccess(conf, target, `State RECOVERED WS2P on ${wserver.address}`),
mail.onRestartSuccess(target, `State RECOVERED WS2P on ${wserver.address}`),
)
}