- fix references in nested structs not being registered properly - rework struct tag parsing - remove debug logs - fix bug where an unrelated field with the same prefix as another population path is erroneously marked as populated
419 lines
9.7 KiB
Go
419 lines
9.7 KiB
Go
package orm
|
|
|
|
import (
|
|
context2 "context"
|
|
"fmt"
|
|
"log"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/fatih/structtag"
|
|
"go.mongodb.org/mongo-driver/v2/bson"
|
|
"go.mongodb.org/mongo-driver/v2/mongo"
|
|
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
// Reference stores a typed document reference
|
|
type Reference struct {
|
|
// owning model name
|
|
Model string
|
|
// the name of the struct field
|
|
FieldName string
|
|
// index of field in owning struct
|
|
Idx int
|
|
// the type of the referenced object
|
|
HydratedType reflect.Type
|
|
|
|
// field kind (struct, slice, ...)
|
|
Kind reflect.Kind
|
|
|
|
exists bool
|
|
}
|
|
|
|
type gridFSReference struct {
|
|
BucketName string
|
|
FilenameFmt string
|
|
LoadType reflect.Type
|
|
Idx int
|
|
}
|
|
|
|
type TModelRegistry map[string]*Model
|
|
|
|
// ModelRegistry - the ModelRegistry stores a map containing
|
|
// pointers to Model instances, keyed by an associated
|
|
// model name
|
|
var ModelRegistry = make(TModelRegistry)
|
|
|
|
// DB - The mongodb database handle
|
|
var DB *mongo.Database
|
|
|
|
// DBClient - The mongodb client
|
|
var DBClient *mongo.Client
|
|
|
|
// NextStringID - Override this function with your own
|
|
// string ID generator!
|
|
var NextStringID func() string
|
|
|
|
var mutex sync.Mutex
|
|
|
|
func makeGfsRef(tag *structtag.Tag, idx int) gridFSReference {
|
|
opts := tag.Options
|
|
var ffmt string
|
|
if len(opts) < 1 {
|
|
ffmt = "%s"
|
|
} else {
|
|
ffmt = opts[0]
|
|
}
|
|
var typ reflect.Type
|
|
if len(opts) < 2 {
|
|
typ = reflect.TypeOf("")
|
|
} else {
|
|
switch opts[1] {
|
|
case "bytes":
|
|
typ = reflect.TypeOf([]byte{})
|
|
case "string":
|
|
typ = reflect.TypeOf("")
|
|
default:
|
|
typ = reflect.TypeOf("")
|
|
}
|
|
}
|
|
|
|
return gridFSReference{
|
|
FilenameFmt: ffmt,
|
|
BucketName: tag.Name,
|
|
LoadType: typ,
|
|
Idx: idx,
|
|
}
|
|
}
|
|
|
|
func makeRef(idx int, modelName string, fieldName string, ht reflect.Type) Reference {
|
|
if modelName != "" {
|
|
if ModelRegistry.Index(modelName) != -1 {
|
|
return Reference{
|
|
Idx: idx,
|
|
Model: modelName,
|
|
HydratedType: ht,
|
|
Kind: ht.Kind(),
|
|
exists: true,
|
|
FieldName: fieldName,
|
|
}
|
|
}
|
|
return Reference{
|
|
Idx: idx,
|
|
Model: modelName,
|
|
FieldName: fieldName,
|
|
HydratedType: ht,
|
|
Kind: ht.Kind(),
|
|
exists: true,
|
|
}
|
|
}
|
|
panic("model name was empty")
|
|
}
|
|
|
|
type parseResult []string
|
|
|
|
func (p parseResult) includes(str string) bool {
|
|
for _, v := range p {
|
|
if v == str {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func parseTags(t reflect.Type, v reflect.Value, lastParsed parseResult, depth int) (map[string][]InternalIndex, map[string]Reference, map[string]gridFSReference, string) {
|
|
coll := ""
|
|
refs := make(map[string]Reference)
|
|
idcs := make(map[string][]InternalIndex)
|
|
gfsRefs := make(map[string]gridFSReference)
|
|
if depth >= 4 {
|
|
return idcs, refs, gfsRefs, coll
|
|
}
|
|
for i := 0; i < v.NumField(); i++ {
|
|
sft := t.Field(i)
|
|
ft := sft.Type
|
|
tags, err := structtag.Parse(string(sft.Tag))
|
|
panik(err)
|
|
switch ft.Kind() {
|
|
case reflect.Slice:
|
|
ft = ft.Elem()
|
|
fallthrough
|
|
case reflect.Pointer:
|
|
if ft.Kind() == reflect.Pointer {
|
|
ft = ft.Elem()
|
|
}
|
|
fallthrough
|
|
case reflect.Struct:
|
|
if ft.ConvertibleTo(reflect.TypeOf(Document{})) {
|
|
collTag, err := tags.Get("coll")
|
|
panik(err)
|
|
coll = collTag.Name
|
|
idxTag, err := tags.Get("idx")
|
|
if err == nil {
|
|
idcs[sft.Type.Name()] = scanIndex(idxTag.Value())
|
|
}
|
|
continue
|
|
}
|
|
|
|
if lastParsed.includes(sft.Name) {
|
|
continue
|
|
}
|
|
blip := lastParsed
|
|
blip = append(blip, sft.Name)
|
|
if ft.Kind() == reflect.Struct && ft != reflect.TypeFor[time.Time]() {
|
|
ii2, rr2, gg2, _ := parseTags(ft, reflect.New(ft).Elem(), blip, depth+1)
|
|
for k, vv := range ii2 {
|
|
idcs[sft.Name+"."+k] = vv
|
|
}
|
|
for k, vv := range rr2 {
|
|
refs[sft.Name+"."+k] = vv
|
|
}
|
|
for k, vv := range gg2 {
|
|
gfsRefs[sft.Name+"."+k] = vv
|
|
}
|
|
}
|
|
if refTag, ok := tags.Get("ref"); ok == nil {
|
|
sname := sft.Name
|
|
refs[sname] = makeRef(i, refTag.Name, sft.Name, sft.Type)
|
|
}
|
|
if gtag, ok := tags.Get("gridfs"); ok == nil {
|
|
sname := sft.Name + "@" + gtag.Name
|
|
gfsRefs[sname] = makeGfsRef(gtag, i)
|
|
}
|
|
fallthrough
|
|
default:
|
|
idxTag, err := tags.Get("idx")
|
|
if err == nil {
|
|
idcs[sft.Name] = scanIndex(idxTag.Value())
|
|
}
|
|
if gtag, ok := tags.Get("gridfs"); ok == nil {
|
|
sname := sft.Name + "@" + gtag.Name
|
|
gfsRefs[sname] = makeGfsRef(gtag, i)
|
|
}
|
|
}
|
|
}
|
|
return idcs, refs, gfsRefs, coll
|
|
}
|
|
|
|
// Has returns the model typename and Model instance corresponding
|
|
// to the argument passed, as well as a boolean indicating whether it
|
|
// was found. otherwise returns `"", nil, false`
|
|
func (r TModelRegistry) Has(i interface{}) (string, *Model, bool) {
|
|
t := reflect.TypeOf(i)
|
|
if t.Kind() == reflect.Ptr {
|
|
t = t.Elem()
|
|
}
|
|
n := t.Name()
|
|
if rT, ok := ModelRegistry[n]; ok {
|
|
return n, rT, true
|
|
}
|
|
return "", nil, false
|
|
}
|
|
|
|
// HasByName functions almost identically to Has,
|
|
// except that it takes a string as its argument.
|
|
func (r TModelRegistry) HasByName(n string) (string, *Model, bool) {
|
|
if t, ok := ModelRegistry[n]; ok {
|
|
return n, t, true
|
|
}
|
|
return "", nil, false
|
|
}
|
|
|
|
// Index returns the index at which the Document struct is embedded
|
|
func (r TModelRegistry) Index(n string) int {
|
|
if v, ok := ModelRegistry[n]; ok {
|
|
return v.idx
|
|
}
|
|
return -1
|
|
}
|
|
|
|
func (r TModelRegistry) new_(n string) interface{} {
|
|
if name, m, ok := ModelRegistry.HasByName(n); ok {
|
|
v := reflect.New(m.Type)
|
|
av := reflect.New(reflect.PointerTo(m.Type)).Elem()
|
|
av.Addr().Elem().Set(v)
|
|
df := av.Addr().Elem().Elem().Field(m.idx)
|
|
ado := reflect.New(reflect.PointerTo(df.Type())).Elem()
|
|
do := reflect.New(df.Type())
|
|
ado.Addr().Elem().Set(do)
|
|
d := ado.Addr().Elem().Interface().(IDocument)
|
|
d.newPopulationMap()
|
|
//d := df.Interface().(IDocument)
|
|
for k := range m.references {
|
|
d.markDepopulated(k)
|
|
}
|
|
d.setModel(*m)
|
|
d.getModel().typeName = name
|
|
d.SetSelf(av.Interface())
|
|
df.Set(reflect.ValueOf(d).Elem())
|
|
return av.Interface()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r TModelRegistry) newForType(rt reflect.Type) interface{} {
|
|
return r.new_(rt.Name())
|
|
}
|
|
|
|
func (r TModelRegistry) Get(name string) *Model {
|
|
model, ok := r[name]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return model
|
|
}
|
|
|
|
// Model registers models in the ModelRegistry, where
|
|
// they can be accessed via a model's struct name
|
|
func (r TModelRegistry) Model(mdl ...any) {
|
|
defer mutex.Unlock()
|
|
mutex.Lock()
|
|
|
|
for _, m := range mdl {
|
|
t := reflect.TypeOf(m)
|
|
v := reflect.ValueOf(m)
|
|
vp := v
|
|
if vp.Kind() != reflect.Ptr {
|
|
vp = reflect.New(v.Type())
|
|
vp.Elem().Set(v)
|
|
}
|
|
id, ok := vp.Interface().(HasID)
|
|
if !ok {
|
|
panic(fmt.Sprintf("you MUST implement the HasID interface!!! skipping...\n"))
|
|
}
|
|
switch (id).Id().(type) {
|
|
case int, int64, int32, string, bson.ObjectID, uint, uint32, uint64:
|
|
break
|
|
default:
|
|
log.Printf("invalid ID type specified!!! skipping...\n")
|
|
}
|
|
|
|
if t.Kind() == reflect.Ptr {
|
|
t = reflect.Indirect(reflect.ValueOf(m)).Type()
|
|
v = reflect.ValueOf(m).Elem()
|
|
}
|
|
n := t.Name()
|
|
if t.Kind() != reflect.Struct {
|
|
panic(fmt.Sprintf("Only structs can be passed to this function, silly! (passed type: %s)", n))
|
|
}
|
|
idx := -1
|
|
for i := 0; i < v.NumField(); i++ {
|
|
ft := t.Field(i)
|
|
if (ft.Type.ConvertibleTo(reflect.TypeOf(Document{}))) {
|
|
idx = i
|
|
break
|
|
}
|
|
}
|
|
if idx < 0 {
|
|
panic("A model must embed the Document struct!")
|
|
}
|
|
inds, refs, gfs, coll := parseTags(t, v, make(parseResult, 0), 0)
|
|
if coll == "" {
|
|
panic(fmt.Sprintf("a Document needs to be given a collection name! (passed type: %s)", n))
|
|
}
|
|
ModelRegistry[n] = &Model{
|
|
idx: idx,
|
|
Type: t,
|
|
collection: coll,
|
|
Indexes: inds,
|
|
references: refs,
|
|
gridFSReferences: gfs,
|
|
typeName: n,
|
|
}
|
|
}
|
|
for k, v := range ModelRegistry {
|
|
for k2, v2 := range v.references {
|
|
if !v2.exists {
|
|
if _, ok := ModelRegistry[v2.FieldName]; ok {
|
|
tmp := ModelRegistry[k].references[k2]
|
|
ModelRegistry[k].references[k2] = Reference{
|
|
Model: k,
|
|
Idx: tmp.Idx,
|
|
FieldName: tmp.FieldName,
|
|
Kind: tmp.Kind,
|
|
HydratedType: tmp.HydratedType,
|
|
exists: true,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func innerWatch(coll *mongo.Collection) {
|
|
sspipeline := mongo.Pipeline{
|
|
bson.D{{"$match", bson.D{{"$or",
|
|
bson.A{
|
|
bson.D{{
|
|
"operationType", "insert",
|
|
}},
|
|
bson.D{{"operationType", "update"}},
|
|
},
|
|
}},
|
|
}},
|
|
}
|
|
|
|
stream, err := coll.Watch(context.TODO(), sspipeline, options.ChangeStream().SetFullDocument(options.UpdateLookup).SetFullDocumentBeforeChange(options.WhenAvailable))
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer func(stream *mongo.ChangeStream, ctx context2.Context) {
|
|
err := stream.Close(ctx)
|
|
panik(err)
|
|
}(stream, context.TODO())
|
|
for stream.Next(context.TODO()) {
|
|
var data bson.M
|
|
if err := stream.Decode(&data); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
var uid any
|
|
|
|
docKey := data["documentKey"]
|
|
|
|
switch docKey.(type) {
|
|
case bson.M:
|
|
uid = docKey.(bson.M)["_id"]
|
|
case bson.D:
|
|
for _, vv := range docKey.(bson.D) {
|
|
if vv.Key == "_id" {
|
|
uid = vv.Value
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
//var uid = data["documentKey"].(bson.M)["_id"]
|
|
|
|
if data["operationType"] == "insert" {
|
|
counterColl := DB.Collection(COUNTER_COL)
|
|
counterColl.UpdateOne(context.TODO(), bson.M{"collection": coll.Name()}, bson.M{"$set": bson.M{
|
|
"current": uid,
|
|
}}, options.UpdateOne().SetUpsert(true))
|
|
}
|
|
|
|
fmt.Printf("%v\n", data)
|
|
}
|
|
}
|
|
|
|
func Connect(uri string, dbName string) {
|
|
cli, err := mongo.Connect(options.Client().ApplyURI(uri))
|
|
if err != nil {
|
|
log.Fatal("failed to open database")
|
|
}
|
|
panik(err)
|
|
DB = cli.Database(dbName)
|
|
colls, err := DB.ListCollectionNames(context.TODO(), bson.M{"name": bson.M{"$ne": COUNTER_COL}}, options.ListCollections().SetNameOnly(true))
|
|
|
|
for _, c := range colls {
|
|
if c == COUNTER_COL {
|
|
continue
|
|
}
|
|
go innerWatch(DB.Collection(c))
|
|
}
|
|
|
|
DBClient = cli
|
|
}
|