diff --git a/Makefile b/Makefile index 6f171655..cb3cff1c 100644 --- a/Makefile +++ b/Makefile @@ -415,9 +415,9 @@ protoc-gen-go-grpc: ## Download protoc-gen-go-grpc locally if necessary. .PHONY: e2e-test e2e-test: - # KUBECONFIG must be set to the cluster, and PP needs to be deployed already + # KUBECONFIG must be set to the cluster, and SNR needs to be deployed already # count arg makes the test ignoring cached test results - go test ./e2e -ginkgo.vv -test.v -timeout 60m -count=1 ${TEST_OPS} + go test ./e2e -ginkgo.vv -test.v -timeout 80m -count=1 ${TEST_OPS} YQ = $(shell pwd)/bin/yq .PHONY: yq diff --git a/e2e/self_node_remediation_test.go b/e2e/self_node_remediation_test.go index 116229ac..b673b106 100644 --- a/e2e/self_node_remediation_test.go +++ b/e2e/self_node_remediation_test.go @@ -5,7 +5,6 @@ import ( "fmt" "os" "strconv" - "strings" "sync" "time" @@ -19,7 +18,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" - "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/medik8s/self-node-remediation/api/v1alpha1" @@ -28,219 +26,150 @@ import ( ) const ( - disconnectCommand = "ip route add blackhole %s" - reconnectCommand = "ip route delete blackhole %s" - nodeExecTimeout = 120 * time.Second - reconnectInterval = 300 * time.Second - skipLogsEnvVarName = "SKIP_LOG_VERIFICATION" + disconnectCommand = "ip route add blackhole %s" + reconnectCommand = "ip route delete blackhole %s" + + // unblock API server after this time + reconnectInterval = 300 * time.Second + + // time after which the pod should be deleted (respect API check duration!) 
+ podDeletedTimeout = 5 * time.Minute + skipOOSREnvVarName = "SKIP_OOST_REMEDIATION_VERIFICATION" ) var _ = Describe("Self Node Remediation E2E", func() { - Describe("Workers Remediation", func() { - var node *v1.Node - workers := &v1.NodeList{} - var oldBootTime *time.Time - var oldUID types.UID - var apiIPs []string - - BeforeEach(func() { + var apiIPs []string + workerNodes := &v1.NodeList{} + controlPlaneNodes := &v1.NodeList{} - // get all things that doesn't change once only - if node == nil { - // get worker node(s) - selector := labels.NewSelector() - req, _ := labels.NewRequirement(commonlabels.WorkerRole, selection.Exists, []string{}) - selector = selector.Add(*req) - Expect(k8sClient.List(context.Background(), workers, &client.ListOptions{LabelSelector: selector})).ToNot(HaveOccurred()) - Expect(len(workers.Items)).To(BeNumerically(">=", 2)) - - node = &workers.Items[0] - oldUID = node.GetUID() - - apiIPs = getApiIPs() - } else { - // just update the node for getting the current UID - Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(node), node)).ToNot(HaveOccurred()) - oldUID = node.GetUID() - } + var nodeUnderTest *v1.Node + var oldBootTime *time.Time - var err error - oldBootTime, err = getBootTime(node) - Expect(err).ToNot(HaveOccurred()) + BeforeEach(func() { + if len(apiIPs) == 0 { + // init some common stuff - ensureSnrRunning(workers) - }) + // API IPs + apiIPs = getApiIPs() - AfterEach(func() { - // restart snr pods for resetting logs... - //restartSnrPods(workers) - }) + // Worker nodes + selector := labels.NewSelector() + req, _ := labels.NewRequirement(commonlabels.WorkerRole, selection.Exists, []string{}) + selector = selector.Add(*req) + Expect(k8sClient.List(context.Background(), workerNodes, &client.ListOptions{LabelSelector: selector})).ToNot(HaveOccurred()) + Expect(len(workerNodes.Items)).To(BeNumerically(">=", 2)) - JustAfterEach(func() { - // TODO why worker 1?? 
- //printSNRLogsFromNode(&workers.Items[1]) - }) + // Control plane nodes + selector = labels.NewSelector() + req, _ = labels.NewRequirement(commonlabels.ControlPlaneRole, selection.Exists, []string{}) + selector = selector.Add(*req) + Expect(k8sClient.List(context.Background(), controlPlaneNodes, &client.ListOptions{LabelSelector: selector})).To(Succeed()) + Expect(len(controlPlaneNodes.Items)).To(BeNumerically(">=", 2)) - Describe("With API connectivity", func() { - Context("creating a SNR", func() { - // normal remediation - // - create SNR - // - node should reboot - // - node should be deleted and re-created + } + }) - var snr *v1alpha1.SelfNodeRemediation - var remediationStrategy v1alpha1.RemediationStrategyType - JustBeforeEach(func() { - snr = createSNR(node, remediationStrategy) - }) + JustBeforeEach(func() { + var err error + oldBootTime, err = utils.GetBootTime(context.Background(), k8sClientSet, nodeUnderTest, testNamespace) + Expect(err).ToNot(HaveOccurred()) + }) - AfterEach(func() { - if snr != nil { - deleteAndWait(snr) - } - }) + verifyRemediationSucceeds := func(snr *v1alpha1.SelfNodeRemediation) { + checkPodDeleted(nodeUnderTest) + utils.CheckReboot(context.Background(), k8sClientSet, nodeUnderTest, oldBootTime, testNamespace) + // Simulate NHC deleting SNR + deleteAndWait(snr) + checkNoExecuteTaintRemoved(nodeUnderTest) + } - Context("Resource Deletion Strategy", func() { - var oldPodCreationTime time.Time + Describe("Workers Remediation", func() { - BeforeEach(func() { - remediationStrategy = v1alpha1.ResourceDeletionRemediationStrategy - oldPodCreationTime = findSnrPod(node).CreationTimestamp.Time - }) + BeforeEach(func() { + nodeUnderTest = &workerNodes.Items[0] + ensureSnrRunning(workerNodes) + }) - It("should delete pods and volume attachments", func() { - checkPodRecreated(node, oldPodCreationTime) - //Simulate NHC trying to delete SNR - deleteAndWait(snr) - snr = nil + Describe("With API connectivity", func() { + // normal remediation + 
// - create SNR + // - nodeUnderTest should reboot - checkNoExecuteTaintRemoved(node) - }) - }) + var snr *v1alpha1.SelfNodeRemediation + var remediationStrategy v1alpha1.RemediationStrategyType - Context("OutOfService Remediation Strategy", func() { - var oldPodCreationTime time.Time + JustBeforeEach(func() { + snr = createSNR(nodeUnderTest, remediationStrategy) + }) - BeforeEach(func() { - if _, isExist := os.LookupEnv(skipOOSREnvVarName); isExist { - Skip("Skip this test due to out-of-service taint not supported") - } - remediationStrategy = v1alpha1.OutOfServiceTaintRemediationStrategy - oldPodCreationTime = findSnrPod(node).CreationTimestamp.Time - }) + Context("Resource Deletion Strategy", func() { + BeforeEach(func() { + remediationStrategy = v1alpha1.ResourceDeletionRemediationStrategy + }) - It("should delete pods", func() { - checkPodRecreated(node, oldPodCreationTime) - //Simulate NHC trying to delete SNR - deleteAndWait(snr) - snr = nil + It("should delete pods and volume attachments", func() { + verifyRemediationSucceeds(snr) + }) + }) - checkNoExecuteTaintRemoved(node) - checkOutOfServiceTaintRemoved(node) - }) + Context("OutOfService Remediation Strategy", func() { + BeforeEach(func() { + if _, isExist := os.LookupEnv(skipOOSREnvVarName); isExist { + Skip("Skip this test due to out-of-service taint not supported") + } + remediationStrategy = v1alpha1.OutOfServiceTaintRemediationStrategy }) + It("should delete pods", func() { + verifyRemediationSucceeds(snr) + checkOutOfServiceTaintRemoved(nodeUnderTest) + }) }) }) Describe("Without API connectivity", func() { + var testStartTime *metav1.Time + BeforeEach(func() { + testStartTime = &metav1.Time{Time: time.Now()} + }) + Context("Healthy node (no SNR)", func() { // no api connectivity // a) healthy - // - kill connectivity on one node + // - kill connectivity on nodeUnderTest // - wait until connection restored - // - verify node did not reboot and wasn't deleted + // - verify nodeUnderTest did not reboot
// - verify peer check did happen - var testStartTime *metav1.Time BeforeEach(func() { - testStartTime = &metav1.Time{time.Now()} - killApiConnection(node, apiIPs, true) - }) - - AfterEach(func() { - // nothing to do + killApiConnection(nodeUnderTest, apiIPs, true) }) - It("should not reboot and not re-create node", func() { - // order matters - // - because the 2nd check has a small timeout only - checkNoNodeRecreate(node, oldUID) - checkNoReboot(node, oldBootTime) - - //if _, isExist := os.LookupEnv(skipLogsEnvVarName); !isExist { - // check logs to make sure that the actual peer health check did run - checkSnrLogs(node, []string{"failed to check api server", "Peer told me I'm healthy."}, testStartTime) - //} + It("should not reboot", func() { + utils.CheckNoReboot(context.Background(), k8sClientSet, nodeUnderTest, oldBootTime, testNamespace) + checkSnrLogs(nodeUnderTest, []string{"failed to check api server", "Peer told me I'm healthy."}, testStartTime) }) }) - Context("Unhealthy node (with SNR)", func() { - - // no api connectivity - // b) unhealthy - // - kill connectivity on one node - // - create SNR - // - verify node does reboot and is deleted / re-created - - var snr *v1alpha1.SelfNodeRemediation - var oldPodCreationTime time.Time - var testStartTime *metav1.Time - - BeforeEach(func() { - testStartTime = &metav1.Time{time.Now()} - killApiConnection(node, apiIPs, false) - snr = createSNR(node, v1alpha1.ResourceDeletionRemediationStrategy) - oldPodCreationTime = findSnrPod(node).CreationTimestamp.Time - }) - - AfterEach(func() { - if snr != nil { - deleteAndWait(snr) - } - }) - - It("should reboot and delete node resources", func() { - // order matters - // - because node check works while api is disconnected from node, reboot check not - // - because the 2nd check has a small timeout only - checkReboot(node, oldBootTime) - checkPodRecreated(node, oldPodCreationTime) - //if _, isExist := os.LookupEnv(skipLogsEnvVarName); !isExist { - // we can't check logs 
of unhealthy node anymore, check peer logs - peer := &workers.Items[1] - checkSnrLogs(peer, []string{node.GetName(), "IsHealthy OWNED by NHC unhealthy"}, testStartTime) - //} - }) - - }) - Context("All nodes (no API connection for all)", func() { // no api connectivity // c) api issue // - kill connectivity on all nodes - // - verify node does not reboot and isn't deleted + // - verify that none of the worker nodes reboots - uids := make(map[string]types.UID) bootTimes := make(map[string]*time.Time) - var testStartTime *metav1.Time BeforeEach(func() { wg := sync.WaitGroup{} - testStartTime = &metav1.Time{time.Now()} - for i := range workers.Items { + for i := range workerNodes.Items { wg.Add(1) - worker := &workers.Items[i] - - // save old UID first - Expect(k8sClient.Get(context.Background(), client.ObjectKeyFromObject(worker), worker)).ToNot(HaveOccurred()) - uids[worker.GetName()] = worker.GetUID() + worker := &workerNodes.Items[i] // and the last boot time - t, err := getBootTime(worker) + t, err := utils.GetBootTime(context.Background(), k8sClientSet, worker, testNamespace) Expect(err).ToNot(HaveOccurred()) bootTimes[worker.GetName()] = t @@ -255,109 +184,78 @@ var _ = Describe("Self Node Remediation E2E", func() { time.Sleep(10 * time.Second) }) - AfterEach(func() { - // nothing to do - }) - It("should not have rebooted and not be re-created", func() { // all nodes should satisfy this test wg := sync.WaitGroup{} - for i := range workers.Items { + for i := range workerNodes.Items { wg.Add(1) - worker := &workers.Items[i] + worker := &workerNodes.Items[i] go func() { defer GinkgoRecover() defer wg.Done() - - // order matters - // - because the 2nd check has a small timeout only - checkNoNodeRecreate(worker, uids[worker.GetName()]) - checkNoReboot(worker, bootTimes[worker.GetName()]) - - //if _, isExist := os.LookupEnv(skipLogsEnvVarName); !isExist { - // check logs to make sure that the actual peer health check did run +
utils.CheckNoReboot(context.Background(), k8sClientSet, worker, bootTimes[worker.GetName()], testNamespace) checkSnrLogs(worker, []string{"failed to check api server", "nodes couldn't access the api-server"}, testStartTime) - //} }() } wg.Wait() }) }) - }) - }) - Describe("Control Plane Remediation", func() { - controlPlaneNodes := &v1.NodeList{} - var controlPlaneNode *v1.Node + Context("Unhealthy node (with SNR)", func() { - BeforeEach(func() { + // no api connectivity + // b) unhealthy + // - kill connectivity on one nodeUnderTest + // - create SNR + // - verify nodeUnderTest does reboot - // get all things that doesn't change once only - if controlPlaneNode == nil { - // get worker node(s) - selector := labels.NewSelector() - req, _ := labels.NewRequirement(commonlabels.MasterRole, selection.Exists, []string{}) - selector = selector.Add(*req) - if err := k8sClient.List(context.Background(), controlPlaneNodes, &client.ListOptions{LabelSelector: selector}); err != nil && errors.IsNotFound(err) { - selector = labels.NewSelector() - req, _ = labels.NewRequirement(commonlabels.ControlPlaneRole, selection.Exists, []string{}) - selector = selector.Add(*req) - Expect(k8sClient.List(context.Background(), controlPlaneNodes, &client.ListOptions{LabelSelector: selector})).ToNot(HaveOccurred()) - } - Expect(len(controlPlaneNodes.Items)).To(BeNumerically(">=", 2)) - - controlPlaneNode = &controlPlaneNodes.Items[0] + var snr *v1alpha1.SelfNodeRemediation - } + BeforeEach(func() { + killApiConnection(nodeUnderTest, apiIPs, false) + snr = createSNR(nodeUnderTest, v1alpha1.ResourceDeletionRemediationStrategy) + }) - ensureSnrRunning(controlPlaneNodes) - }) + It("should reboot and delete node resources", func() { + verifyRemediationSucceeds(snr) + // we can't check logs of unhealthy nodeUnderTest anymore, check peer logs + peer := &workerNodes.Items[1] + checkSnrLogs(peer, []string{nodeUnderTest.GetName(), "IsHealthy OWNED by NHC unhealthy"}, testStartTime) + //} + }) + + }) - 
AfterEach(func() { - // restart snr pods for resetting logs... - //restartSnrPods(controlPlaneNodes) }) + }) - JustAfterEach(func() { - //printSNRLogsFromNode(&controlPlaneNodes.Items[1]) + Describe("Control Plane Remediation", func() { + BeforeEach(func() { + nodeUnderTest = &controlPlaneNodes.Items[0] + ensureSnrRunning(controlPlaneNodes) }) Describe("With API connectivity", func() { Context("creating a SNR", func() { // normal remediation // - create SNR - // - node should reboot - // - node should be deleted and re-created + // - nodeUnderTest should reboot var snr *v1alpha1.SelfNodeRemediation var remediationStrategy v1alpha1.RemediationStrategyType JustBeforeEach(func() { - snr = createSNR(controlPlaneNode, remediationStrategy) - }) - - AfterEach(func() { - if snr != nil { - deleteAndWait(snr) - } + snr = createSNR(nodeUnderTest, remediationStrategy) }) Context("Resource Deletion Strategy", func() { - var oldPodCreationTime time.Time - BeforeEach(func() { remediationStrategy = v1alpha1.ResourceDeletionRemediationStrategy - oldPodCreationTime = findSnrPod(controlPlaneNode).CreationTimestamp.Time }) It("should delete pods and volume attachments", func() { - checkPodRecreated(controlPlaneNode, oldPodCreationTime) - //Simulate NHC trying to delete SNR - deleteAndWait(snr) - snr = nil - - checkNoExecuteTaintRemoved(controlPlaneNode) + verifyRemediationSucceeds(snr) }) }) @@ -367,12 +265,11 @@ var _ = Describe("Self Node Remediation E2E", func() { }) }) -func checkPodRecreated(node *v1.Node, oldPodCreationTime time.Time) bool { - return EventuallyWithOffset(1, func() time.Time { +func checkPodDeleted(node *v1.Node) bool { + return EventuallyWithOffset(1, func() *metav1.Time { pod := findSnrPod(node) - return pod.CreationTimestamp.Time - - }, 7*time.Minute, 10*time.Second).Should(BeTemporally(">", oldPodCreationTime)) + return pod.DeletionTimestamp + }, podDeletedTimeout, 30*time.Second).ShouldNot(BeNil()) } func createSNR(node *v1.Node, remediationStrategy 
v1alpha1.RemediationStrategyType) *v1alpha1.SelfNodeRemediation { @@ -390,21 +287,6 @@ func createSNR(node *v1.Node, remediationStrategy v1alpha1.RemediationStrategyTy return snr } -func getBootTime(node *v1.Node) (*time.Time, error) { - var bootTime *time.Time - EventuallyWithOffset(1, func() error { - ctx, cancel := context.WithTimeout(context.Background(), nodeExecTimeout) - defer cancel() - var err error - bootTime, err = utils.GetBootTime(ctx, k8sClientSet, node.GetName(), testNamespace) - if err != nil { - return err - } - return nil - }, 15*time.Minute, 10*time.Second).ShouldNot(HaveOccurred(), "Could not get boot time on target node") - return bootTime, nil -} - func checkNoExecuteTaintRemoved(node *v1.Node) { By("checking if NoExecute taint was removed") checkTaintRemoved(node, controllers.NodeNoExecuteTaint) @@ -435,21 +317,6 @@ func checkTaintRemoved(node *v1.Node, taintToCheck *v1.Taint) { }, 1*time.Minute, 10*time.Second).Should(Succeed()) } -func checkReboot(node *v1.Node, oldBootTime *time.Time) { - By("checking reboot") - logger.Info("boot time", "old", oldBootTime) - // Note: short timeout only because this check runs after node re-create check, - // where already multiple minute were spent - EventuallyWithOffset(1, func() time.Time { - newBootTime, err := getBootTime(node) - if err != nil { - return time.Time{} - } - logger.Info("boot time", "new", newBootTime) - return *newBootTime - }, 7*time.Minute, 10*time.Second).Should(BeTemporally(">", *oldBootTime)) -} - func killApiConnection(node *v1.Node, apiIPs []string, withReconnect bool) { msg := fmt.Sprintf("killing api connectivity on NODE: %s and API ep: %v", node.Name, apiIPs) By(msg) @@ -460,25 +327,8 @@ func killApiConnection(node *v1.Node, apiIPs []string, withReconnect bool) { script += composeScript(reconnectCommand, apiIPs) } - var ctx context.Context - var cancel context.CancelFunc - if withReconnect { - ctx, cancel = context.WithTimeout(context.Background(), 
reconnectInterval+nodeExecTimeout) - } else { - ctx, cancel = context.WithTimeout(context.Background(), nodeExecTimeout) - } - defer cancel() - pod := findSnrPod(node) - _, err := utils.RunCommandInPod(ctx, k8sClientSet, pod, script) - - if withReconnect { - //in case the sleep didn't work - deadline, _ := ctx.Deadline() - EventuallyWithOffset(1, func() bool { - return time.Now().After(deadline) - }, reconnectInterval+nodeExecTimeout+time.Second, 1*time.Second).Should(BeTrue()) - } + _, err := utils.RunCommandInPod(context.Background(), k8sClientSet, pod, script) // deadline exceeded is ok... the command does not return because of the killed connection Expect(err).To( @@ -502,30 +352,6 @@ func composeScript(commandTemplate string, ips []string) string { return script } -func checkNoNodeRecreate(node *v1.Node, oldUID types.UID) { - By("checking if node was recreated") - logger.Info("UID", "old", oldUID) - ExpectWithOffset(1, k8sClient.Get(context.Background(), client.ObjectKeyFromObject(node), node)).ToNot(HaveOccurred()) - Expect(node.UID).To(Equal(oldUID)) -} - -func checkNoReboot(node *v1.Node, oldBootTime *time.Time) { - By("checking no reboot") - logger.Info("boot time", "old", oldBootTime) - // Note: short timeout because this check runs after api connection was restored, - // and multiple minutes were spent already on this test - // we still need Eventually because getting the boot time might still fail after fiddling with api connectivity - EventuallyWithOffset(1, func() time.Time { - newBootTime, err := getBootTime(node) - if err != nil { - logger.Error(err, "failed to get boot time, might retry") - return time.Time{} - } - logger.Info("boot time", "new", newBootTime) - return *newBootTime - }, 5*time.Minute, 10*time.Second).Should(BeTemporally("==", *oldBootTime)) -} - func checkSnrLogs(node *v1.Node, expected []string, since *metav1.Time) { By("checking logs") pod := findSnrPod(node) @@ -552,66 +378,30 @@ func findSnrPod(node *v1.Node) *v1.Pod { var 
snrPod *v1.Pod EventuallyWithOffset(2, func() bool { pods := &v1.PodList{} - err := k8sClient.List(context.Background(), pods) + listOptions := &client.ListOptions{ + LabelSelector: labels.SelectorFromSet(labels.Set{ + "app.kubernetes.io/name": "self-node-remediation", + "app.kubernetes.io/component": "agent", + }), + } + err := k8sClient.List(context.Background(), pods, listOptions) if err != nil && !errors.IsNotFound(err) { logger.Error(err, "failed to list pods") return false } for i := range pods.Items { pod := pods.Items[i] - if strings.HasPrefix(pod.GetName(), "self-node-remediation-ds") && pod.Spec.NodeName == node.GetName() { + if pod.Spec.NodeName == node.GetName() { snrPod = &pod return true } } return false - }, 9*time.Minute, 10*time.Second).Should(BeTrue(), "didn't find SNR pod") + }, 10*time.Minute, 30*time.Second).Should(BeTrue(), "didn't find SNR pod") return snrPod } -func restartSnrPods(nodes *v1.NodeList) { - wg := sync.WaitGroup{} - for i := range nodes.Items { - wg.Add(1) - node := &nodes.Items[i] - go func() { - defer GinkgoRecover() - defer wg.Done() - restartSnrPod(node) - }() - } - wg.Wait() -} - -func restartSnrPod(node *v1.Node) { - By("restarting snr pod for resetting logs") - pod := findSnrPod(node) - ExpectWithOffset(1, pod).ToNot(BeNil()) - - //no need to restart the pod - for _, cond := range pod.Status.Conditions { - if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue { - return - } - } - - oldPodUID := pod.GetUID() - - deleteAndWait(pod) - - // wait for restart - var newPod *v1.Pod - EventuallyWithOffset(1, func() types.UID { - newPod = findSnrPod(node) - if newPod == nil { - return oldPodUID - } - return newPod.GetUID() - }, 2*time.Minute, 10*time.Second).ShouldNot(Equal(oldPodUID)) - - utils.WaitForPodReady(k8sClient, newPod) -} - +// getApiIPs gets the IP address(es) of the default/kubernetes service, which is used for API server connections func getApiIPs() []string { key := client.ObjectKey{ Namespace: "default", @@ 
-659,15 +449,3 @@ func ensureSnrRunning(nodes *v1.NodeList) { } wg.Wait() } - -//func printSNRLogsFromNode(node *v1.Node) { -// By("printing self node remediation log of healthy node") -// pod := findSnrPod(node) -// logs, err := utils.GetLogs(k8sClientSet, pod) -// Expect(err).ToNot(HaveOccurred()) -// logger.Info("BEGIN logs of healthy self-node-remediation pod", "name", pod.GetName()) -// for _, line := range strings.Split(logs, "\n") { -// logger.Info(line) -// } -// logger.Info("END logs of healthy self-node-remediation pod", "name", pod.GetName()) -//} diff --git a/e2e/utils/command.go b/e2e/utils/command.go index 869c142f..9138d583 100644 --- a/e2e/utils/command.go +++ b/e2e/utils/command.go @@ -5,10 +5,12 @@ import ( "context" "fmt" "os" - ctrl "sigs.k8s.io/controller-runtime" "strings" "time" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -16,15 +18,64 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/remotecommand" "k8s.io/utils/pointer" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client/config" ) +const ( + // time needed to execute a command on the node, potentially including pod creation time + nodeExecTimeout = 300 * time.Second + + // timeout for waiting for the pod to become ready + podReadyTimeout = 120 * time.Second + + // additional time (after podDeletedTimeout) within which the node should have rebooted + nodeRebootedTimeout = 10 * time.Minute +) + var ( log = ctrl.Log.WithName("testutils") ) +func CheckReboot(ctx context.Context, c *kubernetes.Clientset, node *corev1.Node, oldBootTime *time.Time, testNamespace string) { + By("checking reboot") + log.Info("boot time", "old", oldBootTime) + EventuallyWithOffset(1, func() time.Time { + newBootTime, err := getBootTime(ctx, c, node.GetName(), testNamespace) + if err != nil { + return time.Time{} + } + log.Info("boot time", "new",
newBootTime) + return *newBootTime + }, nodeRebootedTimeout, 1*time.Minute).Should(BeTemporally(">", *oldBootTime)) +} + +func CheckNoReboot(ctx context.Context, c *kubernetes.Clientset, node *corev1.Node, oldBootTime *time.Time, testNamespace string) { + By("checking no reboot") + log.Info("boot time", "old", oldBootTime) + ConsistentlyWithOffset(1, func() time.Time { + newBootTime, err := getBootTime(ctx, c, node.GetName(), testNamespace) + if err != nil { + log.Error(err, "failed to get boot time, might retry") + return time.Time{} + } + log.Info("boot time", "new", newBootTime) + return *newBootTime + }, nodeRebootedTimeout, 1*time.Minute).Should(BeTemporally("==", *oldBootTime)) +} + // GetBootTime gets the boot time of the given node by running a pod on it executing uptime command -func GetBootTime(ctx context.Context, c *kubernetes.Clientset, nodeName string, ns string) (*time.Time, error) { +func GetBootTime(ctx context.Context, c *kubernetes.Clientset, node *corev1.Node, testNamespace string) (*time.Time, error) { + var bootTime *time.Time + EventuallyWithOffset(1, func() error { + var err error + bootTime, err = getBootTime(ctx, c, node.GetName(), testNamespace) + return err + }, nodeExecTimeout, 30*time.Second).ShouldNot(HaveOccurred(), "Could not get boot time on target node") + return bootTime, nil +} + +func getBootTime(ctx context.Context, c *kubernetes.Clientset, nodeName string, ns string) (*time.Time, error) { output, err := RunCommandInCluster(ctx, c, nodeName, ns, "dnf install procps -y >/dev/null 2>&1 && uptime -s") if err != nil { return nil, err @@ -48,7 +99,7 @@ func RunCommandInCluster(ctx context.Context, c *kubernetes.Clientset, nodeName return "", err } - err = waitForCondition(ctx, c, pod, corev1.PodReady, corev1.ConditionTrue, 3*time.Minute) + err = waitForCondition(ctx, c, pod, corev1.PodReady, corev1.ConditionTrue, podReadyTimeout) if err != nil { return "", err } @@ -69,17 +120,15 @@ func RunCommandInPod(ctx context.Context, c 
*kubernetes.Clientset, pod *corev1.P func waitForPodOutput(ctx context.Context, c *kubernetes.Clientset, pod *corev1.Pod, command []string) ([]byte, error) { var out []byte - if err := wait.PollImmediate(15*time.Second, time.Minute, func() (done bool, err error) { + if err := wait.PollImmediateWithContext(ctx, 15*time.Second, time.Minute, func(ctx context.Context) (done bool, err error) { out, err = execCommandOnPod(ctx, c, pod, command) if err != nil { return false, err } - return len(out) != 0, nil }); err != nil { return nil, err } - return out, nil } @@ -113,7 +162,7 @@ func execCommandOnPod(ctx context.Context, c *kubernetes.Clientset, pod *corev1. return nil, err } - err = exec.Stream(remotecommand.StreamOptions{ + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ Stdin: os.Stdin, Stdout: &outputBuf, Stderr: &errorBuf, @@ -132,7 +181,7 @@ func execCommandOnPod(ctx context.Context, c *kubernetes.Clientset, pod *corev1. // waitForCondition waits until the pod will have specified condition type with the expected status func waitForCondition(ctx context.Context, c *kubernetes.Clientset, pod *corev1.Pod, conditionType corev1.PodConditionType, conditionStatus corev1.ConditionStatus, timeout time.Duration) error { - return wait.PollImmediate(time.Second, timeout, func() (bool, error) { + return wait.PollImmediateWithContext(ctx, time.Second, timeout, func(ctx context.Context) (bool, error) { updatedPod := &corev1.Pod{} var err error if updatedPod, err = c.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err != nil { @@ -171,9 +220,16 @@ func getPod(nodeName string) *corev1.Pod { SecurityContext: &corev1.SecurityContext{ Privileged: pointer.Bool(true), }, - Command: []string{"sleep", "2m"}, + Command: []string{"sleep", "10m"}, + }, + }, + Tolerations: []corev1.Toleration{ + { + Effect: corev1.TaintEffectNoExecute, + Operator: corev1.TolerationOpExists, }, }, + TerminationGracePeriodSeconds: pointer.Int64(600), }, } } diff --git 
a/e2e/utils/pod.go b/e2e/utils/pod.go index 4784c4d0..b7854e15 100644 --- a/e2e/utils/pod.go +++ b/e2e/utils/pod.go @@ -41,5 +41,5 @@ func WaitForPodReady(c client.Client, pod *corev1.Pod) { } } return corev1.ConditionUnknown - }, 20*time.Minute, 10*time.Second).Should(Equal(corev1.ConditionTrue), "pod did not get ready in time") + }, 10*time.Minute, 10*time.Second).Should(Equal(corev1.ConditionTrue), "pod did not get ready in time") } diff --git a/pkg/peerhealth/peerhealth.pb.go b/pkg/peerhealth/peerhealth.pb.go index e9701175..6d86fdbe 100644 --- a/pkg/peerhealth/peerhealth.pb.go +++ b/pkg/peerhealth/peerhealth.pb.go @@ -7,11 +7,10 @@ package peerhealth import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" ) const ( diff --git a/pkg/peerhealth/peerhealth_grpc.pb.go b/pkg/peerhealth/peerhealth_grpc.pb.go index 4c1c8a58..cf883900 100644 --- a/pkg/peerhealth/peerhealth_grpc.pb.go +++ b/pkg/peerhealth/peerhealth_grpc.pb.go @@ -4,7 +4,6 @@ package peerhealth import ( context "context" - grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status"