mirror of
https://github.com/ditatompel/xmr-remote-nodes.git
synced 2025-01-08 05:52:10 +07:00
Using slog for logging level
The log level for the apps is using `log/slog` from Go standard library. This commit change log format for fiber http logger to match with the slog standard log format (date and time). This commit also remove `APP_DEBUG` field from config struct. TODO: Use `slog` for default app output. Note that in this commit, the `slog` output only implemented in `cron` "db migrate" and probe client.
This commit is contained in:
parent
ce830c393b
commit
46bc3dc2e8
8 changed files with 66 additions and 40 deletions
|
@ -1,3 +1,7 @@
|
||||||
|
# General Config
|
||||||
|
# ##############
|
||||||
|
LOG_LEVEL=INFO # can be DEBUG, INFO, WARN, ERROR
|
||||||
|
|
||||||
# Prober config
|
# Prober config
|
||||||
# #############
|
# #############
|
||||||
SERVER_ENDPOINT="http://127.0.0.1:18901"
|
SERVER_ENDPOINT="http://127.0.0.1:18901"
|
||||||
|
@ -8,9 +12,7 @@ TOR_SOCKS="127.0.0.1:9050"
|
||||||
# Server Config
|
# Server Config
|
||||||
# #############
|
# #############
|
||||||
SECRET_KEY="" # must be 32 char length, use `openssl rand -base64 32` to generate random secret
|
SECRET_KEY="" # must be 32 char length, use `openssl rand -base64 32` to generate random secret
|
||||||
LOG_LEVEL=INFO # can be DEBUG, INFO, WARNING, ERROR
|
|
||||||
# Fiber Config
|
# Fiber Config
|
||||||
APP_DEBUG=false # if this set to true , LOG_LEVEL will be set to DEBUG
|
|
||||||
APP_PREFORK=true
|
APP_PREFORK=true
|
||||||
APP_HOST="127.0.0.1"
|
APP_HOST="127.0.0.1"
|
||||||
APP_PORT=18090
|
APP_PORT=18090
|
||||||
|
@ -21,4 +23,4 @@ DB_HOST=127.0.0.1
|
||||||
DB_PORT=3306
|
DB_PORT=3306
|
||||||
DB_USER=root
|
DB_USER=root
|
||||||
DB_PASSWORD=
|
DB_PASSWORD=
|
||||||
DB_NAME=wa_ditatombot
|
DB_NAME=xmr_nodes
|
||||||
|
|
|
@ -4,7 +4,9 @@ Source code of [https://xmr.ditatompel.com](https://xmr.ditatompel.com).
|
||||||
|
|
||||||
## Requirements
|
## Requirements
|
||||||
|
|
||||||
|
- Go >= 1.22
|
||||||
- Linux Machine (AMD64 or ARM64)
|
- Linux Machine (AMD64 or ARM64)
|
||||||
|
- MySQL/MariaDB (for server)
|
||||||
- [GeoIP Database](https://dev.maxmind.com/geoip/geoip2/geolite2/) (for server, optional). Place it to `./assets/geoip`, see [./internal/repo/geoip.go](./internal/repo/geoip.go).
|
- [GeoIP Database](https://dev.maxmind.com/geoip/geoip2/geolite2/) (for server, optional). Place it to `./assets/geoip`, see [./internal/repo/geoip.go](./internal/repo/geoip.go).
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
15
cmd/probe.go
15
cmd/probe.go
|
@ -7,6 +7,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log/slog"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
@ -47,7 +48,6 @@ func runProbe() {
|
||||||
fmt.Println("Please set SERVER_ENDPOINT in .env")
|
fmt.Println("Please set SERVER_ENDPOINT in .env")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
fmt.Printf("Accept Tor: %t\n", cfg.AcceptTor)
|
|
||||||
|
|
||||||
if cfg.AcceptTor && cfg.TorSocks == "" {
|
if cfg.AcceptTor && cfg.TorSocks == "" {
|
||||||
fmt.Println("Please set TOR_SOCKS in .env")
|
fmt.Println("Please set TOR_SOCKS in .env")
|
||||||
|
@ -58,16 +58,16 @@ func runProbe() {
|
||||||
|
|
||||||
node, err := probe.getJob()
|
node, err := probe.getJob()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
slog.Error(fmt.Sprintf("[PROBE] getJob: %s", err.Error()))
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
fetchNode, err := probe.fetchNode(node)
|
fetchNode, err := probe.fetchNode(node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
slog.Error(fmt.Sprintf("[PROBE] fetchNode: %s", err.Error()))
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
fmt.Println(prettyPrint(fetchNode))
|
slog.Debug(fmt.Sprintf("[PROBE] fetchNode: %s", prettyPrint(fetchNode)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *proberClient) getJob() (repo.MoneroNode, error) {
|
func (p *proberClient) getJob() (repo.MoneroNode, error) {
|
||||||
|
@ -79,6 +79,7 @@ func (p *proberClient) getJob() (repo.MoneroNode, error) {
|
||||||
node := repo.MoneroNode{}
|
node := repo.MoneroNode{}
|
||||||
|
|
||||||
endpoint := fmt.Sprintf("%s/api/v1/job%s", p.config.ServerEndpoint, queryParams)
|
endpoint := fmt.Sprintf("%s/api/v1/job%s", p.config.ServerEndpoint, queryParams)
|
||||||
|
slog.Info(fmt.Sprintf("[PROBE] Getting node from %s", endpoint))
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodGet, endpoint, nil)
|
req, err := http.NewRequest(http.MethodGet, endpoint, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -108,6 +109,7 @@ func (p *proberClient) getJob() (repo.MoneroNode, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
node = response.Data
|
node = response.Data
|
||||||
|
slog.Info(fmt.Sprintf("[PROBE] Got node: %s://%s:%d", node.Protocol, node.Hostname, node.Port))
|
||||||
|
|
||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
|
@ -116,6 +118,8 @@ func (p *proberClient) fetchNode(node repo.MoneroNode) (repo.MoneroNode, error)
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
endpoint := fmt.Sprintf("%s://%s:%d/json_rpc", node.Protocol, node.Hostname, node.Port)
|
endpoint := fmt.Sprintf("%s://%s:%d/json_rpc", node.Protocol, node.Hostname, node.Port)
|
||||||
rpcParam := []byte(`{"jsonrpc": "2.0","id": "0","method": "get_info"}`)
|
rpcParam := []byte(`{"jsonrpc": "2.0","id": "0","method": "get_info"}`)
|
||||||
|
slog.Info(fmt.Sprintf("[PROBE] Fetching node info from %s", endpoint))
|
||||||
|
slog.Debug(fmt.Sprintf("[PROBE] RPC param: %s", string(rpcParam)))
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(rpcParam))
|
req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(rpcParam))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -238,8 +242,7 @@ func (p *proberClient) fetchNode(node repo.MoneroNode) (repo.MoneroNode, error)
|
||||||
tookTime := time.Since(startTime).Seconds()
|
tookTime := time.Since(startTime).Seconds()
|
||||||
node.EstimateFee = feeEstimate.Result.Fee
|
node.EstimateFee = feeEstimate.Result.Fee
|
||||||
|
|
||||||
fmt.Printf("Took %f seconds\n", tookTime)
|
slog.Info(fmt.Sprintf("[PROBE] Took %f seconds", tookTime))
|
||||||
|
|
||||||
if err := p.reportResult(node, tookTime); err != nil {
|
if err := p.reportResult(node, tookTime); err != nil {
|
||||||
return node, err
|
return node, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,8 +9,6 @@ import (
|
||||||
|
|
||||||
const AppVer = "0.0.1"
|
const AppVer = "0.0.1"
|
||||||
|
|
||||||
var LogLevel string
|
|
||||||
|
|
||||||
var rootCmd = &cobra.Command{
|
var rootCmd = &cobra.Command{
|
||||||
Use: "xmr-nodes",
|
Use: "xmr-nodes",
|
||||||
Short: "XMR Nodes",
|
Short: "XMR Nodes",
|
||||||
|
@ -26,5 +24,4 @@ func Execute() {
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
config.LoadAll(".env")
|
config.LoadAll(".env")
|
||||||
LogLevel = config.AppCfg().LogLevel
|
|
||||||
}
|
}
|
||||||
|
|
12
cmd/serve.go
12
cmd/serve.go
|
@ -2,6 +2,7 @@ package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
@ -49,12 +50,13 @@ func serve() {
|
||||||
app := fiber.New(fiberConfig())
|
app := fiber.New(fiberConfig())
|
||||||
|
|
||||||
// recover
|
// recover
|
||||||
app.Use(recover.New(recover.Config{EnableStackTrace: appCfg.Debug}))
|
app.Use(recover.New(recover.Config{EnableStackTrace: true}))
|
||||||
|
|
||||||
// logger middleware
|
// logger middleware
|
||||||
if appCfg.Debug {
|
if appCfg.LogLevel == "DEBUG" {
|
||||||
app.Use(logger.New(logger.Config{
|
app.Use(logger.New(logger.Config{
|
||||||
Format: "[${time}] ${status} - ${latency} ${method} ${path} ${queryParams} ${ip} ${ua}\n",
|
Format: "${time} DEBUG [HTTP] ${status} - ${latency} ${method} ${path} ${queryParams} ${ip} ${ua}\n",
|
||||||
|
TimeFormat: "2006/01/02 15:04:05",
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,14 +90,14 @@ func serve() {
|
||||||
go func() {
|
go func() {
|
||||||
// capture sigterm and other system call here
|
// capture sigterm and other system call here
|
||||||
<-sigCh
|
<-sigCh
|
||||||
fmt.Println("Shutting down HTTP server...")
|
slog.Info("Shutting down HTTP server...")
|
||||||
_ = app.Shutdown()
|
_ = app.Shutdown()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// start http server
|
// start http server
|
||||||
serverAddr := fmt.Sprintf("%s:%d", appCfg.Host, appCfg.Port)
|
serverAddr := fmt.Sprintf("%s:%d", appCfg.Host, appCfg.Port)
|
||||||
if err := app.Listen(serverAddr); err != nil {
|
if err := app.Listen(serverAddr); err != nil {
|
||||||
fmt.Printf("Server is not running! error: %v", err)
|
slog.Error(fmt.Sprintf("Server is not running! error: %v", err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,20 +1,21 @@
|
||||||
package config
|
package config
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"log/slog"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
type App struct {
|
type App struct {
|
||||||
|
// general config
|
||||||
|
LogLevel string
|
||||||
// configuration for server
|
// configuration for server
|
||||||
Debug bool
|
|
||||||
Prefork bool
|
Prefork bool
|
||||||
Host string
|
Host string
|
||||||
Port int
|
Port int
|
||||||
ProxyHeader string
|
ProxyHeader string
|
||||||
AllowOrigin string
|
AllowOrigin string
|
||||||
SecretKey string
|
SecretKey string
|
||||||
LogLevel string
|
|
||||||
// configuration for prober (client)
|
// configuration for prober (client)
|
||||||
ServerEndpoint string
|
ServerEndpoint string
|
||||||
ApiKey string
|
ApiKey string
|
||||||
|
@ -30,21 +31,27 @@ func AppCfg() *App {
|
||||||
|
|
||||||
// loads App configuration
|
// loads App configuration
|
||||||
func LoadApp() {
|
func LoadApp() {
|
||||||
|
// general config
|
||||||
|
app.LogLevel = os.Getenv("LOG_LEVEL")
|
||||||
|
switch app.LogLevel {
|
||||||
|
case "DEBUG":
|
||||||
|
slog.SetLogLoggerLevel(slog.LevelDebug)
|
||||||
|
case "ERROR":
|
||||||
|
slog.SetLogLoggerLevel(slog.LevelError)
|
||||||
|
case "WARN":
|
||||||
|
slog.SetLogLoggerLevel(slog.LevelWarn)
|
||||||
|
default:
|
||||||
|
slog.SetLogLoggerLevel(slog.LevelInfo)
|
||||||
|
}
|
||||||
|
|
||||||
// server configuration
|
// server configuration
|
||||||
app.Host = os.Getenv("APP_HOST")
|
app.Host = os.Getenv("APP_HOST")
|
||||||
app.Port, _ = strconv.Atoi(os.Getenv("APP_PORT"))
|
app.Port, _ = strconv.Atoi(os.Getenv("APP_PORT"))
|
||||||
app.Debug, _ = strconv.ParseBool(os.Getenv("APP_DEBUG"))
|
|
||||||
app.Prefork, _ = strconv.ParseBool(os.Getenv("APP_PREFORK"))
|
app.Prefork, _ = strconv.ParseBool(os.Getenv("APP_PREFORK"))
|
||||||
app.ProxyHeader = os.Getenv("APP_PROXY_HEADER")
|
app.ProxyHeader = os.Getenv("APP_PROXY_HEADER")
|
||||||
app.AllowOrigin = os.Getenv("APP_ALLOW_ORIGIN")
|
app.AllowOrigin = os.Getenv("APP_ALLOW_ORIGIN")
|
||||||
app.SecretKey = os.Getenv("SECRET_KEY")
|
app.SecretKey = os.Getenv("SECRET_KEY")
|
||||||
app.LogLevel = os.Getenv("LOG_LEVEL")
|
|
||||||
if app.LogLevel == "" {
|
|
||||||
app.LogLevel = "INFO"
|
|
||||||
}
|
|
||||||
if app.Debug {
|
|
||||||
app.LogLevel = "DEBUG"
|
|
||||||
}
|
|
||||||
// prober configuration
|
// prober configuration
|
||||||
app.ServerEndpoint = os.Getenv("SERVER_ENDPOINT")
|
app.ServerEndpoint = os.Getenv("SERVER_ENDPOINT")
|
||||||
app.ApiKey = os.Getenv("API_KEY")
|
app.ApiKey = os.Getenv("API_KEY")
|
||||||
|
|
|
@ -2,6 +2,7 @@ package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
)
|
)
|
||||||
|
|
||||||
type migrateFn func(*DB) error
|
type migrateFn func(*DB) error
|
||||||
|
@ -11,11 +12,11 @@ var dbMigrate = [...]migrateFn{v1}
|
||||||
func MigrateDb(db *DB) error {
|
func MigrateDb(db *DB) error {
|
||||||
version := getSchemaVersion(db)
|
version := getSchemaVersion(db)
|
||||||
if version < 0 {
|
if version < 0 {
|
||||||
return fmt.Errorf("can't get database schema version")
|
return fmt.Errorf("[DB] can't get database schema version")
|
||||||
} else if version == 0 {
|
} else if version == 0 {
|
||||||
fmt.Println("No database schema version found, creating schema version 1")
|
slog.Warn("[DB] No database schema version found, creating schema version 1")
|
||||||
} else {
|
} else {
|
||||||
fmt.Println("Database schema version:", version)
|
slog.Info(fmt.Sprintf("[DB] Current database schema version: %d", version))
|
||||||
}
|
}
|
||||||
|
|
||||||
for ; version < len(dbMigrate); version++ {
|
for ; version < len(dbMigrate); version++ {
|
||||||
|
@ -25,7 +26,7 @@ func MigrateDb(db *DB) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
migrateFn := dbMigrate[version]
|
migrateFn := dbMigrate[version]
|
||||||
fmt.Println("Migrating database schema version:", version+1)
|
slog.Info(fmt.Sprintf("[DB] Migrating database schema version %d", version+1))
|
||||||
|
|
||||||
if err := migrateFn(db); err != nil {
|
if err := migrateFn(db); err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
|
@ -64,7 +65,9 @@ func setSchemaVersion(db *DB, version int) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func v1(db *DB) error {
|
func v1(db *DB) error {
|
||||||
|
slog.Debug("[DB] Migrating database schema version 1")
|
||||||
// table: tbl_admin
|
// table: tbl_admin
|
||||||
|
slog.Debug("[DB] Creating table: tbl_admin")
|
||||||
_, err := db.Exec(`CREATE TABLE tbl_admin (
|
_, err := db.Exec(`CREATE TABLE tbl_admin (
|
||||||
id INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
|
id INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
|
||||||
username VARCHAR(255) NOT NULL,
|
username VARCHAR(255) NOT NULL,
|
||||||
|
@ -76,12 +79,14 @@ func v1(db *DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
slog.Debug("[DB] Adding unique key to table: tbl_admin")
|
||||||
_, err = db.Exec(`ALTER TABLE tbl_admin ADD UNIQUE KEY (username)`)
|
_, err = db.Exec(`ALTER TABLE tbl_admin ADD UNIQUE KEY (username)`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// table: tbl_cron
|
// table: tbl_cron
|
||||||
|
slog.Debug("[DB] Creating table: tbl_cron")
|
||||||
_, err = db.Exec(`CREATE TABLE tbl_cron (
|
_, err = db.Exec(`CREATE TABLE tbl_cron (
|
||||||
id INT(8) UNSIGNED NOT NULL AUTO_INCREMENT,
|
id INT(8) UNSIGNED NOT NULL AUTO_INCREMENT,
|
||||||
title VARCHAR(255) NOT NULL DEFAULT '',
|
title VARCHAR(255) NOT NULL DEFAULT '',
|
||||||
|
@ -98,6 +103,7 @@ func v1(db *DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
slog.Debug("[DB] Adding default cron jobs to table: tbl_cron")
|
||||||
_, err = db.Exec(`INSERT INTO tbl_cron (title, slug, description, run_every)
|
_, err = db.Exec(`INSERT INTO tbl_cron (title, slug, description, run_every)
|
||||||
VALUES ('Delete old probe logs', 'delete_old_probe_logs', 'Delete old probe log from the database',120);`)
|
VALUES ('Delete old probe logs', 'delete_old_probe_logs', 'Delete old probe log from the database',120);`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -105,6 +111,7 @@ func v1(db *DB) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// table: tbl_node
|
// table: tbl_node
|
||||||
|
slog.Debug("[DB] Creating table: tbl_node")
|
||||||
_, err = db.Exec(`CREATE TABLE tbl_node (
|
_, err = db.Exec(`CREATE TABLE tbl_node (
|
||||||
id INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
|
id INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
|
||||||
protocol VARCHAR(6) NOT NULL DEFAULT 'http' COMMENT 'http | https',
|
protocol VARCHAR(6) NOT NULL DEFAULT 'http' COMMENT 'http | https',
|
||||||
|
@ -139,6 +146,7 @@ func v1(db *DB) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// table: tbl_prober
|
// table: tbl_prober
|
||||||
|
slog.Debug("[DB] Creating table: tbl_prober")
|
||||||
_, err = db.Exec(`CREATE TABLE tbl_prober (
|
_, err = db.Exec(`CREATE TABLE tbl_prober (
|
||||||
id INT(9) UNSIGNED NOT NULL AUTO_INCREMENT,
|
id INT(9) UNSIGNED NOT NULL AUTO_INCREMENT,
|
||||||
name VARCHAR(255) NOT NULL,
|
name VARCHAR(255) NOT NULL,
|
||||||
|
@ -149,12 +157,15 @@ func v1(db *DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
slog.Debug("[DB] Adding unique key to table: tbl_prober")
|
||||||
_, err = db.Exec(`ALTER TABLE tbl_prober ADD UNIQUE KEY (api_key)`)
|
_, err = db.Exec(`ALTER TABLE tbl_prober ADD UNIQUE KEY (api_key)`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// table: tbl_probe_log
|
// table: tbl_probe_log
|
||||||
|
slog.Debug("[DB] Creating table: tbl_probe_log")
|
||||||
_, err = db.Exec(`CREATE TABLE tbl_probe_log (
|
_, err = db.Exec(`CREATE TABLE tbl_probe_log (
|
||||||
id BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
|
id BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,
|
||||||
node_id INT(11) UNSIGNED NOT NULL DEFAULT 0,
|
node_id INT(11) UNSIGNED NOT NULL DEFAULT 0,
|
||||||
|
@ -173,6 +184,7 @@ func v1(db *DB) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
slog.Debug("[DB] Adding key to table: tbl_probe_log")
|
||||||
_, err = db.Exec(`ALTER TABLE tbl_probe_log ADD KEY (node_id)`)
|
_, err = db.Exec(`ALTER TABLE tbl_probe_log ADD KEY (node_id)`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -2,6 +2,7 @@ package repo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
"math"
|
"math"
|
||||||
"slices"
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -40,10 +41,10 @@ func NewCron(db *database.DB) CronRepository {
|
||||||
func (repo *CronRepo) RunCronProcess() {
|
func (repo *CronRepo) RunCronProcess() {
|
||||||
for {
|
for {
|
||||||
time.Sleep(60 * time.Second)
|
time.Sleep(60 * time.Second)
|
||||||
fmt.Println("Running cron...")
|
slog.Info("[CRON] Running cron cycle...")
|
||||||
list, err := repo.queueList()
|
list, err := repo.queueList()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("Error parsing to struct:", err)
|
slog.Warn(fmt.Sprintf("[CRON] Error parsing queue list to struct: %s", err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, task := range list {
|
for _, task := range list {
|
||||||
|
@ -51,7 +52,7 @@ func (repo *CronRepo) RunCronProcess() {
|
||||||
currentTs := startTime.Unix()
|
currentTs := startTime.Unix()
|
||||||
delayedTask := currentTs - task.NextRun
|
delayedTask := currentTs - task.NextRun
|
||||||
if task.CronState == 1 && delayedTask <= int64(rerunTimeout) {
|
if task.CronState == 1 && delayedTask <= int64(rerunTimeout) {
|
||||||
fmt.Println("SKIP STATE 1:", task.Slug)
|
slog.Debug(fmt.Sprintf("[CRON] Skipping task %s because it is already running", task.Slug))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,12 +61,12 @@ func (repo *CronRepo) RunCronProcess() {
|
||||||
repo.execCron(task.Slug)
|
repo.execCron(task.Slug)
|
||||||
|
|
||||||
runTime := math.Ceil(time.Since(startTime).Seconds()*1000) / 1000
|
runTime := math.Ceil(time.Since(startTime).Seconds()*1000) / 1000
|
||||||
fmt.Println("Runtime:", runTime)
|
slog.Info(fmt.Sprintf("[CRON] Task %s done in %f seconds", task.Slug, runTime))
|
||||||
nextRun := currentTs + int64(task.RunEvery)
|
nextRun := currentTs + int64(task.RunEvery)
|
||||||
|
|
||||||
repo.postRunTask(task.Id, nextRun, runTime)
|
repo.postRunTask(task.Id, nextRun, runTime)
|
||||||
}
|
}
|
||||||
fmt.Println("Cron done!")
|
slog.Info("[CRON] Cron cycle done!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,7 +152,7 @@ func (repo *CronRepo) preRunTask(id int, lastRunTs int64) {
|
||||||
query := `UPDATE tbl_cron SET cron_state = ?, last_run = ? WHERE id = ?`
|
query := `UPDATE tbl_cron SET cron_state = ?, last_run = ? WHERE id = ?`
|
||||||
row, err := repo.db.Query(query, 1, lastRunTs, id)
|
row, err := repo.db.Query(query, 1, lastRunTs, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("ERROR PRERUN:", err)
|
slog.Error(fmt.Sprintf("[CRON] Failed to update pre cron state: %s", err))
|
||||||
}
|
}
|
||||||
defer row.Close()
|
defer row.Close()
|
||||||
}
|
}
|
||||||
|
@ -160,7 +161,7 @@ func (repo *CronRepo) postRunTask(id int, nextRun int64, runtime float64) {
|
||||||
query := `UPDATE tbl_cron SET cron_state = ?, next_run = ?, run_time = ? WHERE id = ?`
|
query := `UPDATE tbl_cron SET cron_state = ?, next_run = ?, run_time = ? WHERE id = ?`
|
||||||
row, err := repo.db.Query(query, 0, nextRun, runtime, id)
|
row, err := repo.db.Query(query, 0, nextRun, runtime, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("ERROR PRERUN:", err)
|
slog.Error(fmt.Sprintf("[CRON] Failed to update post cron state: %s", err))
|
||||||
}
|
}
|
||||||
defer row.Close()
|
defer row.Close()
|
||||||
}
|
}
|
||||||
|
@ -168,7 +169,7 @@ func (repo *CronRepo) postRunTask(id int, nextRun int64, runtime float64) {
|
||||||
func (repo *CronRepo) execCron(slug string) {
|
func (repo *CronRepo) execCron(slug string) {
|
||||||
switch slug {
|
switch slug {
|
||||||
case "delete_old_probe_logs":
|
case "delete_old_probe_logs":
|
||||||
fmt.Println("Running task", slug)
|
slog.Info(fmt.Sprintf("[CRON] Start running task: %s", slug))
|
||||||
repo.deleteOldProbeLogs()
|
repo.deleteOldProbeLogs()
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -180,6 +181,6 @@ func (repo *CronRepo) deleteOldProbeLogs() {
|
||||||
query := `DELETE FROM tbl_probe_log WHERE date_checked < ?`
|
query := `DELETE FROM tbl_probe_log WHERE date_checked < ?`
|
||||||
_, err := repo.db.Exec(query, startTs)
|
_, err := repo.db.Exec(query, startTs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err)
|
slog.Error(fmt.Sprintf("[CRON] Failed to delete old probe logs: %s", err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue