Preload in chunks to avoid mssql parameter limit

This commit is contained in:
Daniel Hammerschmid 2022-07-14 09:40:24 +02:00 committed by Gerhard Gruber
parent 25fabd9f40
commit a4919ae554

View File

@ -242,16 +242,17 @@ func (scope *Scope) handleBelongsToPreload(field *Field, conditions []interface{
return return
} }
// find relations uniquePrimaryKeysMap := map[interface{}]bool{}
results := makeSlice(field.Struct.Type) for _, key := range toQueryValues(primaryKeys) {
scope.Err(preloadDB.Where(fmt.Sprintf("%v IN (%v)", toQueryCondition(scope, relation.AssociationForeignDBNames), toQueryMarks(primaryKeys)), toQueryValues(primaryKeys)...).Find(results, preloadConditions...).Error) uniquePrimaryKeysMap[indirect(reflect.ValueOf(key)).Interface()] = true
}
// assign find results uniquePrimaryKeys := make([]interface{}, 0, len(uniquePrimaryKeysMap))
var ( for key := range uniquePrimaryKeysMap {
resultsValue = indirect(reflect.ValueOf(results)) uniquePrimaryKeys = append(uniquePrimaryKeys, key)
indirectScopeValue = scope.IndirectValue() }
)
indirectScopeValue := scope.IndirectValue()
foreignFieldToObjects := make(map[string][]*reflect.Value) foreignFieldToObjects := make(map[string][]*reflect.Value)
if indirectScopeValue.Kind() == reflect.Slice { if indirectScopeValue.Kind() == reflect.Slice {
for j := 0; j < indirectScopeValue.Len(); j++ { for j := 0; j < indirectScopeValue.Len(); j++ {
@ -261,17 +262,39 @@ func (scope *Scope) handleBelongsToPreload(field *Field, conditions []interface{
} }
} }
for i := 0; i < resultsValue.Len(); i++ { // need to query relations in chunk of 2000
result := resultsValue.Index(i) // to avoid exceeding the mssql parameter limit of 2100
if indirectScopeValue.Kind() == reflect.Slice { chunkSize := 2000
valueString := toString(getValueFromFields(result, relation.AssociationForeignFieldNames)) for i := 0; i < len(uniquePrimaryKeys); i += chunkSize {
if objects, found := foreignFieldToObjects[valueString]; found { var primaryKeyChunk []interface{}
for _, object := range objects { if chunkSize > len(uniquePrimaryKeys)-i {
object.FieldByName(field.Name).Set(result) primaryKeyChunk = uniquePrimaryKeys[i:]
}
}
} else { } else {
scope.Err(field.Set(result)) primaryKeyChunk = uniquePrimaryKeys[i : i+chunkSize]
}
// find relations
results := makeSlice(field.Struct.Type)
queryMarks := "?" + strings.Repeat(",?", len(primaryKeyChunk)-1)
scope.Err(preloadDB.Where(fmt.Sprintf("%v IN (%v)", toQueryCondition(scope, relation.AssociationForeignDBNames), queryMarks), primaryKeyChunk...).Find(results, preloadConditions...).Error)
// assign find results
var (
resultsValue = indirect(reflect.ValueOf(results))
)
for i := 0; i < resultsValue.Len(); i++ {
result := resultsValue.Index(i)
if indirectScopeValue.Kind() == reflect.Slice {
valueString := toString(getValueFromFields(result, relation.AssociationForeignFieldNames))
if objects, found := foreignFieldToObjects[valueString]; found {
for _, object := range objects {
object.FieldByName(field.Name).Set(result)
}
}
} else {
scope.Err(field.Set(result))
}
} }
} }
} }