From e154137105dda4535158877ef617cdf9ac2a0a10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20Schw=C3=B6rer?= Date: Fri, 25 Oct 2024 09:45:42 +0200 Subject: [PATCH] Trying out paginated cursortoken variant [UNTESTED] --- cursortoken/token.go | 185 +++++++++-------------------------- cursortoken/tokenKeySort.go | 128 ++++++++++++++++++++++++ cursortoken/tokenPaginate.go | 33 +++++++ wmo/collection.go | 20 ++-- wmo/queryList.go | 122 +++++++++++++++++++++-- 5 files changed, 332 insertions(+), 156 deletions(-) create mode 100644 cursortoken/tokenKeySort.go create mode 100644 cursortoken/tokenPaginate.go diff --git a/cursortoken/token.go b/cursortoken/token.go index 0483720..c595241 100644 --- a/cursortoken/token.go +++ b/cursortoken/token.go @@ -3,12 +3,16 @@ package cursortoken import ( "encoding/base32" "encoding/json" - "go.mongodb.org/mongo-driver/bson/primitive" "gogs.mikescher.com/BlackForestBytes/goext/exerr" + "strconv" "strings" "time" ) +type CursorToken interface { + Token() string +} + type Mode string const ( @@ -24,97 +28,6 @@ type Extra struct { PageSize *int } -type CursorToken struct { - Mode Mode - ValuePrimary string - ValueSecondary string - Direction SortDirection - DirectionSecondary SortDirection - PageSize int - Extra Extra -} - -type cursorTokenSerialize struct { - ValuePrimary *string `json:"v1,omitempty"` - ValueSecondary *string `json:"v2,omitempty"` - Direction *SortDirection `json:"dir,omitempty"` - DirectionSecondary *SortDirection `json:"dir2,omitempty"` - PageSize *int `json:"size,omitempty"` - - ExtraTimestamp *time.Time `json:"ts,omitempty"` - ExtraId *string `json:"id,omitempty"` - ExtraPage *int `json:"pg,omitempty"` - ExtraPageSize *int `json:"sz,omitempty"` -} - -func Start() CursorToken { - return CursorToken{ - Mode: CTMStart, - ValuePrimary: "", - ValueSecondary: "", - Direction: "", - DirectionSecondary: "", - PageSize: 0, - Extra: Extra{}, - } -} - -func End() CursorToken { - return CursorToken{ - Mode: CTMEnd, - ValuePrimary: "", - ValueSecondary: "", - Direction: "", - DirectionSecondary: "", - PageSize: 0, - Extra: Extra{}, - } -} - -func (c *CursorToken) Token() string { - if c.Mode == CTMStart { - return "@start" - } - if c.Mode == CTMEnd { - return "@end" - } - - // We kinda manually implement omitempty for the CursorToken here - // because omitempty does not work for time.Time and otherwise we would always - // get weird time values when decoding a token that initially didn't have an Timestamp set - // For this usecase we treat Unix=0 as an empty timestamp - - sertok := cursorTokenSerialize{} - - if c.ValuePrimary != "" { - sertok.ValuePrimary = &c.ValuePrimary - } - if c.ValueSecondary != "" { - sertok.ValueSecondary = &c.ValueSecondary - } - if c.Direction != "" { - sertok.Direction = &c.Direction - } - if c.DirectionSecondary != "" { - sertok.DirectionSecondary = &c.DirectionSecondary - } - if c.PageSize != 0 { - sertok.PageSize = &c.PageSize - } - - sertok.ExtraTimestamp = c.Extra.Timestamp - sertok.ExtraId = c.Extra.Id - sertok.ExtraPage = c.Extra.Page - sertok.ExtraPageSize = c.Extra.PageSize - - body, err := json.Marshal(sertok) - if err != nil { - panic(err) - } - - return "tok_" + base32.StdEncoding.EncodeToString(body) -} - func Decode(tok string) (CursorToken, error) { if tok == "" { return Start(), nil @@ -125,60 +38,56 @@ func Decode(tok string) (CursorToken, error) { if strings.ToLower(tok) == "@end" { return End(), nil } - - if !strings.HasPrefix(tok, "tok_") { - return CursorToken{}, exerr.New(exerr.TypeCursorTokenDecode, "could not decode token, missing prefix").Str("token", tok).Build() + if strings.ToLower(tok) == "$end" { + return PageEnd(), nil + } + if strings.HasPrefix(tok, "$") && len(tok) > 1 { + n, err := strconv.ParseInt(tok[1:], 10, 64) + if err != nil { + return nil, exerr.Wrap(err, "failed to deserialize token").Str("token", tok).WithType(exerr.TypeCursorTokenDecode).Build() + } + return Page(int(n)), nil } - body, err := base32.StdEncoding.DecodeString(tok[len("tok_"):]) - if err != nil { - return CursorToken{}, err - } + if strings.HasPrefix(tok, "tok_") { - var tokenDeserialize cursorTokenSerialize - err = json.Unmarshal(body, &tokenDeserialize) - if err != nil { - return CursorToken{}, exerr.Wrap(err, "failed to deserialize token").Str("token", tok).Build() - } + body, err := base32.StdEncoding.DecodeString(tok[len("tok_"):]) + if err != nil { + return nil, err + } - token := CursorToken{Mode: CTMNormal} + var tokenDeserialize cursorTokenKeySortSerialize + err = json.Unmarshal(body, &tokenDeserialize) + if err != nil { + return nil, exerr.Wrap(err, "failed to deserialize token").Str("token", tok).WithType(exerr.TypeCursorTokenDecode).Build() + } - if tokenDeserialize.ValuePrimary != nil { - token.ValuePrimary = *tokenDeserialize.ValuePrimary - } - if tokenDeserialize.ValueSecondary != nil { - token.ValueSecondary = *tokenDeserialize.ValueSecondary - } - if tokenDeserialize.Direction != nil { - token.Direction = *tokenDeserialize.Direction - } - if tokenDeserialize.DirectionSecondary != nil { - token.DirectionSecondary = *tokenDeserialize.DirectionSecondary - } - if tokenDeserialize.PageSize != nil { - token.PageSize = *tokenDeserialize.PageSize - } + token := CTKeySort{Mode: CTMNormal} - token.Extra.Timestamp = tokenDeserialize.ExtraTimestamp - token.Extra.Id = tokenDeserialize.ExtraId - token.Extra.Page = tokenDeserialize.ExtraPage - token.Extra.PageSize = tokenDeserialize.ExtraPageSize + if tokenDeserialize.ValuePrimary != nil { + token.ValuePrimary = *tokenDeserialize.ValuePrimary + } + if tokenDeserialize.ValueSecondary != nil { + token.ValueSecondary = *tokenDeserialize.ValueSecondary + } + if tokenDeserialize.Direction != nil { + token.Direction = *tokenDeserialize.Direction + } + if tokenDeserialize.DirectionSecondary != nil { + token.DirectionSecondary = *tokenDeserialize.DirectionSecondary + } + if tokenDeserialize.PageSize != nil { + token.PageSize = *tokenDeserialize.PageSize + } - return token, nil -} + token.Extra.Timestamp = tokenDeserialize.ExtraTimestamp + token.Extra.Id = tokenDeserialize.ExtraId + token.Extra.Page = tokenDeserialize.ExtraPage + token.Extra.PageSize = tokenDeserialize.ExtraPageSize + + return token, nil -func (c *CursorToken) ValuePrimaryObjectId() (primitive.ObjectID, bool) { - if oid, err := primitive.ObjectIDFromHex(c.ValuePrimary); err == nil { - return oid, true } else { - return primitive.ObjectID{}, false - } -} - -func (c *CursorToken) ValueSecondaryObjectId() (primitive.ObjectID, bool) { - if oid, err := primitive.ObjectIDFromHex(c.ValueSecondary); err == nil { - return oid, true - } else { - return primitive.ObjectID{}, false + return nil, exerr.New(exerr.TypeCursorTokenDecode, "could not decode token, missing/unknown prefix").Str("token", tok).Build() } } diff --git a/cursortoken/tokenKeySort.go b/cursortoken/tokenKeySort.go new file mode 100644 index 0000000..cead3ed --- /dev/null +++ b/cursortoken/tokenKeySort.go @@ -0,0 +1,128 @@ +package cursortoken + +import ( + "encoding/base32" + "encoding/json" + "go.mongodb.org/mongo-driver/bson/primitive" + "time" +) + +type CTKeySort struct { + Mode Mode + ValuePrimary string + ValueSecondary string + Direction SortDirection + DirectionSecondary SortDirection + PageSize int + Extra Extra +} + +type cursorTokenKeySortSerialize struct { + ValuePrimary *string `json:"v1,omitempty"` + ValueSecondary *string `json:"v2,omitempty"` + Direction *SortDirection `json:"dir,omitempty"` + DirectionSecondary *SortDirection `json:"dir2,omitempty"` + PageSize *int `json:"size,omitempty"` + + ExtraTimestamp *time.Time `json:"ts,omitempty"` + ExtraId *string `json:"id,omitempty"` + ExtraPage *int `json:"pg,omitempty"` + ExtraPageSize *int `json:"sz,omitempty"` +} + +func NewKeySortToken(valuePrimary string, valueSecondary string, direction SortDirection, directionSecondary SortDirection, pageSize int, extra Extra) CursorToken { + return CTKeySort{ + Mode: CTMNormal, + ValuePrimary: valuePrimary, + ValueSecondary: valueSecondary, + Direction: direction, + DirectionSecondary: directionSecondary, + PageSize: pageSize, + Extra: extra, + } +} + +func Start() CursorToken { + return CTKeySort{ + Mode: CTMStart, + ValuePrimary: "", + ValueSecondary: "", + Direction: "", + DirectionSecondary: "", + PageSize: 0, + Extra: Extra{}, + } +} + +func End() CursorToken { + return CTKeySort{ + Mode: CTMEnd, + ValuePrimary: "", + ValueSecondary: "", + Direction: "", + DirectionSecondary: "", + PageSize: 0, + Extra: Extra{}, + } +} + +func (c CTKeySort) Token() string { + + if c.Mode == CTMStart { + return "@start" + } + if c.Mode == CTMEnd { + return "@end" + } + + // We kinda manually implement omitempty for the CursorToken here + // because omitempty does not work for time.Time and otherwise we would always + // get weird time values when decoding a token that initially didn't have an Timestamp set + // For this usecase we treat Unix=0 as an empty timestamp + + sertok := cursorTokenKeySortSerialize{} + + if c.ValuePrimary != "" { + sertok.ValuePrimary = &c.ValuePrimary + } + if c.ValueSecondary != "" { + sertok.ValueSecondary = &c.ValueSecondary + } + if c.Direction != "" { + sertok.Direction = &c.Direction + } + if c.DirectionSecondary != "" { + sertok.DirectionSecondary = &c.DirectionSecondary + } + if c.PageSize != 0 { + sertok.PageSize = &c.PageSize + } + + sertok.ExtraTimestamp = c.Extra.Timestamp + sertok.ExtraId = c.Extra.Id + sertok.ExtraPage = c.Extra.Page + sertok.ExtraPageSize = c.Extra.PageSize + + body, err := json.Marshal(sertok) + if err != nil { + panic(err) + } + + return "tok_" + base32.StdEncoding.EncodeToString(body) +} + +func (c CTKeySort) valuePrimaryObjectId() (primitive.ObjectID, bool) { + if oid, err := primitive.ObjectIDFromHex(c.ValuePrimary); err == nil { + return oid, true + } else { + return primitive.ObjectID{}, false + } +} + +func (c CTKeySort) valueSecondaryObjectId() (primitive.ObjectID, bool) { + if oid, err := primitive.ObjectIDFromHex(c.ValueSecondary); err == nil { + return oid, true + } else { + return primitive.ObjectID{}, false + } +} diff --git a/cursortoken/tokenPaginate.go b/cursortoken/tokenPaginate.go new file mode 100644 index 0000000..0505d6e --- /dev/null +++ b/cursortoken/tokenPaginate.go @@ -0,0 +1,33 @@ +package cursortoken + +import "strconv" + +type CTPaginated struct { + Mode Mode + Page int +} + +func Page(p int) CursorToken { + return CTPaginated{ + Mode: CTMNormal, + Page: p, + } +} + +func PageEnd() CursorToken { + return CTPaginated{ + Mode: CTMEnd, + Page: 0, + } +} + +func (c CTPaginated) Token() string { + if c.Mode == CTMStart { + return "$1" + } + if c.Mode == CTMEnd { + return "$end" + } + + return "$" + strconv.Itoa(c.Page) +} diff --git a/wmo/collection.go b/wmo/collection.go index 8431e3f..2cc92a4 100644 --- a/wmo/collection.go +++ b/wmo/collection.go @@ -120,25 +120,25 @@ func (c *Coll[TData]) createToken(fieldPrimary string, dirPrimary ct.SortDirecti valuePrimary, err := c.getFieldValueAsTokenString(lastEntity, fieldPrimary) if err != nil { - return ct.CursorToken{}, exerr.Wrap(err, "failed to get (primary) field-value as token-string").Type("lastEntity", lastEntity).Str("fieldPrimary", fieldPrimary).Build() + return nil, exerr.Wrap(err, "failed to get (primary) field-value as token-string").Type("lastEntity", lastEntity).Str("fieldPrimary", fieldPrimary).Build() } valueSeconary := "" if fieldSecondary != nil && dirSecondary != nil { valueSeconary, err = c.getFieldValueAsTokenString(lastEntity, *fieldSecondary) if err != nil { - return ct.CursorToken{}, exerr.Wrap(err, "failed to get (secondary) field-value as token-string").Type("lastEntity", lastEntity).StrPtr("fieldSecondary", fieldSecondary).Build() + return nil, exerr.Wrap(err, "failed to get (secondary) field-value as token-string").Type("lastEntity", lastEntity).StrPtr("fieldSecondary", fieldSecondary).Build() } } - return ct.CursorToken{ - Mode: ct.CTMNormal, - ValuePrimary: valuePrimary, - ValueSecondary: valueSeconary, - Direction: dirPrimary, - PageSize: langext.Coalesce(pageSize, 0), - Extra: ct.Extra{}, - }, nil + return ct.NewKeySortToken( + valuePrimary, + valueSeconary, + dirPrimary, + dirPrimary, + langext.Coalesce(pageSize, 0), + ct.Extra{}, + ), nil } func (c *Coll[TData]) needsDoubleSort(ctx context.Context) bool { diff --git a/wmo/queryList.go b/wmo/queryList.go index a0c753d..5bf2766 100644 --- a/wmo/queryList.go +++ b/wmo/queryList.go @@ -10,6 +10,24 @@ import ( ) func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CursorToken) ([]TData, ct.CursorToken, error) { + if ctks, ok := inTok.(ct.CTKeySort); ok { + d, tok, err := c.listWithKSToken(ctx, filter, pageSize, ctks) + if err != nil { + return nil, ct.End(), err + } + return d, tok, nil + } else if ctks, ok := inTok.(ct.CTPaginated); ok { + d, tok, err := c.listWithPaginatedToken(ctx, filter, pageSize, ctks) + if err != nil { + return nil, ct.End(), err + } + return d, tok, nil + } else { + return nil, ct.End(), exerr.New(exerr.TypeCursorTokenDecode, "unknown ct type").Any("token", inTok).Type("tokenType", inTok).Build() + } +} + +func (c *Coll[TData]) listWithKSToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTKeySort) ([]TData, ct.CursorToken, error) { if inTok.Mode == ct.CTMEnd { return make([]TData, 0), ct.End(), nil } @@ -41,7 +59,7 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, paginationPipeline, doubleSortPipeline, err := createPaginationPipeline(c, inTok, sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, pageSize) if err != nil { - return nil, ct.CursorToken{}, exerr. + return nil, nil, exerr. Wrap(err, "failed to create pagination"). WithType(exerr.TypeCursorTokenDecode). Str("collection", c.Name()). @@ -66,7 +84,7 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, cursor, err := c.coll.Aggregate(ctx, pipeline) if err != nil { - return nil, ct.CursorToken{}, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build() + return nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build() } defer func() { _ = cursor.Close(ctx) }() @@ -75,7 +93,7 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, if pageSize == nil { entries, err := c.decodeAll(ctx, cursor) if err != nil { - return nil, ct.CursorToken{}, exerr.Wrap(err, "failed to all-decode entities").Build() + return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build() } return entries, ct.End(), nil } @@ -85,7 +103,7 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, var entry TData entry, err = c.decodeSingle(ctx, cursor) if err != nil { - return nil, ct.CursorToken{}, exerr.Wrap(err, "failed to decode entity").Build() + return nil, nil, exerr.Wrap(err, "failed to decode entity").Build() } entities = append(entities, entry) } @@ -100,12 +118,70 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, nextToken, err := c.createToken(sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, last, pageSize) if err != nil { - return nil, ct.CursorToken{}, exerr.Wrap(err, "failed to create (out)-token").Build() + return nil, nil, exerr.Wrap(err, "failed to create (out)-token").Build() } return entities, nextToken, nil } +func (c *Coll[TData]) listWithPaginatedToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTPaginated) ([]TData, ct.CursorToken, error) { + var err error + + page := inTok.Page + + if page < 0 { + page = 1 + } + + pipelineSort := mongo.Pipeline{} + pipelineFilter := mongo.Pipeline{} + + if filter != nil { + pipelineFilter = filter.FilterQuery(ctx) + pf1, pd1, pf2, pd2 := filter.Pagination(ctx) + + pipelineSort, err = createSortOnlyPipeline(pf1, pd1, &pf2, &pd2) + if err != nil { + return nil, nil, exerr.Wrap(err, "failed to create sort pipeline").Build() + } + } + + pipelinePaginate := mongo.Pipeline{} + if pageSize != nil { + pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *pageSize * (page - 1)}}) + pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *pageSize}}) + } else { + page = 1 + } + + pipelineCount := mongo.Pipeline{} + pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}}) + + extrModPipelineResolved := mongo.Pipeline{} + for _, ppl := range c.extraModPipeline { + extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx)) + } + + pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort) + + cursorList, err := c.coll.Aggregate(ctx, pipelineList) + if err != nil { + return nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build() + } + + entities, err := c.decodeAll(ctx, cursorList) + if err != nil { + return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build() + } + + tokOut := ct.Page(page + 1) + if pageSize == nil || len(entities) < *pageSize { + tokOut = ct.PageEnd() + } + + return entities, tokOut, nil +} + func (c *Coll[TData]) Count(ctx context.Context, filter ct.RawFilter) (int64, error) { type countRes struct { Count int64 `bson:"c"` @@ -138,12 +214,12 @@ func (c *Coll[TData]) ListWithCount(ctx context.Context, filter ct.Filter, pageS // NOTE: Possible optimization: Cache count in CursorToken, then fetch count only on first page. count, err := c.Count(ctx, filter) if err != nil { - return nil, ct.CursorToken{}, 0, err + return nil, nil, 0, err } data, token, err := c.List(ctx, filter, pageSize, inTok) if err != nil { - return nil, ct.CursorToken{}, 0, err + return nil, nil, 0, err } return data, token, count, nil } @@ -184,7 +260,7 @@ func (c *Coll[TData]) ListAllIDs(ctx context.Context, filter ct.RawFilter) ([]st return langext.ArrMap(res, func(v idObject) string { return v.ID }), nil } -func createPaginationPipeline[TData any](coll *Coll[TData], token ct.CursorToken, fieldPrimary string, sortPrimary ct.SortDirection, fieldSecondary *string, sortSecondary *ct.SortDirection, pageSize *int) ([]bson.D, []bson.D, error) { +func createPaginationPipeline[TData any](coll *Coll[TData], token ct.CTKeySort, fieldPrimary string, sortPrimary ct.SortDirection, fieldSecondary *string, sortSecondary *ct.SortDirection, pageSize *int) ([]bson.D, []bson.D, error) { cond := bson.A{} sort := bson.D{} @@ -265,3 +341,33 @@ func createPaginationPipeline[TData any](coll *Coll[TData], token ct.CursorToken return pipeline, pipelineSort, nil } + +func createSortOnlyPipeline(fieldPrimary string, sortPrimary ct.SortDirection, fieldSecondary *string, sortSecondary *ct.SortDirection) ([]bson.D, error) { + + sort := bson.D{} + + if sortPrimary == ct.SortASC { + // We sort ASC on - so we want all entries newer ($gt) than the $primary + sort = append(sort, bson.E{Key: fieldPrimary, Value: +1}) + } else if sortPrimary == ct.SortDESC { + // We sort DESC on - so we want all entries older ($lt) than the $primary + sort = append(sort, bson.E{Key: fieldPrimary, Value: -1}) + } + + if fieldSecondary != nil && sortSecondary != nil && *fieldSecondary != fieldPrimary { + + if *sortSecondary == ct.SortASC { + + sort = append(sort, bson.E{Key: *fieldSecondary, Value: +1}) + + } else if *sortSecondary == ct.SortDESC { + + sort = append(sort, bson.E{Key: *fieldSecondary, Value: -1}) + + } + } + + pipelineSort := mongo.Pipeline{bson.D{{Key: "$sort", Value: sort}}} + + return pipelineSort, nil +}