Fix RequestLogCollectorJob sometimes not properly shutting down

Mike Schwörer 2023-05-28 12:31:14 +02:00
parent 624c613bd1
commit efaad3f97c
Signed by: Mikescher
GPG Key ID: D3C7172E0A70F8CF
5 changed files with 26 additions and 14 deletions


@@ -24,7 +24,7 @@ func NewDeliveryRetryJob(app *logic.Application) *DeliveryRetryJob {
 		name:       "DeliveryRetryJob",
 		isRunning:  syncext.NewAtomicBool(false),
 		isStarted:  false,
-		sigChannel: make(chan string),
+		sigChannel: make(chan string, 1),
 	}
 }
@@ -45,7 +45,9 @@ func (j *DeliveryRetryJob) Start() error {
 func (j *DeliveryRetryJob) Stop() {
 	log.Info().Msg(fmt.Sprintf("Stopping Job [%s]", j.name))
-	syncext.WriteNonBlocking(j.sigChannel, "stop")
+	if !syncext.WriteNonBlocking(j.sigChannel, "stop") {
+		log.Error().Msg(fmt.Sprintf("Failed to send Stop-Signal to Job [%s]", j.name))
+	}
 	j.isRunning.Wait(false)
 	log.Info().Msg(fmt.Sprintf("Stopped Job [%s]", j.name))
 }
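
The Stop() methods above now react to the return value of syncext.WriteNonBlocking. As a point of reference, here is a minimal sketch of such a non-blocking send helper, assuming it is a plain select-with-default wrapper (the actual helper lives in the syncext package and may differ in name and signature):

// Sketch only: a non-blocking channel send that reports whether the value was accepted.
// Assumed semantics for syncext.WriteNonBlocking.
func WriteNonBlocking[T any](ch chan T, v T) bool {
	select {
	case ch <- v:
		return true // receiver ready, or buffer space free
	default:
		return false // would block: no waiting receiver and no free buffer slot
	}
}

With sigChannel now buffered (make(chan string, 1)), a send like this succeeds even while the job goroutine is busy between select iterations; on the previous unbuffered channel the write was silently dropped whenever the goroutine was not already parked in its select.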


@@ -23,7 +23,7 @@ func NewRequestLogCleanupJob(app *logic.Application) *RequestLogCleanupJob {
 		name:       "RequestLogCleanupJob",
 		isRunning:  syncext.NewAtomicBool(false),
 		isStarted:  false,
-		sigChannel: make(chan string),
+		sigChannel: make(chan string, 1),
 	}
 }
@@ -44,7 +44,9 @@ func (j *RequestLogCleanupJob) Start() error {
 func (j *RequestLogCleanupJob) Stop() {
 	log.Info().Msg(fmt.Sprintf("Stopping Job [%s]", j.name))
-	syncext.WriteNonBlocking(j.sigChannel, "stop")
+	if !syncext.WriteNonBlocking(j.sigChannel, "stop") {
+		log.Error().Msg(fmt.Sprintf("Failed to send Stop-Signal to Job [%s]", j.name))
+	}
 	j.isRunning.Wait(false)
 	log.Info().Msg(fmt.Sprintf("Stopped Job [%s]", j.name))
 }


@@ -25,7 +25,7 @@ func NewRequestLogCollectorJob(app *logic.Application) *RequestLogCollectorJob {
 		name:       "RequestLogCollectorJob",
 		isRunning:  syncext.NewAtomicBool(false),
 		isStarted:  false,
-		sigChannel: make(chan string),
+		sigChannel: make(chan string, 1),
 	}
 }
@@ -46,7 +46,9 @@ func (j *RequestLogCollectorJob) Start() error {
 func (j *RequestLogCollectorJob) Stop() {
 	log.Info().Msg(fmt.Sprintf("Stopping Job [%s]", j.name))
-	syncext.WriteNonBlocking(j.sigChannel, "stop")
+	if !syncext.WriteNonBlocking(j.sigChannel, "stop") {
+		log.Error().Msg(fmt.Sprintf("Failed to send Stop-Signal to Job [%s]", j.name))
+	}
 	j.isRunning.Wait(false)
 	log.Info().Msg(fmt.Sprintf("Stopped Job [%s]", j.name))
 }
@@ -61,6 +63,14 @@ func (j *RequestLogCollectorJob) mainLoop() {
 mainLoop:
 	for {
 		select {
+		case obj := <-j.app.RequestLogQueue:
+			requestid := models.NewRequestID()
+			err := j.insertLog(requestid, obj)
+			if err != nil {
+				log.Error().Err(err).Msg(fmt.Sprintf("Failed to insert RequestLog {%s} into DB", requestid))
+			} else {
+				log.Debug().Msg(fmt.Sprintf("Inserted RequestLog '%s' into DB", requestid))
+			}
 		case signal := <-j.sigChannel:
 			if signal == "stop" {
 				log.Info().Msg(fmt.Sprintf("Job [%s] received <stop> signal", j.name))
@@ -71,14 +81,6 @@ mainLoop:
 			} else {
 				log.Error().Msg(fmt.Sprintf("Received unknown job signal: <%s> in job [%s]", signal, j.name))
 			}
-		case obj := <-j.app.RequestLogQueue:
-			requestid := models.NewRequestID()
-			err := j.insertLog(requestid, obj)
-			if err != nil {
-				log.Error().Err(err).Msg(fmt.Sprintf("Failed to insert RequestLog {%s} into DB", requestid))
-			} else {
-				log.Debug().Msg(fmt.Sprintf("Inserted RequestLog '%s' into DB", requestid))
-			}
 		}
 	}
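
The shutdown problem named in the commit title can be reproduced in isolation: a non-blocking send on an unbuffered channel is dropped whenever the worker is not currently blocked in its select, for example while it is inserting a request log into the database. A standalone, illustrative sketch (not code from this repository):

package main

import (
	"fmt"
	"time"
)

// trySend mimics a non-blocking signal write (cf. syncext.WriteNonBlocking).
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan string)  // old behaviour
	buffered := make(chan string, 1) // new behaviour

	// Simulate a worker that is currently busy and not waiting in its select.
	time.Sleep(10 * time.Millisecond)

	fmt.Println("unbuffered send accepted:", trySend(unbuffered, "stop")) // false: signal lost
	fmt.Println("buffered send accepted:", trySend(buffered, "stop"))     // true: signal parked

	// The worker picks the parked signal up on its next select iteration.
	fmt.Println("worker received:", <-buffered)
}

Run with go run: the unbuffered send reports false (the stop signal is lost), while the buffered send is accepted and handled on the next loop iteration.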


@@ -144,6 +144,8 @@ func (app *Application) Run() {
 		job.Stop()
 	}
+	log.Info().Msg("Manually stopped Jobs")
+
 	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
 	err := app.Database.Stop(ctx)
@@ -151,6 +153,8 @@ func (app *Application) Run() {
 		log.Info().Err(err).Msg("Error while stopping the database")
 	}
+	log.Info().Msg("Manually closed database connection")
+
 	app.IsRunning.Set(false)
 }


@@ -130,8 +130,10 @@ func StartSimpleWebserver(t *testing.T) (*logic.Application, string, func()) {
 	}
 	stop := func() {
+		t.Logf("Stopping App")
 		app.Stop()
 		_ = app.IsRunning.WaitWithTimeout(5*time.Second, false)
+		t.Logf("Stopped App")
 		_ = os.Remove(dbfile1)
 		_ = os.Remove(dbfile2)
 		_ = os.Remove(dbfile3)