Skip to content

Commit

Permalink
add the deadline for the allocating requests
Browse files Browse the repository at this point in the history
Signed-off-by: l1b0k <[email protected]>
  • Loading branch information
l1b0k committed Dec 17, 2024
1 parent 495e3c0 commit a747478
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 31 deletions.
62 changes: 31 additions & 31 deletions pkg/eni/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"sync"
"time"

"github.com/samber/lo"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/cache"
Expand Down Expand Up @@ -122,7 +123,7 @@ type Local struct {
batchSize int

cap int
allocatingV4, allocatingV6 int
allocatingV4, allocatingV6 AllocatingRequests

eni *daemon.ENI
ipAllocInhibitExpireAt time.Time
Expand Down Expand Up @@ -375,30 +376,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
return nil, nil
}

lo, ok := request.(*LocalIPRequest)
localIPRequest, ok := request.(*LocalIPRequest)
if !ok {
return nil, []Trace{{Condition: ResourceTypeMismatch}}
}

if lo.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != lo.NetworkInterfaceID {
if localIPRequest.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != localIPRequest.NetworkInterfaceID {
return nil, []Trace{{Condition: NetworkInterfaceMismatch}}
}

log := logf.FromContext(ctx)
log.Info(fmt.Sprintf("local request %v", lo))
log.Info(fmt.Sprintf("local request %v", localIPRequest))

expectV4 := 0
expectV6 := 0

if l.enableIPv4 {
if lo.NoCache {
if len(l.ipv4)+l.allocatingV4 >= l.cap {
if localIPRequest.NoCache {
if len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
}
expectV4 = 1
} else {
ipv4 := l.ipv4.PeekAvailable(cni.PodID)
if ipv4 == nil && len(l.ipv4)+l.allocatingV4 >= l.cap {
if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
} else if ipv4 == nil {
expectV4 = 1
Expand All @@ -407,14 +408,14 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
}

if l.enableIPv6 {
if lo.NoCache {
if len(l.ipv6)+l.allocatingV6 >= l.cap {
if localIPRequest.NoCache {
if len(l.ipv6)+l.allocatingV6.Len() >= l.cap {

Check warning on line 412 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L411-L412

Added lines #L411 - L412 were not covered by tests
return nil, []Trace{{Condition: Full}}
}
expectV6 = 1
} else {
ipv6 := l.ipv6.PeekAvailable(cni.PodID)
if ipv6 == nil && len(l.ipv6)+l.allocatingV6 >= l.cap {
if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap {

Check warning on line 418 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L418

Added line #L418 was not covered by tests
return nil, []Trace{{Condition: Full}}
} else if ipv6 == nil {
expectV6 = 1
Expand All @@ -427,19 +428,23 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}}
}

l.allocatingV4 += expectV4
l.allocatingV6 += expectV6
deadline := time.Now().Add(2 * time.Minute)

for i := 0; i < expectV4; i++ {
l.allocatingV4 = append(l.allocatingV4, AllocatingRequest{deadline: deadline})
}
for i := 0; i < expectV6; i++ {
l.allocatingV6 = append(l.allocatingV6, AllocatingRequest{deadline: deadline})
}

Check warning on line 438 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L431-L438

Added lines #L431 - L438 were not covered by tests

l.cond.Broadcast()

respCh := make(chan *AllocResp)

go l.allocWorker(ctx, cni, lo, respCh, func() {
go l.allocWorker(ctx, cni, localIPRequest, respCh, func() {

Check warning on line 444 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L444

Added line #L444 was not covered by tests
// current roll back ip at same time
l.allocatingV4 -= expectV4
l.allocatingV4 = max(l.allocatingV4, 0)
l.allocatingV6 -= expectV6
l.allocatingV6 = max(l.allocatingV6, 0)
lo.Drop(l.allocatingV4, expectV4)
lo.Drop(l.allocatingV6, expectV6)

Check warning on line 447 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L446-L447

Added lines #L446 - L447 were not covered by tests
log.Info("rollback ipv4", "ipv4", expectV4)
})

Expand Down Expand Up @@ -605,7 +610,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
default:
}

if l.allocatingV4 <= 0 && l.allocatingV6 <= 0 {
if l.allocatingV4.Len() <= 0 && l.allocatingV6.Len() <= 0 {

Check warning on line 613 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L613

Added line #L613 was not covered by tests
l.cond.Wait()
continue
}
Expand All @@ -629,8 +634,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {

if l.eni == nil {
// create eni
v4Count := min(l.batchSize, max(l.allocatingV4, 1))
v6Count := min(l.batchSize, l.allocatingV6)
v4Count := min(l.batchSize, max(l.allocatingV4.Len(), 1))
v6Count := min(l.batchSize, l.allocatingV6.Len())

Check warning on line 638 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L637-L638

Added lines #L637 - L638 were not covered by tests

l.status = statusCreating
l.cond.L.Unlock()
Expand Down Expand Up @@ -669,11 +674,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {

l.eni = eni

l.allocatingV4 -= v4Count
l.allocatingV6 -= v6Count

l.allocatingV4 = max(l.allocatingV4, 0)
l.allocatingV6 = max(l.allocatingV6, 0)
lo.Drop(l.allocatingV4, v4Count)
lo.Drop(l.allocatingV6, v6Count)

Check warning on line 678 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L677-L678

Added lines #L677 - L678 were not covered by tests

primary, err := netip.ParseAddr(eni.PrimaryIP.IPv4.String())
if err == nil {
Expand All @@ -693,8 +695,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
l.status = statusInUse
} else {
eniID := l.eni.ID
v4Count := min(l.batchSize, l.allocatingV4)
v6Count := min(l.batchSize, l.allocatingV6)
v4Count := min(l.batchSize, l.allocatingV4.Len())
v6Count := min(l.batchSize, l.allocatingV6.Len())

Check warning on line 699 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L698-L699

Added lines #L698 - L699 were not covered by tests

if v4Count > 0 {
l.cond.L.Unlock()
Expand All @@ -718,8 +720,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
continue
}

l.allocatingV4 -= len(ipv4Set)
l.allocatingV4 = max(l.allocatingV4, 0)
lo.Drop(l.allocatingV4, len(ipv4Set))

Check warning on line 723 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L723

Added line #L723 was not covered by tests

l.ipv4.PutValid(ipv4Set...)

Expand Down Expand Up @@ -751,8 +752,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
continue
}

l.allocatingV6 -= len(ipv6Set)
l.allocatingV6 = max(l.allocatingV6, 0)
lo.Drop(l.allocatingV6, len(ipv6Set))

Check warning on line 755 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L755

Added line #L755 was not covered by tests

l.ipv6.PutValid(ipv6Set...)

Expand Down
19 changes: 19 additions & 0 deletions pkg/eni/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"net/netip"
"time"

"github.com/samber/lo"

"github.com/AliyunContainerService/terway/rpc"
"github.com/AliyunContainerService/terway/types/daemon"
)
Expand Down Expand Up @@ -257,3 +259,20 @@ const (
NetworkInterfaceMismatch
InsufficientVSwitchIP
)

type AllocatingRequests []AllocatingRequest

type AllocatingRequest struct {
deadline time.Time
}

// Len return the valid slice size
func (a *AllocatingRequests) Len() int {
// true to keep
filtered := lo.Filter(*a, func(item AllocatingRequest, index int) bool {
return time.Now().Before(item.deadline)
})

*a = filtered
return len(*a)
}
47 changes: 47 additions & 0 deletions pkg/eni/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"reflect"
"sort"
"testing"
"time"
)

func Test_syncIPLocked(t *testing.T) {
Expand Down Expand Up @@ -103,3 +104,49 @@ func TestSet_Allocatable(t *testing.T) {
})
}
}

func TestAllocatingRequests_Len(t *testing.T) {
tests := []struct {
name string
requests AllocatingRequests
expected int
}{
{
name: "No expired requests",
requests: AllocatingRequests{
{deadline: time.Now().Add(1 * time.Hour)},
{deadline: time.Now().Add(2 * time.Hour)},
},
expected: 2,
},
{
name: "Some expired requests",
requests: AllocatingRequests{
{deadline: time.Now().Add(-1 * time.Hour)},
{deadline: time.Now().Add(1 * time.Hour)},
},
expected: 1,
},
{
name: "All expired requests",
requests: AllocatingRequests{
{deadline: time.Now().Add(-1 * time.Hour)},
{deadline: time.Now().Add(-2 * time.Hour)},
},
expected: 0,
},
{
name: "Empty requests",
requests: AllocatingRequests{},
expected: 0,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.requests.Len(); got != tt.expected {
t.Errorf("Len() = %v, want %v", got, tt.expected)
}
})
}
}

0 comments on commit a747478

Please sign in to comment.