package jobs import ( "context" "fmt" "github.com/rs/zerolog/log" "gogs.mikescher.com/BlackForestBytes/goext/exerr" "gogs.mikescher.com/BlackForestBytes/goext/langext" "gogs.mikescher.com/BlackForestBytes/goext/mathext" "gogs.mikescher.com/BlackForestBytes/goext/rfctime" "gogs.mikescher.com/BlackForestBytes/goext/syncext" bunny "locbunny" "locbunny/logic" "locbunny/models" "math/rand" "time" ) type JobFunction[TData any] func(ctx context.Context, app *logic.Application, lstr *JobListener, data *TData) (int, error) type JobRunner[TData any] struct { name string app *logic.Application isRunning *syncext.AtomicBool isStarted bool interval time.Duration sigChannel chan string runTimeout time.Duration jobFunc JobFunction[TData] data *TData } func NewJobRunner[TData any](app *logic.Application, name string, interval time.Duration, timeout time.Duration, fn JobFunction[TData], data *TData) *JobRunner[TData] { return &JobRunner[TData]{ app: app, isRunning: syncext.NewAtomicBool(false), isStarted: false, sigChannel: make(chan string), interval: interval, runTimeout: timeout, name: name, jobFunc: fn, data: data, } } func (j *JobRunner[TData]) Start() error { if j.isRunning.Get() { return exerr.New(bunny.ErrJob, "job already running").Build() } if j.isStarted { return exerr.New(bunny.ErrJob, "job was already started").Build() // re-start after stop is not allowed } j.isStarted = true go j.mainLoop() return nil } func (j *JobRunner[TData]) Stop() { log.Info().Msg(fmt.Sprintf("Stopping Job [%s]", j.name)) syncext.WriteNonBlocking(j.sigChannel, "stop") j.isRunning.Wait(false) log.Info().Msg(fmt.Sprintf("Stopped Job [%s]", j.name)) } func (j *JobRunner[TData]) Running() bool { return j.isRunning.Get() } func (j *JobRunner[TData]) mainLoop() { j.isRunning.Set(true) firstRun := true for { interval := j.interval if firstRun { // randomize first interval to spread jobs around perc := mathext.Clamp(rand.Float64(), 0.1, 0.5) interval = time.Duration(int64(float64(interval) * perc)) } firstRun = false signal, okay := syncext.ReadChannelWithTimeout(j.sigChannel, interval) if okay { if signal == "stop" { log.Info().Msg(fmt.Sprintf("Job [%s] received signal", j.name)) break } else if signal == "run" { log.Info().Msg(fmt.Sprintf("Job [%s] received signal", j.name)) continue } else { log.Error().Msg(fmt.Sprintf("Received unknown job signal: <%s> in job [%s]", signal, j.name)) } } log.Debug().Msg(fmt.Sprintf("Run job [%s]", j.name)) err := j.execute() if err != nil { log.Err(err).Msg(fmt.Sprintf("Failed to execute job [%s]: %s", j.name, err.Error())) } } log.Info().Msg(fmt.Sprintf("Job [%s] exiting main-loop", j.name)) j.isRunning.Set(false) } func (j *JobRunner[TData]) execute() (err error) { defer func() { if rec := recover(); rec != nil { err = exerr.New(bunny.ErrJob, "Recovered panic in JobRunner::execute").Any("recover", rec).Build() } }() runCtx, cancelRunCtx := context.WithTimeout(context.Background(), j.runTimeout) defer cancelRunCtx() jobExec := models.JobExecution{ JobExecutionID: models.NewJobExecutionID(), JobName: j.name, StartTime: rfctime.NowRFC3339Nano(), EndTime: nil, Changes: 0, Status: models.JobStatusRunning, } lstr := NewJobListener(j.app, jobExec.JobExecutionID, j.name) lstr.LogInfo("JOB_START", "Job started", nil) changes, err := langext.RunPanicSafeR2(func() (int, error) { return j.jobFunc(runCtx, j.app, lstr, j.data) }) //goland:noinspection GoTypeAssertionOnErrors if panicerr, ok := err.(langext.PanicWrappedErr); ok { jobExec.EndTime = langext.Ptr(rfctime.NowRFC3339Nano()) jobExec.Error = langext.Ptr(panicerr.Error()) jobExec.Status = models.JobStatusFailed jobExec.Changes = changes lstr.LogFatal("JOB_PANIC", "Job finished with a panic", langext.H{"msg": panicerr.Error(), "obj": panicerr.ReoveredObj()}) log.Error().Str("panic", panicerr.Error()).Msg(fmt.Sprintf("Job '%s' <%s> finished with panic and %d changes after %f minutes", j.name, jobExec.JobExecutionID, changes, jobExec.Delta().Minutes())) } else if err != nil { jobExec.EndTime = langext.Ptr(rfctime.NowRFC3339Nano()) jobExec.Error = langext.Ptr(err.Error()) jobExec.Status = models.JobStatusFailed jobExec.Changes = changes lstr.LogFatal("JOB_ERR", "Job finished with an error", langext.H{"msg": err.Error(), "err_obj": exerr.FromError(err).ToAPIJson(false, true, true)}) log.Error().Str("err", err.Error()).Msg(fmt.Sprintf("Job '%s' <%s> finished with an error and %d changes after %f minutes", j.name, jobExec.JobExecutionID, changes, jobExec.Delta().Minutes())) } else { jobExec.EndTime = langext.Ptr(rfctime.NowRFC3339Nano()) jobExec.Error = nil jobExec.Status = models.JobStatusSuccess jobExec.Changes = changes lstr.LogInfo("JOB_FINISH", "Job finished", nil) log.Info().Msg(fmt.Sprintf("Job '%s' <%s> finished successfully with %d changes after %f minutes", j.name, jobExec.JobExecutionID, changes, jobExec.Delta().Minutes())) } return nil }