goext/wmo/collection.go
Mike Schwörer e154137105
All checks were successful
Build Docker and Deploy / Run goext test-suite (push) Successful in 2m11s
Trying out paginated cursortoken variant [UNTESTED]
2024-10-25 09:45:42 +02:00

157 lines
5.0 KiB
Go

package wmo
import (
"context"
"go.mongodb.org/mongo-driver/bson/bsontype"
"go.mongodb.org/mongo-driver/mongo"
ct "gogs.mikescher.com/BlackForestBytes/goext/cursortoken"
"gogs.mikescher.com/BlackForestBytes/goext/exerr"
"gogs.mikescher.com/BlackForestBytes/goext/langext"
"reflect"
)
// EntityID is the contract for identifier types used with this package:
// an ID must be able to encode itself as a BSON value and to render itself
// as a string.
type EntityID interface {
// MarshalBSONValue returns the BSON type tag, the encoded bytes and an error.
MarshalBSONValue() (bsontype.Type, []byte, error)
// String returns the textual form of the ID.
String() string
}
// Decodable is anything that can decode its current content into v.
// It is the argument type handed to the custom decoder stored in Coll.customDecoder.
// NOTE(review): presumably satisfied by the mongo driver's cursor / single-result types — confirm.
type Decodable interface {
// Decode writes the decoded value into v and reports any failure.
Decode(v any) error
}
// Cursorable describes the cursor operations this package relies on:
// decoding the current element, advancing, draining everything at once,
// error inspection and closing.
// NOTE(review): the method set looks like it mirrors *mongo.Cursor — confirm against the driver.
type Cursorable interface {
// Decode decodes the current element into v.
Decode(v any) error
// Err returns the error recorded on the cursor, if any.
Err() error
// Close releases the cursor.
Close(ctx context.Context) error
// All decodes the remaining elements into results.
All(ctx context.Context, results any) error
// RemainingBatchLength reports a count of remaining elements
// (presumably in the current batch only — confirm).
RemainingBatchLength() int
// Next advances the cursor and reports whether an element is available.
Next(ctx context.Context) bool
}
// fullTypeRef holds reflection metadata about a single field of a data type.
// Values of this type are stored in Coll.dataTypeMap / Coll.implDataTypeMap,
// keyed by string.
// NOTE(review): the exact meaning of each field is set where the maps are
// populated (not visible here); the per-field notes below are assumptions.
type fullTypeRef struct {
// presumably true when the field is declared as a pointer — confirm
IsPointer bool
// reflect kind of the field
Kind reflect.Kind
// presumably the declared type including any pointer — confirm
RealType reflect.Type
// presumably the type with the pointer stripped — confirm
Type reflect.Type
// presumably the underlying (non-named) type — confirm
UnderlyingType reflect.Type
// name of the field
Name string
// presumably the reflect field index path (as accepted by FieldByIndex) — confirm
Index []int
}
// IColl is the non-generic view of a collection wrapper. *Coll[TData]
// defines all four methods, so code that does not care about the element
// type can work with IColl instead.
type IColl interface {
// Collection returns the underlying mongo collection.
Collection() *mongo.Collection
// Name returns the collection name.
Name() string
// Indexes returns the index view of the collection.
Indexes() mongo.IndexView
// Drop drops the collection.
Drop(ctx context.Context) error
}
// Coll is a typed wrapper around a *mongo.Collection whose documents are
// represented by TData. Besides the raw collection it carries reflection
// metadata about TData, an optional custom decoder, marshal/unmarshal hooks
// and extra pipeline stages that are appended to read pipelines.
// The With* methods mutate the receiver and return it for chaining.
type Coll[TData any] struct {
coll *mongo.Collection // internal mongo collection, access via Collection()
dataTypeMap map[string]fullTypeRef // list of TData fields (only if TData is not an interface)
implDataTypeMap map[reflect.Type]map[string]fullTypeRef // dynamic list of fields of TData implementations (only if TData is an interface)
customDecoder *func(ctx context.Context, dec Decodable) (TData, error) // custom decoding function (useful if TData is an interface)
isInterfaceDataType bool // true if TData is an interface (not a struct)
unmarshalHooks []func(d TData) TData // called for every object after unmarshalling
marshalHooks []func(d TData) TData // called for every object before marshalling
extraModPipeline []func(ctx context.Context) mongo.Pipeline // appended to pipelines after filter/limit/skip/sort, used for $lookup, $set, $unset, $project, etc
}
// Collection exposes the wrapped *mongo.Collection for direct driver access.
func (c *Coll[TData]) Collection() *mongo.Collection {
	underlying := c.coll
	return underlying
}
// Name returns the name reported by the wrapped mongo collection.
func (c *Coll[TData]) Name() string {
	collName := c.coll.Name()
	return collName
}
// Indexes returns the index view of the wrapped mongo collection.
func (c *Coll[TData]) Indexes() mongo.IndexView {
	view := c.coll.Indexes()
	return view
}
// Drop drops the wrapped mongo collection.
// A driver error is wrapped via exerr and annotated with the collection
// name; on success nil is returned.
func (c *Coll[TData]) Drop(ctx context.Context) error {
	if dropErr := c.coll.Drop(ctx); dropErr != nil {
		return exerr.Wrap(dropErr, "failed to drop collection").Str("collection", c.Name()).Build()
	}

	return nil
}
// WithDecodeFunc installs cdf as the custom decoder of this collection and
// returns the receiver for chaining.
// Before storing the decoder it calls EnsureInitializedReflection(example)
// (presumably to register the field metadata of the example's concrete
// type — confirm at its definition).
func (c *Coll[TData]) WithDecodeFunc(cdf func(ctx context.Context, dec Decodable) (TData, error), example TData) *Coll[TData] {
	c.EnsureInitializedReflection(example)

	// customDecoder is a pointer-to-func, so take the address of a local copy.
	decoder := cdf
	c.customDecoder = &decoder

	return c
}
// WithUnmarshalHook registers fn as an additional unmarshal hook, i.e. a
// function that is called for every object after it was read from the DB.
// Hooks are kept in registration order; the receiver is returned for chaining.
func (c *Coll[TData]) WithUnmarshalHook(fn func(d TData) TData) *Coll[TData] {
	hooks := append(c.unmarshalHooks, fn)
	c.unmarshalHooks = hooks
	return c
}
// WithMarshalHook registers fn as an additional marshal hook, i.e. a
// function that is called for every object before it is written to the DB.
// Hooks are kept in registration order; the receiver is returned for chaining.
func (c *Coll[TData]) WithMarshalHook(fn func(d TData) TData) *Coll[TData] {
	hooks := append(c.marshalHooks, fn)
	c.marshalHooks = hooks
	return c
}
// WithModifyingPipeline registers a fixed pipeline that is appended to all
// read operations (after filtering). The pipeline is stored as a generator
// that ignores its context and always yields p. Returns the receiver for
// chaining.
func (c *Coll[TData]) WithModifyingPipeline(p mongo.Pipeline) *Coll[TData] {
	fixed := func(_ context.Context) mongo.Pipeline {
		return p
	}

	c.extraModPipeline = append(c.extraModPipeline, fixed)
	return c
}
// WithModifyingPipelineFunc registers a pipeline generator whose result is
// appended to all read operations (after filtering). fn receives the
// context it is invoked with, so the stages may differ per call.
// Returns the receiver for chaining.
func (c *Coll[TData]) WithModifyingPipelineFunc(fn func(ctx context.Context) mongo.Pipeline) *Coll[TData] {
	generators := append(c.extraModPipeline, fn)
	c.extraModPipeline = generators
	return c
}
// createToken builds the key-sort cursor token that points behind lastEntity.
//
// The primary token value is the token-string of lastEntity's fieldPrimary.
// A secondary value is only extracted when both fieldSecondary and
// dirSecondary are set; otherwise it stays empty. pageSize defaults to 0
// when nil.
//
// The secondary direction stored in the token is *dirSecondary when a
// secondary sort is configured, and dirPrimary otherwise.
//
// Returns a wrapped error if one of the field values cannot be converted
// to a token-string.
func (c *Coll[TData]) createToken(fieldPrimary string, dirPrimary ct.SortDirection, fieldSecondary *string, dirSecondary *ct.SortDirection, lastEntity TData, pageSize *int) (ct.CursorToken, error) {
	valuePrimary, err := c.getFieldValueAsTokenString(lastEntity, fieldPrimary)
	if err != nil {
		return nil, exerr.Wrap(err, "failed to get (primary) field-value as token-string").Type("lastEntity", lastEntity).Str("fieldPrimary", fieldPrimary).Build()
	}

	valueSecondary := ""
	// Without a secondary sort the direction is irrelevant; keep dirPrimary as the placeholder.
	tokenDirSecondary := dirPrimary

	if fieldSecondary != nil && dirSecondary != nil {
		valueSecondary, err = c.getFieldValueAsTokenString(lastEntity, *fieldSecondary)
		if err != nil {
			return nil, exerr.Wrap(err, "failed to get (secondary) field-value as token-string").Type("lastEntity", lastEntity).StrPtr("fieldSecondary", fieldSecondary).Build()
		}

		// Record the actual secondary direction instead of repeating the primary one.
		tokenDirSecondary = *dirSecondary
	}

	return ct.NewKeySortToken(
		valuePrimary,
		valueSecondary,
		dirPrimary,
		tokenDirSecondary,
		langext.Coalesce(pageSize, 0),
		ct.Extra{},
	), nil
}
// needsDoubleSort reports whether any registered extra pipeline contains a
// "$group" stage. Every generator in extraModPipeline is evaluated with ctx
// and its stages are scanned for an element keyed "$group".
//
// A group stage in extraModPipeline results in unsorted data, which means
// the caller must sort again after these pipeline stages.
func (c *Coll[TData]) needsDoubleSort(ctx context.Context) bool {
	for _, generate := range c.extraModPipeline {
		pipeline := generate(ctx)

		for stageIdx := range pipeline {
			for _, elem := range pipeline[stageIdx] {
				if elem.Key != "$group" {
					continue
				}
				return true
			}
		}
	}

	return false
}