Skip to content

Commit

Permalink
Refurbish snapshoter code (#215)
Browse files Browse the repository at this point in the history
* Refurbish snapshoter code

* Move snap locks back on top of the functions

* Move lock to avoid blocking while calculating kube resources to snap

* Do not use atomic to lock when not needed
  • Loading branch information
ffilippopoulos authored Sep 18, 2024
1 parent 8ddb0ac commit d0d8b7c
Showing 1 changed file with 41 additions and 36 deletions.
77 changes: 41 additions & 36 deletions xds/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,26 +81,34 @@ type NodeSnapshotResources struct {

// Deep copy function for Node resources
func deepCopyNodeResources(src map[int64]*NodeSnapshotResources) map[int64]*NodeSnapshotResources {
dst := make(map[int64]*NodeSnapshotResources)
for sID, resources := range src {
r := &NodeSnapshotResources{
services: make(map[string][]types.Resource),
servicesNames: make(map[string][]string),
endpoints: make(map[string][]types.Resource),
endpointsNames: make(map[string][]string),
}
for k, v := range resources.services {
r.services[k] = append(r.services[k], v...)
}
for k, v := range resources.servicesNames {
r.servicesNames[k] = append(r.servicesNames[k], v...)
dst := make(map[int64]*NodeSnapshotResources, len(src))

copyMap := func(src, dst map[string][]types.Resource) {
for k, v := range src {
dst[k] = make([]types.Resource, len(v))
copy(dst[k], v)
}
for k, v := range resources.endpoints {
r.endpoints[k] = append(r.endpoints[k], v...)
}
copyStringMap := func(src, dst map[string][]string) {
for k, v := range src {
dst[k] = make([]string, len(v))
copy(dst[k], v)
}
for k, v := range resources.endpointsNames {
r.endpointsNames[k] = append(r.endpointsNames[k], v...)
}

for sID, resources := range src {
r := &NodeSnapshotResources{
services: make(map[string][]types.Resource, len(resources.services)),
servicesNames: make(map[string][]string, len(resources.servicesNames)),
endpoints: make(map[string][]types.Resource, len(resources.endpoints)),
endpointsNames: make(map[string][]string, len(resources.endpointsNames)),
}

copyMap(resources.services, r.services)
copyStringMap(resources.servicesNames, r.servicesNames)
copyMap(resources.endpoints, r.endpoints)
copyStringMap(resources.endpointsNames, r.endpointsNames)

dst[sID] = r
}
return dst
Expand Down Expand Up @@ -169,8 +177,6 @@ func (s *Snapshotter) NodesMap() map[string]string {
// snapshots.
func (s *Snapshotter) SnapServices(serviceStore XdsServiceStore) error {
ctx := context.Background()
s.snapNodesMu.Lock()
defer s.snapNodesMu.Unlock()
cls, rds, lsnr, err := servicesToResources(serviceStore, "")
if err != nil {
return fmt.Errorf("Failed to snapshot Services: %v", err)
Expand All @@ -181,24 +187,22 @@ func (s *Snapshotter) SnapServices(serviceStore XdsServiceStore) error {
if err != nil {
return fmt.Errorf("Failed to snapshot Services: %v", err)
}
for _, c := range xdstpCLS {
cls = append(cls, c)
}
for _, r := range xdstpRDS {
rds = append(rds, r)
}
for _, l := range xdstpLSNR {
lsnr = append(lsnr, l)
}
cls = append(cls, xdstpCLS...)
rds = append(rds, xdstpRDS...)
lsnr = append(lsnr, xdstpLSNR...)
}

atomic.AddInt32(&s.serviceSnapVersion, 1)
s.snapNodesMu.Lock()
defer s.snapNodesMu.Unlock()
s.serviceSnapVersion += int32(1)
resources := map[string][]types.Resource{
resource.ClusterType: cls,
resource.ListenerType: lsnr,
resource.RouteType: rds,
}
snapshot, err := cache.NewSnapshot(fmt.Sprint(s.serviceSnapVersion), resources)
if err != nil {
return fmt.Errorf("Failed to create service snapshot: %v", err)
}
err = s.servicesCache.SetSnapshot(ctx, EmptyNodeID, snapshot)
if err != nil {
return fmt.Errorf("Failed to set services snapshot %v", err)
Expand Down Expand Up @@ -229,8 +233,6 @@ func (s *Snapshotter) SnapServices(serviceStore XdsServiceStore) error {
// endoints snapshots.
func (s *Snapshotter) SnapEndpoints(endpointStore XdsEndpointStore) error {
ctx := context.Background()
s.snapNodesMu.Lock()
defer s.snapNodesMu.Unlock()
eds, err := endpointSlicesToClusterLoadAssignments(endpointStore, "")
if err != nil {
return fmt.Errorf("Failed to snapshot EndpointSlices: %v", err)
Expand All @@ -241,15 +243,18 @@ func (s *Snapshotter) SnapEndpoints(endpointStore XdsEndpointStore) error {
if err != nil {
return fmt.Errorf("Failed to snapshot EndpointSlices: %v", err)
}
for _, e := range xdstpEDS {
eds = append(eds, e)
}
eds = append(eds, xdstpEDS...)
}
atomic.AddInt32(&s.endpointsSnapVersion, 1)
s.snapNodesMu.Lock()
defer s.snapNodesMu.Unlock()
s.endpointsSnapVersion += int32(1)
resources := map[string][]types.Resource{
resource.EndpointType: eds,
}
snapshot, err := cache.NewSnapshot(fmt.Sprint(s.endpointsSnapVersion), resources)
if err != nil {
return fmt.Errorf("Failed to create endpoints snapshot: %v", err)
}
err = s.endpointsCache.SetSnapshot(ctx, EmptyNodeID, snapshot)
if err != nil {
return fmt.Errorf("Failed to set endpoints snapshot %v", err)
Expand Down

0 comments on commit d0d8b7c

Please sign in to comment.