Skip to content

Commit

Permalink
Add support for enforced labels.
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanGuedes committed Jan 6, 2025
1 parent 0462fb0 commit f5559b8
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 1 deletion.
51 changes: 50 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,16 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

if missing, lbMissing := d.missingEnforcedLabels(stream, tenantID, validationContext); missing {
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, tenantID, lbMissing)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(discardedBytes))
continue
}

// if the given stream is blocked due to its scope ingestion, we skip it.
blocked, ingestionScope := d.streamIsBlocked(stream, tenantID, validationContext, now)
if blocked {
Expand Down Expand Up @@ -606,7 +616,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return &logproto.PushResponse{}, validationErr
}

// we first check if the ingestion is blocked globally for this tenant..
if block, until, retStatusCode := d.validator.ShouldBlockIngestion(validationContext, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion)

Expand Down Expand Up @@ -733,6 +742,46 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

func (d *Distributor) fetchLbs(stream logproto.Stream, tenantID string, validationContext validationContext) (labels.Labels, error) {
val, ok := d.labelCache.Get(stream.Labels)
if ok {
return val.ls, nil
}

ls, err := syntax.ParseLabels(stream.Labels)
if err != nil {
return nil, fmt.Errorf(validation.InvalidLabelsErrorMsg, stream.Labels, err)
}

d.labelCache.Add(stream.Labels, labelData{ls, ls.Hash()})

return ls, nil
}

// missingEnforcedLabels returns true if the stream is missing any of the required labels.
//
// It also returns the first label that is missing if any (for the case of multiple labels missing).
func (d *Distributor) missingEnforcedLabels(stream logproto.Stream, tenantID string, validationContext validationContext) (bool, string) {
requiredLbs := validationContext.enforcedLabels
if len(requiredLbs) == 0 {
// no enforced labels configured.
return false, ""
}

ls, err := d.fetchLbs(stream, tenantID, validationContext)
if err != nil {
return true, ""
}

for _, lb := range requiredLbs {
if !ls.Has(lb) {
return true, lb
}
}

return false, ""
}

// streamIsBlocked checks if the given stream is blocked by looking at its scope ingestion when configured for the tenant.
func (d *Distributor) streamIsBlocked(stream logproto.Stream, tenantID string, validationContext validationContext, now time.Time) (bool, string) {
scopeIngestionLb := d.validator.Limits.ScopeIngestionLabel(tenantID)
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Limits interface {
BlockScopeIngestionUntil(userID string) map[string]flagext.Time
BlockScopeIngestionStatusCode(userID string) map[string]int
ScopeIngestionLabel(userID string) string
EnforcedLabels(userID string) []string

IngestionPartitionsTenantShardSize(userID string) int
}
11 changes: 11 additions & 0 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type validationContext struct {
blockScopeIngestionUntil map[string]flagext.Time
blockScopeIngestionStatusCode map[string]int
scopeIngestionLabel string
enforcedLabels []string

userID string
}
Expand Down Expand Up @@ -85,6 +86,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
blockScopeIngestionUntil: v.BlockScopeIngestionUntil(userID),
blockScopeIngestionStatusCode: v.BlockScopeIngestionStatusCode(userID),
scopeIngestionLabel: v.ScopeIngestionLabel(userID),
enforcedLabels: v.EnforcedLabels(userID),
}
}

Expand Down Expand Up @@ -222,6 +224,15 @@ func (v Validator) ShouldBlockScopeIngestion(ctx validationContext, scope string
return now.Before(ts), ts, ctx.blockScopeIngestionStatusCode[scope]
}

func (v Validator) hasEnforcedLabels(enforcedLabels []string, stream logproto.Stream) bool {
for _, label := range enforcedLabels {
if !stream.Labels.Has(label) {
return false
}
}
return true
}

func updateMetrics(reason, userID string, stream logproto.Stream) {
validation.DiscardedSamples.WithLabelValues(reason, userID).Add(float64(len(stream.Entries)))
bytes := util.EntriesTotalSize(stream.Entries)
Expand Down
1 change: 1 addition & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ type Limits struct {
BlockScopeIngestionUntil map[string]dskit_flagext.Time `yaml:"block_scope_ingestion_until" json:"block_scope_ingestion_until" category:"experimental"`
BlockScopeIngestionStatusCode map[string]int `yaml:"block_scope_ingestion_status_code" json:"block_scope_ingestion_status_code" category:"experimental"`
ScopeIngestionLabel string `yaml:"scope_ingestion_label" json:"scope_ingestion_label" category:"experimental"`
EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`

IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`

Expand Down
2 changes: 2 additions & 0 deletions pkg/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ const (
BlockedIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
BlockedScopeIngestion = "blocked_scope_ingestion"
BlockedScopeIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d' for scope '%s'"
MissingEnforcedLabels = "missing_enforced_labels"
MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s"
)

type ErrStreamRateLimit struct {
Expand Down

0 comments on commit f5559b8

Please sign in to comment.