From 0fa719d4babb5b7f3bf8b7880e08937d98ce130a Mon Sep 17 00:00:00 2001 From: Kang Zhang <100667394+kangclzjc@users.noreply.github.com> Date: Tue, 20 Aug 2024 09:44:19 +0800 Subject: [PATCH] koordlet: add record events (#2162) Signed-off-by: Zhang Kang --- pkg/koordlet/koordlet.go | 2 +- .../protocol/container_context.go | 6 ++ .../runtimehooks/protocol/host_qos_context.go | 6 ++ .../runtimehooks/protocol/kubeqos_context.go | 5 ++ .../runtimehooks/protocol/pod_context.go | 50 +++++++++-- .../runtimehooks/protocol/pod_context_test.go | 84 +++++++++++++++++++ .../runtimehooks/protocol/protocol.go | 2 + .../runtimehooks/reconciler/reconciler.go | 5 ++ pkg/koordlet/runtimehooks/runtimehooks.go | 11 ++- .../runtimehooks/runtimehooks_test.go | 7 +- 10 files changed, 170 insertions(+), 8 deletions(-) diff --git a/pkg/koordlet/koordlet.go b/pkg/koordlet/koordlet.go index 99c4f094b..ea18f34f3 100644 --- a/pkg/koordlet/koordlet.go +++ b/pkg/koordlet/koordlet.go @@ -106,7 +106,7 @@ func NewDaemon(config *config.Configuration) (Daemon, error) { qosManager := qosmanager.NewQOSManager(config.QOSManagerConf, scheme, kubeClient, crdClient, nodeName, statesInformer, metricCache, config.CollectorConf, evictVersion) - runtimeHook, err := runtimehooks.NewRuntimeHook(statesInformer, config.RuntimeHookConf) + runtimeHook, err := runtimehooks.NewRuntimeHook(statesInformer, config.RuntimeHookConf, scheme, kubeClient, nodeName) if err != nil { return nil, err } diff --git a/pkg/koordlet/runtimehooks/protocol/container_context.go b/pkg/koordlet/runtimehooks/protocol/container_context.go index 1d85ab009..3779c3e84 100644 --- a/pkg/koordlet/runtimehooks/protocol/container_context.go +++ b/pkg/koordlet/runtimehooks/protocol/container_context.go @@ -21,6 +21,8 @@ import ( "strings" "github.com/containerd/nri/pkg/api" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" apiext "github.com/koordinator-sh/koordinator/apis/extension" @@ -217,6 +219,10 @@ type 
ContainerContext struct {
 	updaters []resourceexecutor.ResourceUpdater
 }
 
+func (c *ContainerContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
+	// TODO: no-op for now; pod events are not recorded at container level.
+}
+
 func (c *ContainerContext) FromNri(pod *api.PodSandbox, container *api.Container) {
 	c.Request.FromNri(pod, container)
 }
diff --git a/pkg/koordlet/runtimehooks/protocol/host_qos_context.go b/pkg/koordlet/runtimehooks/protocol/host_qos_context.go
index 3b89c8207..3bfd998d4 100644
--- a/pkg/koordlet/runtimehooks/protocol/host_qos_context.go
+++ b/pkg/koordlet/runtimehooks/protocol/host_qos_context.go
@@ -17,6 +17,8 @@ limitations under the License.
 package protocol
 
 import (
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 
 	ext "github.com/koordinator-sh/koordinator/apis/extension"
@@ -49,6 +51,10 @@ type HostAppContext struct {
 	updaters []resourceexecutor.ResourceUpdater
 }
 
+func (c *HostAppContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
+	// TODO: no-op for now; recording pod events at host-application level is not supported.
+}
+
 func (c *HostAppContext) FromReconciler(hostAppSpec *slov1alpha1.HostApplicationSpec) {
 	c.Request.FromReconciler(hostAppSpec)
 }
diff --git a/pkg/koordlet/runtimehooks/protocol/kubeqos_context.go b/pkg/koordlet/runtimehooks/protocol/kubeqos_context.go
index b93156e02..1865b43ca 100644
--- a/pkg/koordlet/runtimehooks/protocol/kubeqos_context.go
+++ b/pkg/koordlet/runtimehooks/protocol/kubeqos_context.go
@@ -18,6 +18,7 @@ package protocol
 
 import (
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 
 	"github.com/koordinator-sh/koordinator/pkg/koordlet/audit"
@@ -46,6 +47,10 @@ type KubeQOSContext struct {
 	updaters []resourceexecutor.ResourceUpdater
 }
 
+func (k *KubeQOSContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
+	// TODO: no-op for now; pod events are not recorded at kube QoS level.
+}
+
 func (k *KubeQOSContext) FromReconciler(kubeQOS corev1.PodQOSClass) {
 	k.Request.FromReconciler(kubeQOS)
 }
diff --git
a/pkg/koordlet/runtimehooks/protocol/pod_context.go b/pkg/koordlet/runtimehooks/protocol/pod_context.go
index ea1ec3106..be7576ed6 100644
--- a/pkg/koordlet/runtimehooks/protocol/pod_context.go
+++ b/pkg/koordlet/runtimehooks/protocol/pod_context.go
@@ -18,10 +18,13 @@ package protocol
 
 import (
 	"fmt"
+	"sort"
 
 	"github.com/containerd/nri/pkg/api"
-
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/tools/record"
+	recutil "k8s.io/client-go/tools/record/util"
 	"k8s.io/klog/v2"
 
 	apiext "github.com/koordinator-sh/koordinator/apis/extension"
@@ -122,15 +125,52 @@ func (p *PodRequest) FromReconciler(podMeta *statesinformer.PodMeta) {
 	}
 }
 
+type RecorderEvent struct {
+	HookName  string // not read by PodContext.RecordEvent
+	MsgFmt    string // event message; same-typed entries are joined with "-"
+	Reason    string // event reason; same-typed entries are joined with "-"
+	EventType string // must pass recutil.ValidateEventType, otherwise the entry is skipped
+}
+
 type PodResponse struct {
 	Resources Resources
 }
 
 type PodContext struct {
-	Request  PodRequest
-	Response PodResponse
-	executor resourceexecutor.ResourceUpdateExecutor
-	updaters []resourceexecutor.ResourceUpdater
+	Request        PodRequest
+	Response       PodResponse
+	executor       resourceexecutor.ResourceUpdateExecutor
+	updaters       []resourceexecutor.ResourceUpdater
+	RecorderEvents []RecorderEvent // consumed by RecordEvent
+}
+
+// RecordEvent emits one event per valid event type on pod, joining the Reason and
+// MsgFmt of same-typed RecorderEvents with "-" in slice order.
+func (p *PodContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
+	events := make(map[string]RecorderEvent)
+	for _, event := range p.RecorderEvents {
+		if !recutil.ValidateEventType(event.EventType) {
+			klog.Warningf("EventType is not valid %v", event)
+			continue
+		}
+		// Append to what was already merged for this type instead of discarding it.
+		if prev, ok := events[event.EventType]; ok {
+			event.MsgFmt = prev.MsgFmt + "-" + event.MsgFmt
+			event.Reason = prev.Reason + "-" + event.Reason
+		}
+		events[event.EventType] = event
+	}
+	// Map iteration order is random; sort the types so events are emitted in a stable order.
+	eventTypes := make([]string, 0, len(events))
+	for eventType := range events {
+		eventTypes = append(eventTypes, eventType)
+	}
+	sort.Strings(eventTypes)
+	for _, eventType := range eventTypes {
+		event := events[eventType]
+		// Event, not Eventf: the merged text is data and must not be parsed as a format string.
+		r.Event(pod, eventType, event.Reason, event.MsgFmt)
+	}
 }
 
 func (p *PodResponse) ProxyDone(resp
*runtimeapi.PodSandboxHookResponse) {
diff --git a/pkg/koordlet/runtimehooks/protocol/pod_context_test.go b/pkg/koordlet/runtimehooks/protocol/pod_context_test.go
index ad223ccbc..2b9bb6fc3 100644
--- a/pkg/koordlet/runtimehooks/protocol/pod_context_test.go
+++ b/pkg/koordlet/runtimehooks/protocol/pod_context_test.go
@@ -21,11 +21,14 @@ import (
 	"testing"
 
 	"github.com/containerd/nri/pkg/api"
+	"github.com/golang/mock/gomock"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 
 	"github.com/koordinator-sh/koordinator/apis/extension"
+	apiext "github.com/koordinator-sh/koordinator/apis/extension" // NOTE(review): same package as the unaliased import on the previous line
 	"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/util/testutil"
 )
 
 func TestPodContext_FromNri(t *testing.T) {
@@ -174,3 +177,84 @@ func TestPodContext_NriRemoveDone(t *testing.T) {
 		})
 	}
 }
+
+func TestPodContext_RecordEvent(t *testing.T) {
+	type fields struct { // mirrors the fields used to build the PodContext under test
+		Request        PodRequest
+		Response       PodResponse
+		executor       resourceexecutor.ResourceUpdateExecutor
+		updaters       []resourceexecutor.ResourceUpdater
+		RecorderEvents []RecorderEvent
+	}
+	type args struct {
+		pod *corev1.Pod
+	}
+	tests := []struct {
+		name   string
+		fields fields
+		args   args // NOTE(review): never read below; every sub-test passes the shared mock pod
+	}{
+		{
+			name: "EventType is not valid",
+			fields: fields{
+				Request:  PodRequest{},
+				Response: PodResponse{},
+				executor: nil,
+				updaters: nil,
+				RecorderEvents: []RecorderEvent{
+					{EventType: "test"},
+				},
+			},
+			args: args{
+				pod: nil,
+			},
+		},
+		{
+			name: "EventType is valid",
+			fields: fields{
+				Request:  PodRequest{},
+				Response: PodResponse{},
+				executor: nil,
+				updaters: nil,
+				RecorderEvents: []RecorderEvent{
+					{
+						HookName:  "resctrl",
+						EventType: corev1.EventTypeNormal,
+						MsgFmt:    "test",
+						Reason:    "test",
+					},
+					{
+						HookName:  "cpuset",
+						EventType: corev1.EventTypeNormal,
+						MsgFmt:    "test",
+						Reason:    "test",
+					},
+					{
+						HookName:  "resctrl",
+						EventType: corev1.EventTypeWarning,
+						MsgFmt:    "test",
+						Reason:    "test",
+					},
+				},
+			},
+		},
+	}
+	pod := testutil.MockTestPod(apiext.QoSBE, "test_be_pod")
+	// NOTE(review): this gomock controller has no expectations registered on it in this test.
+	ctl := gomock.NewController(t)
+	defer ctl.Finish()
+
+	fakeRecorder := &testutil.FakeRecorder{}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			p := &PodContext{
+				Request:        tt.fields.Request,
+				Response:       tt.fields.Response,
+				executor:       tt.fields.executor,
+				updaters:       tt.fields.updaters,
+				RecorderEvents: tt.fields.RecorderEvents,
+			}
+			p.RecordEvent(fakeRecorder, pod) // NOTE(review): no assertions follow; the sub-test only fails if this call panics
+		})
+	}
+}
diff --git a/pkg/koordlet/runtimehooks/protocol/protocol.go b/pkg/koordlet/runtimehooks/protocol/protocol.go
index b3f6a93c8..6a0a4401c 100644
--- a/pkg/koordlet/runtimehooks/protocol/protocol.go
+++ b/pkg/koordlet/runtimehooks/protocol/protocol.go
@@ -20,6 +20,7 @@ import (
 	"strconv"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/kubernetes/pkg/api/v1/resource"
 
 	slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
@@ -33,6 +34,7 @@ type HooksProtocol interface {
 	ReconcilerDone(executor resourceexecutor.ResourceUpdateExecutor)
 	Update()
 	GetUpdaters() []resourceexecutor.ResourceUpdater
+	RecordEvent(r record.EventRecorder, pod *corev1.Pod)
 }
 
 type hooksProtocolBuilder struct {
diff --git a/pkg/koordlet/runtimehooks/reconciler/reconciler.go b/pkg/koordlet/runtimehooks/reconciler/reconciler.go
index 8f64e6c4d..cf10bcc62 100644
--- a/pkg/koordlet/runtimehooks/reconciler/reconciler.go
+++ b/pkg/koordlet/runtimehooks/reconciler/reconciler.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 
 	apiext "github.com/koordinator-sh/koordinator/apis/extension"
@@ -301,6 +302,7 @@ type Context struct {
 	StatesInformer    statesinformer.StatesInformer
 	Executor          resourceexecutor.ResourceUpdateExecutor
 	ReconcileInterval time.Duration
+	EventRecorder     record.EventRecorder
 }
 
 func NewReconciler(ctx Context) Reconciler {
@@ -308,6 +310,7 @@ func NewReconciler(ctx Context) Reconciler {
 		podUpdated:        make(chan struct{}, 1),
 		executor:
ctx.Executor, reconcileInterval: ctx.ReconcileInterval, + eventRecorder: ctx.EventRecorder, } // TODO register individual pod event ctx.StatesInformer.RegisterCallbacks(statesinformer.RegisterTypeAllPods, "runtime-hooks-reconciler", @@ -321,6 +324,7 @@ type reconciler struct { podUpdated chan struct{} executor resourceexecutor.ResourceUpdateExecutor reconcileInterval time.Duration + eventRecorder record.EventRecorder } func (c *reconciler) Run(stopCh <-chan struct{}) error { @@ -420,6 +424,7 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) { klog.V(5).Infof("calling reconcile function %v for pod %v finished", r.description, podMeta.Key()) } + podCtx.RecordEvent(c.eventRecorder, podMeta.Pod) } for resourceType, r := range globalCgroupReconcilers.sandboxContainerLevel { diff --git a/pkg/koordlet/runtimehooks/runtimehooks.go b/pkg/koordlet/runtimehooks/runtimehooks.go index 019d4e413..bdae743bf 100644 --- a/pkg/koordlet/runtimehooks/runtimehooks.go +++ b/pkg/koordlet/runtimehooks/runtimehooks.go @@ -19,7 +19,12 @@ package runtimehooks import ( "fmt" + corev1 "k8s.io/api/core/v1" + apiruntime "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" + clientset "k8s.io/client-go/kubernetes" + clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "github.com/koordinator-sh/koordinator/pkg/features" @@ -82,7 +87,10 @@ func (r *runtimeHook) Run(stopCh <-chan struct{}) error { return nil } -func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook, error) { +func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config, schema *apiruntime.Scheme, kubeClient clientset.Interface, nodeName string) (RuntimeHook, error) { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(schema, corev1.EventSource{Component: 
"koordlet-runtimehook", Host: nodeName}) failurePolicy, err := config.GetFailurePolicyType(cfg.RuntimeHooksFailurePolicy) if err != nil { return nil, err @@ -136,6 +144,7 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook, StatesInformer: si, Executor: e, ReconcileInterval: cfg.RuntimeHookReconcileInterval, + EventRecorder: recorder, } newPluginOptions := hooks.Options{ diff --git a/pkg/koordlet/runtimehooks/runtimehooks_test.go b/pkg/koordlet/runtimehooks/runtimehooks_test.go index b8345e118..545b875ee 100644 --- a/pkg/koordlet/runtimehooks/runtimehooks_test.go +++ b/pkg/koordlet/runtimehooks/runtimehooks_test.go @@ -23,6 +23,8 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + apiruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" "github.com/koordinator-sh/koordinator/pkg/features" mockstatesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/mockstatesinformer" @@ -116,7 +118,10 @@ func Test_runtimeHook_Run(t *testing.T) { defer ctrl.Finish() si := mockstatesinformer.NewMockStatesInformer(ctrl) si.EXPECT().RegisterCallbacks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - r, err := NewRuntimeHook(si, tt.fields.config) + scheme := apiruntime.NewScheme() + kubeClient := &kubernetes.Clientset{} + nodeName := "test-node" + r, err := NewRuntimeHook(si, tt.fields.config, scheme, kubeClient, nodeName) assert.NoError(t, err) stop := make(chan struct{})