Skip to content

Commit

Permalink
support load json index after loadsegment
Browse files Browse the repository at this point in the history
Signed-off-by: Xianhui.Lin <[email protected]>
  • Loading branch information
JsDove committed Dec 10, 2024
1 parent aea736e commit e1531e7
Show file tree
Hide file tree
Showing 16 changed files with 325 additions and 12 deletions.
2 changes: 2 additions & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ enum LoadScope {
Full = 0;
Delta = 1;
Index = 2;
Stats = 3;
}

message LoadSegmentsRequest {
Expand Down Expand Up @@ -644,6 +645,7 @@ message SegmentVersionInfo {
map<int64, FieldIndexInfo> index_info = 7;
data.SegmentLevel level = 8;
bool is_sorted = 9;
repeated int64 field_json_index_stats = 10;
}

message ChannelVersionInfo {
Expand Down
3 changes: 3 additions & 0 deletions internal/querycoordv2/checkers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func NewCheckerController(
// todo temporary work around must fix
// utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr, true),
utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr),
utils.StatsChecker: NewStatsChecker(meta, dist, broker, nodeMgr, targetMgr),
}

manualCheckChs := map[utils.CheckerType]chan struct{}{
Expand Down Expand Up @@ -112,6 +113,8 @@ func getCheckerInterval(checker utils.CheckerType) time.Duration {
return Params.QueryCoordCfg.IndexCheckInterval.GetAsDuration(time.Millisecond)
case utils.LeaderChecker:
return Params.QueryCoordCfg.LeaderViewUpdateInterval.GetAsDuration(time.Second)
case utils.StatsChecker:
return Params.QueryCoordCfg.IndexCheckInterval.GetAsDuration(time.Millisecond)
default:
return Params.QueryCoordCfg.CheckInterval.GetAsDuration(time.Millisecond)
}
Expand Down
193 changes: 193 additions & 0 deletions internal/querycoordv2/checkers/stats_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package checkers

import (
"context"
"time"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var _ Checker = (*StatsChecker)(nil)

// StatsChecker performs the segment stats (JSON key index) check: it looks for
// loaded segments that are missing JSON key stats and generates tasks to load
// them.
type StatsChecker struct {
	// Embedded activation state; Check consults IsActive() before doing any work.
	// NOTE(review): IsActive is presumably provided by this embedded type — confirm.
	*checkerActivation
	// meta gives access to the CollectionManager and ReplicaManager used by Check.
	meta *meta.Meta
	// dist gives access to the SegmentDistManager used to list distributed segments.
	dist *meta.DistributionManager
	// broker is used to describe collections (schema lookup).
	broker meta.Broker
	// nodeMgr is stored by the constructor; it is not read by the methods in this file.
	nodeMgr *session.NodeManager

	// targetMgr is used to look up the sealed segment in the target.
	targetMgr meta.TargetManagerInterface
}

// NewStatsChecker builds a StatsChecker wired to the given managers and
// broker, with a freshly created activation state.
func NewStatsChecker(
	meta *meta.Meta,
	dist *meta.DistributionManager,
	broker meta.Broker,
	nodeMgr *session.NodeManager,
	targetMgr meta.TargetManagerInterface,
) *StatsChecker {
	checker := &StatsChecker{checkerActivation: newCheckerActivation()}

	// Wire the dependencies one by one; no validation is performed here.
	checker.meta = meta
	checker.dist = dist
	checker.broker = broker
	checker.nodeMgr = nodeMgr
	checker.targetMgr = targetMgr

	return checker
}

// ID returns the checker type identifying this checker (utils.StatsChecker).
func (c *StatsChecker) ID() utils.CheckerType {
	return utils.StatsChecker
}

// Description returns a human-readable summary of what this checker does.
func (c *StatsChecker) Description() string {
	return "StatsChecker checks stats state change of segments and generates load stats task"
}

// Check walks every collection known to the collection manager and, for each
// of its replicas, collects the stats-update tasks produced by checkReplica.
//
// It returns nil when the checker is not active. A collection is skipped (with
// a warning) when describing it through the broker fails or when it is no
// longer present in the collection manager.
func (c *StatsChecker) Check(ctx context.Context) []task.Task {
	if !c.IsActive() {
		return nil
	}
	collectionIDs := c.meta.CollectionManager.GetAll(ctx)
	log.Info("StatsChecker", zap.Any("collectionIDs", collectionIDs))
	var tasks []task.Task

	for _, collectionID := range collectionIDs {
		resp, err := c.broker.DescribeCollection(ctx, collectionID)
		if err != nil {
			// Include the error itself; without it the warning is not actionable.
			log.Warn("failed to describe collection during check stats",
				zap.Int64("collection", collectionID),
				zap.Error(err),
			)
			continue
		}
		// Looked up after the describe call so a release that happened in the
		// meantime is still observed.
		collection := c.meta.CollectionManager.GetCollection(ctx, collectionID)
		if collection == nil {
			log.Warn("collection released during check stats", zap.Int64("collection", collectionID))
			continue
		}
		replicas := c.meta.ReplicaManager.GetByCollection(ctx, collectionID)
		for _, replica := range replicas {
			tasks = append(tasks, c.checkReplica(ctx, collection, replica, resp)...)
		}
	}

	return tasks
}

// checkReplica inspects the segments distributed in the given replica and
// returns one stats-update task for every segment for which checkSegment
// reports at least one missing field.
//
// Segments located on read-only nodes, segments that are not found in the
// target, and L0 segments are skipped. Segments are keyed by segment ID, so
// when the same ID shows up more than once in the distribution only the last
// occurrence gets a task.
func (c *StatsChecker) checkReplica(ctx context.Context, collection *meta.Collection, replica *meta.Replica, resp *milvuspb.DescribeCollectionResponse) []task.Task {
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", collection.GetCollectionID()),
	)
	segments := c.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithReplica(replica))
	roNodeSet := typeutil.NewUniqueSet(replica.GetRONodes()...)

	// segmentID => distributed segment whose stats need to be loaded.
	idSegments := make(map[int64]*meta.Segment)
	for _, segment := range segments {
		// skip update stats in read only node
		if roNodeSet.Contain(segment.Node) {
			continue
		}

		// skip update stats for segments outside the target and for l0 segments
		segmentInTarget := c.targetMgr.GetSealedSegment(ctx, collection.GetCollectionID(), segment.GetID(), meta.CurrentTargetFirst)
		if segmentInTarget == nil || segmentInTarget.GetLevel() == datapb.SegmentLevel_L0 {
			continue
		}

		if missing := c.checkSegment(segment, resp); len(missing) > 0 {
			idSegments[segment.GetID()] = segment
		}
	}

	// Nothing to do: avoid logging on every check tick.
	if len(idSegments) == 0 {
		return nil
	}

	log.Info("StatsChecker checkReplica", zap.Int64s("segmentsToUpdate", lo.Keys(idSegments)))

	// createSegmentUpdateTask reports false when task creation fails; such
	// segments are dropped here and picked up again by a later check.
	return lo.FilterMap(lo.Values(idSegments), func(segment *meta.Segment, _ int) (task.Task, bool) {
		return c.createSegmentUpdateTask(ctx, segment, replica)
	})
}

// checkSegment returns the IDs of the schema fields that have the JSON key
// index enabled but are not listed in segment.JsonIndexField.
//
// The result is nil when no field is missing.
func (c *StatsChecker) checkSegment(segment *meta.Segment, resp *milvuspb.DescribeCollectionResponse) (missFieldIDs []int64) {
	for _, field := range resp.GetSchema().GetFields() {
		h := typeutil.CreateFieldSchemaHelper(field)
		if !h.EnableJsonKeyIndex() {
			continue
		}
		if !lo.Contains(segment.JsonIndexField, field.FieldID) {
			missFieldIDs = append(missFieldIDs, field.FieldID)
		}
	}
	// Debug level: this runs for every distributed segment on every check tick.
	log.Debug("StatsChecker checkSegment",
		zap.Any("result", missFieldIDs),
		zap.Any("JsonIndexField", segment.JsonIndexField),
		zap.Any("segment", segment.GetJsonKeyStats()),
	)
	return missFieldIDs
}

// createSegmentUpdateTask builds a low-priority segment task carrying a single
// stats-update action for the given segment on the node it is distributed to.
//
// The boolean result is false (and the task nil) when the task cannot be
// created; the failure is logged as a warning.
func (c *StatsChecker) createSegmentUpdateTask(ctx context.Context, segment *meta.Segment, replica *meta.Replica) (task.Task, bool) {
	statsAction := task.NewSegmentActionWithScope(
		segment.Node,
		task.ActionTypeStatsUpdate,
		segment.GetInsertChannel(),
		segment.GetID(),
		querypb.DataScope_Historical,
	)
	timeout := params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond)

	updateTask, err := task.NewSegmentTask(ctx, timeout, c.ID(), segment.GetCollectionID(), replica, statsAction)
	if err == nil {
		updateTask.SetPriority(task.TaskPriorityLow)
		updateTask.SetReason("missing json stats")
		return updateTask, true
	}

	log.Warn("create segment stats update task failed",
		zap.Int64("collection", segment.GetCollectionID()),
		zap.String("channel", segment.GetInsertChannel()),
		zap.Int64("node", segment.Node),
		zap.Error(err),
	)
	return nil, false
}
1 change: 1 addition & 0 deletions internal/querycoordv2/dist/dist_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func (dh *distHandler) updateSegmentsDistribution(ctx context.Context, resp *que
Version: s.GetVersion(),
LastDeltaTimestamp: s.GetLastDeltaTimestamp(),
IndexInfo: s.GetIndexInfo(),
JsonIndexField: s.GetFieldJsonIndexStats(),
})
}

