nicktelford commented on code in PR #16922:
URL: https://github.com/apache/kafka/pull/16922#discussion_r1818812104
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +196,105 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }

+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata,
+                                             final StreamsMetricsImpl streamsMetrics,
+                                             final LogContext logContext) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology subTopology = topologyMetadata.buildSubtopology(id);
+
+                // we still check if the task's sub-topology is stateful, even though we know its directory contains state,
+                // because it's possible that the topology has changed since that data was written, and is now stateless

Review Comment:
   I think the more common case would be when sub-topologies get re-ordered, so the Task ordinals change. I have [KIP-816](https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset) to address this, but until it is addressed, I'd hesitate to log a warning here, as it would be spurious most of the time.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.