[add] head-watcher
This commit is contained in:
6
app.yml
6
app.yml
@@ -23,6 +23,12 @@ webDiffServers:
|
|||||||
# file2: https://wotwizard2.cgeek.fr/WWMeta.json
|
# file2: https://wotwizard2.cgeek.fr/WWMeta.json
|
||||||
# frequency: 60000
|
# frequency: 60000
|
||||||
|
|
||||||
|
headServers:
|
||||||
|
- address: https://g1.cgeek.fr
|
||||||
|
frequency: 60000 # 1'
|
||||||
|
maxLateBlocks: 3
|
||||||
|
observedPubkey: A5LQXCkx8b6rzppfqdqeHbKPDGmKZtRcqwxP4BSeag5r
|
||||||
|
|
||||||
mail:
|
mail:
|
||||||
enabled: false
|
enabled: false
|
||||||
host: smtp.sparkpostmail.com
|
host: smtp.sparkpostmail.com
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ export interface Conf {
|
|||||||
bmaServers: ConfBMA[]
|
bmaServers: ConfBMA[]
|
||||||
dprobeHeartbeats: ConfDprobeHeartbeat[]
|
dprobeHeartbeats: ConfDprobeHeartbeat[]
|
||||||
webDiffServers: ConfWebDiff[]
|
webDiffServers: ConfWebDiff[]
|
||||||
|
headServers: ConfHead[]
|
||||||
mail: ConfMail
|
mail: ConfMail
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -33,6 +34,11 @@ export interface ConfBMA extends ConfURL {
|
|||||||
maxLate?: number
|
maxLate?: number
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface ConfHead extends ConfURL {
|
||||||
|
observedPubkey: string
|
||||||
|
maxLateBlocks: number
|
||||||
|
}
|
||||||
|
|
||||||
export interface ConfDprobeHeartbeat extends ConfURL{
|
export interface ConfDprobeHeartbeat extends ConfURL{
|
||||||
lastBeat: number
|
lastBeat: number
|
||||||
}
|
}
|
||||||
|
|||||||
104
src/lib/watchers/bma/head-watcher.ts
Normal file
104
src/lib/watchers/bma/head-watcher.ts
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
import {Conf, ConfHead} from "../../types/conf";
|
||||||
|
import {urlWatcher} from '../abstract/url-watcher'
|
||||||
|
import {WS2PHead} from "duniter/app/modules/ws2p/lib/WS2PCluster";
|
||||||
|
|
||||||
|
function handleLateness(confHead: ConfHead, mainHeads: HeadMetric[], observedHead?: TrameWS2P) {
|
||||||
|
if (!mainHeads.length) {
|
||||||
|
throw 'No consensus found'
|
||||||
|
}
|
||||||
|
if (!observedHead) {
|
||||||
|
throw `Observed pubkey ${confHead.observedPubkey} not found in heads`
|
||||||
|
}
|
||||||
|
const matchingHeads = mainHeads.filter(h => h.blockstamp === observedHead.blockstamp)
|
||||||
|
if (!matchingHeads.length) {
|
||||||
|
// Check how much late is the node
|
||||||
|
const farAwayHeads = mainHeads.filter(h => h.blockNumber - observedHead.blockNumber >= confHead.maxLateBlocks)
|
||||||
|
if (farAwayHeads.length) {
|
||||||
|
throw `Observed pubkey is too late for ${farAwayHeads.length} consensus by at least ${confHead.maxLateBlocks}`
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function headWatcher(conf: Conf) {
|
||||||
|
|
||||||
|
const URL_PATH = '/network/ws2p/heads'
|
||||||
|
|
||||||
|
return async (confHead: ConfHead) => {
|
||||||
|
|
||||||
|
return urlWatcher(conf, async (data) => {
|
||||||
|
const heads = data as { heads: WS2PHead[] }
|
||||||
|
const mainHeads = getMain(heads)
|
||||||
|
const observedHead = getObserved(heads, confHead.observedPubkey)
|
||||||
|
handleLateness(confHead, mainHeads, observedHead)
|
||||||
|
})({
|
||||||
|
name: `head watcher ${confHead.address}`,
|
||||||
|
address: confHead.address + URL_PATH,
|
||||||
|
frequency: confHead.frequency
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function getMain(heads: { heads: WS2PHead[] }): HeadMetric[] {
|
||||||
|
const mapByHash: { [k: string]: HeadMetric } = {}
|
||||||
|
for (const head of heads.heads) {
|
||||||
|
const trame = getTrame(head)
|
||||||
|
if (trame) {
|
||||||
|
mapByHash[trame.blockstamp] = mapByHash[trame.blockstamp] || { pubkeys: [], blockNumber: trame.blockNumber, blockstamp: trame.blockstamp }
|
||||||
|
mapByHash[trame.blockstamp].pubkeys.push(trame.pubkey)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
console.warn('Pas de trame pour %s', head)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Object.keys(mapByHash).reduce((main, blockstamp) => {
|
||||||
|
const newMetric = mapByHash[blockstamp]
|
||||||
|
if (!main.length) {
|
||||||
|
return [newMetric]
|
||||||
|
}
|
||||||
|
const newMain: HeadMetric[] = main.filter(m => m.pubkeys.length >= newMetric.pubkeys.length)
|
||||||
|
let maxLen = main.length ? main[0].pubkeys.length : newMetric.pubkeys.length
|
||||||
|
if (newMetric.pubkeys.length >= maxLen) {
|
||||||
|
newMain.push(newMetric)
|
||||||
|
}
|
||||||
|
return newMain
|
||||||
|
}, [] as HeadMetric[])
|
||||||
|
}
|
||||||
|
|
||||||
|
function getObserved(heads: { heads: WS2PHead[] }, observedPubkey: string): TrameWS2P|undefined {
|
||||||
|
const trame = heads.heads
|
||||||
|
.map(getTrame)
|
||||||
|
.filter(trame => trame && trame.pubkey === observedPubkey)[0]
|
||||||
|
if (!trame) {
|
||||||
|
return undefined
|
||||||
|
}
|
||||||
|
return trame
|
||||||
|
}
|
||||||
|
|
||||||
|
function getTrame(head: WS2PHead): TrameWS2P|undefined {
|
||||||
|
if (head.messageV2) {
|
||||||
|
const [ protocol, type, trameVersion, pubkey, blockstamp, peerUuid, software, softVersion, n1, n2, n3 ] = head.messageV2.split(':')
|
||||||
|
const [ blockNumber, blockHash ] = blockstamp.split('-')
|
||||||
|
return { pubkey, blockstamp, blockNumber: parseInt(blockNumber), blockHash }
|
||||||
|
}
|
||||||
|
else if (head.message) {
|
||||||
|
const [ protocol, type, trameVersion, pubkey, blockstamp, peerUuid, software, softVersion, n1 ] = head.message.split(':')
|
||||||
|
const [ blockNumber, blockHash ] = blockstamp.split('-')
|
||||||
|
return { pubkey, blockstamp, blockNumber: parseInt(blockNumber), blockHash }
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return undefined
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface HeadMetric {
|
||||||
|
pubkeys: string[]
|
||||||
|
blockstamp: string
|
||||||
|
blockNumber: number
|
||||||
|
}
|
||||||
|
|
||||||
|
interface TrameWS2P {
|
||||||
|
pubkey: string
|
||||||
|
blockstamp: string
|
||||||
|
blockNumber: number
|
||||||
|
blockHash: string
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user