ableegoldman commented on a change in pull request #8988:
URL: https://github.com/apache/kafka/pull/8988#discussion_r459768271



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -365,72 +368,104 @@ private ChangelogMetadata 
restoringChangelogByPartition(final TopicPartition par
         return changelogMetadata;
     }
 
-    private Set<ChangelogMetadata> registeredChangelogs() {
+    private synchronized Set<ChangelogMetadata> offsetLimitChangelogs() {
+        return changelogs.entrySet().stream()
+                .filter(entry -> entry.getValue().stateManager.taskType() == 
Task.TaskType.STANDBY &&
+                        
entry.getValue().stateManager.changelogAsSource(entry.getKey()))
+                .map(Map.Entry::getValue).collect(Collectors.toSet());
+    }
+
+    private synchronized Set<ChangelogMetadata> registeredChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == 
ChangelogState.REGISTERED)
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> restoringChangelogs() {
+    private synchronized Set<ChangelogMetadata> restoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == 
ChangelogState.RESTORING)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> activeRestoringChangelogs() {
+    private synchronized Set<TopicPartition> activeRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == 
ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.ACTIVE)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> standbyRestoringChangelogs() {
+    private synchronized Set<TopicPartition> standbyRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == 
ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.STANDBY)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private boolean allChangelogsCompleted() {
+    private synchronized Set<ChangelogMetadata> allChangelogs() {
+        // we need to make a shallow copy of this set for thread-safety
+        return new HashSet<>(changelogs.values());
+    }
+
+    private synchronized boolean allChangelogsCompleted() {
         return changelogs.values().stream()
             .allMatch(metadata -> metadata.changelogState == 
ChangelogState.COMPLETED);
     }
 
     @Override
-    public Set<TopicPartition> completedChangelogs() {
+    public synchronized Set<TopicPartition> completedChangelogs() {
         return changelogs.values().stream()
-            .filter(metadata -> metadata.changelogState == 
ChangelogState.COMPLETED)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
-            .collect(Collectors.toSet());
+                .filter(metadata -> metadata.changelogState == 
ChangelogState.COMPLETED)
+                .map(metadata -> metadata.storeMetadata.changelogPartition())
+                .collect(Collectors.toSet());
     }
 
-    // 1. if there are any registered changelogs that needs initialization, 
try to initialize them first;
-    // 2. if all changelogs have finished, return early;
-    // 3. if there are any restoring changelogs, try to read from the restore 
consumer and process them.
-    public void restore() {
-        initializeChangelogs(registeredChangelogs());
+    /**
+     * 1. if there are any registered changelogs that needs initialization, 
try to initialize them first;
+     * 2. if all changelogs have finished, return early;
+     * 3. if there are any restoring changelogs, try to read from the restore 
consumer and process them.
+     *
+     * @throws StreamsException       If there are unexpected exception thrown 
during the restoration
+     * @throws TaskCorruptedException If the changelog has been truncated 
while restoration is still on-going
+     */
+    public int restore() {
+        final ChangelogReaderState currentState;
+        synchronized (this) {
+            while (allChangelogsCompleted()) {
+                log.debug("All changelogs {} have completed restoration so 
far, will wait " +
+                        "until new changelogs are registered", 
changelogs.keySet());
+
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    log.trace("Interrupted with updated changelogs {}", 
changelogs.keySet());
+                }
+            }
+
+            currentState = state;
+        }
+
+        initializeChangelogs(currentState, registeredChangelogs());
 
-        if (!activeRestoringChangelogs().isEmpty() && state == 
ChangelogReaderState.STANDBY_UPDATING) {
+        if (!activeRestoringChangelogs().isEmpty() && currentState == 
ChangelogReaderState.STANDBY_UPDATING) {

Review comment:
       Why not let the StreamThread process some standbys while the restore 
thread does its restoring? Not sure if we plan to ultimately move standbys to a 
new thread, or to share with the restore thread, but it seems like we shouldn't 
block them on restoration or we're missing out on a huge piece of the available 
improvement.
   
   Especially with KIP-441, where for most tasks the majority of the 
restoration work will actually happen while the task is a standby, not during 
active-task restoration. 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -411,7 +411,10 @@ boolean tryToCompleteRestoration() {
                 } else {
                     // we found a restoring task that isn't done restoring, 
which is evidence that
                     // not all tasks are running
+                    log.debug("Task {} has not completed restoration, will 
check next time", task.id());
+
                     allRunning = false;

Review comment:
       Don't we want to modify the TaskManager so the StreamThread doesn't have 
to wait for all tasks to finish restoring? It should be able to start 
processing any active tasks as soon as they finish restoring in the other 
thread. 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -659,13 +665,12 @@ void runOnce() {
             }
         }
 
-        // we can always let changelog reader try restoring in order to 
initialize the changelogs;
-        // if there's no active restoring or standby updating it would not try 
to fetch any data
-        changelogReader.restore();
-
-        // TODO: we should record the restore latency and its relative time 
spent ratio after
-        //       we figure out how to move this method out of the stream thread
-        advanceNowAndComputeLatency();
+        // check if restore thread has encountered TaskCorrupted exception; if 
yes
+        // rethrow it to trigger the handling logic
+        final TaskCorruptedException e = 
restoreThread.nextCorruptedException();

Review comment:
       Might be nice to gather and handle all TaskCorruptedExceptions at once 
rather than one per loop iteration like this, especially since each one likely 
involves committing all tasks (and I would imagine that with EOS, when we get 
one TaskCorruptedException we are likely to also have more). That can 
definitely be follow-on work; just putting it out there.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -365,72 +368,104 @@ private ChangelogMetadata 
restoringChangelogByPartition(final TopicPartition par
         return changelogMetadata;
     }
 
-    private Set<ChangelogMetadata> registeredChangelogs() {
+    private synchronized Set<ChangelogMetadata> offsetLimitChangelogs() {
+        return changelogs.entrySet().stream()
+                .filter(entry -> entry.getValue().stateManager.taskType() == 
Task.TaskType.STANDBY &&
+                        
entry.getValue().stateManager.changelogAsSource(entry.getKey()))
+                .map(Map.Entry::getValue).collect(Collectors.toSet());
+    }
+
+    private synchronized Set<ChangelogMetadata> registeredChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == 
ChangelogState.REGISTERED)
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> restoringChangelogs() {
+    private synchronized Set<ChangelogMetadata> restoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == 
ChangelogState.RESTORING)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> activeRestoringChangelogs() {
+    private synchronized Set<TopicPartition> activeRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == 
ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.ACTIVE)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private Set<TopicPartition> standbyRestoringChangelogs() {
+    private synchronized Set<TopicPartition> standbyRestoringChangelogs() {
         return changelogs.values().stream()
             .filter(metadata -> metadata.changelogState == 
ChangelogState.RESTORING &&
                 metadata.stateManager.taskType() == Task.TaskType.STANDBY)
             .map(metadata -> metadata.storeMetadata.changelogPartition())
             .collect(Collectors.toSet());
     }
 
-    private boolean allChangelogsCompleted() {
+    private synchronized Set<ChangelogMetadata> allChangelogs() {
+        // we need to make a shallow copy of this set for thread-safety
+        return new HashSet<>(changelogs.values());
+    }
+
+    private synchronized boolean allChangelogsCompleted() {
         return changelogs.values().stream()
             .allMatch(metadata -> metadata.changelogState == 
ChangelogState.COMPLETED);
     }
 
     @Override
-    public Set<TopicPartition> completedChangelogs() {
+    public synchronized Set<TopicPartition> completedChangelogs() {
         return changelogs.values().stream()
-            .filter(metadata -> metadata.changelogState == 
ChangelogState.COMPLETED)
-            .map(metadata -> metadata.storeMetadata.changelogPartition())
-            .collect(Collectors.toSet());
+                .filter(metadata -> metadata.changelogState == 
ChangelogState.COMPLETED)
+                .map(metadata -> metadata.storeMetadata.changelogPartition())
+                .collect(Collectors.toSet());
     }
 
-    // 1. if there are any registered changelogs that needs initialization, 
try to initialize them first;
-    // 2. if all changelogs have finished, return early;
-    // 3. if there are any restoring changelogs, try to read from the restore 
consumer and process them.
-    public void restore() {
-        initializeChangelogs(registeredChangelogs());
+    /**
+     * 1. if there are any registered changelogs that needs initialization, 
try to initialize them first;
+     * 2. if all changelogs have finished, return early;
+     * 3. if there are any restoring changelogs, try to read from the restore 
consumer and process them.
+     *
+     * @throws StreamsException       If there are unexpected exception thrown 
during the restoration
+     * @throws TaskCorruptedException If the changelog has been truncated 
while restoration is still on-going
+     */
+    public int restore() {
+        final ChangelogReaderState currentState;
+        synchronized (this) {
+            while (allChangelogsCompleted()) {
+                log.debug("All changelogs {} have completed restoration so 
far, will wait " +
+                        "until new changelogs are registered", 
changelogs.keySet());
+
+                try {
+                    wait();
+                } catch (final InterruptedException e) {
+                    log.trace("Interrupted with updated changelogs {}", 
changelogs.keySet());
+                }
+            }
+
+            currentState = state;
+        }
+
+        initializeChangelogs(currentState, registeredChangelogs());
 
-        if (!activeRestoringChangelogs().isEmpty() && state == 
ChangelogReaderState.STANDBY_UPDATING) {
+        if (!activeRestoringChangelogs().isEmpty() && currentState == 
ChangelogReaderState.STANDBY_UPDATING) {
             throw new IllegalStateException("Should not be in standby updating 
state if there are still un-completed active changelogs");
         }
 
-        if (allChangelogsCompleted()) {
-            log.debug("Finished restoring all changelogs {}", 
changelogs.keySet());
-            return;
+        // we would pause or resume the partitions for standbys depending on 
the state, this operation is idempotent
+        if (currentState == ChangelogReaderState.ACTIVE_RESTORING) {
+            pauseChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
+        } else {
+            resumeChangelogsFromRestoreConsumer(standbyRestoringChangelogs());
         }
 
-        final Set<TopicPartition> restoringChangelogs = restoringChangelogs();
+        final Set<ChangelogMetadata> restoringChangelogs = 
restoringChangelogs();
+        int totalRecordsRestored = 0;
         if (!restoringChangelogs.isEmpty()) {
             final ConsumerRecords<byte[], byte[]> polledRecords;
 
             try {
-                // for restoring active and updating standby we may prefer 
different poll time
-                // in order to make sure we call the main consumer#poll in 
time.
-                // TODO: once we move ChangelogReader to a separate thread 
this may no longer be a concern
-                polledRecords = restoreConsumer.poll(state == 
ChangelogReaderState.STANDBY_UPDATING ? Duration.ZERO : pollTime);

Review comment:
       Why did we remove this logic? AFAICT standbys are still processed in the 
main StreamThread and the whole point of this was to make sure we don't block 
active processing on polling standby tasks.
   
   But I wasn't paying close attention to this particular issue/PR, so I may 
have misremembered or misunderstood.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestoreThread.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+
+/**
+ * This is the thread responsible for restoring state stores for both active 
and standby tasks
+ */
+public class StateRestoreThread extends Thread {
+
+    private final Time time;
+    private final Logger log;
+    private final StreamsConfig config;
+    private final ChangelogReader changelogReader;
+    private final AtomicBoolean isRunning = new AtomicBoolean(true);
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+    private final ConcurrentLinkedDeque<TaskCorruptedException> 
corruptedExceptions;
+
+    public boolean isRunning() {
+        return isRunning.get();
+    }
+
+    public StateRestoreThread(final Time time,
+                              final StreamsConfig config,
+                              final String threadClientId,
+                              final ChangelogReader changelogReader) {
+        super(threadClientId);
+        this.time = time;
+        this.config = config;
+        this.changelogReader = changelogReader;
+        this.corruptedExceptions = new ConcurrentLinkedDeque<>();
+
+        final String logPrefix = String.format("state-restore-thread [%s] ", 
threadClientId);
+        final LogContext logContext = new LogContext(logPrefix);
+        this.log = logContext.logger(getClass());
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (isRunning()) {
+                final long startMs = time.milliseconds();
+
+                try {
+                    // try to restore some changelogs, if there's nothing to 
restore it would wait inside this call
+                    final int numRestored = changelogReader.restore();
+                    // TODO: we should record the restoration related metrics
+                    log.info("Restored {} records in {} ms", numRestored, 
time.milliseconds() - startMs);
+                } catch (final TaskCorruptedException e) {
+                    log.warn("Detected the states of tasks " + 
e.corruptedTaskWithChangelogs() + " are corrupted. " +
+                            "Will close the task as dirty and re-create and 
bootstrap from scratch.", e);
+
+                    // remove corrupted partitions form the changelog reader 
and continue; we can still proceed
+                    // and restore other partitions until the main thread come 
to handle this exception
+                    
changelogReader.unregister(e.corruptedTaskWithChangelogs().values().stream()

Review comment:
       I'm not sure this is strictly incorrect since unregistering is 
idempotent, but it certainly seems unnecessary. Why do we need to unregister 
the changelogs here? We do so in `ProcessorStateManager`'s `close` which should 
be called in `handleCorruption` from the main thread. 
   I'm also generally against unregistering something so far away from where it 
gets registered 🙂 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to