Gerrrr commented on a change in pull request #11945:
URL: https://github.com/apache/kafka/pull/11945#discussion_r835106474



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -1197,7 +1197,7 @@ boolean sendingOldValueEnabled() {
 
         final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> foreignResponseSinkPartitioner =
             tableJoinedInternal.partitioner() == null
-                ? null
+                ? (topic, key, subscriptionResponseWrapper, numPartitions) -> subscriptionResponseWrapper.getPrimaryPartition()

Review comment:
       Yes, this is correct. For completeness, here is how I reasoned about it. 
Without this patch the partitioner is `null`, so `RecordCollectorImpl` sets the 
partition to `null` here:
   
   
https://github.com/apache/kafka/blob/ce883892270a02a72e91afbdb1fabdd50d7da474/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L136
   
   With this patch, the partitioner is never `null`, so `RecordCollectorImpl` 
uses it to find the desired partition here:
   
   
https://github.com/apache/kafka/blob/ce883892270a02a72e91afbdb1fabdd50d7da474/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L130
   
   When the new partitioner returns `null` for v0 data, we end up in the same 
state as without this patch: the subsequent call to the other overload of 
`RecordCollectorImpl#send` receives `partition` set to `null`:
   
   
https://github.com/apache/kafka/blob/ce883892270a02a72e91afbdb1fabdd50d7da474/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java#L139
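
   The three cases above can be illustrated with a minimal, self-contained sketch. It is not the actual `RecordCollectorImpl` code: `PartitionChoiceSketch`, `Partitioner`, `Response`, and `choosePartition` are hypothetical stand-ins that only model the `null` handling described in this comment.

```java
public class PartitionChoiceSketch {

    /** Hypothetical stand-in for StreamPartitioner; a null result means "no explicit partition". */
    @FunctionalInterface
    interface Partitioner<K, V> {
        Integer partition(String topic, K key, V value, int numPartitions);
    }

    /** Hypothetical stand-in for SubscriptionResponseWrapper; primaryPartition is null for v0 data. */
    static final class Response {
        private final Integer primaryPartition;

        Response(final Integer primaryPartition) {
            this.primaryPartition = primaryPartition;
        }

        Integer getPrimaryPartition() {
            return primaryPartition;
        }
    }

    /**
     * Models the decision described above: with no partitioner the partition is null;
     * otherwise the partitioner's result (possibly null) is passed on unchanged.
     */
    static <K, V> Integer choosePartition(final Partitioner<K, V> partitioner,
                                          final String topic,
                                          final K key,
                                          final V value,
                                          final int numPartitions) {
        if (partitioner == null) {
            return null;
        }
        return partitioner.partition(topic, key, value, numPartitions);
    }

    public static void main(final String[] args) {
        // Same shape as the lambda introduced by this patch.
        final Partitioner<String, Response> patched =
            (topic, key, response, numPartitions) -> response.getPrimaryPartition();

        // Without the patch: no partitioner, so the partition is null.
        System.out.println(choosePartition(null, "topic", "key", new Response(null), 4));
        // With the patch and v0 data: the partitioner returns null, same state as before.
        System.out.println(choosePartition(patched, "topic", "key", new Response(null), 4));
        // With the patch and data that carries a primary partition: that partition is used.
        System.out.println(choosePartition(patched, "topic", "key", new Response(2), 4));
    }
}
```

   In this model, the first two calls are indistinguishable to whatever consumes the result, which is why v0 data should behave the same with and without the patch.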



