mimaison commented on code in PR #16122:
URL: https://github.com/apache/kafka/pull/16122#discussion_r1622425065


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java:
##########
@@ -1116,6 +1118,30 @@ public void testConnectorOffsets() throws Exception {
         assertEquals(offsets, cb.get(1000, TimeUnit.MILLISECONDS));
     }
 
+    @Test
+    public void testTaskConfigComparison() {
+        ClusterConfigState snapshot = mock(ClusterConfigState.class);
+
+        when(snapshot.taskCount(CONN1)).thenReturn(TASK_CONFIGS.size());
+        TASK_CONFIGS_MAP.forEach((task, config) -> when(snapshot.rawTaskConfig(task)).thenReturn(config));
+        // Same task configs, same number of tasks--no change
+        assertFalse(AbstractHerder.taskConfigsChanged(snapshot, CONN1, TASK_CONFIGS));
+
+        when(snapshot.taskCount(CONN1)).thenReturn(TASK_CONFIGS.size() + 1);
+        // Different number of tasks; should report a change
+        assertTrue(AbstractHerder.taskConfigsChanged(snapshot, CONN1, TASK_CONFIGS));
+
+        when(snapshot.taskCount(CONN1)).thenReturn(TASK_CONFIGS.size());
+        List<Map<String, String>> alteredTaskConfigs = new ArrayList<>(TASK_CONFIGS);
+        alteredTaskConfigs.set(alteredTaskConfigs.size() - 1, Collections.emptyMap());
+        // Last task config is different; should report a change
+        assertTrue(AbstractHerder.taskConfigsChanged(snapshot, CONN1, alteredTaskConfigs));
+
+        // Make sure we used exclusively raw task configs and never attempted transformation
+        // See KAFKA-16837

Review Comment:
   We tend not to reference Jiras and prefer to provide details directly, if needed.
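
   For context, the check this test exercises can be sketched as a standalone simplification (hypothetical code: the actual `AbstractHerder.taskConfigsChanged` reads raw task configs out of a `ClusterConfigState` snapshot rather than taking two lists): a change is reported when the number of tasks differs or when any task's raw config map differs.

```java
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch of the comparison; not the real method,
// which works against a ClusterConfigState snapshot instead of a List.
public class TaskConfigDiffSketch {
    static boolean taskConfigsChanged(List<Map<String, String>> existing,
                                      List<Map<String, String>> proposed) {
        // A different number of tasks always counts as a change
        if (existing.size() != proposed.size())
            return true;
        // Otherwise compare each task's config map position by position
        for (int i = 0; i < existing.size(); i++) {
            if (!existing.get(i).equals(proposed.get(i)))
                return true;
        }
        return false;
    }

    public static void main(String[] args) {
        List<Map<String, String>> current = List.of(Map.of("topic", "t1"));
        System.out.println(taskConfigsChanged(current, List.of(Map.of("topic", "t1")))); // false
        System.out.println(taskConfigsChanged(current, List.of(Map.of())));              // true
    }
}
```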



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -1123,6 +1134,137 @@ public void testTasksMaxEnforcement() throws Exception {
         );
     }
 
+    @Test
+    public void testKafka16838() throws Exception {

Review Comment:
   Can we find a name that describes what this is testing instead of referring to the Jira ID?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -1123,6 +1134,137 @@ public void testTasksMaxEnforcement() throws Exception {
         );
     }
 
+    @Test
+    public void testKafka16838() throws Exception {
+        brokerProps.put("log.cleaner.backoff.ms", "100");
+        brokerProps.put("log.cleaner.delete.retention.ms", "1");
+        brokerProps.put("log.cleaner.max.compaction.lag.ms", "1");
+        brokerProps.put("log.cleaner.min.cleanable.ratio", "0");
+        brokerProps.put("log.cleaner.min.compaction.lag.ms", "1");
+        brokerProps.put("log.cleaner.threads", "1");
+
+        final String configTopic = "kafka-16838-configs";
+        final int offsetCommitIntervalMs = 100;
+        workerProps.put(CONFIG_TOPIC_CONFIG, configTopic);
+        workerProps.put(CONFIG_STORAGE_PREFIX + SEGMENT_MS_CONFIG, "100");
+        workerProps.put(CONFIG_STORAGE_PREFIX + DELETE_RETENTION_MS_CONFIG, "1");
+        workerProps.put(CONFIG_PROVIDERS_CONFIG, "file");
+        workerProps.put(CONFIG_PROVIDERS_CONFIG + ".file.class", FileConfigProvider.class.getName());
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Integer.toString(offsetCommitIntervalMs));
+
+        final int numWorkers = 1;
+        connect = connectBuilder
+                .numWorkers(numWorkers)
+                .build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(
+                numWorkers,
+                "Initial group of workers did not start in time."
+        );
+
+        final String connectorTopic = "connector-topic";
+        connect.kafka().createTopic(connectorTopic, 1);
+
+        final File secretsFile = tmp.newFile("test-secrets");
+        final Properties secrets = new Properties();
+        final String throughputSecretKey = "secret-throughput";
+        secrets.put(throughputSecretKey, "10");
+        try (FileOutputStream secretsOutputStream = new FileOutputStream(secretsFile)) {
+            secrets.store(secretsOutputStream, null);
+        }
+
+        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+        connectorHandle.expectedCommits(NUM_TASKS * 2);
+
+        Map<String, String> connectorConfig = defaultSourceConnectorProps(connectorTopic);
+        connectorConfig.put(
+                "throughput",
+                "${file:" + secretsFile.getAbsolutePath() + ":" + throughputSecretKey + "}"
+        );
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                NUM_TASKS,
+                "Connector or its tasks did not start in time"
+        );
+        connectorHandle.awaitCommits(offsetCommitIntervalMs * 3);
+
+        connect.deleteConnector(CONNECTOR_NAME);
+
+        // Roll the entire cluster
+        connect.activeWorkers().forEach(connect::removeWorker);
+
+        // Miserable hack: produce directly to the config topic in order to trigger segment rollover

Review Comment:
   Unfortunately I'm not sure there's a better way to trigger compaction. This is probably preferable to messing with the KafkaServer objects directly.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -124,7 +124,7 @@
  */
 public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
 
-    private final Logger log = LoggerFactory.getLogger(AbstractHerder.class);
+    private static final Logger log = LoggerFactory.getLogger(AbstractHerder.class);

Review Comment:
   `log` -> `LOG`
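
   (The rename follows the common Java convention that `static final` fields are constants and named in UPPER_SNAKE_CASE, while instance fields use camelCase. A minimal illustration, using a plain `String` constant as a stand-in for an SLF4J `Logger`:)

```java
// Minimal illustration of the naming convention (plain String stands in for
// an SLF4J Logger): static final fields are constants, so UPPER_SNAKE_CASE.
public class NamingSketch {
    private static final String LOG_PREFIX = "herder"; // constant: upper case
    private final String workerId = "worker-1";        // instance field: camelCase

    String banner() {
        return LOG_PREFIX + "-" + workerId;
    }

    public static void main(String[] args) {
        System.out.println(new NamingSketch().banner()); // prints "herder-worker-1"
    }
}
```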



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