Expand Down
1 change: 1 addition & 0 deletions internal/querycoordv2/meta/segment_dist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type Segment struct {
Version int64 // Version is the timestamp of loading segment
LastDeltaTimestamp uint64 // The timestamp of the last delta record
IndexInfo map[int64]*querypb.FieldIndexInfo // index info of loaded segment
JsonIndexField []int64 // json index info of loaded segment
}

func SegmentFromInfo(info *datapb.SegmentInfo) *Segment {
Expand Down
10 changes: 6 additions & 4 deletions internal/querycoordv2/task/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ const (
ActionTypeGrow ActionType = iota + 1
ActionTypeReduce
ActionTypeUpdate
ActionTypeStatsUpdate
)

var ActionTypeName = map[ActionType]string{
ActionTypeGrow: "Grow",
ActionTypeReduce: "Reduce",
ActionTypeUpdate: "Update",
ActionTypeGrow: "Grow",
ActionTypeReduce: "Reduce",
ActionTypeUpdate: "Update",
ActionTypeStatsUpdate: "StatsUpdate",
}

func (t ActionType) String() string {
Expand Down Expand Up @@ -155,7 +157,7 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool
return true
}
return action.rpcReturned.Load()
} else if action.Type() == ActionTypeUpdate {
} else if action.Type() == ActionTypeUpdate || action.Type() == ActionTypeStatsUpdate {
return action.rpcReturned.Load()
}

