Skip to content

Commit

Permalink
improve: introduce github.com/panjf2000/ants for routine pool
Browse files Browse the repository at this point in the history
  • Loading branch information
hhyasdf committed Jan 17, 2024
1 parent abd2689 commit a74374d
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 188 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/fatih/color v1.16.0
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0-rc5
github.com/panjf2000/ants v1.3.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ github.com/opencontainers/runc v1.1.10 h1:EaL5WeO9lv9wmS6SASjszOeQdSctvpbu0DdBQB
github.com/opencontainers/runc v1.1.10/go.mod h1:+/R6+KmDlh+hOO8NkjmgkG9Qzvypzk0yXxAPYYR65+M=
github.com/opencontainers/runtime-spec v1.1.0 h1:HHUyrt9mwHUjtasSbXSMvs4cyFxh+Bll4AjJ9odEGpg=
github.com/opencontainers/runtime-spec v1.1.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/panjf2000/ants v1.3.0 h1:8pQ+8leaLc9lys2viEEr8md0U4RN6uOSUCE9bOYjQ9M=
github.com/panjf2000/ants v1.3.0/go.mod h1:AaACblRPzq35m1g3enqYcxspbbiOJJYaxU2wMpm1cXY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
133 changes: 63 additions & 70 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"os"
"time"

"github.com/AliyunContainerService/image-syncer/pkg/utils/types"
"github.com/fatih/color"
"github.com/panjf2000/ants"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"

"github.com/AliyunContainerService/image-syncer/pkg/concurrent"
"github.com/AliyunContainerService/image-syncer/pkg/task"
"github.com/fatih/color"
"github.com/sirupsen/logrus"
"github.com/AliyunContainerService/image-syncer/pkg/utils/types"
)

// Client describes a synchronization client
Expand Down Expand Up @@ -97,7 +98,48 @@ func (c *Client) Run() error {
}
}

c.openRoutinesHandleTaskAndWaitForFinish()
routinePool, _ := ants.NewPoolWithFunc(c.routineNum, func(i interface{}) {
tTask, ok := i.(task.Task)
if !ok {
c.logger.Errorf("invalid task %v", i)
return
}

nextTasks, message, err := tTask.Run()
count, total := c.taskCounter.Increase()
finishedNumString := color.New(color.FgGreen).Sprintf("%d", count)
totalNumString := color.New(color.FgGreen).Sprintf("%d", total)

if err != nil {
c.failedTaskList.PushBack(tTask)
c.failedTaskCounter.IncreaseTotal()
c.logger.Errorf("Failed to executed %v: %v. Now %v/%v tasks have been processed.", tTask.String(), err,
finishedNumString, totalNumString)
} else {
if tTask.Type() == task.ManifestType {
// TODO: the ignored images will not be recorded in success images list
c.successImagesList.Add(tTask.GetSource().String(), tTask.GetDestination().String())
}

if len(message) != 0 {
c.logger.Infof("Finish %v: %v. Now %v/%v tasks have been processed.", tTask.String(), message,
finishedNumString, totalNumString)
} else {
c.logger.Infof("Finish %v. Now %v/%v tasks have been processed.", tTask.String(),
finishedNumString, totalNumString)
}
}

for _, t := range nextTasks {
c.taskList.PushFront(t)
c.taskCounter.IncreaseTotal()
}
})
defer routinePool.Release()

if err = c.handleTasks(routinePool); err != nil {
return fmt.Errorf("failed to handle tasks: %v", err)
}

