mirror of
https://github.com/ditatompel/xmr-remote-nodes.git
synced 2025-01-08 05:52:10 +07:00
ditatompel
ec11fa0126
This response was used for my HTMX "infinite scroll" data, which is not used in this Svelte project.
512 lines
17 KiB
Go
512 lines
17 KiB
Go
package repo
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"math"
|
|
"net"
|
|
"slices"
|
|
"strings"
|
|
"time"
|
|
"xmr-remote-nodes/internal/database"
|
|
|
|
"github.com/jmoiron/sqlx/types"
|
|
)
|
|
|
|
type MoneroRepository interface {
|
|
Node(id int) (MoneroNode, error)
|
|
Add(protocol string, host string, port uint) error
|
|
Nodes(q MoneroQueryParams) (MoneroNodes, error)
|
|
GiveJob(acceptTor int) (MoneroNode, error)
|
|
ProcessJob(report ProbeReport, proberId int64) error
|
|
NetFee() []NetFee
|
|
Countries() ([]MoneroCountries, error)
|
|
Logs(q MoneroLogQueryParams) (MoneroNodeFetchLogs, error)
|
|
}
|
|
|
|
type MoneroRepo struct {
|
|
db *database.DB
|
|
}
|
|
|
|
func NewMoneroRepo(db *database.DB) MoneroRepository {
|
|
return &MoneroRepo{db}
|
|
}
|
|
|
|
type MoneroNode struct {
|
|
Id uint `json:"id,omitempty" db:"id"`
|
|
Hostname string `json:"hostname" db:"hostname"`
|
|
Ip string `json:"ip" db:"ip_addr"`
|
|
Port uint `json:"port" db:"port"`
|
|
Protocol string `json:"protocol" db:"protocol"`
|
|
IsTor bool `json:"is_tor" db:"is_tor"`
|
|
IsAvailable bool `json:"is_available" db:"is_available"`
|
|
NetType string `json:"nettype" db:"nettype"`
|
|
Height uint `json:"height" db:"height"`
|
|
AdjustedTime uint `json:"adjusted_time" db:"adjusted_time"`
|
|
DatabaseSize uint `json:"database_size" db:"database_size"`
|
|
Difficulty uint `json:"difficulty" db:"difficulty"`
|
|
Version string `json:"version" db:"version"`
|
|
Status string `json:"status,omitempty"`
|
|
Uptime float64 `json:"uptime" db:"uptime"`
|
|
EstimateFee uint `json:"estimate_fee" db:"estimate_fee"`
|
|
Asn uint `json:"asn" db:"asn"`
|
|
AsnName string `json:"asn_name" db:"asn_name"`
|
|
CountryCode string `json:"cc" db:"country"`
|
|
CountryName string `json:"country_name" db:"country_name"`
|
|
City string `json:"city" db:"city"`
|
|
Lat float64 `json:"latitude" db:"lat"`
|
|
Lon float64 `json:"longitude" db:"lon"`
|
|
DateEntered uint `json:"date_entered,omitempty" db:"date_entered"`
|
|
LastChecked uint `json:"last_checked" db:"last_checked"`
|
|
FailedCount uint `json:"failed_count,omitempty" db:"failed_count"`
|
|
LastCheckStatus types.JSONText `json:"last_check_statuses" db:"last_check_status"`
|
|
CorsCapable bool `json:"cors" db:"cors_capable"`
|
|
}
|
|
|
|
func (repo *MoneroRepo) Node(id int) (MoneroNode, error) {
|
|
node := MoneroNode{}
|
|
err := repo.db.Get(&node, `SELECT * FROM tbl_node WHERE id = ?`, id)
|
|
if err != nil && err != sql.ErrNoRows {
|
|
fmt.Println("WARN:", err)
|
|
return node, errors.New("Can't get node information")
|
|
}
|
|
if err == sql.ErrNoRows {
|
|
return node, errors.New("Node not found")
|
|
}
|
|
return node, err
|
|
}
|
|
|
|
type MoneroNodes struct {
|
|
TotalRows int `json:"total_rows"`
|
|
RowsPerPage int `json:"rows_per_page"`
|
|
Items []*MoneroNode `json:"items"`
|
|
}
|
|
|
|
type MoneroQueryParams struct {
|
|
Host string
|
|
NetType string
|
|
Protocol string
|
|
CC string // 2 letter country code
|
|
Status int
|
|
Cors int
|
|
RowsPerPage int
|
|
Page int
|
|
SortBy string
|
|
SortDirection string
|
|
}
|
|
|
|
func (repo *MoneroRepo) Nodes(q MoneroQueryParams) (MoneroNodes, error) {
|
|
queryParams := []interface{}{}
|
|
whereQueries := []string{}
|
|
where := ""
|
|
|
|
if q.Host != "" {
|
|
whereQueries = append(whereQueries, "(hostname LIKE ? OR ip_addr LIKE ?)")
|
|
queryParams = append(queryParams, "%"+q.Host+"%")
|
|
queryParams = append(queryParams, "%"+q.Host+"%")
|
|
}
|
|
if q.NetType != "any" {
|
|
whereQueries = append(whereQueries, "nettype = ?")
|
|
queryParams = append(queryParams, q.NetType)
|
|
}
|
|
if q.Protocol != "any" {
|
|
if q.Protocol == "tor" {
|
|
whereQueries = append(whereQueries, "is_tor = ?")
|
|
queryParams = append(queryParams, 1)
|
|
} else {
|
|
whereQueries = append(whereQueries, "(protocol = ? AND is_tor = ?)")
|
|
queryParams = append(queryParams, q.Protocol)
|
|
queryParams = append(queryParams, 0)
|
|
}
|
|
}
|
|
if q.CC != "any" {
|
|
whereQueries = append(whereQueries, "country = ?")
|
|
if q.CC == "UNKNOWN" {
|
|
queryParams = append(queryParams, "")
|
|
} else {
|
|
queryParams = append(queryParams, q.CC)
|
|
}
|
|
}
|
|
if q.Status != -1 {
|
|
whereQueries = append(whereQueries, "is_available = ?")
|
|
queryParams = append(queryParams, q.Status)
|
|
}
|
|
if q.Cors != -1 {
|
|
whereQueries = append(whereQueries, "cors_capable = ?")
|
|
queryParams = append(queryParams, 1)
|
|
}
|
|
|
|
if len(whereQueries) > 0 {
|
|
where = "WHERE " + strings.Join(whereQueries, " AND ")
|
|
}
|
|
|
|
nodes := MoneroNodes{}
|
|
|
|
queryTotalRows := fmt.Sprintf("SELECT COUNT(id) AS total_rows FROM tbl_node %s", where)
|
|
|
|
err := repo.db.QueryRow(queryTotalRows, queryParams...).Scan(&nodes.TotalRows)
|
|
if err != nil {
|
|
return nodes, err
|
|
}
|
|
queryParams = append(queryParams, q.RowsPerPage, (q.Page-1)*q.RowsPerPage)
|
|
|
|
allowedSort := []string{"last_checked", "uptime"}
|
|
sortBy := "last_checked"
|
|
if slices.Contains(allowedSort, q.SortBy) {
|
|
sortBy = q.SortBy
|
|
}
|
|
sortDirection := "DESC"
|
|
if q.SortDirection == "asc" {
|
|
sortDirection = "ASC"
|
|
}
|
|
|
|
query := fmt.Sprintf("SELECT id, protocol, hostname, port, is_tor, is_available, nettype, height, adjusted_time, database_size, difficulty, version, uptime, estimate_fee, ip_addr, asn, asn_name, country, country_name, city, lat, lon, date_entered, last_checked, last_check_status, cors_capable FROM tbl_node %s ORDER BY %s %s LIMIT ? OFFSET ?", where, sortBy, sortDirection)
|
|
|
|
row, err := repo.db.Query(query, queryParams...)
|
|
if err != nil {
|
|
return nodes, err
|
|
}
|
|
defer row.Close()
|
|
|
|
nodes.RowsPerPage = q.RowsPerPage
|
|
|
|
for row.Next() {
|
|
node := MoneroNode{}
|
|
err = row.Scan(&node.Id, &node.Protocol, &node.Hostname, &node.Port, &node.IsTor, &node.IsAvailable, &node.NetType, &node.Height, &node.AdjustedTime, &node.DatabaseSize, &node.Difficulty, &node.Version, &node.Uptime, &node.EstimateFee, &node.Ip, &node.Asn, &node.AsnName, &node.CountryCode, &node.CountryName, &node.City, &node.Lat, &node.Lon, &node.DateEntered, &node.LastChecked, &node.LastCheckStatus, &node.CorsCapable)
|
|
if err != nil {
|
|
return nodes, err
|
|
}
|
|
nodes.Items = append(nodes.Items, &node)
|
|
}
|
|
|
|
return nodes, nil
|
|
}
|
|
|
|
type MoneroLogQueryParams struct {
|
|
NodeId int // 0 fpr all, >0 for specific node
|
|
WorkerId int // 0 for all, >0 for specific worker
|
|
Status int // -1 for all, 0 for failed, 1 for success
|
|
FailedReason string // empty for all, if not empty, will be used as search from failed_reaso
|
|
|
|
RowsPerPage int
|
|
Page int
|
|
SortBy string
|
|
SortDirection string
|
|
}
|
|
|
|
type ProbeLog struct {
|
|
Id int `db:"id" json:"id,omitempty"`
|
|
NodeId int `db:"node_id" json:"node_id"`
|
|
ProberId int `db:"prober_id" json:"prober_id"`
|
|
Status int `db:"is_available" json:"status"`
|
|
Height int `db:"height" json:"height"`
|
|
AdjustedTime int `db:"adjusted_time" json:"adjusted_time"`
|
|
DatabaseSize int `db:"database_size" json:"database_size"`
|
|
Difficulty int `db:"difficulty" json:"difficulty"`
|
|
EstimateFee int `db:"estimate_fee" json:"estimate_fee"`
|
|
DateChecked int `db:"date_checked" json:"date_checked"`
|
|
FailedReason string `db:"failed_reason" json:"failed_reason"`
|
|
FetchRuntime float64 `db:"fetch_runtime" json:"fetch_runtime"`
|
|
}
|
|
|
|
type MoneroNodeFetchLogs struct {
|
|
TotalRows int `json:"total_rows"`
|
|
RowsPerPage int `json:"rows_per_page"`
|
|
Items []*ProbeLog `json:"items"`
|
|
}
|
|
|
|
func (repo *MoneroRepo) Logs(q MoneroLogQueryParams) (MoneroNodeFetchLogs, error) {
|
|
queryParams := []interface{}{}
|
|
whereQueries := []string{}
|
|
where := ""
|
|
|
|
if q.NodeId != 0 {
|
|
whereQueries = append(whereQueries, "node_id = ?")
|
|
queryParams = append(queryParams, q.NodeId)
|
|
}
|
|
if q.Status != -1 {
|
|
whereQueries = append(whereQueries, "is_available = ?")
|
|
queryParams = append(queryParams, q.Status)
|
|
}
|
|
if q.FailedReason != "" {
|
|
whereQueries = append(whereQueries, "failed_reason LIKE ?")
|
|
queryParams = append(queryParams, "%"+q.FailedReason+"%")
|
|
}
|
|
|
|
if len(whereQueries) > 0 {
|
|
where = "WHERE " + strings.Join(whereQueries, " AND ")
|
|
}
|
|
|
|
fetchLogs := MoneroNodeFetchLogs{}
|
|
|
|
queryTotalRows := fmt.Sprintf("SELECT COUNT(id) FROM tbl_probe_log %s", where)
|
|
err := repo.db.QueryRow(queryTotalRows, queryParams...).Scan(&fetchLogs.TotalRows)
|
|
if err != nil {
|
|
return fetchLogs, err
|
|
}
|
|
queryParams = append(queryParams, q.RowsPerPage, (q.Page-1)*q.RowsPerPage)
|
|
|
|
allowedSort := []string{"date_checked", "fetch_runtime"}
|
|
sortBy := "id"
|
|
if slices.Contains(allowedSort, q.SortBy) {
|
|
sortBy = q.SortBy
|
|
}
|
|
sortDirection := "DESC"
|
|
if q.SortDirection == "asc" {
|
|
sortDirection = "ASC"
|
|
}
|
|
|
|
query := fmt.Sprintf("SELECT id, node_id, prober_id, is_available, height, adjusted_time, database_size, difficulty, estimate_fee, date_checked, failed_reason, fetch_runtime FROM tbl_probe_log %s ORDER BY %s %s LIMIT ? OFFSET ?", where, sortBy, sortDirection)
|
|
|
|
row, err := repo.db.Query(query, queryParams...)
|
|
if err != nil {
|
|
return fetchLogs, err
|
|
}
|
|
defer row.Close()
|
|
|
|
fetchLogs.RowsPerPage = q.RowsPerPage
|
|
|
|
for row.Next() {
|
|
probeLog := ProbeLog{}
|
|
err = row.Scan(&probeLog.Id, &probeLog.NodeId, &probeLog.ProberId, &probeLog.Status, &probeLog.Height, &probeLog.AdjustedTime, &probeLog.DatabaseSize, &probeLog.Difficulty, &probeLog.EstimateFee, &probeLog.DateChecked, &probeLog.FailedReason, &probeLog.FetchRuntime)
|
|
if err != nil {
|
|
return fetchLogs, err
|
|
}
|
|
|
|
fetchLogs.Items = append(fetchLogs.Items, &probeLog)
|
|
}
|
|
|
|
return fetchLogs, nil
|
|
}
|
|
|
|
func (repo *MoneroRepo) Add(protocol string, hostname string, port uint) error {
|
|
if protocol != "http" && protocol != "https" {
|
|
return errors.New("Invalid protocol, must one of or HTTP/HTTPS")
|
|
}
|
|
|
|
if port > 65535 || port < 1 {
|
|
return errors.New("Invalid port number")
|
|
}
|
|
|
|
is_tor := false
|
|
if strings.HasSuffix(hostname, ".onion") {
|
|
is_tor = true
|
|
}
|
|
ip := ""
|
|
|
|
if !is_tor {
|
|
hostIps, err := net.LookupIP(hostname)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
hostIp := hostIps[0].To4()
|
|
if hostIp == nil {
|
|
return errors.New("Host IP is not IPv4")
|
|
}
|
|
if hostIp.IsPrivate() {
|
|
return errors.New("IP address is private")
|
|
}
|
|
if hostIp.IsLoopback() {
|
|
return errors.New("IP address is loopback address")
|
|
}
|
|
|
|
ip = hostIp.String()
|
|
}
|
|
|
|
check := `SELECT id FROM tbl_node WHERE protocol = ? AND hostname = ? AND port = ? LIMIT 1`
|
|
row, err := repo.db.Query(check, protocol, hostname, port)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer row.Close()
|
|
|
|
if row.Next() {
|
|
return errors.New("Node already monitored")
|
|
}
|
|
statusDb, _ := json.Marshal([5]int{2, 2, 2, 2, 2})
|
|
|
|
query := `INSERT INTO tbl_node (protocol, hostname, port, is_tor, nettype, ip_addr, lat, lon, date_entered, last_checked, last_check_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
|
|
_, err = repo.db.Exec(query, protocol, hostname, port, is_tor, "", ip, 0, 0, time.Now().Unix(), 0, string(statusDb))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (repo *MoneroRepo) Delete(id uint) error {
|
|
if _, err := repo.db.Exec(`DELETE FROM tbl_node WHERE id = ?`, id); err != nil {
|
|
return err
|
|
}
|
|
if _, err := repo.db.Exec(`DELETE FROM tbl_probe_log WHERE node_id = ?`, id); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (repo *MoneroRepo) GiveJob(acceptTor int) (MoneroNode, error) {
|
|
queryParams := []interface{}{}
|
|
whereQueries := []string{}
|
|
where := ""
|
|
|
|
if acceptTor != 1 {
|
|
whereQueries = append(whereQueries, "is_tor = ?")
|
|
queryParams = append(queryParams, 0)
|
|
}
|
|
|
|
if len(whereQueries) > 0 {
|
|
where = "WHERE " + strings.Join(whereQueries, " AND ")
|
|
}
|
|
|
|
node := MoneroNode{}
|
|
|
|
query := fmt.Sprintf(`SELECT id, hostname, port, protocol, is_tor, last_check_status FROM tbl_node %s ORDER BY last_checked ASC LIMIT 1`, where)
|
|
err := repo.db.QueryRow(query, queryParams...).Scan(&node.Id, &node.Hostname, &node.Port, &node.Protocol, &node.IsTor, &node.LastCheckStatus)
|
|
if err != nil {
|
|
return node, err
|
|
}
|
|
|
|
update := `UPDATE tbl_node SET last_checked = ? WHERE id = ?`
|
|
_, err = repo.db.Exec(update, time.Now().Unix(), node.Id)
|
|
if err != nil {
|
|
return node, err
|
|
}
|
|
|
|
return node, nil
|
|
}
|
|
|
|
type ProbeReport struct {
|
|
TookTime float64 `json:"took_time"`
|
|
Message string `json:"message"`
|
|
NodeInfo MoneroNode `json:"node_info"`
|
|
}
|
|
|
|
func (repo *MoneroRepo) ProcessJob(report ProbeReport, proberId int64) error {
|
|
if report.NodeInfo.Id == 0 {
|
|
return errors.New("Invalid node")
|
|
}
|
|
|
|
qInsertLog := `INSERT INTO tbl_probe_log (node_id, prober_id, is_available, height, adjusted_time, database_size, difficulty, estimate_fee, date_checked, failed_reason, fetch_runtime) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
|
|
|
|
_, err := repo.db.Exec(qInsertLog, report.NodeInfo.Id, proberId, report.NodeInfo.IsAvailable, report.NodeInfo.Height, report.NodeInfo.AdjustedTime, report.NodeInfo.DatabaseSize, report.NodeInfo.Difficulty, report.NodeInfo.EstimateFee, time.Now().Unix(), report.Message, report.TookTime)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
now := time.Now()
|
|
limitTs := now.AddDate(0, -1, 0).Unix()
|
|
|
|
nodeStats := struct {
|
|
OnlineCount uint `db:"online"`
|
|
OfflineCount uint `db:"offline"`
|
|
TotalFetched uint `db:"total_fetched"`
|
|
}{}
|
|
|
|
qstats := `SELECT
|
|
SUM(if(is_available='1',1,0)) AS online,
|
|
SUM(if(is_available='0',1,0)) AS offline,
|
|
SUM(if(id='0',0,1)) AS total_fetched
|
|
FROM tbl_probe_log WHERE node_id = ? AND date_checked > ?`
|
|
repo.db.Get(&nodeStats, qstats, report.NodeInfo.Id, limitTs)
|
|
|
|
avgUptime := (float64(nodeStats.OnlineCount) / float64(nodeStats.TotalFetched)) * 100
|
|
report.NodeInfo.Uptime = math.Ceil(avgUptime*100) / 100
|
|
|
|
var statuses [5]int
|
|
errUnmarshal := report.NodeInfo.LastCheckStatus.Unmarshal(&statuses)
|
|
if errUnmarshal != nil {
|
|
fmt.Println("Warning", errUnmarshal.Error())
|
|
statuses = [5]int{2, 2, 2, 2, 2}
|
|
}
|
|
|
|
nodeAvailable := 0
|
|
|
|
if report.NodeInfo.IsAvailable {
|
|
nodeAvailable = 1
|
|
}
|
|
newStatuses := statuses[1:]
|
|
newStatuses = append(newStatuses, nodeAvailable)
|
|
statuesValueToDb, errMarshalStatus := json.Marshal(newStatuses)
|
|
if errMarshalStatus != nil {
|
|
fmt.Println("WARN", errMarshalStatus.Error())
|
|
}
|
|
|
|
// recheck IP
|
|
if report.NodeInfo.Ip != "" {
|
|
if ipInfo, errGeoIp := GetGeoIpInfo(report.NodeInfo.Ip); errGeoIp != nil {
|
|
fmt.Println("WARN:", errGeoIp.Error())
|
|
} else {
|
|
report.NodeInfo.Asn = ipInfo.Asn
|
|
report.NodeInfo.AsnName = ipInfo.AsnOrg
|
|
report.NodeInfo.CountryCode = ipInfo.CountryCode
|
|
report.NodeInfo.CountryName = ipInfo.CountryName
|
|
report.NodeInfo.City = ipInfo.City
|
|
report.NodeInfo.Lon = ipInfo.Longitude
|
|
report.NodeInfo.Lat = ipInfo.Latitude
|
|
}
|
|
}
|
|
|
|
if report.NodeInfo.IsAvailable {
|
|
update := `UPDATE tbl_node SET
|
|
is_available = ?, nettype = ?, height = ?, adjusted_time = ?,
|
|
database_size = ?, difficulty = ?, version = ?, uptime = ?,
|
|
estimate_fee = ?, ip_addr = ?, asn = ?, asn_name = ?, country = ?,
|
|
country_name = ?, city = ?, last_checked = ?, last_check_status = ?,
|
|
cors_capable = ?
|
|
WHERE id = ?`
|
|
_, err = repo.db.Exec(update,
|
|
nodeAvailable, report.NodeInfo.NetType, report.NodeInfo.Height, report.NodeInfo.AdjustedTime, report.NodeInfo.DatabaseSize, report.NodeInfo.Difficulty, report.NodeInfo.Version, report.NodeInfo.Uptime, report.NodeInfo.EstimateFee, report.NodeInfo.Ip, report.NodeInfo.Asn, report.NodeInfo.AsnName, report.NodeInfo.CountryCode, report.NodeInfo.CountryName, report.NodeInfo.City, now.Unix(), string(statuesValueToDb), report.NodeInfo.CorsCapable, report.NodeInfo.Id)
|
|
} else {
|
|
update := `UPDATE tbl_node SET is_available = ?, uptime = ?, last_checked = ?, last_check_status = ? WHERE id = ?`
|
|
_, err = repo.db.Exec(update, nodeAvailable, report.NodeInfo.Uptime, now.Unix(), string(statuesValueToDb), report.NodeInfo.Id)
|
|
}
|
|
|
|
if avgUptime <= 0 && nodeStats.TotalFetched > 300 {
|
|
fmt.Println("Deleting Monero node (0% uptime from > 300 records)")
|
|
repo.Delete(report.NodeInfo.Id)
|
|
}
|
|
|
|
repo.db.Exec(`UPDATE tbl_prober SET last_submit_ts = ? WHERE id = ?`, now.Unix(), proberId)
|
|
|
|
return err
|
|
}
|
|
|
|
type NetFee struct {
|
|
Nettype string `json:"nettype" db:"nettype"`
|
|
EstimateFee uint `json:"estimate_fee" db:"estimate_fee"`
|
|
NodeCount int `json:"node_count" db:"node_count"`
|
|
}
|
|
|
|
func (repo *MoneroRepo) NetFee() []NetFee {
|
|
netTypes := [3]string{"mainnet", "stagenet", "testnet"}
|
|
netFees := []NetFee{}
|
|
|
|
for _, net := range netTypes {
|
|
fees := NetFee{}
|
|
err := repo.db.Get(&fees, `SELECT COUNT(id) AS node_count, nettype, estimate_fee FROM tbl_node WHERE nettype = ? GROUP BY estimate_fee ORDER BY node_count DESC LIMIT 1`, net)
|
|
if err != nil {
|
|
fmt.Println("WARN:", err.Error())
|
|
continue
|
|
}
|
|
netFees = append(netFees, fees)
|
|
}
|
|
|
|
return netFees
|
|
}
|
|
|
|
type MoneroCountries struct {
|
|
TotalNodes int `json:"total_nodes" db:"total_nodes"`
|
|
Cc string `json:"cc" db:"country"`
|
|
Name string `json:"name" db:"country_name"`
|
|
}
|
|
|
|
func (repo *MoneroRepo) Countries() ([]MoneroCountries, error) {
|
|
countries := []MoneroCountries{}
|
|
|
|
err := repo.db.Select(&countries, `SELECT COUNT(id) AS total_nodes, country, country_name FROM tbl_node GROUP BY country ORDER BY country ASC`)
|
|
return countries, err
|
|
}
|