-
Notifications
You must be signed in to change notification settings - Fork 807
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Eddie Torres <[email protected]>
- Loading branch information
Showing
6 changed files
with
325 additions
and
92 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,208 @@ | ||
// Package metrics provides functionality for recording and serving | ||
// metric information related to AWS API calls and internal driver operations. | ||
package metrics | ||
|
||
import ( | ||
"net/http" | ||
"sync" | ||
|
||
"k8s.io/component-base/metrics" | ||
"k8s.io/klog/v2" | ||
) | ||
|
||
const ( | ||
// Metric types | ||
AWS metricType = iota | ||
Internal | ||
|
||
// AWS API metrics | ||
awsDurationMetric = "cloudprovider_aws_api_request_duration_seconds" | ||
awsThrottleMetric = "cloudprovider_aws_api_throttled_requests_total" | ||
awsErrorMetric = "cloudprovider_aws_api_request_errors" | ||
|
||
// Internal driver metrics | ||
internalDurationMetric = "ebs_csi_aws_com_duration_seconds" | ||
internalValueMetric = "ebs_csi_aws_com_operation" | ||
internalErrorMetric = "ebs_csi_aws_com_errors_total" | ||
) | ||
|
||
var ( | ||
// r is the singleton instance of MetricRecorder. | ||
r *metricRecorder | ||
|
||
// initialize ensures that MetricRecorder is initialized only once. | ||
initialize sync.Once | ||
) | ||
|
||
// metricType is an enum for the different types of metrics. | ||
type metricType int | ||
|
||
// metricRecorder provides an organized way of recording and managing metrics related to both AWS and driver operations. | ||
type metricRecorder struct { | ||
// registry is the central store where all metrics are registered and exposed for collection. | ||
registry metrics.KubeRegistry | ||
|
||
// metrics maps a metric type (AWS or Internal) to its associated measurement data -- duration, values, error counts, and throttling events. | ||
metrics map[metricType]metricFields | ||
} | ||
|
||
// metricFields holds the metrics for a specific metric type. | ||
type metricFields struct { | ||
duration *metrics.HistogramVec | ||
value *metrics.HistogramVec | ||
err *metrics.CounterVec | ||
throttle *metrics.CounterVec | ||
} | ||
|
||
// Recorder returns the singleton instance of MetricRecorder. | ||
// Metrics are registered only once throughout the driver's lifetime. | ||
func Recorder() *metricRecorder { | ||
initialize.Do(func() { | ||
r = newRecorder() | ||
r.registerMetrics() | ||
}) | ||
return r | ||
} | ||
|
||
// Duration logs the duration of an operation for the specified metric type. | ||
// | ||
// Parameters: | ||
// - t: The type of the metric (e.g., AWS or Internal). | ||
// - o: The name of the operation for which the metric is being recorded. | ||
// - d: The duration taken for the action in seconds. | ||
func (m *metricRecorder) Duration(t metricType, o string, d float64) { | ||
if metric, ok := m.metrics[t]; ok { | ||
metric.duration.With(metrics.Labels{"request": o}).Observe(d) | ||
} else { | ||
klog.Warningf("Metric type %d not found", t) | ||
} | ||
} | ||
|
||
// Error logs an error for the specified metric type. | ||
// | ||
// Parameters: | ||
// - t: The type of the metric. | ||
// - o: The name of the operation where the error occurred. | ||
func (m *metricRecorder) Error(t metricType, o string) { | ||
if metric, ok := m.metrics[t]; ok { | ||
metric.err.With(metrics.Labels{"request": o}).Inc() | ||
} else { | ||
klog.Warningf("Metric type %d not found", t) | ||
} | ||
} | ||
|
||
// Throttle logs a throttling event for the metric type. | ||
// | ||
// Parameters: | ||
// - t: The type of the metric. | ||
// - o: The name of the operation that was throttled. | ||
func (m *metricRecorder) Throttle(t metricType, o string) { | ||
if metric, ok := m.metrics[t]; ok { | ||
metric.throttle.With(metrics.Labels{"operation_name": o}).Inc() | ||
} else { | ||
klog.Warningf("Metric type %d not found", t) | ||
} | ||
} | ||
|
||
// Value logs a value observation for the specified metric type. | ||
// Parameters: | ||
// - t: The type of the metric. | ||
// - o: The operation name. | ||
// - v: The observed value to be recorded. | ||
func (m *metricRecorder) Value(t metricType, o string, v float64) { | ||
if metric, ok := m.metrics[t]; ok { | ||
metric.value.With(metrics.Labels{"operation": o}).Observe(v) | ||
} else { | ||
klog.Warningf("Metric type %d not found", t) | ||
} | ||
} | ||
|
||
// InitializeHttpHandler sets up and starts an HTTP server to serve the recorded metrics. | ||
// | ||
// Parameters: | ||
// - address: The server's listening address. | ||
// - path: The URL path to access the metrics. | ||
func (m *metricRecorder) InitializeHttpHandler(address, path string) { | ||
mux := http.NewServeMux() | ||
|
||
mux.Handle(path, metrics.HandlerFor( | ||
m.registry, | ||
metrics.HandlerOpts{ | ||
ErrorHandling: metrics.ContinueOnError})) | ||
|
||
go func() { | ||
klog.InfoS("Metric server listening", "address", address, "path", path) | ||
|
||
if err := http.ListenAndServe(address, mux); err != nil { | ||
klog.ErrorS(err, "Failed to start metric server", "address", address, "path", path) | ||
klog.FlushAndExit(klog.ExitFlushTimeout, 1) | ||
} | ||
}() | ||
} | ||
|
||
// newRecorder returns a new instance of MetricRecorder. | ||
func newRecorder() *metricRecorder { | ||
valueBucket := []float64{.1, .25, .5, 1, 2.5, 5, 10, 100, 250, 500, 1000} | ||
|
||
return &metricRecorder{ | ||
registry: metrics.NewKubeRegistry(), | ||
|
||
metrics: map[metricType]metricFields{ | ||
AWS: { | ||
duration: createHistogramVec(awsDurationMetric, "Latency of AWS API calls", []string{"request"}, nil), | ||
throttle: createCounterVec(awsThrottleMetric, "AWS API throttled requests", []string{"operation_name"}), | ||
err: createCounterVec(awsErrorMetric, "AWS API errors", []string{"request"}), | ||
}, | ||
Internal: { | ||
value: createHistogramVec(internalValueMetric, "Driver operation metric values", []string{"operation"}, valueBucket), | ||
duration: createHistogramVec(internalDurationMetric, "Driver operation duration", []string{"operation"}, nil), | ||
err: createCounterVec(internalErrorMetric, "Driver operation errors", []string{"request"}), | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
// registerMetrics registers metrics to the MetricRecorder's internal KubeRegistry. | ||
func (m *metricRecorder) registerMetrics() { | ||
for _, metric := range m.metrics { | ||
if metric.duration != nil { | ||
m.registry.MustRegister(metric.duration) | ||
} | ||
if metric.value != nil { | ||
m.registry.MustRegister(metric.value) | ||
} | ||
if metric.err != nil { | ||
m.registry.MustRegister(metric.err) | ||
} | ||
if metric.throttle != nil { | ||
m.registry.MustRegister(metric.throttle) | ||
} | ||
} | ||
} | ||
|
||
// createHistogramVec helper to create a histogram vector for a specific metric. | ||
func createHistogramVec(name, help string, labels []string, buckets []float64) *metrics.HistogramVec { | ||
opts := &metrics.HistogramOpts{ | ||
Name: name, | ||
Help: help, | ||
StabilityLevel: metrics.ALPHA, | ||
} | ||
|
||
if buckets != nil { | ||
opts.Buckets = buckets | ||
} | ||
|
||
return metrics.NewHistogramVec(opts, labels) | ||
} | ||
|
||
// createCounterVec helper to create a counter vector for a specific metric. | ||
func createCounterVec(name, help string, labels []string) *metrics.CounterVec { | ||
return metrics.NewCounterVec( | ||
&metrics.CounterOpts{ | ||
Name: name, | ||
Help: help, | ||
StabilityLevel: metrics.ALPHA, | ||
}, | ||
labels, | ||
) | ||
} |
Oops, something went wrong.