Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TEST-6] Read after write latency testing. #6

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
6f4b591
Add measurement based on system time.
May 17, 2021
b167346
Typo fix
May 17, 2021
cc7de2c
Change separator
May 17, 2021
af7cf21
Change separator
May 17, 2021
095071d
Add epoch into string.
May 17, 2021
f04dbbd
Change measurements
May 18, 2021
5ef5057
Refactor
May 18, 2021
8e0c3ab
Include write phase
May 18, 2021
6381fff
Try to measure commit <-> committed inside separate thread
May 19, 2021
67245f0
Added executor service to measure committing <-> committed
May 19, 2021
ea07936
Refactor
May 19, 2021
b64cfc1
Add measurement of write only
May 21, 2021
a3c6ccd
Add debug log level to Transaction Producer
May 23, 2021
b109d29
Fix markup
May 23, 2021
277760f
Change root logger level to debug
May 24, 2021
8540182
[M-Writers] Measure beginTxn, Write, Commit separately.
May 24, 2021
da5237d
Revert log4j-related changes
May 24, 2021
9e220f5
Add handling of empty transactions.
May 31, 2021
51951b2
Add handling of empty transactions.
May 31, 2021
9c04dea
Remove duplicating code
May 31, 2021
67086f6
Handle commit only
May 31, 2021
922a0e6
Add calculation of transaction count
Jun 15, 2021
bcecae8
Increase transaction timeout
Jun 18, 2021
19d2bb4
Put transaction ID with timestamp into payload
Jun 18, 2021
d2940c3
MEasure read-after-write latency for individual events
Jun 18, 2021
3c9e499
Update read-after-write latency. MEasure total amount of written tran…
Jun 21, 2021
bfb9422
Update read-after-write latency calculation
Jun 21, 2021
a5ba14a
Impliment read-after-write latency
Jun 22, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions benchmark-framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,18 @@
<version>4.1.50.Final</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.50.Final</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.50.Final</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ static class Arguments {

public static void main(String[] args) throws Exception {
final Arguments arguments = new Arguments();
System.out.println("Arguments" + arguments.toString());
JCommander jc = new JCommander(arguments);
jc.setProgramName("messaging-benchmark");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,19 @@ private void ensureTopicsAreReady() throws IOException {
// In this case we just publish 1 message and then wait for consumers to receive the data
worker.probeProducers();

while (true) {
CountersStats stats = worker.getCountersStats();

if (stats.messagesReceived < expectedMessages) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} else {
break;
}
}
// while (true) {
// CountersStats stats = worker.getCountersStats();
//
// if (stats.messagesReceived < expectedMessages) {
// try {
// Thread.sleep(100);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// } else {
// break;
// }
// }

log.info("All consumers are ready");
}
Expand Down
2 changes: 1 addition & 1 deletion benchmark-framework/src/main/resources/log4j2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ Configuration:
additivity: false
AppenderRef:
ref: Console


Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.UUID;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -44,10 +46,13 @@ public class PravegaBenchmarkConsumer implements BenchmarkConsumer {
private final ExecutorService executor;
private final EventStreamReader<ByteBuffer> reader;
private final AtomicBoolean closed = new AtomicBoolean(false);
private ConcurrentHashMap<String, List<Long>> txnsWithEvents;


public PravegaBenchmarkConsumer(String streamName, String scopeName, String subscriptionName, ConsumerCallback consumerCallback,
EventStreamClientFactory clientFactory, ReaderGroupManager readerGroupManager,
boolean includeTimestampInEvent) {
boolean includeTimestampInEvent, ConcurrentHashMap txnsWithEvents) {
this.txnsWithEvents = txnsWithEvents;
log.info("PravegaBenchmarkConsumer: BEGIN: subscriptionName={}, streamName={}", subscriptionName, streamName);
// Create reader group if it doesn't already exist.
final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
Expand All @@ -66,16 +71,60 @@ public PravegaBenchmarkConsumer(String streamName, String scopeName, String subs
while (!closed.get()) {
try {
final ByteBuffer event = reader.readNextEvent(1000).getEvent();
final long eventInTxnReadTs = System.nanoTime();
if (event != null) {
long eventTimestamp;
if (includeTimestampInEvent) {
eventTimestamp = event.getLong();
} else {
// This will result in an invalid end-to-end latency measurement of 0 seconds.
eventTimestamp = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
long eventTimestamp = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);;
// if (includeTimestampInEvent) {
// eventTimestamp = event.getLong();
// } else {
// // This will result in an invalid end-to-end latency measurement of 0 seconds.
// eventTimestamp = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
// }
final String test = new String(event.array(), event.arrayOffset() + event.position(), event.remaining(), StandardCharsets.UTF_16);
final String[] payloadComponents = test.split("---");
if (payloadComponents.length < 2) {
continue;
}
final String txnId = payloadComponents[0];
final long txnWriteTs = Long.parseLong(payloadComponents[1]);
final long readDurationMs = (eventInTxnReadTs - txnWriteTs) / (long) 1000000;
this.txnsWithEvents.compute(txnId, (k,v) -> {
if (v == null) {
v = new LinkedList<Long>();
}
v.add(readDurationMs);
if (v.size() == 1000) {
double summary = 0.0;
for (Iterator<Long> i = v.iterator(); i.hasNext();) {
long currentTime = i.next();
summary += currentTime;
}
double average = (summary) / v.size();
log.info("READTXN---ID---" + txnId + "---AVG---" + average);

}
return v;
});
// List<Long> readEventsInTxn = this.txnsWithEvents.putIfAbsent(txnId, Arrays.asList(readDurationMs));
// if (readEventsInTxn != null) {
// readEventsInTxn.add(txnWriteTs);
// // TODO: make the value configurable
// if (readEventsInTxn.size() == 1000) {
// double summary = 0.0;
// for (Iterator<Long> i = readEventsInTxn.iterator(); i.hasNext();) {
// long currentTime = i.next();
// summary += currentTime;
// }
// double average = (summary) / readEventsInTxn.size();
// log.info("READTXN---ID---" + txnId + "---AVG---" + average);
//
// }
// }
// log.info("Transaction " + payloadComponents[0] + "---" + readDurationMs);
byte[] payload = new byte[event.remaining()];
event.get(payload);
// String payloadStr = new String(payload, StandardCharsets.UTF_16);
// event.get(payload);

consumerCallback.messageReceived(payload, eventTimestamp);
}
} catch (ReinitializationRequiredException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class PravegaBenchmarkDriver implements BenchmarkDriver {
private static final Logger log = LoggerFactory.getLogger(PravegaBenchmarkDriver.class);
Expand All @@ -56,17 +58,23 @@ public class PravegaBenchmarkDriver implements BenchmarkDriver {
private ReaderGroupManager readerGroupManager;
private EventStreamClientFactory clientFactory;
private final List<String> createdTopics = new ArrayList<>();
private ConcurrentHashMap<String, List> txnsWithEvents;
private AtomicInteger totalTxnAmount;


@Override
public void initialize(File configurationFile, StatsLogger statsLogger) throws IOException {
config = readConfig(configurationFile);
log.info("Pravega driver configuration: {}", objectWriter.writeValueAsString(config));

clientConfig = ClientConfig.builder().controllerURI(URI.create(config.client.controllerURI)).build();
clientConfig = ClientConfig.builder().controllerURI(URI.create(config.client.controllerURI)).validateHostName(false).build();
scopeName = config.client.scopeName;
streamManager = StreamManager.create(clientConfig);
readerGroupManager = ReaderGroupManager.withScope(scopeName, clientConfig);
clientFactory = EventStreamClientFactory.withScope(scopeName, clientConfig);

// Initialize fields for read-after-write measurement purposes
this.txnsWithEvents = new ConcurrentHashMap<>();
this.totalTxnAmount = new AtomicInteger();
}

private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory())
Expand Down Expand Up @@ -118,7 +126,7 @@ public CompletableFuture<BenchmarkProducer> createProducer(String topic) {
BenchmarkProducer producer = null;
if (config.enableTransaction) {
producer = new PravegaBenchmarkTransactionProducer(topic, clientFactory, config.includeTimestampInEvent,
config.writer.enableConnectionPooling, config.eventsPerTransaction);
config.writer.enableConnectionPooling, config.eventsPerTransaction, this.totalTxnAmount);
} else {
producer = new PravegaBenchmarkProducer(topic, clientFactory, config.includeTimestampInEvent,
config.writer.enableConnectionPooling);
Expand All @@ -132,11 +140,12 @@ public CompletableFuture<BenchmarkConsumer> createConsumer(String topic, String
topic = cleanName(topic);
subscriptionName = cleanName(subscriptionName);
BenchmarkConsumer consumer = new PravegaBenchmarkConsumer(topic, scopeName, subscriptionName, consumerCallback,
clientFactory, readerGroupManager, config.includeTimestampInEvent);
clientFactory, readerGroupManager, config.includeTimestampInEvent, this.txnsWithEvents);
return CompletableFuture.completedFuture(consumer);
}

private void deleteTopics() {
log.info("---CREATED-AMM-OF-TXN-FROM-ALL-PRODUCERS---" + this.totalTxnAmount.get());
synchronized (createdTopics) {
for (String topic : createdTopics) {
log.info("deleteTopics: topic={}", topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public PravegaBenchmarkProducer(String streamName, EventStreamClientFactory clie
new ByteBufferSerializer(),
EventWriterConfig.builder()
.enableConnectionPooling(enableConnectionPooling)
.transactionTimeoutTime(120000)
.build());
this.includeTimestampInEvent = includeTimestampInEvent;
}
Expand Down
Loading