ableegoldman commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r738863140



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1221,13 +1236,27 @@ private long getCacheSizePerThread(final int numStreamThreads) {
         return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
     }
 
+    private long getBufferSizePerThread(final int numStreamThreads) {

Review comment:
       This seems to be exactly the same logic as `getCacheSizePerThread`. Can
we instead rename the existing method to cover both cases rather than writing
duplicate code? Maybe something like `getMemorySharePerThread` or
`getMemorySizePerThread`?
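
A minimal sketch of what a single shared helper might look like, assuming the only difference between the two cases is the total being divided. The class name `MemoryShare`, the parameter names, and passing the global-topology check in as a boolean are all hypothetical and chosen for illustration only.

```java
// Hypothetical standalone sketch; not the actual KafkaStreams code.
final class MemoryShare {
    private MemoryShare() { }

    // Splits totalBytes evenly across the stream threads, plus one extra
    // share for the global thread when the topology has one.
    static long getMemorySizePerThread(final long totalBytes,
                                       final int numStreamThreads,
                                       final boolean hasGlobalTopology) {
        final int numShares = numStreamThreads + (hasGlobalTopology ? 1 : 0);
        // Guard against dividing by zero when there are no threads at all.
        if (numShares == 0) {
            return totalBytes;
        }
        return totalBytes / numShares;
    }
}
```

Callers would then pass either the total cache size or the total input buffer size, so the division logic lives in one place.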

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -385,7 +385,7 @@ public synchronized final InternalTopologyBuilder rewriteTopology(final StreamsC
         setApplicationId(config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
 
         // maybe strip out caching layers
-        if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) == 0L) {
+        if (config.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG) == 0L) {

Review comment:
       The original config is deprecated but by definition may still be in use,
so we need to check both configs. We also need to decide what to do if both are
defined (imo just use the value of the new config and ignore the old one, but
you could log a warning if both are set to different values).
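
A rough sketch of the precedence rule described above, written against a plain map of user-supplied values so that a missing key can be told apart from a default. The class `CacheSizeResolver`, the method name, and the two key strings are assumptions for illustration; the real check would go through `StreamsConfig`.

```java
import java.util.Map;

// Hypothetical standalone sketch; not the actual InternalTopologyBuilder code.
final class CacheSizeResolver {
    // Key strings are assumed here for illustration.
    static final String NEW_KEY = "statestore.cache.max.bytes";
    static final String OLD_KEY = "cache.max.bytes.buffering";

    private CacheSizeResolver() { }

    // userConfigs holds only the values the user explicitly set (no defaults),
    // so a missing key means "not configured".
    static long resolveCacheMaxBytes(final Map<String, Long> userConfigs,
                                     final long defaultBytes) {
        final Long newValue = userConfigs.get(NEW_KEY);
        final Long oldValue = userConfigs.get(OLD_KEY);

        if (newValue != null) {
            // The new config wins; warn only when the two actually disagree.
            if (oldValue != null && !oldValue.equals(newValue)) {
                System.err.println("Both " + NEW_KEY + " and deprecated " + OLD_KEY
                    + " are set to different values; using " + NEW_KEY + "=" + newValue);
            }
            return newValue;
        }
        // Fall back to the deprecated config, then to the default.
        return oldValue != null ? oldValue : defaultBytes;
    }
}
```

With this shape, the "strip out caching layers" check would compare the resolved value against `0L` instead of reading either config directly.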

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1013,12 +1017,16 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
                 final int threadIdx = getNextThreadIndex();
                 final int numLiveThreads = getNumLiveStreamThreads();
                 final long cacheSizePerThread = getCacheSizePerThread(numLiveThreads + 1);
+                final long maxBufferSizePerThread = getBufferSizePerThread(numLiveThreads + 1);
                 log.info("Adding StreamThread-{}, there will now be {} live threads and the new cache size per thread is {}",
                          threadIdx, numLiveThreads + 1, cacheSizePerThread);
                 resizeThreadCache(cacheSizePerThread);
+                log.info("Adding StreamThread-{}, there will now be {} live threads and the new buffer size per thread is {}",

Review comment:
       It seems odd to log this twice, since it makes it look like we're
adding this StreamThread a second time. How about just adding the new buffer
size to the original log message instead, e.g. something like:
   ```suggestion
                   log.info("Adding StreamThread-{}, there are now {} threads with a buffer size {} and cache size {} per thread.",
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1030,8 +1038,11 @@ private static Metrics getMetrics(final StreamsConfig config, final Time time, f
                     streamThread.shutdown();
                     threads.remove(streamThread);
                     final long cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads());
+                    final long maxBufferSizePerThread = getBufferSizePerThread(getNumLiveStreamThreads());
                     log.info("Resizing thread cache due to terminating added thread, new cache size per thread is {}", cacheSizePerThread);

Review comment:
       The phrasing here is a little awkward. Can we try to clean it up a bit,
both here and in the new line below? (It's also worth considering combining
these into a single log line, although in this case imo it's fine either way.)
   ```suggestion
                       log.info("Resizing thread cache again since new thread can not be started, final cache size per thread is {}", cacheSizePerThread);
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -747,11 +764,6 @@
                     "",
                     Importance.LOW,
                     APPLICATION_SERVER_DOC)
-            .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,

Review comment:
       I think you still need (or at least want) to keep these deprecated
configs defined.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -601,6 +608,10 @@ boolean runLoop() {
                 if (size != -1L) {
                     cacheResizer.accept(size);
                 }
+                final long bufferBytesSize = maxBufferResizeSize.getAndSet(-1L);

Review comment:
       Do we need this? It seems we can get rid of the extra
`maxBufferSizeBytes` and instead read the value of `maxBufferResizeSize`
directly (in which case we should probably rename `maxBufferResizeSize` to
`maxBufferSizeBytes`). My point is, I think we can get away with just storing
the current size of the buffer as a `long` and the max size as an `AtomicLong`
that can be updated from outside the thread.
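
To illustrate the idea, here is a small sketch that keeps the current buffer size in a plain `long` owned by the processing thread and the limit in an `AtomicLong` that other threads may update. The class `InputBufferTracker` and its method names are hypothetical, not part of `StreamThread`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical standalone sketch; not the actual StreamThread code.
final class InputBufferTracker {
    // May be updated from outside the owning thread, hence atomic.
    private final AtomicLong maxBufferSizeBytes;
    // Only read and written by the owning thread, so a plain long suffices.
    private long bufferSize = 0L;

    InputBufferTracker(final long initialMaxBytes) {
        this.maxBufferSizeBytes = new AtomicLong(initialMaxBytes);
    }

    // Called by another thread, e.g. when stream threads are added or removed.
    void resize(final long newMaxBytes) {
        maxBufferSizeBytes.set(newMaxBytes);
    }

    // Called by the owning thread after polling records into the buffer.
    void recordsAdded(final long bytes) {
        bufferSize += bytes;
    }

    // Called by the owning thread after processing buffered records.
    void recordsProcessed(final long bytes) {
        bufferSize -= bytes;
    }

    long bufferSize() {
        return bufferSize;
    }

    // Reads the latest limit directly; no second copy of the max is kept.
    boolean isOverLimit() {
        return bufferSize > maxBufferSizeBytes.get();
    }
}
```

The limit is read on every check, so there is no sentinel value such as `-1L` and no separate "pending resize" field to keep in sync.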

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1221,13 +1236,27 @@ private long getCacheSizePerThread(final int numStreamThreads) {
         return totalCacheSize / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
     }
 
+    private long getBufferSizePerThread(final int numStreamThreads) {
+        if (numStreamThreads == 0) {
+            return inputBufferMaxBytes;
+        }
+        return inputBufferMaxBytes / (numStreamThreads + (topologyMetadata.hasGlobalTopology() ? 1 : 0));
+    }
+
     private void resizeThreadCache(final long cacheSizePerThread) {
         processStreamThread(thread -> thread.resizeCache(cacheSizePerThread));
         if (globalStreamThread != null) {
             globalStreamThread.resize(cacheSizePerThread);
         }
     }
 
+    private void resizeMaxBufferSize(final long maxBufferSize) {

Review comment:
       Same here: if it's going to be exactly the same, then we only need one
method.
   
   That said, I don't think the GlobalThread even has an input buffer the way
StreamThreads do. It doesn't seem to make sense for it to need one, because it
can just process all of its polled records right away, whereas the StreamThread
may need to buffer them for various reasons.
   
   You could probably still combine these into a single method and just include
a flag for whether or not to call `resize` on the global thread (true for the
cache case, false for the input buffer resizing).
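
One possible shape for the combined method, sketched with plain `LongConsumer` callbacks standing in for the per-thread resize calls. The class `ResizeSketch`, the method name, and the flag name are hypothetical; the real method would operate on the stream threads and `globalStreamThread` directly.

```java
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical standalone sketch; not the actual KafkaStreams code.
final class ResizeSketch {
    private ResizeSketch() { }

    // Applies sizePerThread to every stream thread, and to the global thread
    // only when includeGlobalThread is true (the cache case, not the buffer case).
    static void resizeThreads(final List<LongConsumer> streamThreadResizers,
                              final LongConsumer globalThreadResizer,
                              final long sizePerThread,
                              final boolean includeGlobalThread) {
        for (final LongConsumer resizer : streamThreadResizers) {
            resizer.accept(sizePerThread);
        }
        // The global thread may be absent, mirroring the existing null check.
        if (includeGlobalThread && globalThreadResizer != null) {
            globalThreadResizer.accept(sizePerThread);
        }
    }
}
```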

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -736,11 +734,8 @@ public boolean process(final long wallClockTime) {
             consumedOffsets.put(partition, record.offset());
             commitNeeded = true;
 
-            // after processing this record, if its partition queue's buffered size has been
-            // decreased to the threshold, we can then resume the consumption on this partition
-            if (recordInfo.queue().size() == maxBufferedSize) {
-                mainConsumer.resume(singleton(partition));
-            }

Review comment:
       Unfortunately we can't remove this logic yet. Some users may still be
setting only the `buffered.records.per.partition` config, and they shouldn't
see a sudden jump in memory usage just because they didn't switch over to the
new config right away.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -721,6 +732,10 @@ public void resizeCache(final long size) {
         cacheResizeSize.set(size);
     }
 
+    public void resize(final long size) {

Review comment:
       nit: call this one `resizeCache` to differentiate between this and the
buffer size? Also, is there a method somewhere to update the input buffer size?
It seems like we're always resizing the cache when a thread is added/removed.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
##########
@@ -47,6 +47,7 @@
     private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
 
     private StampedRecord headRecord = null;
+    private long headRecordSizeInBytes = 0;

Review comment:
       Is this used anywhere?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1057,6 +1057,17 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
         }
     }
 
+    /**
+     *  Fetch all non-empty partitions for pausing

Review comment:
       Why do we only pause the non-empty partitions? If the buffer is full, we 
have to pause all of them, no?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -793,6 +809,10 @@ void runOnce() {
 
                     totalProcessed += processed;
                     totalRecordsProcessedSinceLastSummary += processed;
+                    if (bufferSize > maxBufferSizeBytes && bufferSize - processedData.totalBytesConsumed <= maxBufferSizeBytes) {

Review comment:
       This logic is a little difficult to read 😅 -- would it help to just
move the `bufferSize -= processedData.totalBytesConsumed;` to before this check?
   
   On that note, it might be cleaner to move this check for whether to resume
consuming to right before we call `poll`. That way there's a nice symmetry
between the `pause` and `resume` checks, and all the logic is consolidated in
one place.
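
As a sketch of the symmetric version, the decision could be made once per loop iteration, just before `poll`, from the already-updated buffer size. The class `BufferBackpressure`, the `Action` enum, and the `paused` flag are hypothetical; the actual pause and resume calls on the consumer are left out.

```java
// Hypothetical standalone sketch; not the actual StreamThread code.
final class BufferBackpressure {
    enum Action { PAUSE, RESUME, NONE }

    // Tracks whether consumption is currently paused because the buffer is full.
    private boolean paused = false;

    // Call right before poll(), after bufferSize has been adjusted for any
    // records processed in the previous iteration.
    Action beforePoll(final long bufferSize, final long maxBufferSizeBytes) {
        if (!paused && bufferSize > maxBufferSizeBytes) {
            paused = true;
            return Action.PAUSE;
        }
        if (paused && bufferSize <= maxBufferSizeBytes) {
            paused = false;
            return Action.RESUME;
        }
        return Action.NONE;
    }
}
```

Because both branches compare the same two values, the condition no longer needs to subtract `totalBytesConsumed` inline.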




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


