DeliveryRetryJob

This commit is contained in:
Mike Schwörer 2022-11-20 15:40:19 +01:00
parent c2899fd727
commit 08a93551e7
Signed by: Mikescher
GPG Key ID: D3C7172E0A70F8CF
12 changed files with 388 additions and 21 deletions

View File

@ -3,9 +3,8 @@
//TODO
- background job for re-delivery
- POST::/messages
- https://firebase.google.com/docs/cloud-messaging/send-message#rest
- https://firebase.google.com/docs/cloud-messaging/send-message#rest !!
- List subscriptions on all owned channels (RESTful?)
- deploy
- full-text-search: https://www.sqlite.org/fts5.html#contentless_tables

View File

@ -1110,6 +1110,11 @@ func (h APIHandler) DeleteMessage(g *gin.Context) ginresp.HTTPResponse {
return ginresp.InternAPIError(500, apierr.DATABASE_ERROR, "Failed to delete message", err)
}
err = h.database.CancelPendingDeliveries(ctx, msg.SCNMessageID)
if err != nil {
return ginresp.InternAPIError(500, apierr.DATABASE_ERROR, "Failed to cancel deliveries", err)
}
return ctx.FinishSuccess(ginresp.JSON(http.StatusOK, msg.FullJSON()))
}

View File

