[server][controller] Add MaterializedViewWriter and support view writers in L/F #1296
@@ -14,16 +21,36 @@ public class ChangeCaptureView extends VeniceView {
public static final String CHANGE_CAPTURE_VIEW_WRITER_CLASS_NAME =
"com.linkedin.davinci.store.view.ChangeCaptureViewWriter";

public ChangeCaptureView(Properties props, Store store, Map<String, String> viewParameters) {
super(props, store, viewParameters);
public ChangeCaptureView(Properties props, String storeName, Map<String, String> viewParameters) {
Can we remove this, since we don't plan to support it in the future?
I was not aware of this. Are you suggesting we remove the entire `ChangeCaptureView`?
Yeah.
It is supposed to be fully deprecated.
@sixpluszero, do you know of a valid use case?
try (VeniceWriter veniceWriter = getVeniceWriterFactory().createVeniceWriter(vwOptionsBuilder.build())) {
veniceWriter.broadcastStartOfPush(
false,
version.isChunkingEnabled(),
We need to make sure `AASIT` and `LFSIT` honor the store-level configs: compression and chunking.
They should, right? Also, we are getting these configs from `Version` here, and in AASIT and LFSIT when building the internal VeniceWriter in view writers.
Sure.
…ers in L/F
1. View writers will be invoked in L/F SIT too instead of only in A/A SIT. We rely on view config validation to ensure views that do require A/A are only added to stores with A/A enabled.
2. This PR only includes creation of materialized view topics, writing of data records and control messages to the materialized view topics in server and controller.
   - Materialized view topics are created during version creation time along with other view topics.
   - SOP is sent during view topic creation time with same chunking and compression configs as the store version.
   - EOP is sent when servers have reported EOP in every partition.
   - Incremental push control messages SOIP and EOIP are not propagated to the view topic for now because the end to end incremental push tracking story for view topics is not clear yet. Store owners will likely just disable the requirement to wait for view consumers to fully ingest the incremental push.
   - Ingestion heartbeats will be propagated in a broadcast manner. See implementation for details.
   - Version swap for CDC users will be implemented in a separate PR to keep this PR somewhat short for review.
3. TODO: one pending issue to be resolved is that during processing of batch records in the native replication source fabric, where we consume local VT, a leader transfer could result in missing records in the materialized view topic. This is because we don't do any global checkpointing across leader and followers when consuming local VT.
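The rule "EOP is sent when servers have reported EOP in every partition" can be illustrated with a small tracker. This is only a sketch under stated assumptions: `ViewTopicEopTrackerSketch` and `onPartitionEndOfPush` are hypothetical names, not classes or methods from this PR, and the real push monitor tracks much more state.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper; the name and API are illustrative, not taken from the PR. */
final class ViewTopicEopTrackerSketch {
  private final int partitionCount;
  private final Set<Integer> partitionsWithEop = new HashSet<>();
  private boolean eopSent = false;

  ViewTopicEopTrackerSketch(int partitionCount) {
    this.partitionCount = partitionCount;
  }

  /**
   * Records that servers reported EOP for the given partition.
   * Returns true exactly once: on the report that completes the set of partitions.
   */
  synchronized boolean onPartitionEndOfPush(int partition) {
    if (partition < 0 || partition >= partitionCount) {
      throw new IllegalArgumentException("Unknown partition: " + partition);
    }
    partitionsWithEop.add(partition);
    if (!eopSent && partitionsWithEop.size() == partitionCount) {
      eopSent = true; // the caller would broadcast EOP to the view topics at this point
      return true;
    }
    return false;
  }
}
```

Returning `true` only once keeps duplicate or late partition reports from triggering a second EOP broadcast.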
Left a few more comments.
// because we don't need to replicate these CMs to view topics. If we don't synchronize before producing the
// CMs then in the VT we might get out of order messages and with pass-through DIV that's going to be an
// issue. e.g. a PUT record belonging to seg:0 can come after the EOS of seg:0 due to view writer delays.
checkAndWaitForLastVTProduceFuture(partitionConsumptionState);
In theory, producing these CMs can be done asynchronously by registering a callback on the last VT produce future, right?
We don't have to make the change in this PR; let us monitor the ingestion performance when materialized views are enabled.
Async will certainly make the logic trickier to understand.
@@ -2534,6 +2552,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
* In such case the heartbeat is produced to VT with updated {@link LeaderMetadataWrapper}.
*/
if (!consumerRecord.getTopicPartition().getPubSubTopic().isRealTime()) {
checkAndWaitForLastVTProduceFuture(partitionConsumptionState);
`EOP` is rare, but `HB` messages are common, so the perf impact could be bigger since this is blocking.
Also, will there be an issue if we produce these CMs asynchronously?
We can make it work if we also set `lastVTProduceCallFuture` when producing CMs to VT. Currently we do not set it when writing CMs like heartbeats.
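The difference between the blocking wait and the async option discussed in these threads can be sketched with plain `CompletableFuture` chaining. This is a minimal sketch, not the PR's code: `PartitionStateSketch` and its methods are hypothetical stand-ins for the per-partition state, and the list of strings stands in for the version topic.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for per-partition state; not the real PartitionConsumptionState. */
final class PartitionStateSketch {
  // Tail of the produce chain: completes once every earlier VT write has been produced.
  private CompletableFuture<Void> lastVTProduceFuture = CompletableFuture.completedFuture(null);
  final List<String> producedToVT = new CopyOnWriteArrayList<>();

  /** Data record: the VT write happens only after the (possibly slow) view writes finish. */
  synchronized void registerDataWrite(CompletableFuture<Void> viewWritesDone, String record) {
    lastVTProduceFuture =
        CompletableFuture.allOf(lastVTProduceFuture, viewWritesDone).thenRun(() -> producedToVT.add(record));
  }

  /** Blocking variant: the calling thread waits for the previous write, then produces the CM. */
  synchronized void produceControlMessageBlocking(String controlMessage) {
    lastVTProduceFuture.join();
    producedToVT.add(controlMessage);
  }

  /** Async variant: chain the CM onto the previous write and record it as the new tail. */
  synchronized CompletableFuture<Void> produceControlMessageAsync(String controlMessage) {
    lastVTProduceFuture = lastVTProduceFuture.thenRun(() -> producedToVT.add(controlMessage));
    return lastVTProduceFuture;
  }
}
```

The async variant keeps ordering only because it updates the tail future itself; if a CM were produced without updating the tail, a later record could overtake it. Failure handling is left out of this sketch: a failed view write would fail every later link in the chain.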
@@ -24,7 +24,8 @@ public Map<String, VeniceViewWriter> buildStoreViewWriters(Store store, int vers
String className = viewConfig.getValue().getViewClassName();
Map<String, String> extraParams = viewConfig.getValue().getViewParameters();
VeniceViewWriter viewWriter =
ViewWriterUtils.getVeniceViewWriter(className, properties, store, keySchema, extraParams);
ViewWriterUtils.getVeniceViewWriter(className, properties, store, version, keySchema, extraParams);
viewWriter.configureWriterOptions(store);
Since the call above passes the `store` instance into the ViewWriter constructor, shall we modify the ViewWriter constructors to configure writer options based on the passed `store` instance there?
Then, in theory, we don't need to call this method explicitly here.
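A minimal sketch of that suggestion, assuming the writer options can be derived entirely from the store. `StoreSketch` and `ViewWriterSketch` are hypothetical stand-ins, and only the chunking and compression configs mentioned earlier in the review are modeled.

```java
/** Hypothetical stand-in for the store; only two configs are modeled. */
final class StoreSketch {
  final boolean chunkingEnabled;
  final String compressionStrategy;

  StoreSketch(boolean chunkingEnabled, String compressionStrategy) {
    this.chunkingEnabled = chunkingEnabled;
    this.compressionStrategy = compressionStrategy;
  }
}

/** Hypothetical view writer that configures itself from the store in its constructor. */
final class ViewWriterSketch {
  private final boolean chunkingEnabled;
  private final String compressionStrategy;

  ViewWriterSketch(StoreSketch store) {
    // Options are derived here, so callers cannot forget a separate configure step.
    this.chunkingEnabled = store.chunkingEnabled;
    this.compressionStrategy = store.compressionStrategy;
  }

  boolean isChunkingEnabled() {
    return chunkingEnabled;
  }

  String getCompressionStrategy() {
    return compressionStrategy;
  }
}
```

With this shape the instance is fully configured as soon as it is constructed, and the fields can stay `final`.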
CompletableFuture[] childFutures = new CompletableFuture[childWriters.length + 1];
int index = 0;
childFutures[index++] = lastWriteFuture;
for (VeniceWriter<K, V, U> writer: childWriters) {
childFutures[index++] = writer.delete(key, callback, deleteMetadata);
}
CompletableFuture.allOf(childFutures).whenCompleteAsync((ignored, childException) -> {
if (childException == null) {
CompletableFuture<PubSubProduceResult> mainFuture = mainWriter.delete(key, callback, deleteMetadata);
mainFuture.whenCompleteAsync((result, mainWriteException) -> {
if (mainWriteException == null) {
finalFuture.complete(result);
} else {
finalFuture.completeExceptionally(new VeniceException(mainWriteException));
}
});
} else {
VeniceException veniceException = new VeniceException(childException);
finalFuture.completeExceptionally(veniceException);
I can see some duplicate code in `put` and `delete`. Is it easy to de-dup them?
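One way to de-duplicate, sketched under assumptions: pass the per-writer operation in as a function so that `put` and `delete` share a single code path. `CompositeWriteSketch` is a hypothetical generic stand-in, `RuntimeException` replaces `VeniceException`, and updating `lastWriteFuture` at the end is an assumption about how consecutive writes stay ordered.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical generic stand-in for a composite writer; W is the writer type, R the result. */
final class CompositeWriteSketch<W, R> {
  private final W mainWriter;
  private final List<W> childWriters;
  // Assumption: tail of the previous composite write, so consecutive writes stay ordered.
  private CompletableFuture<?> lastWriteFuture = CompletableFuture.completedFuture(null);

  CompositeWriteSketch(W mainWriter, List<W> childWriters) {
    this.mainWriter = mainWriter;
    this.childWriters = childWriters;
  }

  /** Applies the operation to every child writer first, then to the main writer. */
  synchronized CompletableFuture<R> write(Function<W, CompletableFuture<R>> operation) {
    CompletableFuture<R> finalFuture = new CompletableFuture<>();
    CompletableFuture<?>[] childFutures = new CompletableFuture<?>[childWriters.size() + 1];
    int index = 0;
    childFutures[index++] = lastWriteFuture;
    for (W writer : childWriters) {
      childFutures[index++] = operation.apply(writer);
    }
    CompletableFuture.allOf(childFutures).whenCompleteAsync((ignored, childException) -> {
      if (childException != null) {
        // A failed child write fails the whole operation; the main writer is never called.
        finalFuture.completeExceptionally(new RuntimeException(childException));
        return;
      }
      operation.apply(mainWriter).whenCompleteAsync((result, mainException) -> {
        if (mainException == null) {
          finalFuture.complete(result);
        } else {
          finalFuture.completeExceptionally(new RuntimeException(mainException));
        }
      });
    });
    lastWriteFuture = finalFuture;
    return finalFuture;
  }
}
```

With a helper like this, `put` and `delete` would each reduce to a single call that passes a lambda invoking the matching method on the given writer.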
}

@Override
public void close(boolean gracefulClose) throws IOException {
Normally, a class should only clean up the instances it created itself. Since both `mainWriter` and `childWriters` are created outside, in theory these instances should be closed elsewhere.
If we truly want to create and close these writers here, we can pass two suppliers, one for the main writer and one for the child writers, and trigger the creation in the constructor.
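The supplier-based alternative could look roughly like the following. It is a sketch under assumptions: `WriterSketch` and `OwningCompositeWriterSketch` are hypothetical names, and `close()` here takes no arguments and throws no checked exception, unlike the `close(boolean gracefulClose)` in the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical minimal writer contract for this sketch. */
interface WriterSketch {
  void close();
}

/** Creates its writers from suppliers, so it owns them and may close them. */
final class OwningCompositeWriterSketch {
  private final WriterSketch mainWriter;
  private final List<WriterSketch> childWriters;

  OwningCompositeWriterSketch(
      Supplier<WriterSketch> mainWriterSupplier,
      Supplier<List<WriterSketch>> childWritersSupplier) {
    // Creation happens here, which is what justifies closing the writers in close().
    this.mainWriter = mainWriterSupplier.get();
    this.childWriters = new ArrayList<>(childWritersSupplier.get());
  }

  /** Closes every writer even if one close fails; the first failure is rethrown at the end. */
  void close() {
    RuntimeException first = null;
    for (WriterSketch child : childWriters) {
      first = closeAndCollect(child, first);
    }
    first = closeAndCollect(mainWriter, first);
    if (first != null) {
      throw first;
    }
  }

  private static RuntimeException closeAndCollect(WriterSketch writer, RuntimeException first) {
    try {
      writer.close();
      return first;
    } catch (RuntimeException e) {
      if (first == null) {
        return e;
      }
      first.addSuppressed(e);
      return first;
    }
  }
}
```

The other option from the comment, leaving the externally created writers to be closed by whoever created them, needs no extra code in the composite writer at all.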
// Produce directly into one of the child fabric
vwOptionsBuilder.setBrokerAddress(finalVersion.getPushStreamSourceAddress());
vwOptionsBuilder.setBrokerAddress(version.getPushStreamSourceAddress());
}
veniceWriter = getVeniceWriterFactory().createVeniceWriter(vwOptionsBuilder.build());
Do we have logic to send `SOP` to view topics? Also, the `sort` flag should always be `false` for view topics.
Good catch. We are currently using the same method to write `SOP` for VT. I think it's worth creating a new one explicitly for view topics, where we always set the `sort` flag to `false`.
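A small sketch of what a dedicated view-topic entry point might look like, assuming the only difference from the version-topic path is the pinned `sort` flag. `StartOfPushSketch` and its factory methods are hypothetical and do not mirror the real `broadcastStartOfPush` signature.

```java
/** Hypothetical value object describing the contents of a start-of-push message. */
final class StartOfPushSketch {
  final boolean sorted;
  final boolean chunked;
  final String compressionStrategy;

  private StartOfPushSketch(boolean sorted, boolean chunked, String compressionStrategy) {
    this.sorted = sorted;
    this.chunked = chunked;
    this.compressionStrategy = compressionStrategy;
  }

  /** Version topics: the caller decides whether the input is sorted. */
  static StartOfPushSketch forVersionTopic(boolean sorted, boolean chunked, String compressionStrategy) {
    return new StartOfPushSketch(sorted, chunked, compressionStrategy);
  }

  /** View topics: no sorted parameter at all, so the flag cannot be set to true by mistake. */
  static StartOfPushSketch forViewTopic(boolean chunked, String compressionStrategy) {
    return new StartOfPushSketch(false, chunked, compressionStrategy);
  }
}
```

Dropping the parameter from the view-topic method, rather than documenting that it must be `false`, turns the rule into something the compiler enforces.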
}
} catch (Exception e) {
String newStatusDetails = "Failed to start EOP procedures";
handleTerminalOfflinePushUpdate(offlinePushStatus, new ExecutionStatusWithDetails(ERROR, newStatusDetails));
The Controller has logic to mutate version topics in the following scenarios:
- The version topic is deleted when the version is being deleted, and the Controller should delete the corresponding view topics as well.
- The Controller enables log compaction on the version topic of a hybrid store when the push job completes, and I think, in theory, we should enable it for view topics as well.
Do we have logic to cover the above cases?
childCallback,
DEFAULT_LEADER_METADATA_WRAPPER,
APP_DEFAULT_LOGICAL_TS,
putMetadata)
One small perf enhancement:
View topics don't really need a full RMD. Maybe we only need to keep the `rmd offset` for the changelog consumer; the tombstone values are useless, as there won't be any leader replicas for view topics.
[server][controller] Add MaterializedViewWriter and support view writers in L/F
View writers will be invoked in L/F SIT too instead of only in A/A SIT. We rely on view config validation to ensure views that do require A/A are only added to stores with A/A enabled.
This PR only includes creation of materialized view topics, writing of data records and control messages to the materialized view topics in server and controller.
How was this PR tested?
Unit and integration tests
Does this PR introduce any user-facing changes?