Skip to content

Commit

Permalink
add the deadline for the allocating requests
Browse files Browse the repository at this point in the history
Signed-off-by: l1b0k <[email protected]>
  • Loading branch information
l1b0k committed Jan 3, 2025
1 parent 542c0ff commit 5f09866
Show file tree
Hide file tree
Showing 6 changed files with 397 additions and 59 deletions.
4 changes: 2 additions & 2 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (n *networkService) AllocIP(ctx context.Context, r *rpc.AllocIPRequest) (*r
if pod.PodENI {
resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
} else {
req := &eni.LocalIPRequest{}
req := eni.NewLocalIPRequest()

Check warning on line 182 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L182

Added line #L182 was not covered by tests
if pod.ERdma {
req.LocalIPType = eni.LocalIPTypeERDMA
}
Expand All @@ -197,7 +197,7 @@ func (n *networkService) AllocIP(ctx context.Context, r *rpc.AllocIPRequest) (*r
if pod.PodENI || n.ipamType == types.IPAMTypeCRD {
resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
} else {
req := &eni.LocalIPRequest{}
req := eni.NewLocalIPRequest()

Check warning on line 200 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L200

Added line #L200 was not covered by tests

if len(oldRes.GetResourceItemByType(daemon.ResourceTypeENI)) == 1 {
old := oldRes.GetResourceItemByType(daemon.ResourceTypeENI)[0]
Expand Down
165 changes: 123 additions & 42 deletions pkg/eni/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"github.com/go-logr/logr"
"github.com/samber/lo"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/cache"
Expand Down Expand Up @@ -51,13 +53,24 @@ var rateLimit = rate.Every(1 * time.Minute / 10)

var _ ResourceRequest = &LocalIPRequest{}

func NewLocalIPRequest() *LocalIPRequest {
ctx, cancel := context.WithCancel(context.Background())
return &LocalIPRequest{
workerCtx: ctx,
cancel: cancel,
}
}

type LocalIPRequest struct {
NetworkInterfaceID string
LocalIPType string
IPv4 netip.Addr
IPv6 netip.Addr

NoCache bool // do not use cached ip

workerCtx context.Context
cancel context.CancelFunc
}

func (l *LocalIPRequest) ResourceType() ResourceType {
Expand Down Expand Up @@ -122,7 +135,9 @@ type Local struct {
batchSize int

cap int
allocatingV4, allocatingV6 int
allocatingV4, allocatingV6 AllocatingRequests
// danging, used for release
dangingV4, dangingV6 AllocatingRequests

eni *daemon.ENI
ipAllocInhibitExpireAt time.Time
Expand Down Expand Up @@ -396,30 +411,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
return nil, nil
}

lo, ok := request.(*LocalIPRequest)
localIPRequest, ok := request.(*LocalIPRequest)
if !ok {
return nil, []Trace{{Condition: ResourceTypeMismatch}}
}

if lo.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != lo.NetworkInterfaceID {
if localIPRequest.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != localIPRequest.NetworkInterfaceID {
return nil, []Trace{{Condition: NetworkInterfaceMismatch}}
}

log := logf.FromContext(ctx)
log.Info(fmt.Sprintf("local request %v", lo))
log := logr.FromContextOrDiscard(ctx)
log.Info(fmt.Sprintf("local request %v", localIPRequest))

expectV4 := 0
expectV6 := 0

if l.enableIPv4 {
if lo.NoCache {
if len(l.ipv4)+l.allocatingV4 >= l.cap {
if localIPRequest.NoCache {
if len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
}
expectV4 = 1
} else {
ipv4 := l.ipv4.PeekAvailable(cni.PodID)
if ipv4 == nil && len(l.ipv4)+l.allocatingV4 >= l.cap {
if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
} else if ipv4 == nil {
expectV4 = 1
Expand All @@ -428,14 +443,14 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
}

if l.enableIPv6 {
if lo.NoCache {
if len(l.ipv6)+l.allocatingV6 >= l.cap {
if localIPRequest.NoCache {
if len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
}
expectV6 = 1
} else {
ipv6 := l.ipv6.PeekAvailable(cni.PodID)
if ipv6 == nil && len(l.ipv6)+l.allocatingV6 >= l.cap {
if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap {

Check warning on line 453 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L453

Added line #L453 was not covered by tests
return nil, []Trace{{Condition: Full}}
} else if ipv6 == nil {
expectV6 = 1
Expand All @@ -448,21 +463,18 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}}
}

l.allocatingV4 += expectV4
l.allocatingV6 += expectV6
for i := 0; i < expectV4; i++ {
l.allocatingV4 = append(l.allocatingV4, localIPRequest)
}
for i := 0; i < expectV6; i++ {
l.allocatingV6 = append(l.allocatingV6, localIPRequest)
}

l.cond.Broadcast()

respCh := make(chan *AllocResp)

go l.allocWorker(ctx, cni, lo, respCh, func() {
// current roll back ip at same time
l.allocatingV4 -= expectV4
l.allocatingV4 = max(l.allocatingV4, 0)
l.allocatingV6 -= expectV6
l.allocatingV6 = max(l.allocatingV6, 0)
log.Info("rollback ipv4", "ipv4", expectV4)
})
go l.allocWorker(ctx, cni, localIPRequest, respCh)

return respCh, nil
}
Expand All @@ -482,7 +494,7 @@ func (l *Local) Release(ctx context.Context, cni *daemon.CNI, request NetworkRes
return false
}

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)

if res.IP.IPv4.IsValid() {
l.ipv4.Release(cni.PodID, res.IP.IPv4)
Expand Down Expand Up @@ -530,13 +542,24 @@ func (l *Local) Priority() int {
}

// allocWorker started with each Allocate call
func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp, onErrLocked func()) {
func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp) {
done := make(chan struct{})
defer close(done)

l.cond.L.Lock()
defer l.cond.L.Unlock()

defer func() {
if request == nil {
return
}

l.switchIPv4(request)
l.switchIPv6(request)

request.cancel()
}()

go func() {
select {
case <-ctx.Done():
Expand All @@ -547,13 +570,11 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
}
}()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)
for {
select {
case <-ctx.Done():
// parent cancel the context, so close the ch
onErrLocked()

close(respCh)
return
default:
Expand Down Expand Up @@ -613,7 +634,7 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
func (l *Local) factoryAllocWorker(ctx context.Context) {
l.cond.L.Lock()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)
for {
if log.V(4).Enabled() {
log.V(4).Info("call allocWorker")
Expand All @@ -626,7 +647,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
default:
}

if l.allocatingV4 <= 0 && l.allocatingV6 <= 0 {
if l.allocatingV4.Len() <= 0 && l.allocatingV6.Len() <= 0 {
l.cond.Wait()
continue
}
Expand All @@ -650,8 +671,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {

if l.eni == nil {
// create eni
v4Count := min(l.batchSize, max(l.allocatingV4, 1))
v6Count := min(l.batchSize, l.allocatingV6)
v4Count := min(l.batchSize, max(l.allocatingV4.Len(), 1))
v6Count := min(l.batchSize, l.allocatingV6.Len())

Check warning on line 675 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L674-L675

Added lines #L674 - L675 were not covered by tests

l.status = statusCreating
l.cond.L.Unlock()
Expand Down Expand Up @@ -690,11 +711,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {

l.eni = eni

l.allocatingV4 -= v4Count
l.allocatingV6 -= v6Count

l.allocatingV4 = max(l.allocatingV4, 0)
l.allocatingV6 = max(l.allocatingV6, 0)
l.popNIPv4Jobs(v4Count)
l.popNIPv6Jobs(v6Count)

Check warning on line 715 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L714-L715

Added lines #L714 - L715 were not covered by tests

primary, err := netip.ParseAddr(eni.PrimaryIP.IPv4.String())
if err == nil {
Expand All @@ -714,8 +732,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
l.status = statusInUse
} else {
eniID := l.eni.ID
v4Count := min(l.batchSize, l.allocatingV4)
v6Count := min(l.batchSize, l.allocatingV6)
v4Count := min(l.batchSize, l.allocatingV4.Len())
v6Count := min(l.batchSize, l.allocatingV6.Len())

if v4Count > 0 {
l.cond.L.Unlock()
Expand All @@ -739,8 +757,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
continue
}

l.allocatingV4 -= len(ipv4Set)
l.allocatingV4 = max(l.allocatingV4, 0)
l.popNIPv4Jobs(len(ipv4Set))

l.ipv4.PutValid(ipv4Set...)

Expand Down Expand Up @@ -772,8 +789,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
continue
}

l.allocatingV6 -= len(ipv6Set)
l.allocatingV6 = max(l.allocatingV6, 0)
l.popNIPv6Jobs(len(ipv6Set))

l.ipv6.PutValid(ipv6Set...)

Expand Down Expand Up @@ -862,7 +878,7 @@ func (l *Local) Dispose(n int) int {
func (l *Local) factoryDisposeWorker(ctx context.Context) {
l.cond.L.Lock()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)

Check warning on line 881 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L881

Added line #L881 was not covered by tests
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -1083,3 +1099,68 @@ func parseResourceID(id string) (string, string, error) {
}
return parts[0], parts[1], nil
}

func (l *Local) switchIPv4(req *LocalIPRequest) {
found := false
l.allocatingV4 = lo.Filter(l.allocatingV4, func(item *LocalIPRequest, index int) bool {
if item != req {
// true to keep
return true
}
found = true
return false
})
if !found {
return
}

Check warning on line 1115 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L1114-L1115

Added lines #L1114 - L1115 were not covered by tests

if l.dangingV4.Len() == 0 {
// this may not happen
// call the Len() to make sure canceled job will be removed
return
}

Check warning on line 1121 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L1118-L1121

Added lines #L1118 - L1121 were not covered by tests
l.allocatingV4 = append(l.allocatingV4, l.dangingV4[0])
l.dangingV4 = l.dangingV4[1:]
}

func (l *Local) switchIPv6(req *LocalIPRequest) {
found := false
l.allocatingV6 = lo.Filter(l.allocatingV6, func(item *LocalIPRequest, index int) bool {
if item != req {
// true to keep
return true
}
found = true
return false
})
if !found {
return
}

Check warning on line 1138 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L1137-L1138

Added lines #L1137 - L1138 were not covered by tests

if l.dangingV6.Len() == 0 {
// this may not happen
return
}
l.allocatingV6 = append(l.allocatingV6, l.dangingV6[0])
l.dangingV6 = l.dangingV6[1:]
}

func (l *Local) popNIPv4Jobs(count int) {
firstPart, secondPart := Split(l.allocatingV4, count)
l.dangingV4 = append(l.dangingV4, firstPart...)
l.allocatingV4 = secondPart
}

func (l *Local) popNIPv6Jobs(count int) {
firstPart, secondPart := Split(l.allocatingV6, count)
l.dangingV6 = append(l.dangingV6, firstPart...)
l.allocatingV6 = secondPart
}

func Split[T any](arr []T, index int) ([]T, []T) {
if index < 0 || index > len(arr) {
return arr, nil
}

return arr[:index], arr[index:]
}
Loading

0 comments on commit 5f09866

Please sign in to comment.