Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add alert lifecycle observer #1

Open
wants to merge 1 commit into
base: release-2024-1-16
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions alertobserver/alertobserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package alertobserver

import (
"github.com/prometheus/alertmanager/types"
)

const (
EventAlertReceived string = "received"
EventAlertRejected string = "rejected"
EventAlertAddedToAggrGroup string = "addedAggrGroup"
EventAlertFailedAddToAggrGroup string = "failedAddAggrGroup"
EventAlertPipelineStart string = "pipelineStart"
EventAlertPipelinePassStage string = "pipelinePassStage"
EventAlertMuted string = "muted"
EventAlertSent string = "sent"
EventAlertSendFailed string = "sendFailed"
)

type AlertEventMeta map[string]interface{}

type LifeCycleObserver interface {
Observe(event string, alerts []*types.Alert, meta AlertEventMeta)
}
46 changes: 46 additions & 0 deletions alertobserver/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package alertobserver

import (
"sync"

"github.com/prometheus/alertmanager/types"
)

type FakeLifeCycleObserver struct {
AlertsPerEvent map[string][]*types.Alert
PipelineStageAlerts map[string][]*types.Alert
MetaPerEvent map[string][]AlertEventMeta
Mtx sync.RWMutex
}

func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) {
o.Mtx.Lock()
defer o.Mtx.Unlock()
if event == EventAlertPipelinePassStage {
o.PipelineStageAlerts[meta["stageName"].(string)] = append(o.PipelineStageAlerts[meta["stageName"].(string)], alerts...)
} else {
o.AlertsPerEvent[event] = append(o.AlertsPerEvent[event], alerts...)
}
o.MetaPerEvent[event] = append(o.MetaPerEvent[event], meta)
}

func NewFakeLifeCycleObserver() *FakeLifeCycleObserver {
return &FakeLifeCycleObserver{
PipelineStageAlerts: map[string][]*types.Alert{},
AlertsPerEvent: map[string][]*types.Alert{},
MetaPerEvent: map[string][]AlertEventMeta{},
}
}
6 changes: 6 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"

"github.com/prometheus/alertmanager/alertobserver"
apiv1 "github.com/prometheus/alertmanager/api/v1"
apiv2 "github.com/prometheus/alertmanager/api/v2"
"github.com/prometheus/alertmanager/cluster"
Expand Down Expand Up @@ -81,6 +82,9 @@ type Options struct {
GroupInfoFunc func(func(*dispatch.Route) bool) dispatch.AlertGroupInfos
// APICallback define the callback function that each api call will perform before returned.
APICallback callback.Callback
// AlertLCObserver is used to add hooks to the different alert life cycle events.
// If nil then no observer methods will be invoked in the life cycle events.
AlertLCObserver alertobserver.LifeCycleObserver
}

func (o Options) validate() error {
Expand Down Expand Up @@ -124,6 +128,7 @@ func New(opts Options) (*API, error) {
opts.Peer,
log.With(l, "version", "v1"),
opts.Registry,
opts.AlertLCObserver,
)

v2, err := apiv2.NewAPI(
Expand All @@ -136,6 +141,7 @@ func New(opts Options) (*API, error) {
opts.Peer,
log.With(l, "version", "v2"),
opts.Registry,
opts.AlertLCObserver,
)
if err != nil {
return nil, err
Expand Down
45 changes: 30 additions & 15 deletions api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/common/version"

"github.com/prometheus/alertmanager/alertobserver"
"github.com/prometheus/alertmanager/api/metrics"
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/config"
Expand Down Expand Up @@ -67,14 +68,15 @@ func setCORS(w http.ResponseWriter) {

// API provides registration of handlers for API routes.
type API struct {
alerts provider.Alerts
silences *silence.Silences
config *config.Config
route *dispatch.Route
uptime time.Time
peer cluster.ClusterPeer
logger log.Logger
m *metrics.Alerts
alerts provider.Alerts
silences *silence.Silences
config *config.Config
route *dispatch.Route
uptime time.Time
peer cluster.ClusterPeer
logger log.Logger
m *metrics.Alerts
alertLCObserver alertobserver.LifeCycleObserver

getAlertStatus getAlertStatusFn

Expand All @@ -91,19 +93,21 @@ func New(
peer cluster.ClusterPeer,
l log.Logger,
r prometheus.Registerer,
o alertobserver.LifeCycleObserver,
) *API {
if l == nil {
l = log.NewNopLogger()
}

return &API{
alerts: alerts,
silences: silences,
getAlertStatus: sf,
uptime: time.Now(),
peer: peer,
logger: l,
m: metrics.NewAlerts("v1", r),
alerts: alerts,
silences: silences,
getAlertStatus: sf,
uptime: time.Now(),
peer: peer,
logger: l,
m: metrics.NewAlerts("v1", r),
alertLCObserver: o,
}
}

Expand Down Expand Up @@ -447,6 +451,10 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
if err := a.Validate(); err != nil {
validationErrs.Add(err)
api.m.Invalid().Inc()
if api.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
}
continue
}
validAlerts = append(validAlerts, a)
Expand All @@ -456,8 +464,15 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
typ: errorInternal,
err: err,
}, nil)
if api.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
}
return
}
if api.alertLCObserver != nil {
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{})
}

