[state]: watchers return a state
This commit is contained in:
@@ -5,16 +5,19 @@ import {ws2pWatcher} from "./watchers/ws2p/ws2p-watcher";
|
||||
import {bmaWatcher} from "./watchers/bma/bma-watcher";
|
||||
import {dprobeHeartbeat} from './watchers/dprobe/dprobe-heartbeat-watcher'
|
||||
import {webDiffWatcher} from "./watchers/webdiff/webdiff-watcher";
|
||||
import {ServiceState} from "./types/state";
|
||||
import {Watcher} from "./types/state";
|
||||
|
||||
export async function dwatch(confFile: string) {
|
||||
|
||||
const yml = fs.readFileSync(confFile, 'utf8')
|
||||
const conf = yaml.load(yml) as Conf
|
||||
const states: ServiceState[] = []
|
||||
const watchers: () => Promise<>
|
||||
await Promise.all((conf.ws2pServers || []).map(ws2pWatcher(conf)))
|
||||
await Promise.all((conf.bmaServers || []).map(bmaWatcher(conf)))
|
||||
await Promise.all((conf.dprobeHeartbeats || []).map(dprobeHeartbeat(conf)))
|
||||
await Promise.all((conf.webDiffServers || []).map(webDiffWatcher(conf)))
|
||||
const watchers: Watcher[] = [];
|
||||
(await Promise.all((conf.ws2pServers || []).map(ws2pWatcher(conf)))).forEach(w => watchers.push(w));
|
||||
(await Promise.all((conf.bmaServers || []).map(bmaWatcher(conf)))).forEach(w => watchers.push(w));
|
||||
(await Promise.all((conf.dprobeHeartbeats || []).map(dprobeHeartbeat(conf)))).forEach(w => watchers.push(w));
|
||||
(await Promise.all((conf.webDiffServers || []).map(webDiffWatcher(conf)))).forEach(w => watchers.push(w));
|
||||
return watchers
|
||||
}
|
||||
|
||||
// (await Promise.all((conf.headServers || []).map(headWatcher(conf)))).forEach(w => watchers.push(w));
|
||||
// (await Promise.all((conf.wwMeta || []).map(jsonWatcher(conf)))).forEach(w => watchers.push(w));
|
||||
@@ -17,6 +17,7 @@ export interface ConfWS2P {
|
||||
}
|
||||
|
||||
export interface ConfURL {
|
||||
name: string
|
||||
address: string
|
||||
frequency: number
|
||||
}
|
||||
|
||||
@@ -1,3 +1,21 @@
|
||||
export interface ServiceState {
|
||||
state: 'INIT'|'OK'|'FAILURE'
|
||||
}
|
||||
export class Watcher {
|
||||
private state: 'INIT'|'OK'|'FAILURE'|'RECOVERED'
|
||||
private failureMessage?: string
|
||||
|
||||
constructor(public readonly name: string) {
|
||||
this.state = "INIT"
|
||||
}
|
||||
|
||||
public stateOK() {
|
||||
this.state = "OK"
|
||||
}
|
||||
|
||||
public stateRecovered() {
|
||||
this.state = "RECOVERED"
|
||||
}
|
||||
|
||||
public stateFailure(error: string) {
|
||||
this.state = "FAILURE"
|
||||
this.failureMessage = error
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import {ServiceState} from "./types/state";
|
||||
import {Watcher} from "./types/state";
|
||||
|
||||
export function watcherLoop(
|
||||
name: string,
|
||||
connect: () => Promise<void>,
|
||||
onConnectionClosed: () => Promise<void>,
|
||||
reconnectionDelays: number[],
|
||||
@@ -9,9 +10,9 @@ export function watcherLoop(
|
||||
onRestart: () => Promise<void>,
|
||||
onRestartSuccess: () => Promise<void>,
|
||||
onError: (e: Error) => Promise<void>,
|
||||
): ServiceState {
|
||||
): Watcher {
|
||||
|
||||
let state: ServiceState = { state: 'INIT' }
|
||||
let watcher: Watcher = new Watcher(name)
|
||||
let hasStarted = false
|
||||
|
||||
;(async () => {
|
||||
@@ -31,8 +32,10 @@ export function watcherLoop(
|
||||
if (!hasStarted) {
|
||||
hasStarted = true
|
||||
await onStart()
|
||||
watcher.stateOK()
|
||||
} else {
|
||||
await onRestartSuccess()
|
||||
watcher.stateRecovered()
|
||||
}
|
||||
|
||||
// We reset the delay of reconnection
|
||||
@@ -42,6 +45,7 @@ export function watcherLoop(
|
||||
|
||||
} catch (e) {
|
||||
await onError(e)
|
||||
watcher.stateFailure(e && e.message || e)
|
||||
}
|
||||
// Wait before reconnecting
|
||||
const waitingDelay = reconnectionDelays[Math.min(reconnectionDelays.length - 1, i)]
|
||||
@@ -50,4 +54,6 @@ export function watcherLoop(
|
||||
i++
|
||||
}
|
||||
})()
|
||||
|
||||
return watcher
|
||||
}
|
||||
|
||||
@@ -2,15 +2,17 @@ import {watcherLoop} from "../../watcherLoop";
|
||||
import {Conf, ConfURL} from "../../types/conf";
|
||||
import Axios from "axios";
|
||||
import {mail} from "../../mail";
|
||||
import {Watcher} from "../../types/state";
|
||||
|
||||
export function urlWatcher(conf: Conf, checkValidity: (data: any) => Promise<void>) {
|
||||
|
||||
return async (urlConf: ConfURL) => {
|
||||
return async (urlConf: ConfURL): Promise<Watcher> => {
|
||||
|
||||
let nodeDownRes: () => void
|
||||
let nodeDownPromise: Promise<void> = new Promise(res => nodeDownRes = res)
|
||||
|
||||
await watcherLoop(
|
||||
return watcherLoop(
|
||||
urlConf.name,
|
||||
async () => {
|
||||
let interval: NodeJS.Timer;
|
||||
const res = await Axios.get(urlConf.address)
|
||||
|
||||
@@ -8,12 +8,13 @@ export function bmaWatcher(conf: Conf) {
|
||||
|
||||
return async (bmaServer: ConfBMA) => {
|
||||
|
||||
await urlWatcher(conf, async (data) => {
|
||||
return urlWatcher(conf, async (data) => {
|
||||
const block = data as { medianTime: number }
|
||||
if (bmaServer.maxLate && moment().unix() - block.medianTime > bmaServer.maxLate) {
|
||||
throw 'Server is late'
|
||||
}
|
||||
})({
|
||||
name: `BMA ${bmaServer.address}`,
|
||||
address: bmaServer.address + URL_PATH,
|
||||
frequency: bmaServer.frequency
|
||||
})
|
||||
|
||||
@@ -6,7 +6,7 @@ export function dprobeHeartbeat(conf: Conf) {
|
||||
|
||||
return async (dconf: ConfDprobeHeartbeat) => {
|
||||
|
||||
await urlWatcher(conf, async (data) => {
|
||||
return urlWatcher(conf, async (data) => {
|
||||
const last = moment(data, 'YYYY-MM-DD HH:mm:ss\n')
|
||||
const past = moment().diff(last)
|
||||
if (past > dconf.lastBeat) {
|
||||
|
||||
@@ -39,7 +39,8 @@ export function webDiffWatcher(conf: Conf) {
|
||||
}
|
||||
}
|
||||
|
||||
await watcherLoop(
|
||||
return watcherLoop(
|
||||
`webdiff ${target}`,
|
||||
async () => {
|
||||
const data1 = await Axios.get(webDiffConf.file1)
|
||||
const data2 = await Axios.get(webDiffConf.file2)
|
||||
|
||||
@@ -16,7 +16,8 @@ export function ws2pWatcher(conf: Conf) {
|
||||
const keypair = new Key(keys.pub, keys.sec)
|
||||
const target = `${wserver.address} (${wserver.expectedPubkey.substr(0, 8)})`
|
||||
|
||||
await watcherLoop(
|
||||
return watcherLoop(
|
||||
`WS2P ${wserver.address}`,
|
||||
async () => {
|
||||
|
||||
const localAuth = new WS2PPubkeyLocalAuth(wserver.currency, keypair, "", async () => true)
|
||||
|
||||
Reference in New Issue
Block a user