From e1531e7c1b4c637e9588ecd02b7b619eaf0ca136 Mon Sep 17 00:00:00 2001 From: "Xianhui.Lin" Date: Tue, 10 Dec 2024 09:55:30 +0800 Subject: [PATCH] support load json index after loadsegment Signed-off-by: Xianhui.Lin --- internal/proto/query_coord.proto | 2 + internal/querycoordv2/checkers/controller.go | 3 + .../querycoordv2/checkers/stats_checker.go | 193 ++++++++++++++++++ internal/querycoordv2/dist/dist_handler.go | 1 + .../querycoordv2/meta/segment_dist_manager.go | 1 + internal/querycoordv2/task/action.go | 10 +- internal/querycoordv2/task/executor.go | 5 +- internal/querycoordv2/task/scheduler.go | 10 +- internal/querycoordv2/task/utils.go | 6 + internal/querycoordv2/utils/checker.go | 3 + internal/querynodev2/handlers.go | 39 ++++ internal/querynodev2/segments/segment.go | 20 +- .../querynodev2/segments/segment_interface.go | 2 + internal/querynodev2/segments/segment_l0.go | 4 + .../querynodev2/segments/segment_loader.go | 33 +++ internal/querynodev2/services.go | 5 + 16 files changed, 325 insertions(+), 12 deletions(-) create mode 100644 internal/querycoordv2/checkers/stats_checker.go diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 350a533ce186e..d72cbe1bbc79f 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -395,6 +395,7 @@ enum LoadScope { Full = 0; Delta = 1; Index = 2; + Stats = 3; } message LoadSegmentsRequest { @@ -644,6 +645,7 @@ message SegmentVersionInfo { map index_info = 7; data.SegmentLevel level = 8; bool is_sorted = 9; + repeated int64 field_json_index_stats = 10; } message ChannelVersionInfo { diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index 2cc46e5f1f11b..1baa0acbffe2a 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -72,6 +72,7 @@ func NewCheckerController( // todo temporary work around must fix // utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr, true), utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr), + utils.StatsChecker: NewStatsChecker(meta, dist, broker, nodeMgr, targetMgr), } manualCheckChs := map[utils.CheckerType]chan struct{}{ @@ -112,6 +113,8 @@ func getCheckerInterval(checker utils.CheckerType) time.Duration { return Params.QueryCoordCfg.IndexCheckInterval.GetAsDuration(time.Millisecond) case utils.LeaderChecker: return Params.QueryCoordCfg.LeaderViewUpdateInterval.GetAsDuration(time.Second) + case utils.StatsChecker: + return Params.QueryCoordCfg.IndexCheckInterval.GetAsDuration(time.Millisecond) default: return Params.QueryCoordCfg.CheckInterval.GetAsDuration(time.Millisecond) } diff --git a/internal/querycoordv2/checkers/stats_checker.go b/internal/querycoordv2/checkers/stats_checker.go new file mode 100644 index 0000000000000..fa480cbe4ee95 --- /dev/null +++ b/internal/querycoordv2/checkers/stats_checker.go @@ -0,0 +1,193 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package checkers + +import ( + "context" + "time" + + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +var _ Checker = (*StatsChecker)(nil) + +// StatsChecker perform segment stats index check. +type StatsChecker struct { + *checkerActivation + meta *meta.Meta + dist *meta.DistributionManager + broker meta.Broker + nodeMgr *session.NodeManager + + targetMgr meta.TargetManagerInterface +} + +func NewStatsChecker( + meta *meta.Meta, + dist *meta.DistributionManager, + broker meta.Broker, + nodeMgr *session.NodeManager, + targetMgr meta.TargetManagerInterface, +) *StatsChecker { + return &StatsChecker{ + checkerActivation: newCheckerActivation(), + meta: meta, + dist: dist, + broker: broker, + nodeMgr: nodeMgr, + targetMgr: targetMgr, + } +} + +func (c *StatsChecker) ID() utils.CheckerType { + return utils.StatsChecker +} + +func (c *StatsChecker) Description() string { + return "StatsChecker checks stats state change of segments and generates load stats task" +} + +func (c *StatsChecker) Check(ctx context.Context) []task.Task { + if !c.IsActive() { + return nil + } + collectionIDs := c.meta.CollectionManager.GetAll(ctx) + log.Info("StatsChecker", zap.Any("collectionIDs", collectionIDs)) + var tasks []task.Task + + for _, collectionID := range collectionIDs { + resp, err := c.broker.DescribeCollection(ctx, collectionID) + if err != nil { + log.Warn("describeCollection during check stats", zap.Int64("collection", collectionID)) + continue + } + collection := c.meta.CollectionManager.GetCollection(ctx, collectionID) + if collection == nil { + log.Warn("collection released during check stats", zap.Int64("collection", collectionID)) + continue + } + replicas := c.meta.ReplicaManager.GetByCollection(ctx, collectionID) + for _, replica := range replicas { + tasks = append(tasks, c.checkReplica(ctx, collection, replica, resp)...) + } + } + + return tasks +} + +func (c *StatsChecker) checkReplica(ctx context.Context, collection *meta.Collection, replica *meta.Replica, resp *milvuspb.DescribeCollectionResponse) []task.Task { + log := log.Ctx(ctx).With( + zap.Int64("collectionID", collection.GetCollectionID()), + ) + var tasks []task.Task + segments := c.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithReplica(replica)) + idSegments := make(map[int64]*meta.Segment) + roNodeSet := typeutil.NewUniqueSet(replica.GetRONodes()...) + targets := make(map[int64][]int64) // segmentID => FieldID + for _, segment := range segments { + // skip update index in read only node + if roNodeSet.Contain(segment.Node) { + continue + } + + // skip update index for l0 segment + segmentInTarget := c.targetMgr.GetSealedSegment(ctx, collection.GetCollectionID(), segment.GetID(), meta.CurrentTargetFirst) + if segmentInTarget == nil || segmentInTarget.GetLevel() == datapb.SegmentLevel_L0 { + continue + } + missing := c.checkSegment(segment, resp) + if len(missing) > 0 { + targets[segment.GetID()] = missing + idSegments[segment.GetID()] = segment + } + } + + segmentsToUpdate := typeutil.NewSet[int64]() + for _, segmentIDs := range lo.Chunk(lo.Keys(targets), MaxSegmentNumPerGetIndexInfoRPC) { + for _, segmentID := range segmentIDs { + segmentsToUpdate.Insert(segmentID) + } + } + + log.Info("StatsChecker checkReplica", zap.Any("segmentsToUpdate", segmentsToUpdate)) + + tasks = lo.FilterMap(segmentsToUpdate.Collect(), func(segmentID int64, _ int) (task.Task, bool) { + return c.createSegmentUpdateTask(ctx, idSegments[segmentID], replica) + }) + + return tasks +} + +func (c *StatsChecker) checkSegment(segment *meta.Segment, resp *milvuspb.DescribeCollectionResponse) (missFieldIDs []int64) { + var result []int64 + for _, field := range resp.GetSchema().GetFields() { + h := typeutil.CreateFieldSchemaHelper(field) + if h.EnableJsonKeyIndex() { + log.Info("StatsChecker checkSegment", zap.Any("result", result), zap.Any("JsonIndexField", segment.JsonIndexField)) + exists := false + for i := 0; i < len(segment.JsonIndexField); i++ { + if segment.JsonIndexField[i] == field.FieldID { + exists = true + break + } + } + + if !exists { + result = append(result, field.FieldID) + continue + } + } + } + log.Info("StatsChecker checkSegment", zap.Any("result", result), zap.Any("segment", segment.GetJsonKeyStats())) + return result +} + +func (c *StatsChecker) createSegmentUpdateTask(ctx context.Context, segment *meta.Segment, replica *meta.Replica) (task.Task, bool) { + action := task.NewSegmentActionWithScope(segment.Node, task.ActionTypeStatsUpdate, segment.GetInsertChannel(), segment.GetID(), querypb.DataScope_Historical) + t, err := task.NewSegmentTask( + ctx, + params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), + c.ID(), + segment.GetCollectionID(), + replica, + action, + ) + if err != nil { + log.Warn("create segment stats update task failed", + zap.Int64("collection", segment.GetCollectionID()), + zap.String("channel", segment.GetInsertChannel()), + zap.Int64("node", segment.Node), + zap.Error(err), + ) + return nil, false + } + t.SetPriority(task.TaskPriorityLow) + t.SetReason("missing json stats") + return t, true +} diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 828cedc6e5ce3..3a1ff8486053c 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -160,6 +160,7 @@ func (dh *distHandler) updateSegmentsDistribution(ctx context.Context, resp *que Version: s.GetVersion(), LastDeltaTimestamp: s.GetLastDeltaTimestamp(), IndexInfo: s.GetIndexInfo(), + JsonIndexField: s.GetFieldJsonIndexStats(), }) } diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index 85519b7770360..29a7f63a02af9 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -125,6 +125,7 @@ type Segment struct { Version int64 // Version is the timestamp of loading segment LastDeltaTimestamp uint64 // The timestamp of the last delta record IndexInfo map[int64]*querypb.FieldIndexInfo // index info of loaded segment + JsonIndexField []int64 // json index info of loaded segment } func SegmentFromInfo(info *datapb.SegmentInfo) *Segment { diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index bb426c9955bd6..a80535769f68d 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -35,12 +35,14 @@ const ( ActionTypeGrow ActionType = iota + 1 ActionTypeReduce ActionTypeUpdate + ActionTypeStatsUpdate ) var ActionTypeName = map[ActionType]string{ - ActionTypeGrow: "Grow", - ActionTypeReduce: "Reduce", - ActionTypeUpdate: "Update", + ActionTypeGrow: "Grow", + ActionTypeReduce: "Reduce", + ActionTypeUpdate: "Update", + ActionTypeStatsUpdate: "StatsUpdate", } func (t ActionType) String() string { @@ -155,7 +157,7 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool return true } return action.rpcReturned.Load() - } else if action.Type() == ActionTypeUpdate { + } else if action.Type() == ActionTypeUpdate || action.Type() == ActionTypeStatsUpdate { return action.rpcReturned.Load() } diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 263a4e1ec2d71..ca30c0b13d16d 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -156,7 +156,7 @@ func (ex *Executor) removeTask(task Task, step int) { func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) { switch task.Actions()[step].Type() { - case ActionTypeGrow, ActionTypeUpdate: + case ActionTypeGrow, ActionTypeUpdate, ActionTypeStatsUpdate: ex.loadSegment(task, step) case ActionTypeReduce: @@ -463,6 +463,9 @@ func (ex *Executor) executeLeaderAction(task *LeaderTask, step int) { case ActionTypeUpdate: ex.updatePartStatsVersions(task, step) + + case ActionTypeStatsUpdate: + ex.updatePartStatsVersions(task, step) } } diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index cde0c3d43df34..a70d46d59b486 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -47,13 +47,15 @@ const ( TaskTypeReduce TaskTypeMove TaskTypeUpdate + TaskTypeStatsUpdate ) var TaskTypeName = map[Type]string{ - TaskTypeGrow: "Grow", - TaskTypeReduce: "Reduce", - TaskTypeMove: "Move", - TaskTypeUpdate: "Update", + TaskTypeGrow: "Grow", + TaskTypeReduce: "Reduce", + TaskTypeMove: "Move", + TaskTypeUpdate: "Update", + TaskTypeStatsUpdate: "StatsUpdate", } type Type int32 diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go index c4f4df26e5332..400d945073cb3 100644 --- a/internal/querycoordv2/task/utils.go +++ b/internal/querycoordv2/task/utils.go @@ -95,6 +95,8 @@ func GetTaskType(task Task) Type { return TaskTypeReduce case task.Actions()[0].Type() == ActionTypeUpdate: return TaskTypeUpdate + case task.Actions()[0].Type() == ActionTypeStatsUpdate: + return TaskTypeStatsUpdate } return 0 } @@ -132,6 +134,10 @@ func packLoadSegmentRequest( loadScope = querypb.LoadScope_Index } + if action.Type() == ActionTypeStatsUpdate { + loadScope = querypb.LoadScope_Stats + } + if task.Source() == utils.LeaderChecker { loadScope = querypb.LoadScope_Delta } diff --git a/internal/querycoordv2/utils/checker.go b/internal/querycoordv2/utils/checker.go index 0234ff2e98d8a..b201837d26e6c 100644 --- a/internal/querycoordv2/utils/checker.go +++ b/internal/querycoordv2/utils/checker.go @@ -28,6 +28,7 @@ const ( IndexCheckerName = "index_checker" LeaderCheckerName = "leader_checker" ManualBalanceName = "manual_balance" + StatsCheckerName = "stats_checker" ) type CheckerType int32 @@ -39,6 +40,7 @@ const ( IndexChecker LeaderChecker ManualBalance + StatsChecker ) var checkerNames = map[CheckerType]string{ @@ -48,6 +50,7 @@ var checkerNames = map[CheckerType]string{ IndexChecker: IndexCheckerName, LeaderChecker: LeaderCheckerName, ManualBalance: ManualBalanceName, + StatsChecker: StatsCheckerName, } func (s CheckerType) String() string { diff --git a/internal/querynodev2/handlers.go b/internal/querynodev2/handlers.go index 5b16778a49424..0416a846e5638 100644 --- a/internal/querynodev2/handlers.go +++ b/internal/querynodev2/handlers.go @@ -186,6 +186,45 @@ func (node *QueryNode) loadIndex(ctx context.Context, req *querypb.LoadSegmentsR return status } +func (node *QueryNode) loadStats(ctx context.Context, req *querypb.LoadSegmentsRequest) *commonpb.Status { + log := log.Ctx(ctx).With( + zap.Int64("collectionID", req.GetCollectionID()), + zap.Int64s("segmentIDs", lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })), + ) + + status := merr.Success() + log.Info("start to load stats") + + for _, info := range req.GetInfos() { + log := log.With(zap.Int64("segmentID", info.GetSegmentID())) + segment := node.manager.Segment.GetSealed(info.GetSegmentID()) + if segment == nil { + log.Warn("segment not found for load stats operation") + continue + } + localSegment, ok := segment.(*segments.LocalSegment) + if !ok { + log.Warn("segment not local for load stats opeartion") + continue + } + + if localSegment.IsLazyLoad() { + localSegment.SetLoadInfo(info) + localSegment.SetNeedUpdatedVersion(req.GetVersion()) + node.manager.DiskCache.MarkItemNeedReload(ctx, localSegment.ID()) + return nil + } + err := node.loader.LoadJsonIndex(ctx, localSegment, info) + if err != nil { + log.Warn("failed to load stats", zap.Error(err)) + status = merr.Status(err) + break + } + } + + return status +} + func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryRequest, channel string) (*internalpb.RetrieveResults, error) { msgID := req.Req.Base.GetMsgID() traceID := trace.SpanFromContext(ctx).SpanContext().TraceID() diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index 7a04b7c5e13ea..52c01b3fe60a8 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -279,6 +279,7 @@ type LocalSegment struct { lastDeltaTimestamp *atomic.Uint64 fields *typeutil.ConcurrentMap[int64, *FieldInfo] fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] + fieldJsonStats []int64 } func NewSegment(ctx context.Context, @@ -1091,7 +1092,17 @@ func (s *LocalSegment) LoadTextIndex(ctx context.Context, textLogs *datapb.TextI func (s *LocalSegment) LoadJsonKeyIndex(ctx context.Context, jsonKeyStats *datapb.JsonKeyStats, schemaHelper *typeutil.SchemaHelper) error { log.Ctx(ctx).Info("load json key index", zap.Int64("field id", jsonKeyStats.GetFieldID()), zap.Any("json key logs", jsonKeyStats)) - + exists := false + for _, field := range s.fieldJsonStats { + if field == jsonKeyStats.GetFieldID() { + exists = true + break + } + } + if exists { + log.Warn("JsonKeyIndexStats already loaded") + return nil + } f, err := schemaHelper.GetFieldFromID(jsonKeyStats.GetFieldID()) if err != nil { return err @@ -1118,9 +1129,8 @@ func (s *LocalSegment) LoadJsonKeyIndex(ctx context.Context, jsonKeyStats *datap status = C.LoadJsonKeyIndex(traceCtx.ctx, s.ptr, (*C.uint8_t)(unsafe.Pointer(&marshaled[0])), (C.uint64_t)(len(marshaled))) return nil, nil }).Await() - + s.fieldJsonStats = append(s.fieldJsonStats, jsonKeyStats.GetFieldID()) return HandleCStatus(ctx, &status, "Load JsonKeyStats failed") - } func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.FieldIndexInfo, info *LoadIndexInfo) error { @@ -1380,3 +1390,7 @@ func (s *LocalSegment) indexNeedLoadRawData(schema *schemapb.CollectionSchema, i } return !typeutil.IsVectorType(fieldSchema.DataType) && s.HasRawData(indexInfo.IndexInfo.FieldID), nil } + +func (s *LocalSegment) GetFieldJsonIndexStats() []int64 { + return s.fieldJsonStats +} diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index 400886ccd5edf..d930190c49ec3 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -102,4 +102,6 @@ type Segment interface { // lazy load related NeedUpdatedVersion() int64 RemoveUnusedFieldFiles() error + + GetFieldJsonIndexStats() []int64 } diff --git a/internal/querynodev2/segments/segment_l0.go b/internal/querynodev2/segments/segment_l0.go index cab1f64b7645a..c556bc0590c66 100644 --- a/internal/querynodev2/segments/segment_l0.go +++ b/internal/querynodev2/segments/segment_l0.go @@ -188,3 +188,7 @@ func (s *L0Segment) Release(ctx context.Context, opts ...releaseOption) { func (s *L0Segment) RemoveUnusedFieldFiles() error { panic("not implemented") } + +func (s *L0Segment) GetFieldJsonIndexStats() []int64 { + return nil +} diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index f0d51c9b611ad..1435e896944b9 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -93,6 +93,10 @@ type Loader interface { segment Segment, loadInfo *querypb.SegmentLoadInfo, ) error + + LoadJsonIndex(ctx context.Context, + segment Segment, + info *querypb.SegmentLoadInfo) error } type ResourceEstimate struct { @@ -1684,6 +1688,35 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, return loader.waitSegmentLoadDone(ctx, commonpb.SegmentState_SegmentStateNone, []int64{loadInfo.GetSegmentID()}, version) } +func (loader *segmentLoader) LoadJsonIndex(ctx context.Context, + seg Segment, + loadInfo *querypb.SegmentLoadInfo, +) error { + segment, ok := seg.(*LocalSegment) + if !ok { + return merr.WrapErrParameterInvalid("LocalSegment", fmt.Sprintf("%T", seg)) + } + + collection := segment.GetCollection() + schemaHelper, _ := typeutil.CreateSchemaHelper(collection.Schema()) + + jsonKeyIndexInfo := make(map[int64]*datapb.JsonKeyStats, len(loadInfo.GetJsonKeyStatsLogs())) + for _, fieldStatsLog := range loadInfo.GetJsonKeyStatsLogs() { + jsonKeyLog, ok := jsonKeyIndexInfo[fieldStatsLog.FieldID] + if !ok { + jsonKeyIndexInfo[fieldStatsLog.FieldID] = fieldStatsLog + } else if fieldStatsLog.GetVersion() > jsonKeyLog.GetVersion() { + jsonKeyIndexInfo[fieldStatsLog.FieldID] = fieldStatsLog + } + } + for _, info := range jsonKeyIndexInfo { + if err := segment.LoadJsonKeyIndex(ctx, info, schemaHelper); err != nil { + return err + } + } + return nil +} + func getBinlogDataDiskSize(fieldBinlog *datapb.FieldBinlog) int64 { fieldSize := int64(0) for _, binlog := range fieldBinlog.Binlogs { diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 35a1b4fa89878..df555d040bf71 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -485,6 +485,9 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen if req.GetLoadScope() == querypb.LoadScope_Index { return node.loadIndex(ctx, req), nil } + if req.GetLoadScope() == querypb.LoadScope_Stats { + return node.loadStats(ctx, req), nil + } // Actual load segment log.Info("start to load segments...") @@ -1160,6 +1163,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get sealedSegments := node.manager.Segment.GetBy(segments.WithType(commonpb.SegmentState_Sealed)) segmentVersionInfos := make([]*querypb.SegmentVersionInfo, 0, len(sealedSegments)) for _, s := range sealedSegments { + log.Info("GetDataDistribution", zap.Any("JsonKeyStatsLogs", s.LoadInfo().GetJsonKeyStatsLogs()), zap.Any("Indexes", s.Indexes())) segmentVersionInfos = append(segmentVersionInfos, &querypb.SegmentVersionInfo{ ID: s.ID(), Collection: s.Collection(), @@ -1172,6 +1176,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get IndexInfo: lo.SliceToMap(s.Indexes(), func(info *segments.IndexedFieldInfo) (int64, *querypb.FieldIndexInfo) { return info.IndexInfo.FieldID, info.IndexInfo }), + FieldJsonIndexStats: s.GetFieldJsonIndexStats(), }) }