Merge branch 'feature/ww-head-watchers'

This commit is contained in:
2020-04-30 15:31:00 +02:00
14 changed files with 254 additions and 59 deletions

90
app.yml
View File

@@ -1,4 +1,50 @@
connectionTimeout: 10000 connectionTimeout: 10000
ws2pServers:
- address: ws://g1-test.cgeek.fr:22001
expectedPubkey: 3dnbnYY9i2bHMQUGyFp5GVvJ2wBkVpus31cDJA5cfRpj
salt: test
passwd: test
currency: g1-test # Ensure we connect to the correct currency
bmaServers:
- address: https://g1-test.cgeek.fr
frequency: 60000 # 1'
# - address: http://remuniter.cgeek.fr:16120
# frequency: 60000 # 1'
# maxLate: 14400000 # 4h'
dprobeHeartbeats:
# - address: https://wotwizard.cgeek.fr/dprobe.heartbeat.txt
# frequency: 60000 # 1'
# lastBeat: 300000 # 5'
webDiffServers:
# - file1: https://wotwizard.cgeek.fr/WWMeta.json
# file2: https://wotwizard2.cgeek.fr/WWMeta.json
# frequency: 60000
headServers:
- address: https://g1.cgeek.fr
frequency: 60000 # 1'
maxLateBlocks: 3
observedPubkey: A5LQXCkx8b6rzppfqdqeHbKPDGmKZtRcqwxP4BSeag5r
wwMeta:
- address: https://wot-wizard.duniter.org
frequency: 60000 # 1'
maxLate: 14400 # 4h
mail:
enabled: false
host: smtp.sparkpostmail.com
port: 587
auth: LOGIN
encryption: STARTTLS
username: SMTP_Injection
apikey: 6976eb51635b680b832dbe4914524d2c038a13b6
from: '"DWatcher" <dwatcher@cgeek.fr>'
to: cem.moreau@gmail.com
reconnectionDelays: reconnectionDelays:
- 5000 - 5000
- 10000 - 10000
@@ -26,46 +72,4 @@ reconnectionDelays:
- 14400000 # 4h - 14400000 # 4h
- 21600000 # 6h - 21600000 # 6h
- 21600000 # 6h - 21600000 # 6h
- 43200000 # 12h - 43200000 # 12h
- 43200000 # 12h
- 43200000 # 12h
- 43200000 # 12h
- 43200000 # 12h
- 43200000 # 12h
- 43200000 # 12h
- 43200000 # 12h
- 43200000 # 12h
ws2pServers:
- address: ws://g1-test.cgeek.fr:22001
expectedPubkey: 3dnbnYY9i2bHMQUGyFp5GVvJ2wBkVpus31cDJA5cfRpj
salt: test
passwd: test
currency: g1-test # Ensure we connect to the correct currency
bmaServers:
- address: https://g1-test.cgeek.fr
frequency: 60000 # 1'
- address: http://remuniter.cgeek.fr:16120
frequency: 60000 # 1'
maxLate: 14400000 # 4h'
dprobeHeartbeats:
- address: https://wotwizard.cgeek.fr/dprobe.heartbeat.txt
frequency: 60000 # 1'
lastBeat: 300000 # 5'
webDiffServers:
- file1: https://wotwizard.cgeek.fr/WWMeta.json
file2: https://wotwizard2.cgeek.fr/WWMeta.json
frequency: 60000
mail:
enabled: false
host: smtp.sparkpostmail.com
port: 587
auth: LOGIN
encryption: STARTTLS
username: SMTP_Injection
apikey: 6976eb51635b680b832dbe4914524d2c038a13b6
from: '"DWatcher" <dwatcher@cgeek.fr>'
to: cem.moreau@gmail.com

View File

