Repository: kafka
Updated Branches:
  refs/heads/trunk 2802bd081 -> 969d0cb0a


KAFKA-2826: Make Kafka Connect ducktape services easier to extend.

Author: Ewen Cheslack-Postava <m...@ewencp.org>

Reviewers: Gwen Shapira

Closes #522 from ewencp/kafka-2826-extensible-connect-services


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/969d0cb0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/969d0cb0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/969d0cb0

Branch: refs/heads/trunk
Commit: 969d0cb0ae316ba0dfdb34ed096bfd56fe86ad92
Parents: 2802bd0
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Authored: Thu Nov 12 18:54:20 2015 -0800
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Thu Nov 12 18:54:20 2015 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/connect.py | 23 +++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/969d0cb0/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index 26feb99..a6e902f 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -151,6 +151,13 @@ class ConnectStandaloneService(ConnectServiceBase):
     def node(self):
         return self.nodes[0]
 
+    def start_cmd(self, node, connector_configs):
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
+        cmd += "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
+        cmd += " ".join(connector_configs)
+        cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
+        return cmd
+
     def start_node(self, node):
         node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
 
@@ -164,10 +171,7 @@ class ConnectStandaloneService(ConnectServiceBase):
 
         self.logger.info("Starting Kafka Connect standalone process on " + str(node.account))
         with node.account.monitor_log(self.LOG_FILE) as monitor:
-            node.account.ssh("( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE +
-                             "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE) +
-                             " ".join(remote_connector_configs) +
-                             (" & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)))
+            node.account.ssh(self.start_cmd(node, remote_connector_configs))
             monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
@@ -182,6 +186,12 @@ class ConnectDistributedService(ConnectServiceBase):
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
 
+    def start_cmd(self, node):
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
+        cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
+        cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
+        return cmd
+
     def start_node(self, node):
         node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
 
@@ -192,10 +202,7 @@ class ConnectDistributedService(ConnectServiceBase):
 
         self.logger.info("Starting Kafka Connect distributed process on " + str(node.account))
         with node.account.monitor_log(self.LOG_FILE) as monitor:
-            cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
-            cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE)
-            cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE)
-            node.account.ssh(cmd)
+            node.account.ssh(self.start_cmd(node))
             monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:

Reply via email to