diff --git a/goextVersion.go b/goextVersion.go index 2461893..877bc86 100644 --- a/goextVersion.go +++ b/goextVersion.go @@ -1,5 +1,5 @@ package goext -const GoextVersion = "0.0.560" +const GoextVersion = "0.0.561" -const GoextVersionTimestamp = "2025-01-29T10:54:53+0100" +const GoextVersionTimestamp = "2025-01-29T11:02:41+0100" diff --git a/wmo/queryList.go b/wmo/queryList.go index 6836f08..4d17c0c 100644 --- a/wmo/queryList.go +++ b/wmo/queryList.go @@ -32,7 +32,7 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int, } } -func (c *Coll[TData]) ListIterateFunc(ctx context.Context, filter ct.Filter, fn func(v TData) error, pageSize *int, inTok ct.CursorToken) error { +func (c *Coll[TData]) ListIterateFunc(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CursorToken, fn func(v TData) error) error { var cursor *mongo.Cursor var err error diff --git a/wmo/queryPaginate.go b/wmo/queryPaginate.go index beb65c6..b5b356d 100644 --- a/wmo/queryPaginate.go +++ b/wmo/queryPaginate.go @@ -7,54 +7,19 @@ import ( "gogs.mikescher.com/BlackForestBytes/goext/exerr" "gogs.mikescher.com/BlackForestBytes/goext/langext" pag "gogs.mikescher.com/BlackForestBytes/goext/pagination" + "iter" ) func (c *Coll[TData]) Paginate(ctx context.Context, filter pag.MongoFilter, page int, limit *int) ([]TData, pag.Pagination, error) { + page, cursorList, pipelineTotalCount, err := c.createPaginatedQuery(ctx, filter, page, limit) + if err != nil { + return nil, pag.Pagination{}, exerr.Wrap(err, "").Build() + } + type totalCountResult struct { Count int `bson:"count"` } - if page < 0 { - page = 1 - } - - pipelineSort := mongo.Pipeline{} - pipelineFilter := mongo.Pipeline{} - sort := bson.D{} - - if filter != nil { - pipelineFilter = filter.FilterQuery(ctx) - sort = filter.Sort(ctx) - } - - if len(sort) != 0 { - pipelineSort = append(pipelineSort, bson.D{{Key: "$sort", Value: sort}}) - } - - pipelinePaginate := mongo.Pipeline{} - if limit != nil { - pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *limit * (page - 1)}}) - pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *limit}}) - } else { - page = 1 - } - - pipelineCount := mongo.Pipeline{} - pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}}) - - extrModPipelineResolved := mongo.Pipeline{} - for _, ppl := range c.extraModPipeline { - extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx)) - } - - pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort) - pipelineTotalCount := langext.ArrConcat(pipelineFilter, pipelineCount) - - cursorList, err := c.coll.Aggregate(ctx, pipelineList) - if err != nil { - return nil, pag.Pagination{}, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build() - } - entities, err := c.decodeAll(ctx, cursorList) if err != nil { return nil, pag.Pagination{}, exerr.Wrap(err, "failed to all-decode entities").Build() @@ -93,3 +58,100 @@ func (c *Coll[TData]) Paginate(ctx context.Context, filter pag.MongoFilter, page return entities, paginationObj, nil } + +func (c *Coll[TData]) PaginateIterateFunc(ctx context.Context, filter pag.MongoFilter, page int, limit *int, fn func(v TData) error) error { + page, cursor, _, err := c.createPaginatedQuery(ctx, filter, page, limit) + if err != nil { + return exerr.Wrap(err, "").Build() + } + + defer func() { _ = cursor.Close(ctx) }() + + for cursor.Next(ctx) { + + v, err := c.decodeSingle(ctx, cursor) + if err != nil { + return exerr.Wrap(err, "").Build() + } + + err = fn(v) + if err != nil { + return exerr.Wrap(err, "").Build() + } + + } + + return nil +} + +func (c *Coll[TData]) PaginateIterate(ctx context.Context, filter pag.MongoFilter, page int, limit *int) iter.Seq2[TData, error] { + page, cursor, _, err := c.createPaginatedQuery(ctx, filter, page, limit) + if err != nil { + return langext.IterSingleValueSeq2[TData, error](nil, exerr.Wrap(err, "").Build()) + } + + return func(yield func(TData, error) bool) { + defer func() { _ = cursor.Close(ctx) }() + + for cursor.Next(ctx) { + v, err := c.decodeSingle(ctx, cursor) + if err != nil { + if !yield(nil, err) { + return + } + continue + } + + if !yield(v, nil) { + return + } + } + } +} + +// ===================================================================================================================== + +func (c *Coll[TData]) createPaginatedQuery(ctx context.Context, filter pag.MongoFilter, page int, limit *int) (int, *mongo.Cursor, mongo.Pipeline, error) { + if page < 0 { + page = 1 + } + + pipelineSort := mongo.Pipeline{} + pipelineFilter := mongo.Pipeline{} + sort := bson.D{} + + if filter != nil { + pipelineFilter = filter.FilterQuery(ctx) + sort = filter.Sort(ctx) + } + + if len(sort) != 0 { + pipelineSort = append(pipelineSort, bson.D{{Key: "$sort", Value: sort}}) + } + + pipelinePaginate := mongo.Pipeline{} + if limit != nil { + pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *limit * (page - 1)}}) + pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *limit}}) + } else { + page = 1 + } + + pipelineCount := mongo.Pipeline{} + pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}}) + + extrModPipelineResolved := mongo.Pipeline{} + for _, ppl := range c.extraModPipeline { + extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx)) + } + + pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort) + pipelineTotalCount := langext.ArrConcat(pipelineFilter, pipelineCount) + + cursorList, err := c.coll.Aggregate(ctx, pipelineList) + if err != nil { + return 0, nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build() + } + + return page, cursorList, pipelineTotalCount, nil +}