From 5b7b6dae97ce963d7c4b02646a64092797384010 Mon Sep 17 00:00:00 2001 From: TwiN Date: Tue, 4 Oct 2022 20:13:54 -0400 Subject: [PATCH] feat: Implement paging on list operations --- main.go | 65 ++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/main.go b/main.go index 6c839f1..aca6de2 100644 --- a/main.go +++ b/main.go @@ -11,6 +11,7 @@ import ( "github.com/xhit/go-str2duration/v2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" @@ -23,6 +24,8 @@ const ( ExecutionTimeout = 20 * time.Minute // Maximum time for each reconciliation before timing out ExecutionInterval = 5 * time.Minute // Interval between each reconciliation ThrottleDuration = 50 * time.Millisecond // Duration to sleep for throttling purposes + + ListLimit = 500 // Maximum number of items to list at once ) var ( @@ -109,39 +112,49 @@ func DoReconcile(dynamicClient dynamic.Interface, resources []*metav1.APIResourc } // List all items under the resource gvr.Resource = apiResource.Name - list, err := dynamicClient.Resource(gvr).List(context.TODO(), metav1.ListOptions{TimeoutSeconds: &listTimeoutSeconds}) - if err != nil { - log.Printf("Error checking %s from %s: %s", gvr.Resource, gvr.GroupVersion(), err) - continue - } - if debug { - log.Println("Checking", len(list.Items), gvr.Resource, "from", gvr.GroupVersion()) - } - for _, item := range list.Items { - ttl, exists := item.GetAnnotations()[AnnotationTTL] - if !exists { - continue - } - ttlInDuration, err := str2duration.ParseDuration(ttl) + var list *unstructured.UnstructuredList + var continueToken string + var err error + for list == nil || continueToken != "" { + list, err = dynamicClient.Resource(gvr).List(context.TODO(), metav1.ListOptions{TimeoutSeconds: &listTimeoutSeconds, Continue: continueToken, Limit: ListLimit}) if err != nil { - log.Printf("[%s/%s] has an invalid TTL '%s': %s\n", apiResource.Name, item.GetName(), ttl, err) + log.Printf("Error checking %s from %s: %s", gvr.Resource, gvr.GroupVersion(), err) continue } - ttlExpired := time.Now().After(item.GetCreationTimestamp().Add(ttlInDuration)) - if ttlExpired { - log.Printf("[%s/%s] is configured with a TTL of %s, which means it has expired %s ago", apiResource.Name, item.GetName(), ttl, time.Since(item.GetCreationTimestamp().Add(ttlInDuration)).Round(time.Second)) - err := dynamicClient.Resource(gvr).Namespace(item.GetNamespace()).Delete(context.TODO(), item.GetName(), metav1.DeleteOptions{}) + if list != nil { + continueToken = list.GetContinue() + } + if debug { + log.Println("Checking", len(list.Items), gvr.Resource, "from", gvr.GroupVersion()) + } + for _, item := range list.Items { + ttl, exists := item.GetAnnotations()[AnnotationTTL] + if !exists { + continue + } + ttlInDuration, err := str2duration.ParseDuration(ttl) if err != nil { - log.Printf("[%s/%s] failed to delete: %s\n", apiResource.Name, item.GetName(), err) - // XXX: Should we retry with GracePeriodSeconds set to &0 to force immediate deletion after the first attempt failed? + log.Printf("[%s/%s] has an invalid TTL '%s': %s\n", apiResource.Name, item.GetName(), ttl, err) + continue + } + ttlExpired := time.Now().After(item.GetCreationTimestamp().Add(ttlInDuration)) + if ttlExpired { + log.Printf("[%s/%s] is configured with a TTL of %s, which means it has expired %s ago", apiResource.Name, item.GetName(), ttl, time.Since(item.GetCreationTimestamp().Add(ttlInDuration)).Round(time.Second)) + err := dynamicClient.Resource(gvr).Namespace(item.GetNamespace()).Delete(context.TODO(), item.GetName(), metav1.DeleteOptions{}) + if err != nil { + log.Printf("[%s/%s] failed to delete: %s\n", apiResource.Name, item.GetName(), err) + // XXX: Should we retry with GracePeriodSeconds set to &0 to force immediate deletion after the first attempt failed? + } else { + log.Printf("[%s/%s] deleted", apiResource.Name, item.GetName()) + } + // Cool off a tiny bit to avoid hitting the API too often + time.Sleep(ThrottleDuration) } else { - log.Printf("[%s/%s] deleted", apiResource.Name, item.GetName()) + log.Printf("[%s/%s] is configured with a TTL of %s, which means it will expire in %s", apiResource.Name, item.GetName(), ttl, time.Until(item.GetCreationTimestamp().Add(ttlInDuration)).Round(time.Second)) } - // Cool off a tiny bit to avoid hitting the API too often - time.Sleep(ThrottleDuration) - } else { - log.Printf("[%s/%s] is configured with a TTL of %s, which means it will expire in %s", apiResource.Name, item.GetName(), ttl, time.Until(item.GetCreationTimestamp().Add(ttlInDuration)).Round(time.Second)) } + // Cool off a tiny bit to avoid hitting the API too often + time.Sleep(ThrottleDuration) } // Cool off a tiny bit to avoid hitting the API too often time.Sleep(ThrottleDuration)