From d4f26add5fe2e38789e5a3a82cc3cc9561e528f9 Mon Sep 17 00:00:00 2001 From: Gregor Zeitlinger Date: Tue, 1 Oct 2024 14:04:24 +0200 Subject: [PATCH] format Signed-off-by: Gregor Zeitlinger --- .../metrics/core/metrics/Buffer.java | 179 +++++++++--------- 1 file changed, 90 insertions(+), 89 deletions(-) diff --git a/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java b/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java index 74c1b998e..031939422 100644 --- a/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java +++ b/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java @@ -1,7 +1,6 @@ package io.prometheus.metrics.core.metrics; import io.prometheus.metrics.model.snapshots.DataPointSnapshot; - import java.util.Arrays; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -12,102 +11,104 @@ /** * Metrics support concurrent write and scrape operations. - *

- * This is implemented by switching to a Buffer when the scrape starts, - * and applying the values from the buffer after the scrape ends. + * + *

This is implemented by switching to a Buffer when the scrape starts, and applying the values + * from the buffer after the scrape ends. */ class Buffer { - private static final long bufferActiveBit = 1L << 63; - private final AtomicLong observationCount = new AtomicLong(0); - private double[] observationBuffer = new double[0]; - private int bufferPos = 0; - private boolean reset = false; - - ReentrantLock appendLock = new ReentrantLock(); - ReentrantLock runLock = new ReentrantLock(); - Condition bufferFilled = appendLock.newCondition(); - - boolean append(double value) { - long count = observationCount.incrementAndGet(); - if ((count & bufferActiveBit) == 0) { - return false; // sign bit not set -> buffer not active. + private static final long bufferActiveBit = 1L << 63; + private final AtomicLong observationCount = new AtomicLong(0); + private double[] observationBuffer = new double[0]; + private int bufferPos = 0; + private boolean reset = false; + + ReentrantLock appendLock = new ReentrantLock(); + ReentrantLock runLock = new ReentrantLock(); + Condition bufferFilled = appendLock.newCondition(); + + boolean append(double value) { + long count = observationCount.incrementAndGet(); + if ((count & bufferActiveBit) == 0) { + return false; // sign bit not set -> buffer not active. + } else { + doAppend(value); + return true; + } + } + + private void doAppend(double amount) { + try { + appendLock.lock(); + if (bufferPos >= observationBuffer.length) { + observationBuffer = Arrays.copyOf(observationBuffer, observationBuffer.length + 128); + } + observationBuffer[bufferPos] = amount; + bufferPos++; + + bufferFilled.signalAll(); + } finally { + appendLock.unlock(); + } + } + + /** Must be called by the runnable in the run() method. */ + void reset() { + reset = true; + } + + T run( + Function complete, + Supplier createResult, + Consumer observeFunction) { + double[] buffer; + int bufferSize; + T result; + try { + runLock.lock(); + + // Signal that the buffer is active. + Long expectedCount = observationCount.getAndAdd(bufferActiveBit); + try { + appendLock.lock(); + + while (!complete.apply(expectedCount)) { + // Wait until all in-flight threads have added their observations to the buffer. + bufferFilled.await(); + } + result = createResult.get(); + + // Signal that the buffer is inactive. + int expectedBufferSize; + if (reset) { + expectedBufferSize = + (int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount); + reset = false; } else { - doAppend(value); - return true; + expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount); } - } - private void doAppend(double amount) { - try { - appendLock.lock(); - if (bufferPos >= observationBuffer.length) { - observationBuffer = Arrays.copyOf(observationBuffer, observationBuffer.length + 128); - } - observationBuffer[bufferPos] = amount; - bufferPos++; - - bufferFilled.signalAll(); - } finally { - appendLock.unlock(); + while (bufferPos < expectedBufferSize) { + // Wait until all in-flight threads have added their observations to the buffer. + bufferFilled.await(); } + } finally { + appendLock.unlock(); + } + + buffer = observationBuffer; + bufferSize = bufferPos; + observationBuffer = new double[0]; + bufferPos = 0; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + runLock.unlock(); } - /** - * Must be called by the runnable in the run() method. - */ - void reset() { - reset = true; - } - - T run(Function complete, Supplier createResult, Consumer observeFunction) { - double[] buffer; - int bufferSize; - T result; - try { - runLock.lock(); - - // Signal that the buffer is active. - Long expectedCount = observationCount.getAndAdd(bufferActiveBit); - try { - appendLock.lock(); - - while (!complete.apply(expectedCount)) { - // Wait until all in-flight threads have added their observations to the buffer. - bufferFilled.await(); - } - result = createResult.get(); - - // Signal that the buffer is inactive. - int expectedBufferSize; - if (reset) { - expectedBufferSize = (int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount); - reset = false; - } else { - expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount); - } - - while (bufferPos < expectedBufferSize) { - // Wait until all in-flight threads have added their observations to the buffer. - bufferFilled.await(); - } - } finally { - appendLock.unlock(); - } - - buffer = observationBuffer; - bufferSize = bufferPos; - observationBuffer = new double[0]; - bufferPos = 0; - } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally { - runLock.unlock(); - } - - for (int i = 0; i < bufferSize; i++) { - observeFunction.accept(buffer[i]); - } - return result; + for (int i = 0; i < bufferSize; i++) { + observeFunction.accept(buffer[i]); } + return result; + } }