ableegoldman commented on a change in pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#discussion_r703973474



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -84,13 +85,34 @@ public synchronized void resize(final long newCacheSizeBytes) {
             while (sizeBytes() > maxCacheSizeBytes) {
                 final NamedCache cache = circularIterator.next();
                 cache.evict();
-                numEvicts++;
+                numEvicts.incrementAndGet();
             }
         } else {
             log.debug("Cache size was expanded to {}", newCacheSizeBytes);
         }
     }
 
+    public synchronized void resize(final Map<String, Long> newCacheSizes) {
+        maxCacheSizeBytes = newCacheSizes.values().stream().reduce(0L, Long::sum);
+        log.debug("Cache size was changed to {}", newCacheSizes);
+        for (final Map.Entry<String, Long> taskMaxSize: newCacheSizes.entrySet()) {
+            for (final Map.Entry<String, NamedCache> cache: caches.entrySet()) {
+                if (cache.getKey().contains(taskMaxSize.getKey())) {
+                    cache.getValue().setMaxBytes(taskMaxSize.getValue());
+                }
+            }
+        }
+        if (caches.values().isEmpty()) {
+            return;
+        }
+        final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
+        while (sizeBytes() > maxCacheSizeBytes) {
+            final NamedCache cache = circularIterator.next();
+            cache.evict();
+            numEvicts.incrementAndGet();
+        }

Review comment:
       nit: the other `#resize` (for thread count changes) has this same eviction loop; can you factor it out into a helper method? Then I think we can narrow the scope and make only that helper synchronized (we should double-check that, though).

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -84,13 +85,34 @@ public synchronized void resize(final long newCacheSizeBytes) {
             while (sizeBytes() > maxCacheSizeBytes) {
                 final NamedCache cache = circularIterator.next();
                 cache.evict();
-                numEvicts++;
+                numEvicts.incrementAndGet();
             }
         } else {
             log.debug("Cache size was expanded to {}", newCacheSizeBytes);
         }
     }
 
+    public synchronized void resize(final Map<String, Long> newCacheSizes) {
+        maxCacheSizeBytes = newCacheSizes.values().stream().reduce(0L, Long::sum);
+        log.debug("Cache size was changed to {}", newCacheSizes);
+        for (final Map.Entry<String, Long> taskMaxSize: newCacheSizes.entrySet()) {
+            for (final Map.Entry<String, NamedCache> cache: caches.entrySet()) {
+                if (cache.getKey().contains(taskMaxSize.getKey())) {
+                    cache.getValue().setMaxBytes(taskMaxSize.getValue());
+                }
+            }
+        }
+        if (caches.values().isEmpty()) {

Review comment:
       Any reason this checks emptiness of `caches.values()` instead of `caches.keySet()`?
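
For context on the question above: assuming `caches` is a plain `java.util.Map` (the diff iterates `caches.entrySet()`), its key view is `keySet()`, and the key view, the value view, and the map itself always agree on emptiness. The small check below illustrates that; `EmptinessCheckSketch` and `allViewsAgree` are hypothetical names used only for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
class EmptinessCheckSketch {

    // Returns true when the map, its key view, and its value view agree on emptiness.
    static boolean allViewsAgree(final Map<String, Long> map) {
        final boolean empty = map.isEmpty();
        return empty == map.keySet().isEmpty() && empty == map.values().isEmpty();
    }

    public static void main(final String[] args) {
        final Map<String, Long> sizes = new HashMap<>();
        System.out.println(allViewsAgree(sizes)); // empty map
        sizes.put("0_1", 1024L);
        System.out.println(allViewsAgree(sizes)); // one entry
    }
}
```

So the choice between the two views does not change behavior here; calling `isEmpty()` on the map directly would express the same check.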

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##########
@@ -43,7 +44,7 @@
     // internal stats
     private long numPuts = 0;
     private long numGets = 0;
-    private long numEvicts = 0;
+    private AtomicLong numEvicts = new AtomicLong(0);

Review comment:
       Why make this atomic? We're still only ever evicting/accessing this from the actual StreamThread, right?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -502,7 +504,8 @@ public StreamThread(final Time time,
         this.assignmentErrorCode = assignmentErrorCode;
         this.shutdownErrorHook = shutdownErrorHook;
         this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
-        this.cacheResizer = cacheResizer;
+        this.threadCache = threadCache;
+        cacheSizes = new ConcurrentHashMap<>();

Review comment:
       Does this need to be a concurrent map? It seems to be accessed only by the StreamThread itself.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -247,4 +247,6 @@ default boolean commitRequested() {
      * @return This returns the time the task started idling. If it is not idling it returns empty.
      */
     Optional<Long> timeCurrentIdlingStarted();
+
+    long maxBuffer();

Review comment:
       The name should probably specify what kind of buffer this is (especially with KIP-770 adding another relevant buffer type).



