guozhangwang commented on code in PR #13523:
URL: https://github.com/apache/kafka/pull/13523#discussion_r1160970925


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -219,13 +219,13 @@ public boolean isActive() {
     }
 
     @Override
-    public void maybeRecordRestored(final Time time, final long numRecords) {
-        maybeRecordSensor(numRecords, time, restoreSensor);
-        maybeRecordSensor(-1 * numRecords, time, restoreRemainingSensor);
-    }
-
-    public void initRemainingRecordsToRestore(final Time time, final long 
numRecords) {
-        maybeRecordSensor(numRecords, time, restoreRemainingSensor);
+    public void recordRestoration(final Time time, final long numRecords, 
final boolean initRemaining) {

Review Comment:
   This is a minor refactor to make sure all sensor recordings are inherited 
from the `Task` interface so that we do not need to cast to StreamTask.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -748,7 +748,7 @@ public Set<Task> getTasks() {
     @Override
     public boolean restoresActiveTasks() {
         return !executeWithQueuesLocked(
-            () -> 
getStreamOfNonPausedTasks().filter(Task::isActive).collect(Collectors.toSet())
+            () -> 
getStreamOfTasks().filter(Task::isActive).collect(Collectors.toSet())

Review Comment:
   This is to make the case with state-updater consistent with the one without 
state-updater: when tasks are paused, they are still considered as the ones 
that need to be restoration completed, before we can transit the main thread to 
RUNNING.
   
   See below the TODO marker on the interface API for details.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -644,20 +650,19 @@ private int restoreChangelog(final Task task, final 
ChangelogMetadata changelogM
         final int numRecords = changelogMetadata.bufferedLimitIndex;
 
         if (numRecords != 0) {
-            final List<ConsumerRecord<byte[], byte[]>> records = new 
ArrayList<>(changelogMetadata.bufferedRecords.subList(0, numRecords));
+            final List<ConsumerRecord<byte[], byte[]>> records = 
changelogMetadata.bufferedRecords.subList(0, numRecords);

Review Comment:
   This is the fix 2) in the description.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -749,6 +754,11 @@ private Map<TopicPartition, Long> 
committedOffsetForChangelogs(final Map<TaskId,
         }
     }
 
+    private void filterNewPartitionsToRestore(final Map<TaskId, Task> tasks, 
final Set<ChangelogMetadata> newPartitionsToRestore) {

Review Comment:
   This is to fix the issue found before: if a task is paused, we should not 
initialize their changelogs, and hence the restore consumer would not poll 
their records, and hence we would never got a task to be restore while it's not 
in the `tasks` set.



##########
streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java:
##########
@@ -188,12 +188,11 @@ public void shouldAllowForTopologiesToStartPaused(final 
boolean stateUpdaterEnab
         kafkaStreams = buildKafkaStreams(OUTPUT_STREAM_1, stateUpdaterEnabled);
         kafkaStreams.pause();
         kafkaStreams.start();
-        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
STARTUP_TIMEOUT);
+        waitForApplicationState(singletonList(kafkaStreams), 
State.REBALANCING, STARTUP_TIMEOUT);

Review Comment:
   This is mainly for 4) in the description: when we pause the tasks, the state 
should not transit to RUNNING. This test did not fail because of the bug we are 
not fixing in line 823 of StoreChangelogReader above.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -988,6 +1010,14 @@ private void prepareChangelogs(final Map<TaskId, Task> 
tasks,
                 } catch (final Exception e) {
                     throw new StreamsException("State restore listener failed 
on batch restored", e);
                 }
+
+                final TaskId taskId = changelogMetadata.stateManager.taskId();

Review Comment:
   This is to bring back the existing code that previous hot-fix has to remove.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -482,7 +482,7 @@ public void 
shouldReturnFalseForRestoreActiveTasksIfTaskPaused() throws Exceptio
         verifyRemovedTasks();
         verifyPausedTasks(task);
 
-        assertFalse(stateUpdater.restoresActiveTasks());
+        assertTrue(stateUpdater.restoresActiveTasks());

Review Comment:
   This is related to the change making sure with and without state updater has 
consistent behavior.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -456,15 +456,21 @@ public long restore(final Map<TaskId, Task> tasks) {
                 // TODO: we always try to restore as a batch when some records 
are accumulated, which may result in
                 //       small batches; this can be optimized in the future, 
e.g. wait longer for larger batches.
                 final TaskId taskId = 
changelogs.get(partition).stateManager.taskId();
-                try {
-                    final Task task = tasks.get(taskId);
-                    final ChangelogMetadata changelogMetadata = 
changelogs.get(partition);
-                    totalRestored += restoreChangelog(task, changelogMetadata);
-                } catch (final TimeoutException timeoutException) {
-                    tasks.get(taskId).maybeInitTaskTimeoutOrThrow(
-                        time.milliseconds(),
-                        timeoutException
-                    );
+                final Task task = tasks.get(taskId);

Review Comment:
   Similarly, given the fix below in line 757, the task should never be null.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -691,7 +696,7 @@ private int restoreChangelog(final Task task, final 
ChangelogMetadata changelogM
             }
         }
 
-        if (task != null && (numRecords > 0 || 
changelogMetadata.state().equals(ChangelogState.COMPLETED))) {
+        if (numRecords > 0 || 
changelogMetadata.state().equals(ChangelogState.COMPLETED)) {

Review Comment:
   Given the fix in 757 below, the task should never be null.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -880,6 +896,9 @@ private void initializeChangelogs(final Map<TaskId, Task> 
tasks,
     }
 
     private void addChangelogsToRestoreConsumer(final Set<TopicPartition> 
partitions) {
+        if (partitions.isEmpty())

Review Comment:
   This and the line 929 below is to reduce verbose logging lines of 
KafkaConsumer when it calls `unsubscribe`: if the partitions set is empty, 
while the current assignment is also empty, we should not proceed at all.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java:
##########
@@ -266,12 +264,21 @@ public void shouldGetDroppedRecordsSensor() {
         try (final MockedStatic<StreamsMetricsImpl> streamsMetricsStaticMock = 
mockStatic(StreamsMetricsImpl.class)) {
             final Sensor sensor = TaskMetrics.droppedRecordsSensor(THREAD_ID, 
TASK_ID, streamsMetrics);
             streamsMetricsStaticMock.verify(
-                () -> StreamsMetricsImpl.addInvocationRateAndCountToSensor(
+                () -> StreamsMetricsImpl.addInvocationRateToSensor(

Review Comment:
   This is related to 3) in the description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to