Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add the deadline for the allocating requests #740

Merged
merged 1 commit into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@
if pod.PodENI {
resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
} else {
req := &eni.LocalIPRequest{}
req := eni.NewLocalIPRequest()

Check warning on line 193 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L193

Added line #L193 was not covered by tests
if pod.ERdma {
req.LocalIPType = eni.LocalIPTypeERDMA
}
Expand All @@ -208,7 +208,7 @@
if pod.PodENI || n.ipamType == types.IPAMTypeCRD {
resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
} else {
req := &eni.LocalIPRequest{}
req := eni.NewLocalIPRequest()

Check warning on line 211 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L211

Added line #L211 was not covered by tests

if len(oldRes.GetResourceItemByType(daemon.ResourceTypeENI)) == 1 {
old := oldRes.GetResourceItemByType(daemon.ResourceTypeENI)[0]
Expand Down
165 changes: 123 additions & 42 deletions pkg/eni/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"sync"
"time"

"github.com/go-logr/logr"
"github.com/samber/lo"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/cache"
Expand Down Expand Up @@ -51,13 +53,24 @@

var _ ResourceRequest = &LocalIPRequest{}

// NewLocalIPRequest returns an empty LocalIPRequest whose worker context and
// cancel function are already populated.
//
// The context is derived from context.Background(), so it is cancelled only
// through the stored cancel function, never by a caller-supplied parent.
func NewLocalIPRequest() *LocalIPRequest {
	req := new(LocalIPRequest)
	req.workerCtx, req.cancel = context.WithCancel(context.Background())
	return req
}

// LocalIPRequest is a ResourceRequest implementation (see the compile-time
// assertion next to this type) that the Local allocator accepts after a type
// assertion on the incoming request.
//
// NOTE(review): allocWorker's deferred cleanup calls request.cancel() on any
// non-nil request, so a value built as a bare &LocalIPRequest{} literal (nil
// cancel) looks unsafe there — construct via NewLocalIPRequest; confirm no
// literal constructions remain.
type LocalIPRequest struct {
// NetworkInterfaceID, when non-empty, restricts the request to the ENI with
// this ID; a Local bound to a different ENI answers NetworkInterfaceMismatch.
NetworkInterfaceID string
// LocalIPType is set by the daemon to eni.LocalIPTypeERDMA for ERDMA pods and
// left empty otherwise.
LocalIPType string
// IPv4 and IPv6 are not read in the visible code.
// NOTE(review): presumably specific addresses the caller wants — confirm.
IPv4 netip.Addr
IPv6 netip.Addr

NoCache bool // when true, skip the cached-IP lookup and always expect a newly allocated address

// workerCtx and cancel are the pair returned by context.WithCancel in
// NewLocalIPRequest; cancel is invoked by allocWorker's deferred cleanup.
workerCtx context.Context
cancel context.CancelFunc
}

func (l *LocalIPRequest) ResourceType() ResourceType {
Expand Down Expand Up @@ -122,7 +135,9 @@
batchSize int

cap int
allocatingV4, allocatingV6 int
allocatingV4, allocatingV6 AllocatingRequests
// dangling requests, used for release
dangingV4, dangingV6 AllocatingRequests

eni *daemon.ENI
ipAllocInhibitExpireAt time.Time
Expand Down Expand Up @@ -375,30 +390,30 @@
return nil, nil
}

lo, ok := request.(*LocalIPRequest)
localIPRequest, ok := request.(*LocalIPRequest)
if !ok {
return nil, []Trace{{Condition: ResourceTypeMismatch}}
}

if lo.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != lo.NetworkInterfaceID {
if localIPRequest.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != localIPRequest.NetworkInterfaceID {
return nil, []Trace{{Condition: NetworkInterfaceMismatch}}
}

log := logf.FromContext(ctx)
log.Info(fmt.Sprintf("local request %v", lo))
log := logr.FromContextOrDiscard(ctx)
log.Info(fmt.Sprintf("local request %v", localIPRequest))

expectV4 := 0
expectV6 := 0

if l.enableIPv4 {
if lo.NoCache {
if len(l.ipv4)+l.allocatingV4 >= l.cap {
if localIPRequest.NoCache {
if len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
}
expectV4 = 1
} else {
ipv4 := l.ipv4.PeekAvailable(cni.PodID)
if ipv4 == nil && len(l.ipv4)+l.allocatingV4 >= l.cap {
if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
} else if ipv4 == nil {
expectV4 = 1
Expand All @@ -407,14 +422,14 @@
}

if l.enableIPv6 {
if lo.NoCache {
if len(l.ipv6)+l.allocatingV6 >= l.cap {
if localIPRequest.NoCache {
if len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
}
expectV6 = 1
} else {
ipv6 := l.ipv6.PeekAvailable(cni.PodID)
if ipv6 == nil && len(l.ipv6)+l.allocatingV6 >= l.cap {
if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap {

Check warning on line 432 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L432

Added line #L432 was not covered by tests
return nil, []Trace{{Condition: Full}}
} else if ipv6 == nil {
expectV6 = 1
Expand All @@ -427,21 +442,18 @@
return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}}
}

l.allocatingV4 += expectV4
l.allocatingV6 += expectV6
for i := 0; i < expectV4; i++ {
l.allocatingV4 = append(l.allocatingV4, localIPRequest)
}
for i := 0; i < expectV6; i++ {
l.allocatingV6 = append(l.allocatingV6, localIPRequest)
}

l.cond.Broadcast()

respCh := make(chan *AllocResp)

go l.allocWorker(ctx, cni, lo, respCh, func() {
// current roll back ip at same time
l.allocatingV4 -= expectV4
l.allocatingV4 = max(l.allocatingV4, 0)
l.allocatingV6 -= expectV6
l.allocatingV6 = max(l.allocatingV6, 0)
log.Info("rollback ipv4", "ipv4", expectV4)
})
go l.allocWorker(ctx, cni, localIPRequest, respCh)

return respCh, nil
}
Expand All @@ -461,7 +473,7 @@
return false, nil
}

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)

if res.IP.IPv4.IsValid() {
l.ipv4.Release(cni.PodID, res.IP.IPv4)
Expand Down Expand Up @@ -509,13 +521,24 @@
}

// allocWorker started with each Allocate call
func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp, onErrLocked func()) {
func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp) {
done := make(chan struct{})
defer close(done)

l.cond.L.Lock()
defer l.cond.L.Unlock()

defer func() {
if request == nil {
return
}

l.switchIPv4(request)
l.switchIPv6(request)

request.cancel()
}()

go func() {
select {
case <-ctx.Done():
Expand All @@ -526,13 +549,11 @@
}
}()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)
for {
select {
case <-ctx.Done():
// the parent canceled the context, so close the channel
onErrLocked()

close(respCh)
return
default:
Expand Down Expand Up @@ -592,7 +613,7 @@
func (l *Local) factoryAllocWorker(ctx context.Context) {
l.cond.L.Lock()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)
for {
if log.V(4).Enabled() {
log.V(4).Info("call allocWorker")
Expand All @@ -605,7 +626,7 @@
default:
}

if l.allocatingV4 <= 0 && l.allocatingV6 <= 0 {
if l.allocatingV4.Len() <= 0 && l.allocatingV6.Len() <= 0 {
l.cond.Wait()
continue
}
Expand All @@ -629,8 +650,8 @@

if l.eni == nil {
// create eni
v4Count := min(l.batchSize, max(l.allocatingV4, 1))
v6Count := min(l.batchSize, l.allocatingV6)
v4Count := min(l.batchSize, max(l.allocatingV4.Len(), 1))
v6Count := min(l.batchSize, l.allocatingV6.Len())

Check warning on line 654 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L653-L654

Added lines #L653 - L654 were not covered by tests

l.status = statusCreating
l.cond.L.Unlock()
Expand Down Expand Up @@ -669,11 +690,8 @@

l.eni = eni

l.allocatingV4 -= v4Count
l.allocatingV6 -= v6Count

l.allocatingV4 = max(l.allocatingV4, 0)
l.allocatingV6 = max(l.allocatingV6, 0)
l.popNIPv4Jobs(v4Count)
l.popNIPv6Jobs(v6Count)

Check warning on line 694 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L693-L694

Added lines #L693 - L694 were not covered by tests

primary, err := netip.ParseAddr(eni.PrimaryIP.IPv4.String())
if err == nil {
Expand All @@ -693,8 +711,8 @@
l.status = statusInUse
} else {
eniID := l.eni.ID
v4Count := min(l.batchSize, l.allocatingV4)
v6Count := min(l.batchSize, l.allocatingV6)
v4Count := min(l.batchSize, l.allocatingV4.Len())
v6Count := min(l.batchSize, l.allocatingV6.Len())

if v4Count > 0 {
l.cond.L.Unlock()
Expand All @@ -718,8 +736,7 @@
continue
}

l.allocatingV4 -= len(ipv4Set)
l.allocatingV4 = max(l.allocatingV4, 0)
l.popNIPv4Jobs(len(ipv4Set))

l.ipv4.PutValid(ipv4Set...)

Expand Down Expand Up @@ -751,8 +768,7 @@
continue
}

l.allocatingV6 -= len(ipv6Set)
l.allocatingV6 = max(l.allocatingV6, 0)
l.popNIPv6Jobs(len(ipv6Set))

l.ipv6.PutValid(ipv6Set...)

Expand Down Expand Up @@ -841,7 +857,7 @@
func (l *Local) factoryDisposeWorker(ctx context.Context) {
l.cond.L.Lock()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)

Check warning on line 860 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L860

Added line #L860 was not covered by tests
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -1062,3 +1078,68 @@
}
return parts[0], parts[1], nil
}

func (l *Local) switchIPv4(req *LocalIPRequest) {
found := false
l.allocatingV4 = lo.Filter(l.allocatingV4, func(item *LocalIPRequest, index int) bool {
if item != req {
// true to keep
return true
}
found = true
return false
})
if !found {
return
}

Check warning on line 1094 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L1093-L1094

Added lines #L1093 - L1094 were not covered by tests

if l.dangingV4.Len() == 0 {
// this may not happen
// call the Len() to make sure canceled job will be removed
return
}

Check warning on line 1100 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L1097-L1100

Added lines #L1097 - L1100 were not covered by tests
l.allocatingV4 = append(l.allocatingV4, l.dangingV4[0])
l.dangingV4 = l.dangingV4[1:]
}

func (l *Local) switchIPv6(req *LocalIPRequest) {
found := false
l.allocatingV6 = lo.Filter(l.allocatingV6, func(item *LocalIPRequest, index int) bool {
if item != req {
// true to keep
return true
}
found = true
return false
})
if !found {
return
}

Check warning on line 1117 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L1116-L1117

Added lines #L1116 - L1117 were not covered by tests

if l.dangingV6.Len() == 0 {
// this may not happen
return
}
l.allocatingV6 = append(l.allocatingV6, l.dangingV6[0])
l.dangingV6 = l.dangingV6[1:]
}

// popNIPv4Jobs moves the first count pending IPv4 requests from allocatingV4
// to the tail of dangingV4. Split decides the cut: when count is negative or
// exceeds the queue length, every pending request is moved and allocatingV4
// becomes nil.
func (l *Local) popNIPv4Jobs(count int) {
	popped, remaining := Split(l.allocatingV4, count)
	l.allocatingV4 = remaining
	l.dangingV4 = append(l.dangingV4, popped...)
}

// popNIPv6Jobs moves the first count pending IPv6 requests from allocatingV6
// to the tail of dangingV6. Split decides the cut: when count is negative or
// exceeds the queue length, every pending request is moved and allocatingV6
// becomes nil.
func (l *Local) popNIPv6Jobs(count int) {
	popped, remaining := Split(l.allocatingV6, count)
	l.allocatingV6 = remaining
	l.dangingV6 = append(l.dangingV6, popped...)
}

// Split cuts arr at index and returns the elements before index and the
// elements from index onward.
//
// When index is negative or greater than len(arr), the whole of arr is
// returned as the first part and the second part is nil.
//
// Both results still view arr's backing array (no elements are copied), but
// the first part's capacity is capped at index, so appending to it allocates
// a new array instead of overwriting the elements of the second part.
func Split[T any](arr []T, index int) ([]T, []T) {
	if index < 0 || index > len(arr) {
		return arr, nil
	}

	// Full slice expression: len == cap == index, which detaches any later
	// append on the first part from the second part's storage.
	return arr[:index:index], arr[index:]
}
Loading
Loading