Skip to content

Commit

Permalink
Reduce reactor stack trace depth (#9923)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit authored Nov 27, 2023
1 parent c0ff484 commit 952ecd0
Showing 1 changed file with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.util.function.Supplier;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
Expand Down Expand Up @@ -56,49 +57,53 @@ public TracingSubscriber(

@Override
public void onSubscribe(Subscription subscription) {
withActiveSpan(() -> subscriber.onSubscribe(subscription));
try (Scope ignore = openScope()) {
subscriber.onSubscribe(subscription);
}
}

@Override
public void onNext(T o) {
withActiveSpan(() -> subscriber.onNext(o));
try (Scope ignore = openScope()) {
subscriber.onNext(o);
}
}

@Override
public void onError(Throwable throwable) {
Supplier<Scope> scopeSupplier;
if (!hasContextToPropagate
&& (fluxRetrySubscriberClass == subscriber.getClass()
|| fluxRetryWhenSubscriberClass == subscriber.getClass())) {
// clear context for retry to avoid having retried operations run with currently active
// context as parent context
withActiveSpan(io.opentelemetry.context.Context.root(), () -> subscriber.onError(throwable));
scopeSupplier = () -> openScope(io.opentelemetry.context.Context.root());
} else {
withActiveSpan(() -> subscriber.onError(throwable));
scopeSupplier = () -> openScope();
}
try (Scope ignore = scopeSupplier.get()) {
subscriber.onError(throwable);
}
}

@Override
public void onComplete() {
withActiveSpan(subscriber::onComplete);
try (Scope ignore = openScope()) {
subscriber.onComplete();
}
}

@Override
public Context currentContext() {
return context;
}

private void withActiveSpan(Runnable runnable) {
withActiveSpan(hasContextToPropagate ? traceContext : null, runnable);
private Scope openScope() {
return openScope(hasContextToPropagate ? traceContext : null);
}

private static void withActiveSpan(io.opentelemetry.context.Context context, Runnable runnable) {
if (context != null) {
try (Scope ignored = context.makeCurrent()) {
runnable.run();
}
} else {
runnable.run();
}
private static Scope openScope(io.opentelemetry.context.Context context) {
return context != null ? context.makeCurrent() : null;
}

private static Class<?> getFluxRetrySubscriberClass() {
Expand Down

0 comments on commit 952ecd0

Please sign in to comment.