ableegoldman commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434051190



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {

Review comment:
       Should we switch to `switch` here as well? 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -218,6 +218,10 @@ public void initializeIfNeeded() {
      */
     @Override
     public void completeRestoration() {
+        if (state() == State.RUNNING) {
+            return;
+        }
+

Review comment:
       Can we use if/ else if here for consistency?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -466,6 +510,11 @@ public void closeAndRecycleState() {
                 stateMgr.recycle();
                 recordCollector.close();
                 break;
+
+            case CLOSED:
+                log.trace("Skip close since state is {}", state());

Review comment:
       I think this might be one of those exceptions where we should still 
enforce that the state is not `CLOSED` (ie throw `IllegalStateException`) since 
there are related actions that occur outside of the Task implementation that 
will fail if we try to recycle a CLOSED task. Similar to prepare/post commit, 
resume, etc

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() {
 
         final List<Task> restoringTasks = new LinkedList<>();
         for (final Task task : tasks.values()) {
-            if (task.state() == CREATED) {
-                try {
-                    task.initializeIfNeeded();
-                } catch (final LockException | TimeoutException e) {
-                    // it is possible that if there are multiple threads 
within the instance that one thread
-                    // trying to grab the task from the other, while the other 
has not released the lock since
-                    // it did not participate in the rebalance. In this case 
we can just retry in the next iteration
-                    log.debug("Could not initialize {} due to the following 
exception; will retry", task.id(), e);
-                    allRunning = false;
-                }
+            try {
+                task.initializeIfNeeded();
+            } catch (final LockException | TimeoutException e) {
+                // it is possible that if there are multiple threads within 
the instance that one thread
+                // trying to grab the task from the other, while the other has 
not released the lock since
+                // it did not participate in the rebalance. In this case we 
can just retry in the next iteration
+                log.debug("Could not initialize {} due to the following 
exception; will retry", task.id(), e);
+                allRunning = false;
             }
 
-            if (task.state() == RESTORING) {
+            if (task.isActive()) {

Review comment:
       Can we add a comment or rename `restoringTasks` to clarify that it's ok 
to put an active-but-not-restoring task in here since 
`Task#completeRestoration` is idempotent/no-op for RUNNING tasks?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -218,6 +218,10 @@ public void initializeIfNeeded() {
      */
     @Override
     public void completeRestoration() {
+        if (state() == State.RUNNING) {
+            return;
+        }
+
         if (state() == State.RESTORING) {
             initializeMetadata();
             initializeTopology();

Review comment:
       github won't let me leave a comment below this line, but can we use the 
`"Illegal state"`/`"Unknown state"` improvement in this method as well?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##########
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
                 "state manager close",
                 log
             );
+        } else if (state() == State.CLOSED) {
+            log.trace("Skip closing since state is {}", state());
+            return;
         } else {
             throw new IllegalStateException("Illegal state " + state() + " 
while closing standby task " + id);

Review comment:
       `Illegal state` -> `Unknown state`? 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final 
Map<TopicPartition, Lo
         for (final Map.Entry<TopicPartition, Long> changelogEntry : 
changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, 
pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {

Review comment:
       nice




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to