From 8c1f6b0c435671943efbc677903c47be7ea5bad5 Mon Sep 17 00:00:00 2001 From: ditatompel Date: Thu, 23 May 2024 02:24:06 +0700 Subject: [PATCH] Make cron as it's own package --- cmd/server/cron.go | 9 +++---- cmd/server/serve.go | 4 +-- internal/{repo => cron}/cron.go | 43 ++++++++++++++++----------------- 3 files changed, 27 insertions(+), 29 deletions(-) rename internal/{repo => cron}/cron.go (74%) diff --git a/cmd/server/cron.go b/cmd/server/cron.go index f1dde84..51c4797 100644 --- a/cmd/server/cron.go +++ b/cmd/server/cron.go @@ -5,8 +5,8 @@ import ( "os" "text/tabwriter" "time" + "xmr-remote-nodes/internal/cron" "xmr-remote-nodes/internal/database" - "xmr-remote-nodes/internal/repo" "github.com/spf13/cobra" ) @@ -19,8 +19,7 @@ var cronCmd = &cobra.Command{ if err := database.ConnectDB(); err != nil { panic(err) } - cronRepo := repo.NewCron(database.GetDB()) - crons, err := cronRepo.Crons() + crons, err := cron.New().Crons() if err != nil { fmt.Println(err) return @@ -32,8 +31,8 @@ var cronCmd = &cobra.Command{ w := tabwriter.NewWriter(os.Stdout, 1, 1, 1, ' ', 0) fmt.Fprintf(w, "ID\t| Name\t| Run Every\t| Last Run\t| Took Time\n") for _, cron := range crons { - fmt.Fprintf(w, "%d\t| %s\t| %d\t| %s\t| %f\n", - cron.Id, + fmt.Fprintf(w, "%d\t| %s\t| %ds\t| %s\t| %f\n", + cron.ID, cron.Title, cron.RunEvery, time.Unix(cron.LastRun, 0).Format(time.RFC3339), diff --git a/cmd/server/serve.go b/cmd/server/serve.go index ec1ecc5..32c5101 100644 --- a/cmd/server/serve.go +++ b/cmd/server/serve.go @@ -9,8 +9,8 @@ import ( "xmr-remote-nodes/frontend" "xmr-remote-nodes/handler" "xmr-remote-nodes/internal/config" + "xmr-remote-nodes/internal/cron" "xmr-remote-nodes/internal/database" - "xmr-remote-nodes/internal/repo" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/cors" @@ -73,7 +73,7 @@ func serve() { // start a cleanup cron-job if !fiber.IsChild() { - cronRepo := repo.NewCron(database.GetDB()) + cronRepo := cron.New() go cronRepo.RunCronProcess() } diff --git a/internal/repo/cron.go b/internal/cron/cron.go similarity index 74% rename from internal/repo/cron.go rename to internal/cron/cron.go index 5368488..218251f 100644 --- a/internal/repo/cron.go +++ b/internal/cron/cron.go @@ -1,4 +1,4 @@ -package repo +package cron import ( "fmt" @@ -18,7 +18,7 @@ type CronRepo struct { } type Cron struct { - Id int `json:"id" db:"id"` + ID int `json:"id" db:"id"` Title string `json:"title" db:"title"` Slug string `json:"slug" db:"slug"` Description string `json:"description" db:"description"` @@ -32,15 +32,15 @@ type Cron struct { var rerunTimeout = 300 -func NewCron(db *database.DB) CronRepository { - return &CronRepo{db} +func New() CronRepository { + return &CronRepo{db: database.GetDB()} } -func (repo *CronRepo) RunCronProcess() { +func (r *CronRepo) RunCronProcess() { for { time.Sleep(60 * time.Second) slog.Info("[CRON] Running cron cycle...") - list, err := repo.queueList() + list, err := r.queueList() if err != nil { slog.Warn(fmt.Sprintf("[CRON] Error parsing queue list to struct: %s", err)) continue @@ -54,23 +54,23 @@ func (repo *CronRepo) RunCronProcess() { continue } - repo.preRunTask(task.Id, currentTs) + r.preRunTask(task.ID, currentTs) - repo.execCron(task.Slug) + r.execCron(task.Slug) runTime := math.Ceil(time.Since(startTime).Seconds()*1000) / 1000 slog.Info(fmt.Sprintf("[CRON] Task %s done in %f seconds", task.Slug, runTime)) nextRun := currentTs + int64(task.RunEvery) - repo.postRunTask(task.Id, nextRun, runTime) + r.postRunTask(task.ID, nextRun, runTime) } slog.Info("[CRON] Cron cycle done!") } } -func (repo *CronRepo) Crons() ([]Cron, error) { +func (r *CronRepo) Crons() ([]Cron, error) { var tasks []Cron - err := repo.db.Select(&tasks, ` + err := r.db.Select(&tasks, ` SELECT id, title, @@ -87,7 +87,7 @@ func (repo *CronRepo) Crons() ([]Cron, error) { return tasks, err } -func (repo *CronRepo) queueList() ([]Cron, error) { +func (r *CronRepo) queueList() ([]Cron, error) { tasks := []Cron{} query := ` SELECT @@ -102,12 +102,12 @@ func (repo *CronRepo) queueList() ([]Cron, error) { WHERE is_enabled = ? AND next_run <= ?` - err := repo.db.Select(&tasks, query, 1, time.Now().Unix()) + err := r.db.Select(&tasks, query, 1, time.Now().Unix()) return tasks, err } -func (repo *CronRepo) preRunTask(id int, lastRunTs int64) { +func (r *CronRepo) preRunTask(id int, lastRunTs int64) { query := ` UPDATE tbl_cron SET @@ -115,14 +115,14 @@ func (repo *CronRepo) preRunTask(id int, lastRunTs int64) { last_run = ? WHERE id = ?` - row, err := repo.db.Query(query, 1, lastRunTs, id) + row, err := r.db.Query(query, 1, lastRunTs, id) if err != nil { slog.Error(fmt.Sprintf("[CRON] Failed to update pre cron state: %s", err)) } defer row.Close() } -func (repo *CronRepo) postRunTask(id int, nextRun int64, runtime float64) { +func (r *CronRepo) postRunTask(id int, nextRun int64, runtime float64) { query := ` UPDATE tbl_cron SET @@ -131,27 +131,26 @@ func (repo *CronRepo) postRunTask(id int, nextRun int64, runtime float64) { run_time = ? WHERE id = ?` - row, err := repo.db.Query(query, 0, nextRun, runtime, id) + row, err := r.db.Query(query, 0, nextRun, runtime, id) if err != nil { slog.Error(fmt.Sprintf("[CRON] Failed to update post cron state: %s", err)) } defer row.Close() } -func (repo *CronRepo) execCron(slug string) { +func (r *CronRepo) execCron(slug string) { switch slug { case "delete_old_probe_logs": slog.Info(fmt.Sprintf("[CRON] Start running task: %s", slug)) - repo.deleteOldProbeLogs() - break + r.deleteOldProbeLogs() } } -func (repo *CronRepo) deleteOldProbeLogs() { +func (r *CronRepo) deleteOldProbeLogs() { // for now, we only delete stats older than 1 month +2 days startTs := time.Now().AddDate(0, -1, -2).Unix() query := `DELETE FROM tbl_probe_log WHERE date_checked < ?` - _, err := repo.db.Exec(query, startTs) + _, err := r.db.Exec(query, startTs) if err != nil { slog.Error(fmt.Sprintf("[CRON] Failed to delete old probe logs: %s", err)) }