bbejeck commented on code in PR #20749:
URL: https://github.com/apache/kafka/pull/20749#discussion_r2604522491


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -255,6 +279,10 @@ void assignToStreamThread(final LogContext logContext,
         this.sourcePartitions.addAll(sourcePartitions);
     }
 
+    void reuseState() {

Review Comment:
   What are the semantics around `reuseState`? Seems a little odd that calling 
`reuse` sets false, probably could use a small comment or a method rename.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -205,7 +212,8 @@ public ProcessorStateManager(final TaskId taskId,
                                  final ChangelogRegister changelogReader,
                                  final Map<String, String> 
storeToChangelogTopic,
                                  final Collection<TopicPartition> 
sourcePartitions,
-                                 final boolean stateUpdaterEnabled) throws 
ProcessorStateException {
+                                 final boolean stateUpdaterEnabled,
+                                 final boolean startupState) throws 
ProcessorStateException {

Review Comment:
   What does `startupState` represent, maybe rename to something like 
`hasStateOnStartup`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -333,25 +333,19 @@ private void closeDirtyAndRevive(final Collection<Task> 
taskWithChangelogs, fina
         }
     }
 
-    private Map<Task, Set<TopicPartition>> assignStartupTasks(final 
Map<TaskId, Set<TopicPartition>> tasksToAssign,
-                                                              final String 
threadLogPrefix,
-                                                              final 
TopologyMetadata topologyMetadata,

Review Comment:
   Why did these parameters go away?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -220,6 +228,22 @@ public ProcessorStateManager(final TaskId taskId,
         this.checkpointFile = new 
OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
 
         log.debug("Created state store manager for task {}", taskId);
+        this.startupState = new AtomicBoolean(startupState);

Review Comment:
   same would apply to the `AtomicBoolean` variable



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to