vamossagar12 commented on code in PR #16628:
URL: https://github.com/apache/kafka/pull/16628#discussion_r1683251403


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -926,6 +933,119 @@ public void testPollTimeoutExpiry() throws Exception {
         }
     }
 
+    @Test
+    public void testNoDuplicateTaskAssignmentOnWorkerPollTimeoutExpiry() 
throws Exception {
+        String statusTopic = "status-topic";
+        // This is a fabricated test to ensure that a poll timeout expiry 
happens. The tick thread awaits on
+        // task#stop method which is blocked. The timeouts have been set 
accordingly
+        workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, 
Long.toString(TimeUnit.SECONDS.toMillis(10)));
+        // This is set to a high value to ensure that all tasks can stop in 
time and also, we don't have the blocked
+        // task meddling with the rest of the test by being started midway.
+        workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, 
Long.toString(TimeUnit.SECONDS.toMillis(60)));
+        workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, 
Long.toString(TimeUnit.SECONDS.toMillis(5)));
+        workerProps.put(STATUS_STORAGE_TOPIC_CONFIG, statusTopic);
+        workerProps.put(STATUS_STORAGE_PARTITIONS_CONFIG, Integer.toString(1));
+        connect = connectBuilder
+            .numBrokers(1)
+            .numWorkers(1)
+            .build();
+
+        connect.start();
+        WorkerHandle leader = connect.workers().iterator().next();
+
+        Map<String, String> connectorConfig = 
defaultSourceConnectorProps("topic1");
+        connectorConfig.put(TASKS_MAX_CONFIG, "1");
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME, 1, "connector and tasks did not start in time"
+        );
+
+        // The task that has a blocking stop call gets scheduled on this 
worker eventually leading to a poll timeout.
+        WorkerHandle timingOutWorker = connect.addWorker();
+        connect.assertions().assertExactlyNumWorkersAreUp(2, "Workers didn't 
start in time");
+
+        Map<String, String> blockingTaskConnectorConfig = new HashMap<>();
+        blockingTaskConnectorConfig.put(CONNECTOR_CLASS_CONFIG, 
BlockingConnectorTest.BlockingSourceConnector.class.getSimpleName());
+        blockingTaskConnectorConfig.put(TASKS_MAX_CONFIG, "1");
+        
blockingTaskConnectorConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG, 
Objects.requireNonNull(TASK_STOP));
+        connect.configureConnector(CONNECTOR_NAME + "-1", 
blockingTaskConnectorConfig);
+
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME + "-1", 1, "connector and tasks did not start in 
time"
+        );
+
+        connectorConfig.put(TOPIC_CONFIG, "topic2");
+        connectorConfig.put(TASKS_MAX_CONFIG, "2");
+        connect.configureConnector(CONNECTOR_NAME + "-2", connectorConfig);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+            CONNECTOR_NAME + "-2", 2, "connector and tasks did not start in 
time"
+        );
+        // We verify this task id because it is the one which gets duplicated. 
Banking upon the assignment logic of
+        // ICR here.
+        String taskIdToVerify = new ConnectorTaskId(CONNECTOR_NAME + "-2", 
1).toString();
+
+        // Restarting the task on a separate thread to not block the test 
thread.
+        Thread restartThread = new Thread(() -> {
+            try {
+                connect.restartTask(CONNECTOR_NAME + "-1", 0);
+            } catch (Exception e) {
+                log.error("Exception while restarting task", e);
+            }
+        });
+        restartThread.start();
+
+        // rebalance.timeout.ms + scheduled.rebalance.delay + 5s buffer.
+        Thread.sleep(Duration.ofSeconds(20).toMillis());
+        List<Map<String, Object>> statuses = new ArrayList<>();
+        try (JsonConverter converter = new JsonConverter()) {
+            ConsumerRecords<byte[], byte[]> records = 
connect.kafka().consumeAll(Duration.ofSeconds(30).toMillis(), statusTopic);
+            
converter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG,
 "false"), false);
+            for (ConsumerRecord<byte[], byte[]> record : records) {
+                String key = new String(record.key());
+                if (!key.equals("status-task-" + taskIdToVerify)) {
+                    continue;
+                }
+                Object valueObject = converter.toConnectData("unused", 
record.value()).value();
+                @SuppressWarnings("unchecked")
+                Map<String, Object> value = assertAndCast(valueObject, 
Map.class, "Value");
+                statuses.add(value);
+            }
+        }
+        restartThread.interrupt();
+        String leaderWorkerId = leader.url().getHost() + ":" + 
leader.url().getPort();
+        String timedOutWorkerId = timingOutWorker.url().getHost() + ":" + 
timingOutWorker.url().getPort();
+
+        // The task goes through 3 states. RUNNING on first worker, UNASSIGNED 
on the same worker and then starting on the other
+        // worker. If we had duplicate assignments, because the worker doesn't 
revoke tasks on poll timeout expiry,
+        // we notice just 2 RUNNING statuses on 2 different workers which 
means duplicate instances. Note that in some
+        // cases, it could also mean that we couldn't write the UNASSIGNED 
status to the status topic, but the timeouts
+        // have been set to a high value and I have run the test multiple 
times to observe the same behaviour.
+        assertEquals(3, statuses.size());
+        assertEquals("RUNNING", statuses.get(0).get("state"));
+        assertEquals(timedOutWorkerId, statuses.get(0).get("worker_id"));
+
+        assertEquals("UNASSIGNED", statuses.get(1).get("state"));
+        assertEquals(timedOutWorkerId, statuses.get(1).get("worker_id"));
+
+        assertEquals("RUNNING", statuses.get(2).get("state"));
+        assertEquals(leaderWorkerId, statuses.get(2).get("worker_id"));

Review Comment:
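   For reference, here is a dump of the status topic with and without the fix. Compare the records for `status-task-simple-connector-2-1`: with the fix the task goes RUNNING -> UNASSIGNED on the timed-out worker and then RUNNING on the other worker, whereas without the fix there is no UNASSIGNED record and the task is simply reported RUNNING on both workers.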
   **With revoking assignments on poll timeout**
   ```
   [2024-07-18 11:19:20,217] INFO Record, 
key::status-connector-simple-connector, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":2}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:19:20,217] INFO Record, key::status-task-simple-connector-0, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":3}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:19:20,217] INFO Record, 
key::status-connector-simple-connector-1, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50317","generation":5}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:19:20,218] INFO Record, 
key::status-task-simple-connector-1-0, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50317","generation":6}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:19:20,218] INFO Record, 
key::status-connector-simple-connector-2, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":7}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:19:20,218] INFO Record, 
key::status-task-simple-connector-2-0, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":8}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:19:20,218] INFO Record, 
key::status-task-simple-connector-2-1, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50317","generation":8}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:19:20,218] INFO Record, 
key::status-connector-simple-connector-1, 
value::{"state":"UNASSIGNED","trace":null,"worker_id":"localhost:50317","generation":8}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:19:20,218] INFO Record, 
key::status-task-simple-connector-2-1, 
value::{"state":"UNASSIGNED","trace":null,"worker_id":"localhost:50317","generation":8}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:19:20,219] INFO Record, 
key::status-connector-simple-connector-1, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":10}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:19:20,219] INFO Record, 
key::status-task-simple-connector-1-0, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":10}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:19:20,219] INFO Record, 
key::status-task-simple-connector-2-1, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":10}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   ```
   
   **Without revoking assignments on poll timeout**
   ```
   [2024-07-18 11:23:07,574] INFO Record, 
key::status-connector-simple-connector, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":2}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:23:07,574] INFO Record, key::status-task-simple-connector-0, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":3}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:23:07,574] INFO Record, 
key::status-connector-simple-connector-1, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50409","generation":5}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:23:07,574] INFO Record, 
key::status-task-simple-connector-1-0, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50409","generation":6}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:23:07,574] INFO Record, 
key::status-connector-simple-connector-2, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":7}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:23:07,575] INFO Record, 
key::status-task-simple-connector-2-1, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50409","generation":8}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:23:07,575] INFO Record, 
key::status-task-simple-connector-2-0, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":8}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:23:07,575] INFO Record, 
key::status-connector-simple-connector-1, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":10}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:23:07,575] INFO Record, 
key::status-task-simple-connector-1-0, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":10}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   [2024-07-18 11:23:07,575] INFO Record, 
key::status-task-simple-connector-2-1, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":10}
 (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to