mjsax commented on code in PR #16922:
URL: https://github.com/apache/kafka/pull/16922#discussion_r1760384313


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -185,6 +200,108 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // we still check whether the Task sub-topology is stateful, even though we know its directory contains
+                // state, because it's possible the topology has changed since that data was written, and is now stateless

Review Comment:
   ```suggestion
                   // state, because it's possible that the topology has changed since that data was written, and is now stateless
   ```
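The check described in the code comment above (skip left-over task directories whose sub-topology is no longer stateful) can be modeled in isolation. This is a minimal sketch, not the PR's code: the records `Partition` and `SubTopologyInfo` are hypothetical stand-ins for `TopicPartition` and `ProcessorTopology`, directory names are parsed naively as `<subtopology>_<partition>`, and treating a missing sub-topology as stateless is an assumption of the sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for TopicPartition: a topic name plus a partition number.
record Partition(String topic, int partition) { }

// Hypothetical stand-in for the two ProcessorTopology properties the check relies on.
record SubTopologyInfo(Set<String> sourceTopics, boolean hasStateWithChangelogs) { }

class LocalStateFilter {
    // Returns, per task directory, the input partitions of tasks that should be
    // initialized from local state; directories of now-stateless sub-topologies are skipped.
    static Map<String, Set<Partition>> tasksToInitialize(final List<String> taskDirNames,
                                                         final Map<Integer, SubTopologyInfo> subTopologies) {
        final Map<String, Set<Partition>> result = new LinkedHashMap<>();
        for (final String dirName : taskDirNames) {
            // naive parse of "<subtopology>_<partition>"; named topologies are not handled here
            final String[] parts = dirName.split("_");
            final int subtopologyId = Integer.parseInt(parts[0]);
            final int partition = Integer.parseInt(parts[1]);
            final SubTopologyInfo subTopology = subTopologies.get(subtopologyId);
            // left-over state for a sub-topology that is missing or now stateless creates no task
            if (subTopology == null || !subTopology.hasStateWithChangelogs()) {
                continue;
            }
            // one input partition per source topic, all with the task's partition number
            final Set<Partition> inputPartitions = subTopology.sourceTopics().stream()
                .map(topic -> new Partition(topic, partition))
                .collect(Collectors.toSet());
            result.put(dirName, inputPartitions);
        }
        return result;
    }
}
```

For example, with directories `0_2` and `1_2`, where only sub-topology 0 still has changelogged state, only `0_2` yields a task.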



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -662,6 +662,9 @@ private void maybeSetRunning() {
                 return;
             }
 
+            // all (alive) threads have received their assignment, close any remaining "pending" Tasks, they're not needed

Review Comment:
   Not sure why you spell `Task` with a capital letter? "Task" is just a concept, not a name (and referring to the `Task` interface seems odd).
   
   Applies to all comments.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -185,6 +200,108 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);

Review Comment:
   ```suggestion
                   final ProcessorTopology subTopology = topologyMetadata.buildSubtopology(id);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -185,6 +200,108 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");

Review Comment:
   Should we rather pass `LogContext` as a parameter and use the one from `KafkaStreams` instead of creating a new one?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -325,6 +325,31 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
 
+    private Map<Task, Set<TopicPartition>> assignPendingTasks(final Map<TaskId, Set<TopicPartition>> tasksToAssign,
+                                                              final String threadLogPrefix,
+                                                              final TopologyMetadata topologyMetadata,
+                                                              final ChangelogRegister changelogReader) {
+        if (stateDirectory.hasPendingTasks()) {
+            final Map<Task, Set<TopicPartition>> assignedTasks = new HashMap<>(tasksToAssign.size());
+            for (final Map.Entry<TaskId, Set<TopicPartition>> entry : tasksToAssign.entrySet()) {
+                final TaskId taskId = entry.getKey();
+                final Set<TopicPartition> inputPartitions = entry.getValue();

Review Comment:
   Seems we need `inputPartitions` only inside the `if` block -- should we move 
this line inside the if-block?
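A minimal sketch of the suggested restructuring, under simplifying assumptions: task ids and partitions are plain `String`s, "assigning" a pending task only records its partitions, and `isPending` is a hypothetical predicate standing in for whatever condition the real `if` block checks (that part of `assignPendingTasks` is not shown in the diff).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

class PendingTaskAssignment {
    // Hypothetical simplification of assignPendingTasks: keeps only tasks that pass
    // the (hypothetical) isPending check, together with their input partitions.
    static Map<String, Set<String>> assignPending(final Map<String, Set<String>> tasksToAssign,
                                                  final Predicate<String> isPending) {
        final Map<String, Set<String>> assigned = new HashMap<>(tasksToAssign.size());
        for (final Map.Entry<String, Set<String>> entry : tasksToAssign.entrySet()) {
            final String taskId = entry.getKey();
            if (isPending.test(taskId)) {
                // inputPartitions is declared only where it is used: inside the if-block
                final Set<String> inputPartitions = entry.getValue();
                assigned.put(taskId, inputPartitions);
            }
        }
        return assigned;
    }
}
```

Narrowing the variable's scope this way makes it clear that tasks failing the check never touch their partitions.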



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -185,6 +200,108 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // we still check whether the Task sub-topology is stateful, even though we know its directory contains

Review Comment:
   ```suggestion
                   // we still check if the task's sub-topology is stateful, even though we know its directory contains
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -474,12 +596,18 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
         for (final TaskDirectory taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.file().getName();
             final TaskId id = parseTaskDirectoryName(dirName, taskDir.namedTopology());
-            if (!lockedTasksToOwner.containsKey(id)) {
+            if (!lockedTasksToOwner.containsKey(id) || tasksForLocalState.containsKey(id)) {

Review Comment:
   Is this condition correct?
   
   We don't want the cleaner thread to delete a local task directory if we hold its lock (which is what `!lockedTasksToOwner.containsKey(id)` guards against), because in that case we own the task as active or standby. But we also don't want it to clean a task directory while we are in the startup phase, right?
   
   So should it be `&& !tasksForLocalState.containsKey(id)`?
   
   That is: clean a task directory only if the task is not owned and is not a "pending standby".
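A simplified truth-table check of the two conditions. This sketch uses plain `Map`s keyed by `String` task ids in place of the real `lockedTasksToOwner` and `tasksForLocalState` fields; the value types are placeholders, and the class and method names are hypothetical.

```java
import java.util.Map;

class CleanerCondition {
    // Condition as written in the diff: clean if the task is unlocked OR pending.
    static boolean shouldCleanAsWritten(final Map<String, Thread> lockedTasksToOwner,
                                        final Map<String, Object> tasksForLocalState,
                                        final String id) {
        return !lockedTasksToOwner.containsKey(id) || tasksForLocalState.containsKey(id);
    }

    // Condition suggested in the review: clean only if the task is neither locked
    // (owned as active/standby) nor a pending task initialized at startup.
    static boolean shouldCleanSuggested(final Map<String, Thread> lockedTasksToOwner,
                                        final Map<String, Object> tasksForLocalState,
                                        final String id) {
        return !lockedTasksToOwner.containsKey(id) && !tasksForLocalState.containsKey(id);
    }
}
```

With `||`, an unlocked directory is cleaned even when it belongs to a pending task, and a locked directory is cleaned as soon as it is pending; with `&&`, neither happens.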



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -222,6 +222,21 @@ public ProcessorStateManager(final TaskId taskId,
         log.debug("Created state store manager for task {}", taskId);
     }
 
+    // standby tasks initialized for local state on-startup need to have their stateManager internals updated for the

Review Comment:
   ```suggestion
       // standby tasks initialized for local state on-startup need to have their StateManager internals updated for the
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -397,9 +514,14 @@ synchronized void unlock(final TaskId taskId) {
         }
     }
 
+    Thread lockOwner(final TaskId taskId) {

Review Comment:
   Seems we need this only for testing? If so, let's add a comment.
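A `// visible for testing` comment is a common convention in the Kafka code base for members like this. The sketch below is a hypothetical, simplified model of the lock hand-off that `shouldAssignPendingTaskToStreamThread` asserts on (String task ids, plain maps, and an invented `lockAsPending` helper); it is not the `StateDirectory` implementation, only an illustration of where such a comment would sit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of pending-task lock bookkeeping; not the real StateDirectory.
class LockTable {
    private final Map<String, Thread> lockedTasksToOwner = new HashMap<>();
    private final Map<String, Object> pendingTasks = new HashMap<>();

    // Invented helper: the calling (main) thread locks the directory and parks the task.
    synchronized void lockAsPending(final String taskId, final Object task) {
        lockedTasksToOwner.put(taskId, Thread.currentThread());
        pendingTasks.put(taskId, task);
    }

    // Hands a pending task to the calling thread and transfers the lock to that thread.
    synchronized Object removePendingTask(final String taskId) {
        final Object task = pendingTasks.remove(taskId);
        if (task != null) {
            lockedTasksToOwner.put(taskId, Thread.currentThread());
        }
        return task;
    }

    // visible for testing
    synchronized Thread lockOwner(final String taskId) {
        return lockedTasksToOwner.get(taskId);
    }
}
```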



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -185,6 +200,108 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // we still check whether the Task sub-topology is stateful, even though we know its directory contains
+                // state, because it's possible the topology has changed since that data was written, and is now stateless
+                // this therefore prevents us from creating unnecessary Tasks just because of some left-over state
+                if (topology.hasStateWithChangelogs()) {
+                    final ProcessorStateManager stateManager = new ProcessorStateManager(
+                        id,
+                        Task.TaskType.STANDBY,
+                        eosEnabled,
+                        logContext,
+                        this,
+                        null,
+                        topology.storeToChangelogTopic(),
+                        inputPartitions,
+                        stateUpdaterEnabled
+                    );
+
+                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+                        id,
+                        config,
+                        stateManager,
+                        streamsMetrics,
+                        dummyCache
+                    );
+
+                    final Task task = new StandbyTask(
+                        id,
+                        inputPartitions,
+                        topology,
+                        topologyMetadata.taskConfig(id),
+                        streamsMetrics,
+                        stateManager,
+                        this,
+                        dummyCache,
+                        context
+                    );
+
+                    // initialize and suspend new Tasks

Review Comment:
   Why "and suspend"? That also does not match the code.
   
   Do we need a comment at all?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java:
##########
@@ -829,6 +840,133 @@ public void shouldReadFutureProcessFileFormat() throws Exception {
         assertThat(directory.initializeProcessId(), equalTo(processId));
     }
 
+    @Test
+    public void shouldNotInitializeStandbyTasksWhenNoLocalState() {
+        initializeTasksForLocalState(new TaskId(0, 0), false);
+        assertFalse(directory.hasPendingTasks());
+    }
+
+    @Test
+    public void shouldInitializeStandbyTasksForLocalState() {
+        initializeTasksForLocalState(new TaskId(0, 0), true);
+        assertTrue(directory.hasPendingTasks());
+    }
+
+    @Test
+    public void shouldNotAssignPendingTasksWeDontHave() {
+        final TaskId taskId = new TaskId(0, 0);
+        initializeTasksForLocalState(taskId, false);
+        final Task task = directory.removePendingTask(taskId);
+        assertNull(task);
+    }
+
+    private class FakeStreamThread extends Thread {
+        private final TaskId taskId;
+        private final AtomicReference<Task> result;
+
+        private FakeStreamThread(final TaskId taskId, final AtomicReference<Task> result) {
+            this.taskId = taskId;
+            this.result = result;
+        }
+
+        @Override
+        public void run() {
+            result.set(directory.removePendingTask(taskId));
+        }
+    }
+
+    @Test
+    public void shouldAssignPendingTaskToStreamThread() throws InterruptedException {
+        final TaskId taskId = new TaskId(0, 0);
+
+        initializeTasksForLocalState(taskId, true);
+
+        // main thread owns the newly initialized tasks
+        assertThat(directory.lockOwner(taskId), is(Thread.currentThread()));
+
+        // spawn off a "fake" StreamThread, so we can verify the lock was updated to the correct thread
+        final AtomicReference<Task> result = new AtomicReference<>();
+        final Thread streamThread = new FakeStreamThread(taskId, result);
+        streamThread.start();
+        streamThread.join();
+        final Task task = result.get();
+
+        assertNotNull(task);
+        assertThat(task, instanceOf(StandbyTask.class));
+
+        // verify the owner of the task directory lock has been shifted over to our assigned StreamThread
+        assertThat(directory.lockOwner(taskId), is(instanceOf(FakeStreamThread.class)));
+    }
+
+    @Test
+    public void shouldClosePendingTasksOnDirectoryClose() {
+        final StateStore store = initializeTasksForLocalState(new TaskId(0, 0), true);
+
+        assertTrue(directory.hasPendingTasks());
+        assertTrue(store.isOpen());
+
+        directory.close();
+
+        assertFalse(directory.hasPendingTasks());
+        assertFalse(store.isOpen());
+    }
+
+    @Test
+    public void shouldClosePendingTasksOnAutoCleanUp() {

Review Comment:
   Do we want to do it this way? I thought this test should be `shouldNotClosePendingTasksOnAutoCleanUp`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.