wcarlson5 commented on code in PR #12044:
URL: https://github.com/apache/kafka/pull/12044#discussion_r849921212


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -223,15 +227,15 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
                         updateTaskCommitMetadata(allOffsets);
                     } catch (final CommitFailedException error) {
                         throw new TaskMigratedException("Consumer committing offsets failed, " +
-                            "indicating the corresponding thread is no longer part of the group", error);
+                                                            "indicating the corresponding thread is no longer part of the group", error);

Review Comment:
   Same here?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -76,12 +76,16 @@ int process(final int maxNumRecords, final Time time) {
         for (final Task task : tasks.activeTasks()) {
             final long now = time.milliseconds();
             try {
-                if (taskExecutionMetadata.canProcessTask(task, now)) {
+                final TaskStatus taskStatus = taskScheduler.getTaskStatus(task, now);
+                if (taskStatus != TaskStatus.BACKOFF) {
                     lastProcessed = task;
                     totalProcessed += processTask(task, maxNumRecords, now, time);
+                    if (taskStatus == TaskStatus.RETRIABLE) {
+                        taskScheduler.registerTaskSuccess(task);
+                    }
                 }
             } catch (final Throwable t) {
-                taskExecutionMetadata.registerTaskError(task, t, now);
+                taskScheduler.registerTaskError(task, now);

Review Comment:
   Do we not care what the error is here?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -117,7 +121,7 @@ private long processTask(final Task task, final int maxNumRecords, final long be
             );
         } catch (final TaskMigratedException e) {
             log.info("Failed to process stream task {} since it got migrated to another thread already. " +
-                "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
+                         "Will trigger a new rebalance and close all tasks as zombies together.", task.id());

Review Comment:
   Nit: this whitespace seems off


