mirror of
https://github.com/ditatompel/xmr-remote-nodes.git
synced 2025-01-08 05:52:10 +07:00
ditatompel
5496692c5d
I hope it will be less discoverable by other users and less likely to be used unintentionally in other projects.
185 lines
4.9 KiB
Go
185 lines
4.9 KiB
Go
package repo
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"slices"
|
|
"strings"
|
|
"time"
|
|
"xmr-remote-nodes/internal/database"
|
|
)
|
|
|
|
type CronRepository interface {
|
|
RunCronProcess()
|
|
Crons(q CronQueryParams) (CronTasks, error)
|
|
}
|
|
|
|
type CronRepo struct {
|
|
db *database.DB
|
|
}
|
|
|
|
// Cron is one row of the tbl_cron table, i.e. a single scheduled task.
type Cron struct {
	// Id identifies the row; the pre/post run UPDATE statements address it.
	Id int `json:"id" db:"id"`
	// Title can be filtered with a LIKE match when listing tasks.
	Title string `json:"title" db:"title"`
	// Slug selects which task implementation execCron runs.
	Slug string `json:"slug" db:"slug"`
	// Description can be filtered with a LIKE match when listing tasks.
	Description string `json:"description" db:"description"`
	// RunEvery is added to the Unix start time (seconds) to get NextRun.
	RunEvery int `json:"run_every" db:"run_every"`
	// LastRun is the Unix timestamp recorded right before the task starts.
	LastRun int64 `json:"last_run" db:"last_run"`
	// NextRun is the Unix timestamp from which the task is due.
	NextRun int64 `json:"next_run" db:"next_run"`
	// RunTime is the duration of the last run in seconds, rounded up to
	// millisecond precision.
	RunTime float64 `json:"run_time" db:"run_time"`
	// CronState is set to 1 before a task runs and back to 0 afterwards.
	CronState int `json:"cron_state" db:"cron_state"`
	// IsEnabled must be 1 for the task to be picked up by the scheduler.
	IsEnabled int `json:"is_enabled" db:"is_enabled"`
}
|
|
|
|
// rerunTimeout is the grace period, in seconds past next_run, during which a
// task whose cron_state is still 1 is skipped instead of being started again.
var rerunTimeout = 300
|
|
|
|
func NewCron(db *database.DB) CronRepository {
|
|
return &CronRepo{db}
|
|
}
|
|
|
|
func (repo *CronRepo) RunCronProcess() {
|
|
for {
|
|
time.Sleep(60 * time.Second)
|
|
fmt.Println("Running cron...")
|
|
list, err := repo.queueList()
|
|
if err != nil {
|
|
fmt.Println("Error parsing to struct:", err)
|
|
continue
|
|
}
|
|
for _, task := range list {
|
|
startTime := time.Now()
|
|
currentTs := startTime.Unix()
|
|
delayedTask := currentTs - task.NextRun
|
|
if task.CronState == 1 && delayedTask <= int64(rerunTimeout) {
|
|
fmt.Println("SKIP STATE 1:", task.Slug)
|
|
continue
|
|
}
|
|
|
|
repo.preRunTask(task.Id, currentTs)
|
|
|
|
repo.execCron(task.Slug)
|
|
|
|
runTime := math.Ceil(time.Since(startTime).Seconds()*1000) / 1000
|
|
fmt.Println("Runtime:", runTime)
|
|
nextRun := currentTs + int64(task.RunEvery)
|
|
|
|
repo.postRunTask(task.Id, nextRun, runTime)
|
|
}
|
|
fmt.Println("Cron done!")
|
|
}
|
|
}
|
|
|
|
// CronQueryParams carries the filter, paging and sorting options for Crons.
type CronQueryParams struct {
	// Substring filters applied with LIKE; an empty string disables the filter.
	Title, Description string
	// Exact-match filters; -1 disables the filter.
	IsEnabled, CronState int
	// Paging: RowsPerPage is used as LIMIT and Page (1-based) selects the
	// offset.
	RowsPerPage, Page int
	// SortBy is honoured for id, run_every, last_run, next_run and run_time
	// (anything else sorts by id). SortDirection "asc" sorts ascending, any
	// other value sorts descending.
	SortBy, SortDirection string
}
|
|
|
|
type CronTasks struct {
|
|
TotalRows int `json:"total_rows"`
|
|
RowsPerPage int `json:"rows_per_page"`
|
|
Items []*Cron `json:"items"`
|
|
}
|
|
|
|
func (repo *CronRepo) Crons(q CronQueryParams) (CronTasks, error) {
|
|
queryParams := []interface{}{}
|
|
whereQueries := []string{}
|
|
where := ""
|
|
|
|
if q.Title != "" {
|
|
whereQueries = append(whereQueries, "title LIKE ?")
|
|
queryParams = append(queryParams, "%"+q.Title+"%")
|
|
}
|
|
if q.Description != "" {
|
|
whereQueries = append(whereQueries, "description LIKE ?")
|
|
queryParams = append(queryParams, "%"+q.Description+"%")
|
|
}
|
|
if q.IsEnabled != -1 {
|
|
whereQueries = append(whereQueries, "is_enabled = ?")
|
|
queryParams = append(queryParams, q.IsEnabled)
|
|
}
|
|
if q.CronState != -1 {
|
|
whereQueries = append(whereQueries, "cron_state = ?")
|
|
queryParams = append(queryParams, q.CronState)
|
|
}
|
|
if len(whereQueries) > 0 {
|
|
where = "WHERE " + strings.Join(whereQueries, " AND ")
|
|
}
|
|
tasks := CronTasks{}
|
|
|
|
queryTotalRows := fmt.Sprintf("SELECT COUNT(id) FROM tbl_cron %s", where)
|
|
err := repo.db.QueryRow(queryTotalRows, queryParams...).Scan(&tasks.TotalRows)
|
|
if err != nil {
|
|
return tasks, err
|
|
}
|
|
queryParams = append(queryParams, q.RowsPerPage, (q.Page-1)*q.RowsPerPage)
|
|
allowedSort := []string{"id", "run_every", "last_run", "next_run", "run_time"}
|
|
sortBy := "id"
|
|
if slices.Contains(allowedSort, q.SortBy) {
|
|
sortBy = q.SortBy
|
|
}
|
|
sortDirection := "DESC"
|
|
if q.SortDirection == "asc" {
|
|
sortDirection = "ASC"
|
|
}
|
|
|
|
query := fmt.Sprintf("SELECT id, title, slug, description, run_every, last_run, next_run, run_time, cron_state, is_enabled FROM tbl_cron %s ORDER BY %s %s LIMIT ? OFFSET ?", where, sortBy, sortDirection)
|
|
err = repo.db.Select(&tasks.Items, query, queryParams...)
|
|
if err != nil {
|
|
return tasks, err
|
|
}
|
|
tasks.RowsPerPage = q.RowsPerPage
|
|
|
|
return tasks, nil
|
|
}
|
|
|
|
func (repo *CronRepo) queueList() ([]Cron, error) {
|
|
tasks := []Cron{}
|
|
query := `SELECT id, run_every, last_run, slug, next_run, cron_state FROM tbl_cron
|
|
WHERE is_enabled = ? AND next_run <= ?`
|
|
err := repo.db.Select(&tasks, query, 1, time.Now().Unix())
|
|
|
|
return tasks, err
|
|
}
|
|
|
|
func (repo *CronRepo) preRunTask(id int, lastRunTs int64) {
|
|
query := `UPDATE tbl_cron SET cron_state = ?, last_run = ? WHERE id = ?`
|
|
row, err := repo.db.Query(query, 1, lastRunTs, id)
|
|
if err != nil {
|
|
fmt.Println("ERROR PRERUN:", err)
|
|
}
|
|
defer row.Close()
|
|
}
|
|
|
|
func (repo *CronRepo) postRunTask(id int, nextRun int64, runtime float64) {
|
|
query := `UPDATE tbl_cron SET cron_state = ?, next_run = ?, run_time = ? WHERE id = ?`
|
|
row, err := repo.db.Query(query, 0, nextRun, runtime, id)
|
|
if err != nil {
|
|
fmt.Println("ERROR PRERUN:", err)
|
|
}
|
|
defer row.Close()
|
|
}
|
|
|
|
func (repo *CronRepo) execCron(slug string) {
|
|
switch slug {
|
|
case "delete_old_probe_logs":
|
|
fmt.Println("Running task", slug)
|
|
repo.deleteOldProbeLogs()
|
|
break
|
|
}
|
|
}
|
|
|
|
func (repo *CronRepo) deleteOldProbeLogs() {
|
|
// for now, we only delete stats older than 1 month +2 days
|
|
startTs := time.Now().AddDate(0, -1, -2).Unix()
|
|
query := `DELETE FROM tbl_probe_log WHERE date_checked < ?`
|
|
_, err := repo.db.Exec(query, startTs)
|
|
if err != nil {
|
|
fmt.Println(err)
|
|
}
|
|
}
|