This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new d28c534 KAFKA-7515: Trogdor - Add Consumer Group Benchmark Specification (#5810) d28c534 is described below commit d28c5348197256db09b59d1ebbfe7db9d3934f47 Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com> AuthorDate: Mon Oct 29 19:51:07 2018 +0200 KAFKA-7515: Trogdor - Add Consumer Group Benchmark Specification (#5810) This ConsumeBenchWorker now supports using consumer groups. The groups may be either used to store offsets, or as subscriptions. --- TROGDOR.md | 3 +- tests/bin/trogdor-run-consume-bench.sh | 7 +- ...bench_workload.py => consume_bench_workload.py} | 27 +++-- .../services/trogdor/produce_bench_workload.py | 6 +- tests/kafkatest/tests/core/consume_bench_test.py | 132 +++++++++++++++++++++ tests/kafkatest/tests/core/produce_bench_test.py | 2 + .../kafka/trogdor/workload/ConsumeBenchSpec.java | 122 +++++++++++++++++-- .../kafka/trogdor/workload/ConsumeBenchWorker.java | 94 +++++++++++---- .../apache/kafka/trogdor/workload/TopicsSpec.java | 7 +- .../trogdor/workload/ConsumeBenchSpecTest.java | 78 ++++++++++++ 10 files changed, 419 insertions(+), 59 deletions(-) diff --git a/TROGDOR.md b/TROGDOR.md index 3783d7e..d71455a 100644 --- a/TROGDOR.md +++ b/TROGDOR.md @@ -141,7 +141,8 @@ ProduceBench starts a Kafka producer on a single agent node, producing to severa RoundTripWorkload tests both production and consumption. The workload starts a Kafka producer and consumer on a single node. The consumer will read back the messages that were produced by the producer. ### ConsumeBench -ConsumeBench starts a Kafka consumer on a single agent node. The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency. +ConsumeBench starts a Kafka consumer on a single agent node. 
Depending on the passed in configuration (see ConsumeBenchSpec), the consumer either subscribes to a set of topics (leveraging consumer group functionality) or manually assigns partitions to itself. +The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency. Faults ======================================== diff --git a/tests/bin/trogdor-run-consume-bench.sh b/tests/bin/trogdor-run-consume-bench.sh index 2e0239e..be9a2f1 100755 --- a/tests/bin/trogdor-run-consume-bench.sh +++ b/tests/bin/trogdor-run-consume-bench.sh @@ -26,12 +26,7 @@ cat <<EOF "consumerNode": "node0", "bootstrapServers": "localhost:9092", "maxMessages": 100, - "activeTopics": { - "foo[1-3]": { - "numPartitions": 3, - "replicationFactor": 1 - } - } + "activeTopics": ["foo[1-3]"] } } EOF diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py b/tests/kafkatest/services/trogdor/consume_bench_workload.py similarity index 66% copy from tests/kafkatest/services/trogdor/produce_bench_workload.py copy to tests/kafkatest/services/trogdor/consume_bench_workload.py index 7eac4ee..9e61b11 100644 --- a/tests/kafkatest/services/trogdor/produce_bench_workload.py +++ b/tests/kafkatest/services/trogdor/consume_bench_workload.py @@ -18,26 +18,29 @@ from ducktape.services.service import Service from kafkatest.services.trogdor.task_spec import TaskSpec -class ProduceBenchWorkloadSpec(TaskSpec): - def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers, - target_messages_per_sec, max_messages, producer_conf, - inactive_topics, active_topics): - super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms) - self.message["class"] = "org.apache.kafka.trogdor.workload.ProduceBenchSpec" - self.message["producerNode"] = producer_node +class ConsumeBenchWorkloadSpec(TaskSpec): + def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers, + target_messages_per_sec, max_messages, active_topics, + 
consumer_conf, common_client_conf, admin_client_conf, consumer_group=None): + super(ConsumeBenchWorkloadSpec, self).__init__(start_ms, duration_ms) + self.message["class"] = "org.apache.kafka.trogdor.workload.ConsumeBenchSpec" + self.message["consumerNode"] = consumer_node self.message["bootstrapServers"] = bootstrap_servers self.message["targetMessagesPerSec"] = target_messages_per_sec self.message["maxMessages"] = max_messages - self.message["producerConf"] = producer_conf - self.message["inactiveTopics"] = inactive_topics + self.message["consumerConf"] = consumer_conf + self.message["adminClientConf"] = admin_client_conf + self.message["commonClientConf"] = common_client_conf self.message["activeTopics"] = active_topics + if consumer_group is not None: + self.message["consumerGroup"] = consumer_group -class ProduceBenchWorkloadService(Service): +class ConsumeBenchWorkloadService(Service): def __init__(self, context, kafka): Service.__init__(self, context, num_nodes=1) self.bootstrap_servers = kafka.bootstrap_servers(validate=False) - self.producer_node = self.nodes[0].account.hostname + self.consumer_node = self.nodes[0].account.hostname def free(self): Service.free(self) @@ -49,4 +52,4 @@ class ProduceBenchWorkloadService(Service): pass def clean_node(self, node): - pass + pass \ No newline at end of file diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py b/tests/kafkatest/services/trogdor/produce_bench_workload.py index 7eac4ee..cf6a962 100644 --- a/tests/kafkatest/services/trogdor/produce_bench_workload.py +++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py @@ -20,8 +20,8 @@ from kafkatest.services.trogdor.task_spec import TaskSpec class ProduceBenchWorkloadSpec(TaskSpec): def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers, - target_messages_per_sec, max_messages, producer_conf, - inactive_topics, active_topics): + target_messages_per_sec, max_messages, producer_conf, admin_client_conf, + 
common_client_conf, inactive_topics, active_topics): super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms) self.message["class"] = "org.apache.kafka.trogdor.workload.ProduceBenchSpec" self.message["producerNode"] = producer_node @@ -29,6 +29,8 @@ class ProduceBenchWorkloadSpec(TaskSpec): self.message["targetMessagesPerSec"] = target_messages_per_sec self.message["maxMessages"] = max_messages self.message["producerConf"] = producer_conf + self.message["adminClientConf"] = admin_client_conf + self.message["commonClientConf"] = common_client_conf self.message["inactiveTopics"] = inactive_topics self.message["activeTopics"] = active_topics diff --git a/tests/kafkatest/tests/core/consume_bench_test.py b/tests/kafkatest/tests/core/consume_bench_test.py new file mode 100644 index 0000000..bec7416 --- /dev/null +++ b/tests/kafkatest/tests/core/consume_bench_test.py @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +from ducktape.mark import parametrize +from ducktape.tests.test import Test +from kafkatest.services.kafka import KafkaService +from kafkatest.services.trogdor.produce_bench_workload import ProduceBenchWorkloadService, ProduceBenchWorkloadSpec +from kafkatest.services.trogdor.consume_bench_workload import ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec +from kafkatest.services.trogdor.task_spec import TaskSpec +from kafkatest.services.trogdor.trogdor import TrogdorService +from kafkatest.services.zookeeper import ZookeeperService + + +class ConsumeBenchTest(Test): + def __init__(self, test_context): + """:type test_context: ducktape.tests.test.TestContext""" + super(ConsumeBenchTest, self).__init__(test_context) + self.zk = ZookeeperService(test_context, num_nodes=3) + self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk) + self.producer_workload_service = ProduceBenchWorkloadService(test_context, self.kafka) + self.consumer_workload_service = ConsumeBenchWorkloadService(test_context, self.kafka) + self.consumer_workload_service_2 = ConsumeBenchWorkloadService(test_context, self.kafka) + self.active_topics = {"consume_bench_topic[0-5]": {"numPartitions": 5, "replicationFactor": 3}} + self.trogdor = TrogdorService(context=self.test_context, + client_services=[self.kafka, self.producer_workload_service, + self.consumer_workload_service, + self.consumer_workload_service_2]) + + def setUp(self): + self.trogdor.start() + self.zk.start() + self.kafka.start() + + def teardown(self): + self.trogdor.stop() + self.kafka.stop() + self.zk.stop() + + def produce_messages(self, topics, max_messages=10000): + produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, + self.producer_workload_service.producer_node, + self.producer_workload_service.bootstrap_servers, + target_messages_per_sec=1000, + max_messages=max_messages, + producer_conf={}, + admin_client_conf={}, + common_client_conf={}, + inactive_topics={}, + active_topics=topics) 
+ produce_workload = self.trogdor.create_task("produce_workload", produce_spec) + produce_workload.wait_for_done(timeout_sec=180) + self.logger.debug("Produce workload finished") + + @parametrize(topics=["consume_bench_topic[0-5]"]) # topic subscription + @parametrize(topics=["consume_bench_topic[0-5]:[0-4]"]) # manual topic assignment + def test_consume_bench(self, topics): + """ + Runs a ConsumeBench workload to consume messages + """ + self.produce_messages(self.active_topics) + consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, + self.consumer_workload_service.consumer_node, + self.consumer_workload_service.bootstrap_servers, + target_messages_per_sec=1000, + max_messages=10000, + consumer_conf={}, + admin_client_conf={}, + common_client_conf={}, + active_topics=topics) + consume_workload = self.trogdor.create_task("consume_workload", consume_spec) + consume_workload.wait_for_done(timeout_sec=360) + self.logger.debug("Consume workload finished") + tasks = self.trogdor.tasks() + self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) + + def test_consume_bench_single_partition(self): + """ + Run a ConsumeBench against a single partition + """ + active_topics = {"consume_bench_topic": {"numPartitions": 2, "replicationFactor": 3}} + self.produce_messages(active_topics, 5000) + consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, + self.consumer_workload_service.consumer_node, + self.consumer_workload_service.bootstrap_servers, + target_messages_per_sec=1000, + max_messages=2500, + consumer_conf={}, + admin_client_conf={}, + common_client_conf={}, + active_topics=["consume_bench_topic:1"]) + consume_workload = self.trogdor.create_task("consume_workload", consume_spec) + consume_workload.wait_for_done(timeout_sec=180) + self.logger.debug("Consume workload finished") + tasks = self.trogdor.tasks() + self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) + + def 
test_consume_group_bench(self): + """ + Runs two ConsumeBench workloads in the same consumer group to read messages from topics + """ + self.produce_messages(self.active_topics) + consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS, + self.consumer_workload_service.consumer_node, + self.consumer_workload_service.bootstrap_servers, + target_messages_per_sec=1000, + max_messages=2000, # both should read at least 2k messages + consumer_conf={}, + admin_client_conf={}, + common_client_conf={}, + consumer_group="testGroup", + active_topics=["consume_bench_topic[0-5]"]) + consume_workload_1 = self.trogdor.create_task("consume_workload_1", consume_spec) + consume_workload_2 = self.trogdor.create_task("consume_workload_2", consume_spec) + consume_workload_1.wait_for_done(timeout_sec=360) + self.logger.debug("Consume workload 1 finished") + consume_workload_2.wait_for_done(timeout_sec=360) + self.logger.debug("Consume workload 2 finished") + tasks = self.trogdor.tasks() + self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, indent=2)) diff --git a/tests/kafkatest/tests/core/produce_bench_test.py b/tests/kafkatest/tests/core/produce_bench_test.py index 6a1724d..125ee94 100644 --- a/tests/kafkatest/tests/core/produce_bench_test.py +++ b/tests/kafkatest/tests/core/produce_bench_test.py @@ -51,6 +51,8 @@ class ProduceBenchTest(Test): target_messages_per_sec=1000, max_messages=100000, producer_conf={}, + admin_client_conf={}, + common_client_conf={}, inactive_topics=inactive_topics, active_topics=active_topics) workload1 = self.trogdor.create_task("workload1", spec) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java index 1b429ea..6d4c67c 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java @@ -20,20 +20,65 @@ package 
org.apache.kafka.trogdor.workload; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.kafka.trogdor.common.Topology; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.trogdor.common.StringExpander; import org.apache.kafka.trogdor.task.TaskController; import org.apache.kafka.trogdor.task.TaskSpec; import org.apache.kafka.trogdor.task.TaskWorker; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.HashMap; import java.util.Set; +import java.util.HashSet; /** - * The specification for a benchmark that produces messages to a set of topics. + * The specification for a benchmark that consumes messages from a set of topic/partitions. + * + * If a consumer group is not given to the specification, a random one will be generated and + * used to track offsets/subscribe to topics. + * + * This specification uses a specific way to represent a topic partition via its "activeTopics" field. + * The notation for that is topic_name:partition_number (e.g. "foo:1" represents partition-1 of topic "foo") + * Note that a topic name cannot have more than one colon. + * + * The "activeTopics" field also supports ranges that get expanded. See #{@link StringExpander}. + * + * There now exists a clever and succinct way to represent multiple partitions of multiple topics. + * Example: + * Given "activeTopics": ["foo[1-3]:[1-3]"], "foo[1-3]:[1-3]" will get + * expanded to [foo1:1, foo1:2, foo1:3, foo2:1, ..., foo3:3]. + * This represents all partitions 1-3 for the three topics foo1, foo2 and foo3. + * + * If there is at least one topic:partition pair, the consumer will be manually assigned partitions via + * #{@link org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection)}. 
+ * Note that in this case the consumer will fetch and assign all partitions for a topic if no partition is given for it (e.g ["foo:1", "bar"]) + * + * If there are no topic:partition pairs given, the consumer will subscribe to the topics via + * #{@link org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)}. + * It will be assigned partitions dynamically from the consumer group. + * + * An example JSON representation which will result in a consumer that is part of the consumer group "cg" and + * subscribed to topics foo1, foo2, foo3 and bar. + * #{@code + * { + * "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec", + * "durationMs": 10000000, + * "consumerNode": "node0", + * "bootstrapServers": "localhost:9092", + * "maxMessages": 100, + * "consumerGroup": "cg", + * "activeTopics": ["foo[1-3]", "bar"] + * } + * } */ public class ConsumeBenchSpec extends TaskSpec { + static final String EMPTY_CONSUMER_GROUP = ""; + private static final String VALID_EXPANDED_TOPIC_NAME_PATTERN = "^[^:]+(:[\\d]+|[^:]*)$"; private final String consumerNode; private final String bootstrapServers; private final int targetMessagesPerSec; @@ -41,7 +86,8 @@ public class ConsumeBenchSpec extends TaskSpec { private final Map<String, String> consumerConf; private final Map<String, String> adminClientConf; private final Map<String, String> commonClientConf; - private final TopicsSpec activeTopics; + private final List<String> activeTopics; + private final String consumerGroup; @JsonCreator public ConsumeBenchSpec(@JsonProperty("startMs") long startMs, @@ -50,10 +96,11 @@ public class ConsumeBenchSpec extends TaskSpec { @JsonProperty("bootstrapServers") String bootstrapServers, @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec, @JsonProperty("maxMessages") int maxMessages, + @JsonProperty("consumerGroup") String consumerGroup, @JsonProperty("consumerConf") Map<String, String> consumerConf, @JsonProperty("commonClientConf") Map<String, String> 
commonClientConf, @JsonProperty("adminClientConf") Map<String, String> adminClientConf, - @JsonProperty("activeTopics") TopicsSpec activeTopics) { + @JsonProperty("activeTopics") List<String> activeTopics) { super(startMs, durationMs); this.consumerNode = (consumerNode == null) ? "" : consumerNode; this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers; @@ -62,7 +109,8 @@ public class ConsumeBenchSpec extends TaskSpec { this.consumerConf = configOrEmptyMap(consumerConf); this.commonClientConf = configOrEmptyMap(commonClientConf); this.adminClientConf = configOrEmptyMap(adminClientConf); - this.activeTopics = activeTopics == null ? TopicsSpec.EMPTY : activeTopics.immutableCopy(); + this.activeTopics = activeTopics == null ? new ArrayList<>() : activeTopics; + this.consumerGroup = consumerGroup == null ? EMPTY_CONSUMER_GROUP : consumerGroup; } @JsonProperty @@ -71,6 +119,11 @@ public class ConsumeBenchSpec extends TaskSpec { } @JsonProperty + public String consumerGroup() { + return consumerGroup; + } + + @JsonProperty public String bootstrapServers() { return bootstrapServers; } @@ -101,22 +154,67 @@ public class ConsumeBenchSpec extends TaskSpec { } @JsonProperty - public TopicsSpec activeTopics() { + public List<String> activeTopics() { return activeTopics; } @Override public TaskController newController(String id) { - return new TaskController() { - @Override - public Set<String> targetNodes(Topology topology) { - return Collections.singleton(consumerNode); - } - }; + return topology -> Collections.singleton(consumerNode); } @Override public TaskWorker newTaskWorker(String id) { return new ConsumeBenchWorker(id, this); } + + /** + * Materializes a list of topic names (optionally with ranges) into a map of the topics and their partitions + * + * Example: + * ['foo[1-3]', 'foobar:2', 'bar[1-2]:[1-2]'] => {'foo1': [], 'foo2': [], 'foo3': [], 'foobar': [2], + * 'bar1': [1, 2], 'bar2': [1, 2] } + */ + Map<String, List<TopicPartition>> 
materializeTopics() { + Map<String, List<TopicPartition>> partitionsByTopics = new HashMap<>(); + + for (String rawTopicName : this.activeTopics) { + Set<String> expandedNames = expandTopicName(rawTopicName); + if (!expandedNames.iterator().next().matches(VALID_EXPANDED_TOPIC_NAME_PATTERN)) + throw new IllegalArgumentException(String.format("Expanded topic name %s is invalid", rawTopicName)); + + for (String topicName : expandedNames) { + TopicPartition partition = null; + if (topicName.contains(":")) { + String[] topicAndPartition = topicName.split(":"); + topicName = topicAndPartition[0]; + partition = new TopicPartition(topicName, Integer.parseInt(topicAndPartition[1])); + } + if (!partitionsByTopics.containsKey(topicName)) { + partitionsByTopics.put(topicName, new ArrayList<>()); + } + if (partition != null) { + partitionsByTopics.get(topicName).add(partition); + } + } + } + + return partitionsByTopics; + } + + /** + * Expands a topic name until there are no more ranges in it + */ + private Set<String> expandTopicName(String topicName) { + Set<String> expandedNames = StringExpander.expand(topicName); + if (expandedNames.size() == 1) { + return expandedNames; + } + + Set<String> newNames = new HashSet<>(); + for (String name : expandedNames) { + newNames.addAll(expandTopicName(name)); + } + return newNames; + } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java index c3a90e4..b0998f0 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -39,9 +39,10 @@ import org.slf4j.LoggerFactory; import org.apache.kafka.trogdor.task.TaskWorker; import java.time.Duration; -import java.util.Collection; -import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.UUID; import 
java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.Executors; @@ -49,6 +50,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; public class ConsumeBenchWorker implements TaskWorker { private static final Logger log = LoggerFactory.getLogger(ConsumeBenchWorker.class); @@ -86,18 +88,62 @@ public class ConsumeBenchWorker implements TaskWorker { @Override public void run() { try { - HashSet<TopicPartition> partitions = new HashSet<>(); - for (Map.Entry<String, PartitionsSpec> entry : spec.activeTopics().materialize().entrySet()) { - for (Integer partitionNumber : entry.getValue().partitionNumbers()) { - partitions.add(new TopicPartition(entry.getKey(), partitionNumber)); - } - } - log.info("Will consume from {}", partitions); - executor.submit(new ConsumeMessages(partitions)); + executor.submit(consumeTask()); } catch (Throwable e) { WorkerUtils.abort(log, "Prepare", e, doneFuture); } } + + private ConsumeMessages consumeTask() { + String consumerGroup = spec.consumerGroup(); + Map<String, List<TopicPartition>> partitionsByTopic = spec.materializeTopics(); + boolean toUseGroupPartitionAssignment = partitionsByTopic.values().isEmpty(); + + if (consumerGroup.equals(ConsumeBenchSpec.EMPTY_CONSUMER_GROUP)) // consumer group is undefined, the consumer should use a random group + consumerGroup = generateConsumerGroup(); + + consumer = consumer(consumerGroup); + if (!toUseGroupPartitionAssignment) + partitionsByTopic = populatePartitionsByTopic(consumer, partitionsByTopic); + + return new ConsumeMessages(consumer, partitionsByTopic, toUseGroupPartitionAssignment); + } + + private KafkaConsumer<byte[], byte[]> consumer(String consumerGroup) { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); + 
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id); + props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000); + // these defaults may be overwritten by the user-specified commonClientConf or consumerConf + WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf()); + return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); + } + + private String generateConsumerGroup() { + return "consume-bench-" + UUID.randomUUID().toString(); + } + + private Map<String, List<TopicPartition>> populatePartitionsByTopic(KafkaConsumer<byte[], byte[]> consumer, + Map<String, List<TopicPartition>> materializedTopics) { + // fetch partitions for topics that do not have any listed + for (Map.Entry<String, List<TopicPartition>> entry : materializedTopics.entrySet()) { + String topicName = entry.getKey(); + List<TopicPartition> partitions = entry.getValue(); + + if (partitions.isEmpty()) { + List<TopicPartition> fetchedPartitions = consumer.partitionsFor(topicName).stream() + .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition())) + .collect(Collectors.toList()); + partitions.addAll(fetchedPartitions); + } + + materializedTopics.put(topicName, partitions); + } + + return materializedTopics; + } } public class ConsumeMessages implements Callable<Void> { @@ -105,24 +151,26 @@ public class ConsumeBenchWorker implements TaskWorker { private final Histogram messageSizeHistogram; private final Future<?> statusUpdaterFuture; private final Throttle throttle; + private final KafkaConsumer<byte[], byte[]> consumer; - ConsumeMessages(Collection<TopicPartition> topicPartitions) { + ConsumeMessages(KafkaConsumer<byte[], byte[]> consumer, Map<String, List<TopicPartition>> topicPartitionsByTopic, + boolean toUseGroupAssignment) { this.latencyHistogram = new 
Histogram(5000); this.messageSizeHistogram = new Histogram(2 * 1024 * 1024); this.statusUpdaterFuture = executor.scheduleAtFixedRate( new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 1, TimeUnit.MINUTES); - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1"); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000); - // these defaults maybe over-written by the user-specified commonClientConf or - // consumerConf - WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf()); - consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), - new ByteArrayDeserializer()); - consumer.assign(topicPartitions); + this.consumer = consumer; + if (toUseGroupAssignment) { + Set<String> topics = topicPartitionsByTopic.keySet(); + log.info("Will consume from topics {} via dynamic group assignment.", topics); + this.consumer.subscribe(topics); + } else { + List<TopicPartition> partitions = topicPartitionsByTopic.values().stream() + .flatMap(List::stream).collect(Collectors.toList()); + log.info("Will consume from topic partitions {} via manual assignment.", partitions); + this.consumer.assign(partitions); + } + int perPeriod = WorkerUtils.perSecToPerPeriod( spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS); this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java index a9b550d..dcb8d8a 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java @@ -80,9 +80,10 @@ public class TopicsSpec extends Message { public 
Map<String, PartitionsSpec> materialize() { HashMap<String, PartitionsSpec> all = new HashMap<>(); for (Map.Entry<String, PartitionsSpec> entry : map.entrySet()) { - for (String topicName : StringExpander.expand(entry.getKey())) { - all.put(topicName, entry.getValue()); - } + String topicName = entry.getKey(); + PartitionsSpec partitions = entry.getValue(); + for (String expandedTopicName : StringExpander.expand(topicName)) + all.put(expandedTopicName, partitions); } return all; } diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java new file mode 100644 index 0000000..117954b --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.workload; + +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class ConsumeBenchSpecTest { + + @Test + public void testMaterializeTopicsWithNoPartitions() { + Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]", "secondTopic")).materializeTopics(); + Map<String, List<TopicPartition>> expected = new HashMap<>(); + expected.put("topic1", new ArrayList<>()); + expected.put("topic2", new ArrayList<>()); + expected.put("topic3", new ArrayList<>()); + expected.put("secondTopic", new ArrayList<>()); + + assertEquals(expected, materializedTopics); + } + + @Test + public void testMaterializeTopicsWithSomePartitions() { + Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]:[1-5]", "secondTopic", "thirdTopic:1")).materializeTopics(); + Map<String, List<TopicPartition>> expected = new HashMap<>(); + expected.put("topic1", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic1", (int) i)).collect(Collectors.toList())); + expected.put("topic2", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic2", (int) i)).collect(Collectors.toList())); + expected.put("topic3", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic3", (int) i)).collect(Collectors.toList())); + expected.put("secondTopic", new ArrayList<>()); + expected.put("thirdTopic", Collections.singletonList(new TopicPartition("thirdTopic", 1))); + + assertEquals(expected, materializedTopics); + } + + @Test + public void testInvalidTopicNameRaisesExceptionInMaterialize() { + 
for (String invalidName : Arrays.asList("In:valid", "invalid:", ":invalid", "in:valid:1", "invalid:2:2", "invalid::1", "invalid[1-3]:")) { + try { + consumeBenchSpec(Collections.singletonList(invalidName)).materializeTopics(); + fail(String.format("Invalid topic name (%s) should have raised an exception.", invalidName)); + } catch (IllegalArgumentException ignored) { } + } + + } + + private ConsumeBenchSpec consumeBenchSpec(List<String> activeTopics) { + return new ConsumeBenchSpec(0, 0, "node", "localhost", + 123, 1234, "cg-1", + Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), activeTopics); + } +}