muralibasani commented on code in PR #22458:
URL: https://github.com/apache/kafka/pull/22458#discussion_r3397898076


##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -312,6 +309,38 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
                 .getOrDefault(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, "false")));
     }
 
+    private int configureMaxBufferedSize() {
+        final boolean bufferedRecordsPerPartitionOverridden = isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides);
+        final boolean inputBufferMaxBytesOverridden = isTopologyOverride(INPUT_BUFFER_MAX_BYTES_CONFIG, topologyOverrides);
+
+        if (!bufferedRecordsPerPartitionOverridden && !inputBufferMaxBytesOverridden) {
+            return getBufferedRecordsPerPartition(globalAppConfigs);
+        }
+        return setMaxBufferedRecordsPerPartition(bufferedRecordsPerPartitionOverridden, inputBufferMaxBytesOverridden);
+    }
+
+    @SuppressWarnings("deprecation")
+    private int setMaxBufferedRecordsPerPartition(final boolean bufferedRecordsPerPartitionOverridden,
+                                                  final boolean inputBufferMaxBytesOverridden) {
+        int resolvedMaxBufferedSize = -1;

Review Comment:
   @unknowntpo Thanks. I refactored setMaxBufferedRecordsPerPartition to use 
early returns and added a local UNDEFINED_MAX_BUFFERED_SIZE = -1 constant in 
TopologyConfig, replacing the bare -1 literal.
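
   For readers following along, here is a minimal, self-contained sketch of 
what an early-return shape with a named sentinel could look like. It is not 
the code in the PR: the class EarlyReturnSketch, the method 
resolveMaxBufferedSize, its parameters, and the choice of which branch returns 
which value are all assumptions made for illustration. Only the constant name 
UNDEFINED_MAX_BUFFERED_SIZE and its value -1 come from the comment above.

```java
public class EarlyReturnSketch {
    // Named sentinel instead of a bare -1 literal (name and value taken from the review comment).
    static final int UNDEFINED_MAX_BUFFERED_SIZE = -1;

    /**
     * Hypothetical resolution logic; the branch semantics are assumed, not taken from the PR.
     *
     * @param recordsOverridden    whether the records-per-partition setting has a topology override
     * @param bytesOverridden      whether the input-buffer-bytes setting has a topology override
     * @param topologyRecordsValue the overriding records-per-partition value (used only if overridden)
     * @param globalDefault        the application-wide fallback value
     */
    static int resolveMaxBufferedSize(final boolean recordsOverridden,
                                      final boolean bytesOverridden,
                                      final int topologyRecordsValue,
                                      final int globalDefault) {
        // Nothing overridden: fall back to the global setting and exit immediately.
        if (!recordsOverridden && !bytesOverridden) {
            return globalDefault;
        }
        // Assumed rule: a byte-based override leaves the record-count limit undefined.
        if (bytesOverridden) {
            return UNDEFINED_MAX_BUFFERED_SIZE;
        }
        // Only the record-count setting is overridden: use the topology-level value.
        return topologyRecordsValue;
    }

    public static void main(final String[] args) {
        System.out.println(resolveMaxBufferedSize(false, false, 500, 1000));
        System.out.println(resolveMaxBufferedSize(true, false, 500, 1000));
        System.out.println(resolveMaxBufferedSize(true, true, 500, 1000));
    }
}
```

   Each branch returns as soon as its case is decided, so there is no mutable 
local that starts at -1 and is reassigned later, and the sentinel's meaning is 
visible at the point of return.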



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]