Skip to content

Commit

Permalink
Merge pull request #1071 from askervin/5Ql_balloons_threadsfromdiffer…
Browse files Browse the repository at this point in the history
…entcores

balloons: PreferSpreadOnPhysicalCores & balloon-specific CPU allocator
  • Loading branch information
klihub authored Oct 13, 2023
2 parents 2fffffe + 1bccf56 commit b81663d
Show file tree
Hide file tree
Showing 7 changed files with 434 additions and 31 deletions.
12 changes: 12 additions & 0 deletions docs/policy/balloons.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ Balloons policy parameters:
pack new balloons tightly into the same NUMAs/dies/packages. This
helps keep large portions of hardware idle so that they can enter
deep power saving states.
- `PreferSpreadOnPhysicalCores` prefers allocating logical CPUs
(possibly hyperthreads) for a balloon from separate physical CPU
cores. This prevents workloads in the balloon from interfering with
themselves as they do not compete on the resources of the same CPU
cores. On the other hand, it allows more interference between
workloads in different balloons. The default is `false`: balloons
are packed tightly to a minimum number of physical CPU cores. The
value set here is the default for all balloon types, but it can be
overridden with the balloon type specific setting with the same
name.
- `BalloonTypes` is a list of balloon type definitions. Each type can
be configured with the following parameters:
- `Name` of the balloon type. This is used in pod annotations to
Expand Down Expand Up @@ -135,6 +145,8 @@ Balloons policy parameters:
- `numa`: ...in the same numa node(s) as the balloon.
- `core`: ...allowed to use idle CPU threads in the same cores with
the balloon.
- `PreferSpreadOnPhysicalCores` overrides the policy level option
with the same name in the scope of this balloon type.
- `AllocatorPriority` (0: High, 1: Normal, 2: Low, 3: None). CPU
allocator parameter, used when creating new or resizing existing
balloons. If there are balloon types with pre-created balloons
Expand Down
43 changes: 32 additions & 11 deletions pkg/cri/resource-manager/policy/builtin/balloons/balloons-policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ type Balloon struct {
// - len(PodIDs) is the number of pods in the balloon.
// - len(PodIDs[podID]) is the number of containers of podID
// currently assigned to the balloon.
PodIDs map[string][]string
PodIDs map[string][]string
cpuTreeAllocator *cpuTreeAllocator
}

var log logger.Logger = logger.NewLogger("policy")
Expand Down Expand Up @@ -544,14 +545,32 @@ func (p *balloons) newBalloon(blnDef *BalloonDef, confCpus bool) (*Balloon, erro
break
}
}
// Configure new cpuTreeAllocator for this balloon if there
// are type specific allocator options, otherwise use policy
// default allocator.
cpuTreeAllocator := p.cpuTreeAllocator
if blnDef.AllocatorTopologyBalancing != nil || blnDef.PreferSpreadOnPhysicalCores != nil {
allocatorOptions := cpuTreeAllocatorOptions{
topologyBalancing: p.bpoptions.AllocatorTopologyBalancing,
preferSpreadOnPhysicalCores: p.bpoptions.PreferSpreadOnPhysicalCores,
}
if blnDef.AllocatorTopologyBalancing != nil {
allocatorOptions.topologyBalancing = *blnDef.AllocatorTopologyBalancing
}
if blnDef.PreferSpreadOnPhysicalCores != nil {
allocatorOptions.preferSpreadOnPhysicalCores = *blnDef.PreferSpreadOnPhysicalCores
}
cpuTreeAllocator = p.cpuTree.NewAllocator(allocatorOptions)
}

