package wmo import ( "context" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "gogs.mikescher.com/BlackForestBytes/goext/exerr" "gogs.mikescher.com/BlackForestBytes/goext/langext" "iter" ) func (c *Coll[TData]) createFindQuery(ctx context.Context, filter bson.M, opts ...*options.FindOptions) (*mongo.Cursor, error) { pipeline := mongo.Pipeline{} pipeline = append(pipeline, bson.D{{Key: "$match", Value: filter}}) for _, opt := range opts { if opt != nil && opt.Sort != nil { pipeline = append(pipeline, bson.D{{Key: "$sort", Value: opt.Sort}}) } } for _, opt := range opts { if opt != nil && opt.Skip != nil { pipeline = append(pipeline, bson.D{{Key: "$skip", Value: *opt.Skip}}) } } for _, opt := range opts { if opt != nil && opt.Limit != nil { pipeline = append(pipeline, bson.D{{Key: "$limit", Value: *opt.Limit}}) } } for _, ppl := range c.extraModPipeline { pipeline = langext.ArrConcat(pipeline, ppl(ctx)) } if c.needsDoubleSort(ctx) { for _, opt := range opts { if opt != nil && opt.Sort != nil { pipeline = append(pipeline, bson.D{{Key: "$sort", Value: opt.Sort}}) } } } for _, opt := range opts { if opt != nil && opt.Projection != nil { pipeline = append(pipeline, bson.D{{Key: "$project", Value: opt.Projection}}) } } convOpts := make([]*options.AggregateOptions, 0, len(opts)) for _, v := range opts { vConv, err := convertFindOpt(v) if err != nil { return nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build() } convOpts = append(convOpts, vConv) } cursor, err := c.coll.Aggregate(ctx, pipeline, convOpts...) if err != nil { return nil, exerr.Wrap(err, "mongo-aggregation failed").Any("pipeline", pipeline).Str("collection", c.Name()).Build() } return cursor, nil } func (c *Coll[TData]) Find(ctx context.Context, filter bson.M, opts ...*options.FindOptions) ([]TData, error) { cursor, err := c.createFindQuery(ctx, filter, opts...) if err != nil { return nil, exerr.Wrap(err, "").Build() } defer func() { _ = cursor.Close(ctx) }() res, err := c.decodeAll(ctx, cursor) if err != nil { return nil, exerr.Wrap(err, "failed to decode values").Build() } return res, nil } func (c *Coll[TData]) IterateFunc(ctx context.Context, filter bson.M, fn func(v TData) error, opts ...*options.FindOptions) error { cursor, err := c.createFindQuery(ctx, filter, opts...) if err != nil { return exerr.Wrap(err, "").Build() } defer func() { _ = cursor.Close(ctx) }() for cursor.Next(ctx) { v, err := c.decodeSingle(ctx, cursor) if err != nil { return exerr.Wrap(err, "").Build() } err = fn(v) if err != nil { return exerr.Wrap(err, "").Build() } } return nil } func (c *Coll[TData]) Iterate(ctx context.Context, filter bson.M, opts ...*options.FindOptions) iter.Seq2[TData, error] { cursor, err := c.createFindQuery(ctx, filter, opts...) if err != nil { return langext.IterSingleValueSeq2[TData, error](nil, exerr.Wrap(err, "").Build()) } return func(yield func(TData, error) bool) { defer func() { _ = cursor.Close(ctx) }() for cursor.Next(ctx) { v, err := c.decodeSingle(ctx, cursor) if err != nil { if !yield(nil, err) { return } continue } if !yield(v, nil) { return } } } } // converts FindOptions to AggregateOptions func convertFindOpt(v *options.FindOptions) (*options.AggregateOptions, error) { if v == nil { return nil, nil } r := options.Aggregate() if v.AllowDiskUse != nil { r.SetAllowDiskUse(*v.AllowDiskUse) } if v.AllowPartialResults != nil { return nil, exerr.New(exerr.TypeMongoInvalidOpt, "Invalid option 'AllowPartialResults' (cannot convert to AggregateOptions)").Build() } if v.BatchSize != nil { r.SetBatchSize(*v.BatchSize) } if v.Collation != nil { r.SetCollation(v.Collation) } if v.Comment != nil { r.SetComment(*v.Comment) } if v.CursorType != nil { return nil, exerr.New(exerr.TypeMongoInvalidOpt, "Invalid option 'CursorType' (cannot convert to AggregateOptions)").Build() } if v.Hint != nil { r.SetHint(v.Hint) } if v.Max != nil { return nil, exerr.New(exerr.TypeMongoInvalidOpt, "Invalid option 'Max' (cannot convert to AggregateOptions)").Build() } if v.MaxAwaitTime != nil { r.SetMaxAwaitTime(*v.MaxAwaitTime) } if v.MaxTime != nil { r.SetMaxTime(*v.MaxTime) } if v.Min != nil { return nil, exerr.New(exerr.TypeMongoInvalidOpt, "Invalid option 'Min' (cannot convert to AggregateOptions)").Build() } if v.NoCursorTimeout != nil { return nil, exerr.New(exerr.TypeMongoInvalidOpt, "Invalid option 'NoCursorTimeout' (cannot convert to AggregateOptions)").Build() } if v.OplogReplay != nil { return nil, exerr.New(exerr.TypeMongoInvalidOpt, "Invalid option 'OplogReplay' (cannot convert to AggregateOptions)").Build() } if v.ReturnKey != nil { return nil, exerr.New(exerr.TypeMongoInvalidOpt, "Invalid option 'ReturnKey' (cannot convert to AggregateOptions)").Build() } if v.ShowRecordID != nil { return nil, exerr.New(exerr.TypeMongoInvalidOpt, "Invalid option 'ShowRecordID' (cannot convert to AggregateOptions)").Build() } if v.Snapshot != nil { return nil, exerr.New(exerr.TypeMongoInvalidOpt, "Invalid option 'Snapshot' (cannot convert to AggregateOptions)").Build() } if v.Let != nil { r.SetLet(v.Let) } return r, nil }