diff --git a/go.mod b/go.mod index 531d4c635..e64872b35 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 k8s.io/api v0.29.0 k8s.io/apimachinery v0.29.0 - k8s.io/client-go v1.5.2 + k8s.io/client-go v0.29.0 k8s.io/klog v1.0.0 k8s.io/klog/v2 v2.110.1 k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect diff --git a/pkg/k8sclient/k8sclient.go b/pkg/k8sclient/k8sclient.go index 83f306f80..10fa71a8c 100644 --- a/pkg/k8sclient/k8sclient.go +++ b/pkg/k8sclient/k8sclient.go @@ -59,7 +59,9 @@ type NoK8sNetworkError struct { // ClientInfo contains information given from k8s client type ClientInfo struct { Client kubernetes.Interface + WatchClient kubernetes.Interface NetClient netclient.Interface + NetWatchClient netclient.Interface EventBroadcaster record.EventBroadcaster EventRecorder record.EventRecorder diff --git a/pkg/k8sclient/kubeconfig.go b/pkg/k8sclient/kubeconfig.go index b9ca1b402..14fe80d91 100644 --- a/pkg/k8sclient/kubeconfig.go +++ b/pkg/k8sclient/kubeconfig.go @@ -228,8 +228,19 @@ func newClientInfo(config *rest.Config) (*ClientInfo, error) { if err != nil { return nil, err } + netClient, err := netclient.NewForConfig(config) + if err != nil { + return nil, err + } - netclient, err := netclient.NewForConfig(config) + watchConfig := rest.CopyConfig(config) + // Do not set timeout for watches and delegate timeout to client-go + watchConfig.Timeout = 0 + watchClient, err := kubernetes.NewForConfig(watchConfig) + if err != nil { + return nil, err + } + netWatchClient, err := netclient.NewForConfig(watchConfig) if err != nil { return nil, err } @@ -240,7 +251,9 @@ func newClientInfo(config *rest.Config) (*ClientInfo, error) { recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"}) return &ClientInfo{ Client: client, - NetClient: netclient, + WatchClient: watchClient, + NetClient: netClient, + NetWatchClient: netWatchClient, EventBroadcaster: broadcaster, EventRecorder: recorder, }, nil diff --git a/pkg/multus/multus_cni100_test.go b/pkg/multus/multus_cni100_test.go index 7c4f7013c..fa04755dc 100644 --- a/pkg/multus/multus_cni100_test.go +++ b/pkg/multus/multus_cni100_test.go @@ -28,6 +28,7 @@ import ( cni100 "github.com/containernetworking/cni/pkg/types/100" "github.com/containernetworking/plugins/pkg/ns" "github.com/containernetworking/plugins/pkg/testutils" + "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging" testhelpers "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/testing" diff --git a/pkg/multus/multus_suite_test.go b/pkg/multus/multus_suite_test.go index a9ad866f2..330c848f0 100644 --- a/pkg/multus/multus_suite_test.go +++ b/pkg/multus/multus_suite_test.go @@ -31,10 +31,11 @@ import ( cni100 "github.com/containernetworking/cni/pkg/types/100" cniversion "github.com/containernetworking/cni/pkg/version" netfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake" - "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" + "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -225,6 +226,8 @@ func (f *fakeExec) FindInPath(plugin string, paths []string) (string, error) { // NewFakeClientInfo returns fake client (just for testing) func NewFakeClientInfo() *k8sclient.ClientInfo { return &k8sclient.ClientInfo{ + // We use watch clients to avoid reconnections in fixed intervals in production, though we do not + // distinguish between non-watch clients and watch clients in tests for simplicity Client: fake.NewSimpleClientset(), NetClient: netfake.NewSimpleClientset(), EventRecorder: record.NewFakeRecorder(10), diff --git a/pkg/server/server.go b/pkg/server/server.go index d469af23a..bf62927c9 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -158,10 +158,10 @@ func informerObjectTrim(obj interface{}) (interface{}, error) { return obj, nil } -func newNetDefInformer(netdefClient netdefclient.Interface) (netdefinformer.SharedInformerFactory, cache.SharedIndexInformer) { +func newNetDefInformer(netWatchClient netdefclient.Interface) (netdefinformer.SharedInformerFactory, cache.SharedIndexInformer) { const resyncInterval time.Duration = 1 * time.Second - informerFactory := netdefinformer.NewSharedInformerFactoryWithOptions(netdefClient, resyncInterval) + informerFactory := netdefinformer.NewSharedInformerFactoryWithOptions(netWatchClient, resyncInterval) netdefInformer := informerFactory.InformerFor(&netdefv1.NetworkAttachmentDefinition{}, func(client netdefclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return netdefinformerv1.NewNetworkAttachmentDefinitionInformer( client, @@ -173,7 +173,7 @@ func newNetDefInformer(netdefClient netdefclient.Interface) (netdefinformer.Shar return informerFactory, netdefInformer } -func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internalinterfaces.SharedInformerFactory, cache.SharedIndexInformer) { +func newPodInformer(watchClient kubernetes.Interface, nodeName string) (internalinterfaces.SharedInformerFactory, cache.SharedIndexInformer) { var tweakFunc internalinterfaces.TweakListOptionsFunc if nodeName != "" { logging.Verbosef("Filtering pod watch for node %q", nodeName) @@ -185,7 +185,7 @@ func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internali const resyncInterval time.Duration = 1 * time.Second - informerFactory := informerfactory.NewSharedInformerFactoryWithOptions(kubeClient, resyncInterval, informerfactory.WithTransform(informerObjectTrim)) + informerFactory := informerfactory.NewSharedInformerFactoryWithOptions(watchClient, resyncInterval, informerfactory.WithTransform(informerObjectTrim)) podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return v1coreinformers.NewFilteredPodInformer( c, @@ -194,7 +194,6 @@ func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internali cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, tweakFunc) }) - return informerFactory, podInformer } @@ -255,8 +254,8 @@ func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreRe } func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool) (*Server, error) { - informerFactory, podInformer := newPodInformer(kubeClient.Client, os.Getenv("MULTUS_NODE_NAME")) - netdefInformerFactory, netdefInformer := newNetDefInformer(kubeClient.NetClient) + podInformerFactory, podInformer := newPodInformer(kubeClient.WatchClient, os.Getenv("MULTUS_NODE_NAME")) + netdefInformerFactory, netdefInformer := newNetDefInformer(kubeClient.NetWatchClient) kubeClient.SetK8sClientInformers(podInformer, netdefInformer) router := http.NewServeMux() @@ -277,7 +276,7 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s []string{"handler", "code", "method"}, ), }, - informerFactory: informerFactory, + podInformerFactory: podInformerFactory, podInformer: podInformer, netdefInformerFactory: netdefInformerFactory, netdefInformer: netdefInformer, @@ -356,7 +355,7 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s // Start starts the server and begins serving on the given listener func (s *Server) Start(ctx context.Context, l net.Listener) { - s.informerFactory.Start(ctx.Done()) + s.podInformerFactory.Start(ctx.Done()) s.netdefInformerFactory.Start(ctx.Done()) // Give the initial sync some time to complete in large clusters, but diff --git a/pkg/server/types.go b/pkg/server/types.go index 81d4d6819..225877d12 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -54,7 +54,7 @@ type Server struct { exec invoke.Exec serverConfig []byte metrics *Metrics - informerFactory internalinterfaces.SharedInformerFactory + podInformerFactory internalinterfaces.SharedInformerFactory podInformer cache.SharedIndexInformer netdefInformerFactory netdefinformer.SharedInformerFactory netdefInformer cache.SharedIndexInformer diff --git a/vendor/modules.txt b/vendor/modules.txt index 26cb7a525..6a2a152dc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -478,7 +478,7 @@ k8s.io/apimachinery/pkg/version k8s.io/apimachinery/pkg/watch k8s.io/apimachinery/third_party/forked/golang/json k8s.io/apimachinery/third_party/forked/golang/reflect -# k8s.io/client-go v1.5.2 => k8s.io/client-go v0.29.0 +# k8s.io/client-go v0.29.0 ## explicit; go 1.21 k8s.io/client-go/applyconfigurations/admissionregistration/v1 k8s.io/client-go/applyconfigurations/admissionregistration/v1alpha1