// Allocate CPUs
if blnDef == p.reservedBalloonDef ||
(blnDef == p.defaultBalloonDef && blnDef.MinCpus == 0 && blnDef.MaxCpus == 0) {
// The reserved balloon uses ReservedResources CPUs.
// So does the default balloon unless its CPU counts are tweaked.
cpus = p.reserved
} else {
addFromCpus, _, err := p.cpuTreeAllocator.ResizeCpus(cpuset.New(), p.freeCpus, blnDef.MinCpus)
addFromCpus, _, err := cpuTreeAllocator.ResizeCpus(cpuset.New(), p.freeCpus, blnDef.MinCpus)
if err != nil {
return nil, balloonsError("failed to choose a cpuset for allocating first %d CPUs from %#s", blnDef.MinCpus, p.freeCpus)
}
Expand All @@ -562,12 +581,13 @@ func (p *balloons) newBalloon(blnDef *BalloonDef, confCpus bool) (*Balloon, erro
p.freeCpus = p.freeCpus.Difference(cpus)
}
bln := &Balloon{
Def: blnDef,
Instance: freeInstance,
PodIDs: make(map[string][]string),
Cpus: cpus,
SharedIdleCpus: cpuset.New(),
Mems: p.closestMems(cpus),
Def: blnDef,
Instance: freeInstance,
PodIDs: make(map[string][]string),
Cpus: cpus,
SharedIdleCpus: cpuset.New(),
Mems: p.closestMems(cpus),
cpuTreeAllocator: cpuTreeAllocator,
}
if confCpus {
if err = p.useCpuClass(bln); err != nil {
Expand Down Expand Up @@ -1046,7 +1066,8 @@ func (p *balloons) setConfig(bpoptions *BalloonsOptions) error {
p.freeCpus = p.allowed.Clone()
p.freeCpus = p.freeCpus.Difference(p.reserved)
p.cpuTreeAllocator = p.cpuTree.NewAllocator(cpuTreeAllocatorOptions{
topologyBalancing: bpoptions.AllocatorTopologyBalancing,
topologyBalancing: bpoptions.AllocatorTopologyBalancing,
preferSpreadOnPhysicalCores: bpoptions.PreferSpreadOnPhysicalCores,
})
// Instantiate built-in reserved and default balloons.
reservedBalloon, err := p.newBalloon(p.reservedBalloonDef, false)
Expand Down Expand Up @@ -1153,7 +1174,7 @@ func (p *balloons) resizeBalloon(bln *Balloon, newMilliCpus int) error {
defer p.useCpuClass(bln)
if cpuCountDelta > 0 {
// Inflate the balloon.
addFromCpus, _, err := p.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
addFromCpus, _, err := bln.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
if err != nil {
return balloonsError("resize/inflate: failed to choose a cpuset for allocating additional %d CPUs: %w", cpuCountDelta, err)
}
Expand All @@ -1167,7 +1188,7 @@ func (p *balloons) resizeBalloon(bln *Balloon, newMilliCpus int) error {
p.updatePinning(p.shareIdleCpus(p.freeCpus, newCpus)...)
} else {
// Deflate the balloon.
_, removeFromCpus, err := p.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
_, removeFromCpus, err := bln.cpuTreeAllocator.ResizeCpus(bln.Cpus, p.freeCpus, cpuCountDelta)
if err != nil {
return balloonsError("resize/deflate: failed to choose a cpuset for releasing %d CPUs: %w", -cpuCountDelta, err)
}
Expand Down
182 changes: 180 additions & 2 deletions pkg/cri/resource-manager/policy/builtin/balloons/cputree.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type cpuTreeAllocatorOptions struct {
// topologyBalancing true prefers allocating from branches
// with most free CPUs (spread allocations), while false is
// the opposite (packed allocations).
topologyBalancing bool
topologyBalancing bool
preferSpreadOnPhysicalCores bool
}

// Strings returns topology level as a string
Expand Down Expand Up @@ -131,6 +132,19 @@ func (t *cpuTreeNode) String() string {
return fmt.Sprintf("%s%v", t.name, t.children)
}

// PrettyPrint renders the tree below t as a multi-line string with
// one line per node visited by DepthFirstWalk. Each line shows the
// topology level, the quoted name and the cpus of the node, indented
// by four spaces for every level the node is deeper than t.
func (t *cpuTreeNode) PrettyPrint() string {
	baseDepth := t.Depth()
	var rows []string
	describe := func(node *cpuTreeNode) error {
		// Indentation is relative to t, not to the root of the whole tree.
		indent := strings.Repeat(" ", 4*(node.Depth()-baseDepth))
		row := fmt.Sprintf("%s%s: %q cpus: %s", indent, node.level, node.name, node.cpus)
		rows = append(rows, row)
		return nil
	}
	t.DepthFirstWalk(describe)
	return strings.Join(rows, "\n")
}

// String returns cpuTreeNodeAttributes as a string.
func (tna cpuTreeNodeAttributes) String() string {
return fmt.Sprintf("%s{%d,%v,%d,%d}", tna.t.name, tna.depth,
Expand All @@ -146,6 +160,34 @@ func NewCpuTree(name string) *cpuTreeNode {
}
}

// CopyTree returns a recursive copy of the tree rooted at t. Every
// node below the returned root is a new node whose parent is the
// corresponding new node. The returned root itself keeps the parent
// pointer that CopyNode copied from t.
func (t *cpuTreeNode) CopyTree() *cpuTreeNode {
	clone := t.CopyNode()
	// CopyNode shares the children slice with t. Start from a fresh
	// slice so that adding copied children does not touch t.
	clone.children = make([]*cpuTreeNode, 0, len(t.children))
	for i := range t.children {
		clone.AddChild(t.children[i].CopyTree())
	}
	return clone
}

// CopyNode returns a shallow copy of a single node. The copy has the
// same name, level and cpus as t, points to the same parent, and
// shares the children slice with t.
func (t *cpuTreeNode) CopyNode() *cpuTreeNode {
	return &cpuTreeNode{
		name:     t.name,
		level:    t.level,
		parent:   t.parent,
		children: t.children,
		cpus:     t.cpus,
	}
}

// Depth returns the number of parent links between this node and the
// root of its tree. The root node, which has no parent, has depth 0.
func (t *cpuTreeNode) Depth() int {
	depth := 0
	for node := t; node.parent != nil; node = node.parent {
		depth++
	}
	return depth
}

// AddChild adds new child node to a CPU tree node.
func (t *cpuTreeNode) AddChild(child *cpuTreeNode) {
child.parent = t
Expand All @@ -165,6 +207,38 @@ func (t *cpuTreeNode) Cpus() cpuset.CPUSet {
return t.cpus
}

// SiblingIndex returns the position of this node in the list of its
// parent's children. It returns -1 if the node has no parent (root
// node), and -2 if the node has a parent but is not listed among the
// children of that parent.
func (t *cpuTreeNode) SiblingIndex() int {
	parent := t.parent
	if parent == nil {
		return -1
	}
	siblings := parent.children
	for i := 0; i < len(siblings); i++ {
		if siblings[i] == t {
			return i
		}
	}
	return -2
}

// FindLeafWithCpu searches the tree below t for a node that has no
// children and whose cpus include the given cpu. It returns the
// first such node encountered by DepthFirstWalk, or nil if there is
// none.
func (t *cpuTreeNode) FindLeafWithCpu(cpu int) *cpuTreeNode {
	var leaf *cpuTreeNode
	t.DepthFirstWalk(func(node *cpuTreeNode) error {
		if len(node.children) != 0 {
			// Not a leaf: let the walk continue to the children.
			return nil
		}
		ids := node.cpus.List()
		for i := range ids {
			if ids[i] == cpu {
				leaf = node
				// Presumably WalkStop ends the whole walk;
				// its definition is outside this block.
				return WalkStop
			}
		}
		// The cpu is not in this leaf and there is nothing below it.
		return nil
	})
	return leaf
}

// WalkSkipChildren error returned from a DepthFirstWalk handler
// prevents walking deeper in the tree. The caller of the
// DepthFirstWalk will get no error.
Expand Down Expand Up @@ -236,13 +310,18 @@ func NewCpuTreeFromSystem() (*cpuTreeNode, error) {
nodeTree.level = CPUTopologyLevelNuma
dieTree.AddChild(nodeTree)
node := sys.Node(nodeID)
threadsSeen := map[int]struct{}{}
for _, cpuID := range node.CPUSet().List() {
if _, alreadySeen := threadsSeen[cpuID]; alreadySeen {
continue
}
cpuTree := NewCpuTree(fmt.Sprintf("p%dd%dn%dcpu%d", packageID, dieID, nodeID, cpuID))

cpuTree.level = CPUTopologyLevelCore
nodeTree.AddChild(cpuTree)
cpu := sys.CPU(cpuID)
for _, threadID := range cpu.ThreadCPUSet().List() {
threadsSeen[threadID] = struct{}{}
threadTree := NewCpuTree(fmt.Sprintf("p%dd%dn%dcpu%dt%d", packageID, dieID, nodeID, cpuID, threadID))
threadTree.level = CPUTopologyLevelThread
cpuTree.AddChild(threadTree)
Expand Down Expand Up @@ -312,13 +391,83 @@ func (t *cpuTreeNode) toAttributedSlice(
}
}

// SplitLevel returns the root node of a new CPU tree where every
// node at topology level splitLevel has been split into classes. The
// receiver tree is not modified: the split is done on a copy.
//
// For each node at splitLevel, cpuClassifier is called on every CPU
// of that node. A new child node named "<node name>class<class>" is
// created for each distinct class, in the order in which the classes
// first appear in the node's CPU list. The original children of the
// node are copied under each class node with their CPUs masked to
// the CPUs of the class. Branches with no CPUs left are dropped.
func (t *cpuTreeNode) SplitLevel(splitLevel CPUTopologyLevel, cpuClassifier func(int) int) *cpuTreeNode {
	newRoot := t.CopyTree()
	newRoot.DepthFirstWalk(func(tn *cpuTreeNode) error {
		// Dive deeper until reaching the level that will be split.
		if tn.level != splitLevel {
			return nil
		}
		// Classify the CPUs of this node to the map: class ->
		// list of cpus. Only the CPUs of the node being split are
		// classified, so that no class node is created for a
		// class that has no CPUs in this node.
		classCpus := map[int][]int{}
		// Remember classes in the order of first appearance. Map
		// iteration order is unspecified, so ranging over classCpus
		// would make the order of new children vary between runs.
		classes := []int{}
		for _, cpu := range tn.cpus.List() {
			class := cpuClassifier(cpu)
			if _, seen := classCpus[class]; !seen {
				classes = append(classes, class)
			}
			classCpus[class] = append(classCpus[class], cpu)
		}
		// Clear existing children of this node. New children
		// will be classes whose children are masked versions
		// of original children of this node.
		origChildren := tn.children
		tn.children = make([]*cpuTreeNode, 0, len(classes))
		// Add a new child corresponding to each class.
		for _, class := range classes {
			cpuMask := cpuset.New(classCpus[class]...)
			newNode := NewCpuTree(fmt.Sprintf("%sclass%d", tn.name, class))
			newNode.cpus = tn.cpus.Intersection(cpuMask)
			newNode.level = tn.level
			// AddChild also sets newNode.parent to tn.
			tn.AddChild(newNode)
			for _, origChild := range origChildren {
				// Do not add branches that have no CPUs in
				// this class at all.
				if origChild.cpus.Intersection(cpuMask).Size() == 0 {
					continue
				}
				newChild := origChild.CopyTree()
				newChild.DepthFirstWalk(func(cn *cpuTreeNode) error {
					cn.cpus = cn.cpus.Intersection(cpuMask)
					if cn.cpus.Size() == 0 && cn.parent != nil {
						// All cpus masked out: cut this
						// branch out of its parent. A new
						// slice is built instead of editing
						// in place because the walk may be
						// iterating the old one.
						newSiblings := make([]*cpuTreeNode, 0, len(cn.parent.children))
						for _, sibling := range cn.parent.children {
							if sibling != cn {
								newSiblings = append(newSiblings, sibling)
							}
						}
						cn.parent.children = newSiblings
						return WalkSkipChildren
					}
					return nil
				})
				newNode.AddChild(newChild)
			}
		}
		// The subtree below a split node has been rebuilt
		// already, do not walk into it.
		return WalkSkipChildren
	})
	return newRoot
}

// NewAllocator returns a new CPU allocator for allocating CPUs from
// the CPU tree rooted at t, configured with the given options.
//
// Without options.preferSpreadOnPhysicalCores the allocator works
// directly on t. With it, the allocator works on a copy of the tree
// in which every NUMA level node has been split by SplitLevel into
// classes of CPUs.
func (t *cpuTreeNode) NewAllocator(options cpuTreeAllocatorOptions) *cpuTreeAllocator {
	allocator := &cpuTreeAllocator{
		root:    t,
		options: options,
	}
	if !options.preferSpreadOnPhysicalCores {
		return allocator
	}
	// CPU classifier: the class of a CPU is the index of its leaf
	// node in the child list of the leaf's parent. The leaf is
	// expected to be a hyperthread and its parent a physical core.
	classifyBySiblingIndex := func(cpu int) int {
		leaf := t.FindLeafWithCpu(cpu)
		if leaf == nil {
			log.Fatalf("SplitLevel CPU classifier: cpu %d not in tree:\n%s\n\n", cpu, t.PrettyPrint())
		}
		return leaf.SiblingIndex()
	}
	allocator.root = t.SplitLevel(CPUTopologyLevelNuma, classifyBySiblingIndex)
	return allocator
}

Expand Down Expand Up @@ -409,7 +558,36 @@ func (ta *cpuTreeAllocator) sorterRelease(tnas []cpuTreeNodeAttributes) func(int
// abs(delta) CPUs can be freed.
func (ta *cpuTreeAllocator) ResizeCpus(currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
if delta > 0 {
return ta.resizeCpus(currentCpus, freeCpus, delta)
addFromSuperset, removeFromSuperset, err := ta.resizeCpus(currentCpus, freeCpus, delta)
if !ta.options.preferSpreadOnPhysicalCores || addFromSuperset.Size() == delta {
return addFromSuperset, removeFromSuperset, err
}
// addFromSuperset contains more CPUs (equally good
// choices) than actually needed. In case of
// preferSpreadOnPhysicalCores, however, selecting any
// of these does not result in equally good
// result. Therefore, in this case, construct addFrom
// set by adding one CPU at a time.
addFrom := cpuset.New()
for n := 0; n < delta; n++ {
addSingleFrom, _, err := ta.resizeCpus(currentCpus, freeCpus, 1)
if err != nil {
return addFromSuperset, removeFromSuperset, err
}
if addSingleFrom.Size() != 1 {
return addFromSuperset, removeFromSuperset, fmt.Errorf("internal error: failed to find single CPU to allocate, "+
"currentCpus=%s freeCpus=%s expectedSingle=%s",
currentCpus, freeCpus, addSingleFrom)
}
addFrom = addFrom.Union(addSingleFrom)
if addFrom.Size() != n+1 {
return addFromSuperset, removeFromSuperset, fmt.Errorf("internal error: double add the same CPU (%s) to cpuset %s on round %d",
addSingleFrom, addFrom, n+1)
}
currentCpus = currentCpus.Union(addSingleFrom)
freeCpus = freeCpus.Difference(addSingleFrom)
}
return addFrom, removeFromSuperset, nil
}
// In multi-CPU removal, remove CPUs one by one instead of
// trying to find a single topology element from which all of
Expand Down
Loading

0 comments on commit b81663d

Please sign in to comment.