package wmo

import (
	"context"
	"reflect"

	"go.mongodb.org/mongo-driver/bson/bsontype"
	"go.mongodb.org/mongo-driver/mongo"
	ct "gogs.mikescher.com/BlackForestBytes/goext/cursortoken"
	"gogs.mikescher.com/BlackForestBytes/goext/exerr"
	"gogs.mikescher.com/BlackForestBytes/goext/langext"
)

// EntityID is an identifier that can marshal itself into a BSON value and
// render itself as a string.
type EntityID interface {
	MarshalBSONValue() (bsontype.Type, []byte, error)
	String() string
}

// Decodable is anything that can decode itself into a target value.
type Decodable interface {
	Decode(v any) error
}

// Cursorable is the set of cursor operations used by this package.
// NOTE(review): the method set looks like it mirrors *mongo.Cursor - confirm.
type Cursorable interface {
	Decode(v any) error
	Err() error
	Close(ctx context.Context) error
	All(ctx context.Context, results any) error
	RemainingBatchLength() int
	Next(ctx context.Context) bool
}

// fullTypeRef holds reflection metadata about a single field of a data type.
// NOTE(review): the exact meaning of RealType vs. Type vs. UnderlyingType is
// set where these values are built (not visible here) - confirm before relying on it.
type fullTypeRef struct {
	IsPointer      bool
	Kind           reflect.Kind
	RealType       reflect.Type
	Type           reflect.Type
	UnderlyingType reflect.Type
	Name           string
	Index          []int
}

// IColl is the non-generic part of the Coll API. *Coll[TData] provides all of
// these methods for every TData.
type IColl interface {
	Collection() *mongo.Collection
	Name() string
	Indexes() mongo.IndexView
	Drop(ctx context.Context) error
}

// Coll is a typed wrapper around a *mongo.Collection whose documents are of
// type TData.
type Coll[TData any] struct {
	coll                *mongo.Collection                                       // internal mongo collection, access via Collection()
	dataTypeMap         map[string]fullTypeRef                                  // list of TData fields (only if TData is not an interface)
	implDataTypeMap     map[reflect.Type]map[string]fullTypeRef                 // dynamic list of fields of TData implementations (only if TData is an interface)
	customDecoder       *func(ctx context.Context, dec Decodable) (TData, error) // custom decoding function (useful if TData is an interface)
	isInterfaceDataType bool                                                    // true if TData is an interface (not a struct)
	unmarshalHooks      []func(d TData) TData                                   // called for every object after unmarshalling
	marshalHooks        []func(d TData) TData                                   // called for every object before marshalling
	extraModPipeline    []func(ctx context.Context) mongo.Pipeline              // appended to pipelines after filter/limit/skip/sort, used for $lookup, $set, $unset, $project, etc
}

// Collection returns the wrapped mongo collection.
func (c *Coll[TData]) Collection() *mongo.Collection {
	return c.coll
}

// Name returns the name of the wrapped mongo collection.
func (c *Coll[TData]) Name() string {
	return c.coll.Name()
}

// Indexes returns the index view of the wrapped mongo collection.
func (c *Coll[TData]) Indexes() mongo.IndexView {
	return c.coll.Indexes()
}

// Drop drops the wrapped mongo collection. A driver error is wrapped and
// annotated with the collection name.
func (c *Coll[TData]) Drop(ctx context.Context) error {
	if err := c.coll.Drop(ctx); err != nil {
		return exerr.Wrap(err, "failed to drop collection").Str("collection", c.Name()).Build()
	}
	return nil
}

// WithDecodeFunc installs cdf as the custom decoder of this collection and
// passes example to EnsureInitializedReflection beforehand.
// It mutates and returns the receiver so calls can be chained.
func (c *Coll[TData]) WithDecodeFunc(cdf func(ctx context.Context, dec Decodable) (TData, error), example TData) *Coll[TData] {
	c.EnsureInitializedReflection(example)

	c.customDecoder = langext.Ptr(cdf)
	return c
}

