package orm import ( context2 "context" "fmt" "log" "reflect" "sync" "github.com/fatih/structtag" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "golang.org/x/net/context" ) // Reference stores a typed document reference type Reference struct { // owning model name Model string // the name of the struct field FieldName string // index of field in owning struct Idx int // the type of the referenced object HydratedType reflect.Type // field kind (struct, slice, ...) Kind reflect.Kind exists bool } type gridFSReference struct { BucketName string FilenameFmt string LoadType reflect.Type Idx int } type TModelRegistry map[string]*Model // ModelRegistry - the ModelRegistry stores a map containing // pointers to Model instances, keyed by an associated // model name var ModelRegistry = make(TModelRegistry) // DB - The mongodb database handle var DB *mongo.Database // DBClient - The mongodb client var DBClient *mongo.Client // NextStringID - Override this function with your own // string ID generator! var NextStringID func() string var mutex sync.Mutex func makeGfsRef(tag *structtag.Tag, idx int) gridFSReference { opts := tag.Options var ffmt string if len(opts) < 1 { ffmt = "%s" } else { ffmt = opts[0] } var typ reflect.Type if len(opts) < 2 { typ = reflect.TypeOf("") } else { switch opts[1] { case "bytes": typ = reflect.TypeOf([]byte{}) case "string": typ = reflect.TypeOf("") default: typ = reflect.TypeOf("") } } return gridFSReference{ FilenameFmt: ffmt, BucketName: tag.Name, LoadType: typ, Idx: idx, } } func makeRef(idx int, modelName string, fieldName string, ht reflect.Type) Reference { if modelName != "" { if ModelRegistry.Index(modelName) != -1 { return Reference{ Idx: idx, Model: modelName, HydratedType: ht, Kind: ht.Kind(), exists: true, FieldName: fieldName, } } return Reference{ Idx: idx, Model: modelName, FieldName: fieldName, HydratedType: ht, Kind: ht.Kind(), exists: true, } } panic("model name was empty") } func parseTags(t reflect.Type, v reflect.Value) (map[string][]InternalIndex, map[string]Reference, map[string]gridFSReference, string) { coll := "" refs := make(map[string]Reference) idcs := make(map[string][]InternalIndex) gfsRefs := make(map[string]gridFSReference) for i := 0; i < v.NumField(); i++ { sft := t.Field(i) ft := sft.Type tags, err := structtag.Parse(string(sft.Tag)) panik(err) shouldContinue := true for { if !shouldContinue { break } switch ft.Kind() { case reflect.Slice: ft = ft.Elem() if _, ok := tags.Get("ref"); ok != nil { if ft.Kind() == reflect.Struct { ii2, rr2, gg2, _ := parseTags(ft, reflect.New(ft).Elem()) for k, vv := range ii2 { idcs[sft.Name+"."+k] = vv } for k, vv := range rr2 { refs[sft.Name+"."+k] = vv } for k, vv := range gg2 { gfsRefs[sft.Name+"."+k] = vv } } } continue case reflect.Pointer: ft = ft.Elem() fallthrough case reflect.Struct: if ft.ConvertibleTo(reflect.TypeOf(Document{})) { collTag, err := tags.Get("coll") panik(err) coll = collTag.Name idxTag, err := tags.Get("idx") if err == nil { idcs[sft.Type.Name()] = scanIndex(idxTag.Value()) } shouldContinue = false break } if refTag, ok := tags.Get("ref"); ok == nil { sname := sft.Name + "@" + refTag.Name refs[sname] = makeRef(i, refTag.Name, sft.Name, sft.Type) } if gtag, ok := tags.Get("gridfs"); ok == nil { sname := sft.Name + "@" + gtag.Name gfsRefs[sname] = makeGfsRef(gtag, i) } fallthrough default: idxTag, err := tags.Get("idx") if err == nil { idcs[sft.Name] = scanIndex(idxTag.Value()) } if gtag, ok := tags.Get("gridfs"); ok == nil { sname := sft.Name + "@" + gtag.Name gfsRefs[sname] = makeGfsRef(gtag, i) } shouldContinue = false } } } return idcs, refs, gfsRefs, coll } // Has returns the model typename and Model instance corresponding // to the argument passed, as well as a boolean indicating whether it // was found. otherwise returns `"", nil, false` func (r TModelRegistry) Has(i interface{}) (string, *Model, bool) { t := reflect.TypeOf(i) if t.Kind() == reflect.Ptr { t = t.Elem() } n := t.Name() if rT, ok := ModelRegistry[n]; ok { return n, rT, true } return "", nil, false } // HasByName functions almost identically to Has, // except that it takes a string as its argument. func (r TModelRegistry) HasByName(n string) (string, *Model, bool) { if t, ok := ModelRegistry[n]; ok { return n, t, true } return "", nil, false } // Index returns the index at which the Document struct is embedded func (r TModelRegistry) Index(n string) int { if v, ok := ModelRegistry[n]; ok { return v.idx } return -1 } func (r TModelRegistry) new_(n string) interface{} { if name, m, ok := ModelRegistry.HasByName(n); ok { v := reflect.New(m.Type) df := v.Elem().Field(m.idx) do := reflect.New(df.Type()) d := do.Interface().(IDocument) //d := df.Interface().(IDocument) d.setModel(*m) d.getModel().typeName = name df.Set(reflect.ValueOf(d).Elem()) return v.Interface() } return nil } // Model registers models in the ModelRegistry, where // they can be accessed via a model's struct name func (r TModelRegistry) Model(mdl ...any) { defer mutex.Unlock() mutex.Lock() for _, m := range mdl { t := reflect.TypeOf(m) v := reflect.ValueOf(m) vp := v if vp.Kind() != reflect.Ptr { vp = reflect.New(v.Type()) vp.Elem().Set(v) } id, ok := vp.Interface().(HasID) if !ok { panic(fmt.Sprintf("you MUST implement the HasID interface!!! skipping...\n")) } switch (id).Id().(type) { case int, int64, int32, string, primitive.ObjectID, uint, uint32, uint64: break default: log.Printf("invalid ID type specified!!! skipping...\n") } if t.Kind() == reflect.Ptr { t = reflect.Indirect(reflect.ValueOf(m)).Type() v = reflect.ValueOf(m).Elem() } n := t.Name() if t.Kind() != reflect.Struct { panic(fmt.Sprintf("Only structs can be passed to this function, silly! (passed type: %s)", n)) } idx := -1 for i := 0; i < v.NumField(); i++ { ft := t.Field(i) if (ft.Type.ConvertibleTo(reflect.TypeOf(Document{}))) { idx = i break } } if idx < 0 { panic("A model must embed the Document struct!") } inds, refs, gfs, coll := parseTags(t, v) if coll == "" { panic(fmt.Sprintf("a Document needs to be given a collection name! (passed type: %s)", n)) } ModelRegistry[n] = &Model{ idx: idx, Type: t, collection: coll, Indexes: inds, references: refs, gridFSReferences: gfs, } } for k, v := range ModelRegistry { for k2, v2 := range v.references { if !v2.exists { if _, ok := ModelRegistry[v2.FieldName]; ok { tmp := ModelRegistry[k].references[k2] ModelRegistry[k].references[k2] = Reference{ Model: k, Idx: tmp.Idx, FieldName: tmp.FieldName, Kind: tmp.Kind, HydratedType: tmp.HydratedType, exists: true, } } } } } } func innerWatch(coll *mongo.Collection) { sspipeline := mongo.Pipeline{ bson.D{{"$match", bson.D{{"$or", bson.A{ bson.D{{ "operationType", "insert", }}, bson.D{{"operationType", "update"}}, }, }}, }}, } stream, err := coll.Watch(context.TODO(), sspipeline, options.ChangeStream().SetFullDocument(options.UpdateLookup).SetFullDocumentBeforeChange(options.WhenAvailable)) panik(err) defer func(stream *mongo.ChangeStream, ctx context2.Context) { err := stream.Close(ctx) panik(err) }(stream, context.TODO()) for stream.Next(context.TODO()) { var data bson.M if err := stream.Decode(&data); err != nil { log.Fatal(err) } var uid = data["documentKey"].(bson.M)["_id"] if data["operationType"] == "insert" { counterColl := DB.Collection(COUNTER_COL) counterColl.UpdateOne(context.TODO(), bson.M{"collection": coll.Name()}, bson.M{"$set": bson.M{ "current": uid, }}, options.Update().SetUpsert(true)) } fmt.Printf("%v\n", data) } } func Connect(uri string, dbName string) { cli, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri)) if err != nil { log.Fatal("failed to open database") } panik(err) DB = cli.Database(dbName) colls, err := DB.ListCollectionNames(context.TODO(), bson.M{"name": bson.M{"$ne": COUNTER_COL}}, options.ListCollections().SetNameOnly(true)) for _, c := range colls { if c == COUNTER_COL { continue } go innerWatch(DB.Collection(c)) } DBClient = cli }