Skip to content

Commit

Permalink
Add schema update time verification to insert and upsert so as to use…
Browse files Browse the repository at this point in the history
… the cache

Signed-off-by: Xianhui.Lin <[email protected]>

fix unitest

Signed-off-by: Xianhui.Lin <[email protected]>

improve

Signed-off-by: Xianhui.Lin <[email protected]>

improve

Signed-off-by: Xianhui.Lin <[email protected]>
  • Loading branch information
JsDove committed Jan 9, 2025
1 parent aceb972 commit ea21f5d
Show file tree
Hide file tree
Showing 16 changed files with 96 additions and 27 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.9
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6
github.com/minio/minio-go/v7 v7.0.73
github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81
github.com/prometheus/client_golang v1.14.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b h1:iPPhnFx+s7FF53UeWj7A4EYhPRMFPL6mHqyQw7qRjeQ=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6 h1:IUHMoxffuI2xbtMJJE/XK7kKzavZnN5sMiheIKTDVt8=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
Expand Down
2 changes: 2 additions & 0 deletions internal/metastore/model/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Collection struct {
Properties []*commonpb.KeyValuePair
State pb.CollectionState
EnableDynamicField bool
UpdateTimestamp uint64
}

func (c *Collection) Available() bool {
Expand Down Expand Up @@ -58,6 +59,7 @@ func (c *Collection) Clone() *Collection {
State: c.State,
EnableDynamicField: c.EnableDynamicField,
Functions: CloneFunctions(c.Functions),
UpdateTimestamp: c.UpdateTimestamp,
}
}

Expand Down
18 changes: 10 additions & 8 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2628,10 +2628,11 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
Version: msgpb.InsertDataVersion_ColumnBased,
},
},
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
schemaTimestamp: request.SchemaTimestamp,
}
var enqueuedTask task = it
if streamingutil.IsStreamingServiceEnabled() {
Expand Down Expand Up @@ -2871,10 +2872,11 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
},
},

idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
schemaTimestamp: request.SchemaTimestamp,
}
var enqueuedTask task = it
if streamingutil.IsStreamingServiceEnabled() {
Expand Down
3 changes: 3 additions & 0 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type collectionInfo struct {
consistencyLevel commonpb.ConsistencyLevel
partitionKeyIsolation bool
replicateID string
updateTimestamp uint64
}

type databaseInfo struct {
Expand Down Expand Up @@ -473,6 +474,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
createdUtcTimestamp: collection.CreatedUtcTimestamp,
consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
updateTimestamp: collection.UpdateTimestamp,
}, nil
}
_, dbOk := m.collInfo[database]
Expand All @@ -490,6 +492,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
consistencyLevel: collection.ConsistencyLevel,
partitionKeyIsolation: isolation,
replicateID: replicateID,
updateTimestamp: collection.UpdateTimestamp,
}

log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),
Expand Down
1 change: 1 addition & 0 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
t.result.Properties = result.Properties
t.result.DbName = result.GetDbName()
t.result.NumPartitions = result.NumPartitions
t.result.UpdateTimestamp = result.UpdateTimestamp
for _, field := range result.Schema.Fields {
if field.IsDynamic {
continue
Expand Down
37 changes: 28 additions & 9 deletions internal/proxy/task_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ type insertTask struct {
insertMsg *BaseInsertTask
ctx context.Context

result *milvuspb.MutationResult
idAllocator *allocator.IDAllocator
segIDAssigner *segIDAssigner
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
schema *schemapb.CollectionSchema
partitionKeys *schemapb.FieldData
result *milvuspb.MutationResult
idAllocator *allocator.IDAllocator
segIDAssigner *segIDAssigner
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
schema *schemapb.CollectionSchema
partitionKeys *schemapb.FieldData
schemaTimestamp uint64
}

// TraceCtx returns insertTask context
Expand Down Expand Up @@ -125,6 +126,24 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
return merr.WrapErrAsInputError(merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize"))
}

collID, err := globalMetaCache.GetCollectionID(context.Background(), it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Ctx(ctx).Warn("fail to get collection id", zap.Error(err))
return err
}
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.insertMsg.GetDbName(), collectionName, collID)
if err != nil {
log.Ctx(ctx).Warn("fail to get collection info", zap.Error(err))
return err
}
if it.schemaTimestamp != 0 {
if it.schemaTimestamp != colInfo.updateTimestamp {
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
log.Ctx(ctx).Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
}

replicateID, err := GetReplicateID(it.ctx, it.insertMsg.GetDbName(), collectionName)
if err != nil {
log.Warn("get replicate id failed", zap.String("collectionName", collectionName), zap.Error(err))
Expand Down
21 changes: 20 additions & 1 deletion internal/proxy/task_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ type upsertTask struct {
partitionKeys *schemapb.FieldData
// automatic generate pk as new pk wehen autoID == true
// delete task need use the oldIds
oldIds *schemapb.IDs
oldIds *schemapb.IDs
schemaTimestamp uint64
}

// TraceCtx returns upsertTask context
Expand Down Expand Up @@ -292,6 +293,24 @@ func (it *upsertTask) PreExecute(ctx context.Context) error {
Timestamp: it.EndTs(),
}

collID, err := globalMetaCache.GetCollectionID(context.Background(), it.req.GetDbName(), collectionName)
if err != nil {
log.Warn("fail to get collection id", zap.Error(err))
return err
}
colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.req.GetDbName(), collectionName, collID)
if err != nil {
log.Warn("fail to get collection info", zap.Error(err))
return err
}
if it.schemaTimestamp != 0 {
if it.schemaTimestamp != colInfo.updateTimestamp {
err := merr.WrapErrCollectionSchemaMisMatch(collectionName)
log.Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err))
return err
}
}

