mjsax commented on code in PR #16922: URL: https://github.com/apache/kafka/pull/16922#discussion_r1760384313
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -185,6 +200,108 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // we still check whether the Task sub-topology is stateful, even though we know its directory contains
+                // state, because it's possible the topology has changed since that data was written, and is now stateless

Review Comment:
   ```suggestion
                // state, because it's possible that the topology has changed since that data was written, and is now stateless
   ```


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -662,6 +662,9 @@ private void maybeSetRunning() {
             return;
         }
+        // all (alive) threads have received their assignment, close any remaining "pending" Tasks, they're not needed

Review Comment:
   Not sure why you spell `Task` with a capital letter? "task" is just a concept, not a name (and referring to the `Task` interface here seems odd). This applies to all comments.
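   E.g., something like:
   ```suggestion
        // all (alive) threads have received their assignment, close any remaining "pending" tasks, they're not needed
   ```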
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -185,6 +200,108 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);

Review Comment:
   ```suggestion
                final ProcessorTopology subTopology = topologyMetadata.buildSubtopology(id);
   ```


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -185,6 +200,108 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");

Review Comment:
   Should we rather pass the `LogContext` in as a parameter and use the one from `KafkaStreams`, instead of creating a new one here?


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -325,6 +325,31 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
+    private Map<Task, Set<TopicPartition>> assignPendingTasks(final Map<TaskId, Set<TopicPartition>> tasksToAssign,
+                                                               final String threadLogPrefix,
+                                                               final TopologyMetadata topologyMetadata,
+                                                               final ChangelogRegister changelogReader) {
+        if (stateDirectory.hasPendingTasks()) {
+            final Map<Task, Set<TopicPartition>> assignedTasks = new HashMap<>(tasksToAssign.size());
+            for (final Map.Entry<TaskId, Set<TopicPartition>> entry : tasksToAssign.entrySet()) {
+                final TaskId taskId = entry.getKey();
+                final Set<TopicPartition> inputPartitions = entry.getValue();

Review Comment:
   It seems we need `inputPartitions` only inside the `if` block -- should we move this line into the if-block?
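   Roughly what I have in mind (the rest of the if-block is cut off in the hunk above, so the body below is only a guessed placeholder to illustrate the reordering -- `removePendingTask()` just stands in for whatever the actual lookup is):
   ```java
            for (final Map.Entry<TaskId, Set<TopicPartition>> entry : tasksToAssign.entrySet()) {
                final TaskId taskId = entry.getKey();
                final Task task = stateDirectory.removePendingTask(taskId); // placeholder for the real check
                if (task != null) {
                    // only compute the input partitions when we actually have a pending task to assign
                    final Set<TopicPartition> inputPartitions = entry.getValue();
                    assignedTasks.put(task, inputPartitions);
                }
            }
   ```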
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -185,6 +200,108 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // we still check whether the Task sub-topology is stateful, even though we know its directory contains

Review Comment:
   ```suggestion
                // we still check if the task's sub-topology is stateful, even though we know its directory contains
   ```


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -474,12 +596,18 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
         for (final TaskDirectory taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.file().getName();
             final TaskId id = parseTaskDirectoryName(dirName, taskDir.namedTopology());
-            if (!lockedTasksToOwner.containsKey(id)) {
+            if (!lockedTasksToOwner.containsKey(id) || tasksForLocalState.containsKey(id)) {

Review Comment:
   Is this condition correct? We don't want the cleaner thread to delete a local task directory if we hold a lock (hence the `!lockedTasksToOwner.containsKey(id)` check), because in that case we own the task as active or standby -- but we also don't want it to clean a task directory if we are in the startup phase, right? So should it be `&& !tasksForLocalState.containsKey(id)`? I.e., only clean a task dir if the task is not owned and not a "pending standby".
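   In other words, something along these lines (just a sketch of the condition I have in mind, nothing else changed):
   ```java
            // only clean the task directory if the task is neither locked/owned by a thread
            // nor held as a "pending" task for local state during startup
            if (!lockedTasksToOwner.containsKey(id) && !tasksForLocalState.containsKey(id)) {
                // ... existing cleanup logic for the task directory ...
            }
   ```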
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -222,6 +222,21 @@ public ProcessorStateManager(final TaskId taskId,
         log.debug("Created state store manager for task {}", taskId);
     }
+    // standby tasks initialized for local state on-startup need to have their stateManager internals updated for the

Review Comment:
   ```suggestion
    // standby tasks initialized for local state on-startup need to have their StateManager internals updated for the
   ```


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -397,9 +514,14 @@ synchronized void unlock(final TaskId taskId) {
         }
     }
+    Thread lockOwner(final TaskId taskId) {

Review Comment:
   It seems we need this only for testing? If yes, let's add a comment.
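   E.g., something like:
   ```java
    // visible for testing
    Thread lockOwner(final TaskId taskId) {
        // ... unchanged ...
    }
   ```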
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -185,6 +200,108 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final LogContext logContext = new LogContext("main-thread ");
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology topology = topologyMetadata.buildSubtopology(id);
+                final Set<TopicPartition> inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet());
+
+                // we still check whether the Task sub-topology is stateful, even though we know its directory contains
+                // state, because it's possible the topology has changed since that data was written, and is now stateless
+                // this therefore prevents us from creating unnecessary Tasks just because of some left-over state
+                if (topology.hasStateWithChangelogs()) {
+                    final ProcessorStateManager stateManager = new ProcessorStateManager(
+                        id,
+                        Task.TaskType.STANDBY,
+                        eosEnabled,
+                        logContext,
+                        this,
+                        null,
+                        topology.storeToChangelogTopic(),
+                        inputPartitions,
+                        stateUpdaterEnabled
+                    );
+
+                    final InternalProcessorContext<Object, Object> context = new ProcessorContextImpl(
+                        id,
+                        config,
+                        stateManager,
+                        streamsMetrics,
+                        dummyCache
+                    );
+
+                    final Task task = new StandbyTask(
+                        id,
+                        inputPartitions,
+                        topology,
+                        topologyMetadata.taskConfig(id),
+                        streamsMetrics,
+                        stateManager,
+                        this,
+                        dummyCache,
+                        context
+                    );
+
+                    // initialize and suspend new Tasks

Review Comment:
   Why "and suspend"? That also does not match the code. Do we need a comment here at all?


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java:
##########
@@ -829,6 +840,133 @@ public void shouldReadFutureProcessFileFormat() throws Exception {
         assertThat(directory.initializeProcessId(), equalTo(processId));
     }
+    @Test
+    public void shouldNotInitializeStandbyTasksWhenNoLocalState() {
+        initializeTasksForLocalState(new TaskId(0, 0), false);
+        assertFalse(directory.hasPendingTasks());
+    }
+
+    @Test
+    public void shouldInitializeStandbyTasksForLocalState() {
+        initializeTasksForLocalState(new TaskId(0, 0), true);
+        assertTrue(directory.hasPendingTasks());
+    }
+
+    @Test
+    public void shouldNotAssignPendingTasksWeDontHave() {
+        final TaskId taskId = new TaskId(0, 0);
+        initializeTasksForLocalState(taskId, false);
+        final Task task = directory.removePendingTask(taskId);
+        assertNull(task);
+    }
+
+    private class FakeStreamThread extends Thread {
+        private final TaskId taskId;
+        private final AtomicReference<Task> result;
+
+        private FakeStreamThread(final TaskId taskId, final AtomicReference<Task> result) {
+            this.taskId = taskId;
+            this.result = result;
+        }
+
+        @Override
+        public void run() {
+            result.set(directory.removePendingTask(taskId));
+        }
+    }
+
+    @Test
+    public void shouldAssignPendingTaskToStreamThread() throws InterruptedException {
+        final TaskId taskId = new TaskId(0, 0);
+
+        initializeTasksForLocalState(taskId, true);
+
+        // main thread owns the newly initialized tasks
+        assertThat(directory.lockOwner(taskId), is(Thread.currentThread()));
+
+        // spawn off a "fake" StreamThread, so we can verify the lock was updated to the correct thread
+        final AtomicReference<Task> result = new AtomicReference<>();
+        final Thread streamThread = new FakeStreamThread(taskId, result);
+        streamThread.start();
+        streamThread.join();
+        final Task task = result.get();
+
+        assertNotNull(task);
+        assertThat(task, instanceOf(StandbyTask.class));
+
+        // verify the owner of the task directory lock has been shifted over to our assigned StreamThread
+        assertThat(directory.lockOwner(taskId), is(instanceOf(FakeStreamThread.class)));
+    }
+
+    @Test
+    public void shouldClosePendingTasksOnDirectoryClose() {
+        final StateStore store = initializeTasksForLocalState(new TaskId(0, 0), true);
+
+        assertTrue(directory.hasPendingTasks());
+        assertTrue(store.isOpen());
+
+        directory.close();
+
+        assertFalse(directory.hasPendingTasks());
+        assertFalse(store.isOpen());
+    }
+
+    @Test
+    public void shouldClosePendingTasksOnAutoCleanUp() {

Review Comment:
   Do we want to do it this way? I thought this test should be `should[Not]ClosePendingTasksOnAutoCleanUp`?
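   To illustrate the expectation behind my question, roughly (just a sketch -- the actual test body is cut off above, and I'm assuming the same `initializeTasksForLocalState()` helper as the other tests plus `cleanRemovedTasks()` as the trigger for the cleaner path; adjust as needed):
   ```java
    @Test
    public void shouldNotClosePendingTasksOnAutoCleanUp() {
        final StateStore store = initializeTasksForLocalState(new TaskId(0, 0), true);

        assertTrue(directory.hasPendingTasks());
        assertTrue(store.isOpen());

        // run the cleaner path (assumed entry point for auto clean-up)
        directory.cleanRemovedTasks(0L);

        // expectation in question: pending tasks should survive auto clean-up
        assertTrue(directory.hasPendingTasks());
        assertTrue(store.isOpen());
    }
   ```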