This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 09313b5  KAFKA-8449: Restart tasks on reconfiguration under incremental cooperative rebalancing (#6850)
09313b5 is described below

commit 09313b5fda6ecf629a29d2e8285c6b21c91ddff2
Author: Konstantine Karantasis <konstant...@confluent.io>
AuthorDate: Mon Jun 3 09:13:40 2019 -0700

    KAFKA-8449: Restart tasks on reconfiguration under incremental cooperative rebalancing (#6850)

    Restart tasks on reconfiguration under incremental cooperative rebalancing, and keep execution paths separate for config updates between eager and cooperative. Include the group generation in the log message when the worker receives its assignment.

    Author: Konstantine Karantasis <konstant...@confluent.io>
    Reviewer: Randall Hauch <rha...@gmail.com>
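
    In outline, the change makes the herder's tick() dispatch on the negotiated
    protocol version. A minimal sketch of the resulting control flow, simplified
    from the patch below with shortened variable names; not the exact
    implementation:

        if (member.currentProtocolVersion() == CONNECT_PROTOCOL_V0) {
            // Eager: a task reconfig always forces a full rebalance, so
            // return as soon as a rejoin has been requested.
            if (updateConfigsWithEager(connectorUpdates, targetStateChanges))
                return;
            // ... then apply connector config and target state updates ...
        } else {
            // Cooperative: snapshot pending updates, restart only the tasks
            // of reconfigured connectors, and rejoin afterwards if needed.
            boolean shouldReturn = updateConfigsWithIncrementalCooperative(
                    connectorUpdates, targetStateChanges, taskUpdates);
            // ... apply updates, including targeted task restarts ...
            if (shouldReturn)
                return;
        }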
---
 .../runtime/distributed/DistributedHerder.java     | 203 ++++++++++++++++-----
 .../integration/MonitorableSourceConnector.java    |   5 +-
 .../RebalanceSourceConnectorsIntegrationTest.java  |  63 ++++++-
 3 files changed, 220 insertions(+), 51 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 585836e..52709f7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -80,6 +80,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
 
@@ -141,6 +143,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     // and those from other nodes are safe to process
     private boolean rebalanceResolved;
     private ExtendedAssignment runningAssignment = ExtendedAssignment.empty();
+    private Set<ConnectorTaskId> tasksToRestart = new HashSet<>();
     private ExtendedAssignment assignment;
     private boolean canReadConfigs;
     private ClusterConfigState configState;
@@ -151,6 +154,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     // Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
     // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
     private Set<String> connectorConfigUpdates = new HashSet<>();
+    private Set<ConnectorTaskId> taskConfigUpdates = new HashSet<>();
     // Similarly collect target state changes (when observed by the config storage listener) for handling in the
     // herder's main thread.
     private Set<String> connectorTargetStateChanges = new HashSet<>();
@@ -304,51 +308,47 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
 
         // Process any configuration updates
-        Set<String> connectorConfigUpdatesCopy = null;
-        Set<String> connectorTargetStateChangesCopy = null;
-        synchronized (this) {
-            if (needsReconfigRebalance || !connectorConfigUpdates.isEmpty() || !connectorTargetStateChanges.isEmpty()) {
-                // Connector reconfigs only need local updates since there is no coordination between workers required.
-                // However, if connectors were added or removed, work needs to be rebalanced since we have more work
-                // items to distribute among workers.
-                configState = configBackingStore.snapshot();
-
-                if (needsReconfigRebalance) {
-                    // Task reconfigs require a rebalance. Request the rebalance, clean out state, and then restart
-                    // this loop, which will then ensure the rebalance occurs without any other requests being
-                    // processed until it completes.
-                    member.requestRejoin();
-                    // Any connector config updates or target state changes will be addressed during the rebalance too
-                    connectorConfigUpdates.clear();
-                    connectorTargetStateChanges.clear();
-                    needsReconfigRebalance = false;
-                    log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance: {})",
-                            needsReconfigRebalance);
-                    return;
-                } else {
-                    if (!connectorConfigUpdates.isEmpty()) {
-                        // We can't start/stop while locked since starting connectors can cause task updates that will
-                        // require writing configs, which in turn make callbacks into this class from another thread that
-                        // require acquiring a lock. This leads to deadlock. Instead, just copy the info we need and process
-                        // the updates after unlocking.
-                        connectorConfigUpdatesCopy = connectorConfigUpdates;
-                        connectorConfigUpdates = new HashSet<>();
-                    }
+        AtomicReference<Set<String>> connectorConfigUpdatesCopy = new AtomicReference<>();
+        AtomicReference<Set<String>> connectorTargetStateChangesCopy = new AtomicReference<>();
+        AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy = new AtomicReference<>();
+
+        boolean shouldReturn;
+        if (member.currentProtocolVersion() == CONNECT_PROTOCOL_V0) {
+            shouldReturn = updateConfigsWithEager(connectorConfigUpdatesCopy,
+                    connectorTargetStateChangesCopy);
+            // With eager protocol we should return immediately if needsReconfigRebalance has
+            // been set to retain the old workflow
+            if (shouldReturn) {
+                return;
+            }
 
-                    if (!connectorTargetStateChanges.isEmpty()) {
-                        // Similarly for target state changes which can cause connectors to be restarted
-                        connectorTargetStateChangesCopy = connectorTargetStateChanges;
-                        connectorTargetStateChanges = new HashSet<>();
-                    }
-                }
+            if (connectorConfigUpdatesCopy.get() != null) {
+                processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
             }
-        }
 
-        if (connectorConfigUpdatesCopy != null)
-            processConnectorConfigUpdates(connectorConfigUpdatesCopy);
+            if (connectorTargetStateChangesCopy.get() != null) {
+                processTargetStateChanges(connectorTargetStateChangesCopy.get());
+            }
+        } else {
+            shouldReturn = updateConfigsWithIncrementalCooperative(connectorConfigUpdatesCopy,
+                    connectorTargetStateChangesCopy, taskConfigUpdatesCopy);
+
+            if (connectorConfigUpdatesCopy.get() != null) {
+                processConnectorConfigUpdates(connectorConfigUpdatesCopy.get());
+            }
+
+            if (connectorTargetStateChangesCopy.get() != null) {
+                processTargetStateChanges(connectorTargetStateChangesCopy.get());
+            }
 
-        if (connectorTargetStateChangesCopy != null)
-            processTargetStateChanges(connectorTargetStateChangesCopy);
+            if (taskConfigUpdatesCopy.get() != null) {
+                processTaskConfigUpdatesWithIncrementalCooperative(taskConfigUpdatesCopy.get());
+            }
+
+            if (shouldReturn) {
+                return;
+            }
+        }
 
         // Let the group take any actions it needs to
         try {
@@ -360,6 +360,95 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
+    private synchronized boolean updateConfigsWithEager(AtomicReference<Set<String>> connectorConfigUpdatesCopy,
+                                                        AtomicReference<Set<String>> connectorTargetStateChangesCopy) {
+        // This branch is here to avoid creating a snapshot if not needed
+        if (needsReconfigRebalance
+                || !connectorConfigUpdates.isEmpty()
+                || !connectorTargetStateChanges.isEmpty()) {
+            // Connector reconfigs only need local updates since there is no coordination between workers required.
+            // However, if connectors were added or removed, work needs to be rebalanced since we have more work
+            // items to distribute among workers.
+            configState = configBackingStore.snapshot();
+
+            if (needsReconfigRebalance) {
+                // Task reconfigs require a rebalance. Request the rebalance, clean out state, and then restart
+                // this loop, which will then ensure the rebalance occurs without any other requests being
+                // processed until it completes.
+                log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance: {})",
+                        needsReconfigRebalance);
+                member.requestRejoin();
+                needsReconfigRebalance = false;
+                // Any connector config updates or target state changes will be addressed during the rebalance too
+                connectorConfigUpdates.clear();
+                connectorTargetStateChanges.clear();
+                return true;
+            } else {
+                if (!connectorConfigUpdates.isEmpty()) {
+                    // We can't start/stop while locked since starting connectors can cause task updates that will
+                    // require writing configs, which in turn make callbacks into this class from another thread that
+                    // require acquiring a lock. This leads to deadlock. Instead, just copy the info we need and process
+                    // the updates after unlocking.
+                    connectorConfigUpdatesCopy.set(connectorConfigUpdates);
+                    connectorConfigUpdates = new HashSet<>();
+                }
+
+                if (!connectorTargetStateChanges.isEmpty()) {
+                    // Similarly for target state changes which can cause connectors to be restarted
+                    connectorTargetStateChangesCopy.set(connectorTargetStateChanges);
+                    connectorTargetStateChanges = new HashSet<>();
+                }
+            }
+        }
+        return false;
+    }
+
+    private synchronized boolean updateConfigsWithIncrementalCooperative(AtomicReference<Set<String>> connectorConfigUpdatesCopy,
+                                                                         AtomicReference<Set<String>> connectorTargetStateChangesCopy,
+                                                                         AtomicReference<Set<ConnectorTaskId>> taskConfigUpdatesCopy) {
+        boolean retValue = false;
+        // This branch is here to avoid creating a snapshot if not needed
+        if (needsReconfigRebalance
+                || !connectorConfigUpdates.isEmpty()
+                || !connectorTargetStateChanges.isEmpty()
+                || !taskConfigUpdates.isEmpty()) {
+            // Connector reconfigs only need local updates since there is no coordination between workers required.
+            // However, if connectors were added or removed, work needs to be rebalanced since we have more work
+            // items to distribute among workers.
+            configState = configBackingStore.snapshot();
+
+            if (needsReconfigRebalance) {
+                log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance: {})",
+                        needsReconfigRebalance);
+                member.requestRejoin();
+                needsReconfigRebalance = false;
+                retValue = true;
+            }
+
+            if (!connectorConfigUpdates.isEmpty()) {
+                // We can't start/stop while locked since starting connectors can cause task updates that will
+                // require writing configs, which in turn make callbacks into this class from another thread that
+                // require acquiring a lock. This leads to deadlock. Instead, just copy the info we need and process
+                // the updates after unlocking.
+                connectorConfigUpdatesCopy.set(connectorConfigUpdates);
+                connectorConfigUpdates = new HashSet<>();
+            }
+
+            if (!connectorTargetStateChanges.isEmpty()) {
+                // Similarly for target state changes which can cause connectors to be restarted
+                connectorTargetStateChangesCopy.set(connectorTargetStateChanges);
+                connectorTargetStateChanges = new HashSet<>();
+            }
+
+            if (!taskConfigUpdates.isEmpty()) {
+                // Similarly for task config updates
+                taskConfigUpdatesCopy.set(taskConfigUpdates);
+                taskConfigUpdates = new HashSet<>();
+            }
+        }
+        return retValue;
+    }
+
     private void processConnectorConfigUpdates(Set<String> connectorConfigUpdates) {
         // If we only have connector config updates, we can just bounce the updated connectors that are
         // currently assigned to this worker.
@@ -396,6 +485,21 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
+    private void processTaskConfigUpdatesWithIncrementalCooperative(Set<ConnectorTaskId> taskConfigUpdates) {
+        Set<ConnectorTaskId> localTasks = assignment == null
+                                          ? Collections.emptySet()
+                                          : new HashSet<>(assignment.tasks());
+        Set<String> connectorsWhoseTasksToStop = taskConfigUpdates.stream()
+                .map(ConnectorTaskId::connector).collect(Collectors.toSet());
+
+        List<ConnectorTaskId> tasksToStop = localTasks.stream()
+                .filter(taskId -> connectorsWhoseTasksToStop.contains(taskId.connector()))
+                .collect(Collectors.toList());
+        log.info("Handling task config update by restarting tasks {}", tasksToStop);
+        worker.stopAndAwaitTasks(tasksToStop);
+        tasksToRestart.addAll(tasksToStop);
+    }
+
     // public for testing
     public void halt() {
         synchronized (this) {
@@ -900,6 +1004,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             callables.add(getConnectorStartingCallable(connectorName));
         }
 
+        // These tasks have been stopped by this worker due to task reconfiguration. In order to
+        // restart them, they are removed just before the overall task startup from the set of
+        // currently running tasks. Therefore, they'll be restarted only if they are included in
+        // the assignment that was just received after rebalancing.
+        runningAssignment.tasks().removeAll(tasksToRestart);
+        tasksToRestart.clear();
         for (ConnectorTaskId taskId : assignmentDifference(assignment.tasks(), runningAssignment.tasks())) {
             callables.add(getTaskStartingCallable(taskId));
         }
@@ -1172,12 +1282,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         public void onTaskConfigUpdate(Collection<ConnectorTaskId> tasks) {
             log.info("Tasks {} configs updated", tasks);
 
-            // Stage the update and wake up the work thread. No need to record the set of tasks here because task reconfigs
-            // always need a rebalance to ensure offsets get committed.
+            // Stage the update and wake up the work thread.
+            // The set of tasks is recorded for incremental cooperative rebalancing, in which
+            // tasks don't get restarted unless they are rebalanced between workers.
+            // With eager rebalancing there's no need to record the set of tasks because task reconfigs
+            // always need a rebalance to ensure offsets get committed. In eager rebalancing the
+            // recorded set of tasks remains unused.
             // TODO: As an optimization, some task config updates could avoid a rebalance. In particular, single-task
             // connectors clearly don't need any coordination.
             synchronized (DistributedHerder.this) {
                 needsReconfigRebalance = true;
+                taskConfigUpdates.addAll(tasks);
             }
             member.wakeup();
         }
@@ -1279,7 +1394,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             // catch up (or backoff if we fail) not executed in a callback, and so we'll be able to invoke other
             // group membership actions (e.g., we may need to explicitly leave the group if we cannot handle the
             // assigned tasks).
-            log.info("Joined group and got assignment: {}", assignment);
+            log.info("Joined group at generation {} and got assignment: {}", 
generation, assignment);
             synchronized (DistributedHerder.this) {
                 DistributedHerder.this.assignment = assignment;
                 DistributedHerder.this.generation = generation;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 8bc8953..2ca7698 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -44,6 +44,7 @@ import java.util.stream.LongStream;
 public class MonitorableSourceConnector extends TestSourceConnector {
     private static final Logger log = LoggerFactory.getLogger(MonitorableSourceConnector.class);
 
+    public static final String TOPIC_CONFIG = "topic";
     private String connectorName;
     private ConnectorHandle connectorHandle;
     private Map<String, String> commonConfigs;
@@ -105,7 +106,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
         public void start(Map<String, String> props) {
             taskId = props.get("task.id");
             connectorName = props.get("connector.name");
-            topicName = props.getOrDefault("topic", "sequential-topic");
+            topicName = props.getOrDefault(TOPIC_CONFIG, "sequential-topic");
             throughput = Long.valueOf(props.getOrDefault("throughput", "-1"));
             batchSize = Integer.valueOf(props.getOrDefault("messages.per.poll", "1"));
             taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
@@ -113,7 +114,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
                     context.offsetStorageReader().offset(Collections.singletonMap("task.id", taskId)))
                     .orElse(Collections.emptyMap());
             startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
-            log.info("Started {} task {}", this.getClass().getSimpleName(), 
taskId);
+            log.info("Started {} task {} with properties {}", 
this.getClass().getSimpleName(), taskId, props);
             throttler = new ThroughputThrottler(throughput, 
System.currentTimeMillis());
         }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index b0125b2..d3cc8db 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -39,17 +39,18 @@ import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
-import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONNECT_PROTOCOL_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
 * Integration tests for incremental cooperative rebalancing between Connect workers
@@ -109,7 +110,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
@@ -130,6 +131,58 @@ public class RebalanceSourceConnectorsIntegrationTest {
     }
 
     @Test
+    public void testReconfigConnector() throws Exception {
+        ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+
+        // create test topics
+        String anotherTopic = "another-topic";
+        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
+        connect.kafka().createTopic(anotherTopic, NUM_TOPIC_PARTITIONS);
+
+        // set up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put("throughput", String.valueOf(1));
+        props.put("messages.per.poll", String.valueOf(10));
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+
+        // start a source connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        int numRecordsProduced = 100;
+        int recordTransferDurationMs = 5000;
+
+        // consume all records from the source topic or fail, to ensure that they were correctly produced
+        int recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs, TOPIC_NAME).count();
+        assertTrue("Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " but got " + recordNum,
+                recordNum >= numRecordsProduced);
+
+        // Reconfigure the source connector by changing the Kafka topic used as output
+        props.put(TOPIC_CONFIG, anotherTopic);
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        // expect all records to be produced by the connector
+        connectorHandle.expectedRecords(numRecordsProduced);
+
+        // expect all records to be committed by the connector
+        connectorHandle.expectedCommits(numRecordsProduced);
+
+        // consume all records from the new source topic or fail, to ensure that they were correctly produced
+        recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs, anotherTopic).count();
+        assertTrue("Not enough records produced by source connector. Expected at least: " + numRecordsProduced + " but got " + recordNum,
+                recordNum >= numRecordsProduced);
+    }
+
+    @Test
     public void testDeleteConnector() throws Exception {
         // create test topic
         connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
@@ -140,7 +193,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
@@ -181,7 +234,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
@@ -224,7 +277,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
         props.put("throughput", String.valueOf(1));
         props.put("messages.per.poll", String.valueOf(10));
-        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(TOPIC_CONFIG, TOPIC_NAME);
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
