diff --git a/pkg/eni/local.go b/pkg/eni/local.go
index 04f3b492..da1f4eb4 100644
--- a/pkg/eni/local.go
+++ b/pkg/eni/local.go
@@ -11,6 +11,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/samber/lo"
 	"golang.org/x/time/rate"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/cache"
@@ -122,7 +123,7 @@ type Local struct {
 	batchSize int
 	cap       int
 
-	allocatingV4, allocatingV6 int
+	allocatingV4, allocatingV6 AllocatingRequests
 
 	eni                    *daemon.ENI
 	ipAllocInhibitExpireAt time.Time
@@ -375,30 +376,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
 		return nil, nil
 	}
 
-	lo, ok := request.(*LocalIPRequest)
+	localIPRequest, ok := request.(*LocalIPRequest)
 	if !ok {
 		return nil, []Trace{{Condition: ResourceTypeMismatch}}
 	}
 
-	if lo.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != lo.NetworkInterfaceID {
+	if localIPRequest.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != localIPRequest.NetworkInterfaceID {
 		return nil, []Trace{{Condition: NetworkInterfaceMismatch}}
 	}
 
 	log := logf.FromContext(ctx)
-	log.Info(fmt.Sprintf("local request %v", lo))
+	log.Info(fmt.Sprintf("local request %v", localIPRequest))
 
 	expectV4 := 0
 	expectV6 := 0
 
 	if l.enableIPv4 {
-		if lo.NoCache {
-			if len(l.ipv4)+l.allocatingV4 >= l.cap {
+		if localIPRequest.NoCache {
+			if len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
 				return nil, []Trace{{Condition: Full}}
 			}
 			expectV4 = 1
 		} else {
 			ipv4 := l.ipv4.PeekAvailable(cni.PodID)
-			if ipv4 == nil && len(l.ipv4)+l.allocatingV4 >= l.cap {
+			if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
 				return nil, []Trace{{Condition: Full}}
 			} else if ipv4 == nil {
 				expectV4 = 1
@@ -407,14 +408,14 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
 	}
 
 	if l.enableIPv6 {
-		if lo.NoCache {
-			if len(l.ipv6)+l.allocatingV6 >= l.cap {
+		if localIPRequest.NoCache {
+			if len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
 				return nil, []Trace{{Condition: Full}}
 			}
 			expectV6 = 1
 		} else {
 			ipv6 := l.ipv6.PeekAvailable(cni.PodID)
-			if ipv6 == nil && len(l.ipv6)+l.allocatingV6 >= l.cap {
+			if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
 				return nil, []Trace{{Condition: Full}}
 			} else if ipv6 == nil {
 				expectV6 = 1
@@ -427,19 +428,23 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
 		return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}}
 	}
 
-	l.allocatingV4 += expectV4
-	l.allocatingV6 += expectV6
+	deadline := time.Now().Add(2 * time.Minute)
+
+	for i := 0; i < expectV4; i++ {
+		l.allocatingV4 = append(l.allocatingV4, AllocatingRequest{deadline: deadline})
+	}
+	for i := 0; i < expectV6; i++ {
+		l.allocatingV6 = append(l.allocatingV6, AllocatingRequest{deadline: deadline})
+	}
 
 	l.cond.Broadcast()
 
 	respCh := make(chan *AllocResp)
 
-	go l.allocWorker(ctx, cni, lo, respCh, func() {
+	go l.allocWorker(ctx, cni, localIPRequest, respCh, func() {
 		// current roll back ip at same time
-		l.allocatingV4 -= expectV4
-		l.allocatingV4 = max(l.allocatingV4, 0)
-		l.allocatingV6 -= expectV6
-		l.allocatingV6 = max(l.allocatingV6, 0)
+		l.allocatingV4 = lo.Drop(l.allocatingV4, expectV4)
+		l.allocatingV6 = lo.Drop(l.allocatingV6, expectV6)
 
 		log.Info("rollback ipv4", "ipv4", expectV4)
 	})
@@ -605,7 +610,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
 		default:
 		}
 
-		if l.allocatingV4 <= 0 && l.allocatingV6 <= 0 {
+		if l.allocatingV4.Len() <= 0 && l.allocatingV6.Len() <= 0 {
 			l.cond.Wait()
 			continue
 		}
@@ -629,8 +634,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
 		if l.eni == nil {
 			// create eni
-			v4Count := min(l.batchSize, max(l.allocatingV4, 1))
-			v6Count := min(l.batchSize, l.allocatingV6)
+			v4Count := min(l.batchSize, max(l.allocatingV4.Len(), 1))
+			v6Count := min(l.batchSize, l.allocatingV6.Len())
 
 			l.status = statusCreating
 			l.cond.L.Unlock()
 
@@ -669,11 +674,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
 
 			l.eni = eni
 
-			l.allocatingV4 -= v4Count
-			l.allocatingV6 -= v6Count
-
-			l.allocatingV4 = max(l.allocatingV4, 0)
-			l.allocatingV6 = max(l.allocatingV6, 0)
+			l.allocatingV4 = lo.Drop(l.allocatingV4, v4Count)
+			l.allocatingV6 = lo.Drop(l.allocatingV6, v6Count)
 
 			primary, err := netip.ParseAddr(eni.PrimaryIP.IPv4.String())
 			if err == nil {
@@ -693,8 +695,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
 			l.status = statusInUse
 		} else {
 			eniID := l.eni.ID
-			v4Count := min(l.batchSize, l.allocatingV4)
-			v6Count := min(l.batchSize, l.allocatingV6)
+			v4Count := min(l.batchSize, l.allocatingV4.Len())
+			v6Count := min(l.batchSize, l.allocatingV6.Len())
 
 			if v4Count > 0 {
 				l.cond.L.Unlock()
@@ -718,8 +720,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
 					continue
 				}
 
-				l.allocatingV4 -= len(ipv4Set)
-				l.allocatingV4 = max(l.allocatingV4, 0)
+				l.allocatingV4 = lo.Drop(l.allocatingV4, len(ipv4Set))
 
 				l.ipv4.PutValid(ipv4Set...)
 
@@ -751,8 +752,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
 					continue
 				}
 
-				l.allocatingV6 -= len(ipv6Set)
-				l.allocatingV6 = max(l.allocatingV6, 0)
+				l.allocatingV6 = lo.Drop(l.allocatingV6, len(ipv6Set))
 
 				l.ipv6.PutValid(ipv6Set...)
 
diff --git a/pkg/eni/types.go b/pkg/eni/types.go
index c9142c63..c7f71846 100644
--- a/pkg/eni/types.go
+++ b/pkg/eni/types.go
@@ -4,6 +4,8 @@ import (
 	"net/netip"
 	"time"
 
+	"github.com/samber/lo"
+
 	"github.com/AliyunContainerService/terway/rpc"
 	"github.com/AliyunContainerService/terway/types/daemon"
 )
@@ -257,3 +259,20 @@ const (
 	NetworkInterfaceMismatch
 	InsufficientVSwitchIP
 )
+
+type AllocatingRequests []AllocatingRequest
+
+type AllocatingRequest struct {
+	deadline time.Time
+}
+
+// Len prunes expired requests in place and returns the number still pending.
+func (a *AllocatingRequests) Len() int {
+	// keep only requests whose deadline has not passed yet
+	filtered := lo.Filter(*a, func(item AllocatingRequest, index int) bool {
+		return time.Now().Before(item.deadline)
+	})
+
+	*a = filtered
+	return len(*a)
+}
diff --git a/pkg/eni/types_test.go b/pkg/eni/types_test.go
index 2bf72be6..b964dced 100644
--- a/pkg/eni/types_test.go
+++ b/pkg/eni/types_test.go
@@ -5,6 +5,7 @@ import (
 	"reflect"
 	"sort"
 	"testing"
+	"time"
 )
 
 func Test_syncIPLocked(t *testing.T) {
@@ -103,3 +104,49 @@ func TestSet_Allocatable(t *testing.T) {
 		})
 	}
 }
+
+func TestAllocatingRequests_Len(t *testing.T) {
+	tests := []struct {
+		name     string
+		requests AllocatingRequests
+		expected int
+	}{
+		{
+			name: "No expired requests",
+			requests: AllocatingRequests{
+				{deadline: time.Now().Add(1 * time.Hour)},
+				{deadline: time.Now().Add(2 * time.Hour)},
+			},
+			expected: 2,
+		},
+		{
+			name: "Some expired requests",
+			requests: AllocatingRequests{
+				{deadline: time.Now().Add(-1 * time.Hour)},
+				{deadline: time.Now().Add(1 * time.Hour)},
+			},
+			expected: 1,
+		},
+		{
+			name: "All expired requests",
+			requests: AllocatingRequests{
+				{deadline: time.Now().Add(-1 * time.Hour)},
+				{deadline: time.Now().Add(-2 * time.Hour)},
+			},
+			expected: 0,
+		},
+		{
+			name:     "Empty requests",
+			requests: AllocatingRequests{},
+			expected: 0,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := tt.requests.Len(); got != tt.expected {
+				t.Errorf("Len() = %v, want %v", got, tt.expected)
+			}
+		})
+	}
+}
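
For context, the patch replaces the integer allocatingV4/allocatingV6 counters with a slice of pending requests, each carrying a deadline: Len() prunes entries whose deadline has passed before reporting the count, and lo.Drop trims satisfied requests from the front. The standalone sketch below is not part of the patch; it simply mirrors the AllocatingRequest/AllocatingRequests types added in pkg/eni/types.go and the two-minute deadline used in Allocate, to show the intended bookkeeping. Note that samber/lo's Drop returns a new slice, so its result must be assigned back.

package main

import (
	"fmt"
	"time"

	"github.com/samber/lo"
)

// AllocatingRequest and AllocatingRequests mirror the types added in pkg/eni/types.go.
type AllocatingRequest struct {
	deadline time.Time
}

type AllocatingRequests []AllocatingRequest

// Len prunes expired requests in place and returns the number still pending.
func (a *AllocatingRequests) Len() int {
	*a = lo.Filter(*a, func(item AllocatingRequest, _ int) bool {
		return time.Now().Before(item.deadline)
	})
	return len(*a)
}

func main() {
	var allocating AllocatingRequests

	// Enqueue two pending requests the same way Allocate does.
	deadline := time.Now().Add(2 * time.Minute)
	allocating = append(allocating,
		AllocatingRequest{deadline: deadline},
		AllocatingRequest{deadline: deadline})

	// A request whose deadline has already passed no longer counts toward capacity.
	allocating = append(allocating, AllocatingRequest{deadline: time.Now().Add(-time.Minute)})
	fmt.Println(allocating.Len()) // 2

	// lo.Drop returns a new slice; assign the result back to consume a satisfied request.
	allocating = lo.Drop(allocating, 1)
	fmt.Println(allocating.Len()) // 1
}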