ableegoldman commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r740700461



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -385,7 +385,7 @@ public synchronized final InternalTopologyBuilder rewriteTopology(final StreamsC
         setApplicationId(config.getString(StreamsConfig.APPLICATION_ID_CONFIG));
 
         // maybe strip out caching layers
-        if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) == 0L) {
+        if (config.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG) == 0L) {

Review comment:
       Ah, I wasn't trying to suggest we shouldn't start using the new config name everywhere -- we absolutely should (even if it does explode the PR quite a bit; I'll live). My point here was that we need to check the value of both of these configs, since we don't know which (if any) the user actually set.
   
   Which, by the way, you can check via `config.originals()` -- this returns a map of the configs the user actually passed in. Otherwise it's tricky to figure out, since e.g. `config.getLong` just returns the config's default value if it wasn't set.
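   
   For example, something roughly like this (just a sketch -- the helper class and method names are made up for illustration, and the precedence when both configs are set is my assumption, not something we've settled on):

```java
import java.util.Map;

import org.apache.kafka.streams.StreamsConfig;

// Hypothetical helper, only to illustrate the originals() check -- not part of this PR.
final class CacheSizeResolver {

    // Prefer the new config if the user explicitly set it, otherwise fall back to the
    // old one; if neither was set explicitly, just take the new config's default.
    static long effectiveCacheMaxBytes(final StreamsConfig config) {
        final Map<String, Object> userConfigs = config.originals();
        if (userConfigs.containsKey(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG)) {
            return config.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG);
        } else if (userConfigs.containsKey(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)) {
            return config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
        }
        return config.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG);
    }
}
```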

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1057,6 +1057,17 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
         }
     }
 
+    /**
+     *  Fetch all non-empty partitions for pausing

Review comment:
      Ah, now I remember, thanks 😄

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1057,6 +1057,17 @@ void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
         }
     }
 
+    /**
+     *  Fetch all non-empty partitions for pausing

Review comment:
      Can you leave a comment above this method explaining why we only fetch the non-empty partitions? I don't want us to forget why we did this a year from now and then accidentally break things.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -736,11 +734,8 @@ public boolean process(final long wallClockTime) {
             consumedOffsets.put(partition, record.offset());
             commitNeeded = true;
 
-            // after processing this record, if its partition queue's buffered size has been
-            // decreased to the threshold, we can then resume the consumption on this partition
-            if (recordInfo.queue().size() == maxBufferedSize) {
-                mainConsumer.resume(singleton(partition));
-            }

Review comment:
       > Maybe we can add some check that we do this only if maxBufferedSize is set to some value?
   
   Yeah, sorry, I should have been clearer here -- we only need to keep doing this if the user is still setting the `buffered.records.per.partition` config. I mentioned this in another comment, but just in case you weren't aware, you can call `originals()` on a StreamsConfig object to get a map of the configs the user actually passed in -- that way you know what they explicitly set vs. what's just a default.
   
   Then you can either set `maxBufferedSize` to null or define a static constant `NOT_SET = -1`, and only keep doing this partition-level pause/resume if the user is still using `buffered.records.per.partition`. Does that make sense?
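   
   Roughly like this, for example (just a sketch mirroring the removed code in `process()` above -- the `NOT_SET` constant and the exact wiring are illustrative, not prescriptive):

```java
// Sentinel meaning "buffered.records.per.partition was not explicitly set by the user".
// The name NOT_SET and this wiring are illustrative only.
private static final int NOT_SET = -1;

// In the constructor: only remember the limit if the user actually set the old config.
this.maxBufferedSize = config.originals().containsKey(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
    ? config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)
    : NOT_SET;

// In process(): keep the old per-partition resume behavior only when that config is in use.
if (maxBufferedSize != NOT_SET && recordInfo.queue().size() == maxBufferedSize) {
    mainConsumer.resume(singleton(partition));
}
```

   That way users who still set the old config keep the existing per-partition pause/resume behavior, and it's effectively a no-op for everyone else.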



