C0urante commented on code in PR #16621:
URL: https://github.com/apache/kafka/pull/16621#discussion_r1683225700
##########
connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java:
##########
@@ -1375,6 +1380,55 @@ public void testRuntimePropertyReconfiguration() throws
Exception {
);
}
+ @Test
+ public void testPluginAliases() throws Exception {
+ connect = connectBuilder.build();
+ // start the clusters
+ connect.start();
+
+ // Create a topic; not strictly necessary but prevents log spam when
we start a source connector later
+ final String topic = "kafka17510";
+ connect.kafka().createTopic(topic, 1);
+
+ Map<String, String> baseConnectorConfig = new HashMap<>();
+ // General connector properties
+ baseConnectorConfig.put(TASKS_MAX_CONFIG, Integer.toString(NUM_TASKS));
+ // Aliased converter classes
+ baseConnectorConfig.put(KEY_CONVERTER_CLASS_CONFIG,
StringConverter.class.getSimpleName());
+ baseConnectorConfig.put(VALUE_CONVERTER_CLASS_CONFIG,
StringConverter.class.getSimpleName());
+ baseConnectorConfig.put(HEADER_CONVERTER_CLASS_CONFIG,
StringConverter.class.getSimpleName());
+ // Aliased SMT and predicate classes
+ baseConnectorConfig.put(TRANSFORMS_CONFIG, "filter");
+ baseConnectorConfig.put(TRANSFORMS_CONFIG + ".filter.type",
Filter.class.getSimpleName());
+ baseConnectorConfig.put(TRANSFORMS_CONFIG + ".filter.predicate",
"tombstone");
+ baseConnectorConfig.put(PREDICATES_CONFIG, "tombstone");
+ baseConnectorConfig.put(PREDICATES_CONFIG + ".tombstone.type",
RecordIsTombstone.class.getSimpleName());
+
+ // Test a source connector
+ final String sourceConnectorName = "plugins-alias-test-source";
+ Map<String, String> sourceConnectorConfig = new
HashMap<>(baseConnectorConfig);
+ // Aliased source connector class
+ sourceConnectorConfig.put(CONNECTOR_CLASS_CONFIG,
MonitorableSourceConnector.class.getSimpleName());
+ // Connector-specific properties
+ sourceConnectorConfig.put(TOPIC_CONFIG, topic);
+ sourceConnectorConfig.put("throughput", "10");
+ sourceConnectorConfig.put("messages.per.poll",
String.valueOf(MESSAGES_PER_POLL));
+ // Create the connector and ensure it and its tasks can start
+ connect.configureConnector(sourceConnectorName, sourceConnectorConfig);
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(sourceConnectorName,
NUM_TASKS, "Connector and tasks did not start in time");
+
+ // Test a sink connector
+ final String sinkConnectorName = "plugins-alias-test-sink";
+ Map<String, String> sinkConnectorConfig = new
HashMap<>(baseConnectorConfig);
+ // Aliased sink connector class
+ sinkConnectorConfig.put(CONNECTOR_CLASS_CONFIG,
MonitorableSinkConnector.class.getSimpleName());
+ // Connector-specific properties
+ sinkConnectorConfig.put(TOPICS_CONFIG, topic);
+ // Create the connector and ensure it and its tasks can start
+ connect.configureConnector(sinkConnectorName, sinkConnectorConfig);
+
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(sourceConnectorName,
NUM_TASKS, "Connector and tasks did not start in time");
Review Comment:
🤦 Thanks for the catch! Fixed in the latest commit. I also deleted each
connector after they were no longer necessary, just to prevent typos like this
from slipping in in the future.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]