eduwercamacaro commented on code in PR #20749:
URL: https://github.com/apache/kafka/pull/20749#discussion_r2788115921
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -237,35 +252,18 @@ public void initializeStartupTasks(final TopologyMetadata
topologyMetadata,
inputPartitions
);
- final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
- id,
- config,
- stateManager,
- streamsMetrics,
- dummyCache
- );
-
- final Task task = new StandbyTask(
- id,
- inputPartitions,
- subTopology,
- topologyMetadata.taskConfig(id),
- streamsMetrics,
- stateManager,
- this,
- dummyCache,
- context
- );
-
+ final StartupContext initContext = new StartupContext(id,
config, temporaryStateManager, metricsImpl, cache);
try {
- task.initializeIfNeeded();
-
- tasksForLocalState.put(id, task);
- } catch (final TaskCorruptedException e) {
- // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
- task.suspend();
- task.closeDirty();
+ StateManagerUtil.registerStateStores(log,
threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
+ } catch (final TaskCorruptedException tce) {
+ log.warn("Failed to register startup state stores for
task {}: {}", id, tce.getMessage());
+ } finally {
+ // Make sure the state manager writes the local
checkpoint file before closing the stores
+ // This will be replaced in the future when removing
the checkpoint file dependency.
+ temporaryStateManager.checkpoint();
Review Comment:
I moved this logic inside the try/catch block to ensure that we always call
the `close()` method. Any exception from the `close()` method is considered
fatal and will break the main thread.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -237,35 +252,18 @@ public void initializeStartupTasks(final TopologyMetadata
topologyMetadata,
inputPartitions
);
- final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
- id,
- config,
- stateManager,
- streamsMetrics,
- dummyCache
- );
-
- final Task task = new StandbyTask(
- id,
- inputPartitions,
- subTopology,
- topologyMetadata.taskConfig(id),
- streamsMetrics,
- stateManager,
- this,
- dummyCache,
- context
- );
-
+ final StartupContext initContext = new StartupContext(id,
config, temporaryStateManager, metricsImpl, cache);
try {
- task.initializeIfNeeded();
-
- tasksForLocalState.put(id, task);
- } catch (final TaskCorruptedException e) {
- // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
- task.suspend();
- task.closeDirty();
+ StateManagerUtil.registerStateStores(log,
threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
+ } catch (final TaskCorruptedException tce) {
+ log.warn("Failed to register startup state stores for
task {}: {}", id, tce.getMessage());
+ } finally {
+ // Make sure the state manager writes the local
checkpoint file before closing the stores
+ // This will be replaced in the future when removing
the checkpoint file dependency.
+ temporaryStateManager.checkpoint();
+ temporaryStateManager.close();
Review Comment:
Currently, it is not needed because we keep the lock on the main thread
until it is assigned to a StreamThread. TaskManager changes the ownership of
the Task using 'StateDirectory.removeStartupState'.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -237,35 +252,18 @@ public void initializeStartupTasks(final TopologyMetadata
topologyMetadata,
inputPartitions
);
- final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
- id,
- config,
- stateManager,
- streamsMetrics,
- dummyCache
- );
-
- final Task task = new StandbyTask(
- id,
- inputPartitions,
- subTopology,
- topologyMetadata.taskConfig(id),
- streamsMetrics,
- stateManager,
- this,
- dummyCache,
- context
- );
-
+ final StartupContext initContext = new StartupContext(id,
config, temporaryStateManager, metricsImpl, cache);
try {
- task.initializeIfNeeded();
-
- tasksForLocalState.put(id, task);
- } catch (final TaskCorruptedException e) {
- // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
- task.suspend();
- task.closeDirty();
+ StateManagerUtil.registerStateStores(log,
threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
+ } catch (final TaskCorruptedException tce) {
+ log.warn("Failed to register startup state stores for
task {}: {}", id, tce.getMessage());
+ } finally {
+ // Make sure the state manager writes the local
checkpoint file before closing the stores
+ // This will be replaced in the future when removing
the checkpoint file dependency.
+ temporaryStateManager.checkpoint();
+ temporaryStateManager.close();
}
+ tasksForLocalState.put(id, new StartupState(id,
subTopology, temporaryStateManager));
Review Comment:
Yep. Excellent observation.
You are right, we can use `Set<TaskId>` instead.
I completely removed the `StartupState` class and the process to close the
stores because we are immediately closing them.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -237,35 +252,18 @@ public void initializeStartupTasks(final TopologyMetadata
topologyMetadata,
inputPartitions
);
- final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
- id,
- config,
- stateManager,
- streamsMetrics,
- dummyCache
- );
-
- final Task task = new StandbyTask(
- id,
- inputPartitions,
- subTopology,
- topologyMetadata.taskConfig(id),
- streamsMetrics,
- stateManager,
- this,
- dummyCache,
- context
- );
-
+ final StartupContext initContext = new StartupContext(id,
config, temporaryStateManager, metricsImpl, cache);
try {
- task.initializeIfNeeded();
-
- tasksForLocalState.put(id, task);
- } catch (final TaskCorruptedException e) {
- // Task is corrupt - wipe it out (under EOS) and don't
initialize a Standby for it
- task.suspend();
- task.closeDirty();
+ StateManagerUtil.registerStateStores(log,
threadLogPrefix, subTopology, temporaryStateManager, this, initContext);
+ } catch (final TaskCorruptedException tce) {
+ log.warn("Failed to register startup state stores for
task {}: {}", id, tce.getMessage());
Review Comment:
Good point.
I added some comments and JavaDoc for this method.
Those exceptions are considered fatal and will break the StreamThread. The
only exception we handle is `TaskCorruptedException`: we log it and continue
with the initialization, because the StreamThread will handle it later,
during the first assignment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]