Skip to content

Commit

Permalink
Block at entry/stream iteration level.
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanGuedes committed Jan 5, 2025
1 parent 4ee1420 commit cda0b3a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 42 deletions.
79 changes: 37 additions & 42 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,18 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

// if the given stream is blocked due to its scope ingestion, we skip it.
blocked := d.streamIsBlocked(ctx, stream, tenantID, validationContext, now)
if blocked {
err := fmt.Errorf(validation.BlockedScopeIngestionErrorMsg, tenantID, stream.Labels, now.Format(time.RFC3339), http.StatusOK)

Check failure on line 514 in pkg/distributor/distributor.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

printf: fmt.Errorf format %d has arg now.Format(time.RFC3339) of wrong type string (govet)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.BlockedScopeIngestion, tenantID).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.BlockedScopeIngestion, tenantID).Add(float64(discardedBytes))
continue
}

// Truncate first so subsequent steps have consistent line lengths
d.truncateLines(validationContext, &stream)

Expand Down Expand Up @@ -610,48 +622,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error())
}

// now we check if the ingestion is blocked for the given scope.
scopeIngestionLb := d.validator.Limits.ScopeIngestionLabel(tenantID)
if scopeIngestionLb != "" {
allOk := true
var lastScopeErr error
unblockedStreams := make([]KeyedStream, 0, len(streams))
for _, stream := range streams {
lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Stream.Labels, stream.Stream)
if err != nil {
continue
}
scope := lbs.Get(scopeIngestionLb)
if scope == "" {
// scope is not set, so we skip the scope ingestion check.
continue
}

if block, until, retStatusCode := d.validator.ShouldBlockScopeIngestion(validationContext, scope, now); block {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.BlockedIngestion)
// TODO: remove the stream from the list

err = fmt.Errorf(validation.BlockedScopeIngestionErrorMsg, tenantID, scope, until.Format(time.RFC3339), retStatusCode)
d.writeFailuresManager.Log(tenantID, err)
lastScopeErr = err
if retStatusCode != http.StatusOK {
allOk = false
}
} else {
unblockedStreams = append(unblockedStreams, stream)
}
}
streams = unblockedStreams
if len(streams) == 0 && lastScopeErr != nil {
if allOk {
return &logproto.PushResponse{}, nil
}

// all streams are blocked, so we return the last error seen.
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", lastScopeErr.Error())
}
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validatedLineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validatedLineCount, validatedLineSize, validation.RateLimited)

Expand Down Expand Up @@ -763,6 +733,31 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

// streamIsBlocked checks if the given stream is blocked by looking at its scope ingestion when configured for the tenant.
func (d *Distributor) streamIsBlocked(ctx context.Context, stream logproto.Stream, tenantID string, validationContext validationContext, now time.Time) bool {

Check warning on line 737 in pkg/distributor/distributor.go

View workflow job for this annotation

GitHub Actions / check / golangciLint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
scopeIngestionLb := d.validator.Limits.ScopeIngestionLabel(tenantID)
if scopeIngestionLb == "" {
// not configured.
return false
}

lbs, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream)
if err != nil {
return false
}
scope := lbs.Get(scopeIngestionLb)
if scope == "" {
// stream isn't telling its scope, so we default to not blocking it.
return false
}

if block, _, _ := d.validator.ShouldBlockScopeIngestion(validationContext, scope, now); block {
return true
}

return false
}

func (d *Distributor) trackDiscardedData(
ctx context.Context,
req *logproto.PushRequest,
Expand Down
1 change: 1 addition & 0 deletions pkg/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ const (
StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it."
BlockedIngestion = "blocked_ingestion"
BlockedIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
BlockedScopeIngestion = "blocked_scope_ingestion"
BlockedScopeIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d' for scope '%s'"
)

Expand Down

0 comments on commit cda0b3a

Please sign in to comment.