Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add the deadline for the allocating requests #748

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@
if pod.PodENI {
resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
} else {
req := &eni.LocalIPRequest{}
req := eni.NewLocalIPRequest()

Check warning on line 198 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L198

Added line #L198 was not covered by tests
if pod.ERdma {
req.LocalIPType = eni.LocalIPTypeERDMA
}
Expand All @@ -213,7 +213,7 @@
if pod.PodENI || n.ipamType == types.IPAMTypeCRD {
resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
} else {
req := &eni.LocalIPRequest{}
req := eni.NewLocalIPRequest()

Check warning on line 216 in daemon/daemon.go

View check run for this annotation

Codecov / codecov/patch

daemon/daemon.go#L216

Added line #L216 was not covered by tests

if len(oldRes.GetResourceItemByType(daemon.ResourceTypeENI)) == 1 {
old := oldRes.GetResourceItemByType(daemon.ResourceTypeENI)[0]
Expand Down
165 changes: 123 additions & 42 deletions pkg/eni/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
"sync"
"time"

"github.com/go-logr/logr"
"github.com/samber/lo"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/cache"
Expand Down Expand Up @@ -51,13 +53,24 @@

var _ ResourceRequest = &LocalIPRequest{}

func NewLocalIPRequest() *LocalIPRequest {
ctx, cancel := context.WithCancel(context.Background())
return &LocalIPRequest{
workerCtx: ctx,
cancel: cancel,
}
}

type LocalIPRequest struct {
NetworkInterfaceID string
LocalIPType string
IPv4 netip.Addr
IPv6 netip.Addr

NoCache bool // do not use cached ip

workerCtx context.Context
cancel context.CancelFunc
}

func (l *LocalIPRequest) ResourceType() ResourceType {
Expand Down Expand Up @@ -121,7 +134,9 @@
batchSize int

cap int
allocatingV4, allocatingV6 int
allocatingV4, allocatingV6 AllocatingRequests
// danging, used for release
dangingV4, dangingV6 AllocatingRequests

eni *daemon.ENI
ipAllocInhibitExpireAt time.Time
Expand Down Expand Up @@ -395,30 +410,30 @@
return nil, nil
}

lo, ok := request.(*LocalIPRequest)
localIPRequest, ok := request.(*LocalIPRequest)
if !ok {
return nil, []Trace{{Condition: ResourceTypeMismatch}}
}

if lo.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != lo.NetworkInterfaceID {
if localIPRequest.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != localIPRequest.NetworkInterfaceID {
return nil, []Trace{{Condition: NetworkInterfaceMismatch}}
}

log := logf.FromContext(ctx)
log.Info(fmt.Sprintf("local request %v", lo))
log := logr.FromContextOrDiscard(ctx)
log.Info(fmt.Sprintf("local request %v", localIPRequest))

expectV4 := 0
expectV6 := 0

if l.enableIPv4 {
if lo.NoCache {
if len(l.ipv4)+l.allocatingV4 >= l.cap {
if localIPRequest.NoCache {
if len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
}
expectV4 = 1
} else {
ipv4 := l.ipv4.PeekAvailable(cni.PodID)
if ipv4 == nil && len(l.ipv4)+l.allocatingV4 >= l.cap {
if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
} else if ipv4 == nil {
expectV4 = 1
Expand All @@ -427,14 +442,14 @@
}

if l.enableIPv6 {
if lo.NoCache {
if len(l.ipv6)+l.allocatingV6 >= l.cap {
if localIPRequest.NoCache {
if len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
}
expectV6 = 1
} else {
ipv6 := l.ipv6.PeekAvailable(cni.PodID)
if ipv6 == nil && len(l.ipv6)+l.allocatingV6 >= l.cap {
if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap {

Check warning on line 452 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L452

Added line #L452 was not covered by tests
return nil, []Trace{{Condition: Full}}
} else if ipv6 == nil {
expectV6 = 1
Expand All @@ -447,21 +462,18 @@
return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}}
}

l.allocatingV4 += expectV4
l.allocatingV6 += expectV6
for i := 0; i < expectV4; i++ {
l.allocatingV4 = append(l.allocatingV4, localIPRequest)
}
for i := 0; i < expectV6; i++ {
l.allocatingV6 = append(l.allocatingV6, localIPRequest)
}

l.cond.Broadcast()

respCh := make(chan *AllocResp)

go l.allocWorker(ctx, cni, lo, respCh, func() {
// current roll back ip at same time
l.allocatingV4 -= expectV4
l.allocatingV4 = max(l.allocatingV4, 0)
l.allocatingV6 -= expectV6
l.allocatingV6 = max(l.allocatingV6, 0)
log.Info("rollback ipv4", "ipv4", expectV4)
})
go l.allocWorker(ctx, cni, localIPRequest, respCh)

return respCh, nil
}
Expand All @@ -481,7 +493,7 @@
return false
}

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)

if res.IP.IPv4.IsValid() {
l.ipv4.Release(cni.PodID, res.IP.IPv4)
Expand Down Expand Up @@ -529,13 +541,24 @@
}

// allocWorker started with each Allocate call
func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp, onErrLocked func()) {
func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp) {
done := make(chan struct{})
defer close(done)

l.cond.L.Lock()
defer l.cond.L.Unlock()

defer func() {
if request == nil {
return
}

l.switchIPv4(request)
l.switchIPv6(request)

request.cancel()
}()

go func() {
select {
case <-ctx.Done():
Expand All @@ -546,13 +569,11 @@
}
}()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)
for {
select {
case <-ctx.Done():
// parent cancel the context, so close the ch
onErrLocked()

close(respCh)
return
default:
Expand Down Expand Up @@ -612,7 +633,7 @@
func (l *Local) factoryAllocWorker(ctx context.Context) {
l.cond.L.Lock()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)
for {
if log.V(4).Enabled() {
log.V(4).Info("call allocWorker")
Expand All @@ -625,7 +646,7 @@
default:
}

if l.allocatingV4 <= 0 && l.allocatingV6 <= 0 {
if l.allocatingV4.Len() <= 0 && l.allocatingV6.Len() <= 0 {
l.cond.Wait()
continue
}
Expand All @@ -649,8 +670,8 @@

if l.eni == nil {
// create eni
v4Count := min(l.batchSize, max(l.allocatingV4, 1))
v6Count := min(l.batchSize, l.allocatingV6)
v4Count := min(l.batchSize, max(l.allocatingV4.Len(), 1))
v6Count := min(l.batchSize, l.allocatingV6.Len())

Check warning on line 674 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L673-L674

Added lines #L673 - L674 were not covered by tests

l.status = statusCreating
l.cond.L.Unlock()
Expand Down Expand Up @@ -689,11 +710,8 @@

l.eni = eni

l.allocatingV4 -= v4Count
l.allocatingV6 -= v6Count

l.allocatingV4 = max(l.allocatingV4, 0)
l.allocatingV6 = max(l.allocatingV6, 0)
l.popNIPv4Jobs(v4Count)
l.popNIPv6Jobs(v6Count)

Check warning on line 714 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L713-L714

Added lines #L713 - L714 were not covered by tests

primary, err := netip.ParseAddr(eni.PrimaryIP.IPv4.String())
if err == nil {
Expand All @@ -713,8 +731,8 @@
l.status = statusInUse
} else {
eniID := l.eni.ID
v4Count := min(l.batchSize, l.allocatingV4)
v6Count := min(l.batchSize, l.allocatingV6)
v4Count := min(l.batchSize, l.allocatingV4.Len())
v6Count := min(l.batchSize, l.allocatingV6.Len())

if v4Count > 0 {
l.cond.L.Unlock()
Expand All @@ -738,8 +756,7 @@
continue
}

l.allocatingV4 -= len(ipv4Set)
l.allocatingV4 = max(l.allocatingV4, 0)
l.popNIPv4Jobs(len(ipv4Set))

l.ipv4.PutValid(ipv4Set...)

Expand Down Expand Up @@ -771,8 +788,7 @@
continue
}

l.allocatingV6 -= len(ipv6Set)
l.allocatingV6 = max(l.allocatingV6, 0)
l.popNIPv6Jobs(len(ipv6Set))

l.ipv6.PutValid(ipv6Set...)

Expand Down Expand Up @@ -861,7 +877,7 @@
func (l *Local) factoryDisposeWorker(ctx context.Context) {
l.cond.L.Lock()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)

Check warning on line 880 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L880

Added line #L880 was not covered by tests
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -1082,3 +1098,68 @@
}
return parts[0], parts[1], nil
}

func (l *Local) switchIPv4(req *LocalIPRequest) {
found := false
l.allocatingV4 = lo.Filter(l.allocatingV4, func(item *LocalIPRequest, index int) bool {
if item != req {
// true to keep
return true
}
found = true
return false
})
if !found {
return
}

Check warning on line 1114 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L1113-L1114

Added lines #L1113 - L1114 were not covered by tests

if l.dangingV4.Len() == 0 {
// this may not happen
// call the Len() to make sure canceled job will be removed
return
}

Check warning on line 1120 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L1117-L1120

Added lines #L1117 - L1120 were not covered by tests
l.allocatingV4 = append(l.allocatingV4, l.dangingV4[0])
l.dangingV4 = l.dangingV4[1:]
}

func (l *Local) switchIPv6(req *LocalIPRequest) {
found := false
l.allocatingV6 = lo.Filter(l.allocatingV6, func(item *LocalIPRequest, index int) bool {
if item != req {
// true to keep
return true
}
found = true
return false
})
if !found {
return
}

Check warning on line 1137 in pkg/eni/local.go

View check run for this annotation

Codecov / codecov/patch

pkg/eni/local.go#L1136-L1137

Added lines #L1136 - L1137 were not covered by tests

if l.dangingV6.Len() == 0 {
// this may not happen
return
}
l.allocatingV6 = append(l.allocatingV6, l.dangingV6[0])
l.dangingV6 = l.dangingV6[1:]
}

func (l *Local) popNIPv4Jobs(count int) {
firstPart, secondPart := Split(l.allocatingV4, count)
l.dangingV4 = append(l.dangingV4, firstPart...)
l.allocatingV4 = secondPart
}

func (l *Local) popNIPv6Jobs(count int) {
firstPart, secondPart := Split(l.allocatingV6, count)
l.dangingV6 = append(l.dangingV6, firstPart...)
l.allocatingV6 = secondPart
}

func Split[T any](arr []T, index int) ([]T, []T) {
if index < 0 || index > len(arr) {
return arr, nil
}

return arr[:index], arr[index:]
}
Loading
Loading