@@ -1,6 +1,7 @@
import * as minimist from 'minimist' import * as minimist from 'minimist'
import * as path from 'path' import * as path from 'path'
import {dwatch} from './lib/dwatch' import {dwatch} from './lib/dwatch'
import {Watcher} from "./lib/types/state";
process.on('uncaughtException', (err) => { process.on('uncaughtException', (err) => {
// Dunno why this specific exception is not caught // Dunno why this specific exception is not caught
@@ -20,9 +21,9 @@ process.on('unhandledRejection', (err) => {
console.log('Starting...') console.log('Starting...')
try { try {
await dwatch(argv.conf || path.join(__dirname, '../app.yml')) const watchers: Watcher[] = await dwatch(argv.conf || path.join(__dirname, '../app.yml'))
} catch (e) { } catch (e) {
// webappServe(watchers)
console.error(e) console.error(e)
} }
})() })()

View File

@@ -5,14 +5,20 @@ import {ws2pWatcher} from "./watchers/ws2p/ws2p-watcher";
import {bmaWatcher} from "./watchers/bma/bma-watcher"; import {bmaWatcher} from "./watchers/bma/bma-watcher";
import {dprobeHeartbeat} from './watchers/dprobe/dprobe-heartbeat-watcher' import {dprobeHeartbeat} from './watchers/dprobe/dprobe-heartbeat-watcher'
import {webDiffWatcher} from "./watchers/webdiff/webdiff-watcher"; import {webDiffWatcher} from "./watchers/webdiff/webdiff-watcher";
import {Watcher} from "./types/state";
import {headWatcher} from "./watchers/bma/head-watcher";
import {jsonWatcher} from "./watchers/wotwizard/json-watcher";
export async function dwatch(confFile: string) { export async function dwatch(confFile: string) {
const yml = fs.readFileSync(confFile, 'utf8') const yml = fs.readFileSync(confFile, 'utf8')
const conf = yaml.load(yml) as Conf const conf = yaml.load(yml) as Conf
const watchers: Watcher[] = [];
await Promise.all((conf.ws2pServers || []).map(ws2pWatcher(conf))) (await Promise.all((conf.ws2pServers || []).map(ws2pWatcher(conf)))).forEach(w => watchers.push(w));
await Promise.all((conf.bmaServers || []).map(bmaWatcher(conf))) (await Promise.all((conf.bmaServers || []).map(bmaWatcher(conf)))).forEach(w => watchers.push(w));
await Promise.all((conf.dprobeHeartbeats || []).map(dprobeHeartbeat(conf))) (await Promise.all((conf.dprobeHeartbeats || []).map(dprobeHeartbeat(conf)))).forEach(w => watchers.push(w));
await Promise.all((conf.webDiffServers || []).map(webDiffWatcher(conf))) (await Promise.all((conf.webDiffServers || []).map(webDiffWatcher(conf)))).forEach(w => watchers.push(w));
(await Promise.all((conf.headServers || []).map(headWatcher(conf)))).forEach(w => watchers.push(w));
(await Promise.all((conf.wwMeta || []).map(jsonWatcher(conf)))).forEach(w => watchers.push(w));
return watchers
} }

View File

@@ -5,6 +5,8 @@ export interface Conf {
bmaServers: ConfBMA[] bmaServers: ConfBMA[]
dprobeHeartbeats: ConfDprobeHeartbeat[] dprobeHeartbeats: ConfDprobeHeartbeat[]
webDiffServers: ConfWebDiff[] webDiffServers: ConfWebDiff[]
headServers: ConfHead[]
wwMeta: ConfWWMeta[]
mail: ConfMail mail: ConfMail
} }
@@ -17,6 +19,7 @@ export interface ConfWS2P {
} }
export interface ConfURL { export interface ConfURL {
name: string
address: string address: string
frequency: number frequency: number
} }
@@ -32,6 +35,15 @@ export interface ConfBMA extends ConfURL {
maxLate?: number maxLate?: number
} }
export interface ConfHead extends ConfURL {
observedPubkey: string
maxLateBlocks: number
}
export interface ConfWWMeta extends ConfURL {
maxLate: number
}
export interface ConfDprobeHeartbeat extends ConfURL{ export interface ConfDprobeHeartbeat extends ConfURL{
lastBeat: number lastBeat: number
} }

21
src/lib/types/state.ts Normal file
View File

@@ -0,0 +1,21 @@
export class Watcher {
private state: 'INIT'|'OK'|'FAILURE'|'RECOVERED'
private failureMessage?: string
constructor(public readonly name: string) {
this.state = "INIT"
}
public stateOK() {
this.state = "OK"
}
public stateRecovered() {
this.state = "RECOVERED"
}
public stateFailure(error: string) {
this.state = "FAILURE"
this.failureMessage = error
}
}

View File

@@ -1,4 +1,7 @@
export async function watcherLoop( import {Watcher} from "./types/state";
export function watcherLoop(
name: string,
connect: () => Promise<void>, connect: () => Promise<void>,
onConnectionClosed: () => Promise<void>, onConnectionClosed: () => Promise<void>,
reconnectionDelays: number[], reconnectionDelays: number[],
@@ -7,8 +10,9 @@ export async function watcherLoop(
onRestart: () => Promise<void>, onRestart: () => Promise<void>,
onRestartSuccess: () => Promise<void>, onRestartSuccess: () => Promise<void>,
onError: (e: Error) => Promise<void>, onError: (e: Error) => Promise<void>,
) { ): Watcher {
let watcher: Watcher = new Watcher(name)
let hasStarted = false let hasStarted = false
;(async () => { ;(async () => {
@@ -28,8 +32,10 @@ export async function watcherLoop(
if (!hasStarted) { if (!hasStarted) {
hasStarted = true hasStarted = true
await onStart() await onStart()
watcher.stateOK()
} else { } else {
await onRestartSuccess() await onRestartSuccess()
watcher.stateRecovered()
} }
// We reset the delay of reconnection // We reset the delay of reconnection
@@ -39,6 +45,7 @@ export async function watcherLoop(
} catch (e) { } catch (e) {
await onError(e) await onError(e)
watcher.stateFailure(e && e.message || e)
} }
// Wait before reconnecting // Wait before reconnecting
const waitingDelay = reconnectionDelays[Math.min(reconnectionDelays.length - 1, i)] const waitingDelay = reconnectionDelays[Math.min(reconnectionDelays.length - 1, i)]
@@ -47,4 +54,6 @@ export async function watcherLoop(
i++ i++
} }
})() })()
return watcher
} }

