This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push: new 32ea859 MINOR: Small Connect integration test fixes (#8100) 32ea859 is described below commit 32ea859f04f3714c31af1d8c653c8e06c454d6da Author: Konstantine Karantasis <konstant...@confluent.io> AuthorDate: Wed Feb 12 15:40:37 2020 -0800 MINOR: Small Connect integration test fixes (#8100) Author: Konstantine Karantasis <konstant...@confluent.io> Reviewer: Randall Hauch <rha...@gmail.com> --- .../mirror/MirrorConnectorsIntegrationTest.java | 39 +++++++++++++++------- .../integration/ConnectWorkerIntegrationTest.java | 3 +- .../ConnectorCientPolicyIntegrationTest.java | 15 +++++---- .../integration/ErrorHandlingIntegrationTest.java | 11 ++++-- .../integration/ExampleConnectIntegrationTest.java | 3 +- .../RebalanceSourceConnectorsIntegrationTest.java | 3 +- .../integration/RestExtensionIntegrationTest.java | 11 ++++-- .../SessionedProtocolIntegrationTest.java | 3 +- .../util/clusters/EmbeddedConnectCluster.java | 10 +++--- 9 files changed, 63 insertions(+), 35 deletions(-) diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java index 11abc14..8ae8df3 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java @@ -16,14 +16,13 @@ */ package org.apache.kafka.connect.mirror; -import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; -import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; -import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.TopicPartition; - +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import org.apache.kafka.test.IntegrationTest; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -31,17 +30,18 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; -import java.util.Collections; import java.util.Properties; -import java.util.concurrent.TimeoutException; -import java.time.Duration; +import java.util.Set; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.apache.kafka.test.TestUtils.waitForCondition; /** * Tests MM2 replication and failover/failback logic. @@ -66,7 +66,7 @@ public class MirrorConnectorsIntegrationTest { private EmbeddedConnectCluster backup; @Before - public void setup() throws IOException { + public void setup() throws InterruptedException { Properties brokerProps = new Properties(); brokerProps.put("auto.create.topics.enable", "false"); @@ -111,7 +111,11 @@ public class MirrorConnectorsIntegrationTest { .build(); primary.start(); + primary.assertions().assertAtLeastNumWorkersAreUp(3, + "Workers of primary-connect-cluster did not start in time."); backup.start(); + primary.assertions().assertAtLeastNumWorkersAreUp(3, + "Workers of backup-connect-cluster did not start in time."); // create these topics before starting the connectors so we don't need to wait for discovery primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS); @@ -167,10 +171,21 @@ public class MirrorConnectorsIntegrationTest { primary.configureConnector("MirrorHeartbeatConnector", mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"), MirrorHeartbeatConnector.class)); + + waitUntilMirrorMakerIsRunning(primary, connectorNames); + } + + + private void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster, + Set<String> connNames) throws InterruptedException { + for (String connector : connNames) { + connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connector, 1, + "Connector " + connector + " tasks did not start in time on cluster: " + connectCluster); + } } @After - public void close() throws IOException { + public void close() { for (String x : primary.connectors()) { primary.deleteConnector(x); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 6cc43a4..b8e0497 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -66,7 +65,7 @@ public class ConnectWorkerIntegrationTest { Properties brokerProps = new Properties(); @Before - public void setup() throws IOException { + public void setup() { // setup Connect worker properties workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS)); workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java index 499916b..00f541b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java @@ -29,7 +29,6 @@ import org.junit.After; import org.junit.Test; import org.junit.experimental.categories.Category; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -50,7 +49,6 @@ public class ConnectorCientPolicyIntegrationTest { private static final int NUM_WORKERS = 1; private static final String CONNECTOR_NAME = "simple-conn"; - @After public void close() { } @@ -73,7 +71,7 @@ public class ConnectorCientPolicyIntegrationTest { @Test public void testCreateWithAllowedOverridesForPrincipalPolicy() throws Exception { Map<String, String> props = basicConnectorConfig(); - props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAIN"); + props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT"); assertPassCreateConnector("Principal", props); } @@ -85,7 +83,7 @@ public class ConnectorCientPolicyIntegrationTest { assertPassCreateConnector("All", props); } - private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws IOException { + private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws InterruptedException { // setup Connect worker properties Map<String, String> workerProps = new HashMap<>(); workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000)); @@ -106,10 +104,13 @@ public class ConnectorCientPolicyIntegrationTest { // start the clusters connect.start(); + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); + return connect; } - private void assertFailCreateConnector(String policy, Map<String, String> props) throws IOException { + private void assertFailCreateConnector(String policy, Map<String, String> props) throws InterruptedException { EmbeddedConnectCluster connect = connectClusterWithPolicy(policy); try { connect.configureConnector(CONNECTOR_NAME, props); @@ -121,10 +122,12 @@ public class ConnectorCientPolicyIntegrationTest { } } - private void assertPassCreateConnector(String policy, Map<String, String> props) throws IOException { + private void assertPassCreateConnector(String policy, Map<String, String> props) throws InterruptedException { EmbeddedConnectCluster connect = connectClusterWithPolicy(policy); try { connect.configureConnector(CONNECTOR_NAME, props); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); } catch (ConnectRestException e) { fail("Should be able to create connector"); } finally { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java index 33e6cf5..8963b8c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java @@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -69,6 +68,7 @@ public class ErrorHandlingIntegrationTest { private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class); + private static final int NUM_WORKERS = 1; private static final String DLQ_TOPIC = "my-connector-errors"; private static final String CONNECTOR_NAME = "error-conn"; private static final String TASK_ID = "error-conn-0"; @@ -83,12 +83,14 @@ public class ErrorHandlingIntegrationTest { private ConnectorHandle connectorHandle; @Before - public void setup() throws IOException { + public void setup() throws InterruptedException { // setup Connect cluster with defaults connect = new EmbeddedConnectCluster.Builder().build(); // start Connect cluster connect.start(); + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); // get connector handles before starting test. connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME); @@ -134,6 +136,8 @@ public class ErrorHandlingIntegrationTest { connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS); connect.configureConnector(CONNECTOR_NAME, props); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS, + "Connector tasks did not start in time."); waitForCondition(this::checkForPartitionAssignment, CONNECTOR_SETUP_DURATION_MS, @@ -172,6 +176,9 @@ public class ErrorHandlingIntegrationTest { } connect.deleteConnector(CONNECTOR_NAME); + connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME, + "Connector tasks did not stop in time."); + } /** diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java index 4da89d7..f4cba87 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java @@ -27,7 +27,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -66,7 +65,7 @@ public class ExampleConnectIntegrationTest { private ConnectorHandle connectorHandle; @Before - public void setup() throws IOException { + public void setup() { // setup Connect worker properties Map<String, String> exampleWorkerProps = new HashMap<>(); exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000)); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java index 073b307..19e7863 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java @@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -70,7 +69,7 @@ public class RebalanceSourceConnectorsIntegrationTest { private EmbeddedConnectCluster connect; @Before - public void setup() throws IOException { + public void setup() { // setup Connect worker properties Map<String, String> workerProps = new HashMap<>(); workerProps.put(CONNECT_PROTOCOL_CONFIG, COMPATIBLE.toString()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java index d5328a6..6ec86bd 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.core.Response; -import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -57,11 +56,12 @@ public class RestExtensionIntegrationTest { private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1); + private static final int NUM_WORKERS = 1; private EmbeddedConnectCluster connect; @Test - public void testRestExtensionApi() throws IOException, InterruptedException { + public void testRestExtensionApi() throws InterruptedException { // setup Connect worker properties Map<String, String> workerProps = new HashMap<>(); workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName()); @@ -69,7 +69,7 @@ public class RestExtensionIntegrationTest { // build a Connect cluster backed by Kafka and Zk connect = new EmbeddedConnectCluster.Builder() .name("connect-cluster") - .numWorkers(1) + .numWorkers(NUM_WORKERS) .numBrokers(1) .workerProps(workerProps) .build(); @@ -77,6 +77,9 @@ public class RestExtensionIntegrationTest { // start the clusters connect.start(); + connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS, + "Initial group of workers did not start in time."); + WorkerHandle worker = connect.workers().stream() .findFirst() .orElseThrow(() -> new AssertionError("At least one worker handle should be available")); @@ -99,6 +102,8 @@ public class RestExtensionIntegrationTest { connectorHandle.taskHandle(connectorHandle.name() + "-0"); StartAndStopLatch connectorStartLatch = connectorHandle.expectedStarts(1); connect.configureConnector(connectorHandle.name(), connectorProps); + connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorHandle.name(), 1, + "Connector tasks did not start in time."); connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS); String workerId = String.format("%s:%d", worker.url().getHost(), worker.url().getPort()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java index 1f13c17..8956a86 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java @@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -61,7 +60,7 @@ public class SessionedProtocolIntegrationTest { private ConnectorHandle connectorHandle; @Before - public void setup() throws IOException { + public void setup() { // setup Connect worker properties Map<String, String> workerProps = new HashMap<>(); workerProps.put(CONNECT_PROTOCOL_CONFIG, ConnectProtocolCompatibility.SESSIONED.protocol()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java index 2276340..961b1d8 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java @@ -284,7 +284,8 @@ public class EmbeddedConnectCluster { if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) { return responseToString(response); } - throw new ConnectRestException(response.getStatus(), "Could not execute PUT request"); + throw new ConnectRestException(response.getStatus(), + "Could not execute PUT request. Error response: " + responseToString(response)); } /** @@ -298,7 +299,8 @@ public class EmbeddedConnectCluster { String url = endpointForResource(String.format("connectors/%s", connName)); Response response = requestDelete(url); if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) { - throw new ConnectRestException(response.getStatus(), "Could not execute DELETE request."); + throw new ConnectRestException(response.getStatus(), + "Could not execute DELETE request. Error response: " + responseToString(response)); } } @@ -358,7 +360,7 @@ public class EmbeddedConnectCluster { * * @param resource the resource under the worker's admin endpoint * @return the admin endpoint URL - * @throws ConnectRestException if no admin REST endpoint is available + * @throws ConnectException if no admin REST endpoint is available */ public String adminEndpoint(String resource) { String url = connectCluster.stream() @@ -375,7 +377,7 @@ public class EmbeddedConnectCluster { * * @param resource the resource under the worker's admin endpoint * @return the admin endpoint URL - * @throws ConnectRestException if no REST endpoint is available + * @throws ConnectException if no REST endpoint is available */ public String endpointForResource(String resource) { String url = connectCluster.stream()