[ 
https://issues.apache.org/jira/browse/KAFKA-10454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17188534#comment-17188534
 ] 

John Roesler commented on KAFKA-10454:
--------------------------------------

Huh, thanks for the report and repro, [~lkokhreidze] !

I think that if we didn't do selectKey in there, then you _would_ get an 
exception, since we would check that the stream and table have the same number 
of partitions for the join. I'm guessing that the selectKey is inserting a 
repartition node, which is (correctly) configured to be co-partitioned with the 
table, but the source-changelog optimization is kicking in and ignoring that 
the repartition node and source topic have different numbers of partitions.

If that sounds right to you, then the solution should be to add a 
partition-count check before applying the source-changelog optimization. Your 
workaround looks like it works also to disable the source-changelog 
optimization because the repartition() operator always forces a repartition 
node.

Thanks!

-John

> Kafka Streams Stuck in infinite REBALANCING loop when stream <> table join 
> partitions don't match
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10454
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10454
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: Levani Kokhreidze
>            Priority: Major
>
> Here's integration test: [https://github.com/apache/kafka/pull/9237]
>  
> From the first glance, issue is that when one joins stream to table, and 
> table source topic doesn't have same number of partitions as stream topic, 
> `StateChangelogReader` tries to recover state from changelog (which in this 
> case is the same as source topic) for table from partitions that don't exist. 
> Logs are spammed with: 
>  
> {code:java}
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,508] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,510] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,513] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,515] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,515] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,515] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,515] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,517] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,518] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,518] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,518] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,520] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,520] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,520] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,520] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,522] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,522] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,522] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,522] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,524] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,524] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,525] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,525] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,526] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,527] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,527] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,527] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-3 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,529] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-0 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,529] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-1 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716) 
> [2020-09-01 12:33:07,529] INFO stream-thread 
> [app-StreamTableJoinInfiniteLoopIntegrationTestloop-86ae06c3-5758-429f-9d29-94ed516db126-StreamThread-1]
>  End offset for changelog 
> topic-b-StreamTableJoinInfiniteLoopIntegrationTestloop-2 cannot be found; 
> will retry in the next time. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader:716)
> {code}
> And Kafka Streams never moves to RUNNING state.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to