C0urante commented on code in PR #16621:
URL: https://github.com/apache/kafka/pull/16621#discussion_r1683225700


##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -1375,6 +1380,55 @@ public void testRuntimePropertyReconfiguration() throws 
Exception {
         );
     }
 
+    @Test
+    public void testPluginAliases() throws Exception {
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        // Create a topic; not strictly necessary but prevents log spam when 
we start a source connector later
+        final String topic = "kafka17510";
+        connect.kafka().createTopic(topic, 1);
+
+        Map<String, String> baseConnectorConfig = new HashMap<>();
+        // General connector properties
+        baseConnectorConfig.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS));
+        // Aliased converter classes
+        baseConnectorConfig.put(KEY_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getSimpleName());
+        baseConnectorConfig.put(VALUE_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getSimpleName());
+        baseConnectorConfig.put(HEADER_CONVERTER_CLASS_CONFIG, 
StringConverter.class.getSimpleName());
+        // Aliased SMT and predicate classes
+        baseConnectorConfig.put(TRANSFORMS_CONFIG, "filter");
+        baseConnectorConfig.put(TRANSFORMS_CONFIG + ".filter.type", 
Filter.class.getSimpleName());
+        baseConnectorConfig.put(TRANSFORMS_CONFIG + ".filter.predicate", 
"tombstone");
+        baseConnectorConfig.put(PREDICATES_CONFIG, "tombstone");
+        baseConnectorConfig.put(PREDICATES_CONFIG + ".tombstone.type", 
RecordIsTombstone.class.getSimpleName());
+
+        // Test a source connector
+        final String sourceConnectorName = "plugins-alias-test-source";
+        Map<String, String> sourceConnectorConfig = new 
HashMap<>(baseConnectorConfig);
+        // Aliased source connector class
+        sourceConnectorConfig.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSourceConnector.class.getSimpleName());
+        // Connector-specific properties
+        sourceConnectorConfig.put(TOPIC_CONFIG, topic);
+        sourceConnectorConfig.put("throughput", "10");
+        sourceConnectorConfig.put("messages.per.poll", 
String.valueOf(MESSAGES_PER_POLL));
+        // Create the connector and ensure it and its tasks can start
+        connect.configureConnector(sourceConnectorName, sourceConnectorConfig);
+        
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(sourceConnectorName,
 NUM_TASKS, "Connector and tasks did not start in time");
+
+        // Test a sink connector
+        final String sinkConnectorName = "plugins-alias-test-sink";
+        Map<String, String> sinkConnectorConfig = new 
HashMap<>(baseConnectorConfig);
+        // Aliased sink connector class
+        sinkConnectorConfig.put(CONNECTOR_CLASS_CONFIG, 
MonitorableSinkConnector.class.getSimpleName());
+        // Connector-specific properties
+        sinkConnectorConfig.put(TOPICS_CONFIG, topic);
+        // Create the connector and ensure it and its tasks can start
+        connect.configureConnector(sinkConnectorName, sinkConnectorConfig);
+        
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(sourceConnectorName,
 NUM_TASKS, "Connector and tasks did not start in time");

Review Comment:
   🤦 Thanks for the catch! Fixed in the latest commit. I also deleted each 
connector after they were no longer necessary, just to prevent typos like this 
from slipping in in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to