package monero import ( "encoding/json" "errors" "fmt" "log/slog" "math" "slices" "strings" "time" "github.com/ditatompel/xmr-remote-nodes/internal/ip/geo" ) type QueryLogs struct { NodeID int // 0 for all, >0 for specific node Status int // -1 for all, 0 for failed, 1 for success FailedReason string // empty for all, if not empty, will be used as search from failed_reaso RowsPerPage int Page int SortBy string SortDirection string } func (q QueryLogs) toSQL() (args []interface{}, where, sortBy, sortDirection string) { wq := []string{} if q.NodeID != 0 { wq = append(wq, "node_id = ?") args = append(args, q.NodeID) } if q.Status != -1 { wq = append(wq, "is_available = ?") args = append(args, q.Status) } if q.FailedReason != "" { wq = append(wq, "failed_reason LIKE ?") args = append(args, "%"+q.FailedReason+"%") } if len(wq) > 0 { where = "WHERE " + strings.Join(wq, " AND ") } as := []string{"date_checked", "fetch_runtime"} sortBy = "id" if slices.Contains(as, q.SortBy) { sortBy = q.SortBy } sortDirection = "DESC" if q.SortDirection == "asc" { sortDirection = "ASC" } return args, where, sortBy, sortDirection } type FetchLog struct { ID int `db:"id" json:"id,omitempty"` NodeID int `db:"node_id" json:"node_id"` ProberID int `db:"prober_id" json:"prober_id"` Status int `db:"is_available" json:"status"` Height int `db:"height" json:"height"` AdjustedTime int `db:"adjusted_time" json:"adjusted_time"` DatabaseSize int `db:"database_size" json:"database_size"` Difficulty int `db:"difficulty" json:"difficulty"` EstimateFee int `db:"estimate_fee" json:"estimate_fee"` DateChecked int `db:"date_checked" json:"date_checked"` FailedReason string `db:"failed_reason" json:"failed_reason"` FetchRuntime float64 `db:"fetch_runtime" json:"fetch_runtime"` } type FetchLogs struct { TotalRows int `json:"total_rows"` RowsPerPage int `json:"rows_per_page"` Items []*FetchLog `json:"items"` } // Logs returns list of fetched log result for given query func (r *moneroRepo) Logs(q QueryLogs) (FetchLogs, error) { args, where, sortBy, sortDirection := q.toSQL() var fetchLogs FetchLogs fetchLogs.RowsPerPage = q.RowsPerPage qTotal := fmt.Sprintf(`SELECT COUNT(id) FROM tbl_probe_log %s`, where) err := r.db.QueryRow(qTotal, args...).Scan(&fetchLogs.TotalRows) if err != nil { return fetchLogs, err } args = append(args, q.RowsPerPage, (q.Page-1)*q.RowsPerPage) query := fmt.Sprintf(` SELECT * FROM tbl_probe_log %s -- where query ORDER BY %s %s LIMIT ? OFFSET ?`, where, sortBy, sortDirection) err = r.db.Select(&fetchLogs.Items, query, args...) return fetchLogs, err } // GiveJob returns node that should be probed for the next time func (r *moneroRepo) GiveJob(acceptTor, acceptIPv6 int) (Node, error) { args := []interface{}{} wq := []string{} where := "" if acceptTor != 1 { wq = append(wq, "is_tor = ?") args = append(args, 0) } if acceptIPv6 != 1 { wq = append(wq, "ipv6_only = ?") args = append(args, 0) } if len(wq) > 0 { where = "WHERE " + strings.Join(wq, " AND ") } var node Node query := fmt.Sprintf(` SELECT id, hostname, port, protocol, is_tor, last_check_status FROM tbl_node %s -- where query if any ORDER BY last_checked ASC LIMIT 1`, where) err := r.db.QueryRow(query, args...).Scan( &node.ID, &node.Hostname, &node.Port, &node.Protocol, &node.IsTor, &node.LastCheckStatus) if err != nil { return node, err } _, err = r.db.Exec(` UPDATE tbl_node SET last_checked = ? WHERE id = ?`, time.Now().Unix(), node.ID) if err != nil { return node, err } return node, nil } type ProbeReport struct { TookTime float64 `json:"took_time"` Message string `json:"message"` Node Node `json:"node"` } type nodeStats struct { Online uint `db:"online"` // total count online Offline uint `db:"offline"` // total count offline TotalFetched uint `db:"total_fetched"` } // Create new status indicator based from LastCheckStatus and recent IsAvailable status func (p *ProbeReport) parseStatuses() string { var s [5]int if err := p.Node.LastCheckStatus.Unmarshal(&s); err != nil { slog.Warn(err.Error()) s = [5]int{2, 2, 2, 2, 2} } si := 0 // set default "status indicator" to offline if p.Node.IsAvailable { si = 1 } ns := s[1:] ns = append(ns, si) j, err := json.Marshal(ns) if err != nil { slog.Warn(err.Error()) } return string(j) } // Process report data from probers func (r *moneroRepo) ProcessJob(report ProbeReport, proberId int64) error { if report.Node.ID == 0 { return errors.New("Invalid node") } now := time.Now() qInsertLog := ` INSERT INTO tbl_probe_log ( node_id, prober_id, is_available, height, adjusted_time, database_size, difficulty, estimate_fee, date_checked, failed_reason, fetch_runtime ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )` _, err := r.db.Exec(qInsertLog, report.Node.ID, proberId, report.Node.IsAvailable, report.Node.Height, report.Node.AdjustedTime, report.Node.DatabaseSize, report.Node.Difficulty, report.Node.EstimateFee, now.Unix(), report.Message, report.TookTime) if err != nil { return err } limitTs := now.AddDate(0, -1, 0).Unix() var stats nodeStats qstats := ` SELECT SUM(if(is_available='1',1,0)) AS online, SUM(if(is_available='0',1,0)) AS offline, SUM(if(id='0',0,1)) AS total_fetched FROM tbl_probe_log WHERE node_id = ? AND date_checked > ?` if err := r.db.Get(&stats, qstats, report.Node.ID, limitTs); err != nil { slog.Warn(err.Error()) } avgUptime := (float64(stats.Online) / float64(stats.TotalFetched)) * 100 report.Node.Uptime = math.Ceil(avgUptime*100) / 100 statuses := report.parseStatuses() // recheck IP if report.Node.IP != "" { if ipInfo, errGeoIp := geo.Info(report.Node.IP); errGeoIp != nil { fmt.Println("WARN:", errGeoIp.Error()) } else { report.Node.ASN = ipInfo.ASN report.Node.ASNName = ipInfo.ASNOrg report.Node.CountryCode = ipInfo.CountryCode report.Node.CountryName = ipInfo.CountryName report.Node.City = ipInfo.City report.Node.Longitude = ipInfo.Longitude report.Node.Latitude = ipInfo.Latitude } } if report.Node.IsAvailable { update := ` UPDATE tbl_node SET is_available = ?, nettype = ?, height = ?, adjusted_time = ?, database_size = ?, difficulty = ?, version = ?, uptime = ?, estimate_fee = ?, ip_addr = ?, asn = ?, asn_name = ?, country = ?, country_name = ?, city = ?, last_checked = ?, last_check_status = ?, cors_capable = ?, ipv6_only = ? WHERE id = ?` _, err := r.db.Exec(update, 1, report.Node.Nettype, report.Node.Height, report.Node.AdjustedTime, report.Node.DatabaseSize, report.Node.Difficulty, report.Node.Version, report.Node.Uptime, report.Node.EstimateFee, report.Node.IP, report.Node.ASN, report.Node.ASNName, report.Node.CountryCode, report.Node.CountryName, report.Node.City, now.Unix(), statuses, report.Node.CORSCapable, report.Node.IPv6Only, report.Node.ID) if err != nil { slog.Warn(err.Error()) } } else { u := ` UPDATE tbl_node SET is_available = ?, uptime = ?, last_checked = ?, last_check_status = ?, ipv6_only = ? WHERE id = ?` if _, err := r.db.Exec(u, 0, report.Node.Uptime, now.Unix(), statuses, report.Node.IPv6Only, report.Node.ID); err != nil { slog.Warn(err.Error()) } } if avgUptime <= 0 && stats.TotalFetched > 300 { fmt.Println("Deleting Monero node (0% uptime from > 300 records)") if err := r.Delete(report.Node.ID); err != nil { slog.Warn(err.Error()) } } _, err = r.db.Exec(` UPDATE tbl_prober SET last_submit_ts = ? WHERE id = ?`, now.Unix(), proberId) return err }