Repository: kafka Updated Branches: refs/heads/trunk 2802bd081 -> 969d0cb0a
KAFKA-2826: Make Kafka Connect ducktape services easier to extend. Author: Ewen Cheslack-Postava <m...@ewencp.org> Reviewers: Gwen Shapira Closes #522 from ewencp/kafka-2826-extensible-connect-services Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/969d0cb0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/969d0cb0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/969d0cb0 Branch: refs/heads/trunk Commit: 969d0cb0ae316ba0dfdb34ed096bfd56fe86ad92 Parents: 2802bd0 Author: Ewen Cheslack-Postava <m...@ewencp.org> Authored: Thu Nov 12 18:54:20 2015 -0800 Committer: Gwen Shapira <csh...@gmail.com> Committed: Thu Nov 12 18:54:20 2015 -0800 ---------------------------------------------------------------------- tests/kafkatest/services/connect.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/969d0cb0/tests/kafkatest/services/connect.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py index 26feb99..a6e902f 100644 --- a/tests/kafkatest/services/connect.py +++ b/tests/kafkatest/services/connect.py @@ -151,6 +151,13 @@ class ConnectStandaloneService(ConnectServiceBase): def node(self): return self.nodes[0] + def start_cmd(self, node, connector_configs): + cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE + cmd += "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE) + cmd += " ".join(connector_configs) + cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE) + return cmd + def start_node(self, node): node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) @@ -164,10 +171,7 @@ class ConnectStandaloneService(ConnectServiceBase): self.logger.info("Starting Kafka Connect standalone process on " + str(node.account)) with node.account.monitor_log(self.LOG_FILE) as monitor: - node.account.ssh("( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE + - "/opt/%s/bin/connect-standalone.sh %s " % (kafka_dir(node), self.CONFIG_FILE) + - " ".join(remote_connector_configs) + - (" & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE))) + node.account.ssh(self.start_cmd(node, remote_connector_configs)) monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account)) if len(self.pids(node)) == 0: @@ -182,6 +186,12 @@ class ConnectDistributedService(ConnectServiceBase): self.offsets_topic = offsets_topic self.configs_topic = configs_topic + def start_cmd(self, node): + cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE + cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE) + cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE) + return cmd + def start_node(self, node): node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False) @@ -192,10 +202,7 @@ class ConnectDistributedService(ConnectServiceBase): self.logger.info("Starting Kafka Connect distributed process on " + str(node.account)) with node.account.monitor_log(self.LOG_FILE) as monitor: - cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE - cmd += "/opt/%s/bin/connect-distributed.sh %s " % (kafka_dir(node), self.CONFIG_FILE) - cmd += " & echo $! >&3 ) 1>> %s 2>> %s 3> %s" % (self.STDOUT_FILE, self.STDERR_FILE, self.PID_FILE) - node.account.ssh(cmd) + node.account.ssh(self.start_cmd(node)) monitor.wait_until('Kafka Connect started', timeout_sec=15, err_msg="Never saw message indicating Kafka Connect finished startup on " + str(node.account)) if len(self.pids(node)) == 0: