muralibasani commented on code in PR #22458:
URL: https://github.com/apache/kafka/pull/22458#discussion_r3397898076
##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -312,6 +309,38 @@ public TopologyConfig(final String topologyName, final
StreamsConfig globalAppCo
.getOrDefault(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG,
"false")));
}
+ private int configureMaxBufferedSize() {
+ final boolean bufferedRecordsPerPartitionOverridden =
isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides);
+ final boolean inputBufferMaxBytesOverridden =
isTopologyOverride(INPUT_BUFFER_MAX_BYTES_CONFIG, topologyOverrides);
+
+ if (!bufferedRecordsPerPartitionOverridden &&
!inputBufferMaxBytesOverridden) {
+ return getBufferedRecordsPerPartition(globalAppConfigs);
+ }
+ return
setMaxBufferedRecordsPerPartition(bufferedRecordsPerPartitionOverridden,
inputBufferMaxBytesOverridden);
+ }
+
+ @SuppressWarnings("deprecation")
+ private int setMaxBufferedRecordsPerPartition(final boolean
bufferedRecordsPerPartitionOverridden,
+ final boolean
inputBufferMaxBytesOverridden) {
+ int resolvedMaxBufferedSize = -1;
Review Comment:
@unknowntpo thanks. refactored setMaxBufferedRecordsPerPartition to use
early returns; added a local UNDEFINED_MAX_BUFFERED_SIZE = -1 constant in
TopologyConfig
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]