2023-01-06 00:39:21 +01:00
package primary
import (
server "blackforestbytes.com/simplecloudnotifier"
"blackforestbytes.com/simplecloudnotifier/db/dbtools"
2023-05-28 19:59:57 +02:00
"blackforestbytes.com/simplecloudnotifier/db/schema"
2023-07-27 17:44:06 +02:00
"blackforestbytes.com/simplecloudnotifier/db/simplectx"
2024-09-15 21:07:46 +02:00
"blackforestbytes.com/simplecloudnotifier/models"
2023-01-06 00:39:21 +01:00
"context"
"database/sql"
"errors"
"fmt"
2024-07-15 17:26:55 +02:00
"github.com/glebarez/go-sqlite"
2023-01-06 00:39:21 +01:00
"github.com/jmoiron/sqlx"
2023-05-28 19:59:57 +02:00
"github.com/rs/zerolog/log"
2024-09-16 20:11:28 +02:00
"gogs.mikescher.com/BlackForestBytes/goext/exerr"
2023-01-06 00:39:21 +01:00
"gogs.mikescher.com/BlackForestBytes/goext/langext"
"gogs.mikescher.com/BlackForestBytes/goext/sq"
2024-09-16 20:11:28 +02:00
"os"
"path/filepath"
2023-01-06 00:39:21 +01:00
"time"
)
type Database struct {
2024-09-16 20:11:28 +02:00
db sq . DB
pp * dbtools . DBPreprocessor
wal bool
name string
schemaVersion int
schema map [ int ] schema . Def
2023-01-06 00:39:21 +01:00
}
func NewPrimaryDatabase ( cfg server . Config ) ( * Database , error ) {
conf := cfg . DBMain
2024-07-16 17:19:55 +02:00
url := fmt . Sprintf ( "file:%s?_pragma=journal_mode(%s)&_pragma=timeout(%d)&_pragma=foreign_keys(%s)&_pragma=busy_timeout(%d)" ,
conf . File ,
conf . Journal ,
conf . Timeout . Milliseconds ( ) ,
langext . FormatBool ( conf . CheckForeignKeys , "true" , "false" ) ,
conf . BusyTimeout . Milliseconds ( ) )
2023-01-06 00:39:21 +01:00
2024-07-15 17:26:55 +02:00
if ! langext . InArray ( "sqlite3" , sql . Drivers ( ) ) {
sqlite . RegisterAsSQLITE3 ( )
}
2023-01-06 00:39:21 +01:00
xdb , err := sqlx . Open ( "sqlite3" , url )
if err != nil {
return nil , err
}
if conf . SingleConn {
xdb . SetMaxOpenConns ( 1 )
} else {
xdb . SetMaxOpenConns ( 5 )
xdb . SetMaxIdleConns ( 5 )
xdb . SetConnMaxLifetime ( 60 * time . Minute )
xdb . SetConnMaxIdleTime ( 60 * time . Minute )
}
2024-09-15 21:07:46 +02:00
qqdb := sq . NewDB ( xdb , sq . DBOptions { RegisterDefaultConverter : langext . PTrue , RegisterCommentTrimmer : langext . PTrue } )
models . RegisterConverter ( qqdb )
2023-01-06 00:39:21 +01:00
2023-01-15 06:30:30 +01:00
if conf . EnableLogger {
qqdb . AddListener ( dbtools . DBLogger { } )
}
2023-01-06 00:39:21 +01:00
pp , err := dbtools . NewDBPreprocessor ( qqdb )
if err != nil {
return nil , err
}
qqdb . AddListener ( pp )
2024-09-16 20:11:28 +02:00
scndb := & Database {
db : qqdb ,
pp : pp ,
wal : conf . Journal == "WAL" ,
schemaVersion : schema . PrimarySchemaVersion ,
schema : schema . PrimarySchema ,
name : "primary" ,
}
2023-01-06 00:39:21 +01:00
return scndb , nil
}
2023-01-15 06:30:30 +01:00
func ( db * Database ) DB ( ) sq . DB {
return db . db
}
2023-07-27 17:44:06 +02:00
func ( db * Database ) Migrate ( outerctx context . Context ) error {
innerctx , cancel := context . WithTimeout ( outerctx , 24 * time . Second )
tctx := simplectx . CreateSimpleContext ( innerctx , cancel )
2023-01-06 00:39:21 +01:00
2023-07-27 17:44:06 +02:00
tx , err := tctx . GetOrCreateTransaction ( db )
if err != nil {
return err
}
defer func ( ) {
if tx . Status ( ) == sq . TxStatusInitial || tx . Status ( ) == sq . TxStatusActive {
err = tx . Rollback ( )
if err != nil {
log . Err ( err ) . Msg ( "failed to rollback transaction" )
}
}
} ( )
ppReInit := false
currschema , err := db . ReadSchema ( tctx )
2023-05-28 19:59:57 +02:00
if err != nil {
return err
}
2024-09-16 20:11:28 +02:00
if currschema == db . schemaVersion {
log . Info ( ) . Msgf ( "Database [%s] is up-to-date (%d == %d)" , db . name , currschema , db . schemaVersion )
2023-07-27 17:44:06 +02:00
}
2024-09-16 20:11:28 +02:00
for currschema < db . schemaVersion {
2023-07-27 17:44:06 +02:00
2024-09-16 20:11:28 +02:00
if currschema == 0 {
log . Info ( ) . Msgf ( "Migrate database (initialize) [%s] %d -> %d" , db . name , currschema , db . schemaVersion )
2023-07-27 17:44:06 +02:00
2024-09-17 22:25:09 +02:00
schemastr := db . schema [ db . schemaVersion ] . SQL
schemahash := db . schema [ db . schemaVersion ] . Hash
2023-01-06 00:39:21 +01:00
2024-09-16 20:11:28 +02:00
_ , err = tx . Exec ( tctx , schemastr , sq . PP { } )
if err != nil {
return err
}
2023-05-28 19:59:57 +02:00
2024-09-16 20:11:28 +02:00
err = db . WriteMetaInt ( tctx , "schema" , int64 ( db . schemaVersion ) )
if err != nil {
return err
}
2023-07-27 17:44:06 +02:00
2024-09-16 20:11:28 +02:00
err = db . WriteMetaString ( tctx , "schema_hash" , schemahash )
if err != nil {
return err
}
2023-07-27 17:44:06 +02:00
2024-09-16 20:11:28 +02:00
ppReInit = true
2023-01-06 00:39:21 +01:00
2024-09-16 20:11:28 +02:00
currschema = db . schemaVersion
} else {
log . Info ( ) . Msgf ( "Migrate database [%s] %d -> %d" , db . name , currschema , currschema + 1 )
2023-01-06 00:39:21 +01:00
2024-09-16 20:11:28 +02:00
err = db . migrateSingle ( tctx , tx , currschema , currschema + 1 )
if err != nil {
return err
}
2023-05-28 19:59:57 +02:00
2024-09-16 20:11:28 +02:00
currschema = currschema + 1
2023-05-28 19:59:57 +02:00
}
2024-09-16 20:11:28 +02:00
}
2023-05-28 19:59:57 +02:00
2024-09-16 20:11:28 +02:00
if currschema != db . schemaVersion {
return errors . New ( fmt . Sprintf ( "Unknown DB schema: %d" , currschema ) )
2023-07-27 17:44:06 +02:00
}
2024-09-16 20:11:28 +02:00
err = tx . Commit ( )
if err != nil {
return err
}
2023-07-27 17:44:06 +02:00
2024-09-16 20:11:28 +02:00
if ppReInit {
log . Debug ( ) . Msg ( "Re-Init preprocessor" )
err = db . pp . Init ( outerctx ) // Re-Init
2023-05-28 19:59:57 +02:00
if err != nil {
return err
}
2024-09-16 20:11:28 +02:00
}
2023-05-28 19:59:57 +02:00
2024-09-16 20:11:28 +02:00
return nil
}
2023-05-28 19:59:57 +02:00
2024-09-16 20:11:28 +02:00
//goland:noinspection SqlConstantCondition,SqlWithoutWhere
func ( db * Database ) migrateSingle ( tctx * simplectx . SimpleContext , tx sq . Tx , schemaFrom int , schemaTo int ) error {
2024-06-01 01:01:58 +02:00
2024-09-17 22:25:09 +02:00
if schemaFrom == schemaTo - 1 {
2024-06-01 01:01:58 +02:00
2024-09-17 22:25:09 +02:00
migSQL := db . schema [ schemaTo ] . MigScript
if migSQL == "" {
return exerr . New ( exerr . TypeInternal , fmt . Sprintf ( "missing %s migration from %d to %d" , db . name , schemaFrom , schemaTo ) ) . Build ( )
}
2024-06-01 01:01:58 +02:00
2024-09-17 22:25:09 +02:00
return db . migrateBySQL ( tctx , tx , migSQL , schemaFrom , schemaTo , db . schema [ schemaTo ] . Hash , nil )
2024-09-16 20:11:28 +02:00
}
2024-06-01 01:01:58 +02:00
2024-09-16 20:11:28 +02:00
return exerr . New ( exerr . TypeInternal , fmt . Sprintf ( "missing %s migration from %d to %d" , db . name , schemaFrom , schemaTo ) ) . Build ( )
}
2024-06-01 01:01:58 +02:00
2024-09-16 20:11:28 +02:00
func ( db * Database ) migrateBySQL ( tctx * simplectx . SimpleContext , tx sq . Tx , stmts string , currSchemaVers int , resultSchemVers int , resultHash string , post func ( tctx * simplectx . SimpleContext , tx sq . Tx ) error ) error {
2024-06-01 01:01:58 +02:00
2024-09-16 20:11:28 +02:00
schemaHashMeta , err := db . ReadMetaString ( tctx , "schema_hash" )
if err != nil {
return err
2024-06-01 01:01:58 +02:00
}
2024-09-16 20:11:28 +02:00
schemHashDBBefore , err := sq . HashSqliteDatabase ( tctx , tx )
if err != nil {
return err
}
2024-06-01 01:01:58 +02:00
2024-09-16 20:11:28 +02:00
if schemHashDBBefore != langext . Coalesce ( schemaHashMeta , "" ) || langext . Coalesce ( schemaHashMeta , "" ) != db . schema [ currSchemaVers ] . Hash {
log . Debug ( ) . Str ( "schemHashDB" , schemHashDBBefore ) . Msg ( "Schema (primary db)" )
log . Debug ( ) . Str ( "schemaHashMeta" , langext . Coalesce ( schemaHashMeta , "" ) ) . Msg ( "Schema (primary db)" )
log . Debug ( ) . Str ( "schemaHashAsset" , db . schema [ currSchemaVers ] . Hash ) . Msg ( "Schema (primary db)" )
return errors . New ( "database schema does not match (primary db)" )
} else {
log . Debug ( ) . Str ( "schemHash" , schemHashDBBefore ) . Msg ( "Verified Schema consistency (primary db)" )
}
2024-06-01 01:01:58 +02:00
2024-09-16 20:11:28 +02:00
log . Info ( ) . Msgf ( "Upgrade schema from %d -> %d" , currSchemaVers , resultSchemVers )
2024-06-01 01:01:58 +02:00
2024-09-16 20:11:28 +02:00
_ , err = tx . Exec ( tctx , stmts , sq . PP { } )
if err != nil {
return err
}
2024-09-16 18:26:28 +02:00
2024-09-16 20:11:28 +02:00
schemHashDBAfter , err := sq . HashSqliteDatabase ( tctx , tx )
if err != nil {
return err
}
2024-09-16 18:26:28 +02:00
2024-09-16 20:11:28 +02:00
if schemHashDBAfter != resultHash {
2024-09-16 18:26:28 +02:00
2024-09-16 20:11:28 +02:00
schemaDBStr := langext . Must ( createSqliteDatabaseSchemaStringFromSQL ( tctx , db . schema [ resultSchemVers ] . SQL ) )
resultDBStr := langext . Must ( sq . CreateSqliteDatabaseSchemaString ( tctx , tx ) )
2024-09-16 18:26:28 +02:00
2024-09-16 20:11:28 +02:00
fmt . Printf ( "========================================= SQL SCHEMA-DUMP STR (CORRECT | FROM COMPILED SCHEMA):%s\n=========================================\n\n" , schemaDBStr )
fmt . Printf ( "========================================= SQL SCHEMA-DUMP STR (CURRNET | AFTER MIGRATION):%s\n=========================================\n\n" , resultDBStr )
2024-09-16 18:26:28 +02:00
2024-09-16 20:11:28 +02:00
return fmt . Errorf ( "database [%s] schema does not match after [%d -> %d] migration (expected: %s | actual: %s)" , db . name , currSchemaVers , resultSchemVers , resultHash , schemHashDBBefore )
}
2024-09-16 18:26:28 +02:00
2024-09-16 20:11:28 +02:00
err = db . WriteMetaInt ( tctx , "schema" , int64 ( resultSchemVers ) )
if err != nil {
return err
}
2024-09-16 18:26:28 +02:00
2024-09-16 20:11:28 +02:00
err = db . WriteMetaString ( tctx , "schema_hash" , resultHash )
if err != nil {
return err
2024-09-16 18:26:28 +02:00
}
2024-09-16 20:11:28 +02:00
log . Info ( ) . Msgf ( "Upgrade schema from %d -> %d succesfully" , currSchemaVers , resultSchemVers )
2024-09-16 18:26:28 +02:00
2024-09-16 20:11:28 +02:00
return nil
}
2024-09-16 18:26:28 +02:00
2024-09-16 20:11:28 +02:00
func createSqliteDatabaseSchemaStringFromSQL ( ctx context . Context , schemaStr string ) ( string , error ) {
dbdir := os . TempDir ( )
dbfile1 := filepath . Join ( dbdir , langext . MustHexUUID ( ) + ".sqlite3" )
defer func ( ) { _ = os . Remove ( dbfile1 ) } ( )
2024-09-16 18:26:28 +02:00
2024-09-16 20:11:28 +02:00
err := os . MkdirAll ( dbdir , os . ModePerm )
if err != nil {
return "" , err
2023-07-27 17:44:06 +02:00
}
2023-05-28 19:59:57 +02:00
2024-09-16 20:11:28 +02:00
url := fmt . Sprintf ( "file:%s?_pragma=journal_mode(%s)&_pragma=timeout(%d)&_pragma=foreign_keys(%s)&_pragma=busy_timeout(%d)" , dbfile1 , "DELETE" , 1000 , "true" , 1000 )
2023-01-06 00:39:21 +01:00
2024-09-16 20:11:28 +02:00
xdb , err := sqlx . Open ( "sqlite" , url )
2023-07-27 17:44:06 +02:00
if err != nil {
2024-09-16 20:11:28 +02:00
return "" , err
2023-07-27 17:44:06 +02:00
}
2024-09-16 20:11:28 +02:00
db := sq . NewDB ( xdb , sq . DBOptions { } )
_ , err = db . Exec ( ctx , schemaStr , sq . PP { } )
if err != nil {
return "" , err
2023-07-27 17:44:06 +02:00
}
2024-09-16 20:11:28 +02:00
return sq . CreateSqliteDatabaseSchemaString ( ctx , db )
2023-01-06 00:39:21 +01:00
}
func ( db * Database ) Ping ( ctx context . Context ) error {
return db . db . Ping ( ctx )
}
2024-09-20 17:13:36 +02:00
func ( db * Database ) Version ( ctx context . Context ) ( string , string , error ) {
type rt struct {
Version string ` db:"version" `
SourceID string ` db:"sourceID" `
}
resp , err := sq . QuerySingle [ rt ] ( ctx , db . db , "SELECT sqlite_version() AS version, sqlite_source_id() AS sourceID" , sq . PP { } , sq . SModeFast , sq . Safe )
if err != nil {
return "" , "" , err
}
return resp . Version , resp . SourceID , nil
}
2023-01-06 00:39:21 +01:00
func ( db * Database ) BeginTx ( ctx context . Context ) ( sq . Tx , error ) {
return db . db . BeginTransaction ( ctx , sql . LevelDefault )
}
func ( db * Database ) Stop ( ctx context . Context ) error {
if db . wal {
_ , err := db . db . Exec ( ctx , "PRAGMA wal_checkpoint;" , sq . PP { } )
if err != nil {
return err
}
}
err := db . db . Exit ( )
if err != nil {
return err
}
return nil
}