v0.0.561 wmo PaginateIterateFunc+PaginateIterate
Some checks failed
Build Docker and Deploy / Run goext test-suite (push) Failing after 1m31s

This commit is contained in:
Mike Schwörer 2025-01-29 11:02:41 +01:00
parent 28cdfc5bd2
commit be9b9e8ccf
Signed by: Mikescher
GPG Key ID: D3C7172E0A70F8CF
3 changed files with 106 additions and 44 deletions

View File

@ -1,5 +1,5 @@
package goext
const GoextVersion = "0.0.560"
const GoextVersion = "0.0.561"
const GoextVersionTimestamp = "2025-01-29T10:54:53+0100"
const GoextVersionTimestamp = "2025-01-29T11:02:41+0100"

View File

@ -32,7 +32,7 @@ func (c *Coll[TData]) List(ctx context.Context, filter ct.Filter, pageSize *int,
}
}
func (c *Coll[TData]) ListIterateFunc(ctx context.Context, filter ct.Filter, fn func(v TData) error, pageSize *int, inTok ct.CursorToken) error {
func (c *Coll[TData]) ListIterateFunc(ctx context.Context, filter ct.Filter, pageSize *int, inTok ct.CursorToken, fn func(v TData) error) error {
var cursor *mongo.Cursor
var err error

View File

@ -7,54 +7,19 @@ import (
"gogs.mikescher.com/BlackForestBytes/goext/exerr"
"gogs.mikescher.com/BlackForestBytes/goext/langext"
pag "gogs.mikescher.com/BlackForestBytes/goext/pagination"
"iter"
)
func (c *Coll[TData]) Paginate(ctx context.Context, filter pag.MongoFilter, page int, limit *int) ([]TData, pag.Pagination, error) {
page, cursorList, pipelineTotalCount, err := c.createPaginatedQuery(ctx, filter, page, limit)
if err != nil {
return nil, pag.Pagination{}, exerr.Wrap(err, "").Build()
}
type totalCountResult struct {
Count int `bson:"count"`
}
if page < 0 {
page = 1
}
pipelineSort := mongo.Pipeline{}
pipelineFilter := mongo.Pipeline{}
sort := bson.D{}
if filter != nil {
pipelineFilter = filter.FilterQuery(ctx)
sort = filter.Sort(ctx)
}
if len(sort) != 0 {
pipelineSort = append(pipelineSort, bson.D{{Key: "$sort", Value: sort}})
}
pipelinePaginate := mongo.Pipeline{}
if limit != nil {
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *limit * (page - 1)}})
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *limit}})
} else {
page = 1
}
pipelineCount := mongo.Pipeline{}
pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}})
extrModPipelineResolved := mongo.Pipeline{}
for _, ppl := range c.extraModPipeline {
extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx))
}
pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort)
pipelineTotalCount := langext.ArrConcat(pipelineFilter, pipelineCount)
cursorList, err := c.coll.Aggregate(ctx, pipelineList)
if err != nil {
return nil, pag.Pagination{}, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build()
}
entities, err := c.decodeAll(ctx, cursorList)
if err != nil {
return nil, pag.Pagination{}, exerr.Wrap(err, "failed to all-decode entities").Build()
@ -93,3 +58,100 @@ func (c *Coll[TData]) Paginate(ctx context.Context, filter pag.MongoFilter, page
return entities, paginationObj, nil
}
func (c *Coll[TData]) PaginateIterateFunc(ctx context.Context, filter pag.MongoFilter, page int, limit *int, fn func(v TData) error) error {
page, cursor, _, err := c.createPaginatedQuery(ctx, filter, page, limit)
if err != nil {
return exerr.Wrap(err, "").Build()
}
defer func() { _ = cursor.Close(ctx) }()
for cursor.Next(ctx) {
v, err := c.decodeSingle(ctx, cursor)
if err != nil {
return exerr.Wrap(err, "").Build()
}
err = fn(v)
if err != nil {
return exerr.Wrap(err, "").Build()
}
}
return nil
}
func (c *Coll[TData]) PaginateIterate(ctx context.Context, filter pag.MongoFilter, page int, limit *int) iter.Seq2[TData, error] {
page, cursor, _, err := c.createPaginatedQuery(ctx, filter, page, limit)
if err != nil {
return langext.IterSingleValueSeq2[TData, error](nil, exerr.Wrap(err, "").Build())
}
return func(yield func(TData, error) bool) {
defer func() { _ = cursor.Close(ctx) }()
for cursor.Next(ctx) {
v, err := c.decodeSingle(ctx, cursor)
if err != nil {
if !yield(nil, err) {
return
}
continue
}
if !yield(v, nil) {
return
}
}
}
}
// =====================================================================================================================
func (c *Coll[TData]) createPaginatedQuery(ctx context.Context, filter pag.MongoFilter, page int, limit *int) (int, *mongo.Cursor, mongo.Pipeline, error) {
if page < 0 {
page = 1
}
pipelineSort := mongo.Pipeline{}
pipelineFilter := mongo.Pipeline{}
sort := bson.D{}
if filter != nil {
pipelineFilter = filter.FilterQuery(ctx)
sort = filter.Sort(ctx)
}
if len(sort) != 0 {
pipelineSort = append(pipelineSort, bson.D{{Key: "$sort", Value: sort}})
}
pipelinePaginate := mongo.Pipeline{}
if limit != nil {
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$skip", Value: *limit * (page - 1)}})
pipelinePaginate = append(pipelinePaginate, bson.D{{Key: "$limit", Value: *limit}})
} else {
page = 1
}
pipelineCount := mongo.Pipeline{}
pipelineCount = append(pipelineCount, bson.D{{Key: "$count", Value: "count"}})
extrModPipelineResolved := mongo.Pipeline{}
for _, ppl := range c.extraModPipeline {
extrModPipelineResolved = langext.ArrConcat(extrModPipelineResolved, ppl(ctx))
}
pipelineList := langext.ArrConcat(pipelineFilter, pipelineSort, pipelinePaginate, extrModPipelineResolved, pipelineSort)
pipelineTotalCount := langext.ArrConcat(pipelineFilter, pipelineCount)
cursorList, err := c.coll.Aggregate(ctx, pipelineList)
if err != nil {
return 0, nil, nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipelineList).Str("collection", c.Name()).Build()
}
return page, cursorList, pipelineTotalCount, nil
}