[
https://issues.apache.org/jira/browse/KAFKA-16774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17846912#comment-17846912
]
Chia-Ping Tsai edited comment on KAFKA-16774 at 5/16/24 11:11 AM:
------------------------------------------------------------------
[~mjsax] nice question. It seems to me that the test is flaky. IIRC,
`onPartitionsAssigned` should be executed by the same thread that calls
`consumer#poll` [0]. However, in that test case, `onPartitionsAssigned` is
called by the JUnit thread [1], which causes a race condition.
We can fix it by returning a copy of `pendingTasksToInit` [2] with
synchronization. That is similar to `allTasksPerId` [3]. WDYT?
[0]
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java#L147
[1]
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L1408
[2]
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L112
[3]
https://github.com/apache/kafka/blob/7b1fe33d01cb3de9f457a7e4d711eacb7c8f1c4a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java#L302
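A minimal sketch of the "return a copy with synchronization" idea proposed above. The class `PendingTasksSketch`, its method names, and the use of `String` task ids are hypothetical and chosen only for illustration; this is not the actual `Tasks` class in Kafka Streams, and the real fix may look different.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a task registry; NOT the real Kafka Streams Tasks class.
final class PendingTasksSketch {
    // A plain HashSet fails fast (ConcurrentModificationException) if it is
    // structurally modified while being iterated or streamed.
    private final Set<String> pendingTasksToInit = new HashSet<>();

    // All mutations go through the same monitor as the snapshot below.
    synchronized void addPendingTask(final String taskId) {
        pendingTasksToInit.add(taskId);
    }

    synchronized void removePendingTask(final String taskId) {
        pendingTasksToInit.remove(taskId);
    }

    // Returns a copy taken under the lock, so a caller on another thread can
    // iterate the result without racing against later mutations.
    synchronized Set<String> pendingTasksToInit() {
        return new HashSet<>(pendingTasksToInit);
    }
}
```

With this shape, a caller that streams over the returned set only ever touches its own snapshot, at the cost of one copy per call.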
was (Author: chia7712):
[~mjsax] nice question. It seems to me that the test is flaky. IIRC,
`onPartitionsAssigned` should be executed by the same thread that calls
`consumer#poll` [0]. However, in that test case, `onPartitionsAssigned` is
called by the JUnit thread [1], which causes a race condition.
We can fix it by returning a copy of `pendingTasksToInit` [2]. That is similar
to `allTasksPerId` [3]. WDYT?
> fix flaky StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled
> -------------------------------------------------------------------------
>
> Key: KAFKA-16774
> URL: https://issues.apache.org/jira/browse/KAFKA-16774
> Project: Kafka
> Issue Type: Test
> Components: streams, unit tests
> Reporter: Chia-Ping Tsai
> Priority: Minor
> Labels: flaky-test
>
> java.util.ConcurrentModificationException
>     at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1720)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
>     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
>     at org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1686)
>     at org.apache.kafka.streams.processor.internals.TaskManager.releaseLockedUnassignedTaskDirectories(TaskManager.java:1364)
>     at org.apache.kafka.streams.processor.internals.TaskManager.handleRebalanceComplete(TaskManager.java:208)
>     at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:79)
>     at org.apache.kafka.streams.processor.internals.StreamThreadTest.shouldCloseAllTaskProducersOnCloseIfEosEnabled(StreamThreadTest.java:1408)
>     at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)