v0.0.518 Improve sq db-listener interface (breaking)
All checks were successful
Build Docker and Deploy / Run goext test-suite (push) Successful in 4m11s
All checks were successful
Build Docker and Deploy / Run goext test-suite (push) Successful in 4m11s
This commit is contained in:
parent
295b3ef793
commit
f6b47792a4
14
go.mod
14
go.mod
@ -9,9 +9,9 @@ require (
|
||||
github.com/rs/xid v1.6.0
|
||||
github.com/rs/zerolog v1.33.0
|
||||
go.mongodb.org/mongo-driver v1.17.1
|
||||
golang.org/x/crypto v0.27.0
|
||||
golang.org/x/sys v0.25.0
|
||||
golang.org/x/term v0.24.0
|
||||
golang.org/x/crypto v0.28.0
|
||||
golang.org/x/sys v0.26.0
|
||||
golang.org/x/term v0.25.0
|
||||
)
|
||||
|
||||
require (
|
||||
@ -53,10 +53,10 @@ require (
|
||||
github.com/xdg-go/scram v1.1.2 // indirect
|
||||
github.com/xdg-go/stringprep v1.0.4 // indirect
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
|
||||
golang.org/x/arch v0.10.0 // indirect
|
||||
golang.org/x/image v0.20.0 // indirect
|
||||
golang.org/x/net v0.29.0 // indirect
|
||||
golang.org/x/text v0.18.0 // indirect
|
||||
golang.org/x/arch v0.11.0 // indirect
|
||||
golang.org/x/image v0.21.0 // indirect
|
||||
golang.org/x/net v0.30.0 // indirect
|
||||
golang.org/x/text v0.19.0 // indirect
|
||||
google.golang.org/protobuf v1.34.2 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
modernc.org/libc v1.37.6 // indirect
|
||||
|
14
go.sum
14
go.sum
@ -261,6 +261,8 @@ golang.org/x/arch v0.9.0 h1:ub9TgUInamJ8mrZIGlBG6/4TqWeMszd4N8lNorbrr6k=
|
||||
golang.org/x/arch v0.9.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
|
||||
golang.org/x/arch v0.10.0 h1:S3huipmSclq3PJMNe76NGwkBR504WFkQ5dhzWzP8ZW8=
|
||||
golang.org/x/arch v0.10.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
|
||||
golang.org/x/arch v0.11.0 h1:KXV8WWKCXm6tRpLirl2szsO5j/oOODwZf4hATmGVNs4=
|
||||
golang.org/x/arch v0.11.0/go.mod h1:FEVrYAQjsQXMVJ1nsMoVVXPZg6p2JE2mx8psSWTDQys=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
@ -287,6 +289,8 @@ golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
|
||||
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
|
||||
golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A=
|
||||
golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70=
|
||||
golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw=
|
||||
golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U=
|
||||
golang.org/x/image v0.0.0-20190910094157-69e4b8554b2a/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8 h1:hVwzHzIUGRjiF7EcUjqNxk3NCfkPxbDKRdnNE1Rpg0U=
|
||||
golang.org/x/image v0.0.0-20191009234506-e7c1f5e7dbb8/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
|
||||
@ -300,6 +304,8 @@ golang.org/x/image v0.19.0 h1:D9FX4QWkLfkeqaC62SonffIIuYdOk/UE2XKUBgRIBIQ=
|
||||
golang.org/x/image v0.19.0/go.mod h1:y0zrRqlQRWQ5PXaYCOMLTW2fpsxZ8Qh9I/ohnInJEys=
|
||||
golang.org/x/image v0.20.0 h1:7cVCUjQwfL18gyBJOmYvptfSHS8Fb3YUDtfLIZ7Nbpw=
|
||||
golang.org/x/image v0.20.0/go.mod h1:0a88To4CYVBAHp5FXJm8o7QbUl37Vd85ply1vyD8auM=
|
||||
golang.org/x/image v0.21.0 h1:c5qV36ajHpdj4Qi0GnE0jUc/yuo33OLFaa0d+crTD5s=
|
||||
golang.org/x/image v0.21.0/go.mod h1:vUbsLavqK/W303ZroQQVKQ+Af3Yl6Uz1Ppu5J/cLz78=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
@ -326,6 +332,8 @@ golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
|
||||
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
|
||||
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
|
||||
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
|
||||
golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4=
|
||||
golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU=
|
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
|
||||
@ -365,6 +373,8 @@ golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
|
||||
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
|
||||
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
|
||||
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE=
|
||||
@ -385,6 +395,8 @@ golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
|
||||
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
|
||||
golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM=
|
||||
golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8=
|
||||
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
|
||||
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
@ -401,6 +413,8 @@ golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
|
||||
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
|
||||
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
|
||||
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
|
||||
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
|
||||
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||
|
@ -1,5 +1,5 @@
|
||||
package goext
|
||||
|
||||
const GoextVersion = "0.0.517"
|
||||
const GoextVersion = "0.0.518"
|
||||
|
||||
const GoextVersionTimestamp = "2024-10-02T11:31:34+0200"
|
||||
const GoextVersionTimestamp = "2024-10-05T00:45:55+0200"
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/exerr"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type DB interface {
|
||||
@ -57,86 +58,118 @@ func (db *database) AddListener(listener Listener) {
|
||||
|
||||
func (db *database) Exec(ctx context.Context, sqlstr string, prep PP) (sql.Result, error) {
|
||||
origsql := sqlstr
|
||||
|
||||
t0 := time.Now()
|
||||
|
||||
preMeta := PreExecMeta{}
|
||||
for _, v := range db.lstr {
|
||||
err := v.PreExec(ctx, nil, &sqlstr, &prep)
|
||||
err := v.PreExec(ctx, nil, &sqlstr, &prep, preMeta)
|
||||
if err != nil {
|
||||
return nil, exerr.Wrap(err, "failed to call SQL pre-exec listener").Str("original_sql", origsql).Str("sql", sqlstr).Any("sql_params", prep).Build()
|
||||
}
|
||||
}
|
||||
|
||||
t1 := time.Now()
|
||||
|
||||
res, err := db.db.NamedExecContext(ctx, sqlstr, prep)
|
||||
|
||||
postMeta := PostExecMeta{Init: t0, Start: t1, End: time.Now()}
|
||||
for _, v := range db.lstr {
|
||||
v.PostExec(nil, origsql, sqlstr, prep)
|
||||
v.PostExec(nil, origsql, sqlstr, prep, postMeta)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, exerr.Wrap(err, "Failed to [exec] sql statement").Str("original_sql", origsql).Str("sql", sqlstr).Any("sql_params", prep).Build()
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (db *database) Query(ctx context.Context, sqlstr string, prep PP) (*sqlx.Rows, error) {
|
||||
origsql := sqlstr
|
||||
|
||||
t0 := time.Now()
|
||||
|
||||
preMeta := PreQueryMeta{}
|
||||
for _, v := range db.lstr {
|
||||
err := v.PreQuery(ctx, nil, &sqlstr, &prep)
|
||||
err := v.PreQuery(ctx, nil, &sqlstr, &prep, preMeta)
|
||||
if err != nil {
|
||||
return nil, exerr.Wrap(err, "failed to call SQL pre-query listener").Str("original_sql", origsql).Str("sql", sqlstr).Any("sql_params", prep).Build()
|
||||
}
|
||||
}
|
||||
|
||||
t1 := time.Now()
|
||||
|
||||
rows, err := sqlx.NamedQueryContext(ctx, db.db, sqlstr, prep)
|
||||
|
||||
postMeta := PostQueryMeta{Init: t0, Start: t1, End: time.Now()}
|
||||
for _, v := range db.lstr {
|
||||
v.PostQuery(nil, origsql, sqlstr, prep)
|
||||
v.PostQuery(nil, origsql, sqlstr, prep, postMeta)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, exerr.Wrap(err, "Failed to [query] sql statement").Str("original_sql", origsql).Str("sql", sqlstr).Any("sql_params", prep).Build()
|
||||
}
|
||||
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
func (db *database) Ping(ctx context.Context) error {
|
||||
|
||||
t0 := time.Now()
|
||||
|
||||
preMeta := PrePingMeta{}
|
||||
for _, v := range db.lstr {
|
||||
err := v.PrePing(ctx)
|
||||
err := v.PrePing(ctx, preMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
t1 := time.Now()
|
||||
|
||||
err := db.db.PingContext(ctx)
|
||||
|
||||
postMeta := PostPingMeta{Init: t0, Start: t1, End: time.Now()}
|
||||
for _, v := range db.lstr {
|
||||
v.PostPing(err)
|
||||
v.PostPing(err, postMeta)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return exerr.Wrap(err, "Failed to [ping] sql database").Build()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *database) BeginTransaction(ctx context.Context, iso sql.IsolationLevel) (Tx, error) {
|
||||
|
||||
t0 := time.Now()
|
||||
|
||||
db.lock.Lock()
|
||||
txid := db.txctr
|
||||
db.txctr += 1 // with overflow !
|
||||
db.lock.Unlock()
|
||||
|
||||
preMeta := PreTxBeginMeta{}
|
||||
for _, v := range db.lstr {
|
||||
err := v.PreTxBegin(ctx, txid)
|
||||
err := v.PreTxBegin(ctx, txid, preMeta)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
t1 := time.Now()
|
||||
|
||||
xtx, err := db.db.BeginTxx(ctx, &sql.TxOptions{Isolation: iso})
|
||||
if err != nil {
|
||||
return nil, exerr.Wrap(err, "Failed to start sql transaction").Build()
|
||||
|
||||
postMeta := PostTxBeginMeta{Init: t0, Start: t1, End: time.Now()}
|
||||
for _, v := range db.lstr {
|
||||
v.PostTxBegin(txid, err, postMeta)
|
||||
}
|
||||
|
||||
for _, v := range db.lstr {
|
||||
v.PostTxBegin(txid, err)
|
||||
if err != nil {
|
||||
return nil, exerr.Wrap(err, "Failed to start sql transaction").Build()
|
||||
}
|
||||
|
||||
return NewTransaction(xtx, txid, db), nil
|
||||
|
191
sq/listener.go
191
sq/listener.go
@ -1,187 +1,248 @@
|
||||
package sq
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
type PrePingMeta struct {
|
||||
}
|
||||
|
||||
type PreTxBeginMeta struct {
|
||||
}
|
||||
|
||||
type PreTxCommitMeta struct {
|
||||
}
|
||||
|
||||
type PreTxRollbackMeta struct {
|
||||
}
|
||||
|
||||
type PreQueryMeta struct {
|
||||
}
|
||||
|
||||
type PreExecMeta struct {
|
||||
}
|
||||
|
||||
type PostPingMeta struct {
|
||||
Init time.Time
|
||||
Start time.Time
|
||||
End time.Time
|
||||
}
|
||||
|
||||
type PostTxBeginMeta struct {
|
||||
Init time.Time
|
||||
Start time.Time
|
||||
End time.Time
|
||||
}
|
||||
|
||||
type PostTxCommitMeta struct {
|
||||
Init time.Time
|
||||
Start time.Time
|
||||
End time.Time
|
||||
ExecCounter int
|
||||
QueryCounter int
|
||||
}
|
||||
|
||||
type PostTxRollbackMeta struct {
|
||||
Init time.Time
|
||||
Start time.Time
|
||||
End time.Time
|
||||
ExecCounter int
|
||||
QueryCounter int
|
||||
}
|
||||
|
||||
type PostQueryMeta struct {
|
||||
Init time.Time
|
||||
Start time.Time
|
||||
End time.Time
|
||||
}
|
||||
|
||||
type PostExecMeta struct {
|
||||
Init time.Time
|
||||
Start time.Time
|
||||
End time.Time
|
||||
}
|
||||
|
||||
type Listener interface {
|
||||
PrePing(ctx context.Context) error
|
||||
PreTxBegin(ctx context.Context, txid uint16) error
|
||||
PreTxCommit(txid uint16) error
|
||||
PreTxRollback(txid uint16) error
|
||||
PreQuery(ctx context.Context, txID *uint16, sql *string, params *PP) error
|
||||
PreExec(ctx context.Context, txID *uint16, sql *string, params *PP) error
|
||||
PrePing(ctx context.Context, meta PrePingMeta) error
|
||||
PreTxBegin(ctx context.Context, txid uint16, meta PreTxBeginMeta) error
|
||||
PreTxCommit(txid uint16, meta PreTxCommitMeta) error
|
||||
PreTxRollback(txid uint16, meta PreTxRollbackMeta) error
|
||||
PreQuery(ctx context.Context, txID *uint16, sql *string, params *PP, meta PreQueryMeta) error
|
||||
PreExec(ctx context.Context, txID *uint16, sql *string, params *PP, meta PreExecMeta) error
|
||||
|
||||
PostPing(result error)
|
||||
PostTxBegin(txid uint16, result error)
|
||||
PostTxCommit(txid uint16, result error)
|
||||
PostTxRollback(txid uint16, result error)
|
||||
PostQuery(txID *uint16, sqlOriginal string, sqlReal string, params PP)
|
||||
PostExec(txID *uint16, sqlOriginal string, sqlReal string, params PP)
|
||||
PostPing(result error, meta PostPingMeta)
|
||||
PostTxBegin(txid uint16, result error, meta PostTxBeginMeta)
|
||||
PostTxCommit(txid uint16, result error, meta PostTxCommitMeta)
|
||||
PostTxRollback(txid uint16, result error, meta PostTxRollbackMeta)
|
||||
PostQuery(txID *uint16, sqlOriginal string, sqlReal string, params PP, meta PostQueryMeta)
|
||||
PostExec(txID *uint16, sqlOriginal string, sqlReal string, params PP, meta PostExecMeta)
|
||||
}
|
||||
|
||||
type genListener struct {
|
||||
prePing func(ctx context.Context) error
|
||||
preTxBegin func(ctx context.Context, txid uint16) error
|
||||
preTxCommit func(txid uint16) error
|
||||
preTxRollback func(txid uint16) error
|
||||
preQuery func(ctx context.Context, txID *uint16, sql *string, params *PP) error
|
||||
preExec func(ctx context.Context, txID *uint16, sql *string, params *PP) error
|
||||
postPing func(result error)
|
||||
postTxBegin func(txid uint16, result error)
|
||||
postTxCommit func(txid uint16, result error)
|
||||
postTxRollback func(txid uint16, result error)
|
||||
postQuery func(txID *uint16, sqlOriginal string, sqlReal string, params PP)
|
||||
postExec func(txID *uint16, sqlOriginal string, sqlReal string, params PP)
|
||||
prePing func(ctx context.Context, meta PrePingMeta) error
|
||||
preTxBegin func(ctx context.Context, txid uint16, meta PreTxBeginMeta) error
|
||||
preTxCommit func(txid uint16, meta PreTxCommitMeta) error
|
||||
preTxRollback func(txid uint16, meta PreTxRollbackMeta) error
|
||||
preQuery func(ctx context.Context, txID *uint16, sql *string, params *PP, meta PreQueryMeta) error
|
||||
preExec func(ctx context.Context, txID *uint16, sql *string, params *PP, meta PreExecMeta) error
|
||||
postPing func(result error, meta PostPingMeta)
|
||||
postTxBegin func(txid uint16, result error, meta PostTxBeginMeta)
|
||||
postTxCommit func(txid uint16, result error, meta PostTxCommitMeta)
|
||||
postTxRollback func(txid uint16, result error, meta PostTxRollbackMeta)
|
||||
postQuery func(txID *uint16, sqlOriginal string, sqlReal string, params PP, meta PostQueryMeta)
|
||||
postExec func(txID *uint16, sqlOriginal string, sqlReal string, params PP, meta PostExecMeta)
|
||||
}
|
||||
|
||||
func (g genListener) PrePing(ctx context.Context) error {
|
||||
func (g genListener) PrePing(ctx context.Context, meta PrePingMeta) error {
|
||||
if g.prePing != nil {
|
||||
return g.prePing(ctx)
|
||||
return g.prePing(ctx, meta)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (g genListener) PreTxBegin(ctx context.Context, txid uint16) error {
|
||||
func (g genListener) PreTxBegin(ctx context.Context, txid uint16, meta PreTxBeginMeta) error {
|
||||
if g.preTxBegin != nil {
|
||||
return g.preTxBegin(ctx, txid)
|
||||
return g.preTxBegin(ctx, txid, meta)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (g genListener) PreTxCommit(txid uint16) error {
|
||||
func (g genListener) PreTxCommit(txid uint16, meta PreTxCommitMeta) error {
|
||||
if g.preTxCommit != nil {
|
||||
return g.preTxCommit(txid)
|
||||
return g.preTxCommit(txid, meta)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (g genListener) PreTxRollback(txid uint16) error {
|
||||
func (g genListener) PreTxRollback(txid uint16, meta PreTxRollbackMeta) error {
|
||||
if g.preTxRollback != nil {
|
||||
return g.preTxRollback(txid)
|
||||
return g.preTxRollback(txid, meta)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (g genListener) PreQuery(ctx context.Context, txID *uint16, sql *string, params *PP) error {
|
||||
func (g genListener) PreQuery(ctx context.Context, txID *uint16, sql *string, params *PP, meta PreQueryMeta) error {
|
||||
if g.preQuery != nil {
|
||||
return g.preQuery(ctx, txID, sql, params)
|
||||
return g.preQuery(ctx, txID, sql, params, meta)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (g genListener) PreExec(ctx context.Context, txID *uint16, sql *string, params *PP) error {
|
||||
func (g genListener) PreExec(ctx context.Context, txID *uint16, sql *string, params *PP, meta PreExecMeta) error {
|
||||
if g.preExec != nil {
|
||||
return g.preExec(ctx, txID, sql, params)
|
||||
return g.preExec(ctx, txID, sql, params, meta)
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (g genListener) PostPing(result error) {
|
||||
func (g genListener) PostPing(result error, meta PostPingMeta) {
|
||||
if g.postPing != nil {
|
||||
g.postPing(result)
|
||||
g.postPing(result, meta)
|
||||
}
|
||||
}
|
||||
|
||||
func (g genListener) PostTxBegin(txid uint16, result error) {
|
||||
func (g genListener) PostTxBegin(txid uint16, result error, meta PostTxBeginMeta) {
|
||||
if g.postTxBegin != nil {
|
||||
g.postTxBegin(txid, result)
|
||||
g.postTxBegin(txid, result, meta)
|
||||
}
|
||||
}
|
||||
|
||||
func (g genListener) PostTxCommit(txid uint16, result error) {
|
||||
func (g genListener) PostTxCommit(txid uint16, result error, meta PostTxCommitMeta) {
|
||||
if g.postTxCommit != nil {
|
||||
g.postTxCommit(txid, result)
|
||||
g.postTxCommit(txid, result, meta)
|
||||
}
|
||||
}
|
||||
|
||||
func (g genListener) PostTxRollback(txid uint16, result error) {
|
||||
func (g genListener) PostTxRollback(txid uint16, result error, meta PostTxRollbackMeta) {
|
||||
if g.postTxRollback != nil {
|
||||
g.postTxRollback(txid, result)
|
||||
g.postTxRollback(txid, result, meta)
|
||||
}
|
||||
}
|
||||
|
||||
func (g genListener) PostQuery(txID *uint16, sqlOriginal string, sqlReal string, params PP) {
|
||||
func (g genListener) PostQuery(txID *uint16, sqlOriginal string, sqlReal string, params PP, meta PostQueryMeta) {
|
||||
if g.postQuery != nil {
|
||||
g.postQuery(txID, sqlOriginal, sqlReal, params)
|
||||
g.postQuery(txID, sqlOriginal, sqlReal, params, meta)
|
||||
}
|
||||
}
|
||||
|
||||
func (g genListener) PostExec(txID *uint16, sqlOriginal string, sqlReal string, params PP) {
|
||||
func (g genListener) PostExec(txID *uint16, sqlOriginal string, sqlReal string, params PP, meta PostExecMeta) {
|
||||
if g.postExec != nil {
|
||||
g.postExec(txID, sqlOriginal, sqlReal, params)
|
||||
g.postExec(txID, sqlOriginal, sqlReal, params, meta)
|
||||
}
|
||||
}
|
||||
|
||||
func NewPrePingListener(f func(ctx context.Context) error) Listener {
|
||||
func NewPrePingListener(f func(ctx context.Context, meta PrePingMeta) error) Listener {
|
||||
return genListener{prePing: f}
|
||||
}
|
||||
|
||||
func NewPreTxBeginListener(f func(ctx context.Context, txid uint16) error) Listener {
|
||||
func NewPreTxBeginListener(f func(ctx context.Context, txid uint16, meta PreTxBeginMeta) error) Listener {
|
||||
return genListener{preTxBegin: f}
|
||||
}
|
||||
|
||||
func NewPreTxCommitListener(f func(txid uint16) error) Listener {
|
||||
func NewPreTxCommitListener(f func(txid uint16, meta PreTxCommitMeta) error) Listener {
|
||||
return genListener{preTxCommit: f}
|
||||
}
|
||||
|
||||
func NewPreTxRollbackListener(f func(txid uint16) error) Listener {
|
||||
func NewPreTxRollbackListener(f func(txid uint16, meta PreTxRollbackMeta) error) Listener {
|
||||
return genListener{preTxRollback: f}
|
||||
}
|
||||
|
||||
func NewPreQueryListener(f func(ctx context.Context, txID *uint16, sql *string, params *PP) error) Listener {
|
||||
func NewPreQueryListener(f func(ctx context.Context, txID *uint16, sql *string, params *PP, meta PreQueryMeta) error) Listener {
|
||||
return genListener{preQuery: f}
|
||||
}
|
||||
|
||||
func NewPreExecListener(f func(ctx context.Context, txID *uint16, sql *string, params *PP) error) Listener {
|
||||
func NewPreExecListener(f func(ctx context.Context, txID *uint16, sql *string, params *PP, meta PreExecMeta) error) Listener {
|
||||
return genListener{preExec: f}
|
||||
}
|
||||
|
||||
func NewPreListener(f func(ctx context.Context, cmdtype string, txID *uint16, sql *string, params *PP) error) Listener {
|
||||
return genListener{
|
||||
preExec: func(ctx context.Context, txID *uint16, sql *string, params *PP) error {
|
||||
preExec: func(ctx context.Context, txID *uint16, sql *string, params *PP, meta PreExecMeta) error {
|
||||
return f(ctx, "EXEC", txID, sql, params)
|
||||
},
|
||||
preQuery: func(ctx context.Context, txID *uint16, sql *string, params *PP) error {
|
||||
preQuery: func(ctx context.Context, txID *uint16, sql *string, params *PP, meta PreQueryMeta) error {
|
||||
return f(ctx, "QUERY", txID, sql, params)
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func NewPostPingListener(f func(result error)) Listener {
|
||||
func NewPostPingListener(f func(result error, meta PostPingMeta)) Listener {
|
||||
return genListener{postPing: f}
|
||||
}
|
||||
|
||||
func NewPostTxBeginListener(f func(txid uint16, result error)) Listener {
|
||||
func NewPostTxBeginListener(f func(txid uint16, result error, meta PostTxBeginMeta)) Listener {
|
||||
return genListener{postTxBegin: f}
|
||||
}
|
||||
|
||||
func NewPostTxCommitListener(f func(txid uint16, result error)) Listener {
|
||||
func NewPostTxCommitListener(f func(txid uint16, result error, meta PostTxCommitMeta)) Listener {
|
||||
return genListener{postTxCommit: f}
|
||||
}
|
||||
|
||||
func NewPostTxRollbackListener(f func(txid uint16, result error)) Listener {
|
||||
func NewPostTxRollbackListener(f func(txid uint16, result error, meta PostTxRollbackMeta)) Listener {
|
||||
return genListener{postTxRollback: f}
|
||||
}
|
||||
|
||||
func NewPostQueryListener(f func(txID *uint16, sqlOriginal string, sqlReal string, params PP)) Listener {
|
||||
func NewPostQueryListener(f func(txID *uint16, sqlOriginal string, sqlReal string, params PP, meta PostQueryMeta)) Listener {
|
||||
return genListener{postQuery: f}
|
||||
}
|
||||
|
||||
func NewPostExecListener(f func(txID *uint16, sqlOriginal string, sqlReal string, params PP)) Listener {
|
||||
func NewPostExecListener(f func(txID *uint16, sqlOriginal string, sqlReal string, params PP, meta PostExecMeta)) Listener {
|
||||
return genListener{postExec: f}
|
||||
}
|
||||
|
||||
func NewPostListener(f func(cmdtype string, txID *uint16, sqlOriginal string, sqlReal string, params PP)) Listener {
|
||||
return genListener{
|
||||
postExec: func(txID *uint16, sqlOriginal string, sqlReal string, params PP) {
|
||||
postExec: func(txID *uint16, sqlOriginal string, sqlReal string, params PP, meta PostExecMeta) {
|
||||
f("EXEC", txID, sqlOriginal, sqlReal, params)
|
||||
},
|
||||
postQuery: func(txID *uint16, sqlOriginal string, sqlReal string, params PP) {
|
||||
postQuery: func(txID *uint16, sqlOriginal string, sqlReal string, params PP, meta PostQueryMeta) {
|
||||
f("QUERY", txID, sqlOriginal, sqlReal, params)
|
||||
},
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"github.com/jmoiron/sqlx"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/exerr"
|
||||
"gogs.mikescher.com/BlackForestBytes/goext/langext"
|
||||
"time"
|
||||
)
|
||||
|
||||
type TxStatus string
|
||||
@ -46,42 +47,56 @@ func NewTransaction(xtx *sqlx.Tx, txid uint16, db *database) Tx {
|
||||
}
|
||||
|
||||
func (tx *transaction) Rollback() error {
|
||||
|
||||
t0 := time.Now()
|
||||
|
||||
preMeta := PreTxRollbackMeta{}
|
||||
for _, v := range tx.db.lstr {
|
||||
err := v.PreTxRollback(tx.id)
|
||||
err := v.PreTxRollback(tx.id, preMeta)
|
||||
if err != nil {
|
||||
return exerr.Wrap(err, "failed to call SQL pre-rollback listener").Int("tx.id", int(tx.id)).Build()
|
||||
}
|
||||
}
|
||||
|
||||
t1 := time.Now()
|
||||
|
||||
result := tx.tx.Rollback()
|
||||
|
||||
if result == nil {
|
||||
tx.status = TxStatusRollback
|
||||
}
|
||||
|
||||
postMeta := PostTxRollbackMeta{Init: t0, Start: t1, End: time.Now(), ExecCounter: tx.execCtr, QueryCounter: tx.queryCtr}
|
||||
for _, v := range tx.db.lstr {
|
||||
v.PostTxRollback(tx.id, result)
|
||||
v.PostTxRollback(tx.id, result, postMeta)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (tx *transaction) Commit() error {
|
||||
|
||||
t0 := time.Now()
|
||||
|
||||
preMeta := PreTxCommitMeta{}
|
||||
for _, v := range tx.db.lstr {
|
||||
err := v.PreTxCommit(tx.id)
|
||||
err := v.PreTxCommit(tx.id, preMeta)
|
||||
if err != nil {
|
||||
return exerr.Wrap(err, "failed to call SQL pre-commit listener").Int("tx.id", int(tx.id)).Build()
|
||||
}
|
||||
}
|
||||
|
||||
t1 := time.Now()
|
||||
|
||||
result := tx.tx.Commit()
|
||||
|
||||
if result == nil {
|
||||
tx.status = TxStatusComitted
|
||||
}
|
||||
|
||||
postMeta := PostTxCommitMeta{Init: t0, Start: t1, End: time.Now(), ExecCounter: tx.execCtr, QueryCounter: tx.queryCtr}
|
||||
for _, v := range tx.db.lstr {
|
||||
v.PostTxRollback(tx.id, result)
|
||||
v.PostTxCommit(tx.id, result, postMeta)
|
||||
}
|
||||
|
||||
return result
|
||||
@ -89,21 +104,29 @@ func (tx *transaction) Commit() error {
|
||||
|
||||
func (tx *transaction) Exec(ctx context.Context, sqlstr string, prep PP) (sql.Result, error) {
|
||||
origsql := sqlstr
|
||||
|
||||
t0 := time.Now()
|
||||
|
||||
preMeta := PreExecMeta{}
|
||||
for _, v := range tx.db.lstr {
|
||||
err := v.PreExec(ctx, langext.Ptr(tx.id), &sqlstr, &prep)
|
||||
err := v.PreExec(ctx, langext.Ptr(tx.id), &sqlstr, &prep, preMeta)
|
||||
if err != nil {
|
||||
return nil, exerr.Wrap(err, "failed to call SQL pre-exec listener").Int("tx.id", int(tx.id)).Str("original_sql", origsql).Str("sql", sqlstr).Any("sql_params", prep).Build()
|
||||
}
|
||||
}
|
||||
|
||||
t1 := time.Now()
|
||||
|
||||
res, err := tx.tx.NamedExecContext(ctx, sqlstr, prep)
|
||||
tx.execCtr++
|
||||
|
||||
if tx.status == TxStatusInitial && err == nil {
|
||||
tx.status = TxStatusActive
|
||||
}
|
||||
|
||||
postMeta := PostExecMeta{Init: t0, Start: t1, End: time.Now()}
|
||||
for _, v := range tx.db.lstr {
|
||||
v.PostExec(langext.Ptr(tx.id), origsql, sqlstr, prep)
|
||||
v.PostExec(langext.Ptr(tx.id), origsql, sqlstr, prep, postMeta)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@ -114,21 +137,29 @@ func (tx *transaction) Exec(ctx context.Context, sqlstr string, prep PP) (sql.Re
|
||||
|
||||
func (tx *transaction) Query(ctx context.Context, sqlstr string, prep PP) (*sqlx.Rows, error) {
|
||||
origsql := sqlstr
|
||||
|
||||
t0 := time.Now()
|
||||
|
||||
preMeta := PreQueryMeta{}
|
||||
for _, v := range tx.db.lstr {
|
||||
err := v.PreQuery(ctx, langext.Ptr(tx.id), &sqlstr, &prep)
|
||||
err := v.PreQuery(ctx, langext.Ptr(tx.id), &sqlstr, &prep, preMeta)
|
||||
if err != nil {
|
||||
return nil, exerr.Wrap(err, "failed to call SQL pre-query listener").Int("tx.id", int(tx.id)).Str("original_sql", origsql).Str("sql", sqlstr).Any("sql_params", prep).Build()
|
||||
}
|
||||
}
|
||||
|
||||
t1 := time.Now()
|
||||
|
||||
rows, err := sqlx.NamedQueryContext(ctx, tx.tx, sqlstr, prep)
|
||||
tx.queryCtr++
|
||||
|
||||
if tx.status == TxStatusInitial && err == nil {
|
||||
tx.status = TxStatusActive
|
||||
}
|
||||
|
||||
postMeta := PostQueryMeta{Init: t0, Start: t1, End: time.Now()}
|
||||
for _, v := range tx.db.lstr {
|
||||
v.PostQuery(langext.Ptr(tx.id), origsql, sqlstr, prep)
|
||||
v.PostQuery(langext.Ptr(tx.id), origsql, sqlstr, prep, postMeta)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user