lucasbru commented on code in PR #20665:
URL: https://github.com/apache/kafka/pull/20665#discussion_r2426272664
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -470,17 +470,24 @@ private OffsetAndMetadata findOffsetAndMetadata(final
TopicPartition partition)
Optional<Integer> leaderEpoch =
partitionGroup.headRecordLeaderEpoch(partition);
final long partitionTime =
partitionGroup.partitionTimestamp(partition);
if (offset == null) {
- try {
- if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition))
{
- final OffsetAndMetadata offsetAndMetadata =
nextOffsetsAndMetadataToBeConsumed.get(partition);
- offset = offsetAndMetadata.offset();
- leaderEpoch = offsetAndMetadata.leaderEpoch();
- } else {
- // This indicates a bug and thus we rethrow it as fatal
`IllegalStateException`
- throw new IllegalStateException("Stream task " + id + "
does not know the partition: " + partition);
+ final OffsetAndMetadata offsetAndMetadata =
nextOffsetsAndMetadataToBeConsumed.get(partition);
+ if (offsetAndMetadata == null) {
+ try {
+ offset = mainConsumer.position(partition);
+ leaderEpoch = Optional.empty();
+ } catch (final TimeoutException error) {
+ // the `consumer.position()` call should never block,
because we know that we did process data
+ // for the requested partition and thus the consumer
should have a valid local position
+ // that it can return immediately
Review Comment:
Is that still valid in the corner case? I think in the case you are
describing, we haven't processed any records so we may not have a `position`
for the partition yet and this will actually block to do an offset fetch to
find the last committed offset for the partition?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]