From 28cdfc5bd2eaeee1a693ed8d38a11ceedc19628f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mike=20Schw=C3=B6rer?= Date: Wed, 29 Jan 2025 10:54:53 +0100 Subject: [PATCH] v0.0.560 wmo ListIterateFunc + ListIterate --- goextVersion.go | 4 +- wmo/queryFind.go | 5 +- wmo/queryList.go | 365 ++++++++++++++++++++++++++++++----------------- 3 files changed, 236 insertions(+), 138 deletions(-) diff --git a/goextVersion.go b/goextVersion.go index 2fde419..2461893 100644 --- a/goextVersion.go +++ b/goextVersion.go @@ -1,5 +1,5 @@ package goext -const GoextVersion = "0.0.559" +const GoextVersion = "0.0.560" -const GoextVersionTimestamp = "2025-01-28T15:55:18+0100" +const GoextVersionTimestamp = "2025-01-29T10:54:53+0100" diff --git a/wmo/queryFind.go b/wmo/queryFind.go index ec7e96f..253699f 100644 --- a/wmo/queryFind.go +++ b/wmo/queryFind.go @@ -85,7 +85,7 @@ func (c *Coll[TData]) Find(ctx context.Context, filter bson.M, opts ...*options. return res, nil } -func (c *Coll[TData]) IterateFunc(ctx context.Context, filter bson.M, fn func(v TData) error, opts ...*options.FindOptions) error { +func (c *Coll[TData]) FindIterateFunc(ctx context.Context, filter bson.M, fn func(v TData) error, opts ...*options.FindOptions) error { cursor, err := c.createFindQuery(ctx, filter, opts...) if err != nil { @@ -111,8 +111,7 @@ func (c *Coll[TData]) IterateFunc(ctx context.Context, filter bson.M, fn func(v return nil } -func (c *Coll[TData]) Iterate(ctx context.Context, filter bson.M, opts ...*options.FindOptions) iter.Seq2[TData, error] { - +func (c *Coll[TData]) FindIterate(ctx context.Context, filter bson.M, opts ...*options.FindOptions) iter.Seq2[TData, error] { cursor, err := c.createFindQuery(ctx, filter, opts...) if err != nil { return langext.IterSingleValueSeq2[TData, error](nil, exerr.Wrap(err, "").Build()) diff --git a/wmo/queryList.go b/wmo/queryList.go index ad79d8b..6836f08 100644 --- a/wmo/queryList.go +++ b/wmo/queryList.go @@ -7,6 +7,7 @@ import ( ct "gogs.mikescher.com/BlackForestBytes/goext/cursortoken" "gogs.mikescher.com/BlackForestBytes/goext/exerr" "gogs.mikescher.com/BlackForestBytes/goext/langext" + "iter" ) func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CursorToken) ([]TData, ct.CursorToken, error) { @@ -20,8 +21,8 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, return nil, ct.End(), err } return d, tok, nil - } else if ctks, ok := inTok.(ct.CTPaginated); ok { - d, tok, err := c.listWithPaginatedToken(ctx, filter, pageSize, ctks) + } else if ctpag, ok := inTok.(ct.CTPaginated); ok { + d, tok, err := c.listWithPaginatedToken(ctx, filter, pageSize, ctpag) if err != nil { return nil, ct.End(), err } @@ -31,159 +32,78 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, } } -func (c *Coll[TData]) listWithKSToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTKeySort) ([]TData, ct.CursorToken, error) { - if inTok.Mode == ct.CTMEnd { - return make([]TData, 0), ct.End(), nil - } +func (c *Coll[TData]) ListIterateFunc(ctx context.Context, filter ct.Filter, fn func(v TData) error, pageSize *int, inTok ct.CursorToken) error { + var cursor *mongo.Cursor + var err error - if pageSize != nil && *pageSize == 0 { - return make([]TData, 0), inTok, nil // fast track, we return an empty list and do not advance the cursor token - } - - pipeline := mongo.Pipeline{} - pf1 := "_id" - pd1 := ct.SortASC - pf2 := "_id" - pd2 := ct.SortASC - - if filter != nil { - pipeline = filter.FilterQuery(ctx) - pf1, pd1, pf2, pd2 = filter.Pagination(ctx) - } - - sortPrimary := pf1 - sortDirPrimary := pd1 - sortSecondary := &pf2 - sortDirSecondary := &pd2 - - if pf1 == pf2 { - sortSecondary = nil - sortDirSecondary = nil - } - - paginationPipeline, doubleSortPipeline, err := createPaginationPipeline(c, inTok, sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, pageSize) - if err != nil { - return nil, nil, exerr. - Wrap(err, "failed to create pagination"). - WithType(exerr.TypeCursorTokenDecode). - Str("collection", c.Name()). - Any("inTok", inTok). - Any("sortPrimary", sortPrimary). - Any("sortDirPrimary", sortDirPrimary). - Any("sortSecondary", sortSecondary). - Any("sortDirSecondary", sortDirSecondary). - Any("pageSize", pageSize). - Build() - } - - pipeline = append(pipeline, paginationPipeline...) - - for _, ppl := range c.extraModPipeline { - pipeline = langext.ArrConcat(pipeline, ppl(ctx)) - } - - if c.needsDoubleSort(ctx) { - pipeline = langext.ArrConcat(pipeline, doubleSortPipeline) - } - - cursor, err := c.coll.Aggregate(ctx, pipeline) - if err != nil { - return nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build() + if ctks, ok := inTok.(ct.CTKeySort); ok { + _, _, _, _, cursor, err = c.createKSListQuery(ctx, filter, pageSize, ctks) + if err != nil { + return exerr.Wrap(err, "").Build() + } + } else if ctpag, ok := inTok.(ct.CTPaginated); ok { + _, cursor, err = c.createPaginatedListQuery(ctx, filter, pageSize, ctpag) + if err != nil { + return exerr.Wrap(err, "").Build() + } + } else { + return exerr.New(exerr.TypeCursorTokenDecode, "unknown ct type").Any("token", inTok).Type("tokenType", inTok).Build() } defer func() { _ = cursor.Close(ctx) }() - // fast branch - if pageSize == nil { - entries, err := c.decodeAll(ctx, cursor) + for cursor.Next(ctx) { + + v, err := c.decodeSingle(ctx, cursor) if err != nil { - return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build() + return exerr.Wrap(err, "").Build() } - return entries, ct.End(), nil - } - entities := make([]TData, 0, cursor.RemainingBatchLength()) - for (pageSize == nil || len(entities) != *pageSize) && cursor.Next(ctx) { - var entry TData - entry, err = c.decodeSingle(ctx, cursor) + err = fn(v) if err != nil { - return nil, nil, exerr.Wrap(err, "failed to decode entity").Build() + return exerr.Wrap(err, "").Build() } - entities = append(entities, entry) + } - if pageSize == nil || len(entities) < *pageSize || !cursor.TryNext(ctx) { - return entities, ct.End(), nil - } - - last := entities[len(entities)-1] - - c.EnsureInitializedReflection(last) - - nextToken, err := c.createToken(sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, last, pageSize) - if err != nil { - return nil, nil, exerr.Wrap(err, "failed to create (out)-token").Build() - } - - return entities, nextToken, nil + return nil } -func (c *Coll[TData]) listWithPaginatedToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTPaginated) ([]TData, ct.CursorToken, error) { +func (c *Coll[TData]) ListIterate(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CursorToken) iter.Seq2[TData, error] { + var cursor *mongo.Cursor var err error - page := inTok.Page - - if page < 0 { - page = 1 + if ctks, ok := inTok.(ct.CTKeySort); ok { + _, _, _, _, cursor, err = c.createKSListQuery(ctx, filter, pageSize, ctks) + if err != nil { + return langext.IterSingleValueSeq2[TData, error](nil, exerr.Wrap(err, "").Build()) + } + } else if ctpag, ok := inTok.(ct.CTPaginated); ok { + _, cursor, err = c.createPaginatedListQuery(ctx, filter, pageSize, ctpag) + if err != nil { + return langext.IterSingleValueSeq2[TData, error](nil, exerr.Wrap(err, "").Build()) + } + } else { + return langext.IterSingleValueSeq2[TData, error](nil, exerr.New(exerr.TypeCursorTokenDecode, "unknown ct type").Any("token", inTok).Type("tokenType", inTok).Build()) } - pipelineSort := mongo.Pipeline{} - pipelineFilter := mongo.Pipeline{} + return func(yield func(TData, error) bool) { + defer func() { _ = cursor.Close(ctx) }() - if filter != nil { - pipelineFilter = filter.FilterQuery(ctx) - pf1, pd1, pf2, pd2 := filter.Pagination(ctx) + for cursor.Next(ctx) { + v, err := c.decodeSingle(ctx, cursor) + if err != nil { + if !yield(nil, err) { + return + } + continue + } - pipelineSort, err = createSortOnlyPipeline(pf1, pd1, &pf2, &pd2) - if err != nil { - return nil, nil, exerr.Wrap(err, "failed to create sort pipeline").Build() + if !yield(v, nil) { + return + } } } - - pipelinePaginate := mongo.Pipeline{} - if pageSize != nil { - pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *pageSize * (page - 1)}}) - pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *pageSize}}) - } else { - page = 1 - } - - pipelineCount := mongo.Pipeline{} - pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}}) - - extrModPipelineResolved := mongo.Pipeline{} - for _, ppl := range c.extraModPipeline { - extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx)) - } - - pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort) - - cursorList, err := c.coll.Aggregate(ctx, pipelineList) - if err != nil { - return nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build() - } - - entities, err := c.decodeAll(ctx, cursorList) - if err != nil { - return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build() - } - - tokOut := ct.Page(page + 1) - if pageSize == nil || len(entities) < *pageSize { - tokOut = ct.PageEnd() - } - - return entities, tokOut, nil } func (c *Coll[TData]) Count(ctx context.Context, filter ct.RawFilter) (int64, error) { @@ -291,6 +211,185 @@ func (c *Coll[TData]) ListAllIDs(ctx context.Context, filter ct.RawFilter) ([]st return langext.ArrMap(res, func(v idObject) string { return v.ID }), nil } +// ===================================================================================================================== + +func (c *Coll[TData]) createKSListQuery(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTKeySort) (string, ct.SortDirection, *string, *ct.SortDirection, *mongo.Cursor, error) { + pipeline := mongo.Pipeline{} + pf1 := "_id" + pd1 := ct.SortASC + pf2 := "_id" + pd2 := ct.SortASC + + if filter != nil { + pipeline = filter.FilterQuery(ctx) + pf1, pd1, pf2, pd2 = filter.Pagination(ctx) + } + + sortPrimary := pf1 + sortDirPrimary := pd1 + sortSecondary := &pf2 + sortDirSecondary := &pd2 + + if pf1 == pf2 { + sortSecondary = nil + sortDirSecondary = nil + } + + paginationPipeline, doubleSortPipeline, err := createPaginationPipeline(c, inTok, sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, pageSize) + if err != nil { + return "", "", nil, nil, nil, exerr. + Wrap(err, "failed to create pagination"). + WithType(exerr.TypeCursorTokenDecode). + Str("collection", c.Name()). + Any("inTok", inTok). + Any("sortPrimary", sortPrimary). + Any("sortDirPrimary", sortDirPrimary). + Any("sortSecondary", sortSecondary). + Any("sortDirSecondary", sortDirSecondary). + Any("pageSize", pageSize). + Build() + } + + pipeline = append(pipeline, paginationPipeline...) + + for _, ppl := range c.extraModPipeline { + pipeline = langext.ArrConcat(pipeline, ppl(ctx)) + } + + if c.needsDoubleSort(ctx) { + pipeline = langext.ArrConcat(pipeline, doubleSortPipeline) + } + + cursor, err := c.coll.Aggregate(ctx, pipeline) + if err != nil { + return "", "", nil, nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build() + } + + return sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, cursor, nil +} + +func (c *Coll[TData]) createPaginatedListQuery(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTPaginated) (int, *mongo.Cursor, error) { + var err error + + page := inTok.Page + + pipelineSort := mongo.Pipeline{} + pipelineFilter := mongo.Pipeline{} + + if filter != nil { + pipelineFilter = filter.FilterQuery(ctx) + pf1, pd1, pf2, pd2 := filter.Pagination(ctx) + + pipelineSort, err = createSortOnlyPipeline(pf1, pd1, &pf2, &pd2) + if err != nil { + return 0, nil, exerr.Wrap(err, "failed to create sort pipeline").Build() + } + } + + pipelinePaginate := mongo.Pipeline{} + if pageSize != nil { + pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *pageSize * (page - 1)}}) + pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *pageSize}}) + } else { + page = 1 + } + + pipelineCount := mongo.Pipeline{} + pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}}) + + extrModPipelineResolved := mongo.Pipeline{} + for _, ppl := range c.extraModPipeline { + extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx)) + } + + pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort) + + cursorList, err := c.coll.Aggregate(ctx, pipelineList) + if err != nil { + return 0, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build() + } + + return page, cursorList, nil +} + +func (c *Coll[TData]) listWithKSToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTKeySort) ([]TData, ct.CursorToken, error) { + if inTok.Mode == ct.CTMEnd { + return make([]TData, 0), ct.End(), nil + } + + if pageSize != nil && *pageSize == 0 { + return make([]TData, 0), inTok, nil // fast track, we return an empty list and do not advance the cursor token + } + + sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, cursor, err := c.createKSListQuery(ctx, filter, pageSize, inTok) + if err != nil { + return nil, nil, exerr.Wrap(err, "").Build() + } + + defer func() { _ = cursor.Close(ctx) }() + + // fast branch + if pageSize == nil { + entries, err := c.decodeAll(ctx, cursor) + if err != nil { + return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build() + } + return entries, ct.End(), nil + } + + entities := make([]TData, 0, cursor.RemainingBatchLength()) + for (pageSize == nil || len(entities) != *pageSize) && cursor.Next(ctx) { + var entry TData + entry, err = c.decodeSingle(ctx, cursor) + if err != nil { + return nil, nil, exerr.Wrap(err, "failed to decode entity").Build() + } + entities = append(entities, entry) + } + + if pageSize == nil || len(entities) < *pageSize || !cursor.TryNext(ctx) { + return entities, ct.End(), nil + } + + last := entities[len(entities)-1] + + c.EnsureInitializedReflection(last) + + nextToken, err := c.createToken(sortPrimary, sortDirPrimary, sortSecondary, sortDirSecondary, last, pageSize) + if err != nil { + return nil, nil, exerr.Wrap(err, "failed to create (out)-token").Build() + } + + return entities, nextToken, nil +} + +func (c *Coll[TData]) listWithPaginatedToken(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CTPaginated) ([]TData, ct.CursorToken, error) { + var err error + + page := inTok.Page + + if page < 0 { + page = 1 + } + + page, cursorList, err := c.createPaginatedListQuery(ctx, filter, pageSize, inTok) + if err != nil { + return nil, nil, exerr.Wrap(err, "").Build() + } + + entities, err := c.decodeAll(ctx, cursorList) + if err != nil { + return nil, nil, exerr.Wrap(err, "failed to all-decode entities").Build() + } + + tokOut := ct.Page(page + 1) + if pageSize == nil || len(entities) < *pageSize { + tokOut = ct.PageEnd() + } + + return entities, tokOut, nil +} + func createPaginationPipeline[TData any](coll *Coll[TData], token ct.CTKeySort, fieldPrimary string, sortPrimary ct.SortDirection, fieldSecondary *string, sortSecondary *ct.SortDirection, pageSize *int) ([]bson.D, []bson.D, error) { cond := bson.A{}