Skip to content

Commit

Permalink
Add additional timestamps for evaluation
Browse files Browse the repository at this point in the history
  • Loading branch information
tomijange committed Nov 26, 2024
1 parent 358e18c commit 7b1f094
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 7 deletions.
17 changes: 12 additions & 5 deletions src/main/java/ganges/GangesEvaluation.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,16 @@ public JobExecutionResult execute(ParameterTool parameters) throws Exception {

String evalDescription = String.format("k%d_delta%d_l%d_beta%d_zeta%d_mu%d_run%d", k, delta, l, beta, zeta, mu, runId);

SingleOutputStreamOperator<Tuple18<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>> source = env.socketTextStream(sutHost, sutPortWrite)
SingleOutputStreamOperator<Tuple22<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>> source = env.socketTextStream(sutHost, sutPortWrite)
.map(new StringToTuple<>());
// Create a stream of custom elements and apply transformations
DataStream<Tuple19<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>> dataStream = source
.returns(TypeInformation.of(new TypeHint<Tuple18<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>>() {
DataStream<Tuple23<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>> dataStream = source
.returns(TypeInformation.of(new TypeHint<Tuple22<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>>() {
}))
.keyBy(tuple -> tuple.getField(0))
.connect(ruleBroadcastStream)
.process(new CastleFunction<Long, Tuple18<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>, Tuple19<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>>(0, k, l, delta, beta, zeta, mu, true, 0, rules))
.returns(TypeInformation.of(new TypeHint<Tuple19<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>>() {
.process(new CastleFunction<Long, Tuple22<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>, Tuple23<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>>(0, k, l, delta, beta, zeta, mu, true, 0, rules))
.returns(TypeInformation.of(new TypeHint<Tuple23<Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object, Object>>() {
}))
.name(evalDescription);

Expand Down Expand Up @@ -115,6 +115,11 @@ public enum DatasetFields {
M_ID,
// ingestion timestamp
TS,
// for performance timestamps
T_BS,
T_BSE,
T_D,
T_DE
;

private final BaseGeneralizer generalizer;
Expand Down Expand Up @@ -162,6 +167,8 @@ public static class TupleToString<T extends Tuple> implements SerializationSchem

@Override
public byte[] serialize(T element) {
element.setField(System.currentTimeMillis(), DatasetFields.T_DE.getId());

StringWriter writer = new StringWriter();
for (int i = 0 ; i < element.getArity() ; i++) {
String sub = StringUtils.arrayAwareToString(element.getField(i));
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/prink/CastleFunction.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package prink;

import ganges.GangesEvaluation;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
Expand Down Expand Up @@ -192,8 +193,7 @@ public void processElement(INPUT tuple, ReadOnlyContext context, Collector<OUTPU
LOG.debug("Element received: key: {}", tuple.getField(posTupleId).toString());

TimerService ts = context.timerService();

// tuple.setField(ts.currentProcessingTime(), 2); // enable for performance testing only
tuple.setField(System.currentTimeMillis(), GangesEvaluation.DatasetFields.T_BS.getId());

eventTimeLag.update(ts.currentProcessingTime() - ts.currentWatermark());

Expand All @@ -211,6 +211,8 @@ public void processElement(INPUT tuple, ReadOnlyContext context, Collector<OUTPU
bestCluster.addEntry(tuple);
globalTuples.add(tuple);

tuple.setField(System.currentTimeMillis(), GangesEvaluation.DatasetFields.T_BSE.getId());

// Different approach than CASTLE. Approach from the CASTLEGUARD code (see: https://github.com/hallnath1/CASTLEGUARD)
if(globalTuples.size() > delta) delayConstraint(globalTuples.get(0), output);
}
Expand All @@ -227,6 +229,11 @@ private void delayConstraint(Tuple input, Collector<OUTPUT> output) {
LOG.error("delayConstraint -> clusterWithInput is NULL");
return;
}

for (Tuple tuple: globalTuples) {
tuple.setField(System.currentTimeMillis(), GangesEvaluation.DatasetFields.T_D.getId());
}

if(clusterWithInput.size() >= k && clusterWithInput.diversity(posSensibleAttributes) >= l){
outputCluster(clusterWithInput, output);
}else{
Expand Down

0 comments on commit 7b1f094

Please sign in to comment.