Repository: kafka Updated Branches: refs/heads/trunk 6acd37720 -> 1d2ae89c5
KAFKA-2439; Add MirrorMaker service class for system tests Added MirrorMaker service and a few corresponding sanity checks, as well as necessary config template files. A few additional updates to accommodate the change in wait_until from ducktape0.2.0->0.3.0 Author: Geoff Anderson <ge...@confluent.io> Reviewers: Ewen Cheslack-Postava, Gwen Shapira Closes #148 from granders/KAFKA-2439 and squashes the following commits: c7c3ebd [Geoff Anderson] MirrorMaker now can run as multi-node service. Added kill -9 to various clean_node methods. 1e806f2 [Geoff Anderson] Various cleanups per review. 1b4b049 [Geoff Anderson] Added MirrorMaker service and a few corresponding sanity checks, as well as necessary config template files. A few additional updates to accommodate the change in wait_until from ducktape0.2.0->0.3.0 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d2ae89c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d2ae89c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d2ae89c Branch: refs/heads/trunk Commit: 1d2ae89c5a1dc5d18b8188bf737a8e1d195be325 Parents: 6acd377 Author: Geoff Anderson <ge...@confluent.io> Authored: Sat Aug 22 19:23:36 2015 -0700 Committer: Gwen Shapira <csh...@gmail.com> Committed: Sat Aug 22 19:23:36 2015 -0700 ---------------------------------------------------------------------- .../sanity_checks/test_console_consumer.py | 12 +- .../sanity_checks/test_mirror_maker.py | 90 ++++++++++ tests/kafkatest/services/console_consumer.py | 17 +- tests/kafkatest/services/kafka.py | 1 + tests/kafkatest/services/mirror_maker.py | 165 +++++++++++++++++++ .../templates/console_consumer.properties | 4 +- .../templates/console_consumer_log4j.properties | 26 --- .../services/templates/consumer.properties | 23 +++ .../services/templates/kafka.properties | 80 --------- .../services/templates/producer.properties | 28 ++++ .../services/templates/tools_log4j.properties | 26 +++ 
tests/kafkatest/services/verifiable_producer.py | 4 + tests/kafkatest/services/zookeeper.py | 16 ++ tests/kafkatest/tests/replication_test.py | 8 +- tests/setup.py | 2 +- 15 files changed, 379 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/sanity_checks/test_console_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index cd8c8f9..3e523e1 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -61,20 +61,20 @@ class ConsoleConsumerTest(Test): self.consumer.start() node = self.consumer.nodes[0] - if not wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2): - raise Exception("Consumer was too slow to start") + wait_until(lambda: self.consumer.alive(node), + timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") self.logger.info("consumer started in %s seconds " % str(time.time() - t0)) # Verify that log output is happening - if not wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10): - raise Exception("Timed out waiting for log file to exist") + wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10, + err_msg="Timed out waiting for logging to start.") assert line_count(node, ConsoleConsumer.LOG_FILE) > 0 # Verify no consumed messages assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0 self.consumer.stop_node(node) - if not wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2): - raise Exception("Took too long for consumer to die.") + + http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/sanity_checks/test_mirror_maker.py 
---------------------------------------------------------------------- diff --git a/tests/kafkatest/sanity_checks/test_mirror_maker.py b/tests/kafkatest/sanity_checks/test_mirror_maker.py new file mode 100644 index 0000000..3481d7a --- /dev/null +++ b/tests/kafkatest/sanity_checks/test_mirror_maker.py @@ -0,0 +1,90 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from ducktape.tests.test import Test + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.mirror_maker import MirrorMaker + + +class TestMirrorMakerService(Test): + """Sanity checks on mirror maker service class.""" + def __init__(self, test_context): + super(TestMirrorMakerService, self).__init__(test_context) + + self.topic = "topic" + self.source_zk = ZookeeperService(test_context, num_nodes=1) + self.target_zk = ZookeeperService(test_context, num_nodes=1) + + self.source_kafka = KafkaService(test_context, num_nodes=1, zk=self.source_zk, + topics={self.topic: {"partitions": 1, "replication-factor": 1}}) + self.target_kafka = KafkaService(test_context, num_nodes=1, zk=self.target_zk, + topics={self.topic: {"partitions": 1, "replication-factor": 1}}) + + self.num_messages = 1000 + # This will produce to source kafka cluster + self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.source_kafka, topic=self.topic, + max_messages=self.num_messages, throughput=1000) + + # Use a regex whitelist to check that the start command is well-formed in this case + self.mirror_maker = MirrorMaker(test_context, num_nodes=1, source=self.source_kafka, target=self.target_kafka, + whitelist=".*", consumer_timeout_ms=2000) + + # This will consume from target kafka cluster + self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.target_kafka, topic=self.topic, + consumer_timeout_ms=1000) + + def setUp(self): + # Source cluster + self.source_zk.start() + self.source_kafka.start() + + # Target cluster + self.target_zk.start() + self.target_kafka.start() + + def test_end_to_end(self): + """ + Test end-to-end behavior under non-failure conditions. 
+ + Setup: two single node Kafka clusters, each connected to its own single node zookeeper cluster. + One is source, and the other is target. Single-node mirror maker mirrors from source to target. + + - Start mirror maker. + - Produce a small number of messages to the source cluster. + - Consume messages from target. + - Verify that number of consumed messages matches the number produced. + """ + self.mirror_maker.start() + # Check that consumer_timeout_ms setting made it to config file + self.mirror_maker.nodes[0].account.ssh( + "grep \"consumer\.timeout\.ms\" %s" % MirrorMaker.CONSUMER_CONFIG, allow_fail=False) + + self.producer.start() + self.producer.wait() + self.consumer.start() + self.consumer.wait() + + num_consumed = len(self.consumer.messages_consumed[1]) + num_produced = self.producer.num_acked + assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages) + assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed) + + self.mirror_maker.stop() + http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/console_consumer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 18c9f63..ffde6a2 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -14,8 +14,10 @@ # limitations under the License. 
from ducktape.services.background_thread import BackgroundThreadService +from ducktape.utils.util import wait_until import os +import subprocess def is_int(msg): @@ -91,7 +93,7 @@ class ConsoleConsumer(BackgroundThreadService): "collect_default": True} } - def __init__(self, context, num_nodes, kafka, topic, message_validator=is_int, from_beginning=True, consumer_timeout_ms=None): + def __init__(self, context, num_nodes, kafka, topic, message_validator=None, from_beginning=True, consumer_timeout_ms=None): """ Args: context: standard context @@ -141,7 +143,7 @@ class ConsoleConsumer(BackgroundThreadService): cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'" pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] return pid_arr - except: + except (subprocess.CalledProcessError, ValueError) as e: return [] def alive(self, node): @@ -161,7 +163,7 @@ class ConsoleConsumer(BackgroundThreadService): node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file) # Create and upload log properties - log_config = self.render('console_consumer_log4j.properties', log_file=ConsoleConsumer.LOG_FILE) + log_config = self.render('tools_log4j.properties', log_file=ConsoleConsumer.LOG_FILE) node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config) # Run and capture output @@ -169,7 +171,8 @@ class ConsoleConsumer(BackgroundThreadService): self.logger.debug("Console consumer %d command: %s", idx, cmd) for line in node.account.ssh_capture(cmd, allow_fail=False): msg = line.strip() - msg = self.message_validator(msg) + if self.message_validator is not None: + msg = self.message_validator(msg) if msg is not None: self.logger.debug("consumed a message: " + str(msg)) self.messages_consumed[idx].append(msg) @@ -179,7 +182,13 @@ class ConsoleConsumer(BackgroundThreadService): def stop_node(self, node): node.account.kill_process("java", allow_fail=True) + wait_until(lambda: not self.alive(node), timeout_sec=10, 
backoff_sec=.2, + err_msg="Timed out waiting for consumer to stop.") def clean_node(self, node): + if self.alive(node): + self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." % + (self.__class__.__name__, node.account)) + node.account.kill_process("java", clean_shutdown=False, allow_fail=True) node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/kafka.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka.py index 34ec5ef..76f9cf6 100644 --- a/tests/kafkatest/services/kafka.py +++ b/tests/kafkatest/services/kafka.py @@ -93,6 +93,7 @@ class KafkaService(Service): node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False) def clean_node(self, node): + node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True) node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False) def create_topic(self, topic_cfg): http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/mirror_maker.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py new file mode 100644 index 0000000..afbed13 --- /dev/null +++ b/tests/kafkatest/services/mirror_maker.py @@ -0,0 +1,165 @@ + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.services.service import Service +from ducktape.utils.util import wait_until + +import os +import subprocess + +""" +0.8.2.1 MirrorMaker options + +Option Description +------ ----------- +--abort.on.send.failure <Stop the Configure the mirror maker to exit on + entire mirror maker when a send a failed send. (default: true) + failure occurs> +--blacklist <Java regex (String)> Blacklist of topics to mirror. +--consumer.config <config file> Embedded consumer config for consuming + from the source cluster. +--consumer.rebalance.listener <A The consumer rebalance listener to use + custom rebalance listener of type for mirror maker consumer. + ConsumerRebalanceListener> +--help Print this message. +--message.handler <A custom message Message handler which will process + handler of type every record in-between consumer and + MirrorMakerMessageHandler> producer. +--message.handler.args <Arguments Arguments used by custom rebalance + passed to message handler listener for mirror maker consumer + constructor.> +--num.streams <Integer: Number of Number of consumption streams. + threads> (default: 1) +--offset.commit.interval.ms <Integer: Offset commit interval in ms (default: + offset commit interval in 60000) + millisecond> +--producer.config <config file> Embedded producer config. +--rebalance.listener.args <Arguments Arguments used by custom rebalance + passed to custom rebalance listener listener for mirror maker consumer + constructor as a string.> +--whitelist <Java regex (String)> Whitelist of topics to mirror. 
+""" + + +class MirrorMaker(Service): + + # Root directory for persistent output + PERSISTENT_ROOT = "/mnt/mirror_maker" + LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs") + LOG_FILE = os.path.join(LOG_DIR, "mirror_maker.log") + LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") + PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties") + CONSUMER_CONFIG = os.path.join(PERSISTENT_ROOT, "consumer.properties") + KAFKA_HOME = "/opt/kafka/" + + logs = { + "mirror_maker_log": { + "path": LOG_FILE, + "collect_default": True} + } + + def __init__(self, context, num_nodes, source, target, whitelist=None, blacklist=None, num_streams=1, consumer_timeout_ms=None): + """ + MirrorMaker mirrors messages from one or more source clusters to a single destination cluster. + + Args: + context: standard context + source: source Kafka cluster + target: target Kafka cluster to which data will be mirrored + whitelist: whitelist regex for topics to mirror + blacklist: blacklist regex for topics not to mirror + num_streams: number of consumer threads to create; can be a single int, or a list with + one value per node, allowing num_streams to be the same for each node, + or configured independently per-node + consumer_timeout_ms: consumer stops if t > consumer_timeout_ms elapses between consecutive messages + """ + super(MirrorMaker, self).__init__(context, num_nodes=num_nodes) + + self.consumer_timeout_ms = consumer_timeout_ms + self.num_streams = num_streams + if not isinstance(num_streams, int): + # if not an integer, num_streams should be configured per-node + assert len(num_streams) == num_nodes + self.whitelist = whitelist + self.blacklist = blacklist + self.source = source + self.target = target + + def start_cmd(self, node): + cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR + cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG + cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % 
MirrorMaker.KAFKA_HOME + cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG + cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG + if isinstance(self.num_streams, int): + cmd += " --num.streams %d" % self.num_streams + else: + # config num_streams separately on each node + cmd += " --num.streams %d" % self.num_streams[self.idx(node) - 1] + if self.whitelist is not None: + cmd += " --whitelist=\"%s\"" % self.whitelist + if self.blacklist is not None: + cmd += " --blacklist=\"%s\"" % self.blacklist + cmd += " 1>> %s 2>> %s &" % (MirrorMaker.LOG_FILE, MirrorMaker.LOG_FILE) + return cmd + + def pids(self, node): + try: + cmd = "ps ax | grep -i MirrorMaker | grep java | grep -v grep | awk '{print $1}'" + pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] + return pid_arr + except (subprocess.CalledProcessError, ValueError) as e: + return [] + + def alive(self, node): + return len(self.pids(node)) > 0 + + def start_node(self, node): + node.account.ssh("mkdir -p %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False) + node.account.ssh("mkdir -p %s" % MirrorMaker.LOG_DIR, allow_fail=False) + + # Create, upload one consumer config file for source cluster + consumer_props = self.render('consumer.properties', zookeeper_connect=self.source.zk.connect_setting()) + node.account.create_file(MirrorMaker.CONSUMER_CONFIG, consumer_props) + + # Create, upload producer properties file for target cluster + producer_props = self.render('producer.properties', broker_list=self.target.bootstrap_servers(), + producer_type="async") + node.account.create_file(MirrorMaker.PRODUCER_CONFIG, producer_props) + + # Create and upload log properties + log_config = self.render('tools_log4j.properties', log_file=MirrorMaker.LOG_FILE) + node.account.create_file(MirrorMaker.LOG4J_CONFIG, log_config) + + # Run mirror maker + cmd = self.start_cmd(node) + self.logger.debug("Mirror maker command: %s", cmd) + node.account.ssh(cmd, allow_fail=False) + 
wait_until(lambda: self.alive(node), timeout_sec=10, backoff_sec=.5, + err_msg="Mirror maker took to long to start.") + self.logger.debug("Mirror maker is alive") + + def stop_node(self, node): + node.account.kill_process("java", allow_fail=True) + wait_until(lambda: not self.alive(node), timeout_sec=10, backoff_sec=.5, + err_msg="Mirror maker took to long to stop.") + + def clean_node(self, node): + if self.alive(node): + self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." % + (self.__class__.__name__, node.account)) + node.account.kill_process("java", clean_shutdown=False, allow_fail=True) + node.account.ssh("rm -rf %s" % MirrorMaker.PERSISTENT_ROOT, allow_fail=False) http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/console_consumer.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties index 944c2c9..7143179 100644 --- a/tests/kafkatest/services/templates/console_consumer.properties +++ b/tests/kafkatest/services/templates/console_consumer.properties @@ -14,6 +14,6 @@ # limitations under the License. 
# see kafka.server.KafkaConfig for additional details and defaults -{% if consumer_timeout_ms is not none %} +{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %} consumer.timeout.ms={{ consumer_timeout_ms }} -{% endif %} \ No newline at end of file +{% endif %} http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/console_consumer_log4j.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/templates/console_consumer_log4j.properties b/tests/kafkatest/services/templates/console_consumer_log4j.properties deleted file mode 100644 index e63e6d6..0000000 --- a/tests/kafkatest/services/templates/console_consumer_log4j.properties +++ /dev/null @@ -1,26 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -# Define the root logger with appender file -log4j.rootLogger = INFO, FILE - -log4j.appender.FILE=org.apache.log4j.FileAppender -log4j.appender.FILE.File={{ log_file }} -log4j.appender.FILE.ImmediateFlush=true -log4j.appender.FILE.Threshold=debug -# Set the append to false, overwrite -log4j.appender.FILE.Append=false -log4j.appender.FILE.layout=org.apache.log4j.PatternLayout -log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/consumer.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/templates/consumer.properties b/tests/kafkatest/services/templates/consumer.properties new file mode 100644 index 0000000..b8723d1 --- /dev/null +++ b/tests/kafkatest/services/templates/consumer.properties @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# see kafka.consumer.ConsumerConfig for more details + +zookeeper.connect={{ zookeeper_connect }} +zookeeper.connection.timeout.ms={{ zookeeper_connection_timeout_ms|default(6000) }} +group.id={{ group_id|default('test-consumer-group') }} + +{% if consumer_timeout_ms is defined and consumer_timeout_ms is not none %} +consumer.timeout.ms={{ consumer_timeout_ms }} +{% endif %} http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/kafka.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/templates/kafka.properties index db1077a..6650d23 100644 --- a/tests/kafkatest/services/templates/kafka.properties +++ b/tests/kafkatest/services/templates/kafka.properties @@ -14,108 +14,28 @@ # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults -############################# Server Basics ############################# -# The id of the broker. This must be set to a unique integer for each broker. broker.id={{ broker_id }} - -############################# Socket Server Settings ############################# - -# The port the socket server listens on port=9092 - -# Hostname the broker will bind to. If not set, the server will bind to all interfaces #host.name=localhost - -# Hostname the broker will advertise to producers and consumers. If not set, it uses the -# value for "host.name" if configured. Otherwise, it will use the value returned from -# java.net.InetAddress.getCanonicalHostName(). advertised.host.name={{ node.account.hostname }} - -# The port to publish to ZooKeeper for clients to use. If this is not set, -# it will publish the same port that the broker binds to. 
#advertised.port=<port accessible by clients> - -# The number of threads handling network requests num.network.threads=3 - -# The number of threads doing disk I/O num.io.threads=8 - -# The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 - -# The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=65536 - -# The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 - -############################# Log Basics ############################# - -# A comma seperated list of directories under which to store log files log.dirs=/mnt/kafka-logs - -# The default number of log partitions per topic. More partitions allow greater -# parallelism for consumption, but this will also result in more files across -# the brokers. num.partitions=1 - -# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. -# This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 - -############################# Log Flush Policy ############################# - -# Messages are immediately written to the filesystem but by default we only fsync() to sync -# the OS cache lazily. The following configurations control the flush of data to disk. -# There are a few important trade-offs here: -# 1. Durability: Unflushed data may be lost if you are not using replication. -# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. -# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. -# The settings below allow one to configure the flush policy to flush data after a period of time or -# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
- -# The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 - -# The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 - -############################# Log Retention Policy ############################# - -# The following configurations control the disposal of log segments. The policy can -# be set to delete segments after a period of time, or after a given size has accumulated. -# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens -# from the end of the log. - -# The minimum age of a log file to be eligible for deletion log.retention.hours=168 - -# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining -# segments don't drop below log.retention.bytes. #log.retention.bytes=1073741824 - -# The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 - -# The interval at which log segments are checked to see if they can be deleted according -# to the retention policies log.retention.check.interval.ms=300000 - -# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. -# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. log.cleaner.enable=false -############################# Zookeeper ############################# - -# Zookeeper connection string (see zookeeper docs for details). -# This is a comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". -# You can also append an optional chroot string to the urls to specify the -# root directory for all kafka znodes. 
zookeeper.connect={{ zk.connect_setting() }} - -# Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=2000 http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/producer.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/templates/producer.properties b/tests/kafkatest/services/templates/producer.properties new file mode 100644 index 0000000..ede60c8 --- /dev/null +++ b/tests/kafkatest/services/templates/producer.properties @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# see kafka.producer.ProducerConfig for more details + +metadata.broker.list={{ broker_list }} +bootstrap.servers = {{ broker_list }} +producer.type={{ producer_type }} # sync or async +compression.codec=none +serializer.class=kafka.serializer.DefaultEncoder + +#partitioner.class= +#compressed.topics= +#queue.buffering.max.ms= +#queue.buffering.max.messages= +#queue.enqueue.timeout.ms= +#batch.num.messages= http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/templates/tools_log4j.properties ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/templates/tools_log4j.properties b/tests/kafkatest/services/templates/tools_log4j.properties new file mode 100644 index 0000000..e63e6d6 --- /dev/null +++ b/tests/kafkatest/services/templates/tools_log4j.properties @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Define the root logger with appender file +log4j.rootLogger = INFO, FILE + +log4j.appender.FILE=org.apache.log4j.FileAppender +log4j.appender.FILE.File={{ log_file }} +log4j.appender.FILE.ImmediateFlush=true +log4j.appender.FILE.Threshold=debug +# Set the append to false, overwrite +log4j.appender.FILE.Append=false +log4j.appender.FILE.layout=org.apache.log4j.PatternLayout +log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/verifiable_producer.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index cca8227..158db7a 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -89,12 +89,16 @@ class VerifiableProducer(BackgroundThreadService): def stop_node(self, node): node.account.kill_process("VerifiableProducer", allow_fail=False) + if self.worker_threads is None: + return + # block until the corresponding thread exits if len(self.worker_threads) >= self.idx(node): # Need to guard this because stop is preemptively called before the worker threads are added and started self.worker_threads[self.idx(node) - 1].join() def clean_node(self, node): + node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False) node.account.ssh("rm -rf /mnt/producer.log", allow_fail=False) def try_parse_json(self, string): http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/services/zookeeper.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py index 56f4606..09bec35 100644 --- a/tests/kafkatest/services/zookeeper.py +++ b/tests/kafkatest/services/zookeeper.py @@ -16,6 +16,7 @@ from ducktape.services.service 
import Service +import subprocess import time @@ -51,6 +52,17 @@ class ZookeeperService(Service): time.sleep(5) # give it some time to start + def pids(self, node): + try: + cmd = "ps ax | grep -i zookeeper | grep java | grep -v grep | awk '{print $1}'" + pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] + return pid_arr + except (subprocess.CalledProcessError, ValueError) as e: + return [] + + def alive(self, node): + return len(self.pids(node)) > 0 + def stop_node(self, node): idx = self.idx(node) self.logger.info("Stopping %s node %d on %s" % (type(self).__name__, idx, node.account.hostname)) @@ -58,6 +70,10 @@ class ZookeeperService(Service): def clean_node(self, node): self.logger.info("Cleaning ZK node %d on %s", self.idx(node), node.account.hostname) + if self.alive(node): + self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." % + (self.__class__.__name__, node.account)) + node.account.kill_process("zookeeper", clean_shutdown=False, allow_fail=True) node.account.ssh("rm -rf /mnt/zookeeper /mnt/zookeeper.properties /mnt/zk.log", allow_fail=False) def connect_setting(self): http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/kafkatest/tests/replication_test.py ---------------------------------------------------------------------- diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py index fed1ea1..755fb42 100644 --- a/tests/kafkatest/tests/replication_test.py +++ b/tests/kafkatest/tests/replication_test.py @@ -19,7 +19,7 @@ from ducktape.utils.util import wait_until from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.verifiable_producer import VerifiableProducer -from kafkatest.services.console_consumer import ConsoleConsumer +from kafkatest.services.console_consumer import ConsoleConsumer, is_int import signal import time @@ -76,12 +76,12 @@ class 
ReplicationTest(Test): """ self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, consumer_timeout_ms=3000, message_validator=is_int) # Produce in a background thread while driving broker failures self.producer.start() - if not wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5): - raise RuntimeError("Producer failed to start in a reasonable amount of time.") + wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5, + err_msg="Producer failed to start in a reasonable amount of time.") failure() self.producer.stop() http://git-wip-us.apache.org/repos/asf/kafka/blob/1d2ae89c/tests/setup.py ---------------------------------------------------------------------- diff --git a/tests/setup.py b/tests/setup.py index 5ce4bb7..a2fa71a 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -23,5 +23,5 @@ setup(name="kafkatest", platforms=["any"], license="apache2.0", packages=find_packages(), - requires=["ducktape(>=0.2.0)"] + requires=["ducktape(==0.3.0)"] )