guozhangwang commented on a change in pull request #8248: URL: https://github.com/apache/kafka/pull/8248#discussion_r429694775
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ########## @@ -109,7 +94,7 @@ public void logChange(final String storeName, final long timestamp) { throwUnsupportedOperationExceptionIfStandby("logChange"); // Sending null headers to changelog topics (KIP-244) - collector.send( + streamTask.recordCollector().send( Review comment: maybe we can refactor the interfaces to `transitToActive` and `transitToStandby` such that: * transitToActive: takes the streamTask (in the future I'd suggest we just pass in the record-collector after removing schedule and refactoring request-commit) and the cache as parameters, and checks that the current state manager's taskType is active. * transitToStandby: takes no parameters, resets streamTask and cache, and checks that the current state manager's taskType is standby. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ########## @@ -189,38 +190,52 @@ private void prepareClose(final boolean clean) { @Override public void closeClean(final Map<TopicPartition, Long> checkpoint) { Objects.requireNonNull(checkpoint); - close(true); + close(true, false); log.info("Closed clean"); } @Override public void closeDirty() { - close(false); + close(false, false); log.info("Closed dirty"); } - private void close(final boolean clean) { + @Override + public void closeAndRecycleState() { + prepareClose(true); + close(true, true); Review comment: nit: I'd suggest we have a separate `recycle()` function rather than piggy-backing on `close` with an additional flag, since `clean` is always true, i.e. we would always just 1) write checkpoints, 2) call stateMgr.recycle(), 3) then transition to CLOSED, and do nothing if the task is not in CREATED / RUNNING. Though it duplicates a few lines, it is a bit more straightforward. 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ########## @@ -482,7 +488,8 @@ public void closeDirty() { * </pre> */ private void close(final boolean clean, - final Map<TopicPartition, Long> checkpoint) { + final Map<TopicPartition, Long> checkpoint, Review comment: Ditto here: I'd suggest adding `recycle` as a separate function. ########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java ########## @@ -46,7 +46,6 @@ private final SessionKeySchema keySchema; private final SegmentedCacheFunction cacheFunction; private String cacheName; - private ThreadCache cache; Review comment: nit: I feel it is not necessary to remove the extra reference to the cache and then call `context.cache()` every time we need it, but this is a very minor nit, so your call. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ########## @@ -42,48 +41,36 @@ import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { - // The below are both null for standby tasks - private final StreamTask streamTask; - private final RecordCollector collector; + private StreamTask streamTask; // always null for standby tasks private final ToInternal toInternal = new ToInternal(); private final static To SEND_TO_ALL = To.all(); final Map<String, String> storeToChangelogTopic = new HashMap<>(); - ProcessorContextImpl(final TaskId id, - final StreamTask streamTask, - final StreamsConfig config, - final RecordCollector collector, - final ProcessorStateManager stateMgr, - final StreamsMetricsImpl metrics, - final ThreadCache cache) { + public ProcessorContextImpl(final TaskId id, + final StreamsConfig config, + final ProcessorStateManager stateMgr, + final StreamsMetricsImpl metrics, + 
final ThreadCache cache) { super(id, config, metrics, stateMgr, cache); - this.streamTask = streamTask; - this.collector = collector; + } - if (streamTask == null && taskType() == TaskType.ACTIVE) { - throw new IllegalStateException("Tried to create context for active task but the streamtask was null"); - } + @Override + public void transitionTaskType(final TaskType newType, + final ThreadCache cache) { + this.cache = cache; Review comment: newType is not needed since it is overridden via state manager? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ########## @@ -42,48 +41,36 @@ import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { - // The below are both null for standby tasks - private final StreamTask streamTask; - private final RecordCollector collector; + private StreamTask streamTask; // always null for standby tasks private final ToInternal toInternal = new ToInternal(); private final static To SEND_TO_ALL = To.all(); final Map<String, String> storeToChangelogTopic = new HashMap<>(); - ProcessorContextImpl(final TaskId id, - final StreamTask streamTask, - final StreamsConfig config, - final RecordCollector collector, - final ProcessorStateManager stateMgr, - final StreamsMetricsImpl metrics, - final ThreadCache cache) { + public ProcessorContextImpl(final TaskId id, + final StreamsConfig config, + final ProcessorStateManager stateMgr, + final StreamsMetricsImpl metrics, + final ThreadCache cache) { super(id, config, metrics, stateMgr, cache); - this.streamTask = streamTask; - this.collector = collector; + } - if (streamTask == null && taskType() == TaskType.ACTIVE) { - throw new IllegalStateException("Tried to create context for active task but the streamtask was null"); - } + @Override + public void transitionTaskType(final TaskType 
newType, + final ThreadCache cache) { + this.cache = cache; Review comment: Thinking about this a bit more, seems we do not need `transition` but just a `registerCache`? Could this be consolidated with the function below? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ########## @@ -42,48 +41,36 @@ import static org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator.getReadWriteStore; public class ProcessorContextImpl extends AbstractProcessorContext implements RecordCollector.Supplier { - // The below are both null for standby tasks - private final StreamTask streamTask; - private final RecordCollector collector; + private StreamTask streamTask; // always null for standby tasks private final ToInternal toInternal = new ToInternal(); private final static To SEND_TO_ALL = To.all(); final Map<String, String> storeToChangelogTopic = new HashMap<>(); - ProcessorContextImpl(final TaskId id, - final StreamTask streamTask, - final StreamsConfig config, - final RecordCollector collector, - final ProcessorStateManager stateMgr, - final StreamsMetricsImpl metrics, - final ThreadCache cache) { + public ProcessorContextImpl(final TaskId id, + final StreamsConfig config, + final ProcessorStateManager stateMgr, + final StreamsMetricsImpl metrics, + final ThreadCache cache) { super(id, config, metrics, stateMgr, cache); - this.streamTask = streamTask; - this.collector = collector; + } - if (streamTask == null && taskType() == TaskType.ACTIVE) { - throw new IllegalStateException("Tried to create context for active task but the streamtask was null"); - } + @Override + public void transitionTaskType(final TaskType newType, + final ThreadCache cache) { + this.cache = cache; + streamTask = null; } - ProcessorContextImpl(final TaskId id, - final StreamsConfig config, - final ProcessorStateManager stateMgr, - final StreamsMetricsImpl metrics) { - this( - id, - null, - config, - null, - stateMgr, - 
metrics, - new ThreadCache( - new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())), - 0, - metrics - ) - ); + @Override + public void registerNewTask(final Task task) { Review comment: Not a comment for this PR: since in 2.6+ we always try to commit all tasks when the user requests a commit via the context, we do not need to maintain the flag per task, but per thread. Then, once we remove the deprecated schedule function, we can remove the `stream-task` reference inside. Can we add a TODO marker to do that in the future? ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ########## @@ -425,12 +449,14 @@ public void flush() { /** * {@link StateStore#close() Close} all stores (even in case of failure). - * Log all exception and re-throw the first exception that did occur at the end. + * Log all exceptions and re-throw the first exception that occurred at the end. * * @throws ProcessorStateException if any error happens when closing the state stores */ @Override public void close() throws ProcessorStateException { + unregisterAllStoresWithChangelogReader(); + Review comment: nit: I think now it's better to move the debug line 464 before `unregisterAllStoresWithChangelogReader`. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ########## @@ -112,6 +103,7 @@ static void closeStateManager(final Logger log, final AtomicReference<ProcessorStateException> firstException = new AtomicReference<>(null); try { if (stateDirectory.lock(id)) { + Review comment: Are these new lines intentional? 
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -272,6 +274,36 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks, } } + if (taskCloseExceptions.isEmpty()) { + Task oldTask = null; + final Iterator<Task> transitioningTasksIter = tasksToRecycle.iterator(); + try { + while (transitioningTasksIter.hasNext()) { + oldTask = transitioningTasksIter.next(); + final Task newTask; + if (oldTask.isActive()) { + final Set<TopicPartition> partitions = standbyTasksToCreate.remove(oldTask.id()); + newTask = standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, partitions); + } else { + final Set<TopicPartition> partitions = activeTasksToCreate.remove(oldTask.id()); + newTask = activeTaskCreator.createActiveTaskFromStandby((StandbyTask) oldTask, partitions, mainConsumer); + } + tasks.remove(oldTask.id()); + addNewTask(newTask); + transitioningTasksIter.remove(); + } + } catch (final RuntimeException e) { + final String uncleanMessage = String.format("Failed to recycle task %s cleanly. Attempting to close remaining tasks before re-throwing:", oldTask.id()); + log.error(uncleanMessage, e); + taskCloseExceptions.put(oldTask.id(), e); + + dirtyTasks.addAll(tasksToRecycle); // contains the tasks we have not yet tried to transition + dirtyTasks.addAll(tasks.values()); // contains the new tasks we just created Review comment: This will add all current tasks, is that right? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org