feat: Function to parse node statuses from report

This commit is contained in:
Cristian Ditaputratama 2024-06-03 22:13:12 +07:00
parent 85f3169689
commit ced90fde8e
Signed by: ditatompel
GPG key ID: 31D3D06D77950979

View file

@ -13,7 +13,7 @@ import (
) )
type QueryLogs struct { type QueryLogs struct {
NodeID int // 0 fpr all, >0 for specific node NodeID int // 0 for all, >0 for specific node
Status int // -1 for all, 0 for failed, 1 for success Status int // -1 for all, 0 for failed, 1 for success
FailedReason string // empty for all, if not empty, will be used as search from failed_reaso FailedReason string // empty for all, if not empty, will be used as search from failed_reaso
@ -162,11 +162,41 @@ func (r *MoneroRepo) GiveJob(acceptTor int) (Node, error) {
type ProbeReport struct { type ProbeReport struct {
TookTime float64 `json:"took_time"` TookTime float64 `json:"took_time"`
Message string `json:"message"` Message string `json:"message"`
NodeInfo Node `json:"node_info"` Node Node `json:"node"`
} }
type nodeStats struct {
Online uint `db:"online"` // total count online
Offline uint `db:"offline"` // total count offline
TotalFetched uint `db:"total_fetched"`
}
// Create new status indicator based from LastCheckStatus and recent IsAvailable status
func (p *ProbeReport) parseStatuses() string {
var s [5]int
if err := p.Node.LastCheckStatus.Unmarshal(&s); err != nil {
slog.Warn(err.Error())
s = [5]int{2, 2, 2, 2, 2}
}
si := 0 // set default "status indicator" to offline
if p.Node.IsAvailable {
si = 1
}
ns := s[1:]
ns = append(ns, si)
j, err := json.Marshal(ns)
if err != nil {
slog.Warn(err.Error())
}
return string(j)
}
// Process report data from probers
func (r *MoneroRepo) ProcessJob(report ProbeReport, proberId int64) error { func (r *MoneroRepo) ProcessJob(report ProbeReport, proberId int64) error {
if report.NodeInfo.ID == 0 { if report.Node.ID == 0 {
return errors.New("Invalid node") return errors.New("Invalid node")
} }
@ -199,14 +229,14 @@ func (r *MoneroRepo) ProcessJob(report ProbeReport, proberId int64) error {
? ?
)` )`
_, err := r.db.Exec(qInsertLog, _, err := r.db.Exec(qInsertLog,
report.NodeInfo.ID, report.Node.ID,
proberId, proberId,
report.NodeInfo.IsAvailable, report.Node.IsAvailable,
report.NodeInfo.Height, report.Node.Height,
report.NodeInfo.AdjustedTime, report.Node.AdjustedTime,
report.NodeInfo.DatabaseSize, report.Node.DatabaseSize,
report.NodeInfo.Difficulty, report.Node.Difficulty,
report.NodeInfo.EstimateFee, report.Node.EstimateFee,
now.Unix(), now.Unix(),
report.Message, report.Message,
report.TookTime) report.TookTime)
@ -216,11 +246,7 @@ func (r *MoneroRepo) ProcessJob(report ProbeReport, proberId int64) error {
limitTs := now.AddDate(0, -1, 0).Unix() limitTs := now.AddDate(0, -1, 0).Unix()
nodeStats := struct { var stats nodeStats
OnlineCount uint `db:"online"`
OfflineCount uint `db:"offline"`
TotalFetched uint `db:"total_fetched"`
}{}
qstats := ` qstats := `
SELECT SELECT
@ -232,48 +258,31 @@ func (r *MoneroRepo) ProcessJob(report ProbeReport, proberId int64) error {
WHERE WHERE
node_id = ? node_id = ?
AND date_checked > ?` AND date_checked > ?`
if err := r.db.Get(&nodeStats, qstats, report.NodeInfo.ID, limitTs); err != nil { if err := r.db.Get(&stats, qstats, report.Node.ID, limitTs); err != nil {
slog.Warn(err.Error()) slog.Warn(err.Error())
} }
avgUptime := (float64(nodeStats.OnlineCount) / float64(nodeStats.TotalFetched)) * 100 avgUptime := (float64(stats.Online) / float64(stats.TotalFetched)) * 100
report.NodeInfo.Uptime = math.Ceil(avgUptime*100) / 100 report.Node.Uptime = math.Ceil(avgUptime*100) / 100
var statuses [5]int statuses := report.parseStatuses()
errUnmarshal := report.NodeInfo.LastCheckStatus.Unmarshal(&statuses)
if errUnmarshal != nil {
fmt.Println("Warning", errUnmarshal.Error())
statuses = [5]int{2, 2, 2, 2, 2}
}
nodeAvailable := 0
if report.NodeInfo.IsAvailable {
nodeAvailable = 1
}
newStatuses := statuses[1:]
newStatuses = append(newStatuses, nodeAvailable)
statuesValueToDb, errMarshalStatus := json.Marshal(newStatuses)
if errMarshalStatus != nil {
fmt.Println("WARN", errMarshalStatus.Error())
}
// recheck IP // recheck IP
if report.NodeInfo.IP != "" { if report.Node.IP != "" {
if ipInfo, errGeoIp := geo.Info(report.NodeInfo.IP); errGeoIp != nil { if ipInfo, errGeoIp := geo.Info(report.Node.IP); errGeoIp != nil {
fmt.Println("WARN:", errGeoIp.Error()) fmt.Println("WARN:", errGeoIp.Error())
} else { } else {
report.NodeInfo.ASN = ipInfo.ASN report.Node.ASN = ipInfo.ASN
report.NodeInfo.ASNName = ipInfo.ASNOrg report.Node.ASNName = ipInfo.ASNOrg
report.NodeInfo.CountryCode = ipInfo.CountryCode report.Node.CountryCode = ipInfo.CountryCode
report.NodeInfo.CountryName = ipInfo.CountryName report.Node.CountryName = ipInfo.CountryName
report.NodeInfo.City = ipInfo.City report.Node.City = ipInfo.City
report.NodeInfo.Longitude = ipInfo.Longitude report.Node.Longitude = ipInfo.Longitude
report.NodeInfo.Latitude = ipInfo.Latitude report.Node.Latitude = ipInfo.Latitude
} }
} }
if report.NodeInfo.IsAvailable { if report.Node.IsAvailable {
update := ` update := `
UPDATE tbl_node UPDATE tbl_node
SET SET
@ -298,25 +307,25 @@ func (r *MoneroRepo) ProcessJob(report ProbeReport, proberId int64) error {
WHERE WHERE
id = ?` id = ?`
_, err := r.db.Exec(update, _, err := r.db.Exec(update,
nodeAvailable, 1,
report.NodeInfo.Nettype, report.Node.Nettype,
report.NodeInfo.Height, report.Node.Height,
report.NodeInfo.AdjustedTime, report.Node.AdjustedTime,
report.NodeInfo.DatabaseSize, report.Node.DatabaseSize,
report.NodeInfo.Difficulty, report.Node.Difficulty,
report.NodeInfo.Version, report.Node.Version,
report.NodeInfo.Uptime, report.Node.Uptime,
report.NodeInfo.EstimateFee, report.Node.EstimateFee,
report.NodeInfo.IP, report.Node.IP,
report.NodeInfo.ASN, report.Node.ASN,
report.NodeInfo.ASNName, report.Node.ASNName,
report.NodeInfo.CountryCode, report.Node.CountryCode,
report.NodeInfo.CountryName, report.Node.CountryName,
report.NodeInfo.City, report.Node.City,
now.Unix(), now.Unix(),
string(statuesValueToDb), statuses,
report.NodeInfo.CORSCapable, report.Node.CORSCapable,
report.NodeInfo.ID) report.Node.ID)
if err != nil { if err != nil {
slog.Warn(err.Error()) slog.Warn(err.Error())
} }
@ -330,14 +339,14 @@ func (r *MoneroRepo) ProcessJob(report ProbeReport, proberId int64) error {
last_check_status = ? last_check_status = ?
WHERE WHERE
id = ?` id = ?`
if _, err := r.db.Exec(u, nodeAvailable, report.NodeInfo.Uptime, now.Unix(), string(statuesValueToDb), report.NodeInfo.ID); err != nil { if _, err := r.db.Exec(u, 0, report.Node.Uptime, now.Unix(), statuses, report.Node.ID); err != nil {
slog.Warn(err.Error()) slog.Warn(err.Error())
} }
} }
if avgUptime <= 0 && nodeStats.TotalFetched > 300 { if avgUptime <= 0 && stats.TotalFetched > 300 {
fmt.Println("Deleting Monero node (0% uptime from > 300 records)") fmt.Println("Deleting Monero node (0% uptime from > 300 records)")
if err := r.Delete(report.NodeInfo.ID); err != nil { if err := r.Delete(report.Node.ID); err != nil {
slog.Warn(err.Error()) slog.Warn(err.Error())
} }
} }