Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unchunking evalScan and apply logging to chunks for performance #64

Merged
merged 3 commits into from
Feb 19, 2024

Conversation

bcarter97
Copy link
Member

@bcarter97 bcarter97 commented Feb 19, 2024

Description

Currently we use evalScan to perform the logic of producing a ConsumerRecord and the non-empty partition map. this is effectful as we perform logging upon reaching the highest offset.

It would be more efficient to use the non-effectful scan, as evalScan de-chunks a stream into singleton chunks. There unfortunately is no evalScanChunks equivalent of scanChunks.

We still perform the logging, however we can operate on the chunk for performance by using evalTapChunk

Related Issues

Definition of Done:

The Below tasks should be completed before marking a PR as ready for review.

  • Added / Removed / Updated relevant tests
  • Scaladoc updated

Comment on lines 173 to 186
reachedHighest match {
case Some(highest) =>
HighestOffsetsWithRecord(
partitionOffsets = t.partitionOffsets - highest,
consumerRecord = emittableRecord,
partitionLast = PartitionLast(highest, r.offset).some
)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we've reached the highest offset for this partition, store the Topic Partition and Offset inside the HighestOffsetsWithRecord so we can signal this upstream

.takeWhile(_.partitionOffsets.nonEmpty, takeFailure = true)
.evalTapChunk(_.partitionLast.traverse { last =>
logger.warn(s"Finished loading data from ${last.topicPartition.show} at offset ${last.offset}")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i know this hasn't changed but why is this a warning? this should be info

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

66541f3 (#64)

Was like that in the Akka lib, always assumed it was intentional 👀 I've lowered it to info

Copy link

@lacarvalho91 lacarvalho91 Feb 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I stand corrected, guess I left it on while debugging and forgot to lower it 😨

@@ -22,9 +22,12 @@ object TopicLoader extends TopicLoader {
private[topicloader] given [K, V]: Show[ConsumerRecord[K, V]] =
Show.show(cr => s"${cr.topic}-${cr.partition}")

private case class PartitionLast(topicPartition: TopicPartition, offset: Long)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PartitionLastOffset might make this clearer?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bcarter97 bcarter97 force-pushed the 60-remove-unchunking-scan branch from ad4363f to 66541f3 Compare February 19, 2024 16:40
@bcarter97 bcarter97 enabled auto-merge (squash) February 19, 2024 16:49
@bcarter97 bcarter97 merged commit cb2636d into main Feb 19, 2024
1 check passed
@bcarter97 bcarter97 deleted the 60-remove-unchunking-scan branch February 19, 2024 16:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants