ableegoldman commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r458282421



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -454,6 +456,41 @@ public void flush() {
         }
     }
 
+    public void flushCache() {
+        RuntimeException firstException = null;
+        // attempting to flush the stores
+        if (!stores.isEmpty()) {
+            log.debug("Flushing all store caches registered in the state manager: {}", stores);
+            for (final StateStoreMetadata metadata : stores.values()) {
+                final StateStore store = metadata.stateStore;
+
+                try {
+                    // buffer should be flushed to send all records to changelog
+                    if (store instanceof TimeOrderedKeyValueBuffer) {
+                        store.flush();
+                    } else if (store instanceof CachedStateStore) {
+                        ((CachedStateStore) store).flushCache();
+                    }
+                    log.trace("Flushed cache or buffer {}", store.name());
+                } catch (final RuntimeException exception) {
+                    if (firstException == null) {
+                        // do NOT wrap the error if it is actually caused by Streams itself
+                        if (exception instanceof StreamsException)
+                            firstException = exception;
+                        else
+                            firstException = new ProcessorStateException(

Review comment:
       nit: can you use braces here?
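
For illustration, the braced form being asked for might look like the sketch below. The class name, the two stand-in exception types and the message text are assumptions made so the snippet compiles on its own; they are not the PR's code, and the truncated constructor call in the diff above is left as it is.

```java
final class BracesSketch {
    // Hypothetical stand-in for StreamsException.
    static class StreamsLikeException extends RuntimeException {
        StreamsLikeException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for ProcessorStateException.
    static class StateLikeException extends StreamsLikeException {
        StateLikeException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Keeps only the first failure; wraps it unless it is already a Streams-style error.
    static RuntimeException remember(final RuntimeException firstException,
                                     final RuntimeException exception) {
        RuntimeException result = firstException;
        if (result == null) {
            if (exception instanceof StreamsLikeException) {
                result = exception;
            } else {
                result = new StateLikeException("Failed to flush cache (illustrative message)", exception);
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        final RuntimeException io = new IllegalStateException("disk full");
        System.out.println(remember(null, io).getClass().getSimpleName());
    }
}
```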

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##########
@@ -520,20 +557,27 @@ void transitionTaskType(final TaskType newType, final LogContext logContext) {
         log.debug("Transitioning state manager for {} task {} to {}", oldType, taskId, newType);
     }
 
-    @Override
-    public void checkpoint(final Map<TopicPartition, Long> writtenOffsets) {
-        // first update each state store's current offset, then checkpoint
-        // those stores that are only logged and persistent to the checkpoint file
+    void updateChangelogOffsets(final Map<TopicPartition, Long> writtenOffsets) {
         for (final Map.Entry<TopicPartition, Long> entry : writtenOffsets.entrySet()) {
             final StateStoreMetadata store = findStore(entry.getKey());
 
             if (store != null) {
                 store.setOffset(entry.getValue());
 
                 log.debug("State store {} updated to written offset {} at changelog {}",
-                    store.stateStore.name(), store.offset, store.changelogPartition);
+                        store.stateStore.name(), store.offset, store.changelogPartition);
             }
         }
+    }
+
+    @Override
+    public void checkpoint(final Map<TopicPartition, Long> writtenOffsets) {
+        // first update each state store's current offset, then checkpoint
+        // those stores that are only logged and persistent to the checkpoint file
+        // TODO: we still need to keep it as part of the checkpoint for global tasks; this could be
+        //       removed though when we consolidate global tasks / state managers into this one

Review comment:
       >TODO: we still need to keep it as part of the checkpoint for global 
tasks
   
   Took me a while to realize that "it" refers to the argument here -- can you 
clarify the comment?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -466,15 +458,8 @@ public void postCommit() {
                  */
                 partitionGroup.clear();

Review comment:
       This is also maybe beyond the scope of this PR, but it seems like
there's no reason to do things this way anymore. Specifically, today we
enforce the order `suspend` -> `pre/postCommit` -> `close`, where `suspend` only
closes the topology and the `SUSPENDED` state exists only to enforce that we
suspended before closing. Why not swap the order to `pre/postCommit` ->
`suspend` -> `close`? Then we could move this call from `postCommit` to
`suspend`, where it makes more sense. Thoughts?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -49,6 +61,31 @@
         this.stateDirectory = stateDirectory;
     }
 
+    protected void initializeCheckpoint() {
+        // we will delete the local checkpoint file after registering the state stores and loading them into the
+        // state manager, therefore we should initialize the snapshot as empty to indicate over-write checkpoint needed
+        offsetSnapshotSinceLastFlush = Collections.emptyMap();
+    }
+
+    /**
+     * The following exceptions maybe thrown from the state manager flushing call
+     *
+     * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed
+     * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
+     *                          or flushing state store get IO errors; such error should cause the thread to die
+     */
+    protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
+        final Map<TopicPartition, Long> offsetSnapshot = stateMgr.changelogOffsets();
+        if (StateManagerUtil.checkpointNeeded(enforceCheckpoint, offsetSnapshotSinceLastFlush, offsetSnapshot)) {
+            // since there's no written offsets we can checkpoint with empty map,
+            // and the state's current offset would be used to checkpoint
+            stateMgr.flush();
+            stateMgr.checkpoint(Collections.emptyMap());

Review comment:
       Should we add an assertion to `ProcessorStateManager#checkpoint` that
the passed-in offsets are always empty?
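
A rough sketch of the kind of guard meant here, assuming the method simply rejects a non-empty argument. The class name is hypothetical, `String` keys stand in for `TopicPartition`, and the choice of `IllegalStateException` is an assumption rather than anything the PR prescribes.

```java
import java.util.Collections;
import java.util.Map;

final class CheckpointGuardSketch {
    // Stand-in for ProcessorStateManager#checkpoint; not the real method.
    static void checkpoint(final Map<String, Long> writtenOffsets) {
        if (!writtenOffsets.isEmpty()) {
            throw new IllegalStateException(
                "Expected no written offsets at checkpoint, but got " + writtenOffsets);
        }
        // ... the checkpoint file would be written here from the offsets
        // the stores already track ...
    }

    public static void main(final String[] args) {
        checkpoint(Collections.emptyMap()); // accepted
        try {
            checkpoint(Collections.singletonMap("changelog-0", 42L));
        } catch (final IllegalStateException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```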

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##########
@@ -38,13 +41,39 @@
  */
 final class StateManagerUtil {
     static final String CHECKPOINT_FILE_NAME = ".checkpoint";
+    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;
 
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
         return isTimestamped(store) ? rawValueToTimestampedValue() : identity();
     }
 
+    static boolean checkpointNeeded(final boolean enforceCheckpoint,
+                                    final Map<TopicPartition, Long> oldOffsetSnapshot,
+                                    final Map<TopicPartition, Long> newOffsetSnapshot) {
+        // we should always have the old snapshot post completing the register state stores;
+        // if it is null it means the registration is not done and hence we should not overwrite the checkpoint
+        if (oldOffsetSnapshot == null)
+            return false;
+
+        // if the previous snapshot is empty while the current snapshot is not then we should always checkpoint;
+        // note if the task is stateless or stateful but no stores logged, the snapshot would also be empty
+        // and hence it's okay to not checkpoint
+        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty())
+            return true;
+
+        // we can checkpoint if the the difference between the current and the previous snapshot is large enough
+        long totalOffsetDelta = 0L;
+        for (final Map.Entry<TopicPartition, Long> entry : newOffsetSnapshot.entrySet()) {
+            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
+        }
+
+        // when enforcing checkpoint is required, we should overwrite the checkpoint if it is different from the old one;
+        // otherwise, we only overwrite the checkpoint if it is largely different from the old one
+        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;

Review comment:
       Is this out of date? It seems like we now never checkpoint during 
suspension so we don't have to bother with this optimization
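
To make the question concrete, the quoted check can be reproduced in isolation. The sketch below copies the logic from the diff, with plain `String` keys standing in for `TopicPartition`; the class name and the sample offsets are hypothetical.

```java
import java.util.Collections;
import java.util.Map;

final class CheckpointNeededSketch {
    static final long OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L;

    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> oldOffsetSnapshot,
                                    final Map<String, Long> newOffsetSnapshot) {
        // a null old snapshot means store registration has not finished
        if (oldOffsetSnapshot == null) {
            return false;
        }
        // first non-empty snapshot after an empty one: always checkpoint
        if (oldOffsetSnapshot.isEmpty() && !newOffsetSnapshot.isEmpty()) {
            return true;
        }
        // sum of per-partition offset movement since the last snapshot
        long totalOffsetDelta = 0L;
        for (final Map.Entry<String, Long> entry : newOffsetSnapshot.entrySet()) {
            totalOffsetDelta += Math.abs(oldOffsetSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
        }
        return enforceCheckpoint ? totalOffsetDelta > 0 : totalOffsetDelta > OFFSET_DELTA_THRESHOLD_FOR_CHECKPOINT;
    }

    public static void main(final String[] args) {
        final Map<String, Long> old = Collections.singletonMap("changelog-0", 100L);
        final Map<String, Long> now = Collections.singletonMap("changelog-0", 5_000L);
        // delta is 4,900: below the threshold, so only the enforced call checkpoints
        System.out.println(checkpointNeeded(false, old, now));
        System.out.println(checkpointNeeded(true, old, now));
    }
}
```

The threshold only matters on the non-enforced path, which is the part this comment questions.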

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -120,14 +120,12 @@ public void suspend() {
         switch (state()) {
             case CREATED:
                 log.info("Suspended created");
-                checkpointNeededForSuspended = false;

Review comment:
       Happy to see this go 🙂 

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -93,8 +93,8 @@ public boolean isActive() {
     public void initializeIfNeeded() {
         if (state() == State.CREATED) {
             StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);
-
             // initialize the snapshot with the current offsets as we don't need to commit then until they change

Review comment:
       Can you move this comment down a line? We should avoid confusion since 
we actually don't initialize the (flush) snapshot with the current offsets, 
just the (commit) snapshot.
   
   To be honest, it's already pretty confusing that we initialize the two
snapshots differently. Maybe you could add a quick sentence explaining why, for
our own future reference.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -243,18 +242,24 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         for (final Task task : tasksToClose) {
             try {
-                if (task.isActive()) {
-                    // Active tasks are revoked and suspended/committed during #handleRevocation
-                    if (!task.state().equals(State.SUSPENDED)) {
-                        log.error("Active task {} should be suspended prior to attempting to close but was in {}",
-                                  task.id(), task.state());
-                        throw new IllegalStateException("Active task " + task.id() + " should have been suspended");
-                    }
-                } else {
-                    task.suspend();
-                    task.prepareCommit();
-                    task.postCommit();
+                // Always try to first suspend and commit the task before closing it;
+                // some tasks may already be suspended which should be a no-op.
+                //
+                // Also since active tasks should already be suspended / committed and
+                // standby tasks should have no offsets to commit, we should expect nothing to commit
+                task.suspend();
+
+                final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
+
+                if (!offsets.isEmpty()) {
+                    log.error("Task {} should has been committed prior to attempting to close, but it reports non-empty offsets {} to commit",

Review comment:
       Hm. What if we hit a TaskMigratedException during `handleRevocation`? We
would never finish committing the tasks, so `commitNeeded` would still return true
and `prepareCommit` would return non-empty offsets, right?
   
   It's kind of a bummer that we can't enforce that the task was committed.
What we really need to do is enforce that we _attempted_ to commit the task,
regardless of whether or not it was successful. If the commit failed, we know
that either it was fatal or it was due to TaskMigrated, in which case the task
will have to be closed as dirty anyway.
   
   This might be beyond the scope of this PR, but just to throw out one hacky
idea: we could add a `commitSuccessful` parameter to `postCommit` and then
always invoke it after a commit so that `commitNeeded` is set to false. (If
`commitSuccessful` is false, we just skip everything else in `postCommit`.)
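
A minimal sketch of that idea, assuming a task that tracks only a `commitNeeded` flag and counts checkpoints. The class name, the counter and the boolean parameter are all hypothetical; nothing here exists in the PR.

```java
final class PostCommitSketch {
    private boolean commitNeeded = true;
    private int checkpointsWritten = 0;

    boolean commitNeeded() {
        return commitNeeded;
    }

    int checkpointsWritten() {
        return checkpointsWritten;
    }

    // Hypothetical signature: called after every commit attempt, successful or not.
    void postCommit(final boolean commitSuccessful) {
        // an attempt was made either way, so stop reporting pending offsets
        commitNeeded = false;
        if (!commitSuccessful) {
            // skip everything else; the task is expected to be closed dirty
            return;
        }
        // stands in for the remaining post-commit work such as checkpointing
        checkpointsWritten++;
    }

    public static void main(final String[] args) {
        final PostCommitSketch task = new PostCommitSketch();
        task.postCommit(false);
        System.out.println(task.commitNeeded() + " " + task.checkpointsWritten());
    }
}
```

With this shape, the close path could check `commitNeeded()` to learn that a commit was attempted without requiring that it succeeded.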

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -542,13 +530,22 @@ public void closeCleanAndRecycleState() {
         log.info("Closed clean and recycled state");
     }
 
-    private void writeCheckpoint() {
+    /**
+     * The following exceptions maybe thrown from the state manager flushing call
+     *
+     * @throws TaskMigratedException recoverable error sending changelog records that would cause the task to be removed
+     * @throws StreamsException fatal error when flushing the state store, for example sending changelog records failed
+     *                          or flushing state store get IO errors; such error should cause the thread to die
+     */
+    @Override
+    protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {

Review comment:
       Was there a reason to not just add `#updateChangelogOffsets` to the 
`StateManager` interface and remove the checkpointable offsets argument from 
`#checkpoint`?
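
Roughly what that interface shape could look like, as a sketch only. The interface and class names are hypothetical, `String` keys stand in for `TopicPartition`, and the in-memory implementation exists just to show how the two calls would divide the work.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical shape of the interface: offsets are pushed separately,
// and checkpoint() takes no argument.
interface StateManagerSketch {
    void updateChangelogOffsets(Map<String, Long> writtenOffsets);

    void checkpoint();
}

final class InMemoryStateManagerSketch implements StateManagerSketch {
    private final Map<String, Long> currentOffsets = new HashMap<>();
    private Map<String, Long> lastCheckpoint = new HashMap<>();

    @Override
    public void updateChangelogOffsets(final Map<String, Long> writtenOffsets) {
        currentOffsets.putAll(writtenOffsets);
    }

    @Override
    public void checkpoint() {
        // snapshot whatever offsets are tracked at this point
        lastCheckpoint = new HashMap<>(currentOffsets);
    }

    Map<String, Long> lastCheckpoint() {
        return lastCheckpoint;
    }

    public static void main(final String[] args) {
        final InMemoryStateManagerSketch mgr = new InMemoryStateManagerSketch();
        mgr.updateChangelogOffsets(Collections.singletonMap("changelog-0", 42L));
        mgr.checkpoint();
        System.out.println(mgr.lastCheckpoint());
    }
}
```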




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

