guozhangwang commented on a change in pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#discussion_r418201398



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -838,6 +838,7 @@
     static {
         final Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+        tempProducerDefaultOverrides.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10000);

Review comment:
       nit: Could you also add the original comment explaining why we set it to a smaller value?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java
##########
@@ -1064,14 +1071,16 @@ private void addAllKeys(final Set<Long> allKeys, final List<KeyValue<Long, Long>
 
     // must be public to allow KafkaProducer to instantiate it
     public static class KeyPartitioner implements Partitioner {
+        private final static LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
         @Override
         public int partition(final String topic,
                              final Object key,
                              final byte[] keyBytes,
                              final Object value,
                              final byte[] valueBytes,
                              final Cluster cluster) {
-            return ((Long) key).intValue() % NUM_TOPIC_PARTITIONS;
+            return LONG_DESERIALIZER.deserialize(topic, keyBytes).intValue() % NUM_TOPIC_PARTITIONS;

Review comment:
       Hmm, this suggests to me that the `StreamsProducer`'s own `partitionsFor` did not return the number of partitions, so we ended up calling `send` with `partition == null`. Otherwise we would compute the `partition` as
   
   ```
   partition = partitioner.partition(topic, key, value, partitions.size());
   ```
   
   where `partitioner` is the `StreamPartitioner`, and the producer's own partitioner would not be used.
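
   To make the suspected fallback concrete, here is a minimal, dependency-free sketch of the decision described above. It is not the actual Kafka Streams code: `PartitionChoiceSketch`, `StreamLevelPartitioner`, `producerLevelPartition`, and `choosePartition` are hypothetical names, and `ByteBuffer` stands in for the `Long` serializer and deserializer. It only illustrates the assumption that a missing partition count leaves `partition` as `null`, so the producer-level partitioner (which sees serialized key bytes) ends up deciding.

   ```java
   import java.nio.ByteBuffer;

   // Hypothetical illustration only; none of these names exist in Kafka.
   final class PartitionChoiceSketch {
       static final int NUM_TOPIC_PARTITIONS = 4; // assumed value for the example

       // Stand-in for a Streams-level partitioner that works on typed keys and values.
       interface StreamLevelPartitioner {
           Integer partition(String topic, Long key, Long value, int numPartitions);
       }

       // Stand-in for a producer-level partitioner such as the test's KeyPartitioner:
       // it decodes the serialized key bytes instead of casting the key object.
       static int producerLevelPartition(final byte[] keyBytes) {
           final long key = ByteBuffer.wrap(keyBytes).getLong();
           return (int) key % NUM_TOPIC_PARTITIONS;
       }

       // If the partition count is known, the Streams-level partitioner decides.
       // If it is unknown, the partition stays null and the producer-level
       // partitioner is consulted instead (the case suspected in the comment).
       static int choosePartition(final String topic,
                                  final Long key,
                                  final Long value,
                                  final Integer knownPartitionCount,
                                  final StreamLevelPartitioner streamPartitioner) {
           Integer partition = null;
           if (streamPartitioner != null && knownPartitionCount != null && knownPartitionCount > 0) {
               partition = streamPartitioner.partition(topic, key, value, knownPartitionCount);
           }
           if (partition != null) {
               return partition;
           }
           // Simulate serialization of the key before it reaches the producer-level partitioner.
           final byte[] keyBytes = ByteBuffer.allocate(Long.BYTES).putLong(key).array();
           return producerLevelPartition(keyBytes);
       }
   }
   ```

   With a known partition count, `choosePartition("t", 5L, 0L, 4, (t, k, v, n) -> 0)` takes the Streams-level result; passing `null` for the count falls through to the byte-based partitioner instead.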



