2022-11-20 15:40:19 +01:00
package jobs
import (
"blackforestbytes.com/simplecloudnotifier/logic"
"blackforestbytes.com/simplecloudnotifier/models"
2023-01-13 17:17:17 +01:00
"errors"
"fmt"
2022-11-20 15:40:19 +01:00
"github.com/rs/zerolog/log"
2023-01-13 17:17:17 +01:00
"gogs.mikescher.com/BlackForestBytes/goext/syncext"
2022-11-20 15:40:19 +01:00
"time"
)
type DeliveryRetryJob struct {
2023-01-13 17:17:17 +01:00
app * logic . Application
name string
isRunning * syncext . AtomicBool
isStarted bool
sigChannel chan string
2022-11-20 15:40:19 +01:00
}
func NewDeliveryRetryJob ( app * logic . Application ) * DeliveryRetryJob {
return & DeliveryRetryJob {
2023-01-13 17:17:17 +01:00
app : app ,
name : "DeliveryRetryJob" ,
isRunning : syncext . NewAtomicBool ( false ) ,
isStarted : false ,
sigChannel : make ( chan string ) ,
2022-11-20 15:40:19 +01:00
}
}
2023-01-13 17:17:17 +01:00
func ( j * DeliveryRetryJob ) Start ( ) error {
if j . isRunning . Get ( ) {
return errors . New ( "job already running" )
2022-11-20 15:40:19 +01:00
}
2023-01-13 17:17:17 +01:00
if j . isStarted {
return errors . New ( "job was already started" ) // re-start after stop is not allowed
}
j . isStarted = true
2022-11-20 15:40:19 +01:00
go j . mainLoop ( )
2023-01-13 17:17:17 +01:00
return nil
2022-11-20 15:40:19 +01:00
}
func ( j * DeliveryRetryJob ) Stop ( ) {
2023-01-13 17:17:17 +01:00
log . Info ( ) . Msg ( fmt . Sprintf ( "Stopping Job [%s]" , j . name ) )
syncext . WriteNonBlocking ( j . sigChannel , "stop" )
j . isRunning . Wait ( false )
log . Info ( ) . Msg ( fmt . Sprintf ( "Stopped Job [%s]" , j . name ) )
}
func ( j * DeliveryRetryJob ) Running ( ) bool {
return j . isRunning . Get ( )
2022-11-20 15:40:19 +01:00
}
func ( j * DeliveryRetryJob ) mainLoop ( ) {
2023-01-13 17:17:17 +01:00
j . isRunning . Set ( true )
var fastRerun bool = false
var err error = nil
2022-11-20 15:40:19 +01:00
2023-01-13 17:17:17 +01:00
for {
interval := 30 * time . Second
2022-11-20 15:40:19 +01:00
if fastRerun {
2023-01-13 17:17:17 +01:00
interval = 1 * time . Second
2022-11-20 15:40:19 +01:00
}
2023-01-13 17:17:17 +01:00
signal , okay := syncext . ReadChannelWithTimeout ( j . sigChannel , interval )
if okay {
if signal == "stop" {
log . Info ( ) . Msg ( fmt . Sprintf ( "Job [%s] received <stop> signal" , j . name ) )
break
} else if signal == "run" {
log . Info ( ) . Msg ( fmt . Sprintf ( "Job [%s] received <run> signal" , j . name ) )
continue
} else {
log . Error ( ) . Msg ( fmt . Sprintf ( "Received unknown job signal: <%s> in job [%s]" , signal , j . name ) )
}
2022-11-20 15:40:19 +01:00
}
2023-01-13 17:17:17 +01:00
log . Debug ( ) . Msg ( fmt . Sprintf ( "Run job [%s]" , j . name ) )
t0 := time . Now ( )
fastRerun , err = j . execute ( )
if err != nil {
log . Err ( err ) . Msg ( fmt . Sprintf ( "Failed to execute job [%s]: %s" , j . name , err . Error ( ) ) )
} else {
t1 := time . Now ( )
log . Debug ( ) . Msg ( fmt . Sprintf ( "Job [%s] finished successfully after %f minutes" , j . name , ( t1 . Sub ( t0 ) ) . Minutes ( ) ) )
}
2022-11-20 15:40:19 +01:00
}
2023-01-13 17:17:17 +01:00
log . Info ( ) . Msg ( fmt . Sprintf ( "Job [%s] exiting main-loop" , j . name ) )
j . isRunning . Set ( false )
2022-11-20 15:40:19 +01:00
}
2023-01-13 17:17:17 +01:00
func ( j * DeliveryRetryJob ) execute ( ) ( fastrr bool , err error ) {
2022-11-20 15:40:19 +01:00
defer func ( ) {
if rec := recover ( ) ; rec != nil {
2022-11-20 17:19:11 +01:00
log . Error ( ) . Interface ( "recover" , rec ) . Msg ( "Recovered panic in DeliveryRetryJob" )
2023-01-13 17:17:17 +01:00
err = errors . New ( fmt . Sprintf ( "Panic recovered: %v" , rec ) )
fastrr = false
2022-11-20 15:40:19 +01:00
}
} ( )
ctx := j . app . NewSimpleTransactionContext ( 10 * time . Second )
defer ctx . Cancel ( )
2023-01-06 00:39:21 +01:00
deliveries , err := j . app . Database . Primary . ListRetrieableDeliveries ( ctx , 32 )
2022-11-20 15:40:19 +01:00
if err != nil {
2023-01-13 17:17:17 +01:00
return false , err
2022-11-20 15:40:19 +01:00
}
err = ctx . CommitTransaction ( )
if err != nil {
2023-01-13 17:17:17 +01:00
return false , err
2022-11-20 15:40:19 +01:00
}
if len ( deliveries ) == 32 {
log . Warn ( ) . Msg ( "The delivery pipeline is greater than 32 (too much for a single cycle)" )
}
for _ , delivery := range deliveries {
j . redeliver ( ctx , delivery )
}
2023-01-13 17:17:17 +01:00
return len ( deliveries ) == 32 , nil
2022-11-20 15:40:19 +01:00
}
func ( j * DeliveryRetryJob ) redeliver ( ctx * logic . SimpleContext , delivery models . Delivery ) {
2023-01-06 00:39:21 +01:00
client , err := j . app . Database . Primary . GetClient ( ctx , delivery . ReceiverUserID , delivery . ReceiverClientID )
2022-11-20 15:40:19 +01:00
if err != nil {
2023-01-14 00:48:51 +01:00
log . Err ( err ) . Str ( "ReceiverUserID" , delivery . ReceiverUserID . String ( ) ) . Str ( "ReceiverClientID" , delivery . ReceiverClientID . String ( ) ) . Msg ( "Failed to get client" )
2022-11-20 15:40:19 +01:00
ctx . RollbackTransaction ( )
return
}
2023-01-14 00:48:51 +01:00
msg , err := j . app . Database . Primary . GetMessage ( ctx , delivery . MessageID , true )
2022-11-20 15:40:19 +01:00
if err != nil {
2023-01-14 00:48:51 +01:00
log . Err ( err ) . Str ( "MessageID" , delivery . MessageID . String ( ) ) . Msg ( "Failed to get message" )
2022-11-20 15:40:19 +01:00
ctx . RollbackTransaction ( )
return
}
2022-12-14 12:29:55 +01:00
if msg . Deleted {
2023-01-06 00:39:21 +01:00
err = j . app . Database . Primary . SetDeliveryFailed ( ctx , delivery )
2022-11-20 15:40:19 +01:00
if err != nil {
2023-01-14 00:48:51 +01:00
log . Err ( err ) . Str ( "MessageID" , delivery . MessageID . String ( ) ) . Str ( "DeliveryID" , delivery . DeliveryID . String ( ) ) . Msg ( "Failed to update delivery" )
2022-11-20 15:40:19 +01:00
ctx . RollbackTransaction ( )
return
}
} else {
2022-12-14 12:29:55 +01:00
fcmDelivID , err := j . app . DeliverMessage ( ctx , client , msg )
if err == nil {
2023-01-06 00:39:21 +01:00
err = j . app . Database . Primary . SetDeliverySuccess ( ctx , delivery , * fcmDelivID )
2022-12-14 12:29:55 +01:00
if err != nil {
2023-01-14 00:48:51 +01:00
log . Err ( err ) . Str ( "MessageID" , delivery . MessageID . String ( ) ) . Str ( "DeliveryID" , delivery . DeliveryID . String ( ) ) . Msg ( "Failed to update delivery" )
2022-12-14 12:29:55 +01:00
ctx . RollbackTransaction ( )
return
}
} else if delivery . RetryCount + 1 > delivery . MaxRetryCount ( ) {
2023-01-06 00:39:21 +01:00
err = j . app . Database . Primary . SetDeliveryFailed ( ctx , delivery )
2022-12-14 12:29:55 +01:00
if err != nil {
2023-01-14 00:48:51 +01:00
log . Err ( err ) . Str ( "MessageID" , delivery . MessageID . String ( ) ) . Str ( "DeliveryID" , delivery . DeliveryID . String ( ) ) . Msg ( "Failed to update delivery" )
2022-12-14 12:29:55 +01:00
ctx . RollbackTransaction ( )
return
}
2023-01-14 00:48:51 +01:00
log . Warn ( ) . Str ( "MessageID" , delivery . MessageID . String ( ) ) . Str ( "DeliveryID" , delivery . DeliveryID . String ( ) ) . Msg ( "Delivery failed after <max> retries (set to FAILURE)" )
2022-12-14 12:29:55 +01:00
} else {
2023-01-06 00:39:21 +01:00
err = j . app . Database . Primary . SetDeliveryRetry ( ctx , delivery )
2022-12-14 12:29:55 +01:00
if err != nil {
2023-01-14 00:48:51 +01:00
log . Err ( err ) . Str ( "MessageID" , delivery . MessageID . String ( ) ) . Str ( "DeliveryID" , delivery . DeliveryID . String ( ) ) . Msg ( "Failed to update delivery" )
2022-12-14 12:29:55 +01:00
ctx . RollbackTransaction ( )
return
}
2022-11-20 15:40:19 +01:00
}
2022-12-14 12:29:55 +01:00
2022-11-20 15:40:19 +01:00
}
err = ctx . CommitTransaction ( )
}