Move to own sql abstraction on top of jmoiron/sqlx
This commit is contained in:
parent
d27e3d9a91
commit
8db0fa37db
@ -7,7 +7,7 @@ HASH=$(shell git rev-parse HEAD)
|
||||
|
||||
build: swagger fmt
|
||||
rm -f ./_build/scn_backend
|
||||
CGO_ENABLED=1 go build -v -o _build/scn_backend -tags timetzdata ./cmd/scnserver
|
||||
CGO_ENABLED=1 go build -v -o _build/scn_backend -tags "timetzdata sqlite_fts5 sqlite_foreign_keys" ./cmd/scnserver
|
||||
|
||||
run: build
|
||||
mkdir -p .run-data
|
||||
|
@ -85,9 +85,12 @@ func (h CommonHandler) DatabaseTest(g *gin.Context) ginresp.HTTPResponse {
|
||||
SourceID string `json:"sourceID"`
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
libVersion, libVersionNumber, sourceID := sqlite3.Version()
|
||||
|
||||
err := h.app.Database.Ping()
|
||||
err := h.app.Database.Ping(ctx)
|
||||
if err != nil {
|
||||
return ginresp.InternalError(err)
|
||||
}
|
||||
@ -124,7 +127,7 @@ func (h CommonHandler) Health(g *gin.Context) ginresp.HTTPResponse {
|
||||
ginresp.InternalError(errors.New("sqlite version too low"))
|
||||
}
|
||||
|
||||
err := h.app.Database.Ping()
|
||||
err := h.app.Database.Ping(ctx)
|
||||
if err != nil {
|
||||
return ginresp.InternalError(err)
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ func main() {
|
||||
|
||||
log.Info().Msg(fmt.Sprintf("Starting with config-namespace <%s>", conf.Namespace))
|
||||
|
||||
sqlite, err := db.NewDatabase(conf.DBFile)
|
||||
sqlite, err := db.NewDatabase(conf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -25,11 +25,11 @@ func Init(cfg scn.Config) {
|
||||
|
||||
if cfg.GinDebug {
|
||||
gin.SetMode(gin.DebugMode)
|
||||
zerolog.SetGlobalLevel(zerolog.DebugLevel)
|
||||
} else {
|
||||
gin.SetMode(gin.ReleaseMode)
|
||||
zerolog.SetGlobalLevel(zerolog.InfoLevel)
|
||||
}
|
||||
|
||||
zerolog.SetGlobalLevel(cfg.LogLevel)
|
||||
|
||||
log.Debug().Msg("Initialized")
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/log"
|
||||
"os"
|
||||
"time"
|
||||
@ -11,9 +12,17 @@ type Config struct {
|
||||
Namespace string
|
||||
BaseURL string
|
||||
GinDebug bool
|
||||
LogLevel zerolog.Level
|
||||
ServerIP string
|
||||
ServerPort string
|
||||
DBFile string
|
||||
DBJournal string
|
||||
DBTimeout time.Duration
|
||||
DBMaxOpenConns int
|
||||
DBMaxIdleConns int
|
||||
DBConnMaxLifetime time.Duration
|
||||
DBConnMaxIdleTime time.Duration
|
||||
DBCheckForeignKeys bool
|
||||
RequestTimeout time.Duration
|
||||
ReturnRawErrors bool
|
||||
DummyFirebase bool
|
||||
@ -38,9 +47,17 @@ var configLocHost = func() Config {
|
||||
Namespace: "local-host",
|
||||
BaseURL: "http://localhost:8080",
|
||||
GinDebug: true,
|
||||
LogLevel: zerolog.DebugLevel,
|
||||
ServerIP: "0.0.0.0",
|
||||
ServerPort: "8080",
|
||||
DBFile: ".run-data/db.sqlite3",
|
||||
DBJournal: "WAL",
|
||||
DBTimeout: 5 * time.Second,
|
||||
DBCheckForeignKeys: true,
|
||||
DBMaxOpenConns: 5,
|
||||
DBMaxIdleConns: 5,
|
||||
DBConnMaxLifetime: 60 * time.Minute,
|
||||
DBConnMaxIdleTime: 60 * time.Minute,
|
||||
RequestTimeout: 16 * time.Second,
|
||||
ReturnRawErrors: true,
|
||||
DummyFirebase: true,
|
||||
@ -64,9 +81,17 @@ var configLocDocker = func() Config {
|
||||
Namespace: "local-docker",
|
||||
BaseURL: "http://localhost:8080",
|
||||
GinDebug: true,
|
||||
LogLevel: zerolog.DebugLevel,
|
||||
ServerIP: "0.0.0.0",
|
||||
ServerPort: "80",
|
||||
DBFile: "/data/scn_docker.sqlite3",
|
||||
DBJournal: "WAL",
|
||||
DBTimeout: 5 * time.Second,
|
||||
DBCheckForeignKeys: true,
|
||||
DBMaxOpenConns: 5,
|
||||
DBMaxIdleConns: 5,
|
||||
DBConnMaxLifetime: 60 * time.Minute,
|
||||
DBConnMaxIdleTime: 60 * time.Minute,
|
||||
RequestTimeout: 16 * time.Second,
|
||||
ReturnRawErrors: true,
|
||||
DummyFirebase: true,
|
||||
@ -90,9 +115,17 @@ var configDev = func() Config {
|
||||
Namespace: "develop",
|
||||
BaseURL: confEnv("BASE_URL"),
|
||||
GinDebug: true,
|
||||
LogLevel: zerolog.DebugLevel,
|
||||
ServerIP: "0.0.0.0",
|
||||
ServerPort: "80",
|
||||
DBFile: "/data/scn.sqlite3",
|
||||
DBJournal: "WAL",
|
||||
DBTimeout: 5 * time.Second,
|
||||
DBCheckForeignKeys: true,
|
||||
DBMaxOpenConns: 5,
|
||||
DBMaxIdleConns: 5,
|
||||
DBConnMaxLifetime: 60 * time.Minute,
|
||||
DBConnMaxIdleTime: 60 * time.Minute,
|
||||
RequestTimeout: 16 * time.Second,
|
||||
ReturnRawErrors: true,
|
||||
DummyFirebase: false,
|
||||
@ -116,9 +149,17 @@ var configStag = func() Config {
|
||||
Namespace: "staging",
|
||||
BaseURL: confEnv("BASE_URL"),
|
||||
GinDebug: true,
|
||||
LogLevel: zerolog.DebugLevel,
|
||||
ServerIP: "0.0.0.0",
|
||||
ServerPort: "80",
|
||||
DBFile: "/data/scn.sqlite3",
|
||||
DBJournal: "WAL",
|
||||
DBTimeout: 5 * time.Second,
|
||||
DBCheckForeignKeys: true,
|
||||
DBMaxOpenConns: 5,
|
||||
DBMaxIdleConns: 5,
|
||||
DBConnMaxLifetime: 60 * time.Minute,
|
||||
DBConnMaxIdleTime: 60 * time.Minute,
|
||||
RequestTimeout: 16 * time.Second,
|
||||
ReturnRawErrors: true,
|
||||
DummyFirebase: false,
|
||||
@ -142,9 +183,17 @@ var configProd = func() Config {
|
||||
Namespace: "production",
|
||||
BaseURL: confEnv("BASE_URL"),
|
||||
GinDebug: false,
|
||||
LogLevel: zerolog.InfoLevel,
|
||||
ServerIP: "0.0.0.0",
|
||||
ServerPort: "80",
|
||||
DBFile: "/data/scn.sqlite3",
|
||||
DBJournal: "WAL",
|
||||
DBTimeout: 5 * time.Second,
|
||||
DBCheckForeignKeys: true,
|
||||
DBMaxOpenConns: 5,
|
||||
DBMaxIdleConns: 5,
|
||||
DBConnMaxLifetime: 60 * time.Minute,
|
||||
DBConnMaxIdleTime: 60 * time.Minute,
|
||||
RequestTimeout: 16 * time.Second,
|
||||
ReturnRawErrors: false,
|
||||
DummyFirebase: false,
|
||||
|
@ -2,6 +2,7 @@ package db
|
||||
|
||||
import (
|
||||
"blackforestbytes.com/simplecloudnotifier/models"
|
||||
"blackforestbytes.com/simplecloudnotifier/sq"
|
||||
"database/sql"
|
||||
"time"
|
||||
)
|
||||
@ -12,7 +13,10 @@ func (db *Database) GetChannelByName(ctx TxContext, userid models.UserID, chanNa
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM channels WHERE owner_user_id = ? OR name = ? LIMIT 1", userid, chanName)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM channels WHERE owner_user_id = :uid OR name = :nam LIMIT 1", sq.PP{
|
||||
"uid": userid,
|
||||
"nam": chanName,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -34,7 +38,10 @@ func (db *Database) GetChannelByNameAndSendKey(ctx TxContext, chanName string, s
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM channels WHERE name = ? OR send_key = ? LIMIT 1", chanName, sendKey)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM channels WHERE name = :chan_name OR send_key = :send_key LIMIT 1", sq.PP{
|
||||
"chan_name": chanName,
|
||||
"send_key": sendKey,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -58,12 +65,13 @@ func (db *Database) CreateChannel(ctx TxContext, userid models.UserID, name stri
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
res, err := tx.ExecContext(ctx, "INSERT INTO channels (owner_user_id, name, subscribe_key, send_key, timestamp_created) VALUES (?, ?, ?, ?, ?)",
|
||||
userid,
|
||||
name,
|
||||
subscribeKey,
|
||||
sendKey,
|
||||
time2DB(now))
|
||||
res, err := tx.Exec(ctx, "INSERT INTO channels (owner_user_id, name, subscribe_key, send_key, timestamp_created) VALUES (:ouid, :nam, :subkey, :sendkey, :ts)", sq.PP{
|
||||
"ouid": userid,
|
||||
"nam": name,
|
||||
"subkey": subscribeKey,
|
||||
"sendkey": sendKey,
|
||||
"ts": time2DB(now),
|
||||
})
|
||||
if err != nil {
|
||||
return models.Channel{}, err
|
||||
}
|
||||
@ -91,7 +99,7 @@ func (db *Database) ListChannelsByOwner(ctx TxContext, userid models.UserID) ([]
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM channels WHERE owner_user_id = ?", userid)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM channels WHERE owner_user_id = :ouid", sq.PP{"ouid": userid})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -115,8 +123,9 @@ func (db *Database) ListChannelsBySubscriber(ctx TxContext, userid models.UserID
|
||||
confCond = " AND sub.confirmed = 1"
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM channels LEFT JOIN subscriptions sub on channels.channel_id = sub.channel_id WHERE sub.subscriber_user_id = ? "+confCond,
|
||||
userid)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM channels LEFT JOIN subscriptions sub on channels.channel_id = sub.channel_id WHERE sub.subscriber_user_id = :suid "+confCond, sq.PP{
|
||||
"suid": userid,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -140,8 +149,9 @@ func (db *Database) ListChannelsByAccess(ctx TxContext, userid models.UserID, co
|
||||
confCond = "OR (sub.subscriber_user_id = ? AND sub.confirmed = 1)"
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM channels LEFT JOIN subscriptions sub on channels.channel_id = sub.channel_id WHERE owner_user_id = ? "+confCond,
|
||||
userid)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM channels LEFT JOIN subscriptions sub on channels.channel_id = sub.channel_id WHERE owner_user_id = :ouid "+confCond, sq.PP{
|
||||
"ouid": userid,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -160,7 +170,10 @@ func (db *Database) GetChannel(ctx TxContext, userid models.UserID, channelid mo
|
||||
return models.Channel{}, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM channels WHERE owner_user_id = ? AND channel_id = ? LIMIT 1", userid, channelid)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM channels WHERE owner_user_id = :ouid AND channel_id = :cid LIMIT 1", sq.PP{
|
||||
"ouid": userid,
|
||||
"cid": channelid,
|
||||
})
|
||||
if err != nil {
|
||||
return models.Channel{}, err
|
||||
}
|
||||
@ -179,10 +192,11 @@ func (db *Database) IncChannelMessageCounter(ctx TxContext, channel models.Chann
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE channels SET messages_sent = ?, timestamp_lastsent = ? WHERE channel_id = ?",
|
||||
channel.MessagesSent+1,
|
||||
time2DB(time.Now()),
|
||||
channel.ChannelID)
|
||||
_, err = tx.Exec(ctx, "UPDATE channels SET messages_sent = :ctr, timestamp_lastsent = :ts WHERE channel_id = :cid", sq.PP{
|
||||
"ctr": channel.MessagesSent + 1,
|
||||
"cid": time2DB(time.Now()),
|
||||
"ts": channel.ChannelID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -196,9 +210,10 @@ func (db *Database) UpdateChannelSendKey(ctx TxContext, channelid models.Channel
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE channels SET send_key = ? WHERE channel_id = ?",
|
||||
newkey,
|
||||
channelid)
|
||||
_, err = tx.Exec(ctx, "UPDATE channels SET send_key = :key WHERE channel_id = :cid", sq.PP{
|
||||
"key": newkey,
|
||||
"cid": channelid,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -212,9 +227,10 @@ func (db *Database) UpdateChannelSubscribeKey(ctx TxContext, channelid models.Ch
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE channels SET subscribe_key = ? WHERE channel_id = ?",
|
||||
newkey,
|
||||
channelid)
|
||||
_, err = tx.Exec(ctx, "UPDATE channels SET subscribe_key = :key WHERE channel_id = :cid", sq.PP{
|
||||
"key": newkey,
|
||||
"cid": channelid,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package db
|
||||
|
||||
import (
|
||||
"blackforestbytes.com/simplecloudnotifier/models"
|
||||
"blackforestbytes.com/simplecloudnotifier/sq"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"time"
|
||||
)
|
||||
@ -14,13 +15,14 @@ func (db *Database) CreateClient(ctx TxContext, userid models.UserID, ctype mode
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
res, err := tx.ExecContext(ctx, "INSERT INTO clients (user_id, type, fcm_token, timestamp_created, agent_model, agent_version) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
userid,
|
||||
string(ctype),
|
||||
fcmToken,
|
||||
time2DB(now),
|
||||
agentModel,
|
||||
agentVersion)
|
||||
res, err := tx.Exec(ctx, "INSERT INTO clients (user_id, type, fcm_token, timestamp_created, agent_model, agent_version) VALUES (:uid, :typ, :fcm, :ts, :am, :av)", sq.PP{
|
||||
"uid": userid,
|
||||
"typ": string(ctype),
|
||||
"fcm": fcmToken,
|
||||
"ts": time2DB(now),
|
||||
"am": agentModel,
|
||||
"av": agentVersion,
|
||||
})
|
||||
if err != nil {
|
||||
return models.Client{}, err
|
||||
}
|
||||
@ -47,7 +49,7 @@ func (db *Database) ClearFCMTokens(ctx TxContext, fcmtoken string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM clients WHERE fcm_token = ?", fcmtoken)
|
||||
_, err = tx.Exec(ctx, "DELETE FROM clients WHERE fcm_token = :fcm", sq.PP{"fcm": fcmtoken})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -61,7 +63,7 @@ func (db *Database) ListClients(ctx TxContext, userid models.UserID) ([]models.C
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM clients WHERE user_id = ?", userid)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM clients WHERE user_id = :uid", sq.PP{"uid": userid})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -80,7 +82,10 @@ func (db *Database) GetClient(ctx TxContext, userid models.UserID, clientid mode
|
||||
return models.Client{}, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM clients WHERE user_id = ? AND client_id = ? LIMIT 1", userid, clientid)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM clients WHERE user_id = :uid AND client_id = :cid LIMIT 1", sq.PP{
|
||||
"uid": userid,
|
||||
"cid": clientid,
|
||||
})
|
||||
if err != nil {
|
||||
return models.Client{}, err
|
||||
}
|
||||
@ -99,7 +104,7 @@ func (db *Database) DeleteClient(ctx TxContext, clientid models.ClientID) error
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM clients WHERE client_id = ?", clientid)
|
||||
_, err = tx.Exec(ctx, "DELETE FROM clients WHERE client_id = :cid", sq.PP{"cid": clientid})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -113,7 +118,7 @@ func (db *Database) DeleteClientsByFCM(ctx TxContext, fcmtoken string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM clients WHERE fcm_token = ?", fcmtoken)
|
||||
_, err = tx.Exec(ctx, "DELETE FROM clients WHERE fcm_token = :fcm", sq.PP{"fcm": fcmtoken})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"blackforestbytes.com/simplecloudnotifier/sq"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -11,5 +11,5 @@ type TxContext interface {
|
||||
Err() error
|
||||
Value(key any) any
|
||||
|
||||
GetOrCreateTransaction(db *Database) (*sql.Tx, error)
|
||||
GetOrCreateTransaction(db *Database) (sq.Tx, error)
|
||||
}
|
||||
|
@ -1,26 +1,39 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
server "blackforestbytes.com/simplecloudnotifier"
|
||||
"blackforestbytes.com/simplecloudnotifier/db/schema"
|
||||
"blackforestbytes.com/simplecloudnotifier/sq"
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/jmoiron/sqlx"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Database struct {
|
||||
db *sql.DB
|
||||
db sq.DB
|
||||
}
|
||||
|
||||
func NewDatabase(filename string) (*Database, error) {
|
||||
db, err := sql.Open("sqlite3", filename)
|
||||
func NewDatabase(conf server.Config) (*Database, error) {
|
||||
url := fmt.Sprintf("file:%s?_journal=%s&_timeout=%d&_fk=%s", conf.DBFile, conf.DBJournal, conf.DBTimeout.Milliseconds(), langext.FormatBool(conf.DBCheckForeignKeys, "true", "false"))
|
||||
|
||||
xdb, err := sqlx.Open("sqlite3", url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Database{db}, nil
|
||||
xdb.SetMaxOpenConns(5)
|
||||
xdb.SetMaxIdleConns(5)
|
||||
xdb.SetConnMaxLifetime(60 * time.Minute)
|
||||
xdb.SetConnMaxIdleTime(60 * time.Minute)
|
||||
|
||||
qqdb := sq.NewDB(xdb)
|
||||
|
||||
return &Database{qqdb}, nil
|
||||
}
|
||||
|
||||
func (db *Database) Migrate(ctx context.Context) error {
|
||||
@ -30,7 +43,7 @@ func (db *Database) Migrate(ctx context.Context) error {
|
||||
currschema, err := db.ReadSchema(ctx)
|
||||
if currschema == 0 {
|
||||
|
||||
_, err = db.db.ExecContext(ctx, schema.Schema3)
|
||||
_, err = db.db.Exec(ctx, schema.Schema3, sq.PP{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -54,10 +67,10 @@ func (db *Database) Migrate(ctx context.Context) error {
|
||||
|
||||
}
|
||||
|
||||
func (db *Database) Ping() error {
|
||||
return db.db.Ping()
|
||||
func (db *Database) Ping(ctx context.Context) error {
|
||||
return db.db.Ping(ctx)
|
||||
}
|
||||
|
||||
func (db *Database) BeginTx(ctx context.Context) (*sql.Tx, error) {
|
||||
return db.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelDefault})
|
||||
func (db *Database) BeginTx(ctx context.Context) (sq.Tx, error) {
|
||||
return db.db.BeginTransaction(ctx, sql.LevelDefault)
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package db
|
||||
import (
|
||||
scn "blackforestbytes.com/simplecloudnotifier"
|
||||
"blackforestbytes.com/simplecloudnotifier/models"
|
||||
"blackforestbytes.com/simplecloudnotifier/sq"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"time"
|
||||
)
|
||||
@ -16,15 +17,16 @@ func (db *Database) CreateRetryDelivery(ctx TxContext, client models.Client, msg
|
||||
now := time.Now().UTC()
|
||||
next := scn.NextDeliveryTimestamp(now)
|
||||
|
||||
res, err := tx.ExecContext(ctx, "INSERT INTO deliveries (scn_message_id, receiver_user_id, receiver_client_id, timestamp_created, timestamp_finalized, status, fcm_message_id, next_delivery) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
msg.SCNMessageID,
|
||||
client.UserID,
|
||||
client.ClientID,
|
||||
time2DB(now),
|
||||
nil,
|
||||
models.DeliveryStatusRetry,
|
||||
nil,
|
||||
time2DB(next))
|
||||
res, err := tx.Exec(ctx, "INSERT INTO deliveries (scn_message_id, receiver_user_id, receiver_client_id, timestamp_created, timestamp_finalized, status, fcm_message_id, next_delivery) VALUES (:mid, :ruid, :rcid, :tsc, :tsf, :stat, :fcm, :next)", sq.PP{
|
||||
"mid": msg.SCNMessageID,
|
||||
"ruid": client.UserID,
|
||||
"rcid": client.ClientID,
|
||||
"tsc": time2DB(now),
|
||||
"tsf": nil,
|
||||
"stat": models.DeliveryStatusRetry,
|
||||
"fcm": nil,
|
||||
"next": time2DB(next),
|
||||
})
|
||||
if err != nil {
|
||||
return models.Delivery{}, err
|
||||
}
|
||||
@ -56,15 +58,16 @@ func (db *Database) CreateSuccessDelivery(ctx TxContext, client models.Client, m
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
res, err := tx.ExecContext(ctx, "INSERT INTO deliveries (scn_message_id, receiver_user_id, receiver_client_id, timestamp_created, timestamp_finalized, status, fcm_message_id, next_delivery) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
msg.SCNMessageID,
|
||||
client.UserID,
|
||||
client.ClientID,
|
||||
time2DB(now),
|
||||
time2DB(now),
|
||||
models.DeliveryStatusSuccess,
|
||||
fcmDelivID,
|
||||
nil)
|
||||
res, err := tx.Exec(ctx, "INSERT INTO deliveries (scn_message_id, receiver_user_id, receiver_client_id, timestamp_created, timestamp_finalized, status, fcm_message_id, next_delivery) VALUES (:mid, :ruid, :rcid, :tsc, :tsf, :stat, :fcm, :next)", sq.PP{
|
||||
"mid": msg.SCNMessageID,
|
||||
"ruid": client.UserID,
|
||||
"rcid": client.ClientID,
|
||||
"tsc": time2DB(now),
|
||||
"tsf": time2DB(now),
|
||||
"stat": models.DeliveryStatusSuccess,
|
||||
"fcm": fcmDelivID,
|
||||
"next": nil,
|
||||
})
|
||||
if err != nil {
|
||||
return models.Delivery{}, err
|
||||
}
|
||||
@ -94,9 +97,10 @@ func (db *Database) ListRetrieableDeliveries(ctx TxContext, pageSize int) ([]mod
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM deliveries WHERE status = 'RETRY' AND next_delivery < ? LIMIT ?",
|
||||
time2DB(time.Now()),
|
||||
pageSize)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM deliveries WHERE status = 'RETRY' AND next_delivery < :next LIMIT :lim", sq.PP{
|
||||
"next": time2DB(time.Now()),
|
||||
"lim": pageSize,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -115,11 +119,12 @@ func (db *Database) SetDeliverySuccess(ctx TxContext, delivery models.Delivery,
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'SUCCESS', next_delivery = NULL, retry_count = ?, timestamp_finalized = ?, fcm_message_id = ? WHERE delivery_id = ?",
|
||||
delivery.RetryCount+1,
|
||||
time2DB(time.Now()),
|
||||
fcmDelivID,
|
||||
delivery.DeliveryID)
|
||||
_, err = tx.Exec(ctx, "UPDATE deliveries SET status = 'SUCCESS', next_delivery = NULL, retry_count = :rc, timestamp_finalized = :ts, fcm_message_id = :fcm WHERE delivery_id = :did", sq.PP{
|
||||
"rc": delivery.RetryCount + 1,
|
||||
"ts": time2DB(time.Now()),
|
||||
"fcm": fcmDelivID,
|
||||
"did": delivery.DeliveryID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -133,10 +138,12 @@ func (db *Database) SetDeliveryFailed(ctx TxContext, delivery models.Delivery) e
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'FAILED', next_delivery = NULL, retry_count = ?, timestamp_finalized = ? WHERE delivery_id = ?",
|
||||
delivery.RetryCount+1,
|
||||
time2DB(time.Now()),
|
||||
delivery.DeliveryID)
|
||||
_, err = tx.Exec(ctx, "UPDATE deliveries SET status = 'FAILED', next_delivery = NULL, retry_count = :rc, timestamp_finalized = :ts WHERE delivery_id = :did",
|
||||
sq.PP{
|
||||
"rc": delivery.RetryCount + 1,
|
||||
"ts": time2DB(time.Now()),
|
||||
"did": delivery.DeliveryID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -150,10 +157,11 @@ func (db *Database) SetDeliveryRetry(ctx TxContext, delivery models.Delivery) er
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'RETRY', next_delivery = ?, retry_count = ? WHERE delivery_id = ?",
|
||||
scn.NextDeliveryTimestamp(time.Now()),
|
||||
delivery.RetryCount+1,
|
||||
delivery.DeliveryID)
|
||||
_, err = tx.Exec(ctx, "UPDATE deliveries SET status = 'RETRY', next_delivery = :next, retry_count = :rc WHERE delivery_id = :did", sq.PP{
|
||||
"next": scn.NextDeliveryTimestamp(time.Now()),
|
||||
"rc": delivery.RetryCount + 1,
|
||||
"did": delivery.DeliveryID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -167,9 +175,10 @@ func (db *Database) CancelPendingDeliveries(ctx TxContext, scnMessageID models.S
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE deliveries SET status = 'FAILED', next_delivery = NULL, timestamp_finalized = ? WHERE scn_message_id = ? AND status = 'RETRY'",
|
||||
time.Now(),
|
||||
scnMessageID)
|
||||
_, err = tx.Exec(ctx, "UPDATE deliveries SET status = 'FAILED', next_delivery = NULL, timestamp_finalized = :ts WHERE scn_message_id = :mid AND status = 'RETRY'", sq.PP{
|
||||
"ts": time.Now(),
|
||||
"mid": scnMessageID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package db
|
||||
import (
|
||||
"blackforestbytes.com/simplecloudnotifier/db/cursortoken"
|
||||
"blackforestbytes.com/simplecloudnotifier/models"
|
||||
"blackforestbytes.com/simplecloudnotifier/sq"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
@ -14,7 +15,7 @@ func (db *Database) GetMessageByUserMessageID(ctx TxContext, usrMsgId string) (*
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM messages WHERE usr_message_id = ? LIMIT 1", usrMsgId)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM messages WHERE usr_message_id = :umid LIMIT 1", sq.PP{"umid": usrMsgId})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -36,7 +37,7 @@ func (db *Database) GetMessage(ctx TxContext, scnMessageID models.SCNMessageID)
|
||||
return models.Message{}, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM messages WHERE scn_message_id = ? LIMIT 1", scnMessageID)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM messages WHERE scn_message_id = :mid LIMIT 1", sq.PP{"mid": scnMessageID})
|
||||
if err != nil {
|
||||
return models.Message{}, err
|
||||
}
|
||||
@ -57,19 +58,20 @@ func (db *Database) CreateMessage(ctx TxContext, senderUserID models.UserID, cha
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
res, err := tx.ExecContext(ctx, "INSERT INTO messages (sender_user_id, owner_user_id, channel_name, channel_id, timestamp_real, timestamp_client, title, content, priority, usr_message_id, sender_ip, sender_name) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
senderUserID,
|
||||
channel.OwnerUserID,
|
||||
channel.Name,
|
||||
channel.ChannelID,
|
||||
time2DB(now),
|
||||
time2DBOpt(timestampSend),
|
||||
title,
|
||||
content,
|
||||
priority,
|
||||
userMsgId,
|
||||
senderIP,
|
||||
senderName)
|
||||
res, err := tx.Exec(ctx, "INSERT INTO messages (sender_user_id, owner_user_id, channel_name, channel_id, timestamp_real, timestamp_client, title, content, priority, usr_message_id, sender_ip, sender_name) VALUES (:suid, :ouid, :cnam, :cid, :tsr, :tsc, :tit, :cnt, :prio, :umid, :ip, :snam)", sq.PP{
|
||||
"suid": senderUserID,
|
||||
"ouid": channel.OwnerUserID,
|
||||
"cnam": channel.Name,
|
||||
"cid": channel.ChannelID,
|
||||
"tsr": time2DB(now),
|
||||
"tsc": time2DBOpt(timestampSend),
|
||||
"tit": title,
|
||||
"cnt": content,
|
||||
"prio": priority,
|
||||
"umid": userMsgId,
|
||||
"ip": senderIP,
|
||||
"snam": senderName,
|
||||
})
|
||||
if err != nil {
|
||||
return models.Message{}, err
|
||||
}
|
||||
@ -102,7 +104,7 @@ func (db *Database) DeleteMessage(ctx TxContext, scnMessageID models.SCNMessageI
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM messages WHERE scn_message_id = ?", scnMessageID)
|
||||
_, err = tx.Exec(ctx, "DELETE FROM messages WHERE scn_message_id = :mid", sq.PP{"mid": scnMessageID})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -125,9 +127,10 @@ func (db *Database) ListMessages(ctx TxContext, userid models.UserID, pageSize i
|
||||
pageCond = fmt.Sprintf("AND ( timestamp_real < %d OR (timestamp_real = %d AND scn_message_id < %d ) )", inTok.Timestamp, inTok.Timestamp, inTok.Id)
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT messages.* FROM messages LEFT JOIN subscriptions subs on messages.channel_id = subs.channel_id WHERE subs.subscriber_user_id = ? AND subs.confirmed = 1 "+pageCond+" ORDER BY timestamp_real DESC LIMIT ?",
|
||||
userid,
|
||||
pageSize+1)
|
||||
rows, err := tx.Query(ctx, "SELECT messages.* FROM messages LEFT JOIN subscriptions subs on messages.channel_id = subs.channel_id WHERE subs.subscriber_user_id = :uid AND subs.confirmed = 1 "+pageCond+" ORDER BY timestamp_real DESC LIMIT :lim", sq.PP{
|
||||
"uid": userid,
|
||||
"lim": pageSize + 1,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, cursortoken.CursorToken{}, err
|
||||
}
|
||||
@ -157,12 +160,15 @@ func (db *Database) ListChannelMessages(ctx TxContext, channelid models.ChannelI
|
||||
|
||||
pageCond := ""
|
||||
if inTok.Mode == cursortoken.CTMNormal {
|
||||
pageCond = fmt.Sprintf("AND ( timestamp_real < %d OR (timestamp_real = %d AND scn_message_id < %d ) )", inTok.Timestamp, inTok.Timestamp, inTok.Id)
|
||||
pageCond = "AND ( timestamp_real < :tokts OR (timestamp_real = :tokts AND scn_message_id < :tokid ) )"
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM messages WHERE channel_id = ? "+pageCond+" ORDER BY timestamp_real DESC LIMIT ?",
|
||||
channelid,
|
||||
pageSize+1)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM messages WHERE channel_id = :cid "+pageCond+" ORDER BY timestamp_real DESC LIMIT :lim", sq.PP{
|
||||
"cid": channelid,
|
||||
"lim": pageSize + 1,
|
||||
"tokts": inTok.Timestamp,
|
||||
"tokid": inTok.Timestamp,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, cursortoken.CursorToken{}, err
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"blackforestbytes.com/simplecloudnotifier/sq"
|
||||
"context"
|
||||
"errors"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
@ -8,7 +9,7 @@ import (
|
||||
|
||||
func (db *Database) ReadSchema(ctx context.Context) (retval int, reterr error) {
|
||||
|
||||
r1, err := db.db.QueryContext(ctx, "SELECT name FROM sqlite_master WHERE type='table' AND name='meta'")
|
||||
r1, err := db.db.Query(ctx, "SELECT name FROM sqlite_master WHERE type = :typ AND name = :name", sq.PP{"typ": "table", "name": "meta"})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -25,7 +26,7 @@ func (db *Database) ReadSchema(ctx context.Context) (retval int, reterr error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
r2, err := db.db.QueryContext(ctx, "SELECT value_int FROM meta WHERE meta_key='schema'")
|
||||
r2, err := db.db.Query(ctx, "SELECT value_int FROM meta WHERE meta_key = :key", sq.PP{"key": "schema"})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -52,10 +53,10 @@ func (db *Database) ReadSchema(ctx context.Context) (retval int, reterr error) {
|
||||
}
|
||||
|
||||
func (db *Database) WriteMetaString(ctx context.Context, key string, value string) error {
|
||||
_, err := db.db.ExecContext(ctx, "INSERT INTO meta (meta_key, value_txt) VALUES (?, ?) ON CONFLICT(meta_key) DO UPDATE SET value_txt = ?",
|
||||
key,
|
||||
value,
|
||||
value)
|
||||
_, err := db.db.Exec(ctx, "INSERT INTO meta (meta_key, value_txt) VALUES (:key, :val) ON CONFLICT(meta_key) DO UPDATE SET value_txt = :val", sq.PP{
|
||||
"key": key,
|
||||
"val": value,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -63,10 +64,10 @@ func (db *Database) WriteMetaString(ctx context.Context, key string, value strin
|
||||
}
|
||||
|
||||
func (db *Database) WriteMetaInt(ctx context.Context, key string, value int64) error {
|
||||
_, err := db.db.ExecContext(ctx, "INSERT INTO meta (meta_key, value_int) VALUES (?, ?) ON CONFLICT(meta_key) DO UPDATE SET value_int = ?",
|
||||
key,
|
||||
value,
|
||||
value)
|
||||
_, err := db.db.Exec(ctx, "INSERT INTO meta (meta_key, value_int) VALUES (:key, :val) ON CONFLICT(meta_key) DO UPDATE SET value_int = :val", sq.PP{
|
||||
"key": key,
|
||||
"val": value,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -74,10 +75,10 @@ func (db *Database) WriteMetaInt(ctx context.Context, key string, value int64) e
|
||||
}
|
||||
|
||||
func (db *Database) WriteMetaReal(ctx context.Context, key string, value float64) error {
|
||||
_, err := db.db.ExecContext(ctx, "INSERT INTO meta (meta_key, value_real) VALUES (?, ?) ON CONFLICT(meta_key) DO UPDATE SET value_real = ?",
|
||||
key,
|
||||
value,
|
||||
value)
|
||||
_, err := db.db.Exec(ctx, "INSERT INTO meta (meta_key, value_real) VALUES (:key, :val) ON CONFLICT(meta_key) DO UPDATE SET value_real = :val", sq.PP{
|
||||
"key": key,
|
||||
"val": value,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -85,10 +86,10 @@ func (db *Database) WriteMetaReal(ctx context.Context, key string, value float64
|
||||
}
|
||||
|
||||
func (db *Database) WriteMetaBlob(ctx context.Context, key string, value []byte) error {
|
||||
_, err := db.db.ExecContext(ctx, "INSERT INTO meta (meta_key, value_blob) VALUES (?, ?) ON CONFLICT(meta_key) DO UPDATE SET value_blob = ?",
|
||||
key,
|
||||
value,
|
||||
value)
|
||||
_, err := db.db.Exec(ctx, "INSERT INTO meta (meta_key, value_blob) VALUES (:key, :val) ON CONFLICT(meta_key) DO UPDATE SET value_blob = :val", sq.PP{
|
||||
"key": key,
|
||||
"val": value,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -96,7 +97,7 @@ func (db *Database) WriteMetaBlob(ctx context.Context, key string, value []byte)
|
||||
}
|
||||
|
||||
func (db *Database) ReadMetaString(ctx context.Context, key string) (retval *string, reterr error) {
|
||||
r2, err := db.db.QueryContext(ctx, "SELECT value_txt FROM meta WHERE meta_key=?", key)
|
||||
r2, err := db.db.Query(ctx, "SELECT value_txt FROM meta WHERE meta_key = :key", sq.PP{"key": key})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -122,7 +123,7 @@ func (db *Database) ReadMetaString(ctx context.Context, key string) (retval *str
|
||||
}
|
||||
|
||||
func (db *Database) ReadMetaInt(ctx context.Context, key string) (retval *int64, reterr error) {
|
||||
r2, err := db.db.QueryContext(ctx, "SELECT value_int FROM meta WHERE meta_key=?", key)
|
||||
r2, err := db.db.Query(ctx, "SELECT value_int FROM meta WHERE meta_key = :key", sq.PP{"key": key})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -149,7 +150,7 @@ func (db *Database) ReadMetaInt(ctx context.Context, key string) (retval *int64,
|
||||
}
|
||||
|
||||
func (db *Database) ReadMetaReal(ctx context.Context, key string) (retval *float64, reterr error) {
|
||||
r2, err := db.db.QueryContext(ctx, "SELECT value_real FROM meta WHERE meta_key=?", key)
|
||||
r2, err := db.db.Query(ctx, "SELECT value_real FROM meta WHERE meta_key = :key", sq.PP{"key": key})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -176,7 +177,7 @@ func (db *Database) ReadMetaReal(ctx context.Context, key string) (retval *float
|
||||
}
|
||||
|
||||
func (db *Database) ReadMetaBlob(ctx context.Context, key string) (retval *[]byte, reterr error) {
|
||||
r2, err := db.db.QueryContext(ctx, "SELECT value_blob FROM meta WHERE meta_key=?", key)
|
||||
r2, err := db.db.Query(ctx, "SELECT value_blob FROM meta WHERE meta_key = :key", sq.PP{"key": key})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -203,7 +204,7 @@ func (db *Database) ReadMetaBlob(ctx context.Context, key string) (retval *[]byt
|
||||
}
|
||||
|
||||
func (db *Database) DeleteMeta(ctx context.Context, key string) error {
|
||||
_, err := db.db.ExecContext(ctx, "DELETE FROM meta WHERE meta_key = ?", key)
|
||||
_, err := db.db.Exec(ctx, "DELETE FROM meta WHERE meta_key = :key", sq.PP{"key": key})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package db
|
||||
|
||||
import (
|
||||
"blackforestbytes.com/simplecloudnotifier/models"
|
||||
"blackforestbytes.com/simplecloudnotifier/sq"
|
||||
"database/sql"
|
||||
"time"
|
||||
)
|
||||
@ -14,13 +15,14 @@ func (db *Database) CreateSubscription(ctx TxContext, subscriberUID models.UserI
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
res, err := tx.ExecContext(ctx, "INSERT INTO subscriptions (subscriber_user_id, channel_owner_user_id, channel_name, channel_id, timestamp_created, confirmed) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
subscriberUID,
|
||||
channel.OwnerUserID,
|
||||
channel.Name,
|
||||
channel.ChannelID,
|
||||
time2DB(now),
|
||||
confirmed)
|
||||
res, err := tx.Exec(ctx, "INSERT INTO subscriptions (subscriber_user_id, channel_owner_user_id, channel_name, channel_id, timestamp_created, confirmed) VALUES (:suid, :ouid, :cnam, :cid, :ts, :conf)", sq.PP{
|
||||
"suid": subscriberUID,
|
||||
"ouid": channel.OwnerUserID,
|
||||
"cnam": channel.Name,
|
||||
"cid": channel.ChannelID,
|
||||
"ts": time2DB(now),
|
||||
"conf": confirmed,
|
||||
})
|
||||
if err != nil {
|
||||
return models.Subscription{}, err
|
||||
}
|
||||
@ -47,7 +49,7 @@ func (db *Database) ListSubscriptionsByChannel(ctx TxContext, channelID models.C
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM subscriptions WHERE channel_id = ?", channelID)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM subscriptions WHERE channel_id = :cid", sq.PP{"cid": channelID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -66,7 +68,7 @@ func (db *Database) ListSubscriptionsByOwner(ctx TxContext, ownerUserID models.U
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM subscriptions WHERE channel_owner_user_id = ?", ownerUserID)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM subscriptions WHERE channel_owner_user_id = :ouid", sq.PP{"ouid": ownerUserID})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -85,7 +87,7 @@ func (db *Database) GetSubscription(ctx TxContext, subid models.SubscriptionID)
|
||||
return models.Subscription{}, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM subscriptions WHERE subscription_id = ? LIMIT 1", subid)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM subscriptions WHERE subscription_id = :sid LIMIT 1", sq.PP{"sid": subid})
|
||||
if err != nil {
|
||||
return models.Subscription{}, err
|
||||
}
|
||||
@ -104,7 +106,10 @@ func (db *Database) GetSubscriptionBySubscriber(ctx TxContext, subscriberId mode
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM subscriptions WHERE subscriber_user_id = ? AND channel_id = ? LIMIT 1", subscriberId, channelId)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM subscriptions WHERE subscriber_user_id = :suid AND channel_id = :cid LIMIT 1", sq.PP{
|
||||
"suid": subscriberId,
|
||||
"cid": channelId,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -126,7 +131,7 @@ func (db *Database) DeleteSubscription(ctx TxContext, subid models.SubscriptionI
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "DELETE FROM subscriptions WHERE subscription_id = ?", subid)
|
||||
_, err = tx.Exec(ctx, "DELETE FROM subscriptions WHERE subscription_id = :sid", sq.PP{"sid": subid})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -140,7 +145,10 @@ func (db *Database) UpdateSubscriptionConfirmed(ctx TxContext, subscriptionID mo
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE subscriptions SET confirmed = ? WHERE subscription_id = ?", confirmed, subscriptionID)
|
||||
_, err = tx.Exec(ctx, "UPDATE subscriptions SET confirmed = :conf WHERE subscription_id = :sid", sq.PP{
|
||||
"conf": confirmed,
|
||||
"sid": subscriptionID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package db
|
||||
import (
|
||||
scn "blackforestbytes.com/simplecloudnotifier"
|
||||
"blackforestbytes.com/simplecloudnotifier/models"
|
||||
"blackforestbytes.com/simplecloudnotifier/sq"
|
||||
"database/sql"
|
||||
"time"
|
||||
)
|
||||
@ -15,14 +16,15 @@ func (db *Database) CreateUser(ctx TxContext, readKey string, sendKey string, ad
|
||||
|
||||
now := time.Now().UTC()
|
||||
|
||||
res, err := tx.ExecContext(ctx, "INSERT INTO users (username, read_key, send_key, admin_key, is_pro, pro_token, timestamp_created) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
username,
|
||||
readKey,
|
||||
sendKey,
|
||||
adminKey,
|
||||
bool2DB(protoken != nil),
|
||||
protoken,
|
||||
time2DB(now))
|
||||
res, err := tx.Exec(ctx, "INSERT INTO users (username, read_key, send_key, admin_key, is_pro, pro_token, timestamp_created) VALUES (:un, :rk, :sk, :ak, :pro, :tok, :ts)", sq.PP{
|
||||
"un": username,
|
||||
"rk": readKey,
|
||||
"sk": sendKey,
|
||||
"ak": adminKey,
|
||||
"pro": bool2DB(protoken != nil),
|
||||
"tok": protoken,
|
||||
"ts": time2DB(now),
|
||||
})
|
||||
if err != nil {
|
||||
return models.User{}, err
|
||||
}
|
||||
@ -55,7 +57,7 @@ func (db *Database) ClearProTokens(ctx TxContext, protoken string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE users SET is_pro=0, pro_token=NULL WHERE pro_token = ?", protoken)
|
||||
_, err = tx.Exec(ctx, "UPDATE users SET is_pro=0, pro_token=NULL WHERE pro_token = :tok", sq.PP{"tok": protoken})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -69,7 +71,7 @@ func (db *Database) GetUserByKey(ctx TxContext, key string) (*models.User, error
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM users WHERE admin_key = ? OR send_key = ? OR read_key = ? LIMIT 1", key, key, key)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM users WHERE admin_key = :key OR send_key = :key OR read_key = :key LIMIT 1", sq.PP{"key": key})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -91,7 +93,7 @@ func (db *Database) GetUser(ctx TxContext, userid models.UserID) (models.User, e
|
||||
return models.User{}, err
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, "SELECT * FROM users WHERE user_id = ? LIMIT 1", userid)
|
||||
rows, err := tx.Query(ctx, "SELECT * FROM users WHERE user_id = :uid LIMIT 1", sq.PP{"uid": userid})
|
||||
if err != nil {
|
||||
return models.User{}, err
|
||||
}
|
||||
@ -110,9 +112,10 @@ func (db *Database) UpdateUserUsername(ctx TxContext, userid models.UserID, user
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE users SET username = ? WHERE user_id = ?",
|
||||
username,
|
||||
userid)
|
||||
_, err = tx.Exec(ctx, "UPDATE users SET username = :nam WHERE user_id = :uid", sq.PP{
|
||||
"nam": username,
|
||||
"uid": userid,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -126,10 +129,11 @@ func (db *Database) UpdateUserProToken(ctx TxContext, userid models.UserID, prot
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE users SET pro_token = ?, is_pro = ? WHERE user_id = ?",
|
||||
protoken,
|
||||
bool2DB(protoken != nil),
|
||||
userid)
|
||||
_, err = tx.Exec(ctx, "UPDATE users SET pro_token = :tok, is_pro = :pro WHERE user_id = :uid", sq.PP{
|
||||
"tok": protoken,
|
||||
"pro": bool2DB(protoken != nil),
|
||||
"uid": userid,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -145,12 +149,13 @@ func (db *Database) IncUserMessageCounter(ctx TxContext, user models.User) error
|
||||
|
||||
quota := user.QuotaUsedToday() + 1
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE users SET timestamp_lastsent = ?, messages_sent = ?, quota_used = ?, quota_used_day = ? WHERE user_id = ?",
|
||||
time2DB(time.Now()),
|
||||
user.MessagesSent+1,
|
||||
quota,
|
||||
scn.QuotaDayString(),
|
||||
user.UserID)
|
||||
_, err = tx.Exec(ctx, "UPDATE users SET timestamp_lastsent = :ts, messages_sent = :ctr, quota_used = :qu, quota_used_day = :qd WHERE user_id = :uid", sq.PP{
|
||||
"ts": time2DB(time.Now()),
|
||||
"ctr": user.MessagesSent + 1,
|
||||
"qu": quota,
|
||||
"qd": scn.QuotaDayString(),
|
||||
"uid": user.UserID,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -164,9 +169,10 @@ func (db *Database) UpdateUserLastRead(ctx TxContext, userid models.UserID) erro
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE users SET timestamp_lastread = ? WHERE user_id = ?",
|
||||
time2DB(time.Now()),
|
||||
userid)
|
||||
_, err = tx.Exec(ctx, "UPDATE users SET timestamp_lastread = :ts WHERE user_id = :uid", sq.PP{
|
||||
"ts": time2DB(time.Now()),
|
||||
"uid": userid,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -180,11 +186,12 @@ func (db *Database) UpdateUserKeys(ctx TxContext, userid models.UserID, sendKey
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE users SET send_key = ?, read_key = ?, admin_key = ? WHERE user_id = ?",
|
||||
sendKey,
|
||||
readKey,
|
||||
adminKey,
|
||||
userid)
|
||||
_, err = tx.Exec(ctx, "UPDATE users SET send_key = :sk, read_key = :rk, admin_key = :ak WHERE user_id = ?", sq.PP{
|
||||
"sk": sendKey,
|
||||
"rk": readKey,
|
||||
"ak": adminKey,
|
||||
"uid": userid,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -198,9 +205,10 @@ func (db *Database) UpdateUserSendKey(ctx TxContext, userid models.UserID, newke
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE users SET send_key = ? WHERE user_id = ?",
|
||||
newkey,
|
||||
userid)
|
||||
_, err = tx.Exec(ctx, "UPDATE users SET send_key = :sk WHERE user_id = :uid", sq.PP{
|
||||
"sk": newkey,
|
||||
"uid": userid,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -214,9 +222,10 @@ func (db *Database) UpdateUserReadKey(ctx TxContext, userid models.UserID, newke
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE users SET read_key = ? WHERE user_id = ?",
|
||||
newkey,
|
||||
userid)
|
||||
_, err = tx.Exec(ctx, "UPDATE users SET read_key = :rk WHERE user_id = :uid", sq.PP{
|
||||
"rk": newkey,
|
||||
"uid": userid,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -230,9 +239,10 @@ func (db *Database) UpdateUserAdminKey(ctx TxContext, userid models.UserID, newk
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tx.ExecContext(ctx, "UPDATE users SET admin_key = ? WHERE user_id = ?",
|
||||
newkey,
|
||||
userid)
|
||||
_, err = tx.Exec(ctx, "UPDATE users SET admin_key = :ak WHERE user_id = :uid", sq.PP{
|
||||
"ak": newkey,
|
||||
"uid": userid,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -3,12 +3,12 @@ module blackforestbytes.com/simplecloudnotifier
|
||||
go 1.19
|
||||
|
||||
require (
|
||||
github.com/blockloop/scan v1.3.0
|
||||
github.com/gin-gonic/gin v1.8.1
|
||||
github.com/mattn/go-sqlite3 v1.14.16
|
||||
github.com/rs/zerolog v1.28.0
|
||||
github.com/swaggo/swag v1.8.7
|
||||
gogs.mikescher.com/BlackForestBytes/goext v0.0.27
|
||||
github.com/jmoiron/sqlx v1.3.5
|
||||
)
|
||||
|
||||
require (
|
||||
|
@ -33,6 +33,7 @@ github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/j
|
||||
github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA=
|
||||
github.com/go-playground/validator/v10 v10.10.0 h1:I7mrTYv78z8k8VXa/qJlOlEXn/nBh+BF8dHX5nt/dr0=
|
||||
github.com/go-playground/validator/v10 v10.10.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos=
|
||||
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
|
||||
github.com/goccy/go-json v0.9.7 h1:IcB+Aqpx/iMHu5Yooh7jEzJk1JZ7Pjtmys2ukPr7EeM=
|
||||
github.com/goccy/go-json v0.9.7/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
@ -41,6 +42,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
|
||||
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
|
||||
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
|
||||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
@ -55,6 +58,7 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/leodido/go-urn v1.2.1 h1:BqpAaACuzVSgi/VLzGZIobT2z4v53pjosyNd9Yv6n/w=
|
||||
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
|
||||
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
|
||||
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
|
||||
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
|
||||
@ -64,6 +68,7 @@ github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb
|
||||
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
|
||||
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
|
||||
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
|
||||
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwpU1Y=
|
||||
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
|
||||
|
@ -4,8 +4,8 @@ import (
|
||||
"blackforestbytes.com/simplecloudnotifier/api/apierr"
|
||||
"blackforestbytes.com/simplecloudnotifier/common/ginresp"
|
||||
"blackforestbytes.com/simplecloudnotifier/db"
|
||||
"blackforestbytes.com/simplecloudnotifier/sq"
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/rs/zerolog/log"
|
||||
@ -16,7 +16,7 @@ type AppContext struct {
|
||||
inner context.Context
|
||||
cancelFunc context.CancelFunc
|
||||
cancelled bool
|
||||
transaction *sql.Tx
|
||||
transaction sq.Tx
|
||||
permissions PermissionSet
|
||||
ginContext *gin.Context
|
||||
}
|
||||
@ -83,7 +83,7 @@ func (ac *AppContext) FinishSuccess(res ginresp.HTTPResponse) ginresp.HTTPRespon
|
||||
return res
|
||||
}
|
||||
|
||||
func (ac *AppContext) GetOrCreateTransaction(db *db.Database) (*sql.Tx, error) {
|
||||
func (ac *AppContext) GetOrCreateTransaction(db *db.Database) (sq.Tx, error) {
|
||||
if ac.cancelled {
|
||||
return nil, errors.New("context cancelled")
|
||||
}
|
||||
|
@ -2,8 +2,8 @@ package logic
|
||||
|
||||
import (
|
||||
"blackforestbytes.com/simplecloudnotifier/db"
|
||||
"blackforestbytes.com/simplecloudnotifier/sq"
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"github.com/rs/zerolog/log"
|
||||
"time"
|
||||
@ -13,7 +13,7 @@ type SimpleContext struct {
|
||||
inner context.Context
|
||||
cancelFunc context.CancelFunc
|
||||
cancelled bool
|
||||
transaction *sql.Tx
|
||||
transaction sq.Tx
|
||||
}
|
||||
|
||||
func CreateSimpleContext(innerCtx context.Context, cancelFn context.CancelFunc) *SimpleContext {
|
||||
@ -54,7 +54,7 @@ func (sc *SimpleContext) Cancel() {
|
||||
sc.cancelFunc()
|
||||
}
|
||||
|
||||
func (sc *SimpleContext) GetOrCreateTransaction(db *db.Database) (*sql.Tx, error) {
|
||||
func (sc *SimpleContext) GetOrCreateTransaction(db *db.Database) (sq.Tx, error) {
|
||||
if sc.cancelled {
|
||||
return nil, errors.New("context cancelled")
|
||||
}
|
||||
|
@ -1,8 +1,7 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/blockloop/scan"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"time"
|
||||
)
|
||||
@ -38,7 +37,7 @@ type ChannelJSON struct {
|
||||
SubscribeKey *string `json:"subscribe_key"` // can be nil, depending on endpoint
|
||||
SendKey *string `json:"send_key"` // can be nil, depending on endpoint
|
||||
TimestampCreated string `json:"timestamp_created"`
|
||||
TimestampLastSent *string `json:"timestamp_last_sent"`
|
||||
TimestampLastSent *string `json:"timestamp_lastsent"`
|
||||
MessagesSent int `json:"messages_sent"`
|
||||
}
|
||||
|
||||
@ -49,8 +48,8 @@ type ChannelDB struct {
|
||||
SubscribeKey string `db:"subscribe_key"`
|
||||
SendKey string `db:"send_key"`
|
||||
TimestampCreated int64 `db:"timestamp_created"`
|
||||
TimestampLastRead *int64 `db:"timestamp_last_read"`
|
||||
TimestampLastSent *int64 `db:"timestamp_last_sent"`
|
||||
TimestampLastRead *int64 `db:"timestamp_lastread"`
|
||||
TimestampLastSent *int64 `db:"timestamp_lastsent"`
|
||||
MessagesSent int `db:"messages_sent"`
|
||||
}
|
||||
|
||||
@ -67,18 +66,16 @@ func (c ChannelDB) Model() Channel {
|
||||
}
|
||||
}
|
||||
|
||||
func DecodeChannel(r *sql.Rows) (Channel, error) {
|
||||
var data ChannelDB
|
||||
err := scan.RowStrict(&data, r)
|
||||
func DecodeChannel(r *sqlx.Rows) (Channel, error) {
|
||||
data, err := scanSingle[ChannelDB](r)
|
||||
if err != nil {
|
||||
return Channel{}, err
|
||||
}
|
||||
return data.Model(), nil
|
||||
}
|
||||
|
||||
func DecodeChannels(r *sql.Rows) ([]Channel, error) {
|
||||
var data []ChannelDB
|
||||
err := scan.RowsStrict(&data, r)
|
||||
func DecodeChannels(r *sqlx.Rows) ([]Channel, error) {
|
||||
data, err := scanAll[ChannelDB](r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1,8 +1,7 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/blockloop/scan"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"time"
|
||||
)
|
||||
@ -68,18 +67,16 @@ func (c ClientDB) Model() Client {
|
||||
}
|
||||
}
|
||||
|
||||
func DecodeClient(r *sql.Rows) (Client, error) {
|
||||
var data ClientDB
|
||||
err := scan.RowStrict(&data, r)
|
||||
func DecodeClient(r *sqlx.Rows) (Client, error) {
|
||||
data, err := scanSingle[ClientDB](r)
|
||||
if err != nil {
|
||||
return Client{}, err
|
||||
}
|
||||
return data.Model(), nil
|
||||
}
|
||||
|
||||
func DecodeClients(r *sql.Rows) ([]Client, error) {
|
||||
var data []ClientDB
|
||||
err := scan.RowsStrict(&data, r)
|
||||
func DecodeClients(r *sqlx.Rows) ([]Client, error) {
|
||||
data, err := scanAll[ClientDB](r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1,8 +1,7 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/blockloop/scan"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"time"
|
||||
)
|
||||
@ -88,18 +87,16 @@ func (d DeliveryDB) Model() Delivery {
|
||||
}
|
||||
}
|
||||
|
||||
func DecodeDelivery(r *sql.Rows) (Delivery, error) {
|
||||
var data DeliveryDB
|
||||
err := scan.RowStrict(&data, r)
|
||||
func DecodeDelivery(r *sqlx.Rows) (Delivery, error) {
|
||||
data, err := scanSingle[DeliveryDB](r)
|
||||
if err != nil {
|
||||
return Delivery{}, err
|
||||
}
|
||||
return data.Model(), nil
|
||||
}
|
||||
|
||||
func DecodeDeliveries(r *sql.Rows) ([]Delivery, error) {
|
||||
var data []DeliveryDB
|
||||
err := scan.RowsStrict(&data, r)
|
||||
func DecodeDeliveries(r *sqlx.Rows) ([]Delivery, error) {
|
||||
data, err := scanAll[DeliveryDB](r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1,8 +1,7 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/blockloop/scan"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"time"
|
||||
)
|
||||
@ -142,18 +141,16 @@ func (m MessageDB) Model() Message {
|
||||
}
|
||||
}
|
||||
|
||||
func DecodeMessage(r *sql.Rows) (Message, error) {
|
||||
var data MessageDB
|
||||
err := scan.RowStrict(&data, r)
|
||||
func DecodeMessage(r *sqlx.Rows) (Message, error) {
|
||||
data, err := scanSingle[MessageDB](r)
|
||||
if err != nil {
|
||||
return Message{}, err
|
||||
}
|
||||
return data.Model(), nil
|
||||
}
|
||||
|
||||
func DecodeMessages(r *sql.Rows) ([]Message, error) {
|
||||
var data []MessageDB
|
||||
err := scan.RowsStrict(&data, r)
|
||||
func DecodeMessages(r *sqlx.Rows) ([]Message, error) {
|
||||
data, err := scanAll[MessageDB](r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1,8 +1,7 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/blockloop/scan"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"time"
|
||||
)
|
||||
@ -61,18 +60,16 @@ func (s SubscriptionDB) Model() Subscription {
|
||||
}
|
||||
}
|
||||
|
||||
func DecodeSubscription(r *sql.Rows) (Subscription, error) {
|
||||
var data SubscriptionDB
|
||||
err := scan.RowStrict(&data, r)
|
||||
func DecodeSubscription(r *sqlx.Rows) (Subscription, error) {
|
||||
data, err := scanSingle[SubscriptionDB](r)
|
||||
if err != nil {
|
||||
return Subscription{}, err
|
||||
}
|
||||
return data.Model(), nil
|
||||
}
|
||||
|
||||
func DecodeSubscriptions(r *sql.Rows) ([]Subscription, error) {
|
||||
var data []SubscriptionDB
|
||||
err := scan.RowsStrict(&data, r)
|
||||
func DecodeSubscriptions(r *sqlx.Rows) ([]Subscription, error) {
|
||||
data, err := scanAll[SubscriptionDB](r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -2,8 +2,7 @@ package models
|
||||
|
||||
import (
|
||||
scn "blackforestbytes.com/simplecloudnotifier"
|
||||
"database/sql"
|
||||
"github.com/blockloop/scan"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"time"
|
||||
)
|
||||
@ -113,8 +112,8 @@ type UserJSON struct {
|
||||
SendKey string `json:"send_key"`
|
||||
AdminKey string `json:"admin_key"`
|
||||
TimestampCreated string `json:"timestamp_created"`
|
||||
TimestampLastRead *string `json:"timestamp_last_read"`
|
||||
TimestampLastSent *string `json:"timestamp_last_sent"`
|
||||
TimestampLastRead *string `json:"timestamp_lastread"`
|
||||
TimestampLastSent *string `json:"timestamp_lastsent"`
|
||||
MessagesSent int `json:"messages_sent"`
|
||||
QuotaUsed int `json:"quota_used"`
|
||||
QuotaUsedDay *string `json:"quota_used_day"`
|
||||
@ -160,18 +159,16 @@ func (u UserDB) Model() User {
|
||||
}
|
||||
}
|
||||
|
||||
func DecodeUser(r *sql.Rows) (User, error) {
|
||||
var data UserDB
|
||||
err := scan.RowStrict(&data, r)
|
||||
func DecodeUser(r *sqlx.Rows) (User, error) {
|
||||
data, err := scanSingle[UserDB](r)
|
||||
if err != nil {
|
||||
return User{}, err
|
||||
}
|
||||
return data.Model(), nil
|
||||
}
|
||||
|
||||
func DecodeUsers(r *sql.Rows) ([]User, error) {
|
||||
var data []UserDB
|
||||
err := scan.RowsStrict(&data, r)
|
||||
func DecodeUsers(r *sqlx.Rows) ([]User, error) {
|
||||
data, err := scanAll[UserDB](r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1,6 +1,9 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"time"
|
||||
)
|
||||
@ -19,3 +22,32 @@ func timeOptFromMilli(millis *int64) *time.Time {
|
||||
}
|
||||
return langext.Ptr(time.UnixMilli(*millis))
|
||||
}
|
||||
|
||||
func scanSingle[TData any](rows *sqlx.Rows) (TData, error) {
|
||||
if rows.Next() {
|
||||
var data TData
|
||||
err := rows.StructScan(&data)
|
||||
if err != nil {
|
||||
return *new(TData), err
|
||||
}
|
||||
if rows.Next() {
|
||||
return *new(TData), errors.New("sql returned more than onw row")
|
||||
}
|
||||
return data, nil
|
||||
} else {
|
||||
return *new(TData), sql.ErrNoRows
|
||||
}
|
||||
}
|
||||
|
||||
func scanAll[TData any](rows *sqlx.Rows) ([]TData, error) {
|
||||
res := make([]TData, 0)
|
||||
for rows.Next() {
|
||||
var data TData
|
||||
err := rows.StructScan(&data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res = append(res, data)
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
77
server/sq/database.go
Normal file
77
server/sq/database.go
Normal file
@ -0,0 +1,77 @@
|
||||
package sq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/rs/zerolog/log"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type DB interface {
|
||||
Exec(ctx context.Context, sql string, prep PP) (sql.Result, error)
|
||||
Query(ctx context.Context, sql string, prep PP) (*sqlx.Rows, error)
|
||||
Ping(ctx context.Context) error
|
||||
BeginTransaction(ctx context.Context, iso sql.IsolationLevel) (Tx, error)
|
||||
}
|
||||
|
||||
type database struct {
|
||||
db *sqlx.DB
|
||||
txctr uint16
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func NewDB(db *sqlx.DB) DB {
|
||||
return &database{
|
||||
db: db,
|
||||
txctr: 0,
|
||||
lock: sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func (db *database) Exec(ctx context.Context, sql string, prep PP) (sql.Result, error) {
|
||||
log.Debug().Msg(fmt.Sprintf("[SQL-EXEC] %s", fmtSQLPrint(sql)))
|
||||
|
||||
res, err := db.db.NamedExecContext(ctx, sql, prep)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (db *database) Query(ctx context.Context, sql string, prep PP) (*sqlx.Rows, error) {
|
||||
log.Debug().Msg(fmt.Sprintf("[SQL-QUERY] %s", fmtSQLPrint(sql)))
|
||||
|
||||
rows, err := db.db.NamedQueryContext(ctx, sql, prep)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
func (db *database) Ping(ctx context.Context) error {
|
||||
log.Debug().Msg("[SQL-PING]")
|
||||
|
||||
err := db.db.PingContext(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *database) BeginTransaction(ctx context.Context, iso sql.IsolationLevel) (Tx, error) {
|
||||
db.lock.Lock()
|
||||
txid := db.txctr
|
||||
db.txctr += 1 // with overflow !
|
||||
db.lock.Unlock()
|
||||
|
||||
log.Debug().Msg(fmt.Sprintf("[SQL-TX<%d>-START]", txid))
|
||||
|
||||
xtx, err := db.db.BeginTxx(ctx, &sql.TxOptions{Isolation: iso})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewTransaction(xtx, txid), nil
|
||||
}
|
3
server/sq/params.go
Normal file
3
server/sq/params.go
Normal file
@ -0,0 +1,3 @@
|
||||
package sq
|
||||
|
||||
type PP map[string]any
|
12
server/sq/queryable.go
Normal file
12
server/sq/queryable.go
Normal file
@ -0,0 +1,12 @@
|
||||
package sq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"github.com/jmoiron/sqlx"
|
||||
)
|
||||
|
||||
type Queryable interface {
|
||||
Exec(ctx context.Context, sql string, prep PP) (sql.Result, error)
|
||||
Query(ctx context.Context, sql string, prep PP) (*sqlx.Rows, error)
|
||||
}
|
60
server/sq/transaction.go
Normal file
60
server/sq/transaction.go
Normal file
@ -0,0 +1,60 @@
|
||||
package sq
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type Tx interface {
|
||||
Rollback() error
|
||||
Commit() error
|
||||
Exec(ctx context.Context, sql string, prep PP) (sql.Result, error)
|
||||
Query(ctx context.Context, sql string, prep PP) (*sqlx.Rows, error)
|
||||
}
|
||||
|
||||
type transaction struct {
|
||||
tx *sqlx.Tx
|
||||
id uint16
|
||||
}
|
||||
|
||||
func NewTransaction(xtx *sqlx.Tx, txid uint16) Tx {
|
||||
return &transaction{
|
||||
tx: xtx,
|
||||
id: txid,
|
||||
}
|
||||
}
|
||||
|
||||
func (tx *transaction) Rollback() error {
|
||||
log.Debug().Msg(fmt.Sprintf("[SQL-TX<%d>-ROLLBACK]", tx.id))
|
||||
|
||||
return tx.tx.Rollback()
|
||||
}
|
||||
|
||||
func (tx *transaction) Commit() error {
|
||||
log.Debug().Msg(fmt.Sprintf("[SQL-TX<%d>-COMMIT]", tx.id))
|
||||
|
||||
return tx.tx.Commit()
|
||||
}
|
||||
|
||||
func (tx *transaction) Exec(ctx context.Context, sql string, prep PP) (sql.Result, error) {
|
||||
log.Debug().Msg(fmt.Sprintf("[SQL-TX<%d>-EXEC] %s", tx.id, fmtSQLPrint(sql)))
|
||||
|
||||
res, err := tx.tx.NamedExecContext(ctx, sql, prep)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (tx *transaction) Query(ctx context.Context, sql string, prep PP) (*sqlx.Rows, error) {
|
||||
log.Debug().Msg(fmt.Sprintf("[SQL-TX<%d>-QUERY] %s", tx.id, fmtSQLPrint(sql)))
|
||||
|
||||
rows, err := sqlx.NamedQueryContext(ctx, tx.tx, sql, prep)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return rows, nil
|
||||
}
|
16
server/sq/util.go
Normal file
16
server/sq/util.go
Normal file
@ -0,0 +1,16 @@
|
||||
package sq
|
||||
|
||||
import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
func fmtSQLPrint(sql string) string {
|
||||
if strings.Contains(sql, ";") {
|
||||
return "(...multi...)"
|
||||
}
|
||||
|
||||
sql = strings.ReplaceAll(sql, "\r", "")
|
||||
sql = strings.ReplaceAll(sql, "\n", " ")
|
||||
|
||||
return sql
|
||||
}
|
@ -2860,15 +2860,15 @@
|
||||
"models.MessageJSON": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"body": {
|
||||
"type": "string"
|
||||
},
|
||||
"channel_id": {
|
||||
"type": "integer"
|
||||
},
|
||||
"channel_name": {
|
||||
"type": "string"
|
||||
},
|
||||
"content": {
|
||||
"type": "string"
|
||||
},
|
||||
"owner_user_id": {
|
||||
"type": "integer"
|
||||
},
|
||||
|
@ -386,12 +386,12 @@ definitions:
|
||||
type: object
|
||||
models.MessageJSON:
|
||||
properties:
|
||||
body:
|
||||
type: string
|
||||
channel_id:
|
||||
type: integer
|
||||
channel_name:
|
||||
type: string
|
||||
content:
|
||||
type: string
|
||||
owner_user_id:
|
||||
type: integer
|
||||
priority:
|
||||
|
@ -226,3 +226,5 @@ func TestDeleteUser(t *testing.T) {
|
||||
tt.RequestAuthGetShouldFail(t, admintok, baseUrl, "/api/users/"+uid, 404, apierr.USER_NOT_FOUND)
|
||||
|
||||
}
|
||||
|
||||
//TODO test user /w pro-token
|
||||
|
@ -48,17 +48,23 @@ func StartSimpleWebserver(t *testing.T) (*logic.Application, func()) {
|
||||
fmt.Println("DatabaseFile: " + dbfile)
|
||||
|
||||
conf := scn.Config{
|
||||
Namespace: "test",
|
||||
GinDebug: true,
|
||||
ServerIP: "0.0.0.0",
|
||||
ServerPort: "0", // simply choose a free port
|
||||
DBFile: dbfile,
|
||||
RequestTimeout: 30 * time.Second,
|
||||
ReturnRawErrors: true,
|
||||
DummyFirebase: true,
|
||||
Namespace: "test",
|
||||
GinDebug: true,
|
||||
ServerIP: "0.0.0.0",
|
||||
ServerPort: "0", // simply choose a free port
|
||||
DBFile: dbfile,
|
||||
DBJournal: "WAL",
|
||||
DBTimeout: 500 * time.Millisecond,
|
||||
DBMaxOpenConns: 2,
|
||||
DBMaxIdleConns: 2,
|
||||
DBConnMaxLifetime: 1 * time.Second,
|
||||
DBConnMaxIdleTime: 1 * time.Second,
|
||||
RequestTimeout: 30 * time.Second,
|
||||
ReturnRawErrors: true,
|
||||
DummyFirebase: true,
|
||||
}
|
||||
|
||||
sqlite, err := db.NewDatabase(dbfile)
|
||||
sqlite, err := db.NewDatabase(conf)
|
||||
if err != nil {
|
||||
TestFailErr(t, err)
|
||||
}
|
||||
@ -82,7 +88,11 @@ func StartSimpleWebserver(t *testing.T) (*logic.Application, func()) {
|
||||
|
||||
router.Init(ginengine)
|
||||
|
||||
stop := func() { app.Stop(); _ = os.Remove(dbfile); _ = app.IsRunning.WaitWithTimeout(400*time.Millisecond, false) }
|
||||
stop := func() {
|
||||
app.Stop()
|
||||
_ = os.Remove(dbfile)
|
||||
_ = app.IsRunning.WaitWithTimeout(400*time.Millisecond, false)
|
||||
}
|
||||
|
||||
go func() { app.Run() }()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user