@ -5,7 +5,6 @@ import (
"blackforestbytes.com/simplecloudnotifier/common/ginresp"
"blackforestbytes.com/simplecloudnotifier/db"
"blackforestbytes.com/simplecloudnotifier/logic"
"blackforestbytes.com/simplecloudnotifier/models"
"database/sql"
"fmt"
"github.com/gin-gonic/gin"
@ -277,7 +276,7 @@ func (h MessageHandler) sendMessageInternal(ctx *logic.AppContext, UserID *int64
for _, client := range clients {
fcmDelivID, err := h.deliverMessage(ctx, client, msg)
fcmDelivID, err := h.app.DeliverMessage(ctx, client, msg)
if err != nil {
_, err = h.database.CreateRetryDelivery(ctx, client, msg)
if err != nil {
@ -306,15 +305,3 @@ func (h MessageHandler) sendMessageInternal(ctx *logic.AppContext, UserID *int64
SCNMessageID: msg.SCNMessageID,
}))
}
func (h MessageHandler) deliverMessage(ctx *logic.AppContext, client models.Client, msg models.Message) (*string, error) {
if client.FCMToken != nil {
fcmDelivID, err := h.app.Firebase.SendNotification(ctx, client, msg)
if err != nil {
return nil, err
}
return langext.Ptr(fcmDelivID), nil
} else {
return langext.Ptr(""), nil
}
}

View File

@ -7,6 +7,7 @@ import (
"blackforestbytes.com/simplecloudnotifier/common/ginext"
"blackforestbytes.com/simplecloudnotifier/db"
"blackforestbytes.com/simplecloudnotifier/firebase"
"blackforestbytes.com/simplecloudnotifier/jobs"
"blackforestbytes.com/simplecloudnotifier/logic"
"fmt"
"github.com/rs/zerolog/log"
@ -36,7 +37,9 @@ func main() {
fb := firebase.NewFirebaseApp()
app.Init(conf, ginengine, &fb)
jobRetry := jobs.NewDeliveryRetryJob(app)
app.Init(conf, ginengine, &fb, []logic.Job{jobRetry})
router.Init(ginengine)

View File

@ -1,6 +1,7 @@
package db
import (
scn "blackforestbytes.com/simplecloudnotifier"
"blackforestbytes.com/simplecloudnotifier/models"
"gogs.mikescher.com/BlackForestBytes/goext/langext"
"time"
@ -13,7 +14,7 @@ func (db *Database) CreateRetryDelivery(ctx TxContext, client models.Client, msg
}
now := time.Now().UTC()
next := now.Add(5 * time.Second)
next := scn.NextDeliveryTimestamp(now)
res, err := tx.ExecContext(ctx, "INSERT INTO deliveries (scn_message_id, receiver_user_id, receiver_client_id, timestamp_created, timestamp_finalized, status, fcm_message_id, next_delivery) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
msg.SCNMessageID,
@ -86,3 +87,92 @@ func (db *Database) CreateSuccessDelivery(ctx TxContext, client models.Client, m
FCMMessageID: langext.Ptr(fcmDelivID),
}, nil
}
func (db *Database) ListRetrieableDeliveries(ctx TxContext, pageSize int) ([]models.Delivery, error) {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return nil, err
}
rows, err := tx.QueryContext(ctx, "SELECT * FROM deliveries WHERE status = 'RETRY' AND next_delivery < ? LIMIT ?",
time2DB(time.Now()),
pageSize)
if err != nil {
return nil, err
}
data, err := models.DecodeDeliveries(rows)
if err != nil {
return nil, err
}
return data, nil
}
func (db *Database) SetDeliverySuccess(ctx TxContext, delivery models.Delivery, fcmDelivID string) error {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'SUCCESS', next_delivery = NULL, retry_count = ?, timestamp_finalized = ?, fcm_message_id = ? WHERE delivery_id = ?",
delivery.RetryCount+1,
time2DB(time.Now()),
fcmDelivID,
delivery.DeliveryID)
if err != nil {
return err
}
return nil
}
func (db *Database) SetDeliveryFailed(ctx TxContext, delivery models.Delivery) error {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'FAILED', next_delivery = NULL, retry_count = ?, timestamp_finalized = ? WHERE delivery_id = ?",
delivery.RetryCount+1,
time2DB(time.Now()),
delivery.DeliveryID)
if err != nil {
return err
}
return nil
}
func (db *Database) SetDeliveryRetry(ctx TxContext, delivery models.Delivery) error {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'RETRY', next_delivery = ?, retry_count = ? WHERE delivery_id = ?",
scn.NextDeliveryTimestamp(time.Now()),
delivery.RetryCount+1,
delivery.DeliveryID)
if err != nil {
return err
}
return nil
}
func (db *Database) CancelPendingDeliveries(ctx TxContext, scnMessageID int64) error {
tx, err := ctx.GetOrCreateTransaction(db)
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'FAILED', next_delivery = NULL, timestamp_finalized = ? WHERE scn_message_id = ? AND status = 'RETRY'",
time.Now(),
scnMessageID)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,145 @@
package jobs
import (
"blackforestbytes.com/simplecloudnotifier/logic"
"blackforestbytes.com/simplecloudnotifier/models"
"github.com/rs/zerolog/log"
"time"
)
type DeliveryRetryJob struct {
app *logic.Application
running bool
stopChannel chan bool
}
func NewDeliveryRetryJob(app *logic.Application) *DeliveryRetryJob {
return &DeliveryRetryJob{
app: app,
running: true,
stopChannel: make(chan bool, 8),
}
}
func (j *DeliveryRetryJob) Start() {
if !j.running {
panic("cannot re-start job")
}
go j.mainLoop()
}
func (j *DeliveryRetryJob) Stop() {
j.running = false
}
func (j *DeliveryRetryJob) mainLoop() {
fastRerun := false
for j.running {
if fastRerun {
j.sleep(1 * time.Second)
} else {
j.sleep(30 * time.Second)
}
if !j.running {
return
}
fastRerun = j.run()
}
}
func (j *DeliveryRetryJob) run() bool {
defer func() {
if rec := recover(); rec != nil {
log.Error().Interface("rec", rec).Msg("Recovered panic in DeliveryRetryJob")
}
}()
ctx := j.app.NewSimpleTransactionContext(10 * time.Second)
defer ctx.Cancel()
deliveries, err := j.app.Database.ListRetrieableDeliveries(ctx, 32)
if err != nil {
log.Err(err).Msg("Failed to query retrieable deliveries")
return false
}
err = ctx.CommitTransaction()
if err != nil {
log.Err(err).Msg("Failed to commit")
return false
}
if len(deliveries) == 32 {
log.Warn().Msg("The delivery pipeline is greater than 32 (too much for a single cycle)")
}
for _, delivery := range deliveries {
j.redeliver(ctx, delivery)
}
return len(deliveries) == 32
}
func (j *DeliveryRetryJob) redeliver(ctx *logic.SimpleContext, delivery models.Delivery) {
client, err := j.app.Database.GetClient(ctx, delivery.ReceiverUserID, delivery.ReceiverClientID)
if err != nil {
log.Err(err).Int64("ReceiverUserID", delivery.ReceiverUserID).Int64("ReceiverClientID", delivery.ReceiverClientID).Msg("Failed to get client")
ctx.RollbackTransaction()
return
}
msg, err := j.app.Database.GetMessage(ctx, delivery.SCNMessageID)
if err != nil {
log.Err(err).Int64("SCNMessageID", delivery.SCNMessageID).Msg("Failed to get message")
ctx.RollbackTransaction()
return
}
fcmDelivID, err := j.app.DeliverMessage(ctx, client, msg)
if err != nil {
err = j.app.Database.SetDeliverySuccess(ctx, delivery, *fcmDelivID)
if err != nil {
log.Err(err).Int64("SCNMessageID", delivery.SCNMessageID).Int64("DeliveryID", delivery.DeliveryID).Msg("Failed to update delivery")
ctx.RollbackTransaction()
return
}
} else if delivery.RetryCount+1 > delivery.MaxRetryCount() {
err = j.app.Database.SetDeliveryFailed(ctx, delivery)
if err != nil {
log.Err(err).Int64("SCNMessageID", delivery.SCNMessageID).Int64("DeliveryID", delivery.DeliveryID).Msg("Failed to update delivery")
ctx.RollbackTransaction()
return
}
} else {
err = j.app.Database.SetDeliveryRetry(ctx, delivery)
if err != nil {
log.Err(err).Int64("SCNMessageID", delivery.SCNMessageID).Int64("DeliveryID", delivery.DeliveryID).Msg("Failed to update delivery")
ctx.RollbackTransaction()
return
}
}
err = ctx.CommitTransaction()
}
func (j *DeliveryRetryJob) sleep(d time.Duration) {
if !j.running {
return
}
afterCh := time.After(d)
for {
select {
case <-j.stopChannel:
j.stopChannel <- true
return
case <-afterCh:
return
}
}
}

View File

@ -29,6 +29,7 @@ type Application struct {
Database *db.Database
Firebase *firebase.App
DefaultChannel string
Jobs []Job
}
func NewApp(db *db.Database) *Application {
@ -38,10 +39,11 @@ func NewApp(db *db.Database) *Application {
}
}
func (app *Application) Init(cfg scn.Config, g *gin.Engine, fb *firebase.App) {
func (app *Application) Init(cfg scn.Config, g *gin.Engine, fb *firebase.App, jobs []Job) {
app.Config = cfg
app.Gin = g
app.Firebase = fb
app.Jobs = jobs
}
func (app *Application) Run() {
@ -59,22 +61,33 @@ func (app *Application) Run() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
for _, job := range app.Jobs {
job.Start()
}
select {
case <-stop:
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
log.Info().Msg("Stopping HTTP-Server")
err := httpserver.Shutdown(ctx)
if err != nil {
log.Info().Err(err).Msg("Error while stopping the http-server")
return
} else {
log.Info().Msg("Stopped HTTP-Server")
}
log.Info().Msg("Stopped HTTP-Server")
case err := <-errChan:
log.Error().Err(err).Msg("HTTP-Server failed")
}
for _, job := range app.Jobs {
job.Start()
}
}
func (app *Application) GenerateRandomAuthKey() string {
@ -147,6 +160,11 @@ func (app *Application) StartRequest(g *gin.Context, uri any, query any, body an
return actx, nil
}
func (app *Application) NewSimpleTransactionContext(timeout time.Duration) *SimpleContext {
ictx, cancel := context.WithTimeout(context.Background(), timeout)
return CreateSimpleContext(ictx, cancel)
}
func (app *Application) getPermissions(ctx *AppContext, hdr string) (PermissionSet, error) {
if hdr == "" {
return NewEmptyPermissions(), nil
@ -222,3 +240,15 @@ func (app *Application) NormalizeUsername(v string) string {
return v
}
func (app *Application) DeliverMessage(ctx context.Context, client models.Client, msg models.Message) (*string, error) {
if client.FCMToken != nil {
fcmDelivID, err := app.Firebase.SendNotification(ctx, client, msg)
if err != nil {
return nil, err
}
return langext.Ptr(fcmDelivID), nil
} else {
return langext.Ptr(""), nil
}
}

6
server/logic/jobs.go Normal file
View File

@ -0,0 +1,6 @@
package logic
type Job interface {
Start()
Stop()
}

View File

@ -0,0 +1,94 @@
package logic
import (
"blackforestbytes.com/simplecloudnotifier/db"
"context"
"database/sql"
"errors"
"github.com/rs/zerolog/log"
"time"
)
type SimpleContext struct {
inner context.Context
cancelFunc context.CancelFunc
cancelled bool
transaction *sql.Tx
}
func CreateSimpleContext(innerCtx context.Context, cancelFn context.CancelFunc) *SimpleContext {
return &SimpleContext{
inner: innerCtx,
cancelFunc: cancelFn,
cancelled: false,
transaction: nil,
}
}
func (sc *SimpleContext) Deadline() (deadline time.Time, ok bool) {
return sc.inner.Deadline()
}
func (sc *SimpleContext) Done() <-chan struct{} {
return sc.inner.Done()
}
func (sc *SimpleContext) Err() error {
return sc.inner.Err()
}
func (sc *SimpleContext) Value(key any) any {
return sc.inner.Value(key)
}
func (sc *SimpleContext) Cancel() {
sc.cancelled = true
if sc.transaction != nil {
log.Error().Msg("Rollback transaction")
err := sc.transaction.Rollback()
if err != nil {
panic("failed to rollback transaction: " + err.Error())
}
sc.transaction = nil
}
sc.cancelFunc()
}
func (sc *SimpleContext) GetOrCreateTransaction(db *db.Database) (*sql.Tx, error) {
if sc.cancelled {
return nil, errors.New("context cancelled")
}
if sc.transaction != nil {
return sc.transaction, nil
}
tx, err := db.BeginTx(sc)
if err != nil {
return nil, err
}
sc.transaction = tx
return tx, nil
}
func (sc *SimpleContext) CommitTransaction() error {
if sc.transaction == nil {
return nil
}
err := sc.transaction.Commit()
if err != nil {
return err
}
sc.transaction = nil
return nil
}
func (sc *SimpleContext) RollbackTransaction() {
if sc.transaction == nil {
return
}
err := sc.transaction.Rollback()
if err != nil {
panic(err)
}
sc.transaction = nil
return
}

View File

@ -43,6 +43,10 @@ func (d Delivery) JSON() DeliveryJSON {
}
}
func (d Delivery) MaxRetryCount() int {
return 5
}
type DeliveryJSON struct {
DeliveryID int64 `json:"delivery_id"`
SCNMessageID int64 `json:"scn_message_id"`

View File

@ -8,3 +8,7 @@ import (
func QuotaDayString() string {
return time.Now().In(timeext.TimezoneBerlin).Format("2006-01-02")
}
func NextDeliveryTimestamp(now time.Time) time.Time {
return now.Add(5 * time.Second)
}