From 542c0ffcb1609bfafb776d74cdc828bbb6a4035b Mon Sep 17 00:00:00 2001
From: l1b0k
Date: Thu, 12 Dec 2024 11:59:39 +0800
Subject: [PATCH 1/2] validate the ENI status is InUse

Recent ENI attach behavior changed: a failed attach now rolls back to
the Available state, so this check is necessary.

Signed-off-by: l1b0k
---
 pkg/aliyun/client/ecs.go     |  2 +-
 pkg/factory/aliyun/aliyun.go | 31 +++++++++++++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/pkg/aliyun/client/ecs.go b/pkg/aliyun/client/ecs.go
index 619366b2..74f21a8d 100644
--- a/pkg/aliyun/client/ecs.go
+++ b/pkg/aliyun/client/ecs.go
@@ -136,7 +136,7 @@ func (a *OpenAPI) DescribeNetworkInterface(ctx context.Context, vpcID string, en
 		result = append(result, FromDescribeResp(&r))
 	}
 
-	l.V(4).Info("describe enis")
+	l.WithValues(LogFieldRequestID, resp.RequestId).Info("describe enis")
 
 	if len(resp.NetworkInterfaceSets.NetworkInterfaceSet) < maxSinglePageSize {
 		break
diff --git a/pkg/factory/aliyun/aliyun.go b/pkg/factory/aliyun/aliyun.go
index c3c0027c..35f6b1b5 100644
--- a/pkg/factory/aliyun/aliyun.go
+++ b/pkg/factory/aliyun/aliyun.go
@@ -2,6 +2,7 @@ package aliyun
 
 import (
 	"context"
+	"fmt"
 	"net/netip"
 	"strings"
 	"time"
@@ -180,6 +181,8 @@ func (a *Aliyun) CreateNetworkInterface(ipv4, ipv6 int, eniType string) (*daemon
 		return r, nil, nil, err
 	}
 
+	timeout := time.After(2 * time.Second)
+
 	// 3. wait metadata ready & update cidr
 	err = validateIPInMetadata(ctx, v4Set, func() []netip.Addr {
 		exists, err := metadata.GetIPv4ByMac(r.MAC)
@@ -242,6 +245,34 @@
 		r.GatewayIP.SetIP(gw.String())
 	}
 
+	// wait until at least 2s have passed since attach (blocking on time.After is safe; Go 1.23+ timers no longer leak)
+	<-timeout
+
+	var innerErr error
+	// check OpenAPI last to make sure the ENI has reached the InUse status
+	err = wait.ExponentialBackoffWithContext(a.ctx, backoff.Backoff(backoff.ENIOps), func(ctx context.Context) (done bool, err error) {
+		var eniSet []*client.NetworkInterface
+		eniSet, innerErr = a.openAPI.DescribeNetworkInterface(ctx, "", []string{r.ID}, "", "", "", nil)
+		if innerErr != nil {
+			return false, nil
+		}
+		if len(eniSet) != 1 {
+			innerErr = fmt.Errorf("cannot find eni %s, resp %#v", r.ID, eniSet)
+			return false, nil
+		}
+		if eniSet[0].Status != client.ENIStatusInUse {
+			innerErr = fmt.Errorf("eni %s is in status %s", r.ID, eniSet[0].Status)
+			return false, nil
+		}
+		return true, nil
+	})
+	if err != nil {
+		if innerErr != nil {
+			err = innerErr
+		}
+		return r, v4Set, v6Set, err
+	}
+
 	return r, v4Set, v6Set, nil
 }

From 5f09866505a770af85f37b45eff6cbe049eeb6fd Mon Sep 17 00:00:00 2001
From: l1b0k
Date: Thu, 12 Dec 2024 16:04:46 +0800
Subject: [PATCH 2/2] add a deadline for allocating requests

Signed-off-by: l1b0k
---
 daemon/daemon.go        |   4 +-
 pkg/eni/local.go        | 165 ++++++++++++++++++++-------
 pkg/eni/local_test.go   | 246 ++++++++++++++++++++++++++++++++++++++--
 pkg/eni/manager.go      |   5 +-
 pkg/eni/manager_test.go |  16 ++-
 pkg/eni/types.go        |  20 ++++
 6 files changed, 397 insertions(+), 59 deletions(-)

diff --git a/daemon/daemon.go b/daemon/daemon.go
index b08555dd..294588c6 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -179,7 +179,7 @@ func (n *networkService) AllocIP(ctx context.Context, r *rpc.AllocIPRequest) (*r
 		if pod.PodENI {
 			resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
 		} else {
-			req := &eni.LocalIPRequest{}
+			req := eni.NewLocalIPRequest()
 			if pod.ERdma {
 				req.LocalIPType = eni.LocalIPTypeERDMA
 			}
@@ -197,7 +197,7 @@ func (n *networkService) AllocIP(ctx context.Context, r *rpc.AllocIPRequest) (*r
 		if
pod.PodENI || n.ipamType == types.IPAMTypeCRD { resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{}) } else { - req := &eni.LocalIPRequest{} + req := eni.NewLocalIPRequest() if len(oldRes.GetResourceItemByType(daemon.ResourceTypeENI)) == 1 { old := oldRes.GetResourceItemByType(daemon.ResourceTypeENI)[0] diff --git a/pkg/eni/local.go b/pkg/eni/local.go index 605e5c7e..d6bba351 100644 --- a/pkg/eni/local.go +++ b/pkg/eni/local.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/go-logr/logr" + "github.com/samber/lo" "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/cache" @@ -51,6 +53,14 @@ var rateLimit = rate.Every(1 * time.Minute / 10) var _ ResourceRequest = &LocalIPRequest{} +func NewLocalIPRequest() *LocalIPRequest { + ctx, cancel := context.WithCancel(context.Background()) + return &LocalIPRequest{ + workerCtx: ctx, + cancel: cancel, + } +} + type LocalIPRequest struct { NetworkInterfaceID string LocalIPType string @@ -58,6 +68,9 @@ type LocalIPRequest struct { IPv6 netip.Addr NoCache bool // do not use cached ip + + workerCtx context.Context + cancel context.CancelFunc } func (l *LocalIPRequest) ResourceType() ResourceType { @@ -122,7 +135,9 @@ type Local struct { batchSize int cap int - allocatingV4, allocatingV6 int + allocatingV4, allocatingV6 AllocatingRequests + // danging, used for release + dangingV4, dangingV6 AllocatingRequests eni *daemon.ENI ipAllocInhibitExpireAt time.Time @@ -396,30 +411,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR return nil, nil } - lo, ok := request.(*LocalIPRequest) + localIPRequest, ok := request.(*LocalIPRequest) if !ok { return nil, []Trace{{Condition: ResourceTypeMismatch}} } - if lo.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != lo.NetworkInterfaceID { + if localIPRequest.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != localIPRequest.NetworkInterfaceID { return nil, []Trace{{Condition: NetworkInterfaceMismatch}} } - log := logf.FromContext(ctx) - log.Info(fmt.Sprintf("local request %v", lo)) + log := logr.FromContextOrDiscard(ctx) + log.Info(fmt.Sprintf("local request %v", localIPRequest)) expectV4 := 0 expectV6 := 0 if l.enableIPv4 { - if lo.NoCache { - if len(l.ipv4)+l.allocatingV4 >= l.cap { + if localIPRequest.NoCache { + if len(l.ipv4)+l.allocatingV4.Len() >= l.cap { return nil, []Trace{{Condition: Full}} } expectV4 = 1 } else { ipv4 := l.ipv4.PeekAvailable(cni.PodID) - if ipv4 == nil && len(l.ipv4)+l.allocatingV4 >= l.cap { + if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap { return nil, []Trace{{Condition: Full}} } else if ipv4 == nil { expectV4 = 1 @@ -428,14 +443,14 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR } if l.enableIPv6 { - if lo.NoCache { - if len(l.ipv6)+l.allocatingV6 >= l.cap { + if localIPRequest.NoCache { + if len(l.ipv6)+l.allocatingV6.Len() >= l.cap { return nil, []Trace{{Condition: Full}} } expectV6 = 1 } else { ipv6 := l.ipv6.PeekAvailable(cni.PodID) - if ipv6 == nil && len(l.ipv6)+l.allocatingV6 >= l.cap { + if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap { return nil, []Trace{{Condition: Full}} } else if ipv6 == nil { expectV6 = 1 @@ -448,21 +463,18 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}} } - l.allocatingV4 += expectV4 - l.allocatingV6 += expectV6 + for i := 0; 
i < expectV4; i++ { + l.allocatingV4 = append(l.allocatingV4, localIPRequest) + } + for i := 0; i < expectV6; i++ { + l.allocatingV6 = append(l.allocatingV6, localIPRequest) + } l.cond.Broadcast() respCh := make(chan *AllocResp) - go l.allocWorker(ctx, cni, lo, respCh, func() { - // current roll back ip at same time - l.allocatingV4 -= expectV4 - l.allocatingV4 = max(l.allocatingV4, 0) - l.allocatingV6 -= expectV6 - l.allocatingV6 = max(l.allocatingV6, 0) - log.Info("rollback ipv4", "ipv4", expectV4) - }) + go l.allocWorker(ctx, cni, localIPRequest, respCh) return respCh, nil } @@ -482,7 +494,7 @@ func (l *Local) Release(ctx context.Context, cni *daemon.CNI, request NetworkRes return false } - log := logf.FromContext(ctx) + log := logr.FromContextOrDiscard(ctx) if res.IP.IPv4.IsValid() { l.ipv4.Release(cni.PodID, res.IP.IPv4) @@ -530,13 +542,24 @@ func (l *Local) Priority() int { } // allocWorker started with each Allocate call -func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp, onErrLocked func()) { +func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp) { done := make(chan struct{}) defer close(done) l.cond.L.Lock() defer l.cond.L.Unlock() + defer func() { + if request == nil { + return + } + + l.switchIPv4(request) + l.switchIPv6(request) + + request.cancel() + }() + go func() { select { case <-ctx.Done(): @@ -547,13 +570,11 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local } }() - log := logf.FromContext(ctx) + log := logr.FromContextOrDiscard(ctx) for { select { case <-ctx.Done(): // parent cancel the context, so close the ch - onErrLocked() - close(respCh) return default: @@ -613,7 +634,7 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local func (l *Local) factoryAllocWorker(ctx context.Context) { l.cond.L.Lock() - log := logf.FromContext(ctx) + log := logr.FromContextOrDiscard(ctx) for { if log.V(4).Enabled() { log.V(4).Info("call allocWorker") @@ -626,7 +647,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) { default: } - if l.allocatingV4 <= 0 && l.allocatingV6 <= 0 { + if l.allocatingV4.Len() <= 0 && l.allocatingV6.Len() <= 0 { l.cond.Wait() continue } @@ -650,8 +671,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) { if l.eni == nil { // create eni - v4Count := min(l.batchSize, max(l.allocatingV4, 1)) - v6Count := min(l.batchSize, l.allocatingV6) + v4Count := min(l.batchSize, max(l.allocatingV4.Len(), 1)) + v6Count := min(l.batchSize, l.allocatingV6.Len()) l.status = statusCreating l.cond.L.Unlock() @@ -690,11 +711,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) { l.eni = eni - l.allocatingV4 -= v4Count - l.allocatingV6 -= v6Count - - l.allocatingV4 = max(l.allocatingV4, 0) - l.allocatingV6 = max(l.allocatingV6, 0) + l.popNIPv4Jobs(v4Count) + l.popNIPv6Jobs(v6Count) primary, err := netip.ParseAddr(eni.PrimaryIP.IPv4.String()) if err == nil { @@ -714,8 +732,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) { l.status = statusInUse } else { eniID := l.eni.ID - v4Count := min(l.batchSize, l.allocatingV4) - v6Count := min(l.batchSize, l.allocatingV6) + v4Count := min(l.batchSize, l.allocatingV4.Len()) + v6Count := min(l.batchSize, l.allocatingV6.Len()) if v4Count > 0 { l.cond.L.Unlock() @@ -739,8 +757,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) { continue } - l.allocatingV4 -= len(ipv4Set) - l.allocatingV4 = max(l.allocatingV4, 0) + 
l.popNIPv4Jobs(len(ipv4Set))
 
 				l.ipv4.PutValid(ipv4Set...)
 
@@ -772,8 +789,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
 					continue
 				}
 
-				l.allocatingV6 -= len(ipv6Set)
-				l.allocatingV6 = max(l.allocatingV6, 0)
+				l.popNIPv6Jobs(len(ipv6Set))
 
 				l.ipv6.PutValid(ipv6Set...)
 
@@ -862,7 +878,7 @@ func (l *Local) Dispose(n int) int {
 
 func (l *Local) factoryDisposeWorker(ctx context.Context) {
 	l.cond.L.Lock()
 
-	log := logf.FromContext(ctx)
+	log := logr.FromContextOrDiscard(ctx)
 	for {
 		select {
 		case <-ctx.Done():
@@ -1083,3 +1099,68 @@ func parseResourceID(id string) (string, string, error) {
 	}
 	return parts[0], parts[1], nil
 }
+
+func (l *Local) switchIPv4(req *LocalIPRequest) {
+	found := false
+	l.allocatingV4 = lo.Filter(l.allocatingV4, func(item *LocalIPRequest, index int) bool {
+		if item != req {
+			// true to keep
+			return true
+		}
+		found = true
+		return false
+	})
+	if !found {
+		return
+	}
+
+	if l.dangingV4.Len() == 0 {
+		// this should not happen in practice;
+		// calling Len() here also prunes canceled jobs from the queue
+		return
+	}
+	l.allocatingV4 = append(l.allocatingV4, l.dangingV4[0])
+	l.dangingV4 = l.dangingV4[1:]
+}
+
+func (l *Local) switchIPv6(req *LocalIPRequest) {
+	found := false
+	l.allocatingV6 = lo.Filter(l.allocatingV6, func(item *LocalIPRequest, index int) bool {
+		if item != req {
+			// true to keep
+			return true
+		}
+		found = true
+		return false
+	})
+	if !found {
+		return
+	}
+
+	if l.dangingV6.Len() == 0 {
+		// this should not happen in practice
+		return
+	}
+	l.allocatingV6 = append(l.allocatingV6, l.dangingV6[0])
+	l.dangingV6 = l.dangingV6[1:]
+}
+
+func (l *Local) popNIPv4Jobs(count int) {
+	firstPart, secondPart := Split(l.allocatingV4, count)
+	l.dangingV4 = append(l.dangingV4, firstPart...)
+	l.allocatingV4 = secondPart
+}
+
+func (l *Local) popNIPv6Jobs(count int) {
+	firstPart, secondPart := Split(l.allocatingV6, count)
+	l.dangingV6 = append(l.dangingV6, firstPart...)
+ l.allocatingV6 = secondPart +} + +func Split[T any](arr []T, index int) ([]T, []T) { + if index < 0 || index > len(arr) { + return arr, nil + } + + return arr[:index], arr[index:] +} diff --git a/pkg/eni/local_test.go b/pkg/eni/local_test.go index c0d52a3d..582b9814 100644 --- a/pkg/eni/local_test.go +++ b/pkg/eni/local_test.go @@ -14,6 +14,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "github.com/AliyunContainerService/terway/pkg/factory" + factorymocks "github.com/AliyunContainerService/terway/pkg/factory/mocks" "github.com/AliyunContainerService/terway/types" "github.com/AliyunContainerService/terway/types/daemon" ) @@ -119,7 +120,7 @@ func TestLocal_AllocWorker_EnableIPv4(t *testing.T) { cni := &daemon.CNI{PodID: "pod-1"} respCh := make(chan *AllocResp) - go local.allocWorker(context.Background(), cni, nil, respCh, func() {}) + go local.allocWorker(context.Background(), cni, nil, respCh) go func() { local.cond.L.Lock() @@ -144,7 +145,7 @@ func TestLocal_AllocWorker_EnableIPv6(t *testing.T) { cni := &daemon.CNI{PodID: "pod-1"} respCh := make(chan *AllocResp) - go local.allocWorker(context.Background(), cni, nil, respCh, func() {}) + go local.allocWorker(context.Background(), cni, nil, respCh) go func() { local.cond.L.Lock() @@ -170,7 +171,7 @@ func TestLocal_AllocWorker_ParentCancelContext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) respCh := make(chan *AllocResp) - go local.allocWorker(ctx, cni, nil, respCh, func() {}) + go local.allocWorker(ctx, cni, nil, respCh) cancel() @@ -209,7 +210,8 @@ func TestLocal_DisposeWholeENI(t *testing.T) { func TestLocal_Allocate_NoCache(t *testing.T) { local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, nil, &types.PoolConfig{MaxIPPerENI: 2, EnableIPv4: true}, "") - request := &LocalIPRequest{NoCache: true} + request := NewLocalIPRequest() + request.NoCache = true cni := &daemon.CNI{PodID: "pod-1"} local.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.1"), false)) @@ -220,6 +222,22 @@ func TestLocal_Allocate_NoCache(t *testing.T) { assert.Equal(t, 1, len(resp)) } +func TestLocal_Allocate_NoCache_AllocSuccess(t *testing.T) { + local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, nil, &types.PoolConfig{ + MaxIPPerENI: 10, EnableIPv4: true, EnableIPv6: true}, "") + + request := NewLocalIPRequest() + request.NoCache = true + cni := &daemon.CNI{PodID: "pod-1"} + + local.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.1"), false)) + local.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.2"), false)) + + ch, resp := local.Allocate(context.Background(), cni, request) + assert.NotNil(t, ch) + assert.Equal(t, 0, len(resp)) +} + func TestLocal_DisposeWholeERDMA(t *testing.T) { local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, nil, &types.PoolConfig{}, "erdma") local.status = statusInUse @@ -234,7 +252,8 @@ func TestLocal_DisposeWholeERDMA(t *testing.T) { func TestLocal_Allocate_ERDMA(t *testing.T) { localErdma := NewLocalTest(&daemon.ENI{ID: "eni-1"}, nil, &types.PoolConfig{MaxIPPerENI: 2, EnableIPv4: true}, "erdma") - request := &LocalIPRequest{NoCache: true} + request := NewLocalIPRequest() + request.NoCache = true cni := &daemon.CNI{PodID: "pod-1"} localErdma.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.1"), false)) @@ -245,7 +264,9 @@ func TestLocal_Allocate_ERDMA(t *testing.T) { assert.Equal(t, 1, len(resp)) assert.Equal(t, ResourceTypeMismatch, resp[0].Condition) - request = &LocalIPRequest{NoCache: true, LocalIPType: LocalIPTypeERDMA} + request = NewLocalIPRequest() + request.NoCache = true + request.LocalIPType = 
LocalIPTypeERDMA _, resp = localErdma.Allocate(context.Background(), cni, request) assert.Equal(t, 1, len(resp)) @@ -255,13 +276,16 @@ func TestLocal_Allocate_ERDMA(t *testing.T) { local.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.1"), false)) local.ipv4.Add(NewValidIP(netip.MustParseAddr("192.0.2.2"), false)) - request = &LocalIPRequest{NoCache: true} + request = NewLocalIPRequest() + request.NoCache = true _, resp = local.Allocate(context.Background(), cni, request) assert.Equal(t, 1, len(resp)) assert.NotEqual(t, ResourceTypeMismatch, resp[0].Condition) - request = &LocalIPRequest{NoCache: true, LocalIPType: LocalIPTypeERDMA} + request = NewLocalIPRequest() + request.NoCache = true + request.LocalIPType = LocalIPTypeERDMA _, resp = local.Allocate(context.Background(), cni, request) assert.Equal(t, 1, len(resp)) @@ -271,7 +295,7 @@ func TestLocal_Allocate_ERDMA(t *testing.T) { func TestLocal_Allocate_Inhibit(t *testing.T) { local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, nil, &types.PoolConfig{MaxIPPerENI: 2, EnableIPv4: true}, "") - request := &LocalIPRequest{} + request := NewLocalIPRequest() cni := &daemon.CNI{PodID: "pod-1"} local.ipAllocInhibitExpireAt = time.Now().Add(time.Minute) @@ -338,3 +362,207 @@ func Test_orphanIP(t *testing.T) { v, _ = invalidIPCache.Get(netip.MustParseAddr("127.0.0.2")) assert.Equal(t, 2, v) } + +func Test_switchIPv4(t *testing.T) { + l := &Local{} + + req := NewLocalIPRequest() + l.allocatingV4 = append(l.allocatingV4, + req, + NewLocalIPRequest(), + ) + + l.dangingV4 = append(l.dangingV4, NewLocalIPRequest()) + + l.switchIPv4(req) + assert.Equal(t, 2, l.allocatingV4.Len()) + assert.Equal(t, 0, l.dangingV4.Len()) +} + +func Test_switchIPv6(t *testing.T) { + l := &Local{} + + req := NewLocalIPRequest() + l.allocatingV6 = append(l.allocatingV6, + req, + NewLocalIPRequest(), + ) + + l.switchIPv6(req) + assert.Equal(t, 1, l.allocatingV6.Len()) + assert.Equal(t, 0, l.dangingV6.Len()) +} + +func TestPopNIPv4JobsMovesCorrectNumberOfJobs(t *testing.T) { + l := &Local{ + allocatingV4: AllocatingRequests{NewLocalIPRequest(), NewLocalIPRequest(), NewLocalIPRequest()}, + dangingV4: AllocatingRequests{}, + } + + l.popNIPv4Jobs(2) + + assert.Len(t, l.allocatingV4, 1) + assert.Len(t, l.dangingV4, 2) +} + +func TestPopNIPv6JobsMovesCorrectNumberOfJobs(t *testing.T) { + l := &Local{ + allocatingV6: AllocatingRequests{NewLocalIPRequest(), NewLocalIPRequest(), NewLocalIPRequest()}, + dangingV6: AllocatingRequests{}, + } + + l.popNIPv6Jobs(2) + + assert.Len(t, l.allocatingV6, 1) + assert.Len(t, l.dangingV6, 2) +} + +func TestPopNIPv6JobsMovesAllJobsWhenCountExceeds(t *testing.T) { + l := &Local{ + allocatingV6: AllocatingRequests{NewLocalIPRequest(), NewLocalIPRequest()}, + dangingV6: AllocatingRequests{}, + } + + l.popNIPv6Jobs(5) + + assert.Len(t, l.allocatingV6, 0) + assert.Len(t, l.dangingV6, 2) +} + +func TestPopNIPv6JobsMovesNoJobsWhenCountIsZero(t *testing.T) { + l := &Local{ + allocatingV6: AllocatingRequests{NewLocalIPRequest(), NewLocalIPRequest()}, + dangingV6: AllocatingRequests{}, + } + + l.popNIPv6Jobs(0) + + assert.Len(t, l.allocatingV6, 2) + assert.Len(t, l.dangingV6, 0) +} + +func TestPriorityReturnsNegativeWhenStatusIsDeleting(t *testing.T) { + l := &Local{ + cond: sync.NewCond(&sync.Mutex{}), + status: statusDeleting, + } + + prio := l.Priority() + + assert.Equal(t, -100, prio) +} + +func TestPriorityReturnsZeroWhenStatusIsInit(t *testing.T) { + l := &Local{ + cond: sync.NewCond(&sync.Mutex{}), + status: statusInit, + } + + prio := l.Priority() + + 
assert.Equal(t, 0, prio)
+}
+
+func TestPriorityReturnsTenWhenStatusIsCreating(t *testing.T) {
+	l := &Local{
+		cond:   sync.NewCond(&sync.Mutex{}),
+		status: statusCreating,
+	}
+
+	prio := l.Priority()
+
+	assert.Equal(t, 10, prio)
+}
+
+func TestPriorityReturnsFiftyPlusIPv4CountWhenStatusIsInUseAndIPv4Enabled(t *testing.T) {
+	l := &Local{
+		cond:       sync.NewCond(&sync.Mutex{}),
+		status:     statusInUse,
+		enableIPv4: true,
+		ipv4:       Set{netip.MustParseAddr("192.0.2.1"): &IP{}},
+	}
+
+	prio := l.Priority()
+
+	assert.Equal(t, 51, prio)
+}
+
+func TestPriorityReturnsFiftyPlusIPv6CountWhenStatusIsInUseAndIPv6Enabled(t *testing.T) {
+	l := &Local{
+		cond:       sync.NewCond(&sync.Mutex{}),
+		status:     statusInUse,
+		enableIPv6: true,
+		ipv6:       Set{netip.MustParseAddr("fd00:46dd:e::1"): &IP{}},
+	}
+
+	prio := l.Priority()
+
+	assert.Equal(t, 51, prio)
+}
+
+func TestPriorityReturnsFiftyPlusIPv4AndIPv6CountWhenStatusIsInUseAndBothEnabled(t *testing.T) {
+	l := &Local{
+		cond:       sync.NewCond(&sync.Mutex{}),
+		status:     statusInUse,
+		enableIPv4: true,
+		enableIPv6: true,
+		ipv4:       Set{netip.MustParseAddr("192.0.2.1"): &IP{}},
+		ipv6:       Set{netip.MustParseAddr("fd00:46dd:e::1"): &IP{}},
+	}
+
+	prio := l.Priority()
+
+	assert.Equal(t, 52, prio)
+}
+
+func TestAllocFromFactory(t *testing.T) {
+	// 1. the factory worker finishes req1 while the alloc worker consumes req2
+
+	f := factorymocks.NewFactory(t)
+	// even though we queue two jobs, the factory returns only one IP
+	f.On("AssignNIPv4", "eni-1", 2, "").Return([]netip.Addr{netip.MustParseAddr("192.0.2.1")}, nil).Once()
+	f.On("AssignNIPv6", "eni-1", 2, "").Return([]netip.Addr{netip.MustParseAddr("fd00::1")}, nil).Once()
+	f.On("AssignNIPv4", "eni-1", 1, "").Return(nil, nil).Maybe()
+	f.On("AssignNIPv6", "eni-1", 1, "").Return(nil, nil).Maybe()
+
+	local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
+		EnableIPv4: true,
+		EnableIPv6: true,
+		BatchSize:  10,
+	}, "")
+	local.status = statusInUse
+
+	req1 := NewLocalIPRequest()
+	req2 := NewLocalIPRequest()
+
+	local.allocatingV4 = append(local.allocatingV4, req1, req2)
+	local.allocatingV6 = append(local.allocatingV6, req1, req2)
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	// expect req1 to be moved to the danging queue
+	go local.factoryAllocWorker(ctx)
+
+	local.cond.Broadcast()
+	req2Ch := make(chan *AllocResp)
+	go local.allocWorker(ctx, &daemon.CNI{}, req2, req2Ch)
+
+	<-req2Ch
+	cancel()
+
+	local.cond.Broadcast()
+	// the workers may not have exited yet
+	time.Sleep(time.Second)
+
+	local.cond.L.Lock()
+	defer local.cond.L.Unlock()
+
+	assert.Equal(t, 1, len(local.allocatingV4))
+	assert.Equal(t, 0, len(local.dangingV4))
+	assert.Equal(t, 1, len(local.allocatingV6))
+	assert.Equal(t, 0, len(local.dangingV6))
+
+	// check that the job was switched back from the danging queue
+	assert.Equal(t, req1, local.allocatingV4[0])
+	assert.Equal(t, req1, local.allocatingV6[0])
+}
diff --git a/pkg/eni/manager.go b/pkg/eni/manager.go
index 4cf7e651..9ef674e5 100644
--- a/pkg/eni/manager.go
+++ b/pkg/eni/manager.go
@@ -328,9 +328,12 @@ func (m *Manager) syncPool(ctx context.Context) {
 		go func() {
 			defer wg.Done()
 
+			req := NewLocalIPRequest()
+			req.NoCache = true
+
 			_, err := m.Allocate(ctx, &daemon.CNI{}, &AllocRequest{
 				ResourceRequests: []ResourceRequest{
-					&LocalIPRequest{NoCache: true},
+					req,
 				},
 			})
 			if err != nil {
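A note on the constructor migration shown above: after this series, every call site builds requests through NewLocalIPRequest() instead of a &LocalIPRequest{} literal, because the new queue bookkeeping selects on the request's workerCtx; a zero-value request carries a nil context, so workerCtx.Done() would panic. The following is a self-contained toy sketch of that invariant; request, newRequest, and pending are illustrative stand-ins, not the real terway types.

package main

import (
	"context"
	"fmt"
)

// request stands in for LocalIPRequest: cancel lets the owner abandon a
// queued job, and workerCtx lets the queue observe that.
type request struct {
	workerCtx context.Context
	cancel    context.CancelFunc
	noCache   bool
}

// newRequest mirrors NewLocalIPRequest: it guarantees workerCtx is non-nil,
// which any code selecting on workerCtx.Done() relies on.
func newRequest() *request {
	ctx, cancel := context.WithCancel(context.Background())
	return &request{workerCtx: ctx, cancel: cancel}
}

// pending counts queued requests whose worker context is still live.
func pending(queue []*request) int {
	n := 0
	for _, r := range queue {
		select {
		case <-r.workerCtx.Done(): // abandoned; skip
		default:
			n++
		}
	}
	return n
}

func main() {
	a, b := newRequest(), newRequest()
	a.noCache = true // fields are set after construction, as syncPool now does
	queue := []*request{a, b}

	b.cancel() // e.g. the CNI caller hit its deadline
	fmt.Println(pending(queue)) // 1
}

Cancellation by the owner is what allows the queues to drop dead jobs instead of allocating IPs for requests nobody is waiting on.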
diff --git a/pkg/eni/manager_test.go b/pkg/eni/manager_test.go
index ecb02c31..7275c344 100644
--- a/pkg/eni/manager_test.go
+++ b/pkg/eni/manager_test.go
@@ -86,8 +86,9 @@ func TestManagerAllocateReturnsResourcesWhenSuccessful(t *testing.T) {
 	mockNI := &success{}
 	manager := NewManager(0, 0, 0, 0, []NetworkInterface{mockNI}, types.EniSelectionPolicyMostIPs, &FakeK8s{})
 
+	request := NewLocalIPRequest()
 	resources, err := manager.Allocate(context.Background(), &daemon.CNI{}, &AllocRequest{
-		ResourceRequests: []ResourceRequest{&LocalIPRequest{}},
+		ResourceRequests: []ResourceRequest{request},
 	})
 
 	assert.Nil(t, err)
@@ -109,8 +110,9 @@ func TestManagerAllocateSelectionPolicy(t *testing.T) {
 	{
 		manager := NewManager(0, 0, 0, 0, []NetworkInterface{mockNI, mockNI2}, types.EniSelectionPolicyMostIPs, &FakeK8s{})
 
+		request := NewLocalIPRequest()
 		resources, err := manager.Allocate(context.Background(), &daemon.CNI{}, &AllocRequest{
-			ResourceRequests: []ResourceRequest{&LocalIPRequest{}},
+			ResourceRequests: []ResourceRequest{request},
 		})
 
 		assert.Nil(t, err)
@@ -121,8 +123,9 @@ func TestManagerAllocateSelectionPolicy(t *testing.T) {
 	{
 		manager := NewManager(0, 0, 0, 0, []NetworkInterface{mockNI, mockNI2}, types.EniSelectionPolicyLeastIPs, &FakeK8s{})
 
+		request := NewLocalIPRequest()
 		resources, err := manager.Allocate(context.Background(), &daemon.CNI{}, &AllocRequest{
-			ResourceRequests: []ResourceRequest{&LocalIPRequest{}},
+			ResourceRequests: []ResourceRequest{request},
 		})
 
 		assert.Nil(t, err)
@@ -134,8 +137,9 @@ func TestManagerAllocateReturnsErrorWhenNoBackendCanHandleAllocation(t *testing.T) {
 	manager := NewManager(0, 0, 0, 0, []NetworkInterface{}, types.EniSelectionPolicyMostIPs, &FakeK8s{})
 
+	request := NewLocalIPRequest()
 	_, err := manager.Allocate(context.Background(), &daemon.CNI{}, &AllocRequest{
-		ResourceRequests: []ResourceRequest{&LocalIPRequest{}},
+		ResourceRequests: []ResourceRequest{request},
 	})
 
 	assert.NotNil(t, err)
@@ -147,8 +151,10 @@ func TestManagerAllocateWithTimeoutWhenAllocationFails(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 	defer cancel()
+
+	request := NewLocalIPRequest()
 	_, err := manager.Allocate(ctx, &daemon.CNI{}, &AllocRequest{
-		ResourceRequests: []ResourceRequest{&LocalIPRequest{}},
+		ResourceRequests: []ResourceRequest{request},
 	})
 	assert.NotNil(t, err)
 }
diff --git a/pkg/eni/types.go b/pkg/eni/types.go
index c9142c63..cc7bda9b 100644
--- a/pkg/eni/types.go
+++ b/pkg/eni/types.go
@@ -4,6 +4,8 @@ import (
 	"net/netip"
 	"time"
 
+	"github.com/samber/lo"
+
 	"github.com/AliyunContainerService/terway/rpc"
 	"github.com/AliyunContainerService/terway/types/daemon"
 )
@@ -257,3 +259,21 @@ const (
 	NetworkInterfaceMismatch
 	InsufficientVSwitchIP
 )
+
+type AllocatingRequests []*LocalIPRequest
+
+// Len returns the number of live requests, pruning canceled ones in place.
+func (a *AllocatingRequests) Len() int {
+	// keep only requests whose worker context is still active
+	filtered := lo.Filter(*a, func(item *LocalIPRequest, index int) bool {
+		select {
+		case <-item.workerCtx.Done():
+			return false
+		default:
+			return true
+		}
+	})
+
+	*a = filtered
+	return len(*a)
+}
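To summarize the queue bookkeeping this series adds to pkg/eni/local.go: when the factory satisfies a batch, popNIPv4Jobs/popNIPv6Jobs park the first N waiting requests in the danging queue; when a worker exits, switchIPv4/switchIPv6 remove its request from the allocating queue and, if it was still waiting there, backfill one parked job so the batch result is not wasted. The sketch below is a reduced, runnable model of that hand-off under simplified types (job, local, popN, and switchJob are stand-ins, and jobs are matched by pointer identity as in the patch):

package main

import "fmt"

// job stands in for *LocalIPRequest.
type job struct{ id string }

// split mirrors the patch's generic Split helper, clamping out-of-range indexes.
func split(arr []*job, index int) ([]*job, []*job) {
	if index < 0 || index > len(arr) {
		return arr, nil
	}
	return arr[:index], arr[index:]
}

type local struct {
	allocating []*job // jobs still waiting for the factory
	danging    []*job // jobs whose IPs were batch-allocated but not yet consumed
}

// popN parks the first count jobs in danging, as popNIPv4Jobs does after a
// successful batch assignment.
func (l *local) popN(count int) {
	first, second := split(l.allocating, count)
	l.danging = append(l.danging, first...)
	l.allocating = second
}

// switchJob runs when a worker exits: drop its request from allocating and,
// if it was found there, backfill one parked job (the switchIPv4 logic).
func (l *local) switchJob(req *job) {
	kept := l.allocating[:0]
	found := false
	for _, item := range l.allocating {
		if item == req {
			found = true
			continue
		}
		kept = append(kept, item)
	}
	l.allocating = kept
	if !found || len(l.danging) == 0 {
		return
	}
	l.allocating = append(l.allocating, l.danging[0])
	l.danging = l.danging[1:]
}

func main() {
	req1, req2 := &job{"req1"}, &job{"req2"}
	l := &local{allocating: []*job{req1, req2}}

	l.popN(1)         // factory returned one IP; req1 is parked in danging
	l.switchJob(req2) // req2's worker consumed the IP and exited

	fmt.Println(l.allocating[0].id, len(l.danging)) // req1 0
}

Running it prints "req1 0": req2's worker consumed the batched IP and exited, and req1 was moved back from danging to allocating so it keeps its place in the queue, which is exactly what TestAllocFromFactory asserts.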