diamond-orm/registry.go

377 lines
8.9 KiB
Go

package orm
import (
context2 "context"
"fmt"
"log"
"reflect"
"sync"
"github.com/fatih/structtag"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"golang.org/x/net/context"
)
// Reference stores a typed document reference
type Reference struct {
// owning model name
Model string
// the name of the struct field
FieldName string
// index of field in owning struct
Idx int
// the type of the referenced object
HydratedType reflect.Type
// field kind (struct, slice, ...)
Kind reflect.Kind
exists bool
}
type gridFSReference struct {
BucketName string
FilenameFmt string
LoadType reflect.Type
Idx int
}
type TModelRegistry map[string]*Model
// ModelRegistry - the ModelRegistry stores a map containing
// pointers to Model instances, keyed by an associated
// model name
var ModelRegistry = make(TModelRegistry)
// DB - The mongodb database handle
var DB *mongo.Database
// DBClient - The mongodb client
var DBClient *mongo.Client
// NextStringID - Override this function with your own
// string ID generator!
var NextStringID func() string
var mutex sync.Mutex
func makeGfsRef(tag *structtag.Tag, idx int) gridFSReference {
opts := tag.Options
var ffmt string
if len(opts) < 1 {
ffmt = "%s"
} else {
ffmt = opts[0]
}
var typ reflect.Type
if len(opts) < 2 {
typ = reflect.TypeOf("")
} else {
switch opts[1] {
case "bytes":
typ = reflect.TypeOf([]byte{})
case "string":
typ = reflect.TypeOf("")
default:
typ = reflect.TypeOf("")
}
}
return gridFSReference{
FilenameFmt: ffmt,
BucketName: tag.Name,
LoadType: typ,
Idx: idx,
}
}
func makeRef(idx int, modelName string, fieldName string, ht reflect.Type) Reference {
if modelName != "" {
if ModelRegistry.Index(modelName) != -1 {
return Reference{
Idx: idx,
Model: modelName,
HydratedType: ht,
Kind: ht.Kind(),
exists: true,
FieldName: fieldName,
}
}
return Reference{
Idx: idx,
Model: modelName,
FieldName: fieldName,
HydratedType: ht,
Kind: ht.Kind(),
exists: true,
}
}
panic("model name was empty")
}
func parseTags(t reflect.Type, v reflect.Value) (map[string][]InternalIndex, map[string]Reference, map[string]gridFSReference, string) {
coll := ""
refs := make(map[string]Reference)
idcs := make(map[string][]InternalIndex)
gfsRefs := make(map[string]gridFSReference)
for i := 0; i < v.NumField(); i++ {
sft := t.Field(i)
ft := sft.Type
tags, err := structtag.Parse(string(sft.Tag))
panik(err)
shouldContinue := true
for {
if !shouldContinue {
break
}
switch ft.Kind() {
case reflect.Slice:
ft = ft.Elem()
if _, ok := tags.Get("ref"); ok != nil {
if ft.Kind() == reflect.Struct {
ii2, rr2, gg2, _ := parseTags(ft, reflect.New(ft).Elem())
for k, vv := range ii2 {
idcs[sft.Name+"."+k] = vv
}
for k, vv := range rr2 {
refs[sft.Name+"."+k] = vv
}
for k, vv := range gg2 {
gfsRefs[sft.Name+"."+k] = vv
}
}
}
continue
case reflect.Pointer:
ft = ft.Elem()
fallthrough
case reflect.Struct:
if ft.ConvertibleTo(reflect.TypeOf(Document{})) {
collTag, err := tags.Get("coll")
panik(err)
coll = collTag.Name
idxTag, err := tags.Get("idx")
if err == nil {
idcs[sft.Type.Name()] = scanIndex(idxTag.Value())
}
shouldContinue = false
break
}
if refTag, ok := tags.Get("ref"); ok == nil {
sname := sft.Name + "@" + refTag.Name
refs[sname] = makeRef(i, refTag.Name, sft.Name, sft.Type)
}
if gtag, ok := tags.Get("gridfs"); ok == nil {
sname := sft.Name + "@" + gtag.Name
gfsRefs[sname] = makeGfsRef(gtag, i)
}
fallthrough
default:
idxTag, err := tags.Get("idx")
if err == nil {
idcs[sft.Name] = scanIndex(idxTag.Value())
}
if gtag, ok := tags.Get("gridfs"); ok == nil {
sname := sft.Name + "@" + gtag.Name
gfsRefs[sname] = makeGfsRef(gtag, i)
}
shouldContinue = false
}
}
}
return idcs, refs, gfsRefs, coll
}
// Has returns the model typename and Model instance corresponding
// to the argument passed, as well as a boolean indicating whether it
// was found. otherwise returns `"", nil, false`
func (r TModelRegistry) Has(i interface{}) (string, *Model, bool) {
t := reflect.TypeOf(i)
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
n := t.Name()
if rT, ok := ModelRegistry[n]; ok {
return n, rT, true
}
return "", nil, false
}
// HasByName functions almost identically to Has,
// except that it takes a string as its argument.
func (r TModelRegistry) HasByName(n string) (string, *Model, bool) {
if t, ok := ModelRegistry[n]; ok {
return n, t, true
}
return "", nil, false
}
// Index returns the index at which the Document struct is embedded
func (r TModelRegistry) Index(n string) int {
if v, ok := ModelRegistry[n]; ok {
return v.idx
}
return -1
}
func (r TModelRegistry) new_(n string) interface{} {
if name, m, ok := ModelRegistry.HasByName(n); ok {
v := reflect.New(m.Type)
df := v.Elem().Field(m.idx)
do := reflect.New(df.Type())
d := do.Interface().(IDocument)
//d := df.Interface().(IDocument)
d.setModel(*m)
d.getModel().typeName = name
df.Set(reflect.ValueOf(d).Elem())
return v.Interface()
}
return nil
}
func (r TModelRegistry) Get(name string) *Model {
model, ok := r[name]
if !ok {
return nil
}
return model
}
// Model registers models in the ModelRegistry, where
// they can be accessed via a model's struct name
func (r TModelRegistry) Model(mdl ...any) {
defer mutex.Unlock()
mutex.Lock()
for _, m := range mdl {
t := reflect.TypeOf(m)
v := reflect.ValueOf(m)
vp := v
if vp.Kind() != reflect.Ptr {
vp = reflect.New(v.Type())
vp.Elem().Set(v)
}
id, ok := vp.Interface().(HasID)
if !ok {
panic(fmt.Sprintf("you MUST implement the HasID interface!!! skipping...\n"))
}
switch (id).Id().(type) {
case int, int64, int32, string, primitive.ObjectID, uint, uint32, uint64:
break
default:
log.Printf("invalid ID type specified!!! skipping...\n")
}
if t.Kind() == reflect.Ptr {
t = reflect.Indirect(reflect.ValueOf(m)).Type()
v = reflect.ValueOf(m).Elem()
}
n := t.Name()
if t.Kind() != reflect.Struct {
panic(fmt.Sprintf("Only structs can be passed to this function, silly! (passed type: %s)", n))
}
idx := -1
for i := 0; i < v.NumField(); i++ {
ft := t.Field(i)
if (ft.Type.ConvertibleTo(reflect.TypeOf(Document{}))) {
idx = i
break
}
}
if idx < 0 {
panic("A model must embed the Document struct!")
}
inds, refs, gfs, coll := parseTags(t, v)
if coll == "" {
panic(fmt.Sprintf("a Document needs to be given a collection name! (passed type: %s)", n))
}
ModelRegistry[n] = &Model{
idx: idx,
Type: t,
collection: coll,
Indexes: inds,
references: refs,
gridFSReferences: gfs,
}
}
for k, v := range ModelRegistry {
for k2, v2 := range v.references {
if !v2.exists {
if _, ok := ModelRegistry[v2.FieldName]; ok {
tmp := ModelRegistry[k].references[k2]
ModelRegistry[k].references[k2] = Reference{
Model: k,
Idx: tmp.Idx,
FieldName: tmp.FieldName,
Kind: tmp.Kind,
HydratedType: tmp.HydratedType,
exists: true,
}
}
}
}
}
}
func innerWatch(coll *mongo.Collection) {
sspipeline := mongo.Pipeline{
bson.D{{"$match", bson.D{{"$or",
bson.A{
bson.D{{
"operationType", "insert",
}},
bson.D{{"operationType", "update"}},
},
}},
}},
}
stream, err := coll.Watch(context.TODO(), sspipeline, options.ChangeStream().SetFullDocument(options.UpdateLookup).SetFullDocumentBeforeChange(options.WhenAvailable))
panik(err)
defer func(stream *mongo.ChangeStream, ctx context2.Context) {
err := stream.Close(ctx)
panik(err)
}(stream, context.TODO())
for stream.Next(context.TODO()) {
var data bson.M
if err := stream.Decode(&data); err != nil {
log.Fatal(err)
}
var uid = data["documentKey"].(bson.M)["_id"]
if data["operationType"] == "insert" {
counterColl := DB.Collection(COUNTER_COL)
counterColl.UpdateOne(context.TODO(), bson.M{"collection": coll.Name()}, bson.M{"$set": bson.M{
"current": uid,
}}, options.Update().SetUpsert(true))
}
fmt.Printf("%v\n", data)
}
}
func Connect(uri string, dbName string) {
cli, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(uri))
if err != nil {
log.Fatal("failed to open database")
}
panik(err)
DB = cli.Database(dbName)
colls, err := DB.ListCollectionNames(context.TODO(), bson.M{"name": bson.M{"$ne": COUNTER_COL}}, options.ListCollections().SetNameOnly(true))
for _, c := range colls {
if c == COUNTER_COL {
continue
}
go innerWatch(DB.Collection(c))
}
DBClient = cli
}