// WithUnmarshalHook registers a function that is called for every object
// after reading from DB. It mutates and returns the receiver.
func (c *Coll[TData]) WithUnmarshalHook(fn func(d TData) TData) *Coll[TData] {
	c.unmarshalHooks = append(c.unmarshalHooks, fn)
	return c
}

// WithMarshalHook registers a function that is called for every object
// before writing to DB. It mutates and returns the receiver.
func (c *Coll[TData]) WithMarshalHook(fn func(d TData) TData) *Coll[TData] {
	c.marshalHooks = append(c.marshalHooks, fn)
	return c
}

// WithModifyingPipeline registers a fixed pipeline that is appended to all
// read operations (after filtering). It mutates and returns the receiver.
func (c *Coll[TData]) WithModifyingPipeline(p mongo.Pipeline) *Coll[TData] {
	// Stored as a func so fixed and context-dependent pipelines share one slice.
	c.extraModPipeline = append(c.extraModPipeline, func(ctx context.Context) mongo.Pipeline { return p })
	return c
}

// WithModifyingPipelineFunc registers a pipeline factory whose result is
// appended to all read operations (after filtering). It mutates and returns
// the receiver.
func (c *Coll[TData]) WithModifyingPipelineFunc(fn func(ctx context.Context) mongo.Pipeline) *Coll[TData] {
	c.extraModPipeline = append(c.extraModPipeline, fn)
	return c
}

// createToken builds a key-sort cursor token from the sort-field values of
// lastEntity.
//
// The primary value is always read from fieldPrimary. The secondary value is
// only read when both fieldSecondary and dirSecondary are non-nil; otherwise
// it stays empty and the secondary direction falls back to dirPrimary.
// A nil pageSize is stored as 0.
//
// It returns a wrapped error if one of the field values cannot be converted
// to a token string.
func (c *Coll[TData]) createToken(fieldPrimary string, dirPrimary ct.SortDirection, fieldSecondary *string, dirSecondary *ct.SortDirection, lastEntity TData, pageSize *int) (ct.CursorToken, error) {
	valuePrimary, err := c.getFieldValueAsTokenString(lastEntity, fieldPrimary)
	if err != nil {
		return nil, exerr.Wrap(err, "failed to get (primary) field-value as token-string").Type("lastEntity", lastEntity).Str("fieldPrimary", fieldPrimary).Build()
	}

	valueSecondary := ""
	tokenDirSecondary := dirPrimary // fallback when there is no secondary sort
	if fieldSecondary != nil && dirSecondary != nil {
		valueSecondary, err = c.getFieldValueAsTokenString(lastEntity, *fieldSecondary)
		if err != nil {
			return nil, exerr.Wrap(err, "failed to get (secondary) field-value as token-string").Type("lastEntity", lastEntity).StrPtr("fieldSecondary", fieldSecondary).Build()
		}

		// Record the real secondary direction in the token (previously the
		// primary direction was passed for both slots).
		tokenDirSecondary = *dirSecondary
	}

	return ct.NewKeySortToken(
		valuePrimary,
		valueSecondary,
		dirPrimary,
		tokenDirSecondary,
		langext.Coalesce(pageSize, 0),
		ct.Extra{},
	), nil
}

// needsDoubleSort reports whether any registered modifying pipeline contains
// a stage with a "$group" key. Every pipeline factory is invoked with ctx to
// inspect its stages.
func (c *Coll[TData]) needsDoubleSort(ctx context.Context) bool {
	for _, ppl := range c.extraModPipeline {
		for _, stage := range ppl(ctx) {
			for _, bsone := range stage {
				if bsone.Key == "$group" {
					// a group stage in extraModPipeline results in unsorted data, which means the caller must sort again after these pipeline stages...
					return true
				}
			}
		}
	}
	return false
}