wcarlson5 commented on code in PR #12044:
URL: https://github.com/apache/kafka/pull/12044#discussion_r849921212
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -223,15 +227,15 @@ void commitOffsetsOrTransaction(final Map<Task, Map<TopicPartition, OffsetAndMet
             updateTaskCommitMetadata(allOffsets);
         } catch (final CommitFailedException error) {
             throw new TaskMigratedException("Consumer committing offsets failed, " +
-                "indicating the corresponding thread is no longer part of the group", error);
+                                            "indicating the corresponding thread is no longer part of the group", error);
Review Comment:
Same here?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -76,12 +76,16 @@ int process(final int maxNumRecords, final Time time) {
         for (final Task task : tasks.activeTasks()) {
             final long now = time.milliseconds();
             try {
-                if (taskExecutionMetadata.canProcessTask(task, now)) {
+                final TaskStatus taskStatus = taskScheduler.getTaskStatus(task, now);
+                if (taskStatus != TaskStatus.BACKOFF) {
                     lastProcessed = task;
                     totalProcessed += processTask(task, maxNumRecords, now, time);
+                    if (taskStatus == TaskStatus.RETRIABLE) {
+                        taskScheduler.registerTaskSuccess(task);
+                    }
                 }
             } catch (final Throwable t) {
-                taskExecutionMetadata.registerTaskError(task, t, now);
+                taskScheduler.registerTaskError(task, now);
Review Comment:
Do we not care what the error is here?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java:
##########
@@ -117,7 +121,7 @@ private long processTask(final Task task, final int maxNumRecords, final long be
             );
         } catch (final TaskMigratedException e) {
             log.info("Failed to process stream task {} since it got migrated to another thread already. " +
-                "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
+                     "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
Review Comment:
Nit: this whitespace seems off