for times := 0; times < c.retries; times++ {
c.taskCounter, c.failedTaskCounter = c.failedTaskCounter, concurrent.NewCounter(0, 0)
Expand All @@ -110,7 +152,9 @@ func (c *Client) Run() error {
if c.taskList.Len() != 0 {
// retry to handle task
c.logger.Infof("Start to retry tasks, please wait ...")
c.openRoutinesHandleTaskAndWaitForFinish()
if err = c.handleTasks(routinePool); err != nil {
return fmt.Errorf("failed to handle tasks: %v", err)
}
}
}

Expand Down Expand Up @@ -144,73 +188,22 @@ func (c *Client) Run() error {
return nil
}

func (c *Client) openRoutinesHandleTaskAndWaitForFinish() {
broadcastChan := concurrent.NewBroadcastChan(c.routineNum)
broadcastChan.Broadcast()

go func() {
for {
// if all the worker routines is hung and taskList is empty, stop everything
<-broadcastChan.TotalHungChan()
if c.taskList.Len() == 0 {
broadcastChan.Close()
func (c *Client) handleTasks(routinePool *ants.PoolWithFunc) error {
for {
item := c.taskList.PopFront()
// no more tasks need to handle
if item == nil {
if routinePool.Running() == 0 {
break
} else {

Check warning on line 198 in pkg/client/client.go

View workflow job for this annotation

GitHub Actions / golangci-lint

superfluous-else: if block ends with a break statement, so drop this else and outdent its block (revive)
time.Sleep(1 * time.Second)
continue
}
}
}()

concurrent.CreateRoutinesAndWaitForFinish(c.routineNum, func() {
for {
closed := broadcastChan.Wait()

// run out of exist tasks
for {
item := c.taskList.PopFront()
// no more tasks need to handle
if item == nil {
break
}

tTask := item.(task.Task)

c.logger.Infof("Executing %v...", tTask.String())
nextTasks, message, err := tTask.Run()

count, total := c.taskCounter.Increase()
finishedNumString := color.New(color.FgGreen).Sprintf("%d", count)
totalNumString := color.New(color.FgGreen).Sprintf("%d", total)

if err != nil {
c.failedTaskList.PushBack(tTask)
c.failedTaskCounter.IncreaseTotal()
c.logger.Errorf("Failed to executed %v: %v. Now %v/%v tasks have been processed.", tTask.String(), err,
finishedNumString, totalNumString)
} else {
if tTask.Type() == task.ManifestType {
// TODO: the ignored images will not be recorded in success images list
c.successImagesList.Add(tTask.GetSource().String(), tTask.GetDestination().String())
}

if len(message) != 0 {
c.logger.Infof("Finish %v: %v. Now %v/%v tasks have been processed.", tTask.String(), message,
finishedNumString, totalNumString)
} else {
c.logger.Infof("Finish %v. Now %v/%v tasks have been processed.", tTask.String(),
finishedNumString, totalNumString)
}
}

if nextTasks != nil {
for _, t := range nextTasks {
c.taskList.PushFront(t)
c.taskCounter.IncreaseTotal()
}
broadcastChan.Broadcast()
}
}

if closed {
return
}
if err := routinePool.Invoke(item); err != nil {
return fmt.Errorf("failed to invoke routine: %v", err)
}
})
}
return nil
}
54 changes: 0 additions & 54 deletions pkg/concurrent/broadcastChan.go

This file was deleted.

29 changes: 11 additions & 18 deletions pkg/concurrent/counter.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
package concurrent

import "sync"

type Counter struct {
c chan struct{}
sync.Mutex
count int
total int
}

func NewCounter(count, total int) *Counter {
return &Counter{
c: make(chan struct{}, 1),
count: count,
total: total,
}
}

func (c *Counter) Decrease() (int, int) {
c.c <- struct{}{}
defer func() {
<-c.c
}()
c.Lock()
defer c.Unlock()

if c.count > 0 {
c.count--
Expand All @@ -27,10 +26,8 @@ func (c *Counter) Decrease() (int, int) {
}

func (c *Counter) Increase() (int, int) {
c.c <- struct{}{}
defer func() {
<-c.c
}()
c.Lock()
defer c.Unlock()

if c.count < c.total {
c.count++
Expand All @@ -39,21 +36,17 @@ func (c *Counter) Increase() (int, int) {
}

func (c *Counter) IncreaseTotal() (int, int) {
c.c <- struct{}{}
defer func() {
<-c.c
}()
c.Lock()
defer c.Unlock()

c.total++
return c.count, c.total
}

// Value return count and total
func (c *Counter) Value() (int, int) {
c.c <- struct{}{}
defer func() {
<-c.c
}()
c.Lock()
defer c.Unlock()

return c.count, c.total
}
35 changes: 13 additions & 22 deletions pkg/concurrent/imageList.go
Original file line number Diff line number Diff line change
@@ -1,62 +1,53 @@
package concurrent

import (
"sync"

"github.com/AliyunContainerService/image-syncer/pkg/utils/types"
)

type ImageList struct {
c chan struct{}
sync.Mutex
content types.ImageList
}

func NewImageList() *ImageList {
return &ImageList{
c: make(chan struct{}, 1),
content: types.ImageList{},
}
}

func (i *ImageList) Add(src, dst string) {
i.c <- struct{}{}
defer func() {
<-i.c
}()
i.Lock()
defer i.Unlock()

i.content.Add(src, dst)
}

func (i *ImageList) Query(src, dst string) bool {
i.c <- struct{}{}
defer func() {
<-i.c
}()
i.Lock()
defer i.Unlock()

return i.content.Query(src, dst)
}

func (i *ImageList) Delete(key string) {
i.c <- struct{}{}
defer func() {
<-i.c
}()
i.Lock()
defer i.Unlock()

delete(i.content, key)
}

func (i *ImageList) Rest() {
i.c <- struct{}{}
defer func() {
<-i.c
}()
i.Lock()
defer i.Unlock()

i.content = types.ImageList{}
}

func (i *ImageList) Content() types.ImageList {
i.c <- struct{}{}
defer func() {
<-i.c
}()
i.Lock()
defer i.Unlock()

return i.content
}
Loading

0 comments on commit a74374d

Please sign in to comment.