ableegoldman commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r738863140
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1221,13 +1236,27 @@ private long getCacheSizePerThread(final int numStreamThreads) {
         return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
     }
+    private long getBufferSizePerThread(final int numStreamThreads) {
Review comment:
Seems like this is exactly the same logic as `getCacheSizePerThread`,
can we instead just change the name of the existing method to match both cases
rather than writing duplicate code? Maybe something like
`getMemorySharePerThread` or `getMemorySizePerThread`?
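For illustration, a minimal sketch of what a single shared helper could look like — the class and method names here are placeholders, not the actual PR code, and the parameters stand in for the `KafkaStreams` fields:

```java
// Hypothetical sketch: one helper shared by the cache and input-buffer cases.
// All names are illustrative, not the actual KafkaStreams members.
final class MemoryShare {

    // Splits a total byte budget evenly across stream threads, reserving one
    // extra share for the global thread when the topology has one.
    static long getMemorySizePerThread(final long totalBytes,
                                       final int numStreamThreads,
                                       final boolean hasGlobalTopology) {
        if (numStreamThreads == 0) {
            return totalBytes;
        }
        return totalBytes / (numStreamThreads + (hasGlobalTopology ? 1 : 0));
    }
}
```

The cache and buffer callers would then each pass their own total (`totalCacheSize` or `inputBufferMaxBytes`) into the one method.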
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -385,7 +385,7 @@ public synchronized final InternalTopologyBuilder rewriteTopology(final StreamsC
         setApplicationId(config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
         // maybe strip out caching layers
-        if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) == 0L) {
+        if (config.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG) == 0L) {
Review comment:
The original config is deprecated but by definition may still be in use, so we need to check both configs. We also need to decide what to do if both are defined (imo just use the value of the new config and ignore the old one, but you could log a warning if both are set to different values)
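One possible shape for that resolution, sketched as a standalone helper — `null` stands in for "the user did not set this config", and all names are illustrative rather than the actual `StreamsConfig` API:

```java
// Hypothetical sketch of resolving a deprecated config against its replacement.
// A null argument stands in for "the user did not explicitly set this config".
final class ConfigResolution {

    static long resolveCacheBytes(final Long newValue,
                                  final Long deprecatedValue,
                                  final long defaultBytes) {
        if (newValue != null) {
            if (deprecatedValue != null && !deprecatedValue.equals(newValue)) {
                // Both configs set to different values: prefer the new one, warn about the old.
                System.out.println("WARN: both cache size configs are set; using the new config: " + newValue);
            }
            return newValue;
        }
        if (deprecatedValue != null) {
            return deprecatedValue; // deprecated, but still honored
        }
        return defaultBytes;
    }
}
```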
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1013,12 +1017,16 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
         final int threadIdx = getNextThreadIndex();
         final int numLiveThreads = getNumLiveStreamThreads();
         final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
+        final long maxBufferSizePerThread = getBufferSizePerThread(numLiveThreads + 1);
         log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
                  threadIdx, numLiveThreads + 1, cacheSizePerThread);
         resizeThreadCache(cacheSizePerThread);
+        log.info("Adding StreamThread-{}, there will now be {} live threads and the new buffer size per thread is {}",
Review comment:
Seems kind of weird to log this twice, it makes it seem like we're
adding this StreamThread a second time. How about instead just adding the new
buffer size to the original log message (eg something like)
```suggestion
            log.info("Adding StreamThread-{}, there are now {} threads with a buffer size {} and cache size {} per thread.",
```
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1030,8 +1038,11 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
         streamThread.shutdown();
         threads.remove(streamThread);
         final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
+        final long maxBufferSizePerThread = getBufferSizePerThread(getNumLiveStreamThreads());
         log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);
Review comment:
The phrasing here is a little awkward, can we try to clean it up a bit here and in the new line below? (Also worth considering combining these into a single log line, although in this case imo it's fine either way)
```suggestion
            log.info("Resizing thread cache again since new thread can not be started, final cache size per thread is {}", cacheSizePerThread);
```
##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -747,11 +764,6 @@
"",
Importance.LOW,
APPLICATION_SERVER_DOC)
- .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
Review comment:
I think you still need (or at least want) to keep these deprecated
configs defined
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -601,6 +608,10 @@ boolean runLoop() {
                 if (size != -1L) {
                     cacheResizer.accept(size);
                 }
+                final long bufferBytesSize = maxBufferResizeSize.getAndSet(-1L);
Review comment:
Do we need this? Seems like we can get rid of the extra
`maxBufferSizeBytes` and instead just directly read out the value of
`maxBufferResizeSize` (in which case we should probably rename
`maxBufferResizeSize` to `maxBufferSizeBytes` -- my point is, I think we can
get away with just storing the current size of the buffer as a `long` and the
max size as an `AtomicLong` that can be updated from outside the thread)
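The shape being proposed, as a standalone sketch (field and method names are illustrative, not the actual `StreamThread` members):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: the owning thread tracks the current buffer fill as a
// plain long, while the max lives in an AtomicLong so that other threads can
// resize it safely without any extra "pending resize" sentinel value.
final class InputBufferState {
    private final AtomicLong maxBufferSizeBytes;
    private long currentBufferBytes = 0L; // only mutated by the owning stream thread

    InputBufferState(final long initialMaxBytes) {
        maxBufferSizeBytes = new AtomicLong(initialMaxBytes);
    }

    // Called from outside the stream thread, e.g. when threads are added or removed.
    void resizeMaxBufferSize(final long newMaxBytes) {
        maxBufferSizeBytes.set(newMaxBytes);
    }

    void recordPolledBytes(final long bytes) {
        currentBufferBytes += bytes;
    }

    void recordProcessedBytes(final long bytes) {
        currentBufferBytes -= bytes;
    }

    boolean overLimit() {
        return currentBufferBytes > maxBufferSizeBytes.get();
    }
}
```

The run loop would then just call `overLimit()` each iteration instead of draining a resize signal with `getAndSet(-1L)`.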
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1221,13 +1236,27 @@ private long getCacheSizePerThread(final int numStreamThreads) {
         return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
     }
+    private long getBufferSizePerThread(final int numStreamThreads) {
+        if (numStreamThreads == 0) {
+            return inputBufferMaxBytes;
+        }
+        return inputBufferMaxBytes / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
+    }
+
     private void resizeThreadCache(final long cacheSizePerThread) {
         processStreamThread(thread -> thread.resizeCache(cacheSizePerThread));
         if (globalStreamThread != null) {
             globalStreamThread.resize(cacheSizePerThread);
         }
     }
+    private void resizeMaxBufferSize(final long maxBufferSize) {
Review comment:
Same here, if it's going to be exactly the same then we only need one
method.
Although, I don't think the GlobalThread actually even has an input buffer
the way StreamThreads do (doesn't seem to make sense for it to need one because
it can just process all of its polled records right away, whereas the
StreamThread may need to buffer them for various reasons)
You could still probably combine into a single method and just include a
flag for whether or not to call `resize` on the global thread (with it being
true for the cache case, and false for the input buffer resizing)
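Sketched abstractly with `LongConsumer` callbacks standing in for the stream threads and the global thread — everything here is illustrative, not the actual `KafkaStreams` code:

```java
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical sketch of one resize method with a flag controlling whether the
// global thread participates (true for the cache, false for the input buffer,
// since the global thread has no input buffer to resize).
final class ThreadResizer {

    static void resizePerThread(final List<LongConsumer> streamThreadResizers,
                                final LongConsumer globalThreadResizer, // may be null
                                final long sizePerThread,
                                final boolean includeGlobalThread) {
        for (final LongConsumer resizer : streamThreadResizers) {
            resizer.accept(sizePerThread);
        }
        if (includeGlobalThread && globalThreadResizer != null) {
            globalThreadResizer.accept(sizePerThread);
        }
    }
}
```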
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -736,11 +734,8 @@ public boolean process(final long wallClockTime) {
             consumedOffsets.put(partition, record.offset());
             commitNeeded = true;

-            // after processing this record, if its partition queue's buffered size has been
-            // decreased to the threshold, we can then resume the consumption on this partition
-            if (recordInfo.queue().size() == maxBufferedSize) {
-                mainConsumer.resume(singleton(partition));
-            }
Review comment:
Unfortunately we can't remove this logic yet, since some users may still
be setting only the `buffered.records.per.partition` config, and they shouldn't
see a sudden explosion of memory just because they didn't switch over to the
new config right away
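In other words, the resume decision likely has to honor both the deprecated per-partition record count and the new total-bytes cap. A hedged sketch of such a combined check (all names illustrative, not the actual PR code):

```java
// Hypothetical sketch: a partition is only eligible for resuming once BOTH
// limits are satisfied -- the deprecated buffered.records.per.partition count
// and the new total-bytes cap across all partitions.
final class ResumeCheck {

    static boolean canResumePartition(final int partitionQueueSize,
                                      final int maxBufferedRecordsPerPartition,
                                      final long totalBufferedBytes,
                                      final long maxBufferedBytes) {
        return partitionQueueSize <= maxBufferedRecordsPerPartition
            && totalBufferedBytes <= maxBufferedBytes;
    }
}
```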
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -721,6 +732,10 @@ public void resizeCache(final long size) {
         cacheResizeSize.set(size);
     }
+    public void resize(final long size) {
Review comment:
nit: call this one `resizeCache` to differentiate between this and the
buffer size? Also, is there a method somewhere to update the input buffer size?
Seems like we're always resizing the cache when a thread is added/removed
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
##########
@@ -47,6 +47,7 @@
private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
private StampedRecord headRecord = null;
+ private long headRecordSizeInBytes = 0;
Review comment:
Is this used anywhere?
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1057,6 +1057,17 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
             }
         }
+    /**
+     * Fetch all non-empty partitions for pausing
Review comment:
Why do we only pause the non-empty partitions? If the buffer is full, we
have to pause all of them, no?
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -793,6 +809,10 @@ void runOnce() {
             totalProcessed += processed;
             totalRecordsProcessedSinceLastSummary += processed;
+            if (bufferSize > maxBufferSizeBytes && bufferSize - processedData.totalBytesConsumed <= maxBufferSizeBytes) {
Review comment:
This logic is a little difficult to read 😅 -- would it help to just
move the `bufferSize -= processedData.totalBytesConsumed;` to before this check?
Although on that note, it might be cleaner to just move this check regarding
whether to resume consuming to right before we call `poll`, that way there's a
nice symmetry between the `pause` and `resume` checks, and all the logic is
consolidated to one place
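The readability point can be seen in a small sketch: decrementing first turns the two-sided condition into a single comparison against the max, mirroring the pause check (names are illustrative, not the actual `StreamThread` code):

```java
// Hypothetical sketch of the suggested restructuring: subtract the processed
// bytes first, then one simple comparison decides whether the thread is back
// under its input-buffer budget before the next poll.
final class PollGate {

    static boolean shouldResumeBeforePoll(long bufferSize,
                                          final long bytesConsumedSinceLastPoll,
                                          final long maxBufferSizeBytes) {
        bufferSize -= bytesConsumedSinceLastPoll; // decrement first...
        return bufferSize <= maxBufferSizeBytes;  // ...then a single bound check
    }
}
```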