guozhangwang commented on code in PR #13523: URL: https://github.com/apache/kafka/pull/13523#discussion_r1160970925
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java: ########## @@ -219,13 +219,13 @@ public boolean isActive() { } @Override - public void maybeRecordRestored(final Time time, final long numRecords) { - maybeRecordSensor(numRecords, time, restoreSensor); - maybeRecordSensor(-1 * numRecords, time, restoreRemainingSensor); - } - - public void initRemainingRecordsToRestore(final Time time, final long numRecords) { - maybeRecordSensor(numRecords, time, restoreRemainingSensor); + public void recordRestoration(final Time time, final long numRecords, final boolean initRemaining) { Review Comment: This is a minor refactor to make sure all sensor recordings go through methods inherited from the `Task` interface, so that we do not need to cast to `StreamTask`. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ########## @@ -748,7 +748,7 @@ public Set<Task> getTasks() { @Override public boolean restoresActiveTasks() { return !executeWithQueuesLocked( - () -> getStreamOfNonPausedTasks().filter(Task::isActive).collect(Collectors.toSet()) + () -> getStreamOfTasks().filter(Task::isActive).collect(Collectors.toSet()) Review Comment: This is to make the behavior with the state updater consistent with the behavior without it: paused tasks are still considered tasks whose restoration must complete before we can transition the main thread to RUNNING. See the TODO marker on the interface API below for details.
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ########## @@ -644,20 +650,19 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM final int numRecords = changelogMetadata.bufferedLimitIndex; if (numRecords != 0) { - final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>(changelogMetadata.bufferedRecords.subList(0, numRecords)); + final List<ConsumerRecord<byte[], byte[]>> records = changelogMetadata.bufferedRecords.subList(0, numRecords); Review Comment: This is fix 2) in the description. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ########## @@ -749,6 +754,11 @@ private Map<TopicPartition, Long> committedOffsetForChangelogs(final Map<TaskId, } } + private void filterNewPartitionsToRestore(final Map<TaskId, Task> tasks, final Set<ChangelogMetadata> newPartitionsToRestore) { Review Comment: This fixes the issue found earlier: if a task is paused, we should not initialize its changelogs. The restore consumer then does not poll their records, so we never get a task to restore that is not in the `tasks` set. ########## streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java: ########## @@ -188,12 +188,11 @@ public void shouldAllowForTopologiesToStartPaused(final boolean stateUpdaterEnab kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled); kafkaStreams.pause(); kafkaStreams.start(); - waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, STARTUP_TIMEOUT); + waitForApplicationState(singletonList(kafkaStreams), State.REBALANCING, STARTUP_TIMEOUT); Review Comment: This is mainly for 4) in the description: when we pause the tasks, the state should not transition to RUNNING. This test did not fail before because of the bug we are now fixing in line 823 of StoreChangelogReader above.
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ########## @@ -988,6 +1010,14 @@ private void prepareChangelogs(final Map<TaskId, Task> tasks, } catch (final Exception e) { throw new StreamsException("State restore listener failed on batch restored", e); } + + final TaskId taskId = changelogMetadata.stateManager.taskId(); Review Comment: This brings back the existing code that the previous hot-fix had to remove. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ########## @@ -482,7 +482,7 @@ public void shouldReturnFalseForRestoreActiveTasksIfTaskPaused() throws Exceptio verifyRemovedTasks(); verifyPausedTasks(task); - assertFalse(stateUpdater.restoresActiveTasks()); + assertTrue(stateUpdater.restoresActiveTasks()); Review Comment: This is related to the change that makes the behavior consistent with and without the state updater. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ########## @@ -456,15 +456,21 @@ public long restore(final Map<TaskId, Task> tasks) { // TODO: we always try to restore as a batch when some records are accumulated, which may result in // small batches; this can be optimized in the future, e.g. wait longer for larger batches. final TaskId taskId = changelogs.get(partition).stateManager.taskId(); - try { - final Task task = tasks.get(taskId); - final ChangelogMetadata changelogMetadata = changelogs.get(partition); - totalRestored += restoreChangelog(task, changelogMetadata); - } catch (final TimeoutException timeoutException) { - tasks.get(taskId).maybeInitTaskTimeoutOrThrow( - time.milliseconds(), - timeoutException - ); + final Task task = tasks.get(taskId); Review Comment: Similarly, given the fix at line 757 below, the task should never be null.
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ########## @@ -691,7 +696,7 @@ private int restoreChangelog(final Task task, final ChangelogMetadata changelogM } } - if (task != null && (numRecords > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED))) { + if (numRecords > 0 || changelogMetadata.state().equals(ChangelogState.COMPLETED)) { Review Comment: Given the fix at line 757 below, the task should never be null. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ########## @@ -880,6 +896,9 @@ private void initializeChangelogs(final Map<TaskId, Task> tasks, } private void addChangelogsToRestoreConsumer(final Set<TopicPartition> partitions) { + if (partitions.isEmpty()) Review Comment: This and line 929 below are meant to reduce verbose logging from KafkaConsumer when it calls `unsubscribe`: if the partition set is empty and the current assignment is also empty, we should not proceed at all. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java: ########## @@ -266,12 +264,21 @@ public void shouldGetDroppedRecordsSensor() { try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = mockStatic(StreamsMetricsImpl.class)) { final Sensor sensor = TaskMetrics.droppedRecordsSensor(THREAD_ID, TASK_ID, streamsMetrics); streamsMetricsStaticMock.verify( - () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor( + () -> StreamsMetricsImpl.addInvocationRateToSensor( Review Comment: This is related to 3) in the description. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org