replicateID, err := GetReplicateID(ctx, it.req.GetDbName(), collectionName)
if err != nil {
log.Warn("get replicate info failed", zap.String("collectionName", collectionName), zap.Error(err))
Expand Down
10 changes: 10 additions & 0 deletions internal/rootcoord/alter_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func executeAlterCollectionTaskSteps(ctx context.Context,
oldColl.Properties = oldProperties
newColl := col.Clone()
newColl.Properties = newProperties
ts, err := core.tsoAllocator.GenerateTSO(1)
if err != nil {
newColl.UpdateTimestamp = ts
}
redoTask := newBaseRedoTask(core.stepExecutor)
redoTask.AddSyncStep(&AlterCollectionStep{
baseStep: baseStep{core: core},
Expand Down Expand Up @@ -280,6 +284,12 @@ func executeAlterCollectionFieldTaskSteps(ctx context.Context,
if err != nil {
return err
}

ts, err = core.tsoAllocator.GenerateTSO(1)
if err != nil {
newColl.UpdateTimestamp = ts
}

redoTask := newBaseRedoTask(core.stepExecutor)
redoTask.AddSyncStep(&AlterCollectionStep{
baseStep: baseStep{core: core},
Expand Down
3 changes: 2 additions & 1 deletion internal/rootcoord/create_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
State: pb.PartitionState_PartitionCreated,
}
}

tso, _ := t.core.tsoAllocator.GenerateTSO(1)
collInfo := model.Collection{
CollectionID: collID,
DBID: t.dbID,
Expand All @@ -592,6 +592,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error {
Partitions: partitions,
Properties: t.Req.Properties,
EnableDynamicField: t.schema.EnableDynamicField,
UpdateTimestamp: tso,
}

// We cannot check the idempotency inside meta table when adding collection, since we'll execute duplicate steps
Expand Down
1 change: 1 addition & 0 deletions internal/rootcoord/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,7 @@ func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName str
resp.Properties = collInfo.Properties
resp.NumPartitions = int64(len(collInfo.Partitions))
resp.DbId = collInfo.DBID
resp.UpdateTimestamp = collInfo.UpdateTimestamp
return resp
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.17.7
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.34.1
github.com/panjf2000/ants/v2 v2.7.2
Expand Down
4 changes: 2 additions & 2 deletions pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -488,8 +488,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu
github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b h1:iPPhnFx+s7FF53UeWj7A4EYhPRMFPL6mHqyQw7qRjeQ=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6 h1:IUHMoxffuI2xbtMJJE/XK7kKzavZnN5sMiheIKTDVt8=
github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE=
github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/merr/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var (
ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true)
ErrCollectionVectorClusteringKeyNotAllowed = newMilvusError("vector clustering key not allowed", 107, false)
ErrCollectionReplicateMode = newMilvusError("can't operate on the collection under standby mode", 108, false)

ErrCollectionSchemaMismatch = newMilvusError("collection schema mismatch", 109, false)
// Partition related
ErrPartitionNotFound = newMilvusError("partition not found", 200, false)
ErrPartitionNotLoaded = newMilvusError("partition not loaded", 201, false)
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/merr/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (s *ErrSuite) TestWrap() {
s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to alter index %s", "hnsw"), ErrCollectionNotLoaded)
s.ErrorIs(WrapErrCollectionOnRecovering("test_collection", "channel lost %s", "dev"), ErrCollectionOnRecovering)
s.ErrorIs(WrapErrCollectionVectorClusteringKeyNotAllowed("test_collection", "field"), ErrCollectionVectorClusteringKeyNotAllowed)

s.ErrorIs(WrapErrCollectionSchemaMisMatch("schema mismatch", "field"), ErrCollectionSchemaMismatch)
// Partition related
s.ErrorIs(WrapErrPartitionNotFound("test_partition", "failed to get partition"), ErrPartitionNotFound)
s.ErrorIs(WrapErrPartitionNotLoaded("test_partition", "failed to query"), ErrPartitionNotLoaded)
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/merr/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ func oldCode(code int32) commonpb.ErrorCode {
case ErrChannelLack.code():
return commonpb.ErrorCode_MetaFailed

case ErrCollectionSchemaMismatch.code():
return commonpb.ErrorCode_SchemaMismatch

default:
return commonpb.ErrorCode_UnexpectedError
}
Expand Down Expand Up @@ -550,6 +553,14 @@ func WrapErrCollectionVectorClusteringKeyNotAllowed(collection any, msgAndArgs .
return err
}

func WrapErrCollectionSchemaMisMatch(collection any, msg ...string) error {
err := wrapFields(ErrCollectionSchemaMismatch, value("collection", collection))
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}

func WrapErrAliasNotFound(db any, alias any, msg ...string) error {
err := wrapFields(ErrAliasNotFound,
value("database", db),
Expand Down

0 comments on commit ea21f5d

Please sign in to comment.