From ea21f5de9aa6d32cfc27968ca2e0720bd20a4437 Mon Sep 17 00:00:00 2001 From: "Xianhui.Lin" Date: Wed, 8 Jan 2025 16:46:30 +0800 Subject: [PATCH] Add schema update time verification to insert and upsert so as to use the cache Signed-off-by: Xianhui.Lin fix unitest Signed-off-by: Xianhui.Lin improve Signed-off-by: Xianhui.Lin improve Signed-off-by: Xianhui.Lin --- go.mod | 2 +- go.sum | 4 +-- internal/metastore/model/collection.go | 2 ++ internal/proxy/impl.go | 18 +++++----- internal/proxy/meta_cache.go | 3 ++ internal/proxy/task.go | 1 + internal/proxy/task_insert.go | 37 +++++++++++++++----- internal/proxy/task_upsert.go | 21 ++++++++++- internal/rootcoord/alter_collection_task.go | 10 ++++++ internal/rootcoord/create_collection_task.go | 3 +- internal/rootcoord/root_coord.go | 1 + pkg/go.mod | 2 +- pkg/go.sum | 4 +-- pkg/util/merr/errors.go | 2 +- pkg/util/merr/errors_test.go | 2 +- pkg/util/merr/utils.go | 11 ++++++ 16 files changed, 96 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 5ed45a46879f0..d639d051f7774 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.9 github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6 github.com/minio/minio-go/v7 v7.0.73 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/prometheus/client_golang v1.14.0 diff --git a/go.sum b/go.sum index f65b21e48d0e7..2ae6c02fac152 100644 --- a/go.sum +++ b/go.sum @@ -630,8 +630,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b h1:iPPhnFx+s7FF53UeWj7A4EYhPRMFPL6mHqyQw7qRjeQ= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6 h1:IUHMoxffuI2xbtMJJE/XK7kKzavZnN5sMiheIKTDVt8= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= diff --git a/internal/metastore/model/collection.go b/internal/metastore/model/collection.go index 2a41804f14641..b99b888711a76 100644 --- a/internal/metastore/model/collection.go +++ b/internal/metastore/model/collection.go @@ -30,6 +30,7 @@ type Collection struct { Properties []*commonpb.KeyValuePair State pb.CollectionState EnableDynamicField bool + UpdateTimestamp uint64 } func (c *Collection) Available() bool { @@ -58,6 +59,7 @@ func (c *Collection) Clone() *Collection { State: c.State, EnableDynamicField: c.EnableDynamicField, Functions: CloneFunctions(c.Functions), + UpdateTimestamp: c.UpdateTimestamp, } } diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index c5fc8fc9ca555..50cc820512436 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2628,10 +2628,11 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) Version: msgpb.InsertDataVersion_ColumnBased, }, }, - idAllocator: node.rowIDAllocator, - segIDAssigner: node.segAssigner, - chMgr: node.chMgr, - chTicker: node.chTicker, + idAllocator: node.rowIDAllocator, + segIDAssigner: node.segAssigner, + chMgr: node.chMgr, + chTicker: node.chTicker, + schemaTimestamp: request.SchemaTimestamp, } var enqueuedTask task = it if streamingutil.IsStreamingServiceEnabled() { @@ -2871,10 +2872,11 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) }, }, - idAllocator: node.rowIDAllocator, - segIDAssigner: node.segAssigner, - chMgr: node.chMgr, - chTicker: node.chTicker, + idAllocator: node.rowIDAllocator, + segIDAssigner: node.segAssigner, + chMgr: node.chMgr, + chTicker: node.chTicker, + schemaTimestamp: request.SchemaTimestamp, } var enqueuedTask task = it if streamingutil.IsStreamingServiceEnabled() { diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index feec9752f78bb..7bcdfa6ea66d9 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -103,6 +103,7 @@ type collectionInfo struct { consistencyLevel commonpb.ConsistencyLevel partitionKeyIsolation bool replicateID string + updateTimestamp uint64 } type databaseInfo struct { @@ -473,6 +474,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string, createdUtcTimestamp: collection.CreatedUtcTimestamp, consistencyLevel: collection.ConsistencyLevel, partitionKeyIsolation: isolation, + updateTimestamp: collection.UpdateTimestamp, }, nil } _, dbOk := m.collInfo[database] @@ -490,6 +492,7 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string, consistencyLevel: collection.ConsistencyLevel, partitionKeyIsolation: isolation, replicateID: replicateID, + updateTimestamp: collection.UpdateTimestamp, } log.Ctx(ctx).Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName), diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 786e2171eb1f4..4ae850663ee43 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -703,6 +703,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error { t.result.Properties = result.Properties t.result.DbName = result.GetDbName() t.result.NumPartitions = result.NumPartitions + t.result.UpdateTimestamp = result.UpdateTimestamp for _, field := range result.Schema.Fields { if field.IsDynamic { continue diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 9de31cd53d600..d9f305639a547 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -29,15 +29,16 @@ type insertTask struct { insertMsg *BaseInsertTask ctx context.Context - result *milvuspb.MutationResult - idAllocator *allocator.IDAllocator - segIDAssigner *segIDAssigner - chMgr channelsMgr - chTicker channelsTimeTicker - vChannels []vChan - pChannels []pChan - schema *schemapb.CollectionSchema - partitionKeys *schemapb.FieldData + result *milvuspb.MutationResult + idAllocator *allocator.IDAllocator + segIDAssigner *segIDAssigner + chMgr channelsMgr + chTicker channelsTimeTicker + vChannels []vChan + pChannels []pChan + schema *schemapb.CollectionSchema + partitionKeys *schemapb.FieldData + schemaTimestamp uint64 } // TraceCtx returns insertTask context @@ -125,6 +126,24 @@ func (it *insertTask) PreExecute(ctx context.Context) error { return merr.WrapErrAsInputError(merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize")) } + collID, err := globalMetaCache.GetCollectionID(context.Background(), it.insertMsg.GetDbName(), collectionName) + if err != nil { + log.Ctx(ctx).Warn("fail to get collection id", zap.Error(err)) + return err + } + colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.insertMsg.GetDbName(), collectionName, collID) + if err != nil { + log.Ctx(ctx).Warn("fail to get collection info", zap.Error(err)) + return err + } + if it.schemaTimestamp != 0 { + if it.schemaTimestamp != colInfo.updateTimestamp { + err := merr.WrapErrCollectionSchemaMisMatch(collectionName) + log.Ctx(ctx).Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err)) + return err + } + } + replicateID, err := GetReplicateID(it.ctx, it.insertMsg.GetDbName(), collectionName) if err != nil { log.Warn("get replicate id failed", zap.String("collectionName", collectionName), zap.Error(err)) diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index 1de223fa4124d..6edc6ebf25fb5 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -65,7 +65,8 @@ type upsertTask struct { partitionKeys *schemapb.FieldData // automatic generate pk as new pk wehen autoID == true // delete task need use the oldIds - oldIds *schemapb.IDs + oldIds *schemapb.IDs + schemaTimestamp uint64 } // TraceCtx returns upsertTask context @@ -292,6 +293,24 @@ func (it *upsertTask) PreExecute(ctx context.Context) error { Timestamp: it.EndTs(), } + collID, err := globalMetaCache.GetCollectionID(context.Background(), it.req.GetDbName(), collectionName) + if err != nil { + log.Warn("fail to get collection id", zap.Error(err)) + return err + } + colInfo, err := globalMetaCache.GetCollectionInfo(ctx, it.req.GetDbName(), collectionName, collID) + if err != nil { + log.Warn("fail to get collection info", zap.Error(err)) + return err + } + if it.schemaTimestamp != 0 { + if it.schemaTimestamp != colInfo.updateTimestamp { + err := merr.WrapErrCollectionSchemaMisMatch(collectionName) + log.Warn("collection schema mismatch", zap.String("collectionName", collectionName), zap.Error(err)) + return err + } + } + replicateID, err := GetReplicateID(ctx, it.req.GetDbName(), collectionName) if err != nil { log.Warn("get replicate info failed", zap.String("collectionName", collectionName), zap.Error(err)) diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index 97dddc29312cf..c18f0b531b4b2 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -102,6 +102,10 @@ func executeAlterCollectionTaskSteps(ctx context.Context, oldColl.Properties = oldProperties newColl := col.Clone() newColl.Properties = newProperties + ts, err := core.tsoAllocator.GenerateTSO(1) + if err != nil { + newColl.UpdateTimestamp = ts + } redoTask := newBaseRedoTask(core.stepExecutor) redoTask.AddSyncStep(&AlterCollectionStep{ baseStep: baseStep{core: core}, @@ -280,6 +284,12 @@ func executeAlterCollectionFieldTaskSteps(ctx context.Context, if err != nil { return err } + + ts, err = core.tsoAllocator.GenerateTSO(1) + if err != nil { + newColl.UpdateTimestamp = ts + } + redoTask := newBaseRedoTask(core.stepExecutor) redoTask.AddSyncStep(&AlterCollectionStep{ baseStep: baseStep{core: core}, diff --git a/internal/rootcoord/create_collection_task.go b/internal/rootcoord/create_collection_task.go index 00f1804127aa4..5bf0f402e9667 100644 --- a/internal/rootcoord/create_collection_task.go +++ b/internal/rootcoord/create_collection_task.go @@ -573,7 +573,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { State: pb.PartitionState_PartitionCreated, } } - + tso, _ := t.core.tsoAllocator.GenerateTSO(1) collInfo := model.Collection{ CollectionID: collID, DBID: t.dbID, @@ -592,6 +592,7 @@ func (t *createCollectionTask) Execute(ctx context.Context) error { Partitions: partitions, Properties: t.Req.Properties, EnableDynamicField: t.schema.EnableDynamicField, + UpdateTimestamp: tso, } // We cannot check the idempotency inside meta table when adding collection, since we'll execute duplicate steps diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index ecdba34ee7f7d..8d1cea4761c77 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1218,6 +1218,7 @@ func convertModelToDesc(collInfo *model.Collection, aliases []string, dbName str resp.Properties = collInfo.Properties resp.NumPartitions = int64(len(collInfo.Partitions)) resp.DbId = collInfo.DBID + resp.UpdateTimestamp = collInfo.UpdateTimestamp return resp } diff --git a/pkg/go.mod b/pkg/go.mod index dd9f7bbffd09c..d35968c74d156 100644 --- a/pkg/go.mod +++ b/pkg/go.mod @@ -14,7 +14,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.7 - github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b + github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6 github.com/nats-io/nats-server/v2 v2.10.12 github.com/nats-io/nats.go v1.34.1 github.com/panjf2000/ants/v2 v2.7.2 diff --git a/pkg/go.sum b/pkg/go.sum index d339ec77fba97..f39f6db46417c 100644 --- a/pkg/go.sum +++ b/pkg/go.sum @@ -488,8 +488,8 @@ github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119 h1:9VXijWu github.com/milvus-io/cgosymbolizer v0.0.0-20240722103217-b7dee0e50119/go.mod h1:DvXTE/K/RtHehxU8/GtDs4vFtfw64jJ3PaCnFri8CRg= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b h1:iPPhnFx+s7FF53UeWj7A4EYhPRMFPL6mHqyQw7qRjeQ= -github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20241211060635-410431d7865b/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6 h1:IUHMoxffuI2xbtMJJE/XK7kKzavZnN5sMiheIKTDVt8= +github.com/milvus-io/milvus-proto/go-api/v2 v2.5.0-beta.0.20250107132849-4099923438e6/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs= github.com/milvus-io/pulsar-client-go v0.12.1 h1:O2JZp1tsYiO7C0MQ4hrUY/aJXnn2Gry6hpm7UodghmE= github.com/milvus-io/pulsar-client-go v0.12.1/go.mod h1:dkutuH4oS2pXiGm+Ti7fQZ4MRjrMPZ8IJeEGAWMeckk= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index b36d7230b93b8..2c08ddc055fb5 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -71,7 +71,7 @@ var ( ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true) ErrCollectionVectorClusteringKeyNotAllowed = newMilvusError("vector clustering key not allowed", 107, false) ErrCollectionReplicateMode = newMilvusError("can't operate on the collection under standby mode", 108, false) - + ErrCollectionSchemaMismatch = newMilvusError("collection schema mismatch", 109, false) // Partition related ErrPartitionNotFound = newMilvusError("partition not found", 200, false) ErrPartitionNotLoaded = newMilvusError("partition not loaded", 201, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 4febe3e64c2e4..536c25daa2b64 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -88,7 +88,7 @@ func (s *ErrSuite) TestWrap() { s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to alter index %s", "hnsw"), ErrCollectionNotLoaded) s.ErrorIs(WrapErrCollectionOnRecovering("test_collection", "channel lost %s", "dev"), ErrCollectionOnRecovering) s.ErrorIs(WrapErrCollectionVectorClusteringKeyNotAllowed("test_collection", "field"), ErrCollectionVectorClusteringKeyNotAllowed) - + s.ErrorIs(WrapErrCollectionSchemaMisMatch("schema mismatch", "field"), ErrCollectionSchemaMismatch) // Partition related s.ErrorIs(WrapErrPartitionNotFound("test_partition", "failed to get partition"), ErrPartitionNotFound) s.ErrorIs(WrapErrPartitionNotLoaded("test_partition", "failed to query"), ErrPartitionNotLoaded) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index f86711e7297f9..818efd1691182 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -184,6 +184,9 @@ func oldCode(code int32) commonpb.ErrorCode { case ErrChannelLack.code(): return commonpb.ErrorCode_MetaFailed + case ErrCollectionSchemaMismatch.code(): + return commonpb.ErrorCode_SchemaMismatch + default: return commonpb.ErrorCode_UnexpectedError } @@ -550,6 +553,14 @@ func WrapErrCollectionVectorClusteringKeyNotAllowed(collection any, msgAndArgs . return err } +func WrapErrCollectionSchemaMisMatch(collection any, msg ...string) error { + err := wrapFields(ErrCollectionSchemaMismatch, value("collection", collection)) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} + func WrapErrAliasNotFound(db any, alias any, msg ...string) error { err := wrapFields(ErrAliasNotFound, value("database", db),