xmr-remote-nodes/internal/repo/monero.go
ditatompel f931066c8c
Keep nettype, height, etc same as before
Do not replace existing database record if prober can't reach the remote
nodes. Only update node status and last fetch record.
2024-05-08 19:03:41 +07:00

517 lines
17 KiB
Go

package repo
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"math"
"net"
"slices"
"strings"
"time"
"github.com/ditatompel/xmr-nodes/internal/database"
"github.com/jmoiron/sqlx/types"
)
type MoneroRepository interface {
Node(id int) (MoneroNode, error)
Add(protocol string, host string, port uint) error
Nodes(q MoneroQueryParams) (MoneroNodes, error)
GiveJob(acceptTor int) (MoneroNode, error)
ProcessJob(report ProbeReport, proberId int64) error
NetFee() []NetFee
Countries() ([]MoneroCountries, error)
Logs(q MoneroLogQueryParams) (MoneroNodeFetchLogs, error)
}
type MoneroRepo struct {
db *database.DB
}
func NewMoneroRepo(db *database.DB) MoneroRepository {
return &MoneroRepo{db}
}
type MoneroNode struct {
Id uint `json:"id,omitempty" db:"id"`
Hostname string `json:"hostname" db:"hostname"`
Ip string `json:"ip" db:"ip_addr"`
Port uint `json:"port" db:"port"`
Protocol string `json:"protocol" db:"protocol"`
IsTor bool `json:"is_tor" db:"is_tor"`
IsAvailable bool `json:"is_available" db:"is_available"`
NetType string `json:"nettype" db:"nettype"`
Height uint `json:"height" db:"height"`
AdjustedTime uint `json:"adjusted_time" db:"adjusted_time"`
DatabaseSize uint `json:"database_size" db:"database_size"`
Difficulty uint `json:"difficulty" db:"difficulty"`
Version string `json:"version" db:"version"`
Status string `json:"status,omitempty"`
Uptime float64 `json:"uptime" db:"uptime"`
EstimateFee uint `json:"estimate_fee" db:"estimate_fee"`
Asn uint `json:"asn" db:"asn"`
AsnName string `json:"asn_name" db:"asn_name"`
CountryCode string `json:"cc" db:"country"`
CountryName string `json:"country_name" db:"country_name"`
City string `json:"city" db:"city"`
Lat float64 `json:"latitude" db:"lat"`
Lon float64 `json:"longitude" db:"lon"`
DateEntered uint `json:"date_entered,omitempty" db:"date_entered"`
LastChecked uint `json:"last_checked" db:"last_checked"`
FailedCount uint `json:"failed_count,omitempty" db:"failed_count"`
LastCheckStatus types.JSONText `json:"last_check_statuses" db:"last_check_status"`
CorsCapable bool `json:"cors" db:"cors_capable"`
}
func (repo *MoneroRepo) Node(id int) (MoneroNode, error) {
node := MoneroNode{}
err := repo.db.Get(&node, `SELECT * FROM tbl_node WHERE id = ?`, id)
if err != nil && err != sql.ErrNoRows {
fmt.Println("WARN:", err)
return node, errors.New("Can't get node information")
}
if err == sql.ErrNoRows {
return node, errors.New("Node not found")
}
return node, err
}
type MoneroNodes struct {
TotalRows int `json:"total_rows"`
RowsPerPage int `json:"rows_per_page"`
CurrentPage int `json:"current_page"`
NextPage int `json:"next_page"`
Items []*MoneroNode `json:"items"`
}
type MoneroQueryParams struct {
Host string
NetType string
Protocol string
CC string // 2 letter country code
Status int
Cors int
RowsPerPage int
Page int
SortBy string
SortDirection string
}
func (repo *MoneroRepo) Nodes(q MoneroQueryParams) (MoneroNodes, error) {
queryParams := []interface{}{}
whereQueries := []string{}
where := ""
if q.Host != "" {
whereQueries = append(whereQueries, "(hostname LIKE ? OR ip_addr LIKE ?)")
queryParams = append(queryParams, "%"+q.Host+"%")
queryParams = append(queryParams, "%"+q.Host+"%")
}
if q.NetType != "any" {
whereQueries = append(whereQueries, "nettype = ?")
queryParams = append(queryParams, q.NetType)
}
if q.Protocol != "any" {
if q.Protocol == "tor" {
whereQueries = append(whereQueries, "is_tor = ?")
queryParams = append(queryParams, 1)
} else {
whereQueries = append(whereQueries, "(protocol = ? AND is_tor = ?)")
queryParams = append(queryParams, q.Protocol)
queryParams = append(queryParams, 0)
}
}
if q.CC != "any" {
whereQueries = append(whereQueries, "country = ?")
if q.CC == "UNKNOWN" {
queryParams = append(queryParams, "")
} else {
queryParams = append(queryParams, q.CC)
}
}
if q.Status != -1 {
whereQueries = append(whereQueries, "is_available = ?")
queryParams = append(queryParams, q.Status)
}
if q.Cors != -1 {
whereQueries = append(whereQueries, "cors_capable = ?")
queryParams = append(queryParams, 1)
}
if len(whereQueries) > 0 {
where = "WHERE " + strings.Join(whereQueries, " AND ")
}
nodes := MoneroNodes{}
queryTotalRows := fmt.Sprintf("SELECT COUNT(id) AS total_rows FROM tbl_node %s", where)
err := repo.db.QueryRow(queryTotalRows, queryParams...).Scan(&nodes.TotalRows)
if err != nil {
return nodes, err
}
queryParams = append(queryParams, q.RowsPerPage, (q.Page-1)*q.RowsPerPage)
allowedSort := []string{"last_checked", "uptime"}
sortBy := "last_checked"
if slices.Contains(allowedSort, q.SortBy) {
sortBy = q.SortBy
}
sortDirection := "DESC"
if q.SortDirection == "asc" {
sortDirection = "ASC"
}
query := fmt.Sprintf("SELECT id, protocol, hostname, port, is_tor, is_available, nettype, height, adjusted_time, database_size, difficulty, version, uptime, estimate_fee, ip_addr, asn, asn_name, country, country_name, city, lat, lon, date_entered, last_checked, last_check_status, cors_capable FROM tbl_node %s ORDER BY %s %s LIMIT ? OFFSET ?", where, sortBy, sortDirection)
row, err := repo.db.Query(query, queryParams...)
if err != nil {
return nodes, err
}
defer row.Close()
nodes.RowsPerPage = q.RowsPerPage
nodes.CurrentPage = q.Page
nodes.NextPage = q.Page + 1
for row.Next() {
node := MoneroNode{}
err = row.Scan(&node.Id, &node.Protocol, &node.Hostname, &node.Port, &node.IsTor, &node.IsAvailable, &node.NetType, &node.Height, &node.AdjustedTime, &node.DatabaseSize, &node.Difficulty, &node.Version, &node.Uptime, &node.EstimateFee, &node.Ip, &node.Asn, &node.AsnName, &node.CountryCode, &node.CountryName, &node.City, &node.Lat, &node.Lon, &node.DateEntered, &node.LastChecked, &node.LastCheckStatus, &node.CorsCapable)
if err != nil {
return nodes, err
}
nodes.Items = append(nodes.Items, &node)
}
return nodes, nil
}
type MoneroLogQueryParams struct {
NodeId int // 0 fpr all, >0 for specific node
WorkerId int // 0 for all, >0 for specific worker
Status int // -1 for all, 0 for failed, 1 for success
FailedReason string // empty for all, if not empty, will be used as search from failed_reaso
RowsPerPage int
Page int
SortBy string
SortDirection string
}
type ProbeLog struct {
Id int `db:"id" json:"id,omitempty"`
NodeId int `db:"node_id" json:"node_id"`
ProberId int `db:"prober_id" json:"prober_id"`
Status int `db:"is_available" json:"status"`
Height int `db:"height" json:"height"`
AdjustedTime int `db:"adjusted_time" json:"adjusted_time"`
DatabaseSize int `db:"database_size" json:"database_size"`
Difficulty int `db:"difficulty" json:"difficulty"`
EstimateFee int `db:"estimate_fee" json:"estimate_fee"`
DateChecked int `db:"date_checked" json:"date_checked"`
FailedReason string `db:"failed_reason" json:"failed_reason"`
FetchRuntime float64 `db:"fetch_runtime" json:"fetch_runtime"`
}
type MoneroNodeFetchLogs struct {
TotalRows int `json:"total_rows"`
RowsPerPage int `json:"rows_per_page"`
Items []*ProbeLog `json:"items"`
}
func (repo *MoneroRepo) Logs(q MoneroLogQueryParams) (MoneroNodeFetchLogs, error) {
queryParams := []interface{}{}
whereQueries := []string{}
where := ""
if q.NodeId != 0 {
whereQueries = append(whereQueries, "node_id = ?")
queryParams = append(queryParams, q.NodeId)
}
if q.Status != -1 {
whereQueries = append(whereQueries, "is_available = ?")
queryParams = append(queryParams, q.Status)
}
if q.FailedReason != "" {
whereQueries = append(whereQueries, "failed_reason LIKE ?")
queryParams = append(queryParams, "%"+q.FailedReason+"%")
}
if len(whereQueries) > 0 {
where = "WHERE " + strings.Join(whereQueries, " AND ")
}
fetchLogs := MoneroNodeFetchLogs{}
queryTotalRows := fmt.Sprintf("SELECT COUNT(id) FROM tbl_probe_log %s", where)
err := repo.db.QueryRow(queryTotalRows, queryParams...).Scan(&fetchLogs.TotalRows)
if err != nil {
return fetchLogs, err
}
queryParams = append(queryParams, q.RowsPerPage, (q.Page-1)*q.RowsPerPage)
allowedSort := []string{"date_checked", "fetch_runtime"}
sortBy := "id"
if slices.Contains(allowedSort, q.SortBy) {
sortBy = q.SortBy
}
sortDirection := "DESC"
if q.SortDirection == "asc" {
sortDirection = "ASC"
}
query := fmt.Sprintf("SELECT id, node_id, prober_id, is_available, height, adjusted_time, database_size, difficulty, estimate_fee, date_checked, failed_reason, fetch_runtime FROM tbl_probe_log %s ORDER BY %s %s LIMIT ? OFFSET ?", where, sortBy, sortDirection)
row, err := repo.db.Query(query, queryParams...)
if err != nil {
return fetchLogs, err
}
defer row.Close()
fetchLogs.RowsPerPage = q.RowsPerPage
for row.Next() {
probeLog := ProbeLog{}
err = row.Scan(&probeLog.Id, &probeLog.NodeId, &probeLog.ProberId, &probeLog.Status, &probeLog.Height, &probeLog.AdjustedTime, &probeLog.DatabaseSize, &probeLog.Difficulty, &probeLog.EstimateFee, &probeLog.DateChecked, &probeLog.FailedReason, &probeLog.FetchRuntime)
if err != nil {
return fetchLogs, err
}
fetchLogs.Items = append(fetchLogs.Items, &probeLog)
}
return fetchLogs, nil
}
func (repo *MoneroRepo) Add(protocol string, hostname string, port uint) error {
if protocol != "http" && protocol != "https" {
return errors.New("Invalid protocol, must one of or HTTP/HTTPS")
}
if port > 65535 || port < 1 {
return errors.New("Invalid port number")
}
is_tor := false
if strings.HasSuffix(hostname, ".onion") {
is_tor = true
}
ip := ""
if !is_tor {
hostIps, err := net.LookupIP(hostname)
if err != nil {
return err
}
hostIp := hostIps[0].To4()
if hostIp == nil {
return errors.New("Host IP is not IPv4")
}
if hostIp.IsPrivate() {
return errors.New("IP address is private")
}
if hostIp.IsLoopback() {
return errors.New("IP address is loopback address")
}
ip = hostIp.String()
}
check := `SELECT id FROM tbl_node WHERE protocol = ? AND hostname = ? AND port = ? LIMIT 1`
row, err := repo.db.Query(check, protocol, hostname, port)
if err != nil {
return err
}
defer row.Close()
if row.Next() {
return errors.New("Node already monitored")
}
statusDb, _ := json.Marshal([5]int{2, 2, 2, 2, 2})
query := `INSERT INTO tbl_node (protocol, hostname, port, is_tor, nettype, ip_addr, lat, lon, date_entered, last_checked, last_check_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
_, err = repo.db.Exec(query, protocol, hostname, port, is_tor, "", ip, 0, 0, time.Now().Unix(), 0, string(statusDb))
if err != nil {
return err
}
return nil
}
func (repo *MoneroRepo) Delete(id uint) error {
if _, err := repo.db.Exec(`DELETE FROM tbl_node WHERE id = ?`, id); err != nil {
return err
}
if _, err := repo.db.Exec(`DELETE FROM tbl_probe_log WHERE node_id = ?`, id); err != nil {
return err
}
return nil
}
func (repo *MoneroRepo) GiveJob(acceptTor int) (MoneroNode, error) {
queryParams := []interface{}{}
whereQueries := []string{}
where := ""
if acceptTor != 1 {
whereQueries = append(whereQueries, "is_tor = ?")
queryParams = append(queryParams, 0)
}
if len(whereQueries) > 0 {
where = "WHERE " + strings.Join(whereQueries, " AND ")
}
node := MoneroNode{}
query := fmt.Sprintf(`SELECT id, hostname, port, protocol, is_tor, last_check_status FROM tbl_node %s ORDER BY last_checked ASC LIMIT 1`, where)
err := repo.db.QueryRow(query, queryParams...).Scan(&node.Id, &node.Hostname, &node.Port, &node.Protocol, &node.IsTor, &node.LastCheckStatus)
if err != nil {
return node, err
}
update := `UPDATE tbl_node SET last_checked = ? WHERE id = ?`
_, err = repo.db.Exec(update, time.Now().Unix(), node.Id)
if err != nil {
return node, err
}
return node, nil
}
type ProbeReport struct {
TookTime float64 `json:"took_time"`
Message string `json:"message"`
NodeInfo MoneroNode `json:"node_info"`
}
func (repo *MoneroRepo) ProcessJob(report ProbeReport, proberId int64) error {
if report.NodeInfo.Id == 0 {
return errors.New("Invalid node")
}
qInsertLog := `INSERT INTO tbl_probe_log (node_id, prober_id, is_available, height, adjusted_time, database_size, difficulty, estimate_fee, date_checked, failed_reason, fetch_runtime) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
_, err := repo.db.Exec(qInsertLog, report.NodeInfo.Id, proberId, report.NodeInfo.IsAvailable, report.NodeInfo.Height, report.NodeInfo.AdjustedTime, report.NodeInfo.DatabaseSize, report.NodeInfo.Difficulty, report.NodeInfo.EstimateFee, time.Now().Unix(), report.Message, report.TookTime)
if err != nil {
return err
}
now := time.Now()
limitTs := now.AddDate(0, -1, 0).Unix()
nodeStats := struct {
OnlineCount uint `db:"online"`
OfflineCount uint `db:"offline"`
TotalFetched uint `db:"total_fetched"`
}{}
qstats := `SELECT
SUM(if(is_available='1',1,0)) AS online,
SUM(if(is_available='0',1,0)) AS offline,
SUM(if(id='0',0,1)) AS total_fetched FROM
tbl_probe_log WHERE node_id = ? AND date_checked > ?`
repo.db.Get(&nodeStats, qstats, report.NodeInfo.Id, limitTs)
avgUptime := (float64(nodeStats.OnlineCount) / float64(nodeStats.TotalFetched)) * 100
report.NodeInfo.Uptime = math.Ceil(avgUptime*100) / 100
var statuses [5]int
errUnmarshal := report.NodeInfo.LastCheckStatus.Unmarshal(&statuses)
if errUnmarshal != nil {
fmt.Println("Warning", errUnmarshal.Error())
statuses = [5]int{2, 2, 2, 2, 2}
}
nodeAvailable := 0
if report.NodeInfo.IsAvailable {
nodeAvailable = 1
}
newStatuses := statuses[1:]
newStatuses = append(newStatuses, nodeAvailable)
statuesValueToDb, errMarshalStatus := json.Marshal(newStatuses)
if errMarshalStatus != nil {
fmt.Println("WARN", errMarshalStatus.Error())
}
// recheck IP
if report.NodeInfo.Ip != "" {
if ipInfo, errGeoIp := GetGeoIpInfo(report.NodeInfo.Ip); errGeoIp != nil {
fmt.Println("WARN:", errGeoIp.Error())
} else {
report.NodeInfo.Asn = ipInfo.Asn
report.NodeInfo.AsnName = ipInfo.AsnOrg
report.NodeInfo.CountryCode = ipInfo.CountryCode
report.NodeInfo.CountryName = ipInfo.CountryName
report.NodeInfo.City = ipInfo.City
report.NodeInfo.Lon = ipInfo.Longitude
report.NodeInfo.Lat = ipInfo.Latitude
}
}
if report.NodeInfo.IsAvailable {
update := `UPDATE tbl_node SET
is_available = ?, nettype = ?, height = ?, adjusted_time = ?,
database_size = ?, difficulty = ?, version = ?, uptime = ?,
estimate_fee = ?, ip_addr = ?, asn = ?, asn_name = ?, country = ?,
country_name = ?, city = ?, last_checked = ?, last_check_status = ?,
cors_capable = ?
WHERE id = ?`
_, err = repo.db.Exec(update,
nodeAvailable, report.NodeInfo.NetType, report.NodeInfo.Height, report.NodeInfo.AdjustedTime, report.NodeInfo.DatabaseSize, report.NodeInfo.Difficulty, report.NodeInfo.Version, report.NodeInfo.Uptime, report.NodeInfo.EstimateFee, report.NodeInfo.Ip, report.NodeInfo.Asn, report.NodeInfo.AsnName, report.NodeInfo.CountryCode, report.NodeInfo.CountryName, report.NodeInfo.City, now.Unix(), string(statuesValueToDb), report.NodeInfo.CorsCapable, report.NodeInfo.Id)
} else {
update := `UPDATE tbl_node SET is_available = ?, uptime = ?, last_checked = ?, last_check_status = ? WHERE id = ?`
_, err = repo.db.Exec(update, nodeAvailable, report.NodeInfo.Uptime, now.Unix(), string(statuesValueToDb), report.NodeInfo.Id)
}
if avgUptime <= 0 && nodeStats.TotalFetched > 300 {
fmt.Println("Deleting Monero node (0% uptime from > 300 records)")
repo.Delete(report.NodeInfo.Id)
}
repo.db.Exec(`UPDATE tbl_prober SET last_submit_ts = ? WHERE id = ?`, now.Unix(), proberId)
return err
}
type NetFee struct {
Nettype string `json:"nettype" db:"nettype"`
EstimateFee uint `json:"estimate_fee" db:"estimate_fee"`
NodeCount int `json:"node_count" db:"node_count"`
}
func (repo *MoneroRepo) NetFee() []NetFee {
netTypes := [3]string{"mainnet", "stagenet", "testnet"}
netFees := []NetFee{}
for _, net := range netTypes {
fees := NetFee{}
err := repo.db.Get(&fees, `SELECT COUNT(id) AS node_count, nettype, estimate_fee FROM tbl_node WHERE nettype = ? GROUP BY estimate_fee ORDER BY node_count DESC LIMIT 1`, net)
if err != nil {
fmt.Println("WARN:", err.Error())
continue
}
netFees = append(netFees, fees)
}
return netFees
}
type MoneroCountries struct {
TotalNodes int `json:"total_nodes" db:"total_nodes"`
Cc string `json:"cc" db:"country"`
Name string `json:"name" db:"country_name"`
}
func (repo *MoneroRepo) Countries() ([]MoneroCountries, error) {
countries := []MoneroCountries{}
err := repo.db.Select(&countries, `SELECT COUNT(id) AS total_nodes, country, country_name FROM tbl_node GROUP BY country ORDER BY country ASC`)
return countries, err
}