Skip to content

Commit

Permalink
koordlet: add record events (#2162)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhang Kang <[email protected]>
  • Loading branch information
kangclzjc authored Aug 20, 2024
1 parent 4672e54 commit 0fa719d
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/koordlet/koordlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func NewDaemon(config *config.Configuration) (Daemon, error) {

qosManager := qosmanager.NewQOSManager(config.QOSManagerConf, scheme, kubeClient, crdClient, nodeName, statesInformer, metricCache, config.CollectorConf, evictVersion)

runtimeHook, err := runtimehooks.NewRuntimeHook(statesInformer, config.RuntimeHookConf)
runtimeHook, err := runtimehooks.NewRuntimeHook(statesInformer, config.RuntimeHookConf, scheme, kubeClient, nodeName)
if err != nil {
return nil, err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/koordlet/runtimehooks/protocol/container_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strings"

"github.com/containerd/nri/pkg/api"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -217,6 +219,10 @@ type ContainerContext struct {
updaters []resourceexecutor.ResourceUpdater
}

// RecordEvent is currently a no-op for ContainerContext: r and pod are
// ignored and no event is emitted.
func (c *ContainerContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
// TODO: events are not recorded at the container level yet.
}

// FromNri forwards the given NRI pod sandbox and container to
// c.Request.FromNri.
func (c *ContainerContext) FromNri(pod *api.PodSandbox, container *api.Container) {
c.Request.FromNri(pod, container)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/koordlet/runtimehooks/protocol/host_qos_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package protocol

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

ext "github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -49,6 +51,10 @@ type HostAppContext struct {
updaters []resourceexecutor.ResourceUpdater
}

// RecordEvent is currently a no-op for HostAppContext: r and pod are ignored
// and no event is emitted.
func (c *HostAppContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
// TODO: recording events at the host-application level is not supported yet.
}

// FromReconciler forwards the given host application spec to
// c.Request.FromReconciler.
func (c *HostAppContext) FromReconciler(hostAppSpec *slov1alpha1.HostApplicationSpec) {
c.Request.FromReconciler(hostAppSpec)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/koordlet/runtimehooks/protocol/kubeqos_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package protocol

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/koordlet/audit"
Expand Down Expand Up @@ -46,6 +47,10 @@ type KubeQOSContext struct {
updaters []resourceexecutor.ResourceUpdater
}

// RecordEvent is currently a no-op for KubeQOSContext: r and pod are ignored
// and no event is emitted.
func (k *KubeQOSContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
// TODO: events are not recorded at the kube QoS level yet.
}

// FromReconciler forwards the given pod QoS class to
// k.Request.FromReconciler.
func (k *KubeQOSContext) FromReconciler(kubeQOS corev1.PodQOSClass) {
k.Request.FromReconciler(kubeQOS)
}
Expand Down
50 changes: 45 additions & 5 deletions pkg/koordlet/runtimehooks/protocol/pod_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package protocol

import (
"fmt"
"sort"

"github.com/containerd/nri/pkg/api"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
recutil "k8s.io/client-go/tools/record/util"
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -122,15 +125,52 @@ func (p *PodRequest) FromReconciler(podMeta *statesinformer.PodMeta) {
}
}

// RecorderEvent describes one event queued on a PodContext, to be emitted
// later by PodContext.RecordEvent.
type RecorderEvent struct {
// HookName is presumably the name of the runtime hook that produced the
// event (e.g. "resctrl", "cpuset") — TODO confirm; RecordEvent does not read it.
HookName string
// MsgFmt is handed to the recorder's Eventf as the message format string.
MsgFmt string
// Reason is handed to the recorder as the event reason.
Reason string
// EventType is the event type; RecordEvent skips events whose type is
// rejected by recutil.ValidateEventType.
EventType string
}

// PodResponse is the response half of a PodContext; it carries a Resources
// value.
type PodResponse struct {
Resources Resources
}

// PodContext groups a pod-level Request and Response with the resource
// executor and updaters, plus the RecorderEvents that RecordEvent emits.
type PodContext struct {
Request PodRequest
Response PodResponse
executor resourceexecutor.ResourceUpdateExecutor
updaters []resourceexecutor.ResourceUpdater
Request PodRequest
Response PodResponse
executor resourceexecutor.ResourceUpdateExecutor
updaters []resourceexecutor.ResourceUpdater
RecorderEvents []RecorderEvent
}

// RecordEvent emits the events queued in p.RecorderEvents for pod through r.
//
// Events whose EventType is rejected by recutil.ValidateEventType are logged
// and skipped. The remaining events are merged per EventType in queue order:
// the Reason and MsgFmt of events sharing a type are joined with "-", so at
// most one event per type is emitted. Types are emitted in sorted order so the
// output is deterministic. Nothing is emitted when r is nil.
func (p *PodContext) RecordEvent(r record.EventRecorder, pod *corev1.Pod) {
	// Normal, Warning => one merged event per type.
	merged := make(map[string]RecorderEvent, 2)
	for _, event := range p.RecorderEvents {
		if !recutil.ValidateEventType(event.EventType) {
			klog.Warningf("EventType is not valid %v", event)
			continue
		}

		acc, seen := merged[event.EventType]
		if !seen {
			merged[event.EventType] = event
			continue
		}
		// Extend the entry accumulated so far; appending the current event to
		// a copy of itself would drop every earlier event of this type.
		acc.MsgFmt += "-" + event.MsgFmt
		acc.Reason += "-" + event.Reason
		merged[event.EventType] = acc
	}

	// A nil recorder cannot emit anything; calling through it would panic.
	if r == nil {
		return
	}

	eventTypes := make([]string, 0, len(merged))
	for eventType := range merged {
		eventTypes = append(eventTypes, eventType)
	}
	// Map iteration order is random; sort for a stable emission order.
	sort.Strings(eventTypes)

	for _, eventType := range eventTypes {
		event := merged[eventType]
		r.Eventf(pod, eventType, event.Reason, event.MsgFmt)
	}
}

func (p *PodResponse) ProxyDone(resp *runtimeapi.PodSandboxHookResponse) {
Expand Down
84 changes: 84 additions & 0 deletions pkg/koordlet/runtimehooks/protocol/pod_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"testing"

"github.com/containerd/nri/pkg/api"
"github.com/golang/mock/gomock"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/koordinator-sh/koordinator/apis/extension"
apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/testutil"
)

func TestPodContext_FromNri(t *testing.T) {
Expand Down Expand Up @@ -174,3 +177,84 @@ func TestPodContext_NriRemoveDone(t *testing.T) {
})
}
}

// TestPodContext_RecordEvent runs PodContext.RecordEvent against a fake
// recorder, once with an invalid event type and once with several valid
// events that share event types.
func TestPodContext_RecordEvent(t *testing.T) {
	type fields struct {
		Request        PodRequest
		Response       PodResponse
		executor       resourceexecutor.ResourceUpdateExecutor
		updaters       []resourceexecutor.ResourceUpdater
		RecorderEvents []RecorderEvent
	}

	// newEvent builds a valid-looking event; only the hook and type vary.
	newEvent := func(hookName, eventType string) RecorderEvent {
		return RecorderEvent{
			HookName:  hookName,
			EventType: eventType,
			MsgFmt:    "test",
			Reason:    "test",
		}
	}

	cases := []struct {
		name   string
		fields fields
	}{
		{
			name: "EventType is not valid",
			fields: fields{
				RecorderEvents: []RecorderEvent{{EventType: "test"}},
			},
		},
		{
			name: "EventType is valid",
			fields: fields{
				RecorderEvents: []RecorderEvent{
					newEvent("resctrl", corev1.EventTypeNormal),
					newEvent("cpuset", corev1.EventTypeNormal),
					newEvent("resctrl", corev1.EventTypeWarning),
				},
			},
		},
	}

	bePod := testutil.MockTestPod(apiext.QoSBE, "test_be_pod")
	// env
	mockCtrl := gomock.NewController(t)
	defer mockCtrl.Finish()

	// The recorder is shared by all sub-tests.
	recorder := &testutil.FakeRecorder{}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			podCtx := &PodContext{
				Request:        tc.fields.Request,
				Response:       tc.fields.Response,
				executor:       tc.fields.executor,
				updaters:       tc.fields.updaters,
				RecorderEvents: tc.fields.RecorderEvents,
			}
			podCtx.RecordEvent(recorder, bePod)
		})
	}
}
2 changes: 2 additions & 0 deletions pkg/koordlet/runtimehooks/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1/resource"

slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
Expand All @@ -33,6 +34,7 @@ type HooksProtocol interface {
ReconcilerDone(executor resourceexecutor.ResourceUpdateExecutor)
Update()
GetUpdaters() []resourceexecutor.ResourceUpdater
RecordEvent(r record.EventRecorder, pod *corev1.Pod)
}

type hooksProtocolBuilder struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/koordlet/runtimehooks/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
Expand Down Expand Up @@ -301,13 +302,15 @@ type Context struct {
StatesInformer statesinformer.StatesInformer
Executor resourceexecutor.ResourceUpdateExecutor
ReconcileInterval time.Duration
EventRecorder record.EventRecorder
}

func NewReconciler(ctx Context) Reconciler {
r := &reconciler{
podUpdated: make(chan struct{}, 1),
executor: ctx.Executor,
reconcileInterval: ctx.ReconcileInterval,
eventRecorder: ctx.EventRecorder,
}
// TODO register individual pod event
ctx.StatesInformer.RegisterCallbacks(statesinformer.RegisterTypeAllPods, "runtime-hooks-reconciler",
Expand All @@ -321,6 +324,7 @@ type reconciler struct {
podUpdated chan struct{}
executor resourceexecutor.ResourceUpdateExecutor
reconcileInterval time.Duration
eventRecorder record.EventRecorder
}

func (c *reconciler) Run(stopCh <-chan struct{}) error {
Expand Down Expand Up @@ -420,6 +424,7 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) {
klog.V(5).Infof("calling reconcile function %v for pod %v finished",
r.description, podMeta.Key())
}
podCtx.RecordEvent(c.eventRecorder, podMeta.Pod)
}

for resourceType, r := range globalCgroupReconcilers.sandboxContainerLevel {
Expand Down
11 changes: 10 additions & 1 deletion pkg/koordlet/runtimehooks/runtimehooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ package runtimehooks
import (
"fmt"

corev1 "k8s.io/api/core/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/pkg/features"
Expand Down Expand Up @@ -82,7 +87,10 @@ func (r *runtimeHook) Run(stopCh <-chan struct{}) error {
return nil
}

func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook, error) {
func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config, schema *apiruntime.Scheme, kubeClient clientset.Interface, nodeName string) (RuntimeHook, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(schema, corev1.EventSource{Component: "koordlet-runtimehook", Host: nodeName})
failurePolicy, err := config.GetFailurePolicyType(cfg.RuntimeHooksFailurePolicy)
if err != nil {
return nil, err
Expand Down Expand Up @@ -136,6 +144,7 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook,
StatesInformer: si,
Executor: e,
ReconcileInterval: cfg.RuntimeHookReconcileInterval,
EventRecorder: recorder,
}

newPluginOptions := hooks.Options{
Expand Down
7 changes: 6 additions & 1 deletion pkg/koordlet/runtimehooks/runtimehooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"

"github.com/koordinator-sh/koordinator/pkg/features"
mockstatesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/mockstatesinformer"
Expand Down Expand Up @@ -116,7 +118,10 @@ func Test_runtimeHook_Run(t *testing.T) {
defer ctrl.Finish()
si := mockstatesinformer.NewMockStatesInformer(ctrl)
si.EXPECT().RegisterCallbacks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
r, err := NewRuntimeHook(si, tt.fields.config)
scheme := apiruntime.NewScheme()
kubeClient := &kubernetes.Clientset{}
nodeName := "test-node"
r, err := NewRuntimeHook(si, tt.fields.config, scheme, kubeClient, nodeName)
assert.NoError(t, err)
stop := make(chan struct{})

Expand Down

0 comments on commit 0fa719d

Please sign in to comment.