diff --git a/Makefile b/Makefile index ea6740b..39be4c2 100644 --- a/Makefile +++ b/Makefile @@ -10,13 +10,14 @@ GOARCH=amd64 help: @echo @echo "Available targets:" - @echo " * build - build the binary, output to $(ARC_BINARY)" - @echo " * linux - build the binary, output to $(ARC_BINARY)" + @echo " * build - build the binary, output to $(BUILD_DIR)" + @echo " * linux - build the binary, output to $(BUILD_DIR)" @echo " * docker - build docker image" .PHONY: build build: @mkdir -p $(BUILD_DIR) + # Build sources go build -o $(MOSQUITTO_EXPORTER_BINARY) -ldflags="$(LDFLAGS)" $(PKG_NAME) linux: export GOOS=linux diff --git a/main.go b/main.go index 195b052..751af3d 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "log" "net/http" "os" @@ -41,8 +42,9 @@ var ( "$SYS/broker/clients/maximum": "The maximum number of clients connected simultaneously since the broker started", "$SYS/broker/clients/total": "The total number of clients connected since the broker started.", } - counterMetrics = map[string]*MosquittoCounter{} - gaugeMetrics = map[string]prometheus.Gauge{} + counterMetrics = map[string]*MosquittoCounter{} + gaugeMetrics = map[string]prometheus.Gauge{} + firstMessageReceived = false ) func main() { @@ -110,6 +112,23 @@ func runServer(c *cli.Context) { opts := mqtt.NewClientOptions() opts.SetCleanSession(true) opts.AddBroker(c.String("endpoint")) + opts.SetClientID(fmt.Sprintf("prometheus_mosquitto_exporter_%v", time.Now().Unix())) + + // initializes the "broker_connection_up" metric to 0 (down) + gaugeMetrics["broker_connection_up"] = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "broker_connection_up", + Help: "broker up/down", + }) + prometheus.MustRegister(gaugeMetrics["broker_connection_up"]) + gaugeMetrics["broker_connection_up"].Set(0) + + // initializes the "seconds_from_last_update" metric to -1 + gaugeMetrics["broker_seconds_from_last_update"] = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "broker_seconds_from_last_update", + Help: "secondes from las update of metrics, if not received ever: -1", + }) + prometheus.MustRegister(gaugeMetrics["broker_seconds_from_last_update"]) + gaugeMetrics["broker_seconds_from_last_update"].Set(-1) // if you have a username you'll need a password with it if c.String("user") != "" { @@ -141,6 +160,8 @@ func runServer(c *cli.Context) { opts.OnConnect = func(client mqtt.Client) { log.Printf("Connected to %s", c.String("endpoint")) + // update the "broker_connection_up" metric (up) + gaugeMetrics["broker_connection_up"].Set(1) // subscribe on every (re)connect token := client.Subscribe("$SYS/#", 0, func(_ mqtt.Client, msg mqtt.Message) { processUpdate(msg.Topic(), string(msg.Payload())) @@ -154,9 +175,30 @@ func runServer(c *cli.Context) { } opts.OnConnectionLost = func(client mqtt.Client, err error) { log.Printf("Error: Connection to %s lost: %s", c.String("endpoint"), err) + // update the "broker_connection_up" metric (down) + gaugeMetrics["broker_connection_up"].Set(0) + // try to reconnect + //mqttConnect(client, c) } client := mqtt.NewClient(opts) + // launch the first connection in another thread so it is no blocking + // and exporter can serve metrics in case of no connection + go mqttConnect(client, c) + + // start counter of seconds from last update + go increaseSecondsSinceLastUpdate() + + // init the router and server + http.Handle("/metrics", prometheus.Handler()) + http.HandleFunc("/", serveVersion) + log.Printf("Listening on %s...", c.GlobalString("bind-address")) + err := http.ListenAndServe(c.GlobalString("bind-address"), nil) + fatalfOnError(err, "Failed to bind on %s: ", c.GlobalString("bind-address")) +} + +// try to connect forever with the MQTT broker +func mqttConnect(client mqtt.Client, c *cli.Context) { // try to connect forever for { token := client.Connect() @@ -164,22 +206,15 @@ func runServer(c *cli.Context) { if token.Error() == nil { break } - log.Printf("Error: Failed to connect to broker: %s", token.Error()) + log.Printf("Error: Failed to connect to broker: %v", token.Error()) } else { - log.Printf("Timeout connecting to endpoint %s", c.String("endpoint")) + log.Printf("Timeout connecting to endpoint") } time.Sleep(5 * time.Second) } - - // init the router and server - http.Handle("/metrics", prometheus.Handler()) - http.HandleFunc("/", serveVersion) - log.Printf("Listening on %s...", c.GlobalString("bind-address")) - err := http.ListenAndServe(c.GlobalString("bind-address"), nil) - fatalfOnError(err, "Failed to bind on %s: ", c.GlobalString("bind-address")) } -// $SYS/broker/bytes/received +// process the messages received in $SYS/ func processUpdate(topic, payload string) { //log.Printf("Got broker update with topic %s and data %s", topic, payload) if _, ok := ignoreKeyMetrics[topic]; !ok { @@ -190,6 +225,25 @@ func processUpdate(topic, payload string) { //log.Printf("Processing gauge metric %s with data %s", topic, payload) processGaugeMetric(topic, payload) } + restartSecondsSinceLastUpdate() + } +} + +func restartSecondsSinceLastUpdate() { + // if it is the first message received set the firstMessageReceived flag true + if firstMessageReceived == false { + firstMessageReceived = true + } + // restart the timer from last message received to 0 + gaugeMetrics["broker_seconds_from_last_update"].Set(0) +} + +func increaseSecondsSinceLastUpdate() { + for { + if firstMessageReceived == true { + gaugeMetrics["broker_seconds_from_last_update"].Inc() + } + time.Sleep(time.Second) } }