nicktelford commented on code in PR #16922:
URL: https://github.com/apache/kafka/pull/16922#discussion_r1818812104


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -182,6 +196,105 @@ private boolean lockStateDirectory() {
         return stateDirLock != null;
     }
 
+    public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata,
+                                             final StreamsMetricsImpl streamsMetrics,
+                                             final LogContext logContext) {
+        final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
+        if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
+            final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics);
+            final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+            final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
+
+            // discover all non-empty task directories in StateDirectory
+            for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
+                final String dirName = taskDirectory.file().getName();
+                final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology());
+                final ProcessorTopology subTopology = topologyMetadata.buildSubtopology(id);
+
+                // we still check if the task's sub-topology is stateful, even though we know its directory contains state,
+                // because it's possible that the topology has changed since that data was written, and is now stateless

Review Comment:
   I think the more common case would be when sub-topologies get re-ordered, so the Task ordinals change.
   I have [KIP-816](https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset) to address this, but until it is addressed, I'd hesitate to log a warning here, as it would be spurious most of the time.
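
   To illustrate the re-ordering case, here is a minimal standalone sketch. It is not Kafka code: `TaskOrdinalSketch`, `ordinalOf` and `isStateful` are hypothetical names, and a topology is modelled simply as a list of "is this sub-topology stateful?" flags indexed by ordinal. It relies only on the task directory naming `<subtopology ordinal>_<partition>`.

```java
import java.util.List;

public class TaskOrdinalSketch {

    // Extracts the sub-topology ordinal from a task directory name such as "0_3".
    public static int ordinalOf(final String taskDirName) {
        return Integer.parseInt(taskDirName.substring(0, taskDirName.indexOf('_')));
    }

    // Hypothetical lookup: is the sub-topology that currently owns this ordinal stateful?
    public static boolean isStateful(final List<Boolean> statefulByOrdinal, final String taskDirName) {
        final int ordinal = ordinalOf(taskDirName);
        return ordinal < statefulByOrdinal.size() && statefulByOrdinal.get(ordinal);
    }

    public static void main(final String[] args) {
        // Before: ordinal 0 is a stateful sub-topology, ordinal 1 is stateless.
        final List<Boolean> before = List.of(true, false);
        // After: the same two sub-topologies, but their order (and so their ordinals) swapped.
        final List<Boolean> after = List.of(false, true);

        // State written by the old ordinal 0 still sits on disk under "0_3".
        final String onDisk = "0_3";

        System.out.println("before re-ordering, stateful: " + isStateful(before, onDisk));
        System.out.println("after re-ordering, stateful:  " + isStateful(after, onDisk));
    }
}
```

   Nothing was removed from the topology here, yet the directory now maps to a stateless sub-topology, which is why a warning at this point would usually be a false alarm.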


