C0urante commented on code in PR #16122:
URL: https://github.com/apache/kafka/pull/16122#discussion_r1622584795


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -1123,6 +1134,137 @@ public void testTasksMaxEnforcement() throws Exception {
         );
     }
 
+    @Test
+    public void testKafka16838() throws Exception {
+        brokerProps.put("log.cleaner.backoff.ms", "100");
+        brokerProps.put("log.cleaner.delete.retention.ms", "1");
+        brokerProps.put("log.cleaner.max.compaction.lag.ms", "1");
+        brokerProps.put("log.cleaner.min.cleanable.ratio", "0");
+        brokerProps.put("log.cleaner.min.compaction.lag.ms", "1");
+        brokerProps.put("log.cleaner.threads", "1");
+
+        final String configTopic = "kafka-16838-configs";
+        final int offsetCommitIntervalMs = 100;
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        workerProps.put(CONFIG_STORAGE_PREFIX + SEGMENT_MS_CONFIG, "100");
+        workerProps.put(CONFIG_STORAGE_PREFIX + DELETE_RETENTION_MS_CONFIG, 
"1");
+        workerProps.put(CONFIG_PROVIDERS_CONFIG, "file");
+        workerProps.put(CONFIG_PROVIDERS_CONFIG + ".file.class", 
FileConfigProvider.class.getName());
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
Integer.toString(offsetCommitIntervalMs));
+
+        final int numWorkers = 1;
+        connect = connectBuilder
+                .numWorkers(numWorkers)
+                .build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(
+                numWorkers,
+                "Initial group of workers did not start in time."
+        );
+
+        final String connectorTopic = "connector-topic";
+        connect.kafka().createTopic(connectorTopic, 1);
+
+        final File secretsFile = tmp.newFile("test-secrets");
+        final Properties secrets = new Properties();
+        final String throughputSecretKey = "secret-throughput";
+        secrets.put(throughputSecretKey, "10");
+        try (FileOutputStream secretsOutputStream = new 
FileOutputStream(secretsFile)) {
+            secrets.store(secretsOutputStream, null);
+        }
+
+        ConnectorHandle connectorHandle = 
RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connectorHandle.expectedCommits(NUM_TASKS * 2);
+
+        Map<String, String> connectorConfig = 
defaultSourceConnectorProps(connectorTopic);
+        connectorConfig.put(
+                "throughput",
+                "${file:" + secretsFile.getAbsolutePath() + ":" + 
throughputSecretKey + "}"
+        );
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector or its tasks did not start in time"
+        );
+        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+
+        connect.deleteConnector(CONNECTOR_NAME);
+
+        // Roll the entire cluster
+        connect.activeWorkers().forEach(connect::removeWorker);
+
+        // Miserable hack: produce directly to the config topic in order to trigger segment rollover

Review Comment:
   Yeah, agreed. We'll have to watch this carefully on Jenkins to make sure it 
doesn't lead to an increase in flakiness.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to