cadonna commented on code in PR #17209:
URL: https://github.com/apache/kafka/pull/17209#discussion_r1761355474


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -2116,4 +2126,42 @@ boolean needsInitializationOrRestoration() {
     void addTask(final Task task) {
         tasks.addTask(task);
     }
+
+    private void handleUnsuccessfulLockAcquiring(final Task task, final long nowMs) {
+        updateOrCreateBackoffRecord(task.id(), nowMs);
+        tasks.addPendingTasksToInit(Collections.singleton(task));
+    }
+
+    private boolean canTryLock(final TaskId taskId, final long nowMs) {
+        return !taskIdToBackoffRecord.containsKey(taskId) || taskIdToBackoffRecord.get(taskId).canAttempt(nowMs);
+    }
+
+    private void updateOrCreateBackoffRecord(final TaskId taskId, final long nowMs) {
+        if (taskIdToBackoffRecord.containsKey(taskId)) {
+            taskIdToBackoffRecord.get(taskId).recordAttempt(nowMs);
+        } else {
+            taskIdToBackoffRecord.put(taskId, new BackoffRecord(nowMs));
+        }
+    }
+
+    public static class BackoffRecord {
+        private long attempts;
+        private long lastAttemptMs;
+        private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1, 2, 10000, 0.5);

Review Comment:
   Should the exponential back-off be specified in terms of poll time? Something like
   ```
   new ExponentialBackoff(pollTime, 2, 10000, 0.5);
   ```
   If it is too much trouble to get that config into the task manager, just choose something larger than 1ms; 1ms sounds really small. The sequence of back-offs would be roughly (ignoring the jitter) 1ms, 2ms, 4ms, 8ms, 16ms, 32ms, 64ms, 128ms. At the same time, with default configs, task initialization is attempted every 100ms. So there will not be much improvement over the current situation: the first 7 back-offs are all shorter than the poll interval, so you attempt to initialize the task in each of the first 7 poll iterations anyway.
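   To make this concrete, here is a small simulation sketch. It is a simplified, jitter-free model: it assumes `ExponentialBackoff.backoff(n)` behaves roughly like `initial * 2^n` capped at the maximum, counts from one failed attempt the way `BackoffRecord` does, assumes every attempt fails, and records only real attempts. `BackoffVsPollInterval`, `backoffMs()` and `attemptTimes()` are hypothetical names for illustration only.
   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class BackoffVsPollInterval {

       // Jitter-free model of the back-off: initialMs * 2^attempts, capped at maxMs.
       // Assumption: approximates ExponentialBackoff(initialMs, 2, maxMs, 0.5) without its jitter.
       static long backoffMs(long initialMs, long attempts, long maxMs) {
           return (long) Math.min(initialMs * Math.pow(2, attempts), maxMs);
       }

       // One check per poll iteration; the lock is never free, so every real attempt fails.
       // Only real attempts update the counter and timestamp. Returns the attempt times in ms.
       static List<Long> attemptTimes(long initialMs, long pollMs, long horizonMs, long maxMs) {
           List<Long> times = new ArrayList<>();
           long failedAttempts = 0;
           long lastAttemptMs = 0;
           for (long nowMs = 0; nowMs <= horizonMs; nowMs += pollMs) {
               boolean canAttempt = failedAttempts == 0
                   || nowMs - lastAttemptMs >= backoffMs(initialMs, failedAttempts, maxMs);
               if (canAttempt) {
                   times.add(nowMs);
                   failedAttempts++;
                   lastAttemptMs = nowMs;
               }
           }
           return times;
       }

       public static void main(String[] args) {
           long pollMs = 100; // default poll.ms in Kafka Streams
           // prints [0, 100, 200, 300, 400, 500, 600, 800, 1100, 1700]
           System.out.println(attemptTimes(1, pollMs, 2_000, 10_000));
           // prints [0, 200, 600, 1400]
           System.out.println(attemptTimes(pollMs, pollMs, 2_000, 10_000));
       }
   }
   ```
   With a 1ms initial back-off, each of the first 7 poll iterations still attempts initialization; starting from the poll time spaces the attempts out from the first retry on.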



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1006,14 +1009,21 @@ private void addTasksToStateUpdater() {
     }
 
     private void addTaskToStateUpdater(final Task task) {
+        final long nowMs = System.currentTimeMillis();
         try {
-            task.initializeIfNeeded();
-            stateUpdater.add(task);
+            if (canTryLock(task.id(), nowMs)) {
+                task.initializeIfNeeded();
+                stateUpdater.add(task);
+                taskIdToBackoffRecord.remove(task.id());

Review Comment:
   Minor:
   I would swap those two lines (`stateUpdater.add(task)` and `taskIdToBackoffRecord.remove(task.id())`). Once the task is initialized, the back-off record can be removed.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1006,14 +1009,21 @@ private void addTasksToStateUpdater() {
     }
 
     private void addTaskToStateUpdater(final Task task) {
+        final long nowMs = System.currentTimeMillis();
         try {
-            task.initializeIfNeeded();
-            stateUpdater.add(task);
+            if (canTryLock(task.id(), nowMs)) {
+                task.initializeIfNeeded();
+                stateUpdater.add(task);
+                taskIdToBackoffRecord.remove(task.id());
+            } else {
+                log.trace("Task {} is still not allowed to retry acquiring the state directory lock", task.id());
+                handleUnsuccessfulLockAcquiring(task, nowMs);

Review Comment:
   Is this correct?
   Every time `canTryLock()` is called before the back-off has elapsed, the time of the last attempt is updated to the current time and the attempt counter is increased. If we assume a check every poll interval and the poll interval is less than the back-off time, the task will never be initialized.
   Assume the last unsuccessful attempt occurred at time 200 and the current call to `canTryLock()` happens 100ms later, at time 300. Furthermore, assume the current back-off is 250. Then `canTryLock()` returns `false`, because 300 - 200 >= 250 is not `true`. The last attempt is updated to 300 and the back-off grows exponentially with the increased number of attempts (let's say to 500). If you try again 100ms later, at 400, `canTryLock()` will again return `false`, because 400 - 300 >= 500 is still not true, and it will never become true: the gap between consecutive checks stays at 100ms while the back-off keeps growing. You should only update the back-off record when you actually attempted to initialize the task and the attempt failed, not when you skipped the attempt due to the back-off.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -2116,4 +2126,42 @@ boolean needsInitializationOrRestoration() {
     void addTask(final Task task) {
         tasks.addTask(task);
     }
+
+    private void handleUnsuccessfulLockAcquiring(final Task task, final long nowMs) {
+        updateOrCreateBackoffRecord(task.id(), nowMs);
+        tasks.addPendingTasksToInit(Collections.singleton(task));
+    }
+
+    private boolean canTryLock(final TaskId taskId, final long nowMs) {
+        return !taskIdToBackoffRecord.containsKey(taskId) || taskIdToBackoffRecord.get(taskId).canAttempt(nowMs);
+    }
+
+    private void updateOrCreateBackoffRecord(final TaskId taskId, final long nowMs) {
+        if (taskIdToBackoffRecord.containsKey(taskId)) {
+            taskIdToBackoffRecord.get(taskId).recordAttempt(nowMs);
+        } else {
+            taskIdToBackoffRecord.put(taskId, new BackoffRecord(nowMs));
+        }
+    }
+
+    public static class BackoffRecord {
+        private long attempts;
+        private long lastAttemptMs;
+        private static final ExponentialBackoff EXPONENTIAL_BACKOFF = new ExponentialBackoff(1, 2, 10000, 0.5);
+
+
+        public BackoffRecord(final long nowMs) {
+            this.attempts = 1;
+            this.lastAttemptMs = nowMs;
+        }
+
+        public void recordAttempt(final long nowMs) {
+            this.attempts++;
+            this.lastAttemptMs = nowMs;
+        }
+
+        public boolean canAttempt(final long nowMs) {
+            return  nowMs - lastAttemptMs >= EXPONENTIAL_BACKOFF.backoff(attempts);

Review Comment:
   nit: drop the extra space after `return`.
   ```suggestion
               return nowMs - lastAttemptMs >= EXPONENTIAL_BACKOFF.backoff(attempts);
   ```



-- 
This is an automated message from the Apache Git Service.