From 1baddfd2d1d323caead1ba0950df7439c4e20a85 Mon Sep 17 00:00:00 2001 From: ditatompel Date: Sun, 5 May 2024 01:42:47 +0700 Subject: [PATCH] Process submitted job from prober --- cmd/probe.go | 55 +++++++++++++-- handler/middlewares.go | 2 +- handler/response.go | 27 ++++++++ handler/routes.go | 1 + internal/repo/monero.go | 92 +++++++++++++++++++++++++- tools/resources/database/structure.sql | 22 +++++- 6 files changed, 188 insertions(+), 11 deletions(-) diff --git a/cmd/probe.go b/cmd/probe.go index 0334f4f..193f6d7 100644 --- a/cmd/probe.go +++ b/cmd/probe.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "net" @@ -21,7 +22,8 @@ import ( const RPCUserAgent = "ditatombot/0.0.1 (Monero RPC Monitoring; Contact: ditatombot@ditatompel.com)" type proberClient struct { - config *config.App + config *config.App + message string } func newProber(cfg *config.App) *proberClient { @@ -146,19 +148,22 @@ func (p *proberClient) fetchNode(node repo.MoneroNode) (repo.MoneroNode, error) resp, err := client.Do(req) if err != nil { - // TODO: Post report to server + p.message = err.Error() + p.reportResult(node, time.Since(startTime).Seconds()) return node, err } defer resp.Body.Close() if resp.StatusCode != 200 { - // TODO: Post report to server - return node, fmt.Errorf("status code: %d", resp.StatusCode) + p.message = fmt.Sprintf("status code: %d", resp.StatusCode) + p.reportResult(node, time.Since(startTime).Seconds()) + return node, errors.New(p.message) } body, err := io.ReadAll(resp.Body) if err != nil { - // TODO: Post report to server + p.message = err.Error() + p.reportResult(node, time.Since(startTime).Seconds()) return node, err } @@ -167,7 +172,8 @@ func (p *proberClient) fetchNode(node repo.MoneroNode) (repo.MoneroNode, error) }{} if err := json.Unmarshal(body, &reportNode); err != nil { - // TODO: Post report to server + p.message = err.Error() + p.reportResult(node, time.Since(startTime).Seconds()) return node, err } if reportNode.Status == "OK" { @@ -177,6 +183,7 @@ func (p *proberClient) fetchNode(node repo.MoneroNode) (repo.MoneroNode, error) node.AdjustedTime = reportNode.AdjustedTime node.DatabaseSize = reportNode.DatabaseSize node.Difficulty = reportNode.Difficulty + node.Height = reportNode.Height node.Version = reportNode.Version if resp.Header.Get("Access-Control-Allow-Origin") == "*" || resp.Header.Get("Access-Control-Allow-Origin") == "https://xmr.ditatompel.com" { @@ -233,9 +240,45 @@ func (p *proberClient) fetchNode(node repo.MoneroNode) (repo.MoneroNode, error) node.EstimateFee = feeEstimate.Result.Fee fmt.Printf("Took %f seconds\n", tookTime) + + if err := p.reportResult(node, tookTime); err != nil { + return node, err + } return node, nil } +func (p *proberClient) reportResult(node repo.MoneroNode, tookTime float64) error { + jsonData, err := json.Marshal(repo.ProbeReport{ + TookTime: tookTime, + Message: p.message, + NodeInfo: node, + }) + if err != nil { + return err + } + + endpoint := fmt.Sprintf("%s/api/v1/job", p.config.ServerEndpoint) + req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(jsonData)) + if err != nil { + return err + } + req.Header.Add("X-Prober-Api-Key", p.config.ApiKey) + req.Header.Set("Content-Type", "application/json; charset=UTF-8") + req.Header.Set("User-Agent", RPCUserAgent) + + client := &http.Client{Timeout: 60 * time.Second} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return fmt.Errorf("status code: %d", resp.StatusCode) + } + return nil +} + // for debug purposes func prettyPrint(i interface{}) string { s, _ := json.MarshalIndent(i, "", "\t") diff --git a/handler/middlewares.go b/handler/middlewares.go index b0b616c..9718943 100644 --- a/handler/middlewares.go +++ b/handler/middlewares.go @@ -41,6 +41,6 @@ func CheckProber(c *fiber.Ctx) error { }) } - c.Locals("prober", prober) + c.Locals("prober_id", prober.Id) return c.Next() } diff --git a/handler/response.go b/handler/response.go index 0eb6e57..c35e210 100644 --- a/handler/response.go +++ b/handler/response.go @@ -191,6 +191,33 @@ func GiveJob(c *fiber.Ctx) error { }) } +func ProcessJob(c *fiber.Ctx) error { + moneroRepo := repo.NewMoneroRepo(database.GetDB()) + report := repo.ProbeReport{} + + if err := c.BodyParser(&report); err != nil { + return c.Status(fiber.StatusUnprocessableEntity).JSON(fiber.Map{ + "status": "error", + "message": err.Error(), + "data": nil, + }) + } + + if err := moneroRepo.ProcessJob(report, c.Locals("prober_id").(int64)); err != nil { + return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{ + "status": "error", + "message": err.Error(), + "data": nil, + }) + } + + return c.JSON(fiber.Map{ + "status": "ok", + "message": "Success", + "data": nil, + }) +} + func Crons(c *fiber.Ctx) error { cronRepo := repo.NewCron(database.GetDB()) diff --git a/handler/routes.go b/handler/routes.go index c6aa7fe..164737b 100644 --- a/handler/routes.go +++ b/handler/routes.go @@ -17,5 +17,6 @@ func V1Api(app *fiber.App) { v1.Get("/nodes", MoneroNodes) v1.Post("/nodes", AddNode) v1.Get("/job", CheckProber, GiveJob) + v1.Post("/job", CheckProber, ProcessJob) v1.Get("/crons", Crons) } diff --git a/internal/repo/monero.go b/internal/repo/monero.go index 51a45b6..640df27 100644 --- a/internal/repo/monero.go +++ b/internal/repo/monero.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "net" "slices" "strings" @@ -18,6 +19,7 @@ type MoneroRepository interface { Add(protocol string, host string, port uint) error Nodes(q MoneroQueryParams) (MoneroNodes, error) GiveJob(acceptTor int) (MoneroNode, error) + ProcessJob(report ProbeReport, proberId int64) error } type MoneroRepo struct { @@ -43,7 +45,7 @@ type MoneroNode struct { Difficulty uint `json:"difficulty" db:"difficulty"` Version string `json:"version" db:"version"` Status string `json:"status,omitempty"` - Uptime float32 `json:"uptime" db:"uptime"` + Uptime float64 `json:"uptime" db:"uptime"` EstimateFee uint `json:"estimate_fee" db:"estimate_fee"` Asn uint `json:"asn" db:"asn"` AsnName string `json:"asn_name" db:"asn_name"` @@ -206,8 +208,8 @@ func (repo *MoneroRepo) GiveJob(acceptTor int) (MoneroNode, error) { node := MoneroNode{} - query := fmt.Sprintf(`SELECT id, hostname, port, protocol, is_tor FROM tbl_node %s ORDER BY last_checked ASC LIMIT 1`, where) - err := repo.db.QueryRow(query, queryParams...).Scan(&node.Id, &node.Hostname, &node.Port, &node.Protocol, &node.IsTor) + query := fmt.Sprintf(`SELECT id, hostname, port, protocol, is_tor, last_check_status FROM tbl_node %s ORDER BY last_checked ASC LIMIT 1`, where) + err := repo.db.QueryRow(query, queryParams...).Scan(&node.Id, &node.Hostname, &node.Port, &node.Protocol, &node.IsTor, &node.LastCheckStatus) if err != nil { return node, err } @@ -220,3 +222,87 @@ func (repo *MoneroRepo) GiveJob(acceptTor int) (MoneroNode, error) { return node, nil } + +type ProbeReport struct { + TookTime float64 `json:"took_time"` + Message string `json:"message"` + NodeInfo MoneroNode `json:"node_info"` +} + +func (repo *MoneroRepo) ProcessJob(report ProbeReport, proberId int64) error { + if report.NodeInfo.Id == 0 { + return errors.New("Invalid node") + } + + qInsertLog := `INSERT INTO tbl_probe_log (node_id, prober_id, is_available, height, adjusted_time, database_size, difficulty, estimate_fee, date_checked, failed_reason, fetch_runtime) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)` + + _, err := repo.db.Exec(qInsertLog, report.NodeInfo.Id, proberId, report.NodeInfo.IsAvailable, report.NodeInfo.Height, report.NodeInfo.AdjustedTime, report.NodeInfo.DatabaseSize, report.NodeInfo.Difficulty, report.NodeInfo.EstimateFee, time.Now().Unix(), report.Message, report.TookTime) + if err != nil { + return err + } + + now := time.Now() + limitTs := now.AddDate(0, -1, 0).Unix() + + nodeStats := struct { + OnlineCount uint `db:"online"` + OfflineCount uint `db:"offline"` + TotalFetched uint `db:"total_fetched"` + }{} + + qstats := `SELECT + SUM(if(is_available='1',1,0)) AS online, + SUM(if(is_available='0',1,0)) AS offline, + SUM(if(id='0',0,1)) AS total_fetched FROM + tbl_probe_log WHERE node_id = ? AND date_checked > ?` + repo.db.Get(&nodeStats, qstats, report.NodeInfo.Id, limitTs) + + avgUptime := (float64(nodeStats.OnlineCount) / float64(nodeStats.TotalFetched)) * 100 + report.NodeInfo.Uptime = math.Ceil(avgUptime*100) / 100 + + var statuses [5]int + errUnmarshal := report.NodeInfo.LastCheckStatus.Unmarshal(&statuses) + if errUnmarshal != nil { + fmt.Println("Warning", errUnmarshal.Error()) + statuses = [5]int{2, 2, 2, 2, 2} + } + + nodeAvailable := 0 + + if report.NodeInfo.IsAvailable { + nodeAvailable = 1 + } + newStatuses := statuses[1:] + newStatuses = append(newStatuses, nodeAvailable) + statuesValueToDb, errMarshalStatus := json.Marshal(newStatuses) + if errMarshalStatus != nil { + fmt.Println("WARN", errMarshalStatus.Error()) + } + + // recheck IP + // TODO: Fill the data using GeoIP + + // if report.NodeInfo.Ip != "" { + // ipInfo, errGeoIp := GetGeoIpInfo(report.NodeInfo.Ip) + // if errGeoIp == nil { + // report.NodeInfo.Asn = ipInfo.Asn + // report.NodeInfo.AsnName = ipInfo.AsnOrg + // report.NodeInfo.CountryCode = ipInfo.CountryCode + // report.NodeInfo.CountryName = ipInfo.CountryName + // report.NodeInfo.City = ipInfo.City + // } + // } + + update := `UPDATE tbl_node SET + is_available = ?, nettype = ?, height = ?, adjusted_time = ?, + database_size = ?, difficulty = ?, version = ?, uptime = ?, + estimate_fee = ?, ip_addr = ?, asn = ?, asn_name = ?, country = ?, + country_name = ?, city = ?, last_checked = ?, last_check_status = ?, + cors_capable = ? + WHERE id = ?` + + _, err = repo.db.Exec(update, + nodeAvailable, report.NodeInfo.NetType, report.NodeInfo.Height, report.NodeInfo.AdjustedTime, report.NodeInfo.DatabaseSize, report.NodeInfo.Difficulty, report.NodeInfo.Version, report.NodeInfo.Uptime, report.NodeInfo.EstimateFee, report.NodeInfo.Ip, report.NodeInfo.Asn, report.NodeInfo.AsnName, report.NodeInfo.CountryCode, report.NodeInfo.CountryName, report.NodeInfo.City, now.Unix(), string(statuesValueToDb), report.NodeInfo.CorsCapable, report.NodeInfo.Id) + + return err +} diff --git a/tools/resources/database/structure.sql b/tools/resources/database/structure.sql index e94a886..cf8eca5 100644 --- a/tools/resources/database/structure.sql +++ b/tools/resources/database/structure.sql @@ -54,7 +54,7 @@ CREATE TABLE `tbl_node` ( `adjusted_time` bigint(20) unsigned NOT NULL DEFAULT 0, `database_size` bigint(20) unsigned NOT NULL DEFAULT 0, `difficulty` bigint(20) unsigned NOT NULL DEFAULT 0, - `version` varchar(200) NOT NULL, + `version` varchar(200) NOT NULL DEFAULT '', `uptime` float(5,2) unsigned NOT NULL DEFAULT 0.00, `estimate_fee` int(9) unsigned NOT NULL DEFAULT 0, `ip_addr` varchar(200) NOT NULL, @@ -72,6 +72,26 @@ CREATE TABLE `tbl_node` ( PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; /*!40101 SET character_set_client = @saved_cs_client */; +DROP TABLE IF EXISTS `tbl_probe_log`; +/*!40101 SET @saved_cs_client = @@character_set_client */; +/*!40101 SET character_set_client = utf8 */; +CREATE TABLE `tbl_probe_log` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, + `node_id` bigint(20) unsigned NOT NULL DEFAULT 0, + `prober_id` int(9) unsigned NOT NULL DEFAULT 0, + `is_available` tinyint(1) unsigned NOT NULL DEFAULT 0, + `height` bigint(20) unsigned NOT NULL DEFAULT 0, + `adjusted_time` bigint(20) unsigned NOT NULL DEFAULT 0, + `database_size` bigint(20) unsigned NOT NULL DEFAULT 0, + `difficulty` bigint(20) unsigned NOT NULL DEFAULT 0, + `estimate_fee` int(9) unsigned NOT NULL DEFAULT 0, + `date_checked` bigint(20) unsigned NOT NULL DEFAULT 0, + `failed_reason` text NOT NULL DEFAULT '', + `fetch_runtime` float(5,2) unsigned DEFAULT NULL, + PRIMARY KEY (`id`), + KEY `node_id` (`node_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; +/*!40101 SET character_set_client = @saved_cs_client */; DROP TABLE IF EXISTS `tbl_prober`; /*!40101 SET @saved_cs_client = @@character_set_client */; /*!40101 SET character_set_client = utf8 */;