From efaad3f97c701c914209825877e86b9454120a01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20Schw=C3=B6rer?= Date: Sun, 28 May 2023 12:31:14 +0200 Subject: [PATCH] Fix RequestLogCollectorJob sometimes not properly shutting down --- scnserver/jobs/DeliveryRetryJob.go | 6 ++++-- scnserver/jobs/RequestLogCleanupJob.go | 6 ++++-- scnserver/jobs/RequestLogCollectorJob.go | 22 ++++++++++++---------- scnserver/logic/application.go | 4 ++++ scnserver/test/util/webserver.go | 2 ++ 5 files changed, 26 insertions(+), 14 deletions(-) diff --git a/scnserver/jobs/DeliveryRetryJob.go b/scnserver/jobs/DeliveryRetryJob.go index 052b994..dcf8d02 100644 --- a/scnserver/jobs/DeliveryRetryJob.go +++ b/scnserver/jobs/DeliveryRetryJob.go @@ -24,7 +24,7 @@ func NewDeliveryRetryJob(app *logic.Application) *DeliveryRetryJob { name: "DeliveryRetryJob", isRunning: syncext.NewAtomicBool(false), isStarted: false, - sigChannel: make(chan string), + sigChannel: make(chan string, 1), } } @@ -45,7 +45,9 @@ func (j *DeliveryRetryJob) Start() error { func (j *DeliveryRetryJob) Stop() { log.Info().Msg(fmt.Sprintf("Stopping Job [%s]", j.name)) - syncext.WriteNonBlocking(j.sigChannel, "stop") + if !syncext.WriteNonBlocking(j.sigChannel, "stop") { + log.Error().Msg(fmt.Sprintf("Failed to send Stop-Signal to Job [%s]", j.name)) + } j.isRunning.Wait(false) log.Info().Msg(fmt.Sprintf("Stopped Job [%s]", j.name)) } diff --git a/scnserver/jobs/RequestLogCleanupJob.go b/scnserver/jobs/RequestLogCleanupJob.go index 3c47aa3..a46912e 100644 --- a/scnserver/jobs/RequestLogCleanupJob.go +++ b/scnserver/jobs/RequestLogCleanupJob.go @@ -23,7 +23,7 @@ func NewRequestLogCleanupJob(app *logic.Application) *RequestLogCleanupJob { name: "RequestLogCleanupJob", isRunning: syncext.NewAtomicBool(false), isStarted: false, - sigChannel: make(chan string), + sigChannel: make(chan string, 1), } } @@ -44,7 +44,9 @@ func (j *RequestLogCleanupJob) Start() error { func (j *RequestLogCleanupJob) Stop() { log.Info().Msg(fmt.Sprintf("Stopping Job [%s]", j.name)) - syncext.WriteNonBlocking(j.sigChannel, "stop") + if !syncext.WriteNonBlocking(j.sigChannel, "stop") { + log.Error().Msg(fmt.Sprintf("Failed to send Stop-Signal to Job [%s]", j.name)) + } j.isRunning.Wait(false) log.Info().Msg(fmt.Sprintf("Stopped Job [%s]", j.name)) } diff --git a/scnserver/jobs/RequestLogCollectorJob.go b/scnserver/jobs/RequestLogCollectorJob.go index b723967..bdd28e9 100644 --- a/scnserver/jobs/RequestLogCollectorJob.go +++ b/scnserver/jobs/RequestLogCollectorJob.go @@ -25,7 +25,7 @@ func NewRequestLogCollectorJob(app *logic.Application) *RequestLogCollectorJob { name: "RequestLogCollectorJob", isRunning: syncext.NewAtomicBool(false), isStarted: false, - sigChannel: make(chan string), + sigChannel: make(chan string, 1), } } @@ -46,7 +46,9 @@ func (j *RequestLogCollectorJob) Start() error { func (j *RequestLogCollectorJob) Stop() { log.Info().Msg(fmt.Sprintf("Stopping Job [%s]", j.name)) - syncext.WriteNonBlocking(j.sigChannel, "stop") + if !syncext.WriteNonBlocking(j.sigChannel, "stop") { + log.Error().Msg(fmt.Sprintf("Failed to send Stop-Signal to Job [%s]", j.name)) + } j.isRunning.Wait(false) log.Info().Msg(fmt.Sprintf("Stopped Job [%s]", j.name)) } @@ -61,6 +63,14 @@ func (j *RequestLogCollectorJob) mainLoop() { mainLoop: for { select { + case obj := <-j.app.RequestLogQueue: + requestid := models.NewRequestID() + err := j.insertLog(requestid, obj) + if err != nil { + log.Error().Err(err).Msg(fmt.Sprintf("Failed to insert RequestLog {%s} into DB", requestid)) + } else { + log.Debug().Msg(fmt.Sprintf("Inserted RequestLog '%s' into DB", requestid)) + } case signal := <-j.sigChannel: if signal == "stop" { log.Info().Msg(fmt.Sprintf("Job [%s] received signal", j.name)) @@ -71,14 +81,6 @@ mainLoop: } else { log.Error().Msg(fmt.Sprintf("Received unknown job signal: <%s> in job [%s]", signal, j.name)) } - case obj := <-j.app.RequestLogQueue: - requestid := models.NewRequestID() - err := j.insertLog(requestid, obj) - if err != nil { - log.Error().Err(err).Msg(fmt.Sprintf("Failed to insert RequestLog {%s} into DB", requestid)) - } else { - log.Debug().Msg(fmt.Sprintf("Inserted RequestLog '%s' into DB", requestid)) - } } } diff --git a/scnserver/logic/application.go b/scnserver/logic/application.go index 4b81405..a032ac1 100644 --- a/scnserver/logic/application.go +++ b/scnserver/logic/application.go @@ -144,6 +144,8 @@ func (app *Application) Run() { job.Stop() } + log.Info().Msg("Manually stopped Jobs") + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() err := app.Database.Stop(ctx) @@ -151,6 +153,8 @@ func (app *Application) Run() { log.Info().Err(err).Msg("Error while stopping the database") } + log.Info().Msg("Manually closed database connection") + app.IsRunning.Set(false) } diff --git a/scnserver/test/util/webserver.go b/scnserver/test/util/webserver.go index 1da3625..a4c5247 100644 --- a/scnserver/test/util/webserver.go +++ b/scnserver/test/util/webserver.go @@ -130,8 +130,10 @@ func StartSimpleWebserver(t *testing.T) (*logic.Application, string, func()) { } stop := func() { + t.Logf("Stopping App") app.Stop() _ = app.IsRunning.WaitWithTimeout(5*time.Second, false) + t.Logf("Stopped App") _ = os.Remove(dbfile1) _ = os.Remove(dbfile2) _ = os.Remove(dbfile3)