vamossagar12 commented on code in PR #16628: URL: https://github.com/apache/kafka/pull/16628#discussion_r1683251403
########## connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java: ########## @@ -926,6 +933,119 @@ public void testPollTimeoutExpiry() throws Exception { } } + @Test + public void testNoDuplicateTaskAssignmentOnWorkerPollTimeoutExpiry() throws Exception { + String statusTopic = "status-topic"; + // This is a fabricated test to ensure that a poll timeout expiry happens. The tick thread awaits on + // task#stop method which is blocked. The timeouts have been set accordingly + workerProps.put(REBALANCE_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(10))); + // This is set to a high value to ensure that all tasks can stop in time and also, we don't have the blocked + // task meddling with the rest of the test by being started midway. + workerProps.put(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(60))); + workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, Long.toString(TimeUnit.SECONDS.toMillis(5))); + workerProps.put(STATUS_STORAGE_TOPIC_CONFIG, statusTopic); + workerProps.put(STATUS_STORAGE_PARTITIONS_CONFIG, Integer.toString(1)); + connect = connectBuilder + .numBrokers(1) + .numWorkers(1) + .build(); + + connect.start(); + WorkerHandle leader = connect.workers().iterator().next(); + + Map<String, String> connectorConfig = defaultSourceConnectorProps("topic1"); + connectorConfig.put(TASKS_MAX_CONFIG, "1"); + connect.configureConnector(CONNECTOR_NAME, connectorConfig); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME, 1, "connector and tasks did not start in time" + ); + + // The task that has a blocking stop call gets scheduled on this worker eventually leading to a poll timeout. 
+ WorkerHandle timingOutWorker = connect.addWorker(); + connect.assertions().assertExactlyNumWorkersAreUp(2, "Workers didn't start in time"); + + Map<String, String> blockingTaskConnectorConfig = new HashMap<>(); + blockingTaskConnectorConfig.put(CONNECTOR_CLASS_CONFIG, BlockingConnectorTest.BlockingSourceConnector.class.getSimpleName()); + blockingTaskConnectorConfig.put(TASKS_MAX_CONFIG, "1"); + blockingTaskConnectorConfig.put(BlockingConnectorTest.Block.BLOCK_CONFIG, Objects.requireNonNull(TASK_STOP)); + connect.configureConnector(CONNECTOR_NAME + "-1", blockingTaskConnectorConfig); + + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME + "-1", 1, "connector and tasks did not start in time" + ); + + connectorConfig.put(TOPIC_CONFIG, "topic2"); + connectorConfig.put(TASKS_MAX_CONFIG, "2"); + connect.configureConnector(CONNECTOR_NAME + "-2", connectorConfig); + connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( + CONNECTOR_NAME + "-2", 2, "connector and tasks did not start in time" + ); + // We verify this task id because it is the one which gets duplicated. Banking upon the assignment logic of + // ICR here. + String taskIdToVerify = new ConnectorTaskId(CONNECTOR_NAME + "-2", 1).toString(); + + // Restarting the task on a separate thread to not block the test thread. + Thread restartThread = new Thread(() -> { + try { + connect.restartTask(CONNECTOR_NAME + "-1", 0); + } catch (Exception e) { + log.error("Exception while restarting task", e); + } + }); + restartThread.start(); + + // rebalance.timeout.ms + scheduled.rebalance.delay + 5s buffer. 
+ Thread.sleep(Duration.ofSeconds(20).toMillis()); + List<Map<String, Object>> statuses = new ArrayList<>(); + try (JsonConverter converter = new JsonConverter()) { + ConsumerRecords<byte[], byte[]> records = connect.kafka().consumeAll(Duration.ofSeconds(30).toMillis(), statusTopic); + converter.configure(Collections.singletonMap(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false"), false); + for (ConsumerRecord<byte[], byte[]> record : records) { + String key = new String(record.key()); + if (!key.equals("status-task-" + taskIdToVerify)) { + continue; + } + Object valueObject = converter.toConnectData("unused", record.value()).value(); + @SuppressWarnings("unchecked") + Map<String, Object> value = assertAndCast(valueObject, Map.class, "Value"); + statuses.add(value); + } + } + restartThread.interrupt(); + String leaderWorkerId = leader.url().getHost() + ":" + leader.url().getPort(); + String timedOutWorkerId = timingOutWorker.url().getHost() + ":" + timingOutWorker.url().getPort(); + + // The task goes through 3 states. RUNNING on first worker, UNASSIGNED on the same worker and then starting on the other + // worker. If we had duplicate assignments, because the worker doesn't revoke tasks on poll timeout expiry, + // we notice just 2 RUNNING statuses on 2 different workers which means duplicate instances. Note that in some + // cases, it could also mean that we couldn't write the UNASSIGNED status to the status topic, but the timeouts + // have been set to a high value and I have run the test multiple times to observe the same behaviour. 
+ assertEquals(3, statuses.size()); + assertEquals("RUNNING", statuses.get(0).get("state")); + assertEquals(timedOutWorkerId, statuses.get(0).get("worker_id")); + + assertEquals("UNASSIGNED", statuses.get(1).get("state")); + assertEquals(timedOutWorkerId, statuses.get(1).get("worker_id")); + + assertEquals("RUNNING", statuses.get(2).get("state")); + assertEquals(leaderWorkerId, statuses.get(2).get("worker_id")); Review Comment: For reference, here is the dump of the status topic with and without the fix. **With revoking assignments on poll timeout** ``` [2024-07-18 11:19:20,217] INFO Record, key::status-connector-simple-connector, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":2} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:19:20,217] INFO Record, key::status-task-simple-connector-0, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":3} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:19:20,217] INFO Record, key::status-connector-simple-connector-1, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50317","generation":5} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:19:20,218] INFO Record, key::status-task-simple-connector-1-0, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50317","generation":6} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:19:20,218] INFO Record, key::status-connector-simple-connector-2, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":7} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:19:20,218] INFO Record, key::status-task-simple-connector-2-0, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":8} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:19:20,218] 
INFO Record, key::status-task-simple-connector-2-1, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50317","generation":8} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:19:20,218] INFO Record, key::status-connector-simple-connector-1, value::{"state":"UNASSIGNED","trace":null,"worker_id":"localhost:50317","generation":8} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:19:20,218] INFO Record, key::status-task-simple-connector-2-1, value::{"state":"UNASSIGNED","trace":null,"worker_id":"localhost:50317","generation":8} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:19:20,219] INFO Record, key::status-connector-simple-connector-1, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":10} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:19:20,219] INFO Record, key::status-task-simple-connector-1-0, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":10} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:19:20,219] INFO Record, key::status-task-simple-connector-2-1, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50292","generation":10} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) ``` **Without revoking assignments** ``` [2024-07-18 11:23:07,574] INFO Record, key::status-connector-simple-connector, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":2} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:23:07,574] INFO Record, key::status-task-simple-connector-0, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":3} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:23:07,574] INFO Record, key::status-connector-simple-connector-1, 
value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50409","generation":5} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:23:07,574] INFO Record, key::status-task-simple-connector-1-0, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50409","generation":6} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:23:07,574] INFO Record, key::status-connector-simple-connector-2, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":7} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:23:07,575] INFO Record, key::status-task-simple-connector-2-1, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50409","generation":8} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:23:07,575] INFO Record, key::status-task-simple-connector-2-0, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":8} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:23:07,575] INFO Record, key::status-connector-simple-connector-1, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":10} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:23:07,575] INFO Record, key::status-task-simple-connector-1-0, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":10} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) [2024-07-18 11:23:07,575] INFO Record, key::status-task-simple-connector-2-1, value::{"state":"RUNNING","trace":null,"worker_id":"localhost:50384","generation":10} (org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest:991) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org