2024-05-04 17:24:47 +07:00
package repo
import (
2024-05-07 01:08:01 +07:00
"database/sql"
2024-05-04 17:24:47 +07:00
"encoding/json"
"errors"
2024-05-04 18:52:47 +07:00
"fmt"
2024-05-05 01:42:47 +07:00
"math"
2024-05-04 17:24:47 +07:00
"net"
2024-05-04 18:52:47 +07:00
"slices"
2024-05-04 17:24:47 +07:00
"strings"
"time"
"github.com/ditatompel/xmr-nodes/internal/database"
2024-05-04 18:52:47 +07:00
"github.com/jmoiron/sqlx/types"
2024-05-04 17:24:47 +07:00
)
type MoneroRepository interface {
2024-05-07 01:08:01 +07:00
Node ( id int ) ( MoneroNode , error )
2024-05-04 17:24:47 +07:00
Add ( protocol string , host string , port uint ) error
2024-05-04 18:52:47 +07:00
Nodes ( q MoneroQueryParams ) ( MoneroNodes , error )
2024-05-04 19:27:21 +07:00
GiveJob ( acceptTor int ) ( MoneroNode , error )
2024-05-05 01:42:47 +07:00
ProcessJob ( report ProbeReport , proberId int64 ) error
2024-05-06 13:19:48 +07:00
NetFee ( ) [ ] NetFee
2024-05-06 13:35:15 +07:00
Countries ( ) ( [ ] MoneroCountries , error )
2024-05-06 17:19:17 +07:00
Logs ( q MoneroLogQueryParams ) ( MoneroNodeFetchLogs , error )
2024-05-04 17:24:47 +07:00
}
type MoneroRepo struct {
db * database . DB
}
func NewMoneroRepo ( db * database . DB ) MoneroRepository {
return & MoneroRepo { db }
}
2024-05-04 18:52:47 +07:00
type MoneroNode struct {
Id uint ` json:"id,omitempty" db:"id" `
Hostname string ` json:"hostname" db:"hostname" `
Ip string ` json:"ip" db:"ip_addr" `
Port uint ` json:"port" db:"port" `
Protocol string ` json:"protocol" db:"protocol" `
IsTor bool ` json:"is_tor" db:"is_tor" `
IsAvailable bool ` json:"is_available" db:"is_available" `
NetType string ` json:"nettype" db:"nettype" `
2024-05-04 22:53:03 +07:00
Height uint ` json:"height" db:"height" `
2024-05-04 18:52:47 +07:00
AdjustedTime uint ` json:"adjusted_time" db:"adjusted_time" `
DatabaseSize uint ` json:"database_size" db:"database_size" `
Difficulty uint ` json:"difficulty" db:"difficulty" `
2024-05-04 22:53:03 +07:00
Version string ` json:"version" db:"version" `
Status string ` json:"status,omitempty" `
2024-05-05 01:42:47 +07:00
Uptime float64 ` json:"uptime" db:"uptime" `
2024-05-04 18:52:47 +07:00
EstimateFee uint ` json:"estimate_fee" db:"estimate_fee" `
Asn uint ` json:"asn" db:"asn" `
AsnName string ` json:"asn_name" db:"asn_name" `
CountryCode string ` json:"cc" db:"country" `
CountryName string ` json:"country_name" db:"country_name" `
City string ` json:"city" db:"city" `
Lat float64 ` json:"latitude" db:"lat" `
Lon float64 ` json:"longitude" db:"lon" `
DateEntered uint ` json:"date_entered,omitempty" db:"date_entered" `
LastChecked uint ` json:"last_checked" db:"last_checked" `
FailedCount uint ` json:"failed_count,omitempty" db:"failed_count" `
LastCheckStatus types . JSONText ` json:"last_check_statuses" db:"last_check_status" `
CorsCapable bool ` json:"cors" db:"cors_capable" `
}
2024-05-07 01:08:01 +07:00
func ( repo * MoneroRepo ) Node ( id int ) ( MoneroNode , error ) {
node := MoneroNode { }
err := repo . db . Get ( & node , ` SELECT * FROM tbl_node WHERE id = ? ` , id )
if err != nil && err != sql . ErrNoRows {
fmt . Println ( "WARN:" , err )
return node , errors . New ( "Can't get node information" )
}
if err == sql . ErrNoRows {
return node , errors . New ( "Node not found" )
}
return node , err
}
2024-05-04 18:52:47 +07:00
type MoneroNodes struct {
TotalRows int ` json:"total_rows" `
RowsPerPage int ` json:"rows_per_page" `
CurrentPage int ` json:"current_page" `
NextPage int ` json:"next_page" `
Items [ ] * MoneroNode ` json:"items" `
}
type MoneroQueryParams struct {
Host string
2024-05-06 14:33:13 +07:00
NetType string
Protocol string
CC string // 2 letter country code
Status int
Cors int
2024-05-04 18:52:47 +07:00
RowsPerPage int
Page int
SortBy string
SortDirection string
}
func ( repo * MoneroRepo ) Nodes ( q MoneroQueryParams ) ( MoneroNodes , error ) {
queryParams := [ ] interface { } { }
whereQueries := [ ] string { }
where := ""
if q . Host != "" {
whereQueries = append ( whereQueries , "(hostname LIKE ? OR ip_addr LIKE ?)" )
queryParams = append ( queryParams , "%" + q . Host + "%" )
queryParams = append ( queryParams , "%" + q . Host + "%" )
}
2024-05-06 14:33:13 +07:00
if q . NetType != "any" {
whereQueries = append ( whereQueries , "nettype = ?" )
queryParams = append ( queryParams , q . NetType )
}
if q . Protocol != "any" {
if q . Protocol == "tor" {
whereQueries = append ( whereQueries , "is_tor = ?" )
queryParams = append ( queryParams , 1 )
} else {
whereQueries = append ( whereQueries , "(protocol = ? AND is_tor = ?)" )
queryParams = append ( queryParams , q . Protocol )
queryParams = append ( queryParams , 0 )
}
}
if q . CC != "any" {
whereQueries = append ( whereQueries , "country = ?" )
if q . CC == "UNKNOWN" {
queryParams = append ( queryParams , "" )
} else {
queryParams = append ( queryParams , q . CC )
}
}
if q . Status != - 1 {
whereQueries = append ( whereQueries , "is_available = ?" )
queryParams = append ( queryParams , q . Status )
}
if q . Cors != - 1 {
whereQueries = append ( whereQueries , "cors_capable = ?" )
queryParams = append ( queryParams , 1 )
}
2024-05-04 18:52:47 +07:00
if len ( whereQueries ) > 0 {
where = "WHERE " + strings . Join ( whereQueries , " AND " )
}
nodes := MoneroNodes { }
queryTotalRows := fmt . Sprintf ( "SELECT COUNT(id) AS total_rows FROM tbl_node %s" , where )
err := repo . db . QueryRow ( queryTotalRows , queryParams ... ) . Scan ( & nodes . TotalRows )
if err != nil {
return nodes , err
}
queryParams = append ( queryParams , q . RowsPerPage , ( q . Page - 1 ) * q . RowsPerPage )
allowedSort := [ ] string { "last_checked" , "uptime" }
sortBy := "last_checked"
if slices . Contains ( allowedSort , q . SortBy ) {
sortBy = q . SortBy
}
sortDirection := "DESC"
if q . SortDirection == "asc" {
sortDirection = "ASC"
}
2024-05-04 22:53:03 +07:00
query := fmt . Sprintf ( "SELECT id, protocol, hostname, port, is_tor, is_available, nettype, height, adjusted_time, database_size, difficulty, version, uptime, estimate_fee, ip_addr, asn, asn_name, country, country_name, city, lat, lon, date_entered, last_checked, last_check_status, cors_capable FROM tbl_node %s ORDER BY %s %s LIMIT ? OFFSET ?" , where , sortBy , sortDirection )
2024-05-04 18:52:47 +07:00
row , err := repo . db . Query ( query , queryParams ... )
if err != nil {
return nodes , err
}
defer row . Close ( )
nodes . RowsPerPage = q . RowsPerPage
nodes . CurrentPage = q . Page
nodes . NextPage = q . Page + 1
for row . Next ( ) {
node := MoneroNode { }
2024-05-05 02:27:20 +07:00
err = row . Scan ( & node . Id , & node . Protocol , & node . Hostname , & node . Port , & node . IsTor , & node . IsAvailable , & node . NetType , & node . Height , & node . AdjustedTime , & node . DatabaseSize , & node . Difficulty , & node . Version , & node . Uptime , & node . EstimateFee , & node . Ip , & node . Asn , & node . AsnName , & node . CountryCode , & node . CountryName , & node . City , & node . Lat , & node . Lon , & node . DateEntered , & node . LastChecked , & node . LastCheckStatus , & node . CorsCapable )
2024-05-04 18:52:47 +07:00
if err != nil {
return nodes , err
}
nodes . Items = append ( nodes . Items , & node )
}
return nodes , nil
}
2024-05-06 17:19:17 +07:00
type MoneroLogQueryParams struct {
NodeId int // 0 fpr all, >0 for specific node
WorkerId int // 0 for all, >0 for specific worker
Status int // -1 for all, 0 for failed, 1 for success
FailReason string // empty for all, if not empty, will be used as search from failed_reaso
RowsPerPage int
Page int
SortBy string
SortDirection string
}
type ProbeLog struct {
Id int ` db:"id" json:"id,omitempty" `
NodeId int ` db:"node_id" json:"node_id" `
ProberId int ` db:"prober_id" json:"prober_id" `
Status int ` db:"is_available" json:"status" `
Height int ` db:"height" json:"height" `
AdjustedTime int ` db:"adjusted_time" json:"adjusted_time" `
DatabaseSize int ` db:"database_size" json:"database_size" `
Difficulty int ` db:"difficulty" json:"difficulty" `
EstimateFee int ` db:"estimate_fee" json:"estimate_fee" `
DateChecked int ` db:"date_checked" json:"date_checked" `
FailedReason string ` db:"failed_reason" json:"failed_reason" `
FetchRuntime float64 ` db:"fetch_runtime" json:"fetch_runtime" `
}
type MoneroNodeFetchLogs struct {
TotalRows int ` json:"total_rows" `
RowsPerPage int ` json:"rows_per_page" `
Items [ ] * ProbeLog ` json:"items" `
}
func ( repo * MoneroRepo ) Logs ( q MoneroLogQueryParams ) ( MoneroNodeFetchLogs , error ) {
queryParams := [ ] interface { } { }
whereQueries := [ ] string { }
where := ""
if q . NodeId != 0 {
whereQueries = append ( whereQueries , "node_id = ?" )
queryParams = append ( queryParams , q . NodeId )
}
if len ( whereQueries ) > 0 {
where = "WHERE " + strings . Join ( whereQueries , " AND " )
}
fetchLogs := MoneroNodeFetchLogs { }
queryTotalRows := fmt . Sprintf ( "SELECT COUNT(id) FROM tbl_probe_log %s" , where )
err := repo . db . QueryRow ( queryTotalRows , queryParams ... ) . Scan ( & fetchLogs . TotalRows )
if err != nil {
return fetchLogs , err
}
queryParams = append ( queryParams , q . RowsPerPage , ( q . Page - 1 ) * q . RowsPerPage )
allowedSort := [ ] string { "date_checked" , "fetch_runtime" }
sortBy := "id"
if slices . Contains ( allowedSort , q . SortBy ) {
sortBy = q . SortBy
}
sortDirection := "DESC"
if q . SortDirection == "asc" {
sortDirection = "ASC"
}
query := fmt . Sprintf ( "SELECT id, node_id, prober_id, is_available, height, adjusted_time, database_size, difficulty, estimate_fee, date_checked, failed_reason, fetch_runtime FROM tbl_probe_log %s ORDER BY %s %s LIMIT ? OFFSET ?" , where , sortBy , sortDirection )
row , err := repo . db . Query ( query , queryParams ... )
if err != nil {
return fetchLogs , err
}
defer row . Close ( )
fetchLogs . RowsPerPage = q . RowsPerPage
for row . Next ( ) {
probeLog := ProbeLog { }
err = row . Scan ( & probeLog . Id , & probeLog . NodeId , & probeLog . ProberId , & probeLog . Status , & probeLog . Height , & probeLog . AdjustedTime , & probeLog . DatabaseSize , & probeLog . Difficulty , & probeLog . EstimateFee , & probeLog . DateChecked , & probeLog . FailedReason , & probeLog . FetchRuntime )
if err != nil {
return fetchLogs , err
}
fetchLogs . Items = append ( fetchLogs . Items , & probeLog )
}
return fetchLogs , nil
}
2024-05-04 17:24:47 +07:00
func ( repo * MoneroRepo ) Add ( protocol string , hostname string , port uint ) error {
if protocol != "http" && protocol != "https" {
return errors . New ( "Invalid protocol, must one of or HTTP/HTTPS" )
}
if port > 65535 || port < 1 {
return errors . New ( "Invalid port number" )
}
is_tor := false
if strings . HasSuffix ( hostname , ".onion" ) {
is_tor = true
}
ip := ""
if ! is_tor {
hostIps , err := net . LookupIP ( hostname )
if err != nil {
return err
}
hostIp := hostIps [ 0 ] . To4 ( )
if hostIp == nil {
return errors . New ( "Host IP is not IPv4" )
}
if hostIp . IsPrivate ( ) {
return errors . New ( "IP address is private" )
}
if hostIp . IsLoopback ( ) {
return errors . New ( "IP address is loopback address" )
}
ip = hostIp . String ( )
}
check := ` SELECT id FROM tbl_node WHERE protocol = ? AND hostname = ? AND port = ? LIMIT 1 `
row , err := repo . db . Query ( check , protocol , hostname , port )
if err != nil {
return err
}
defer row . Close ( )
if row . Next ( ) {
return errors . New ( "Node already monitored" )
}
statusDb , _ := json . Marshal ( [ 5 ] int { 2 , 2 , 2 , 2 , 2 } )
query := ` INSERT INTO tbl_node (protocol, hostname, port, is_tor, nettype, ip_addr, lat, lon, date_entered, last_checked, last_check_status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `
_ , err = repo . db . Exec ( query , protocol , hostname , port , is_tor , "" , ip , 0 , 0 , time . Now ( ) . Unix ( ) , 0 , string ( statusDb ) )
if err != nil {
return err
}
return nil
}
2024-05-04 19:27:21 +07:00
2024-05-06 17:45:18 +07:00
func ( repo * MoneroRepo ) Delete ( id uint ) error {
if _ , err := repo . db . Exec ( ` DELETE FROM tbl_node WHERE id = ? ` , id ) ; err != nil {
return err
}
if _ , err := repo . db . Exec ( ` DELETE FROM tbl_probe_log WHERE node_id = ? ` , id ) ; err != nil {
return err
}
return nil
}
2024-05-04 19:27:21 +07:00
func ( repo * MoneroRepo ) GiveJob ( acceptTor int ) ( MoneroNode , error ) {
queryParams := [ ] interface { } { }
whereQueries := [ ] string { }
where := ""
if acceptTor != 1 {
whereQueries = append ( whereQueries , "is_tor = ?" )
queryParams = append ( queryParams , 0 )
}
if len ( whereQueries ) > 0 {
where = "WHERE " + strings . Join ( whereQueries , " AND " )
}
node := MoneroNode { }
2024-05-05 01:42:47 +07:00
query := fmt . Sprintf ( ` SELECT id, hostname, port, protocol, is_tor, last_check_status FROM tbl_node %s ORDER BY last_checked ASC LIMIT 1 ` , where )
err := repo . db . QueryRow ( query , queryParams ... ) . Scan ( & node . Id , & node . Hostname , & node . Port , & node . Protocol , & node . IsTor , & node . LastCheckStatus )
2024-05-04 19:27:21 +07:00
if err != nil {
return node , err
}
update := ` UPDATE tbl_node SET last_checked = ? WHERE id = ? `
_ , err = repo . db . Exec ( update , time . Now ( ) . Unix ( ) , node . Id )
if err != nil {
return node , err
}
return node , nil
}
2024-05-05 01:42:47 +07:00
type ProbeReport struct {
TookTime float64 ` json:"took_time" `
Message string ` json:"message" `
NodeInfo MoneroNode ` json:"node_info" `
}
func ( repo * MoneroRepo ) ProcessJob ( report ProbeReport , proberId int64 ) error {
if report . NodeInfo . Id == 0 {
return errors . New ( "Invalid node" )
}
qInsertLog := ` INSERT INTO tbl_probe_log (node_id, prober_id, is_available, height, adjusted_time, database_size, difficulty, estimate_fee, date_checked, failed_reason, fetch_runtime) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `
_ , err := repo . db . Exec ( qInsertLog , report . NodeInfo . Id , proberId , report . NodeInfo . IsAvailable , report . NodeInfo . Height , report . NodeInfo . AdjustedTime , report . NodeInfo . DatabaseSize , report . NodeInfo . Difficulty , report . NodeInfo . EstimateFee , time . Now ( ) . Unix ( ) , report . Message , report . TookTime )
if err != nil {
return err
}
now := time . Now ( )
limitTs := now . AddDate ( 0 , - 1 , 0 ) . Unix ( )
nodeStats := struct {
OnlineCount uint ` db:"online" `
OfflineCount uint ` db:"offline" `
TotalFetched uint ` db:"total_fetched" `
} { }
qstats := ` SELECT
SUM ( if ( is_available = '1' , 1 , 0 ) ) AS online ,
SUM ( if ( is_available = '0' , 1 , 0 ) ) AS offline ,
SUM ( if ( id = '0' , 0 , 1 ) ) AS total_fetched FROM
tbl_probe_log WHERE node_id = ? AND date_checked > ? `
repo . db . Get ( & nodeStats , qstats , report . NodeInfo . Id , limitTs )
avgUptime := ( float64 ( nodeStats . OnlineCount ) / float64 ( nodeStats . TotalFetched ) ) * 100
report . NodeInfo . Uptime = math . Ceil ( avgUptime * 100 ) / 100
var statuses [ 5 ] int
errUnmarshal := report . NodeInfo . LastCheckStatus . Unmarshal ( & statuses )
if errUnmarshal != nil {
fmt . Println ( "Warning" , errUnmarshal . Error ( ) )
statuses = [ 5 ] int { 2 , 2 , 2 , 2 , 2 }
}
nodeAvailable := 0
if report . NodeInfo . IsAvailable {
nodeAvailable = 1
}
newStatuses := statuses [ 1 : ]
newStatuses = append ( newStatuses , nodeAvailable )
statuesValueToDb , errMarshalStatus := json . Marshal ( newStatuses )
if errMarshalStatus != nil {
fmt . Println ( "WARN" , errMarshalStatus . Error ( ) )
}
// recheck IP
2024-05-05 02:20:54 +07:00
if report . NodeInfo . Ip != "" {
if ipInfo , errGeoIp := GetGeoIpInfo ( report . NodeInfo . Ip ) ; errGeoIp != nil {
fmt . Println ( "WARN:" , errGeoIp . Error ( ) )
} else {
report . NodeInfo . Asn = ipInfo . Asn
report . NodeInfo . AsnName = ipInfo . AsnOrg
report . NodeInfo . CountryCode = ipInfo . CountryCode
report . NodeInfo . CountryName = ipInfo . CountryName
report . NodeInfo . City = ipInfo . City
report . NodeInfo . Lon = ipInfo . Longitude
report . NodeInfo . Lat = ipInfo . Latitude
}
}
2024-05-05 01:42:47 +07:00
update := ` UPDATE tbl_node SET
is_available = ? , nettype = ? , height = ? , adjusted_time = ? ,
database_size = ? , difficulty = ? , version = ? , uptime = ? ,
estimate_fee = ? , ip_addr = ? , asn = ? , asn_name = ? , country = ? ,
country_name = ? , city = ? , last_checked = ? , last_check_status = ? ,
cors_capable = ?
WHERE id = ? `
_ , err = repo . db . Exec ( update ,
nodeAvailable , report . NodeInfo . NetType , report . NodeInfo . Height , report . NodeInfo . AdjustedTime , report . NodeInfo . DatabaseSize , report . NodeInfo . Difficulty , report . NodeInfo . Version , report . NodeInfo . Uptime , report . NodeInfo . EstimateFee , report . NodeInfo . Ip , report . NodeInfo . Asn , report . NodeInfo . AsnName , report . NodeInfo . CountryCode , report . NodeInfo . CountryName , report . NodeInfo . City , now . Unix ( ) , string ( statuesValueToDb ) , report . NodeInfo . CorsCapable , report . NodeInfo . Id )
2024-05-06 17:45:18 +07:00
if avgUptime <= 0 && nodeStats . TotalFetched > 300 {
fmt . Println ( "Deleting Monero node (0% uptime from > 300 records)" )
repo . Delete ( report . NodeInfo . Id )
}
2024-05-05 01:42:47 +07:00
return err
}
2024-05-06 13:19:48 +07:00
type NetFee struct {
Nettype string ` json:"nettype" db:"nettype" `
EstimateFee uint ` json:"estimate_fee" db:"estimate_fee" `
NodeCount int ` json:"node_count" db:"node_count" `
}
func ( repo * MoneroRepo ) NetFee ( ) [ ] NetFee {
netTypes := [ 3 ] string { "mainnet" , "stagenet" , "testnet" }
netFees := [ ] NetFee { }
for _ , net := range netTypes {
fees := NetFee { }
err := repo . db . Get ( & fees , ` SELECT COUNT(id) AS node_count, nettype, estimate_fee FROM tbl_node WHERE nettype = ? GROUP BY estimate_fee ORDER BY node_count DESC LIMIT 1 ` , net )
if err != nil {
fmt . Println ( "WARN:" , err . Error ( ) )
continue
}
netFees = append ( netFees , fees )
}
return netFees
}
2024-05-06 13:35:15 +07:00
type MoneroCountries struct {
TotalNodes int ` json:"total_nodes" db:"total_nodes" `
Cc string ` json:"cc" db:"country" `
Name string ` json:"name" db:"country_name" `
}
func ( repo * MoneroRepo ) Countries ( ) ( [ ] MoneroCountries , error ) {
countries := [ ] MoneroCountries { }
err := repo . db . Select ( & countries , ` SELECT COUNT(id) AS total_nodes, country, country_name FROM tbl_node GROUP BY country ORDER BY country ASC ` )
return countries , err
}