unknowntpo commented on code in PR #22458:
URL: https://github.com/apache/kafka/pull/22458#discussion_r3392772311
##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -312,6 +309,38 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
                 .getOrDefault(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, "false")));
     }
+    private int configureMaxBufferedSize() {
+        final boolean bufferedRecordsPerPartitionOverridden = isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides);
+        final boolean inputBufferMaxBytesOverridden = isTopologyOverride(INPUT_BUFFER_MAX_BYTES_CONFIG, topologyOverrides);
Review Comment:
This check should also consider the application-level config. If
`input.buffer.max.bytes` is explicitly set globally, a named topology should
not be able to re-enable the deprecated `buffered.records.per.partition`
guard through topology overrides.
```suggestion
        final boolean inputBufferMaxBytesOverridden =
            isTopologyOverride(INPUT_BUFFER_MAX_BYTES_CONFIG, topologyOverrides)
                || globalAppConfigs.originals().containsKey(INPUT_BUFFER_MAX_BYTES_CONFIG);
```
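To illustrate the intent, here is a minimal standalone sketch that uses plain maps as stand-ins for `topologyOverrides` and `globalAppConfigs.originals()`. The class and method names (`BufferOverrideSketch`, `isExplicitlySet`) are hypothetical and not part of the PR; the only point is that a key counts as explicitly set when it appears at either level.

```java
import java.util.Map;

// Hypothetical sketch, not the PR's code: plain maps stand in for
// topologyOverrides and globalAppConfigs.originals().
final class BufferOverrideSketch {
    static final String INPUT_BUFFER_MAX_BYTES = "input.buffer.max.bytes";

    // A key is "explicitly set" if the user supplied it either as a
    // topology-level override or in the application-level config.
    static boolean isExplicitlySet(final String key,
                                   final Map<String, ?> topologyOverrides,
                                   final Map<String, ?> globalOriginals) {
        return topologyOverrides.containsKey(key) || globalOriginals.containsKey(key);
    }

    public static void main(final String[] args) {
        // Set only at the application level, with no topology override.
        final Map<String, Object> globalOriginals = Map.of(INPUT_BUFFER_MAX_BYTES, 1024L);
        final Map<String, Object> topologyOverrides = Map.of();
        System.out.println(isExplicitlySet(INPUT_BUFFER_MAX_BYTES, topologyOverrides, globalOriginals));
    }
}
```

With the check written this way, a globally configured `input.buffer.max.bytes` is treated the same as a topology-level one when deciding whether the deprecated guard applies.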
##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -312,6 +309,38 @@ public TopologyConfig(final String topologyName, final StreamsConfig globalAppCo
                 .getOrDefault(StreamsConfig.TRANSACTIONAL_STATE_STORES_CONFIG, "false")));
     }
+    private int configureMaxBufferedSize() {
+        final boolean bufferedRecordsPerPartitionOverridden = isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides);
+        final boolean inputBufferMaxBytesOverridden = isTopologyOverride(INPUT_BUFFER_MAX_BYTES_CONFIG, topologyOverrides);
+
+        if (!bufferedRecordsPerPartitionOverridden && !inputBufferMaxBytesOverridden) {
+            return getBufferedRecordsPerPartition(globalAppConfigs);
+        }
+        return setMaxBufferedRecordsPerPartition(bufferedRecordsPerPartitionOverridden, inputBufferMaxBytesOverridden);
+    }
+
+    @SuppressWarnings("deprecation")
+    private int setMaxBufferedRecordsPerPartition(final boolean bufferedRecordsPerPartitionOverridden,
+                                                  final boolean inputBufferMaxBytesOverridden) {
+        int resolvedMaxBufferedSize = -1;
Review Comment:
Could we use early returns here and avoid reassigning
`resolvedMaxBufferedSize`?
Also, instead of returning the hardcoded `-1`, it would be clearer to
return a named sentinel value such as `UNDEFINED_MAX_BUFFERED_SIZE`. I do not
think we need to make `StreamTask.UNDEFINED_MAX_BUFFERED_SIZE` public just
for this, since that constant is a `StreamTask` implementation detail. A
private constant in `TopologyConfig` with proper documentation should be
enough.
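
A rough sketch of the shape I have in mind, written as a standalone class so it compiles on its own. The branch logic is an assumption, since the rest of the method body is not visible in this hunk: I assume an explicit `input.buffer.max.bytes` disables the record-count guard, and that the overridden record count is passed in as a parameter. `MaxBufferedSizeSketch` and `resolveMaxBufferedSize` are hypothetical names.

```java
// Hypothetical sketch, not the PR's code: illustrates early returns and a
// private, documented sentinel in place of a reassigned local and a bare -1.
final class MaxBufferedSizeSketch {
    /**
     * Sentinel meaning "no per-partition record limit applies".
     * Kept private here instead of exposing the StreamTask constant.
     */
    private static final int UNDEFINED_MAX_BUFFERED_SIZE = -1;

    static int resolveMaxBufferedSize(final boolean bufferedRecordsPerPartitionOverridden,
                                      final boolean inputBufferMaxBytesOverridden,
                                      final int overriddenRecordsPerPartition) {
        // Assumption: an explicit byte-based limit wins and disables the deprecated guard.
        if (inputBufferMaxBytesOverridden) {
            return UNDEFINED_MAX_BUFFERED_SIZE;
        }
        // Only the deprecated record-count config was overridden: honor it.
        if (bufferedRecordsPerPartitionOverridden) {
            return overriddenRecordsPerPartition;
        }
        // Neither overridden; the PR handles this case in the caller, kept here for totality.
        return UNDEFINED_MAX_BUFFERED_SIZE;
    }

    public static void main(final String[] args) {
        System.out.println(resolveMaxBufferedSize(true, false, 500));
        System.out.println(resolveMaxBufferedSize(true, true, 500));
    }
}
```

Each branch returns immediately, so there is no mutable local to track, and the sentinel's meaning is documented in one place.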