guozhangwang commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r418214162



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -1064,14 +1071,16 @@ private void addAllKeys(final Set<Long> allKeys, final List<KeyValue<Long, Long>
 
     // must be public to allow KafkaProducer to instantiate it
     public static class KeyPartitioner implements Partitioner {
+        private final static LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
         @Override
         public int partition(final String topic,
                              final Object key,
                              final byte[] keyBytes,
                              final Object value,
                              final byte[] valueBytes,
                              final Cluster cluster) {
-            return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
+            return LONG_DESERIALIZER.deserialize(topic, keyBytes).intValue() % NUM_TOPIC_PARTITIONS;

Review comment:
   What I was asking about is whether this is necessary:

   ```
   properties.put(StreamsConfig.producerPrefix(ProducerConfig.PARTITIONER_CLASS_CONFIG), KeyPartitioner.class);
   ```

   As I mentioned, Streams has its own `StreamPartitioner`, and if it passes an
   actual non-null `partition` value to the `send` call, then the embedded
   producer's partitioner would not be used. Maybe I missed something critical
   here: did you mean this config is only used for sending data to the source
   topics? If so, why put it into the Streams props?
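
   To make the precedence I have in mind concrete, here is a minimal sketch in
   plain Java. It is a simplified model and not the producer's actual code: the
   class name, `choosePartition`, `keyPartition`, and the partition count of 4
   are all made up for illustration, and `keyPartition` only stands in for a
   configured partitioner such as `KeyPartitioner`.

```java
import java.nio.ByteBuffer;

public class PartitionPrecedenceSketch {
    // Assumed value for illustration; the test defines its own NUM_TOPIC_PARTITIONS.
    static final int NUM_TOPIC_PARTITIONS = 4;

    // Hypothetical stand-in for a configured producer partitioner:
    // decodes an 8-byte big-endian long key and maps it to a partition.
    // Like the code under review, it assumes non-negative keys.
    static int keyPartition(final byte[] keyBytes) {
        final long key = ByteBuffer.wrap(keyBytes).getLong();
        return (int) key % NUM_TOPIC_PARTITIONS;
    }

    // Simplified model of the choice: an explicit partition on the record wins;
    // the configured partitioner is consulted only when the partition is null.
    static int choosePartition(final Integer explicitPartition, final byte[] keyBytes) {
        if (explicitPartition != null) {
            return explicitPartition;
        }
        return keyPartition(keyBytes);
    }

    public static void main(final String[] args) {
        final byte[] keyBytes = ByteBuffer.allocate(Long.BYTES).putLong(6L).array();
        System.out.println(choosePartition(1, keyBytes));    // explicit partition: prints 1
        System.out.println(choosePartition(null, keyBytes)); // partitioner fallback: prints 2
    }
}
```

   If Streams always supplies the partition on its own sends, only the second
   path (a record sent without a partition) would ever reach the configured
   partitioner.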



