This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 6780338 MINOR: adding system tests for how streams functions with broker failures (#4513) 6780338 is described below commit 67803384d9b7959661bb2a32129b03a374507c8a Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Wed Feb 7 20:21:35 2018 -0500 MINOR: adding system tests for how streams functions with broker failures (#4513) System test for two cases: * Starting a multi-node streams application with the broker down initially, broker starts and confirm rebalance completes and streams application still able to process records. * Multi-node streams app running, broker goes down, stop stream instance(s) confirm after broker comes back remaining streams instance(s) still function. Reviewers: Guozhang Wang <guozh...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../tests/StreamsBrokerDownResilienceTest.java | 10 +- tests/kafkatest/services/streams.py | 18 +++ .../streams/streams_broker_down_resilience_test.py | 138 ++++++++++++++++++--- 3 files changed, 143 insertions(+), 23 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java index c8462ca..ed4cd27 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java @@ -78,13 +78,16 @@ public class StreamsBrokerDownResilienceTest { System.exit(1); } - final StreamsBuilder builder = new StreamsBuilder(); builder.stream(Collections.singletonList(SOURCE_TOPIC_1), Consumed.with(stringSerde, stringSerde)) .peek(new ForeachAction<String, String>() { + int messagesProcessed = 0; @Override public void apply(String key, String value) { System.out.println("received key " + key + " and value " + value); + messagesProcessed++; + System.out.println("processed" + 
messagesProcessed + "messages"); + System.out.flush(); } }).to(SINK_TOPIC); @@ -104,8 +107,9 @@ public class StreamsBrokerDownResilienceTest { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - System.out.println("Shutting down streams now"); - streams.close(10, TimeUnit.SECONDS); + streams.close(30, TimeUnit.SECONDS); + System.out.println("Complete shutdown of streams resilience test app now"); + System.out.flush(); } })); diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 9c4bd87..0f484b4 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -208,9 +208,27 @@ class StreamsBrokerCompatibilityService(StreamsTestBaseService): "org.apache.kafka.streams.tests.BrokerCompatibilityTest", eosEnabled) + class StreamsBrokerDownResilienceService(StreamsTestBaseService): def __init__(self, test_context, kafka, configs): super(StreamsBrokerDownResilienceService, self).__init__(test_context, kafka, "org.apache.kafka.streams.tests.StreamsBrokerDownResilienceTest", configs) + + def start_cmd(self, node): + args = self.args.copy() + args['kafka'] = self.kafka.bootstrap_servers(validate=False) + args['state_dir'] = self.PERSISTENT_ROOT + args['stdout'] = self.STDOUT_FILE + args['stderr'] = self.STDERR_FILE + args['pidfile'] = self.PID_FILE + args['log4j'] = self.LOG4J_CONFIG_FILE + args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node) + + cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \ + "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \ + " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \ + " %(user_test_args3)s & echo $! 
>&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args + + return cmd diff --git a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py index bd90d9f..7a0560d 100644 --- a/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_down_resilience_test.py @@ -40,61 +40,76 @@ class StreamsBrokerDownResilience(Test): num_nodes=1, zk=self.zk, topics={ - self.inputTopic: {'partitions': 1, 'replication-factor': 1}, + self.inputTopic: {'partitions': 3, 'replication-factor': 1}, self.outputTopic: {'partitions': 1, 'replication-factor': 1} }) - def get_consumer(self): + def get_consumer(self, num_messages): return VerifiableConsumer(self.test_context, 1, self.kafka, self.outputTopic, "stream-broker-resilience-verify-consumer", - max_messages=self.num_messages) + max_messages=num_messages) - def get_producer(self): + def get_producer(self, num_messages): return VerifiableProducer(self.test_context, 1, self.kafka, self.inputTopic, - max_messages=self.num_messages, + max_messages=num_messages, acks=1) - def assert_produce_consume(self, test_state): - producer = self.get_producer() + def assert_produce_consume(self, test_state, num_messages=5): + producer = self.get_producer(num_messages) producer.start() - wait_until(lambda: producer.num_acked > 0, + wait_until(lambda: producer.num_acked >= num_messages, timeout_sec=30, err_msg="At %s failed to send messages " % test_state) - consumer = self.get_consumer() + consumer = self.get_consumer(num_messages) consumer.start() - wait_until(lambda: consumer.total_consumed() > 0, + wait_until(lambda: consumer.total_consumed() >= num_messages, timeout_sec=60, err_msg="At %s streams did not process messages in 60 seconds " % test_state) - def setUp(self): - self.zk.start() - - def test_streams_resilient_to_broker_down(self): - self.kafka.start() - + @staticmethod + def 
get_configs(extra_configs=""): # Consumer max.poll.interval > min(max.block.ms, ((retries + 1) * request.timeout) consumer_poll_ms = "consumer.max.poll.interval.ms=50000" retries_config = "producer.retries=2" request_timeout = "producer.request.timeout.ms=15000" max_block_ms = "producer.max.block.ms=30000" + # java code expects configs in key=value,key=value format + updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms + extra_configs + + return updated_configs + + def wait_for_verification(self, processor, message, file, num_lines=1): + wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines, + timeout_sec=60, + err_msg="Did expect to read '%s' from %s" % (message, processor.node.account)) + + @staticmethod + def verify_from_file(processor, message, file): + result = processor.node.account.ssh_output("grep '%s' %s | wc -l" % (message, file), allow_fail=False) + return int(result) + + + def setUp(self): + self.zk.start() + + def test_streams_resilient_to_broker_down(self): + self.kafka.start() + # Broker should be down over 2x of retries * timeout ms # So with (2 * 15000) = 30 seconds, we'll set downtime to 70 seconds broker_down_time_in_seconds = 70 - # java code expects configs in key=value,key=value format - updated_configs = consumer_poll_ms + "," + retries_config + "," + request_timeout + "," + max_block_ms - - processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, updated_configs) + processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, self.get_configs()) processor.start() # until KIP-91 is merged we'll only send 5 messages to assert Kafka Streams is running before taking the broker down @@ -112,3 +127,86 @@ class StreamsBrokerDownResilience(Test): self.assert_produce_consume("after_broker_stop") self.kafka.stop() + + def test_streams_runs_with_broker_down_initially(self): + self.kafka.start() + node = self.kafka.leader(self.inputTopic) + 
self.kafka.stop_node(node) + + configs = self.get_configs(extra_configs=",application.id=starting_wo_broker_id") + + # start streams with broker down initially + processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor.start() + + processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor_2.start() + + processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor_3.start() + + broker_unavailable_message = "Broker may not be available" + + # verify streams instances unable to connect to broker, kept trying + self.wait_for_verification(processor, broker_unavailable_message, processor.LOG_FILE, 100) + self.wait_for_verification(processor_2, broker_unavailable_message, processor_2.LOG_FILE, 100) + self.wait_for_verification(processor_3, broker_unavailable_message, processor_3.LOG_FILE, 100) + + # now start broker + self.kafka.start_node(node) + + # assert streams can process when starting with broker down + self.assert_produce_consume("running_with_broker_down_initially", num_messages=9) + + message = "processed3messages" + # need to show all 3 instances processed messages + self.wait_for_verification(processor, message, processor.STDOUT_FILE) + self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE) + self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE) + + self.kafka.stop() + + def test_streams_should_scale_in_while_brokers_down(self): + self.kafka.start() + + configs = self.get_configs(extra_configs=",application.id=shutdown_with_broker_down") + + processor = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor.start() + + processor_2 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor_2.start() + + processor_3 = StreamsBrokerDownResilienceService(self.test_context, self.kafka, configs) + processor_3.start() + + # need to wait for 
rebalance once + self.wait_for_verification(processor_3, "State transition from REBALANCING to RUNNING", processor_3.LOG_FILE) + + # assert streams can process when starting with broker down + self.assert_produce_consume("waiting for rebalance to complete", num_messages=9) + + message = "processed3messages" + + self.wait_for_verification(processor, message, processor.STDOUT_FILE) + self.wait_for_verification(processor_2, message, processor_2.STDOUT_FILE) + self.wait_for_verification(processor_3, message, processor_3.STDOUT_FILE) + + node = self.kafka.leader(self.inputTopic) + self.kafka.stop_node(node) + + processor.stop() + processor_2.stop() + + shutdown_message = "Complete shutdown of streams resilience test app now" + self.wait_for_verification(processor, shutdown_message, processor.STDOUT_FILE) + self.wait_for_verification(processor_2, shutdown_message, processor_2.STDOUT_FILE) + + self.kafka.start_node(node) + + self.assert_produce_consume("sending_message_after_stopping_streams_instance_bouncing_broker", num_messages=9) + + self.wait_for_verification(processor_3, "processed9messages", processor_3.STDOUT_FILE) + + self.kafka.stop() + -- To stop receiving notification emails like this one, please contact guozh...@apache.org.