From dffbfc9fe966b162a2b5f3984b0b586b9d384254 Mon Sep 17 00:00:00 2001 From: Daniel Sullivan Date: Sat, 19 Oct 2019 17:34:23 +0900 Subject: [PATCH] Caching initial --- cache_store.go | 80 +++++++++++++++++++++++++++++++++++++++++++++++ callback_query.go | 71 ++++++++++++++++++++++++++++++----------- go.mod | 1 + scope.go | 20 ++++++++++++ 4 files changed, 153 insertions(+), 19 deletions(-) create mode 100644 cache_store.go diff --git a/cache_store.go b/cache_store.go new file mode 100644 index 00000000..f9e1088e --- /dev/null +++ b/cache_store.go @@ -0,0 +1,80 @@ +package gorm + +import ( + "fmt" + "os" + "strconv" + "sync" + "time" +) + +type cacheItem struct { + dataMutex sync.RWMutex + data interface{} + created int64 + accessMutex sync.RWMutex + accessCount int64 +} + +type cache struct { + size int + highWaterMark int + enabled bool + database map[string]*cacheItem +} + +func (c *cache) Enable() { + // Kick off the maintenance loop + size := os.Getenv("QUERY_CACHE_SIZE") + if size == "" { + size = "8192" + } + + highWaterMark := os.Getenv("QUERY_CACHE_HIGH_WATER") + if highWaterMark == "" { + highWaterMark = "6192" + } + + c.size, _ = strconv.Atoi(size) + c.highWaterMark, _ = strconv.Atoi(highWaterMark) + + c.database = make(map[string]*cacheItem, c.size) + + c.enabled = true +} + +func (c cache) GetItem(key string, offset int64) interface{} { + fmt.Println("Getting item " + key) + + if item, ok := c.database[key]; ok { + item.dataMutex.RLock() + item.accessMutex.Lock() + + defer item.dataMutex.RUnlock() + defer item.accessMutex.Unlock() + + item.accessCount++ + + if (item.created+offset < time.Now().Unix()) || offset == -1 { + return item.data + } + } + + return nil +} + +func (c *cache) StoreItem(key string, data interface{}) { + fmt.Println("Storing item " + key) + + if _, ok := c.database[key]; !ok { + c.database[key] = &cacheItem{ + data: data, + created: time.Now().Unix(), + } + } else { + c.database[key].dataMutex.Lock() + c.database[key].data = data + c.database[key].created = time.Now().Unix() + c.database[key].dataMutex.Unlock() + } +} diff --git a/callback_query.go b/callback_query.go index e3b3d534..3dcc703c 100644 --- a/callback_query.go +++ b/callback_query.go @@ -64,35 +64,68 @@ func queryCallback(scope *Scope) { scope.SQL += addExtraSpaceIfExist(fmt.Sprint(str)) } - if rows, err := scope.SQLDB().Query(scope.SQL, scope.SQLVars...); scope.Err(err) == nil { - defer rows.Close() + // Work out if we can return a result from cache + cacheOperation := scope.Cache() - columns, _ := rows.Columns() - for rows.Next() { - scope.db.RowsAffected++ + writeToCache := false + readFromDB := true - elem := results - if isSlice { - elem = reflect.New(resultType).Elem() + key := fmt.Sprint(scope.SQL, scope.SQLVars) + + if cacheOperation != nil { + // If the time is > 0, simply provide the cached results + if *cacheOperation > 0 || *cacheOperation == -1 { + cacheResults := scope.CacheStore().GetItem(key, *cacheOperation) + if cacheResults != nil { + results.Set(reflect.ValueOf(cacheResults)) + readFromDB = false + } else { + readFromDB = true + writeToCache = true } + } else { + readFromDB = true + writeToCache = true + } + } - scope.scan(rows, columns, scope.New(elem.Addr().Interface()).Fields()) + if readFromDB { + if rows, err := scope.SQLDB().Query(scope.SQL, scope.SQLVars...); scope.Err(err) == nil { + defer rows.Close() - if isSlice { - if isPtr { - results.Set(reflect.Append(results, elem.Addr())) - } else { - results.Set(reflect.Append(results, elem)) + columns, _ := rows.Columns() + for rows.Next() { + scope.db.RowsAffected++ + + elem := results + if isSlice { + elem = reflect.New(resultType).Elem() + } + + scope.scan(rows, columns, scope.New(elem.Addr().Interface()).Fields()) + + if isSlice { + if isPtr { + results.Set(reflect.Append(results, elem.Addr())) + } else { + results.Set(reflect.Append(results, elem)) + } } } - } - if err := rows.Err(); err != nil { - scope.Err(err) - } else if scope.db.RowsAffected == 0 && !isSlice { - scope.Err(ErrRecordNotFound) + if err := rows.Err(); err != nil { + scope.Err(err) + } else if scope.db.RowsAffected == 0 && !isSlice { + scope.Err(ErrRecordNotFound) + } + + // If we're allowed, write the results to the cache } } + + if writeToCache { + scope.CacheStore().StoreItem(key, results.Interface()) + } } } diff --git a/go.mod b/go.mod index 43b06f3b..b327ea24 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.12 require ( github.com/denisenkom/go-mssqldb v0.0.0-20190515213511-eb9f6a1743f3 github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 + github.com/go-redis/redis v6.15.2+incompatible github.com/go-sql-driver/mysql v1.4.1 github.com/jinzhu/inflection v1.0.0 github.com/jinzhu/now v1.0.1 diff --git a/scope.go b/scope.go index 9322cdb6..d20cb9d5 100644 --- a/scope.go +++ b/scope.go @@ -24,6 +24,7 @@ type Scope struct { skipLeft bool fields *[]*Field selectAttrs *[]string + cacheStore *cache } // IndirectValue return scope's reflect value's indirect value @@ -45,6 +46,11 @@ func (scope *Scope) DB() *DB { return scope.db } +// CacheStore returns scope's cache store +func (scope *Scope) CacheStore() *cache { + return scope.cacheStore +} + // NewDB create a new DB without search information func (scope *Scope) NewDB() *DB { if scope.db != nil { @@ -323,10 +329,24 @@ type tabler interface { TableName() string } +type cacher interface { + Cache() *int64 +} + type dbTabler interface { TableName(*DB) string } +func (scope *Scope) Cache() *int64 { + if scope.cacheStore.enabled { + if cacher, ok := scope.Value.(cacher); ok { + return cacher.Cache() + } + } + + return nil +} + // TableName return table name func (scope *Scope) TableName() string { if scope.Search != nil && scope.Search.tableName != nil {