Expand Down
5 changes: 4 additions & 1 deletion internal/querycoordv2/task/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (ex *Executor) removeTask(task Task, step int) {

func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) {
switch task.Actions()[step].Type() {
case ActionTypeGrow, ActionTypeUpdate:
case ActionTypeGrow, ActionTypeUpdate, ActionTypeStatsUpdate:
ex.loadSegment(task, step)

case ActionTypeReduce:
Expand Down Expand Up @@ -463,6 +463,9 @@ func (ex *Executor) executeLeaderAction(task *LeaderTask, step int) {

case ActionTypeUpdate:
ex.updatePartStatsVersions(task, step)

case ActionTypeStatsUpdate:
ex.updatePartStatsVersions(task, step)
}
}

Expand Down
10 changes: 6 additions & 4 deletions internal/querycoordv2/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ const (
TaskTypeReduce
TaskTypeMove
TaskTypeUpdate
TaskTypeStatsUpdate
)

var TaskTypeName = map[Type]string{
TaskTypeGrow: "Grow",
TaskTypeReduce: "Reduce",
TaskTypeMove: "Move",
TaskTypeUpdate: "Update",
TaskTypeGrow: "Grow",
TaskTypeReduce: "Reduce",
TaskTypeMove: "Move",
TaskTypeUpdate: "Update",
TaskTypeStatsUpdate: "StatsUpdate",
}

type Type int32
Expand Down
6 changes: 6 additions & 0 deletions internal/querycoordv2/task/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func GetTaskType(task Task) Type {
return TaskTypeReduce
case task.Actions()[0].Type() == ActionTypeUpdate:
return TaskTypeUpdate
case task.Actions()[0].Type() == ActionTypeStatsUpdate:
return TaskTypeStatsUpdate
}
return 0
}
Expand Down Expand Up @@ -132,6 +134,10 @@ func packLoadSegmentRequest(
loadScope = querypb.LoadScope_Index
}

