Has many preload in chunks to avoid mssql parameter limit

This commit is contained in:
Daniel Hammerschmid 2022-11-09 15:06:15 +01:00 committed by Gerhard Gruber
parent 77fc126a0d
commit ba791c8b56

View File

@ -186,46 +186,68 @@ func (scope *Scope) handleHasManyPreload(field *Field, conditions []interface{})
return return
} }
uniquePrimaryKeysMap := map[interface{}]bool{}
for _, key := range toQueryValues(primaryKeys) {
uniquePrimaryKeysMap[indirect(reflect.ValueOf(key)).Interface()] = true
}
uniquePrimaryKeys := make([]interface{}, 0, len(uniquePrimaryKeysMap))
for key := range uniquePrimaryKeysMap {
uniquePrimaryKeys = append(uniquePrimaryKeys, key)
}
// preload conditions // preload conditions
preloadDB, preloadConditions := scope.generatePreloadDBWithConditions(conditions) preloadDB, preloadConditions := scope.generatePreloadDBWithConditions(conditions)
// find relations indirectScopeValue := scope.IndirectValue()
query := fmt.Sprintf("%v IN (%v)", toQueryCondition(scope, relation.ForeignDBNames), toQueryMarks(primaryKeys))
values := toQueryValues(primaryKeys)
if relation.PolymorphicType != "" {
query += fmt.Sprintf(" AND %v = ?", scope.Quote(relation.PolymorphicDBName))
values = append(values, relation.PolymorphicValue)
}
results := makeSlice(field.Struct.Type) // need to query relations in chunk of 2000
scope.Err(preloadDB.Where(query, values...).Find(results, preloadConditions...).Error) // to avoid exceeding the mssql parameter limit of 2100
chunkSize := 2000
// assign find results for i := 0; i < len(uniquePrimaryKeys); i += chunkSize {
var ( var primaryKeyChunk []interface{}
resultsValue = indirect(reflect.ValueOf(results)) if chunkSize > len(uniquePrimaryKeys)-i {
indirectScopeValue = scope.IndirectValue() primaryKeyChunk = uniquePrimaryKeys[i:]
) } else {
primaryKeyChunk = uniquePrimaryKeys[i : i+chunkSize]
if indirectScopeValue.Kind() == reflect.Slice {
preloadMap := make(map[string][]reflect.Value)
for i := 0; i < resultsValue.Len(); i++ {
result := resultsValue.Index(i)
foreignValues := getValueFromFields(result, relation.ForeignFieldNames)
preloadMap[toString(foreignValues)] = append(preloadMap[toString(foreignValues)], result)
} }
for j := 0; j < indirectScopeValue.Len(); j++ { // find relations
object := indirect(indirectScopeValue.Index(j)) queryMarks := "?" + strings.Repeat(",?", len(primaryKeyChunk)-1)
objectRealValue := getValueFromFields(object, relation.AssociationForeignFieldNames) query := fmt.Sprintf("%v IN (%v)", toQueryCondition(scope, relation.ForeignDBNames), queryMarks)
f := object.FieldByName(field.Name) values := primaryKeyChunk
if results, ok := preloadMap[toString(objectRealValue)]; ok { if relation.PolymorphicType != "" {
f.Set(reflect.Append(f, results...)) query += fmt.Sprintf(" AND %v = ?", scope.Quote(relation.PolymorphicDBName))
} else { values = append(values, relation.PolymorphicValue)
f.Set(reflect.MakeSlice(f.Type(), 0, 0)) }
results := makeSlice(field.Struct.Type)
scope.Err(preloadDB.Where(query, values...).Find(results, preloadConditions...).Error)
// assign find results
resultsValue := indirect(reflect.ValueOf(results))
if indirectScopeValue.Kind() == reflect.Slice {
preloadMap := make(map[string][]reflect.Value)
for i := 0; i < resultsValue.Len(); i++ {
result := resultsValue.Index(i)
foreignValues := getValueFromFields(result, relation.ForeignFieldNames)
preloadMap[toString(foreignValues)] = append(preloadMap[toString(foreignValues)], result)
} }
for j := 0; j < indirectScopeValue.Len(); j++ {
object := indirect(indirectScopeValue.Index(j))
objectRealValue := getValueFromFields(object, relation.AssociationForeignFieldNames)
f := object.FieldByName(field.Name)
if results, ok := preloadMap[toString(objectRealValue)]; ok {
f.Set(reflect.Append(f, results...))
} else {
f.Set(reflect.MakeSlice(f.Type(), 0, 0))
}
}
} else {
scope.Err(field.Set(resultsValue))
} }
} else {
scope.Err(field.Set(resultsValue))
} }
} }