if validationErrs.Len() > 0 {
api.respondError(w, apiError{
Expand Down
73 changes: 71 additions & 2 deletions api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/prometheus/alertmanager/alertobserver"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/pkg/labels"
Expand Down Expand Up @@ -134,7 +135,7 @@ func TestAddAlerts(t *testing.T) {
}

alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil)
defaultGlobalConfig := config.DefaultGlobalConfig()
route := config.Route{}
api.Update(&config.Config{
Expand All @@ -153,6 +154,74 @@ func TestAddAlerts(t *testing.T) {
body, _ := io.ReadAll(res.Body)

require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body)))

observer := alertobserver.NewFakeLifeCycleObserver()
api.alertLCObserver = observer
r, err = http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
w = httptest.NewRecorder()
if err != nil {
t.Errorf("Unexpected error %v", err)
}
api.addAlerts(w, r)
if tc.code == 200 {
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint())
} else {
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint())
}
}
}

func TestAddAlertsWithAlertLCObserver(t *testing.T) {
now := func(offset int) time.Time {
return time.Now().Add(time.Duration(offset) * time.Second)
}

for i, tc := range []struct {
start, end time.Time
err bool
code int
}{
{time.Time{}, time.Time{}, false, 200},
{now(1), now(0), false, 400},
{now(0), time.Time{}, true, 500},
} {
alerts := []model.Alert{{
StartsAt: tc.start,
EndsAt: tc.end,
Labels: model.LabelSet{"label1": "test1"},
Annotations: model.LabelSet{"annotation1": "some text"},
}}
b, err := json.Marshal(&alerts)
if err != nil {
t.Errorf("Unexpected error %v", err)
}

alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
observer := alertobserver.NewFakeLifeCycleObserver()
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, observer)
defaultGlobalConfig := config.DefaultGlobalConfig()
route := config.Route{}
api.Update(&config.Config{
Global: &defaultGlobalConfig,
Route: &route,
})

r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
w := httptest.NewRecorder()
if err != nil {
t.Errorf("Unexpected error %v", err)
}

api.addAlerts(w, r)
res := w.Result()
body, _ := io.ReadAll(res.Body)

require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body)))
if tc.code == 200 {
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint())
} else {
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint())
}
}
}

Expand Down Expand Up @@ -267,7 +336,7 @@ func TestListAlerts(t *testing.T) {
},
} {
alertsProvider := newFakeAlerts(alerts, tc.err)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil)
api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil)

r, err := http.NewRequest("GET", "/api/v1/alerts", nil)
Expand Down
19 changes: 17 additions & 2 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
alertgroupinfolist_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroupinfolist"
"github.com/prometheus/alertmanager/util/callback"

"github.com/prometheus/alertmanager/alertobserver"
"github.com/prometheus/alertmanager/api/metrics"
open_api_models "github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/api/v2/restapi"
Expand Down Expand Up @@ -76,8 +77,9 @@ type API struct {
route *dispatch.Route
setAlertStatus setAlertStatusFn

logger log.Logger
m *metrics.Alerts
logger log.Logger
m *metrics.Alerts
alertLCObserver alertobserver.LifeCycleObserver

Handler http.Handler
}
Expand All @@ -100,6 +102,7 @@ func NewAPI(
peer cluster.ClusterPeer,
l log.Logger,
r prometheus.Registerer,
o alertobserver.LifeCycleObserver,
) (*API, error) {
if apiCallback == nil {
apiCallback = callback.NoopAPICallback{}
Expand All @@ -115,6 +118,7 @@ func NewAPI(
logger: l,
m: metrics.NewAlerts("v2", r),
uptime: time.Now(),
alertLCObserver: o,
}

// Load embedded swagger file.
Expand Down Expand Up @@ -402,19 +406,30 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
if err := a.Validate(); err != nil {
validationErrs.Add(err)
api.m.Invalid().Inc()
if api.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
}
continue
}
validAlerts = append(validAlerts, a)
}
if err := api.alerts.Put(validAlerts...); err != nil {
level.Error(logger).Log("msg", "Failed to create alerts", "err", err)
if api.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
}
return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
}

if validationErrs.Len() > 0 {
level.Error(logger).Log("msg", "Failed to validate alerts", "err", validationErrs.Error())
return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error())
}
if api.alertLCObserver != nil {
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{})
}

return alert_ops.NewPostAlertsOK()
}
Expand Down
Loading