View File

@@ -2,15 +2,17 @@ import {watcherLoop} from "../../watcherLoop";
import {Conf, ConfURL} from "../../types/conf"; import {Conf, ConfURL} from "../../types/conf";
import Axios from "axios"; import Axios from "axios";
import {mail} from "../../mail"; import {mail} from "../../mail";
import {Watcher} from "../../types/state";
export function urlWatcher(conf: Conf, checkValidity: (data: any) => Promise<void>) { export function urlWatcher(conf: Conf, checkValidity: (data: any) => Promise<void>) {
return async (urlConf: ConfURL) => { return async (urlConf: ConfURL): Promise<Watcher> => {
let nodeDownRes: () => void let nodeDownRes: () => void
let nodeDownPromise: Promise<void> = new Promise(res => nodeDownRes = res) let nodeDownPromise: Promise<void> = new Promise(res => nodeDownRes = res)
await watcherLoop( return watcherLoop(
urlConf.name,
async () => { async () => {
let interval: NodeJS.Timer; let interval: NodeJS.Timer;
const res = await Axios.get(urlConf.address) const res = await Axios.get(urlConf.address)

View File

@@ -8,12 +8,13 @@ export function bmaWatcher(conf: Conf) {
return async (bmaServer: ConfBMA) => { return async (bmaServer: ConfBMA) => {
await urlWatcher(conf, async (data) => { return urlWatcher(conf, async (data) => {
const block = data as { medianTime: number } const block = data as { medianTime: number }
if (bmaServer.maxLate && moment().unix() - block.medianTime > bmaServer.maxLate) { if (bmaServer.maxLate && moment().unix() - block.medianTime > bmaServer.maxLate) {
throw 'Server is late' throw 'Server is late'
} }
})({ })({
name: `BMA ${bmaServer.address}`,
address: bmaServer.address + URL_PATH, address: bmaServer.address + URL_PATH,
frequency: bmaServer.frequency frequency: bmaServer.frequency
}) })

View File

@@ -0,0 +1,104 @@
import {Conf, ConfHead} from "../../types/conf";
import {urlWatcher} from '../abstract/url-watcher'
import {WS2PHead} from "duniter/app/modules/ws2p/lib/WS2PCluster";
function handleLateness(confHead: ConfHead, mainHeads: HeadMetric[], observedHead?: TrameWS2P) {
if (!mainHeads.length) {
throw 'No consensus found'
}
if (!observedHead) {
throw `Observed pubkey ${confHead.observedPubkey} not found in heads`
}
const matchingHeads = mainHeads.filter(h => h.blockstamp === observedHead.blockstamp)
if (!matchingHeads.length) {
// Check how much late is the node
const farAwayHeads = mainHeads.filter(h => h.blockNumber - observedHead.blockNumber >= confHead.maxLateBlocks)
if (farAwayHeads.length) {
throw `Observed pubkey is too late for ${farAwayHeads.length} consensus by at least ${confHead.maxLateBlocks}`
}
}
}
export function headWatcher(conf: Conf) {
const URL_PATH = '/network/ws2p/heads'
return async (confHead: ConfHead) => {
return urlWatcher(conf, async (data) => {
const heads = data as { heads: WS2PHead[] }
const mainHeads = getMain(heads)
const observedHead = getObserved(heads, confHead.observedPubkey)
handleLateness(confHead, mainHeads, observedHead)
})({
name: `head watcher ${confHead.address}`,
address: confHead.address + URL_PATH,
frequency: confHead.frequency
})
}
}
function getMain(heads: { heads: WS2PHead[] }): HeadMetric[] {
const mapByHash: { [k: string]: HeadMetric } = {}
for (const head of heads.heads) {
const trame = getTrame(head)
if (trame) {
mapByHash[trame.blockstamp] = mapByHash[trame.blockstamp] || { pubkeys: [], blockNumber: trame.blockNumber, blockstamp: trame.blockstamp }
mapByHash[trame.blockstamp].pubkeys.push(trame.pubkey)
}
else {
console.warn('Pas de trame pour %s', head)
}
}
return Object.keys(mapByHash).reduce((main, blockstamp) => {
const newMetric = mapByHash[blockstamp]
if (!main.length) {
return [newMetric]
}
const newMain: HeadMetric[] = main.filter(m => m.pubkeys.length >= newMetric.pubkeys.length)
let maxLen = main.length ? main[0].pubkeys.length : newMetric.pubkeys.length
if (newMetric.pubkeys.length >= maxLen) {
newMain.push(newMetric)
}
return newMain
}, [] as HeadMetric[])
}
function getObserved(heads: { heads: WS2PHead[] }, observedPubkey: string): TrameWS2P|undefined {
const trame = heads.heads
.map(getTrame)
.filter(trame => trame && trame.pubkey === observedPubkey)[0]
if (!trame) {
return undefined
}
return trame
}
function getTrame(head: WS2PHead): TrameWS2P|undefined {
if (head.messageV2) {
const [ protocol, type, trameVersion, pubkey, blockstamp, peerUuid, software, softVersion, n1, n2, n3 ] = head.messageV2.split(':')
const [ blockNumber, blockHash ] = blockstamp.split('-')
return { pubkey, blockstamp, blockNumber: parseInt(blockNumber), blockHash }
}
else if (head.message) {
const [ protocol, type, trameVersion, pubkey, blockstamp, peerUuid, software, softVersion, n1 ] = head.message.split(':')
const [ blockNumber, blockHash ] = blockstamp.split('-')
return { pubkey, blockstamp, blockNumber: parseInt(blockNumber), blockHash }
}
else {
return undefined
}
}
interface HeadMetric {
pubkeys: string[]
blockstamp: string
blockNumber: number
}
interface TrameWS2P {
pubkey: string
blockstamp: string
blockNumber: number
blockHash: string
}

View File

@@ -6,7 +6,7 @@ export function dprobeHeartbeat(conf: Conf) {
return async (dconf: ConfDprobeHeartbeat) => { return async (dconf: ConfDprobeHeartbeat) => {
await urlWatcher(conf, async (data) => { return urlWatcher(conf, async (data) => {
const last = moment(data, 'YYYY-MM-DD HH:mm:ss\n') const last = moment(data, 'YYYY-MM-DD HH:mm:ss\n')
const past = moment().diff(last) const past = moment().diff(last)
if (past > dconf.lastBeat) { if (past > dconf.lastBeat) {

View File

@@ -39,7 +39,8 @@ export function webDiffWatcher(conf: Conf) {
} }
} }
await watcherLoop( return watcherLoop(
`webdiff ${target}`,
async () => { async () => {
const data1 = await Axios.get(webDiffConf.file1) const data1 = await Axios.get(webDiffConf.file1)
const data2 = await Axios.get(webDiffConf.file2) const data2 = await Axios.get(webDiffConf.file2)

View File

@@ -0,0 +1,32 @@
import {Conf, ConfWWMeta} from "../../types/conf";
import {urlWatcher} from '../abstract/url-watcher'
function handleLateness(confHead: ConfWWMeta, data: WWMetaJson) {
const diff = Math.round(Date.now()/1000 - data.now)
if (diff >= confHead.maxLate) {
throw `WWMeta.json is late by ${diff}s (>= ${confHead.maxLate})`
}
}
export function jsonWatcher(conf: Conf) {
const URL_PATH = '/WWMeta.json'
return async (confWWMeta: ConfWWMeta) => {
return urlWatcher(conf, async (data) => {
handleLateness(confWWMeta, data)
})({
name: `WWMeta.json watcher ${confWWMeta.address}`,
address: confWWMeta.address + URL_PATH,
frequency: confWWMeta.frequency
})
}
}
interface WWMetaJson {
dossNb: number
block: number
now: number
computation_duration: number
}

View File

@@ -16,7 +16,8 @@ export function ws2pWatcher(conf: Conf) {
const keypair = new Key(keys.pub, keys.sec) const keypair = new Key(keys.pub, keys.sec)
const target = `${wserver.address} (${wserver.expectedPubkey.substr(0, 8)})` const target = `${wserver.address} (${wserver.expectedPubkey.substr(0, 8)})`
await watcherLoop( return watcherLoop(
`WS2P ${wserver.address}`,
async () => { async () => {
const localAuth = new WS2PPubkeyLocalAuth(wserver.currency, keypair, "", async () => true) const localAuth = new WS2PPubkeyLocalAuth(wserver.currency, keypair, "", async () => true)

View File

@@ -9,7 +9,8 @@
"noImplicitThis": true, "noImplicitThis": true,
"noImplicitAny": true, "noImplicitAny": true,
"noImplicitReturns": true, "noImplicitReturns": true,
"experimentalDecorators": true "experimentalDecorators": true,
"skipLibCheck": true
}, },
"include": [ "include": [
"src/*", "src/*",