bbejeck commented on code in PR #20749:
URL: https://github.com/apache/kafka/pull/20749#discussion_r2604522491
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -255,6 +279,10 @@ void assignToStreamThread(final LogContext logContext,
this.sourcePartitions.addAll(sourcePartitions);
}
+ void reuseState() {
Review Comment:
What are the semantics around `reuseState`? Seems a little odd that calling
`reuse` sets false, probably could use a small comment or a method rename.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -205,7 +212,8 @@ public ProcessorStateManager(final TaskId taskId,
final ChangelogRegister changelogReader,
final Map<String, String>
storeToChangelogTopic,
final Collection<TopicPartition>
sourcePartitions,
- final boolean stateUpdaterEnabled) throws
ProcessorStateException {
+ final boolean stateUpdaterEnabled,
+ final boolean startupState) throws
ProcessorStateException {
Review Comment:
What does `startupState` represent, maybe rename to something like
`hasStateOnStartup`?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -333,25 +333,19 @@ private void closeDirtyAndRevive(final Collection<Task>
taskWithChangelogs, fina
}
}
- private Map<Task, Set<TopicPartition>> assignStartupTasks(final
Map<TaskId, Set<TopicPartition>> tasksToAssign,
- final String
threadLogPrefix,
- final
TopologyMetadata topologyMetadata,
Review Comment:
Why did these parameters go away?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##########
@@ -220,6 +228,22 @@ public ProcessorStateManager(final TaskId taskId,
this.checkpointFile = new
OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
log.debug("Created state store manager for task {}", taskId);
+ this.startupState = new AtomicBoolean(startupState);
Review Comment:
same would apply to the `AtomicBoolean` variable
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]