Skip to content

Commit

Permalink
feat: Implement paging on list operations
Browse files Browse the repository at this point in the history
  • Loading branch information
TwiN committed Oct 5, 2022
1 parent 4c127cb commit 5b7b6da
Showing 1 changed file with 39 additions and 26 deletions.
65 changes: 39 additions & 26 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/xhit/go-str2duration/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
Expand All @@ -23,6 +24,8 @@ const (
ExecutionTimeout = 20 * time.Minute // Maximum time for each reconciliation before timing out
ExecutionInterval = 5 * time.Minute // Interval between each reconciliation
ThrottleDuration = 50 * time.Millisecond // Duration to sleep for throttling purposes

ListLimit = 500 // Maximum number of items to list at once
)

var (
Expand Down Expand Up @@ -109,39 +112,49 @@ func DoReconcile(dynamicClient dynamic.Interface, resources []*metav1.APIResourc
}
// List all items under the resource
gvr.Resource = apiResource.Name
list, err := dynamicClient.Resource(gvr).List(context.TODO(), metav1.ListOptions{TimeoutSeconds: &listTimeoutSeconds})
if err != nil {
log.Printf("Error checking %s from %s: %s", gvr.Resource, gvr.GroupVersion(), err)
continue
}
if debug {
log.Println("Checking", len(list.Items), gvr.Resource, "from", gvr.GroupVersion())
}
for _, item := range list.Items {
ttl, exists := item.GetAnnotations()[AnnotationTTL]
if !exists {
continue
}
ttlInDuration, err := str2duration.ParseDuration(ttl)
var list *unstructured.UnstructuredList
var continueToken string
var err error
for list == nil || continueToken != "" {
list, err = dynamicClient.Resource(gvr).List(context.TODO(), metav1.ListOptions{TimeoutSeconds: &listTimeoutSeconds, Continue: continueToken, Limit: ListLimit})
if err != nil {
log.Printf("[%s/%s] has an invalid TTL '%s': %s\n", apiResource.Name, item.GetName(), ttl, err)
log.Printf("Error checking %s from %s: %s", gvr.Resource, gvr.GroupVersion(), err)
continue
}
ttlExpired := time.Now().After(item.GetCreationTimestamp().Add(ttlInDuration))
if ttlExpired {
log.Printf("[%s/%s] is configured with a TTL of %s, which means it has expired %s ago", apiResource.Name, item.GetName(), ttl, time.Since(item.GetCreationTimestamp().Add(ttlInDuration)).Round(time.Second))
err := dynamicClient.Resource(gvr).Namespace(item.GetNamespace()).Delete(context.TODO(), item.GetName(), metav1.DeleteOptions{})
if list != nil {
continueToken = list.GetContinue()
}
if debug {
log.Println("Checking", len(list.Items), gvr.Resource, "from", gvr.GroupVersion())
}
for _, item := range list.Items {
ttl, exists := item.GetAnnotations()[AnnotationTTL]
if !exists {
continue
}
ttlInDuration, err := str2duration.ParseDuration(ttl)
if err != nil {
log.Printf("[%s/%s] failed to delete: %s\n", apiResource.Name, item.GetName(), err)
// XXX: Should we retry with GracePeriodSeconds set to &0 to force immediate deletion after the first attempt failed?
log.Printf("[%s/%s] has an invalid TTL '%s': %s\n", apiResource.Name, item.GetName(), ttl, err)
continue
}
ttlExpired := time.Now().After(item.GetCreationTimestamp().Add(ttlInDuration))
if ttlExpired {
log.Printf("[%s/%s] is configured with a TTL of %s, which means it has expired %s ago", apiResource.Name, item.GetName(), ttl, time.Since(item.GetCreationTimestamp().Add(ttlInDuration)).Round(time.Second))
err := dynamicClient.Resource(gvr).Namespace(item.GetNamespace()).Delete(context.TODO(), item.GetName(), metav1.DeleteOptions{})
if err != nil {
log.Printf("[%s/%s] failed to delete: %s\n", apiResource.Name, item.GetName(), err)
// XXX: Should we retry with GracePeriodSeconds set to &0 to force immediate deletion after the first attempt failed?
} else {
log.Printf("[%s/%s] deleted", apiResource.Name, item.GetName())
}
// Cool off a tiny bit to avoid hitting the API too often
time.Sleep(ThrottleDuration)
} else {
log.Printf("[%s/%s] deleted", apiResource.Name, item.GetName())
log.Printf("[%s/%s] is configured with a TTL of %s, which means it will expire in %s", apiResource.Name, item.GetName(), ttl, time.Until(item.GetCreationTimestamp().Add(ttlInDuration)).Round(time.Second))
}
// Cool off a tiny bit to avoid hitting the API too often
time.Sleep(ThrottleDuration)
} else {
log.Printf("[%s/%s] is configured with a TTL of %s, which means it will expire in %s", apiResource.Name, item.GetName(), ttl, time.Until(item.GetCreationTimestamp().Add(ttlInDuration)).Round(time.Second))
}
// Cool off a tiny bit to avoid hitting the API too often
time.Sleep(ThrottleDuration)
}
// Cool off a tiny bit to avoid hitting the API too often
time.Sleep(ThrottleDuration)
Expand Down

0 comments on commit 5b7b6da

Please sign in to comment.