Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#191 Docker Compose support #209

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions connector/collector/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ func (c *Mock) run() {
c.IOBytesWrite = rand.Int63n(8098) * c.aggression

for {
if c.done {
break
}
c.CPUUtil += rand.Intn(2) * int(c.aggression)
if c.CPUUtil >= 100 {
c.CPUUtil = 0
Expand All @@ -74,9 +77,6 @@ func (c *Mock) run() {
}
c.MemPercent = percent(float64(c.MemUsage), float64(c.MemLimit))
c.stream <- c.Metrics
if c.done {
break
}
time.Sleep(1 * time.Second)
}

Expand Down
54 changes: 46 additions & 8 deletions connector/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type StatusUpdate struct {
type Docker struct {
client *api.Client
containers map[string]*container.Container
noneStack *container.Stack
stacks map[string]*container.Stack
needsRefresh chan string // container IDs requiring refresh
statuses chan StatusUpdate
closed chan struct{}
Expand All @@ -46,6 +48,8 @@ func NewDocker() (Connector, error) {
cm := &Docker{
client: client,
containers: make(map[string]*container.Container),
noneStack: container.NewStack("", "none"),
stacks: make(map[string]*container.Stack),
needsRefresh: make(chan string, 60),
statuses: make(chan StatusUpdate, 60),
closed: make(chan struct{}),
Expand Down Expand Up @@ -159,16 +163,17 @@ func ipsFormat(networks map[string]api.ContainerNetwork) string {
return strings.Join(ips, "\n")
}

func (cm *Docker) refresh(c *container.Container) {
insp, found, failed := cm.inspect(c.Id)
func (cm *Docker) refresh(id string) {
insp, found, failed := cm.inspect(id)
if failed {
return
}
// remove container if no longer exists
if !found {
cm.delByID(c.Id)
cm.delByID(id)
return
}
c := cm.MustGet(id, insp.Config.Labels)
c.SetMeta("name", shortName(insp.Name))
c.SetMeta("image", insp.Config.Image)
c.SetMeta("IPs", ipsFormat(insp.NetworkSettings.Networks))
Expand Down Expand Up @@ -204,19 +209,43 @@ func (cm *Docker) refreshAll() {
}

for _, i := range allContainers {
c := cm.MustGet(i.ID)
c := cm.MustGet(i.ID, i.Labels)
c.SetMeta("name", shortName(i.Names[0]))
c.SetState(i.State)
cm.needsRefresh <- c.Id
}
}

func (cm *Docker) initContainerStack(c *container.Container, labels map[string]string) {
stackName := labels["com.docker.compose.project"]
if stackName == "" {
c.Stack = cm.noneStack
} else {
// try to find the existing stack
stack := cm.stacks[stackName]
if stack != nil {
c.Stack = stack
} else {
// create and remember the new stack
c.Stack = container.NewStack(stackName, "compose")
c.Stack.WorkDir = labels["com.docker.compose.project.working_dir"]
c.Stack.Config = labels["com.docker.compose.project.config_files"]
cm.stacks[stackName] = c.Stack
// set compose service for the container
composeService := labels["com.docker.compose.service"]
if composeService != "" {
c.SetMeta("service", composeService)
}
}
}
c.Stack.Count++
}

func (cm *Docker) Loop() {
for {
select {
case id := <-cm.needsRefresh:
c := cm.MustGet(id)
cm.refresh(c)
cm.refresh(id)
case <-cm.closed:
return
}
Expand All @@ -242,7 +271,7 @@ func (cm *Docker) LoopStatuses() {
}

// MustGet gets a single container, creating one anew if not existing
func (cm *Docker) MustGet(id string) *container.Container {
func (cm *Docker) MustGet(id string, labels map[string]string) *container.Container {
c, ok := cm.Get(id)
// append container struct for new containers
if !ok {
Expand All @@ -254,6 +283,7 @@ func (cm *Docker) MustGet(id string) *container.Container {
c = container.New(id, collector, manager)
cm.lock.Lock()
cm.containers[id] = c
cm.initContainerStack(c, labels)
cm.lock.Unlock()
}
return c
Expand All @@ -270,7 +300,15 @@ func (cm *Docker) Get(id string) (*container.Container, bool) {
// Remove containers by ID
func (cm *Docker) delByID(id string) {
cm.lock.Lock()
delete(cm.containers, id)
c, hasContainer := cm.containers[id]
if hasContainer {
c.Stack.Count--
// if this was the last container in stack then remove stack
if c.Stack != cm.noneStack && c.Stack.Count <= 0 {
delete(cm.stacks, c.Stack.Name)
}
delete(cm.containers, id)
}
cm.lock.Unlock()
log.Infof("removed dead container: %s", id)
}
Expand Down
22 changes: 16 additions & 6 deletions connector/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ func init() { enabled["mock"] = NewMock }

type Mock struct {
containers container.Containers
noneStack *container.Stack
}

func NewMock() (Connector, error) {
cs := &Mock{}
cs := &Mock{
noneStack: container.NewStack("", "none"),
}
go cs.Init()
go cs.Loop()
return cs, nil
Expand All @@ -30,13 +33,17 @@ func NewMock() (Connector, error) {
// Create Mock containers
func (cs *Mock) Init() {
rand.Seed(int64(time.Now().Nanosecond()))
stack1 := container.NewStack("stack1", "compose")
stack2 := container.NewStack("stack2", "compose")

for i := 0; i < 4; i++ {
cs.makeContainer(3, true)
for i := 0; i < 2; i++ {
cs.makeContainer(3, true, cs.noneStack)
cs.makeContainer(3, true, stack1)
}

for i := 0; i < 16; i++ {
cs.makeContainer(1, false)
for i := 0; i < 8; i++ {
cs.makeContainer(1, false, cs.noneStack)
cs.makeContainer(1, false, stack2)
}

}
Expand All @@ -52,10 +59,12 @@ func (cs *Mock) Wait() struct{} {

var healthStates = []string{"starting", "healthy", "unhealthy"}

func (cs *Mock) makeContainer(aggression int64, health bool) {
func (cs *Mock) makeContainer(aggression int64, health bool, stack *container.Stack) {
collector := collector.NewMock(aggression)
manager := manager.NewMock()
c := container.New(makeID(), collector, manager)
c.Stack = stack
c.Stack.Count++
c.SetMeta("name", makeName())
c.SetState(makeState())
if health {
Expand Down Expand Up @@ -109,6 +118,7 @@ func (cs *Mock) All() container.Containers {
func (cs *Mock) delByID(id string) {
for n, c := range cs.containers {
if c.Id == id {
c.Stack.Count--
cs.del(n)
return
}
Expand Down
10 changes: 9 additions & 1 deletion connector/runc.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Runc struct {
closed chan struct{}
needsRefresh chan string // container IDs requiring refresh
lock sync.RWMutex
noneStack *container.Stack
}

func NewRunc() (Connector, error) {
Expand All @@ -74,6 +75,7 @@ func NewRunc() (Connector, error) {
opts: opts,
factory: factory,
containers: make(map[string]*container.Container),
noneStack: container.NewStack("", "none"),
libContainers: make(map[string]libcontainer.Container),
closed: make(chan struct{}),
lock: sync.RWMutex{},
Expand Down Expand Up @@ -193,6 +195,8 @@ func (cm *Runc) MustGet(id string) *container.Container {
// create container
manager := manager.NewRunc()
c = container.New(id, collector, manager)
c.Stack = cm.noneStack
c.Stack.Count++

name := libc.ID()
// set initial metadata
Expand All @@ -215,7 +219,11 @@ func (cm *Runc) MustGet(id string) *container.Container {
// Remove containers by ID
func (cm *Runc) delByID(id string) {
cm.lock.Lock()
delete(cm.containers, id)
c, hasContainer := cm.containers[id]
if hasContainer {
c.Stack.Count--
delete(cm.containers, id)
}
delete(cm.libContainers, id)
cm.lock.Unlock()
log.Infof("removed dead container: %s", id)
Expand Down
31 changes: 29 additions & 2 deletions container/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,22 @@ const (
running = "running"
)

// Docker compose project
type Stack struct {
Name string
WorkDir string
Config string
Count int // Containers Count
Widgets *compact.CompactRow
Metrics models.Metrics
}

// Metrics and metadata representing a container
type Container struct {
models.Metrics
Id string
Meta models.Meta
Stack *Stack
Widgets *compact.CompactRow
Display bool // display this container in compact view
updater cwidgets.WidgetUpdater
Expand All @@ -32,16 +43,27 @@ type Container struct {
func New(id string, collector collector.Collector, manager manager.Manager) *Container {
widgets := compact.NewCompactRow()
return &Container{
Metrics: models.NewMetrics(),
Metrics: models.Metrics{},
Id: id,
Meta: models.NewMeta("id", id[:12]),
Stack: nil,
Widgets: widgets,
updater: widgets,
collector: collector,
manager: manager,
}
}

func NewStack(name string, stackType string) *Stack {
p := &Stack{Name: name}
// create a compact row for the stack
widgets := compact.NewCompactRow()
meta := models.NewMeta("name", name, "stackType", stackType)
widgets.SetMeta(meta)
p.Widgets = widgets
return p
}

func (c *Container) RecreateWidgets() {
c.SetUpdater(cwidgets.NullWidgetUpdater{})
c.Widgets = compact.NewCompactRow()
Expand Down Expand Up @@ -84,11 +106,16 @@ func (c *Container) Logs() collector.LogCollector {
func (c *Container) Read(stream chan models.Metrics) {
go func() {
for metrics := range stream {
oldContainerMetrics := c.Metrics
c.Stack.Metrics.Subtract(oldContainerMetrics)
c.Stack.Metrics.Add(metrics)
c.Stack.Widgets.SetMetrics(c.Stack.Metrics)
c.Metrics = metrics
c.updater.SetMetrics(metrics)
}
log.Infof("reader stopped for container: %s", c.Id)
c.Metrics = models.NewMetrics()
c.Stack.Metrics.Subtract(c.Metrics)
c.Metrics = models.Metrics{}
c.Widgets.Reset()
}()
log.Infof("reader started for container: %s", c.Id)
Expand Down
Loading