Skip to content

Commit

Permalink
feat: update normal sync pool service in vip loadbalancer
Browse files Browse the repository at this point in the history
Signed-off-by: wangxye <[email protected]>
  • Loading branch information
wangxye committed Aug 6, 2024
1 parent e0cca83 commit 68803c7
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ type IPVRID struct {
}

type IPManager struct {
IPRanges []string
// ipPools indicates if ip is assign
ipPools map[string]int
IPPools map[string]int
// ipVRIDs indicates which IPs are assigned to vrid
ipVRIDs map[int][]string
IPVRIDs map[int][]string
}

func NewIPVRID(ips []string, vrid int) IPVRID {
Expand All @@ -46,15 +47,11 @@ func NewIPVRID(ips []string, vrid int) IPVRID {
}
}

func NewIPManager(ipRanges string) (*IPManager, error) {
func NewIPManager(ipr []string) (*IPManager, error) {
manager := &IPManager{
ipPools: map[string]int{},
ipVRIDs: make(map[int][]string),
}

iprs := ParseIP(ipRanges)
for _, ipr := range iprs {
manager.ipPools[ipr] = VRIDEVICTED
IPRanges: ipr,
IPPools: make(map[string]int),
IPVRIDs: make(map[int][]string),
}

return manager, nil
Expand Down Expand Up @@ -101,11 +98,11 @@ func incrementIP(ip net.IP) net.IP {
// Get return a IPVRID with a available IP and VRID combination
func (m *IPManager) Get() (IPVRID, error) {
for vrid := 0; vrid < VRIDMAXVALUE; vrid++ {
if ips, ok := m.ipVRIDs[vrid]; !ok || len(ips) == 0 {
for ip, used := range m.ipPools {
if ips, ok := m.IPVRIDs[vrid]; !ok || len(ips) == 0 {
for ip, used := range m.IPPools {
if used == VRIDEVICTED {
m.ipPools[ip] = vrid
m.ipVRIDs[vrid] = []string{ip}
m.IPPools[ip] = vrid
m.IPVRIDs[vrid] = []string{ip}
return IPVRID{IPs: []string{ip}, VRID: vrid}, nil
}
}
Expand All @@ -119,71 +116,72 @@ func (m *IPManager) Get() (IPVRID, error) {
func (m *IPManager) Assign(ips []string) (IPVRID, error) {
var noConflictIPs []string
for _, ip := range ips {
// if conflict, just use no conflict
if m.ipPools[ip] != VRIDEVICTED {
// if conflicted, just use no conflict
if _, ok := m.IPPools[ip]; ok {
continue
}
noConflictIPs = append(noConflictIPs, ip)
}

// if no avalible ip, get a new ipvrid
// if no available ip, get a new ipvrid
if len(noConflictIPs) == 0 {
return m.Get()
}
var vrid int
for ; vrid < VRIDMAXVALUE; vrid++ {
if _, ok := m.ipVRIDs[vrid]; !ok {
m.ipVRIDs[vrid] = append(m.ipVRIDs[vrid], noConflictIPs...)
if _, ok := m.IPVRIDs[vrid]; ok {
continue
}

for _, ip := range noConflictIPs {
m.ipPools[ip] = vrid
}
break
m.IPVRIDs[vrid] = append(m.IPVRIDs[vrid], noConflictIPs...)
for _, ip := range noConflictIPs {
m.IPPools[ip] = vrid
}
break
}

// Get fully vrid-ips pair
return IPVRID{VRID: vrid, IPs: m.ipVRIDs[vrid]}, nil
return IPVRID{VRID: vrid, IPs: m.IPVRIDs[vrid]}, nil
}

// Release release ips from vrid, if vrid is not assigned, return error
// Release ips from vrid, if vrid is not assigned, return error
func (m *IPManager) Release(ipVRID IPVRID) error {
if err := m.IsValid(ipVRID); err != nil {
return err
}

if _, ok := m.ipVRIDs[ipVRID.VRID]; !ok {
if _, ok := m.IPVRIDs[ipVRID.VRID]; !ok {
return fmt.Errorf("VRID %d does not assign ips", ipVRID.VRID)
}
remain := make([]string, len(m.ipVRIDs[ipVRID.VRID])-len(ipVRID.IPs))
remain := make([]string, len(m.IPVRIDs[ipVRID.VRID])-len(ipVRID.IPs))

for _, ip := range m.ipVRIDs[ipVRID.VRID] {
for _, ip := range m.IPVRIDs[ipVRID.VRID] {
if m.isIPPresent(ip, ipVRID.IPs) {
continue
}

remain = append(remain, ip)
}

if len(remain) == len(m.ipVRIDs[ipVRID.VRID]) {
if len(remain) == len(m.IPVRIDs[ipVRID.VRID]) {
return fmt.Errorf("IP %v is not assigned", ipVRID.IPs)
}

for _, ip := range remain {
m.ipPools[ip] = VRIDEVICTED
m.IPPools[ip] = VRIDEVICTED
}

return nil
}

// check if ip and vrid is valid in this ip-pools, if not return error
// IsValid check if ip and vrid is valid in this ip-pools, if not return error
func (m *IPManager) IsValid(ipvrid IPVRID) error {
if len(ipvrid.IPs) == 0 {
return fmt.Errorf("IPs is empty")
}

for _, ip := range ipvrid.IPs {
if _, ok := m.ipPools[ip]; !ok {
if _, ok := m.IPPools[ip]; !ok {
return fmt.Errorf("IP: %s is not found in IP-Pools", ip)
}
}
Expand All @@ -203,15 +201,15 @@ func (m *IPManager) Sync(ipVRIDs []IPVRID) error {
ips := ipVRID.IPs
vrid := ipVRID.VRID

app, del := m.findDiffIPs(ips, m.ipVRIDs[vrid])
app, del := m.findDiffIPs(ips, m.IPVRIDs[vrid])

for _, ip := range del {
m.ipPools[ip] = VRIDEVICTED
m.IPPools[ip] = VRIDEVICTED
}

m.ipVRIDs[vrid] = ips
m.IPVRIDs[vrid] = ips
for _, ip := range app {
m.ipPools[ip] = vrid
m.IPPools[ip] = vrid
}

}
Expand All @@ -222,13 +220,13 @@ func (m *IPManager) Sync(ipVRIDs []IPVRID) error {
// findDiffIPs find the difference between des and cur, return the difference between des and cur
func (m *IPManager) findDiffIPs(des, cur []string) (app, del []string) {
for _, dip := range des {
if exsit := m.isIPPresent(dip, cur); !exsit {
if exist := m.isIPPresent(dip, cur); !exist {
app = append(app, dip)
}
}

for _, cip := range cur {
if exsit := m.isIPPresent(cip, des); !exsit {
if exist := m.isIPPresent(cip, des); !exist {
del = append(del, cip)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func TestIPMAnager(t *testing.T) {
ipRanges := "192.168.0.1-192.168.1.5, 10.0.0.1-10.0.0.3"
manager, err := vip.NewIPManager(ipRanges)
manager, err := vip.NewIPManager(vip.ParseIP(ipRanges))
if err != nil {
t.Fatalf("Failed to create IPManager: %v", err)
}
Expand Down Expand Up @@ -54,7 +54,8 @@ func TestIPMAnager(t *testing.T) {

t.Run("get ip when none are available", func(t *testing.T) {
// Test getting IPVRID when none are available
m, err := vip.NewIPManager("192.168.0.1")
ipr := "192.168.0.1"
m, err := vip.NewIPManager(vip.ParseIP(ipr))
if err != nil {
t.Errorf("Failed to create IPManager: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,19 @@ type ReconcileVipLoadBalancer struct {
recorder record.EventRecorder
mapper meta.RESTMapper

Configration config.VipLoadBalancerControllerConfiguration
IPManagers map[string]*IPManager
Configuration config.VipLoadBalancerControllerConfiguration
IPManagers map[string]*IPManager
}

// newReconciler returns a new reconcile.Reconciler
func newReconciler(c *appconfig.CompletedConfig, mgr manager.Manager) *ReconcileVipLoadBalancer {
return &ReconcileVipLoadBalancer{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
mapper: mgr.GetRESTMapper(),
recorder: mgr.GetEventRecorderFor(names.VipLoadBalancerController),
Configration: c.ComponentConfig.VipLoadBalancerController,
IPManagers: make(map[string]*IPManager),
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
mapper: mgr.GetRESTMapper(),
recorder: mgr.GetEventRecorderFor(names.VipLoadBalancerController),
Configuration: c.ComponentConfig.VipLoadBalancerController,
IPManagers: make(map[string]*IPManager),
}
}

Expand Down Expand Up @@ -154,6 +154,7 @@ func (r *ReconcileVipLoadBalancer) Reconcile(ctx context.Context, request reconc

copyPoolService := poolService.DeepCopy()

// Check if the PoolService instance is deleted
if poolService.DeletionTimestamp != nil {
return r.reconcileDelete(ctx, copyPoolService)
}
Expand Down Expand Up @@ -185,7 +186,7 @@ func (r *ReconcileVipLoadBalancer) reconcilePoolService(ctx context.Context, poo
func (r *ReconcileVipLoadBalancer) syncPoolService(ctx context.Context, poolService *netv1alpha1.PoolService) error {
klog.V(4).Infof(Format("SyncPoolServices VipLoadBalancer %s/%s", poolService.Namespace, poolService.Name))

// sync VRID from the poolservice
// sync IP-VRID pairs from the recently poolservice
if err := r.syncIPVRIDs(ctx, poolService); err != nil {
klog.Errorf(Format("Failed to sync VRID on Pool Service %s/%s: %v", poolService.Namespace, poolService.Name, err))
return err
Expand All @@ -206,45 +207,61 @@ func (r *ReconcileVipLoadBalancer) syncPoolService(ctx context.Context, poolServ

func (r *ReconcileVipLoadBalancer) syncIPVRIDs(ctx context.Context, poolService *netv1alpha1.PoolService) error {
poolName := poolService.Labels[network.LabelNodePoolName]
poolAddress, err := r.getCurrentPoolAddress(ctx, poolService)
// Get the pool address from nodepool label
if err := r.syncIPAddressPools(ctx, poolName); err != nil {
return fmt.Errorf("failed to sync ip addresses from nodepool: %v", err)
}

currentIPVRIDs, err := r.getCurrentAssignedIPVRIDs(ctx, poolService)
if err != nil {
return fmt.Errorf("failed to get avalible Pool address of nodepool: %v", err)
return fmt.Errorf("failed to get current PoolServices: %v", err)
}

// if nodepool has not address-pools
// Sync the IPVRIDs
r.IPManagers[poolName].Sync(currentIPVRIDs)
return nil
}

func (r *ReconcileVipLoadBalancer) syncIPAddressPools(ctx context.Context, poolName string) error {
poolAddress, err := r.getCurrentPoolAddress(ctx, poolName)
if err != nil {
return fmt.Errorf("failed to get available Pool address of nodepool: %v", err)
}

// check if nodepool has not address-pools
if _, ok := r.IPManagers[poolName]; !ok {
r.IPManagers[poolName], err = NewIPManager(poolAddress)
if err != nil {
return fmt.Errorf("failed to create IPManager for nodepool %s: %v", poolName, err)
}
}

currentIPVRIDs, err := r.getCurrentIPVRIDs(ctx, poolService)
if err != nil {
return fmt.Errorf("failed to get current PoolServices: %v", err)
// TODO: if user update poolAddress label in the nodepool
if poolAddress != nil && strings.Join(poolAddress, ",") != strings.Join(r.IPManagers[poolName].IPRanges, ",") {
klog.Infof(Format("NodePool: %s 's IP address pool has been updated, please delete it and reconfigure it", poolName))
}

// Sync the IPVRIDs
r.IPManagers[poolService.Labels[network.LabelNodePoolName]].Sync(currentIPVRIDs)

return nil
}

func (r *ReconcileVipLoadBalancer) getCurrentPoolAddress(ctx context.Context, poolService *netv1alpha1.PoolService) (string, error) {
func (r *ReconcileVipLoadBalancer) getCurrentPoolAddress(ctx context.Context, poolName string) ([]string, error) {
np := &v1beta1.NodePool{}
if err := r.Get(ctx, client.ObjectKey{Name: poolService.Labels[network.LabelNodePoolName]}, np); err != nil {
return "", err

// Get All NodePools from the cluster by nodepool name
if err := r.Get(ctx, client.ObjectKey{Name: poolName}, np); err != nil {
return nil, err
}

// Check if the NodePool has address pools
if np.Annotations == nil {
return "", fmt.Errorf("NodePool %s doesn't have annotations", np.Name)
return nil, fmt.Errorf("NodePool %s doesn't have not available annotations", np.Name)
}

if _, ok := np.Annotations[AnnotationNodePoolAddressPools]; !ok {
return "", fmt.Errorf("NodePool %s doesn't have address pools", np.Name)
if poolAddress, ok := np.Annotations[AnnotationNodePoolAddressPools]; ok {
return ParseIP(poolAddress), nil
}

return np.Annotations[AnnotationNodePoolAddressPools], nil
return nil, fmt.Errorf("NodePool %s is not assigned address pools", np.Name)
}

func (r *ReconcileVipLoadBalancer) syncPoolServiceStatus(ctx context.Context, poolService *netv1alpha1.PoolService) error {
Expand All @@ -257,7 +274,7 @@ func (r *ReconcileVipLoadBalancer) syncPoolServiceStatus(ctx context.Context, po

desiredLbStatus, err := r.desiredLbStatus(poolService)
if err != nil {
return fmt.Errorf("failed to calculate desire lb stattus for poolservice %s/%s: %v", poolService.Namespace, poolService.Name, err)
return fmt.Errorf("failed to calculate desire lb status for poolservice %s/%s: %v", poolService.Namespace, poolService.Name, err)
}

poolService.Status.LoadBalancer = desiredLbStatus
Expand Down Expand Up @@ -291,7 +308,7 @@ func (r *ReconcileVipLoadBalancer) desiredLbStatus(poolService *netv1alpha1.Pool
}, nil
}

func (r *ReconcileVipLoadBalancer) getCurrentIPVRIDs(ctx context.Context, poolService *netv1alpha1.PoolService) ([]IPVRID, error) {
func (r *ReconcileVipLoadBalancer) getCurrentAssignedIPVRIDs(ctx context.Context, poolService *netv1alpha1.PoolService) ([]IPVRID, error) {
// Get the poolservice list
listSelector := &client.ListOptions{
LabelSelector: labels.SelectorFromSet(map[string]string{
Expand Down Expand Up @@ -334,11 +351,7 @@ func filterInvalidPoolService(poolServices []netv1alpha1.PoolService) []IPVRID {
}

func (r *ReconcileVipLoadBalancer) checkIPVRIDs(poolService netv1alpha1.PoolService) (*IPVRID, error) {
if !r.containsVRID(&poolService) {
// not have VRID
return nil, fmt.Errorf("PoolService %s/%s doesn't have VRID", poolService.Namespace, poolService.Name)
}

// check if the poolservice has vrid-ips
ipvrid, err := r.isValidIPVRID(&poolService)
if err != nil {
// ip-vrid is invalid
Expand All @@ -362,10 +375,15 @@ func (r *ReconcileVipLoadBalancer) containsVRID(poolService *netv1alpha1.PoolSer
}

func (r *ReconcileVipLoadBalancer) isValidIPVRID(poolService *netv1alpha1.PoolService) (*IPVRID, error) {
poolName := poolService.Labels[network.LabelNodePoolName]
// check if the poolservice has vrid
if !r.containsVRID(poolService) {
// not have VRID
return nil, fmt.Errorf("PoolService %s/%s doesn't have VRID", poolService.Namespace, poolService.Name)
}

vrid, err := strconv.Atoi(poolService.Annotations[AnnotationVipLoadBalancerVRID])
if err != nil {
return nil, fmt.Errorf("invalid VRID: %v", err)
return nil, fmt.Errorf("PoolService %s/%s has invalid VRID: %v", poolService.Namespace, poolService.Name, err)
}

ips := []string{}
Expand All @@ -376,17 +394,17 @@ func (r *ReconcileVipLoadBalancer) isValidIPVRID(poolService *netv1alpha1.PoolSe
}

ipvrid := NewIPVRID(ips, vrid)
if err := r.IPManagers[poolName].IsValid(ipvrid); err != nil {
if err := r.IPManagers[poolService.Labels[network.LabelNodePoolName]].IsValid(ipvrid); err != nil {
return nil, fmt.Errorf("VRID: %d is not valid: %v", vrid, err)
}

return &ipvrid, nil
}

func (r *ReconcileVipLoadBalancer) handleIPVRIDs(ctx context.Context, poolService *netv1alpha1.PoolService) error {
// Check if the PoolService has a VRID
_, err := r.checkIPVRIDs(*poolService)
if err == nil {
// Check if the PoolService has a available VRID
if _, err := r.checkIPVRIDs(*poolService); err == nil {
// If yes, use the user-specified vrid
return nil
}

Expand Down
Loading

0 comments on commit 68803c7

Please sign in to comment.