if action.Type() == ActionTypeStatsUpdate {
loadScope = querypb.LoadScope_Stats
}

if task.Source() == utils.LeaderChecker {
loadScope = querypb.LoadScope_Delta
}
Expand Down
3 changes: 3 additions & 0 deletions internal/querycoordv2/utils/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
IndexCheckerName = "index_checker"
LeaderCheckerName = "leader_checker"
ManualBalanceName = "manual_balance"
StatsCheckerName = "stats_checker"
)

type CheckerType int32
Expand All @@ -39,6 +40,7 @@ const (
IndexChecker
LeaderChecker
ManualBalance
StatsChecker
)

var checkerNames = map[CheckerType]string{
Expand All @@ -48,6 +50,7 @@ var checkerNames = map[CheckerType]string{
IndexChecker: IndexCheckerName,
LeaderChecker: LeaderCheckerName,
ManualBalance: ManualBalanceName,
StatsChecker: StatsCheckerName,
}

func (s CheckerType) String() string {
Expand Down
39 changes: 39 additions & 0 deletions internal/querynodev2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,45 @@ func (node *QueryNode) loadIndex(ctx context.Context, req *querypb.LoadSegmentsR
return status
}

// loadStats loads the JSON index (stats) for every segment listed in req.
//
// For each load info the sealed segment is looked up in the segment manager;
// segments that are not found or are not *segments.LocalSegment are skipped
// with a warning. Lazy-load segments are not loaded here: their load info and
// needed version are updated and the disk cache item is marked for reload.
// For all other segments the loader's LoadJsonIndex is called; the first
// failure stops processing and is returned as the status.
//
// Returns a success status unless a LoadJsonIndex call failed.
func (node *QueryNode) loadStats(ctx context.Context, req *querypb.LoadSegmentsRequest) *commonpb.Status {
	log := log.Ctx(ctx).With(
		zap.Int64("collectionID", req.GetCollectionID()),
		zap.Int64s("segmentIDs", lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })),
	)

	status := merr.Success()
	log.Info("start to load stats")

	for _, info := range req.GetInfos() {
		log := log.With(zap.Int64("segmentID", info.GetSegmentID()))
		segment := node.manager.Segment.GetSealed(info.GetSegmentID())
		if segment == nil {
			log.Warn("segment not found for load stats operation")
			continue
		}
		localSegment, ok := segment.(*segments.LocalSegment)
		if !ok {
			log.Warn("segment not local for load stats opeartion")
			continue
		}

		if localSegment.IsLazyLoad() {
			localSegment.SetLoadInfo(info)
			localSegment.SetNeedUpdatedVersion(req.GetVersion())
			node.manager.DiskCache.MarkItemNeedReload(ctx, localSegment.ID())
			// Move on to the next segment instead of returning: an early
			// return here would silently skip the remaining segments of the
			// request and hand back a nil status.
			continue
		}

		if err := node.loader.LoadJsonIndex(ctx, localSegment, info); err != nil {
			log.Warn("failed to load stats", zap.Error(err))
			status = merr.Status(err)
			break
		}
	}

	return status
}

func (node *QueryNode) queryChannel(ctx context.Context, req *querypb.QueryRequest, channel string) (*internalpb.RetrieveResults, error) {
msgID := req.Req.Base.GetMsgID()
traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()
Expand Down
Loading

0 comments on commit e1531e7

Please sign in to comment.