unknowntpo commented on code in PR #22458:
URL: https://github.com/apache/kafka/pull/22458#discussion_r3392772311


##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -312,6 +309,38 @@ public TopologyConfig(final String topologyName, final 
StreamsConfig globalAppCo
                 .getOrDefault(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, 
"false")));
     }
 
+    private int configureMaxBufferedSize() {
+        final boolean bufferedRecordsPerPartitionOverridden = 
isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides);
+        final boolean inputBufferMaxBytesOverridden = 
isTopologyOverride(INPUT_BUFFER_MAX_BYTES_CONFIG, topologyOverrides);

Review Comment:
   This should also consider the application-level config. If 
`input.buffer.max.bytes` is explicitly set globally, a named topology should 
not be able to re-enable the deprecated
     `buffered.records.per.partition` guard through topology overrides.
   
     ```suggestion
             final boolean inputBufferMaxBytesOverridden =
                 isTopologyOverride(INPUT_BUFFER_MAX_BYTES_CONFIG, 
topologyOverrides)
                     || 
globalAppConfigs.originals().containsKey(INPUT_BUFFER_MAX_BYTES_CONFIG);
     ```



##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -312,6 +309,38 @@ public TopologyConfig(final String topologyName, final 
StreamsConfig globalAppCo
                 .getOrDefault(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, 
"false")));
     }
 
+    private int configureMaxBufferedSize() {
+        final boolean bufferedRecordsPerPartitionOverridden = 
isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides);
+        final boolean inputBufferMaxBytesOverridden = 
isTopologyOverride(INPUT_BUFFER_MAX_BYTES_CONFIG, topologyOverrides);
+
+        if (!bufferedRecordsPerPartitionOverridden && 
!inputBufferMaxBytesOverridden) {
+            return getBufferedRecordsPerPartition(globalAppConfigs);
+        }
+        return 
setMaxBufferedRecordsPerPartition(bufferedRecordsPerPartitionOverridden, 
inputBufferMaxBytesOverridden);
+    }
+
+    @SuppressWarnings("deprecation")
+    private int setMaxBufferedRecordsPerPartition(final boolean 
bufferedRecordsPerPartitionOverridden,
+                                                  final boolean 
inputBufferMaxBytesOverridden) {
+        int resolvedMaxBufferedSize = -1;

Review Comment:
   Could we use early returns here and avoid reassigning 
`resolvedMaxBufferedSize`?
   
     Also, instead of returning the hardcoded `-1`, it would be clearer to 
return a named sentinel value such as `UNDEFINED_MAX_BUFFERED_SIZE`. I do not 
think we need to make
     `StreamTask.UNDEFINED_MAX_BUFFERED_SIZE` public just for this, since that 
constant is a `StreamTask` implementation detail. A private constant in 
`TopologyConfig` with proper documentation should be
     enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to