guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r450577191
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ##########
@@ -454,6 +456,41 @@ public void flush() {
         }
     }
+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempting to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // buffer should be flushed to send all records to changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {

Review comment:
I admit this is kind of hacky, but I have to do this for both the cached store and the suppression buffer. Going forward, I think the cached-store case can be removed once we decouple caching from emitting. For the suppression buffer, maybe we can find a more general fix: for example, the change logger could always do its own buffering, so that suppression buffers no longer need to buffer records themselves before handing them to the change logger. cc @vvcephei

########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##########
@@ -478,9 +479,11 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
         commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);
+        // We do not need to enforce checkpointing upon suspending a task: if it is resumed later we just
+        // proceed normally; if it is closed we would checkpoint then
         for (final Task task : revokedTasks) {
             try {
-                task.postCommit();
+                task.postCommit(false);

Review comment:
Actually, after rebasing on the latest trunk, I think we can still skip enforcing the checkpoint upon suspending a task, and only enforce it upon closing / recycling in handleAssignment.
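To make the suspend-versus-close distinction concrete, here is a minimal, hypothetical model of the `enforceCheckpoint` flag passed to `postCommit`. `SketchTask`, `commit`, `suspend`, `closeClean`, and `checkpoints` are illustrative names, not the Kafka Streams `Task` API, and the sketch only captures the rule stated in the review: suspension does not force a checkpoint, while closing or recycling does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the enforceCheckpoint flag; SketchTask and its
// methods are illustrative names, not the Kafka Streams Task API.
class SketchTask {
    private final List<String> writtenCheckpoints = new ArrayList<>();
    private long committedOffset = 0L;

    // Records the offset that the most recent commit covered.
    void commit(final long offset) {
        committedOffset = offset;
    }

    // After a commit, write a checkpoint only if the caller enforces it.
    void postCommit(final boolean enforceCheckpoint) {
        if (enforceCheckpoint) {
            writtenCheckpoints.add("offset=" + committedOffset);
        }
    }

    // Revocation path: no forced checkpoint. A task resumed later just
    // proceeds; a task closed later checkpoints at close time.
    void suspend() {
        postCommit(false);
    }

    // Close / recycle path (handleAssignment in the review): force it.
    void closeClean() {
        postCommit(true);
    }

    List<String> checkpoints() {
        return writtenCheckpoints;
    }

    public static void main(final String[] args) {
        final SketchTask task = new SketchTask();
        task.commit(42L);
        task.suspend();
        System.out.println(task.checkpoints()); // []
        task.closeClean();
        System.out.println(task.checkpoints()); // [offset=42]
    }
}
```

Deferring the checkpoint on suspension avoids a redundant write when the task is simply resumed, at the cost of relying on the close path to persist it.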
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

Review comment:
This is just a placeholder to make flushing less frequent than committing; it is open for debate whether there is a better mechanism for deciding when a flush should be executed.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
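One way a threshold like `OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT` could gate flushing and checkpointing is sketched below. This is an assumption, not the logic in the PR: it sums how far each changelog partition's offset has advanced since the last checkpoint and writes a new checkpoint only when that total exceeds the threshold, unless the caller enforces it (for example on close). `CheckpointPolicy` and `checkpointNeeded` are hypothetical names, and partitions are keyed by plain `String`s instead of Kafka's `TopicPartition` so the example has no dependencies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; partition keys are Strings instead of TopicPartition.
final class CheckpointPolicy {
    // Same placeholder value as the constant in the diff above.
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    private CheckpointPolicy() { }

    // Returns true when a checkpoint should be written: always if enforced,
    // otherwise only if the summed offset advance exceeds the threshold.
    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> lastCheckpointedOffsets,
                                    final Map<String, Long> currentOffsets) {
        if (enforceCheckpoint) {
            return true;
        }
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
            // A partition with no previous checkpoint counts from offset 0.
            final long previous = lastCheckpointedOffsets.getOrDefault(entry.getKey(), 0L);
            totalOffsetDelta += entry.getValue() - previous;
        }
        return totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }

    public static void main(final String[] args) {
        final Map<String, Long> last = new HashMap<>();
        last.put("store-changelog-0", 5_000L);
        final Map<String, Long> current = new HashMap<>();
        current.put("store-changelog-0", 12_000L);
        System.out.println(checkpointNeeded(false, last, current)); // false (delta 7,000)
        System.out.println(checkpointNeeded(true, last, current));  // true (enforced)
        current.put("store-changelog-0", 15_001L);
        System.out.println(checkpointNeeded(false, last, current)); // true (delta 10,001)
    }
}
```

A count-based delta is only one candidate mechanism; a time-based interval or a per-store byte budget would be equally plausible answers to the open question in the comment.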