This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 32ea859  MINOR: Small Connect integration test fixes (#8100)
32ea859 is described below

commit 32ea859f04f3714c31af1d8c653c8e06c454d6da
Author: Konstantine Karantasis <konstant...@confluent.io>
AuthorDate: Wed Feb 12 15:40:37 2020 -0800

    MINOR: Small Connect integration test fixes (#8100)
    
    Author: Konstantine Karantasis <konstant...@confluent.io>
    Reviewer: Randall Hauch <rha...@gmail.com>
---
 .../mirror/MirrorConnectorsIntegrationTest.java    | 39 +++++++++++++++-------
 .../integration/ConnectWorkerIntegrationTest.java  |  3 +-
 .../ConnectorCientPolicyIntegrationTest.java       | 15 +++++----
 .../integration/ErrorHandlingIntegrationTest.java  | 11 ++++--
 .../integration/ExampleConnectIntegrationTest.java |  3 +-
 .../RebalanceSourceConnectorsIntegrationTest.java  |  3 +-
 .../integration/RestExtensionIntegrationTest.java  | 11 ++++--
 .../SessionedProtocolIntegrationTest.java          |  3 +-
 .../util/clusters/EmbeddedConnectCluster.java      | 10 +++---
 9 files changed, 63 insertions(+), 35 deletions(-)

diff --git 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
index 11abc14..8ae8df3 100644
--- 
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
+++ 
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
@@ -16,14 +16,13 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
-import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
-import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.TopicPartition;
-
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -31,17 +30,18 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
-import java.util.Collections;
 import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-import java.time.Duration;
+import java.util.Set;
 
+import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.apache.kafka.test.TestUtils.waitForCondition;
 
 /**
  * Tests MM2 replication and failover/failback logic.
@@ -66,7 +66,7 @@ public class MirrorConnectorsIntegrationTest {
     private EmbeddedConnectCluster backup;
 
     @Before
-    public void setup() throws IOException {
+    public void setup() throws InterruptedException {
         Properties brokerProps = new Properties();
         brokerProps.put("auto.create.topics.enable", "false");
 
@@ -111,7 +111,11 @@ public class MirrorConnectorsIntegrationTest {
                 .build();
 
         primary.start();
+        primary.assertions().assertAtLeastNumWorkersAreUp(3,
+                "Workers of primary-connect-cluster did not start in time.");
         backup.start();
+        backup.assertions().assertAtLeastNumWorkersAreUp(3,
+                "Workers of backup-connect-cluster did not start in time.");
 
         // create these topics before starting the connectors so we don't need 
to wait for discovery
         primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
@@ -167,10 +171,21 @@ public class MirrorConnectorsIntegrationTest {
 
         primary.configureConnector("MirrorHeartbeatConnector", 
mm2Config.connectorBaseConfig(new SourceAndTarget("backup", "primary"),
             MirrorHeartbeatConnector.class));
+
+        waitUntilMirrorMakerIsRunning(primary, connectorNames);
+    }
+
+
+    private void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster 
connectCluster,
+        Set<String> connNames) throws InterruptedException {
+        for (String connector : connNames) {
+            
connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connector,
 1,
+                    "Connector " + connector + " tasks did not start in time 
on cluster: " + connectCluster);
+        }
     }
 
     @After
-    public void close() throws IOException {
+    public void close() {
         for (String x : primary.connectors()) {
             primary.deleteConnector(x);
         }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 6cc43a4..b8e0497 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -66,7 +65,7 @@ public class ConnectWorkerIntegrationTest {
     Properties brokerProps = new Properties();
 
     @Before
-    public void setup() throws IOException {
+    public void setup() {
         // setup Connect worker properties
         workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
         workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java
index 499916b..00f541b 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java
@@ -29,7 +29,6 @@ import org.junit.After;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -50,7 +49,6 @@ public class ConnectorCientPolicyIntegrationTest {
     private static final int NUM_WORKERS = 1;
     private static final String CONNECTOR_NAME = "simple-conn";
 
-
     @After
     public void close() {
     }
@@ -73,7 +71,7 @@ public class ConnectorCientPolicyIntegrationTest {
     @Test
     public void testCreateWithAllowedOverridesForPrincipalPolicy() throws 
Exception {
         Map<String, String> props = basicConnectorConfig();
-        props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAIN");
+        props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + 
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
         assertPassCreateConnector("Principal", props);
     }
 
@@ -85,7 +83,7 @@ public class ConnectorCientPolicyIntegrationTest {
         assertPassCreateConnector("All", props);
     }
 
-    private EmbeddedConnectCluster connectClusterWithPolicy(String policy) 
throws IOException {
+    private EmbeddedConnectCluster connectClusterWithPolicy(String policy) 
throws InterruptedException {
         // setup Connect worker properties
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(5_000));
@@ -106,10 +104,13 @@ public class ConnectorCientPolicyIntegrationTest {
 
         // start the clusters
         connect.start();
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
         return connect;
     }
 
-    private void assertFailCreateConnector(String policy, Map<String, String> 
props) throws IOException {
+    private void assertFailCreateConnector(String policy, Map<String, String> 
props) throws InterruptedException {
         EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
         try {
             connect.configureConnector(CONNECTOR_NAME, props);
@@ -121,10 +122,12 @@ public class ConnectorCientPolicyIntegrationTest {
         }
     }
 
-    private void assertPassCreateConnector(String policy, Map<String, String> 
props) throws IOException {
+    private void assertPassCreateConnector(String policy, Map<String, String> 
props) throws InterruptedException {
         EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
         try {
             connect.configureConnector(CONNECTOR_NAME, props);
+            
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 NUM_TASKS,
+                    "Connector tasks did not start in time.");
         } catch (ConnectRestException e) {
             fail("Should be able to create connector");
         } finally {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 33e6cf5..8963b8c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -69,6 +68,7 @@ public class ErrorHandlingIntegrationTest {
 
     private static final Logger log = 
LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
 
+    private static final int NUM_WORKERS = 1;
     private static final String DLQ_TOPIC = "my-connector-errors";
     private static final String CONNECTOR_NAME = "error-conn";
     private static final String TASK_ID = "error-conn-0";
@@ -83,12 +83,14 @@ public class ErrorHandlingIntegrationTest {
     private ConnectorHandle connectorHandle;
 
     @Before
-    public void setup() throws IOException {
+    public void setup() throws InterruptedException {
         // setup Connect cluster with defaults
         connect = new EmbeddedConnectCluster.Builder().build();
 
         // start Connect cluster
         connect.start();
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
 
         // get connector handles before starting test.
         connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
@@ -134,6 +136,8 @@ public class ErrorHandlingIntegrationTest {
         
connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS);
 
         connect.configureConnector(CONNECTOR_NAME, props);
+        
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME,
 NUM_TASKS,
+                "Connector tasks did not start in time.");
 
         waitForCondition(this::checkForPartitionAssignment,
                 CONNECTOR_SETUP_DURATION_MS,
@@ -172,6 +176,9 @@ public class ErrorHandlingIntegrationTest {
         }
 
         connect.deleteConnector(CONNECTOR_NAME);
+        connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
+                "Connector tasks did not stop in time.");
+
     }
 
     /**
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index 4da89d7..f4cba87 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -27,7 +27,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -66,7 +65,7 @@ public class ExampleConnectIntegrationTest {
     private ConnectorHandle connectorHandle;
 
     @Before
-    public void setup() throws IOException {
+    public void setup() {
         // setup Connect worker properties
         Map<String, String> exampleWorkerProps = new HashMap<>();
         exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, 
String.valueOf(5_000));
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 073b307..19e7863 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -70,7 +69,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
     private EmbeddedConnectCluster connect;
 
     @Before
-    public void setup() throws IOException {
+    public void setup() {
         // setup Connect worker properties
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(CONNECT_PROTOCOL_CONFIG, COMPATIBLE.toString());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index d5328a6..6ec86bd 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.core.Response;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -57,11 +56,12 @@ public class RestExtensionIntegrationTest {
 
     private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = 
TimeUnit.MINUTES.toMillis(1);
     private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = 
TimeUnit.MINUTES.toMillis(1);
+    private static final int NUM_WORKERS = 1;
 
     private EmbeddedConnectCluster connect;
 
     @Test
-    public void testRestExtensionApi() throws IOException, 
InterruptedException {
+    public void testRestExtensionApi() throws InterruptedException {
         // setup Connect worker properties
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(REST_EXTENSION_CLASSES_CONFIG, 
IntegrationTestRestExtension.class.getName());
@@ -69,7 +69,7 @@ public class RestExtensionIntegrationTest {
         // build a Connect cluster backed by Kafka and Zk
         connect = new EmbeddedConnectCluster.Builder()
             .name("connect-cluster")
-            .numWorkers(1)
+            .numWorkers(NUM_WORKERS)
             .numBrokers(1)
             .workerProps(workerProps)
             .build();
@@ -77,6 +77,9 @@ public class RestExtensionIntegrationTest {
         // start the clusters
         connect.start();
 
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
+
         WorkerHandle worker = connect.workers().stream()
             .findFirst()
             .orElseThrow(() -> new AssertionError("At least one worker handle 
should be available"));
@@ -99,6 +102,8 @@ public class RestExtensionIntegrationTest {
             connectorHandle.taskHandle(connectorHandle.name() + "-0");
             StartAndStopLatch connectorStartLatch = 
connectorHandle.expectedStarts(1);
             connect.configureConnector(connectorHandle.name(), connectorProps);
+            
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorHandle.name(),
 1,
+                    "Connector tasks did not start in time.");
             connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
 
             String workerId = String.format("%s:%d", worker.url().getHost(), 
worker.url().getPort());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
index 1f13c17..8956a86 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
@@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -61,7 +60,7 @@ public class SessionedProtocolIntegrationTest {
     private ConnectorHandle connectorHandle;
 
     @Before
-    public void setup() throws IOException {
+    public void setup() {
         // setup Connect worker properties
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(CONNECT_PROTOCOL_CONFIG, 
ConnectProtocolCompatibility.SESSIONED.protocol());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 2276340..961b1d8 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -284,7 +284,8 @@ public class EmbeddedConnectCluster {
         if (response.getStatus() < 
Response.Status.BAD_REQUEST.getStatusCode()) {
             return responseToString(response);
         }
-        throw new ConnectRestException(response.getStatus(), "Could not 
execute PUT request");
+        throw new ConnectRestException(response.getStatus(),
+                "Could not execute PUT request. Error response: " + 
responseToString(response));
     }
 
     /**
@@ -298,7 +299,8 @@ public class EmbeddedConnectCluster {
         String url = endpointForResource(String.format("connectors/%s", 
connName));
         Response response = requestDelete(url);
         if (response.getStatus() >= 
Response.Status.BAD_REQUEST.getStatusCode()) {
-            throw new ConnectRestException(response.getStatus(), "Could not 
execute DELETE request.");
+            throw new ConnectRestException(response.getStatus(),
+                    "Could not execute DELETE request. Error response: " + 
responseToString(response));
         }
     }
 
@@ -358,7 +360,7 @@ public class EmbeddedConnectCluster {
      *
      * @param resource the resource under the worker's admin endpoint
      * @return the admin endpoint URL
-     * @throws ConnectRestException if no admin REST endpoint is available
+     * @throws ConnectException if no admin REST endpoint is available
      */
     public String adminEndpoint(String resource) {
         String url = connectCluster.stream()
@@ -375,7 +377,7 @@ public class EmbeddedConnectCluster {
      *
      * @param resource the resource under the worker's admin endpoint
      * @return the admin endpoint URL
-     * @throws ConnectRestException if no REST endpoint is available
+     * @throws ConnectException if no REST endpoint is available
      */
     public String endpointForResource(String resource) {
         String url = connectCluster.stream()

Reply via email to