Merge branch 'cursortoken-paginated'
Some checks failed
Build Docker and Deploy / Run goext test-suite (push) Has been cancelled

This commit is contained in:
Mike Schwörer 2024-10-27 02:18:25 +02:00
commit d4894e31fe
Signed by: Mikescher
GPG Key ID: D3C7172E0A70F8CF
5 changed files with 332 additions and 156 deletions

View File

@ -3,12 +3,16 @@ package cursortoken
import ( import (
"encoding/base32" "encoding/base32"
"encoding/json" "encoding/json"
"go.mongodb.org/mongo-driver/bson/primitive"
"gogs.mikescher.com/BlackForestBytes/goext/exerr" "gogs.mikescher.com/BlackForestBytes/goext/exerr"
"strconv"
"strings" "strings"
"time" "time"
) )
type CursorToken interface {
Token() string
}
type Mode string type Mode string
const ( const (
@ -24,97 +28,6 @@ type Extra struct {
PageSize *int PageSize *int
} }
type CursorToken struct {
Mode Mode
ValuePrimary string
ValueSecondary string
Direction SortDirection
DirectionSecondary SortDirection
PageSize int
Extra Extra
}
type cursorTokenSerialize struct {
ValuePrimary *string `json:"v1,omitempty"`
ValueSecondary *string `json:"v2,omitempty"`
Direction *SortDirection `json:"dir,omitempty"`
DirectionSecondary *SortDirection `json:"dir2,omitempty"`
PageSize *int `json:"size,omitempty"`
ExtraTimestamp *time.Time `json:"ts,omitempty"`
ExtraId *string `json:"id,omitempty"`
ExtraPage *int `json:"pg,omitempty"`
ExtraPageSize *int `json:"sz,omitempty"`
}
func Start() CursorToken {
return CursorToken{
Mode: CTMStart,
ValuePrimary: "",
ValueSecondary: "",
Direction: "",
DirectionSecondary: "",
PageSize: 0,
Extra: Extra{},
}
}
func End() CursorToken {
return CursorToken{
Mode: CTMEnd,
ValuePrimary: "",
ValueSecondary: "",
Direction: "",
DirectionSecondary: "",
PageSize: 0,
Extra: Extra{},
}
}
func (c *CursorToken) Token() string {
if c.Mode == CTMStart {
return "@start"
}
if c.Mode == CTMEnd {
return "@end"
}
// We kinda manually implement omitempty for the CursorToken here
// because omitempty does not work for time.Time and otherwise we would always
// get weird time values when decoding a token that initially didn't have an Timestamp set
// For this usecase we treat Unix=0 as an empty timestamp
sertok := cursorTokenSerialize{}
if c.ValuePrimary != "" {
sertok.ValuePrimary = &c.ValuePrimary
}
if c.ValueSecondary != "" {
sertok.ValueSecondary = &c.ValueSecondary
}
if c.Direction != "" {
sertok.Direction = &c.Direction
}
if c.DirectionSecondary != "" {
sertok.DirectionSecondary = &c.DirectionSecondary
}
if c.PageSize != 0 {
sertok.PageSize = &c.PageSize
}
sertok.ExtraTimestamp = c.Extra.Timestamp
sertok.ExtraId = c.Extra.Id
sertok.ExtraPage = c.Extra.Page
sertok.ExtraPageSize = c.Extra.PageSize
body, err := json.Marshal(sertok)
if err != nil {
panic(err)
}
return "tok_" + base32.StdEncoding.EncodeToString(body)
}
func Decode(tok string) (CursorToken, error) { func Decode(tok string) (CursorToken, error) {
if tok == "" { if tok == "" {
return Start(), nil return Start(), nil
@ -125,23 +38,31 @@ func Decode(tok string) (CursorToken, error) {
if strings.ToLower(tok) == "@end" { if strings.ToLower(tok) == "@end" {
return End(), nil return End(), nil
} }
if strings.ToLower(tok) == "$end" {
if !strings.HasPrefix(tok, "tok_") { return PageEnd(), nil
return CursorToken{}, exerr.New(exerr.TypeCursorTokenDecode, "could not decode token, missing prefix").Str("token", tok).Build()
} }
if strings.HasPrefix(tok, "$") && len(tok) > 1 {
n, err := strconv.ParseInt(tok[1:], 10, 64)
if err != nil {
return nil, exerr.Wrap(err, "failed to deserialize token").Str("token", tok).WithType(exerr.TypeCursorTokenDecode).Build()
}
return Page(int(n)), nil
}
if strings.HasPrefix(tok, "tok_") {
body, err := base32.StdEncoding.DecodeString(tok[len("tok_"):]) body, err := base32.StdEncoding.DecodeString(tok[len("tok_"):])
if err != nil { if err != nil {
return CursorToken{}, err return nil, err
} }
var tokenDeserialize cursorTokenSerialize var tokenDeserialize cursorTokenKeySortSerialize
err = json.Unmarshal(body, &tokenDeserialize) err = json.Unmarshal(body, &tokenDeserialize)
if err != nil { if err != nil {
return CursorToken{}, exerr.Wrap(err, "failed to deserialize token").Str("token", tok).Build() return nil, exerr.Wrap(err, "failed to deserialize token").Str("token", tok).WithType(exerr.TypeCursorTokenDecode).Build()
} }
token := CursorToken{Mode: CTMNormal} token := CTKeySort{Mode: CTMNormal}
if tokenDeserialize.ValuePrimary != nil { if tokenDeserialize.ValuePrimary != nil {
token.ValuePrimary = *tokenDeserialize.ValuePrimary token.ValuePrimary = *tokenDeserialize.ValuePrimary
@ -165,20 +86,8 @@ func Decode(tok string) (CursorToken, error) {
token.Extra.PageSize = tokenDeserialize.ExtraPageSize token.Extra.PageSize = tokenDeserialize.ExtraPageSize
return token, nil return token, nil
}
func (c *CursorToken) ValuePrimaryObjectId() (primitive.ObjectID, bool) {
if oid, err := primitive.ObjectIDFromHex(c.ValuePrimary); err == nil {
return oid, true
} else { } else {
return primitive.ObjectID{}, false return nil, exerr.New(exerr.TypeCursorTokenDecode, "could not decode token, missing/unknown prefix").Str("token", tok).Build()
}
}
func (c *CursorToken) ValueSecondaryObjectId() (primitive.ObjectID, bool) {
if oid, err := primitive.ObjectIDFromHex(c.ValueSecondary); err == nil {
return oid, true
} else {
return primitive.ObjectID{}, false
} }
} }

128
cursortoken/tokenKeySort.go Normal file
View File

@ -0,0 +1,128 @@
package cursortoken
import (
"encoding/base32"
"encoding/json"
"go.mongodb.org/mongo-driver/bson/primitive"
"time"
)
type CTKeySort struct {
Mode Mode
ValuePrimary string
ValueSecondary string
Direction SortDirection
DirectionSecondary SortDirection
PageSize int
Extra Extra
}
type cursorTokenKeySortSerialize struct {
ValuePrimary *string `json:"v1,omitempty"`
ValueSecondary *string `json:"v2,omitempty"`
Direction *SortDirection `json:"dir,omitempty"`
DirectionSecondary *SortDirection `json:"dir2,omitempty"`
PageSize *int `json:"size,omitempty"`
ExtraTimestamp *time.Time `json:"ts,omitempty"`
ExtraId *string `json:"id,omitempty"`
ExtraPage *int `json:"pg,omitempty"`
ExtraPageSize *int `json:"sz,omitempty"`
}
func NewKeySortToken(valuePrimary string, valueSecondary string, direction SortDirection, directionSecondary SortDirection, pageSize int, extra Extra) CursorToken {
return CTKeySort{
Mode: CTMNormal,
ValuePrimary: valuePrimary,
ValueSecondary: valueSecondary,
Direction: direction,
DirectionSecondary: directionSecondary,
PageSize: pageSize,
Extra: extra,
}
}
func Start() CursorToken {
return CTKeySort{
Mode: CTMStart,
ValuePrimary: "",
ValueSecondary: "",
Direction: "",
DirectionSecondary: "",
PageSize: 0,
Extra: Extra{},
}
}
func End() CursorToken {
return CTKeySort{
Mode: CTMEnd,
ValuePrimary: "",
ValueSecondary: "",
Direction: "",
DirectionSecondary: "",
PageSize: 0,
Extra: Extra{},
}
}
func (c CTKeySort) Token() string {
if c.Mode == CTMStart {
return "@start"
}
if c.Mode == CTMEnd {
return "@end"
}
// We kinda manually implement omitempty for the CursorToken here
// because omitempty does not work for time.Time and otherwise we would always
// get weird time values when decoding a token that initially didn't have an Timestamp set
// For this usecase we treat Unix=0 as an empty timestamp
sertok := cursorTokenKeySortSerialize{}
if c.ValuePrimary != "" {
sertok.ValuePrimary = &c.ValuePrimary
}
if c.ValueSecondary != "" {
sertok.ValueSecondary = &c.ValueSecondary
}
if c.Direction != "" {
sertok.Direction = &c.Direction
}
if c.DirectionSecondary != "" {
sertok.DirectionSecondary = &c.DirectionSecondary
}
if c.PageSize != 0 {
sertok.PageSize = &c.PageSize
}
sertok.ExtraTimestamp = c.Extra.Timestamp
sertok.ExtraId = c.Extra.Id
sertok.ExtraPage = c.Extra.Page
sertok.ExtraPageSize = c.Extra.PageSize
body, err := json.Marshal(sertok)
if err != nil {
panic(err)
}
return "tok_" + base32.StdEncoding.EncodeToString(body)
}
func (c CTKeySort) valuePrimaryObjectId() (primitive.ObjectID, bool) {
if oid, err := primitive.ObjectIDFromHex(c.ValuePrimary); err == nil {
return oid, true
} else {
return primitive.ObjectID{}, false
}
}
func (c CTKeySort) valueSecondaryObjectId() (primitive.ObjectID, bool) {
if oid, err := primitive.ObjectIDFromHex(c.ValueSecondary); err == nil {
return oid, true
} else {
return primitive.ObjectID{}, false
}
}

View File

@ -0,0 +1,33 @@
package cursortoken
import "strconv"
type CTPaginated struct {
Mode Mode
Page int
}
func Page(p int) CursorToken {
return CTPaginated{
Mode: CTMNormal,
Page: p,
}
}
func PageEnd() CursorToken {
return CTPaginated{
Mode: CTMEnd,
Page: 0,
}
}
func (c CTPaginated) Token() string {
if c.Mode == CTMStart {
return "$1"
}
if c.Mode == CTMEnd {
return "$end"
}
return "$" + strconv.Itoa(c.Page)
}

View File

@ -120,25 +120,25 @@ func (c *Coll[TData]) createToken(fieldPrimary string, dirPrimary ct.SortDirecti
valuePrimary, err := c.getFieldValueAsTokenString(lastEntity, fieldPrimary) valuePrimary, err := c.getFieldValueAsTokenString(lastEntity, fieldPrimary)
if err != nil { if err != nil {
return ct.CursorToken{}, exerr.Wrap(err, "failed to get (primary) field-value as token-string").Type("lastEntity", lastEntity).Str("fieldPrimary", fieldPrimary).Build() return nil, exerr.Wrap(err, "failed to get (primary) field-value as token-string").Type("lastEntity", lastEntity).Str("fieldPrimary", fieldPrimary).Build()
} }
valueSeconary := "" valueSeconary := ""
if fieldSecondary != nil && dirSecondary != nil { if fieldSecondary != nil && dirSecondary != nil {
valueSeconary, err = c.getFieldValueAsTokenString(lastEntity, *fieldSecondary) valueSeconary, err = c.getFieldValueAsTokenString(lastEntity, *fieldSecondary)
if err != nil { if err != nil {
return ct.CursorToken{}, exerr.Wrap(err, "failed to get (secondary) field-value as token-string").Type("lastEntity", lastEntity).StrPtr("fieldSecondary", fieldSecondary).Build() return nil, exerr.Wrap(err, "failed to get (secondary) field-value as token-string").Type("lastEntity", lastEntity).StrPtr("fieldSecondary", fieldSecondary).Build()
} }
} }
return ct.CursorToken{ return ct.NewKeySortToken(
Mode: ct.CTMNormal, valuePrimary,
ValuePrimary: valuePrimary, valueSeconary,
ValueSecondary: valueSeconary, dirPrimary,
Direction: dirPrimary, dirPrimary,
PageSize: langext.Coalesce(pageSize, 0), langext.Coalesce(pageSize, 0),
Extra: ct.Extra{}, ct.Extra{},
}, nil ), nil
} }
func (c *Coll[TData]) needsDoubleSort(ctx context.Context) bool { func (c *Coll[TData]) needsDoubleSort(ctx context.Context) bool {

View File

@ -10,6 +10,24 @@ import (
) )
func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CursorToken) ([]TData, ct.CursorToken, error) { func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CursorToken) ([]TData, ct.CursorToken, error) {
if ctks, ok := inTok.(ct.CTKeySort); ok {
d, tok, err := c.listWithKSToken(ctx, filter, pageSize, ctks)
if err != nil {
return nil, ct.End(), err
}
return d, tok, nil
} else if ctks, ok := inTok.(ct.CTPaginated); ok {
d, tok, err := c.listWithPaginatedToken(ctx, filter, pageSize, ctks)
if err != nil {
return nil, ct.End(), err
}
return d, tok, nil
} else {
return nil, ct.End(), exerr.New(exerr.TypeCursorTokenDecode, "unknown ct type").Any("token", inTok).Type("tokenType", inTok).Build()
}
}
func (c *Coll[TData]) listWithKSToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTKeySort) ([]TData, ct.CursorToken, error) {
if inTok.Mode == ct.CTMEnd { if inTok.Mode == ct.CTMEnd {
return make([]TData, 0), ct.End(), nil return make([]TData, 0), ct.End(), nil
} }
@ -41,7 +59,7 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int,
paginationPipeline, doubleSortPipeline, err := createPaginationPipeline(c, inTok, sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, pageSize) paginationPipeline, doubleSortPipeline, err := createPaginationPipeline(c, inTok, sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, pageSize)
if err != nil { if err != nil {
return nil, ct.CursorToken{}, exerr. return nil, nil, exerr.
Wrap(err, "failed to create pagination"). Wrap(err, "failed to create pagination").
WithType(exerr.TypeCursorTokenDecode). WithType(exerr.TypeCursorTokenDecode).
Str("collection", c.Name()). Str("collection", c.Name()).
@ -66,7 +84,7 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int,
cursor, err := c.coll.Aggregate(ctx, pipeline) cursor, err := c.coll.Aggregate(ctx, pipeline)
if err != nil { if err != nil {
return nil, ct.CursorToken{}, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build() return nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build()
} }
defer func() { _ = cursor.Close(ctx) }() defer func() { _ = cursor.Close(ctx) }()
@ -75,7 +93,7 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int,
if pageSize == nil { if pageSize == nil {
entries, err := c.decodeAll(ctx, cursor) entries, err := c.decodeAll(ctx, cursor)
if err != nil { if err != nil {
return nil, ct.CursorToken{}, exerr.Wrap(err, "failed to all-decode entities").Build() return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build()
} }
return entries, ct.End(), nil return entries, ct.End(), nil
} }
@ -85,7 +103,7 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int,
var entry TData var entry TData
entry, err = c.decodeSingle(ctx, cursor) entry, err = c.decodeSingle(ctx, cursor)
if err != nil { if err != nil {
return nil, ct.CursorToken{}, exerr.Wrap(err, "failed to decode entity").Build() return nil, nil, exerr.Wrap(err, "failed to decode entity").Build()
} }
entities = append(entities, entry) entities = append(entities, entry)
} }
@ -100,12 +118,70 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int,
nextToken, err := c.createToken(sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, last, pageSize) nextToken, err := c.createToken(sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, last, pageSize)
if err != nil { if err != nil {
return nil, ct.CursorToken{}, exerr.Wrap(err, "failed to create (out)-token").Build() return nil, nil, exerr.Wrap(err, "failed to create (out)-token").Build()
} }
return entities, nextToken, nil return entities, nextToken, nil
} }
func (c *Coll[TData]) listWithPaginatedToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTPaginated) ([]TData, ct.CursorToken, error) {
var err error
page := inTok.Page
if page < 0 {
page = 1
}
pipelineSort := mongo.Pipeline{}
pipelineFilter := mongo.Pipeline{}
if filter != nil {
pipelineFilter = filter.FilterQuery(ctx)
pf1, pd1, pf2, pd2 := filter.Pagination(ctx)
pipelineSort, err = createSortOnlyPipeline(pf1, pd1, &pf2, &pd2)
if err != nil {
return nil, nil, exerr.Wrap(err, "failed to create sort pipeline").Build()
}
}
pipelinePaginate := mongo.Pipeline{}
if pageSize != nil {
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *pageSize * (page - 1)}})
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *pageSize}})
} else {
page = 1
}
pipelineCount := mongo.Pipeline{}
pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}})
extrModPipelineResolved := mongo.Pipeline{}
for _, ppl := range c.extraModPipeline {
extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx))
}
pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort)
cursorList, err := c.coll.Aggregate(ctx, pipelineList)
if err != nil {
return nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build()
}
entities, err := c.decodeAll(ctx, cursorList)
if err != nil {
return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build()
}
tokOut := ct.Page(page + 1)
if pageSize == nil || len(entities) < *pageSize {
tokOut = ct.PageEnd()
}
return entities, tokOut, nil
}
func (c *Coll[TData]) Count(ctx context.Context, filter ct.RawFilter) (int64, error) { func (c *Coll[TData]) Count(ctx context.Context, filter ct.RawFilter) (int64, error) {
type countRes struct { type countRes struct {
Count int64 `bson:"c"` Count int64 `bson:"c"`
@ -138,12 +214,12 @@ func (c *Coll[TData]) ListWithCount(ctx context.Context, filter ct.Filter, pageS
// NOTE: Possible optimization: Cache count in CursorToken, then fetch count only on first page. // NOTE: Possible optimization: Cache count in CursorToken, then fetch count only on first page.
count, err := c.Count(ctx, filter) count, err := c.Count(ctx, filter)
if err != nil { if err != nil {
return nil, ct.CursorToken{}, 0, err return nil, nil, 0, err
} }
data, token, err := c.List(ctx, filter, pageSize, inTok) data, token, err := c.List(ctx, filter, pageSize, inTok)
if err != nil { if err != nil {
return nil, ct.CursorToken{}, 0, err return nil, nil, 0, err
} }
return data, token, count, nil return data, token, count, nil
} }
@ -184,7 +260,7 @@ func (c *Coll[TData]) ListAllIDs(ctx context.Context, filter ct.RawFilter) ([]st
return langext.ArrMap(res, func(v idObject) string { return v.ID }), nil return langext.ArrMap(res, func(v idObject) string { return v.ID }), nil
} }
func createPaginationPipeline[TData any](coll *Coll[TData], token ct.CursorToken, fieldPrimary string, sortPrimary ct.SortDirection, fieldSecondary *string, sortSecondary *ct.SortDirection, pageSize *int) ([]bson.D, []bson.D, error) { func createPaginationPipeline[TData any](coll *Coll[TData], token ct.CTKeySort, fieldPrimary string, sortPrimary ct.SortDirection, fieldSecondary *string, sortSecondary *ct.SortDirection, pageSize *int) ([]bson.D, []bson.D, error) {
cond := bson.A{} cond := bson.A{}
sort := bson.D{} sort := bson.D{}
@ -265,3 +341,33 @@ func createPaginationPipeline[TData any](coll *Coll[TData], token ct.CursorToken
return pipeline, pipelineSort, nil return pipeline, pipelineSort, nil
} }
func createSortOnlyPipeline(fieldPrimary string, sortPrimary ct.SortDirection, fieldSecondary *string, sortSecondary *ct.SortDirection) ([]bson.D, error) {
sort := bson.D{}
if sortPrimary == ct.SortASC {
// We sort ASC on <field> - so we want all entries newer ($gt) than the $primary
sort = append(sort, bson.E{Key: fieldPrimary, Value: +1})
} else if sortPrimary == ct.SortDESC {
// We sort DESC on <field> - so we want all entries older ($lt) than the $primary
sort = append(sort, bson.E{Key: fieldPrimary, Value: -1})
}
if fieldSecondary != nil && sortSecondary != nil && *fieldSecondary != fieldPrimary {
if *sortSecondary == ct.SortASC {
sort = append(sort, bson.E{Key: *fieldSecondary, Value: +1})
} else if *sortSecondary == ct.SortDESC {
sort = append(sort, bson.E{Key: *fieldSecondary, Value: -1})
}
}
pipelineSort := mongo.Pipeline{bson.D{{Key: "$sort", Value: sort}}}
return pipelineSort, nil
}