-
Notifications
You must be signed in to change notification settings - Fork 3
/
riemann.go
140 lines (124 loc) · 3.26 KB
/
riemann.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package mozzle
import (
"log"
"time"
"github.com/amir/raidman"
)
// RiemannEmitter implements Emitter that interprets metrics as Riemann events
// and emits them to a Riemann instance.
//
// Emit places events on an in-memory queue; a background goroutine started by
// Initialize takes them off the queue and sends them through client.
type RiemannEmitter struct {
	// client wraps the connection to the Riemann instance.
	client *riemann
	// eventTTL is copied into the Ttl field of every event built by Emit.
	eventTTL float32
	// events is the buffered queue between Emit and the emit loop.
	events chan *raidman.Event
	// done is closed by Close to tell the emit loop to stop.
	done chan struct{}
	// connected is maintained by the emit loop: it is set after a successful
	// Connect and cleared after a failed send.
	connected bool
}
// Initialize sets the emitter up for delivering events to Riemann and starts
// the background goroutine that performs the delivery. It should be called
// only once, before the emitter is used.
//
// network is one of "tcp", "tcp4", "tcp6", "udp", "udp4" or "udp6", and addr
// is the address that will be dialed on that network. ttl is assigned to every
// emitted event. queueSize is the capacity of the in-memory queue that holds
// events which have not been sent yet, e.g. while there is a problem with
// emission.
func (r *RiemannEmitter) Initialize(network, addr string, ttl float32, queueSize int) {
	r.eventTTL = ttl
	r.done = make(chan struct{})
	r.events = make(chan *raidman.Event, queueSize)
	r.client = &riemann{network: network, addr: addr}

	// Everything the loop reads is in place; start delivering.
	go r.emitLoop()
}
// Close signals the emit loop to stop and renders the emitter unusable.
// The emitter should not be used after it has been closed.
// There is no guarantee that any queued events will be sent before closing.
//
// Close always returns nil. Calling it again after the first call, or before
// Initialize has been called, is a no-op instead of a panic. The
// already-closed check is not atomic, so Close must not be called from
// several goroutines at the same time.
func (r *RiemannEmitter) Close() error {
	if r.done == nil {
		// Initialize was never called, so there is no loop to stop; closing a
		// nil channel would panic.
		return nil
	}
	select {
	case <-r.done:
		// Already closed; closing a closed channel would panic.
	default:
		close(r.done)
	}
	return nil
}
// Emit converts the metric m into a Riemann event and places it on the
// emitter's queue for delivery. It never blocks: when the queue is full the
// event is dropped and a message is logged. Emit is safe for concurrent use
// by multiple goroutines.
//
// Emit must be used only after calling Initialize, and not after calling
// Close.
func (r *RiemannEmitter) Emit(m Metric) {
	// Use the metric's own timestamp when it has one, the current time otherwise.
	ts := time.Now().Unix()
	if m.Time != 0 {
		ts = m.Time
	}

	// Extra keys are added below, so work on a copy rather than on the
	// caller's map.
	attrs := copyMap(m.Attributes)
	if attrs == nil {
		attrs = make(map[string]string)
	}
	attrs["application"] = m.Application
	attrs["application_id"] = m.ApplicationID
	attrs["org"] = m.Organization
	attrs["space"] = m.Space

	event := &raidman.Event{
		Ttl:        r.eventTTL,
		Time:       ts,
		Host:       m.Application,
		Service:    m.Service,
		Metric:     m.Metric,
		State:      m.State,
		Attributes: attrs,
	}

	// Non-blocking send: a full queue means the event is discarded.
	select {
	case r.events <- event:
	default:
		log.Printf("riemann: queue full, dropping events\n")
	}
}
// emitLoop takes events off the queue and sends them to Riemann until the
// done channel is closed. It is started as a goroutine by Initialize.
//
// The connection is established lazily when an event arrives and no
// connection is open. If connecting fails, the error is logged and that event
// is dropped. If sending fails, the error is logged and the connection is
// closed, so the next event triggers a reconnect. When done is closed, an
// open connection is closed before the loop returns.
func (r *RiemannEmitter) emitLoop() {
	r.connected = false
	for {
		select {
		case e := <-r.events:
			if !r.connected {
				if err := r.client.Connect(); err != nil {
					log.Printf("riemann: error connecting: %v\n", err)
					continue
				}
				r.connected = true
			}
			if err := r.client.SendEvent(e); err != nil {
				log.Printf("riemann: error sending event: %v\n", err)
				if cerr := r.client.Close(); cerr != nil {
					log.Printf("riemann: error closing conn: %v\n", cerr)
				}
				r.connected = false
			}
		case <-r.done:
			// Release the connection on shutdown; without this it would stay
			// open after the loop has exited.
			if r.connected {
				if cerr := r.client.Close(); cerr != nil {
					log.Printf("riemann: error closing conn: %v\n", cerr)
				}
				r.connected = false
			}
			return
		}
	}
}
// riemann bundles the dial parameters for a Riemann instance with the raidman
// client created from them.
type riemann struct {
	network string // network name passed to raidman.DialWithTimeout by Connect
	addr string // address passed to raidman.DialWithTimeout by Connect
	client *raidman.Client // set by a successful Connect; used by Close and SendEvent
}
// Connect dials the configured network and address with a five second timeout
// and stores the resulting client. If dialing fails, the dial error is
// returned and the previously stored client, if any, is left untouched.
func (r *riemann) Connect() error {
	const dialTimeout = 5 * time.Second

	conn, err := raidman.DialWithTimeout(r.network, r.addr, dialTimeout)
	if err == nil {
		r.client = conn
	}
	return err
}
// Close closes the stored raidman client and returns the error it reports.
// NOTE(review): r.client is used without a nil check, so this presumably must
// only be called after a successful Connect — confirm against callers.
func (r *riemann) Close() error {
	return r.client.Close()
}
// SendEvent sends e through the stored raidman client and returns the error
// it reports.
// NOTE(review): r.client is used without a nil check, so this presumably must
// only be called after a successful Connect — confirm against callers.
func (r *riemann) SendEvent(e *raidman.Event) error {
	return r.client.Send(e)
}
// copyMap returns a copy of m that shares no storage with it, so the result
// can be modified without affecting m. If m is nil, copyMap returns nil; an
// empty non-nil map yields an empty non-nil map.
func copyMap(m map[string]string) map[string]string {
	if m == nil {
		return nil
	}
	// The final size is known up front, so size the map once instead of
	// letting it grow while it is filled.
	cpy := make(map[string]string, len(m))
	for k, v := range m {
		cpy[k] = v
	}
	return cpy
}