This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d28c534  KAFKA-7515: Trogdor - Add Consumer Group Benchmark 
Specification (#5810)
d28c534 is described below

commit d28c5348197256db09b59d1ebbfe7db9d3934f47
Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com>
AuthorDate: Mon Oct 29 19:51:07 2018 +0200

    KAFKA-7515: Trogdor - Add Consumer Group Benchmark Specification (#5810)
    
    The ConsumeBenchWorker now supports using consumer groups.  The groups may 
be used either to store offsets or as subscriptions.
---
 TROGDOR.md                                         |   3 +-
 tests/bin/trogdor-run-consume-bench.sh             |   7 +-
 ...bench_workload.py => consume_bench_workload.py} |  27 +++--
 .../services/trogdor/produce_bench_workload.py     |   6 +-
 tests/kafkatest/tests/core/consume_bench_test.py   | 132 +++++++++++++++++++++
 tests/kafkatest/tests/core/produce_bench_test.py   |   2 +
 .../kafka/trogdor/workload/ConsumeBenchSpec.java   | 122 +++++++++++++++++--
 .../kafka/trogdor/workload/ConsumeBenchWorker.java |  94 +++++++++++----
 .../apache/kafka/trogdor/workload/TopicsSpec.java  |   7 +-
 .../trogdor/workload/ConsumeBenchSpecTest.java     |  78 ++++++++++++
 10 files changed, 419 insertions(+), 59 deletions(-)

diff --git a/TROGDOR.md b/TROGDOR.md
index 3783d7e..d71455a 100644
--- a/TROGDOR.md
+++ b/TROGDOR.md
@@ -141,7 +141,8 @@ ProduceBench starts a Kafka producer on a single agent 
node, producing to severa
 RoundTripWorkload tests both production and consumption.  The workload starts 
a Kafka producer and consumer on a single node.  The consumer will read back 
the messages that were produced by the producer.
 
 ### ConsumeBench
-ConsumeBench starts a Kafka consumer on a single agent node.  The workload 
measures the average produce latency, as well as the median, 95th percentile, 
and 99th percentile latency.
+ConsumeBench starts a Kafka consumer on a single agent node. Depending on the 
passed in configuration (see ConsumeBenchSpec), the consumer either subscribes 
to a set of topics (leveraging consumer group functionality) or manually 
assigns partitions to itself.
+The workload measures the average produce latency, as well as the median, 95th 
percentile, and 99th percentile latency.
 
 Faults
 ========================================
diff --git a/tests/bin/trogdor-run-consume-bench.sh 
b/tests/bin/trogdor-run-consume-bench.sh
index 2e0239e..be9a2f1 100755
--- a/tests/bin/trogdor-run-consume-bench.sh
+++ b/tests/bin/trogdor-run-consume-bench.sh
@@ -26,12 +26,7 @@ cat <<EOF
         "consumerNode": "node0",
         "bootstrapServers": "localhost:9092",
         "maxMessages": 100,
-        "activeTopics": {
-            "foo[1-3]": {
-                "numPartitions": 3,
-                "replicationFactor": 1
-            }
-        }
+        "activeTopics": ["foo[1-3]"]
     }
 }
 EOF
diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py 
b/tests/kafkatest/services/trogdor/consume_bench_workload.py
similarity index 66%
copy from tests/kafkatest/services/trogdor/produce_bench_workload.py
copy to tests/kafkatest/services/trogdor/consume_bench_workload.py
index 7eac4ee..9e61b11 100644
--- a/tests/kafkatest/services/trogdor/produce_bench_workload.py
+++ b/tests/kafkatest/services/trogdor/consume_bench_workload.py
@@ -18,26 +18,29 @@ from ducktape.services.service import Service
 from kafkatest.services.trogdor.task_spec import TaskSpec
 
 
-class ProduceBenchWorkloadSpec(TaskSpec):
-    def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
-                 target_messages_per_sec, max_messages, producer_conf,
-                 inactive_topics, active_topics):
-        super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
-        self.message["class"] = 
"org.apache.kafka.trogdor.workload.ProduceBenchSpec"
-        self.message["producerNode"] = producer_node
+class ConsumeBenchWorkloadSpec(TaskSpec):
+    def __init__(self, start_ms, duration_ms, consumer_node, bootstrap_servers,
+                 target_messages_per_sec, max_messages, active_topics,
+                 consumer_conf, common_client_conf, admin_client_conf, 
consumer_group=None):
+        super(ConsumeBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
+        self.message["class"] = 
"org.apache.kafka.trogdor.workload.ConsumeBenchSpec"
+        self.message["consumerNode"] = consumer_node
         self.message["bootstrapServers"] = bootstrap_servers
         self.message["targetMessagesPerSec"] = target_messages_per_sec
         self.message["maxMessages"] = max_messages
-        self.message["producerConf"] = producer_conf
-        self.message["inactiveTopics"] = inactive_topics
+        self.message["consumerConf"] = consumer_conf
+        self.message["adminClientConf"] = admin_client_conf
+        self.message["commonClientConf"] = common_client_conf
         self.message["activeTopics"] = active_topics
+        if consumer_group is not None:
+            self.message["consumerGroup"] = consumer_group
 
 
-class ProduceBenchWorkloadService(Service):
+class ConsumeBenchWorkloadService(Service):
     def __init__(self, context, kafka):
         Service.__init__(self, context, num_nodes=1)
         self.bootstrap_servers = kafka.bootstrap_servers(validate=False)
-        self.producer_node = self.nodes[0].account.hostname
+        self.consumer_node = self.nodes[0].account.hostname
 
     def free(self):
         Service.free(self)
@@ -49,4 +52,4 @@ class ProduceBenchWorkloadService(Service):
         pass
 
     def clean_node(self, node):
-        pass
+        pass
\ No newline at end of file
diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py 
b/tests/kafkatest/services/trogdor/produce_bench_workload.py
index 7eac4ee..cf6a962 100644
--- a/tests/kafkatest/services/trogdor/produce_bench_workload.py
+++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py
@@ -20,8 +20,8 @@ from kafkatest.services.trogdor.task_spec import TaskSpec
 
 class ProduceBenchWorkloadSpec(TaskSpec):
     def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
-                 target_messages_per_sec, max_messages, producer_conf,
-                 inactive_topics, active_topics):
+                 target_messages_per_sec, max_messages, producer_conf, 
admin_client_conf,
+                 common_client_conf, inactive_topics, active_topics):
         super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
         self.message["class"] = 
"org.apache.kafka.trogdor.workload.ProduceBenchSpec"
         self.message["producerNode"] = producer_node
@@ -29,6 +29,8 @@ class ProduceBenchWorkloadSpec(TaskSpec):
         self.message["targetMessagesPerSec"] = target_messages_per_sec
         self.message["maxMessages"] = max_messages
         self.message["producerConf"] = producer_conf
+        self.message["adminClientConf"] = admin_client_conf
+        self.message["commonClientConf"] = common_client_conf
         self.message["inactiveTopics"] = inactive_topics
         self.message["activeTopics"] = active_topics
 
diff --git a/tests/kafkatest/tests/core/consume_bench_test.py 
b/tests/kafkatest/tests/core/consume_bench_test.py
new file mode 100644
index 0000000..bec7416
--- /dev/null
+++ b/tests/kafkatest/tests/core/consume_bench_test.py
@@ -0,0 +1,132 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from ducktape.mark import parametrize
+from ducktape.tests.test import Test
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.trogdor.produce_bench_workload import 
ProduceBenchWorkloadService, ProduceBenchWorkloadSpec
+from kafkatest.services.trogdor.consume_bench_workload import 
ConsumeBenchWorkloadService, ConsumeBenchWorkloadSpec
+from kafkatest.services.trogdor.task_spec import TaskSpec
+from kafkatest.services.trogdor.trogdor import TrogdorService
+from kafkatest.services.zookeeper import ZookeeperService
+
+
+class ConsumeBenchTest(Test):
+    def __init__(self, test_context):
+        """:type test_context: ducktape.tests.test.TestContext"""
+        super(ConsumeBenchTest, self).__init__(test_context)
+        self.zk = ZookeeperService(test_context, num_nodes=3)
+        self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk)
+        self.producer_workload_service = 
ProduceBenchWorkloadService(test_context, self.kafka)
+        self.consumer_workload_service = 
ConsumeBenchWorkloadService(test_context, self.kafka)
+        self.consumer_workload_service_2 = 
ConsumeBenchWorkloadService(test_context, self.kafka)
+        self.active_topics = {"consume_bench_topic[0-5]": {"numPartitions": 5, 
"replicationFactor": 3}}
+        self.trogdor = TrogdorService(context=self.test_context,
+                                      client_services=[self.kafka, 
self.producer_workload_service,
+                                                       
self.consumer_workload_service,
+                                                       
self.consumer_workload_service_2])
+
+    def setUp(self):
+        self.trogdor.start()
+        self.zk.start()
+        self.kafka.start()
+
+    def teardown(self):
+        self.trogdor.stop()
+        self.kafka.stop()
+        self.zk.stop()
+
+    def produce_messages(self, topics, max_messages=10000):
+        produce_spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                
self.producer_workload_service.producer_node,
+                                                
self.producer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=max_messages,
+                                                producer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                inactive_topics={},
+                                                active_topics=topics)
+        produce_workload = self.trogdor.create_task("produce_workload", 
produce_spec)
+        produce_workload.wait_for_done(timeout_sec=180)
+        self.logger.debug("Produce workload finished")
+
+    @parametrize(topics=["consume_bench_topic[0-5]"]) # topic subscription
+    @parametrize(topics=["consume_bench_topic[0-5]:[0-4]"])  # manual topic 
assignment
+    def test_consume_bench(self, topics):
+        """
+        Runs a ConsumeBench workload to consume messages
+        """
+        self.produce_messages(self.active_topics)
+        consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                
self.consumer_workload_service.consumer_node,
+                                                
self.consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=10000,
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                active_topics=topics)
+        consume_workload = self.trogdor.create_task("consume_workload", 
consume_spec)
+        consume_workload.wait_for_done(timeout_sec=360)
+        self.logger.debug("Consume workload finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, 
indent=2))
+
+    def test_consume_bench_single_partition(self):
+        """
+        Run a ConsumeBench against a single partition
+        """
+        active_topics = {"consume_bench_topic": {"numPartitions": 2, 
"replicationFactor": 3}}
+        self.produce_messages(active_topics, 5000)
+        consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                
self.consumer_workload_service.consumer_node,
+                                                
self.consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=2500,
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                
active_topics=["consume_bench_topic:1"])
+        consume_workload = self.trogdor.create_task("consume_workload", 
consume_spec)
+        consume_workload.wait_for_done(timeout_sec=180)
+        self.logger.debug("Consume workload finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, 
indent=2))
+
+    def test_consume_group_bench(self):
+        """
+        Runs two ConsumeBench workloads in the same consumer group to read 
messages from topics
+        """
+        self.produce_messages(self.active_topics)
+        consume_spec = ConsumeBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                                
self.consumer_workload_service.consumer_node,
+                                                
self.consumer_workload_service.bootstrap_servers,
+                                                target_messages_per_sec=1000,
+                                                max_messages=2000, # both 
should read at least 2k messages
+                                                consumer_conf={},
+                                                admin_client_conf={},
+                                                common_client_conf={},
+                                                consumer_group="testGroup",
+                                                
active_topics=["consume_bench_topic[0-5]"])
+        consume_workload_1 = self.trogdor.create_task("consume_workload_1", 
consume_spec)
+        consume_workload_2 = self.trogdor.create_task("consume_workload_2", 
consume_spec)
+        consume_workload_1.wait_for_done(timeout_sec=360)
+        self.logger.debug("Consume workload 1 finished")
+        consume_workload_2.wait_for_done(timeout_sec=360)
+        self.logger.debug("Consume workload 2 finished")
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, 
indent=2))
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py 
b/tests/kafkatest/tests/core/produce_bench_test.py
index 6a1724d..125ee94 100644
--- a/tests/kafkatest/tests/core/produce_bench_test.py
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -51,6 +51,8 @@ class ProduceBenchTest(Test):
                                         target_messages_per_sec=1000,
                                         max_messages=100000,
                                         producer_conf={},
+                                        admin_client_conf={},
+                                        common_client_conf={},
                                         inactive_topics=inactive_topics,
                                         active_topics=active_topics)
         workload1 = self.trogdor.create_task("workload1", spec)
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index 1b429ea..6d4c67c 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -20,20 +20,65 @@ package org.apache.kafka.trogdor.workload;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
-import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.trogdor.common.StringExpander;
 import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.Set;
+import java.util.HashSet;
 
 /**
- * The specification for a benchmark that produces messages to a set of topics.
+ * The specification for a benchmark that consumes messages from a set of 
topic/partitions.
+ *
+ * If a consumer group is not given to the specification, a random one will be 
generated and
+ *  used to track offsets/subscribe to topics.
+ *
+ * This specification uses a specific way to represent a topic partition via 
its "activeTopics" field.
+ * The notation for that is topic_name:partition_number (e.g "foo:1" 
represents partition-1 of topic "foo")
+ * Note that a topic name cannot have more than one colon.
+ *
+ * The "activeTopics" field also supports ranges that get expanded. See 
#{@link StringExpander}.
+ *
+ * There now exists a clever and succinct way to represent multiple partitions 
of multiple topics.
+ * Example:
+ * Given "activeTopics": ["foo[1-3]:[1-3]"], "foo[1-3]:[1-3]" will get
+ * expanded to [foo1:1, foo1:2, foo1:3, foo2:1, ..., foo3:3].
+ * This represents all partitions 1-3 for the three topics foo1, foo2 and foo3.
+ *
+ * If there is at least one topic:partition pair, the consumer will be 
manually assigned partitions via
+ * #{@link org.apache.kafka.clients.consumer.KafkaConsumer#assign(Collection)}.
+ * Note that in this case the consumer will fetch and assign all partitions 
for a topic if no partition is given for it (e.g ["foo:1", "bar"])
+ *
+ * If there are no topic:partition pairs given, the consumer will subscribe to 
the topics via
+ * #{@link 
org.apache.kafka.clients.consumer.KafkaConsumer#subscribe(Collection)}.
+ * It will be assigned partitions dynamically from the consumer group.
+ *
+ * An example JSON representation which will result in a consumer that is part 
of the consumer group "cg" and
+ * subscribed to topics foo1, foo2, foo3 and bar.
+ * #{@code
+ *    {
+ *        "class": "org.apache.kafka.trogdor.workload.ConsumeBenchSpec",
+ *        "durationMs": 10000000,
+ *        "consumerNode": "node0",
+ *        "bootstrapServers": "localhost:9092",
+ *        "maxMessages": 100,
+ *        "consumerGroup": "cg",
+ *        "activeTopics": ["foo[1-3]", "bar"]
+ *    }
+ * }
  */
 public class ConsumeBenchSpec extends TaskSpec {
 
+    static final String EMPTY_CONSUMER_GROUP = "";
+    private static final String VALID_EXPANDED_TOPIC_NAME_PATTERN = 
"^[^:]+(:[\\d]+|[^:]*)$";
     private final String consumerNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
@@ -41,7 +86,8 @@ public class ConsumeBenchSpec extends TaskSpec {
     private final Map<String, String> consumerConf;
     private final Map<String, String> adminClientConf;
     private final Map<String, String> commonClientConf;
-    private final TopicsSpec activeTopics;
+    private final List<String> activeTopics;
+    private final String consumerGroup;
 
     @JsonCreator
     public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@@ -50,10 +96,11 @@ public class ConsumeBenchSpec extends TaskSpec {
                             @JsonProperty("bootstrapServers") String 
bootstrapServers,
                             @JsonProperty("targetMessagesPerSec") int 
targetMessagesPerSec,
                             @JsonProperty("maxMessages") int maxMessages,
+                            @JsonProperty("consumerGroup") String 
consumerGroup,
                             @JsonProperty("consumerConf") Map<String, String> 
consumerConf,
                             @JsonProperty("commonClientConf") Map<String, 
String> commonClientConf,
                             @JsonProperty("adminClientConf") Map<String, 
String> adminClientConf,
-                            @JsonProperty("activeTopics") TopicsSpec 
activeTopics) {
+                            @JsonProperty("activeTopics") List<String> 
activeTopics) {
         super(startMs, durationMs);
         this.consumerNode = (consumerNode == null) ? "" : consumerNode;
         this.bootstrapServers = (bootstrapServers == null) ? "" : 
bootstrapServers;
@@ -62,7 +109,8 @@ public class ConsumeBenchSpec extends TaskSpec {
         this.consumerConf = configOrEmptyMap(consumerConf);
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.adminClientConf = configOrEmptyMap(adminClientConf);
-        this.activeTopics = activeTopics == null ? TopicsSpec.EMPTY : 
activeTopics.immutableCopy();
+        this.activeTopics = activeTopics == null ? new ArrayList<>() : 
activeTopics;
+        this.consumerGroup = consumerGroup == null ? EMPTY_CONSUMER_GROUP : 
consumerGroup;
     }
 
     @JsonProperty
@@ -71,6 +119,11 @@ public class ConsumeBenchSpec extends TaskSpec {
     }
 
     @JsonProperty
+    public String consumerGroup() {
+        return consumerGroup;
+    }
+
+    @JsonProperty
     public String bootstrapServers() {
         return bootstrapServers;
     }
@@ -101,22 +154,67 @@ public class ConsumeBenchSpec extends TaskSpec {
     }
 
     @JsonProperty
-    public TopicsSpec activeTopics() {
+    public List<String> activeTopics() {
         return activeTopics;
     }
 
     @Override
     public TaskController newController(String id) {
-        return new TaskController() {
-            @Override
-            public Set<String> targetNodes(Topology topology) {
-                return Collections.singleton(consumerNode);
-            }
-        };
+        return topology -> Collections.singleton(consumerNode);
     }
 
     @Override
     public TaskWorker newTaskWorker(String id) {
         return new ConsumeBenchWorker(id, this);
     }
+
+    /**
+     * Materializes a list of topic names (optionally with ranges) into a map 
of the topics and their partitions
+     *
+     * Example:
+     * ['foo[1-3]', 'foobar:2', 'bar[1-2]:[1-2]'] => {'foo1': [], 'foo2': [], 
'foo3': [], 'foobar': [2],
+     *                                                'bar1': [1, 2], 'bar2': 
[1, 2] }
+     */
+    Map<String, List<TopicPartition>> materializeTopics() {
+        Map<String, List<TopicPartition>> partitionsByTopics = new HashMap<>();
+
+        for (String rawTopicName : this.activeTopics) {
+            Set<String> expandedNames = expandTopicName(rawTopicName);
+            if 
(!expandedNames.iterator().next().matches(VALID_EXPANDED_TOPIC_NAME_PATTERN))
+                throw new IllegalArgumentException(String.format("Expanded 
topic name %s is invalid", rawTopicName));
+
+            for (String topicName : expandedNames) {
+                TopicPartition partition = null;
+                if (topicName.contains(":")) {
+                    String[] topicAndPartition = topicName.split(":");
+                    topicName = topicAndPartition[0];
+                    partition = new TopicPartition(topicName, 
Integer.parseInt(topicAndPartition[1]));
+                }
+                if (!partitionsByTopics.containsKey(topicName)) {
+                    partitionsByTopics.put(topicName, new ArrayList<>());
+                }
+                if (partition != null) {
+                    partitionsByTopics.get(topicName).add(partition);
+                }
+            }
+        }
+
+        return partitionsByTopics;
+    }
+
+    /**
+     * Expands a topic name until there are no more ranges in it
+     */
+    private Set<String> expandTopicName(String topicName) {
+        Set<String> expandedNames = StringExpander.expand(topicName);
+        if (expandedNames.size() == 1) {
+            return expandedNames;
+        }
+
+        Set<String> newNames = new HashSet<>();
+        for (String name : expandedNames) {
+            newNames.addAll(expandTopicName(name));
+        }
+        return newNames;
+    }
 }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index c3a90e4..b0998f0 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -39,9 +39,10 @@ import org.slf4j.LoggerFactory;
 import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.time.Duration;
-import java.util.Collection;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -49,6 +50,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 public class ConsumeBenchWorker implements TaskWorker {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumeBenchWorker.class);
@@ -86,18 +88,62 @@ public class ConsumeBenchWorker implements TaskWorker {
         @Override
         public void run() {
             try {
-                HashSet<TopicPartition> partitions = new HashSet<>();
-                for (Map.Entry<String, PartitionsSpec> entry : 
spec.activeTopics().materialize().entrySet()) {
-                    for (Integer partitionNumber : 
entry.getValue().partitionNumbers()) {
-                        partitions.add(new TopicPartition(entry.getKey(), 
partitionNumber));
-                    }
-                }
-                log.info("Will consume from {}", partitions);
-                executor.submit(new ConsumeMessages(partitions));
+                executor.submit(consumeTask());
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
             }
         }
+
+        private ConsumeMessages consumeTask() {
+            String consumerGroup = spec.consumerGroup();
+            Map<String, List<TopicPartition>> partitionsByTopic = 
spec.materializeTopics();
+            boolean toUseGroupPartitionAssignment = 
partitionsByTopic.values().isEmpty();
+
+            if (consumerGroup.equals(ConsumeBenchSpec.EMPTY_CONSUMER_GROUP)) 
// consumer group is undefined, the consumer should use a random group
+                consumerGroup = generateConsumerGroup();
+
+            consumer = consumer(consumerGroup);
+            if (!toUseGroupPartitionAssignment)
+                partitionsByTopic = populatePartitionsByTopic(consumer, 
partitionsByTopic);
+
+            return new ConsumeMessages(consumer, partitionsByTopic, 
toUseGroupPartitionAssignment);
+        }
+
+        private KafkaConsumer<byte[], byte[]> consumer(String consumerGroup) {
+            Properties props = new Properties();
+            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
+            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
+            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
+            // these defaults may be overwritten by the user-specified 
commonClientConf or consumerConf
+            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.consumerConf());
+            return new KafkaConsumer<>(props, new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
+        }
+
+        private String generateConsumerGroup() {
+            return "consume-bench-" + UUID.randomUUID().toString();
+        }
+
+        private Map<String, List<TopicPartition>> 
populatePartitionsByTopic(KafkaConsumer<byte[], byte[]> consumer,
+                                                                         
Map<String, List<TopicPartition>> materializedTopics) {
+            // fetch partitions for topics that do not have any listed
+            for (Map.Entry<String, List<TopicPartition>> entry : 
materializedTopics.entrySet()) {
+                String topicName = entry.getKey();
+                List<TopicPartition> partitions = entry.getValue();
+
+                if (partitions.isEmpty()) {
+                    List<TopicPartition> fetchedPartitions = 
consumer.partitionsFor(topicName).stream()
+                        .map(partitionInfo -> new 
TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
+                        .collect(Collectors.toList());
+                    partitions.addAll(fetchedPartitions);
+                }
+
+                materializedTopics.put(topicName, partitions);
+            }
+
+            return materializedTopics;
+        }
     }
 
     public class ConsumeMessages implements Callable<Void> {
@@ -105,24 +151,26 @@ public class ConsumeBenchWorker implements TaskWorker {
         private final Histogram messageSizeHistogram;
         private final Future<?> statusUpdaterFuture;
         private final Throttle throttle;
+        private final KafkaConsumer<byte[], byte[]> consumer;
 
-        ConsumeMessages(Collection<TopicPartition> topicPartitions) {
+        ConsumeMessages(KafkaConsumer<byte[], byte[]> consumer, Map<String, 
List<TopicPartition>> topicPartitionsByTopic,
+                        boolean toUseGroupAssignment) {
             this.latencyHistogram = new Histogram(5000);
             this.messageSizeHistogram = new Histogram(2 * 1024 * 1024);
             this.statusUpdaterFuture = executor.scheduleAtFixedRate(
                 new StatusUpdater(latencyHistogram, messageSizeHistogram), 1, 
1, TimeUnit.MINUTES);
-            Properties props = new Properties();
-            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
-            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
-            props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-1");
-            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 100000);
-            // these defaults maybe over-written by the user-specified 
commonClientConf or
-            // consumerConf
-            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.consumerConf());
-            consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
-                                           new ByteArrayDeserializer());
-            consumer.assign(topicPartitions);
+            this.consumer = consumer;
+            if (toUseGroupAssignment) {
+                Set<String> topics = topicPartitionsByTopic.keySet();
+                log.info("Will consume from topics {} via dynamic group 
assignment.", topics);
+                this.consumer.subscribe(topics);
+            } else {
+                List<TopicPartition> partitions = 
topicPartitionsByTopic.values().stream()
+                    .flatMap(List::stream).collect(Collectors.toList());
+                log.info("Will consume from topic partitions {} via manual 
assignment.", partitions);
+                this.consumer.assign(partitions);
+            }
+
             int perPeriod = WorkerUtils.perSecToPerPeriod(
                 spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.throttle = new Throttle(perPeriod, THROTTLE_PERIOD_MS);
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java
index a9b550d..dcb8d8a 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java
@@ -80,9 +80,10 @@ public class TopicsSpec extends Message {
     public Map<String, PartitionsSpec> materialize() {
         HashMap<String, PartitionsSpec> all = new HashMap<>();
         for (Map.Entry<String, PartitionsSpec> entry : map.entrySet()) {
-            for (String topicName : StringExpander.expand(entry.getKey())) {
-                all.put(topicName, entry.getValue());
-            }
+            String topicName = entry.getKey();
+            PartitionsSpec partitions = entry.getValue();
+            for (String expandedTopicName : StringExpander.expand(topicName))
+                all.put(expandedTopicName, partitions);
         }
         return all;
     }
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
new file mode 100644
index 0000000..117954b
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpecTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class ConsumeBenchSpecTest {
+
+    @Test
+    public void testMaterializeTopicsWithNoPartitions() {
+        Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]", "secondTopic")).materializeTopics();
+        Map<String, List<TopicPartition>> expected = new HashMap<>();
+        expected.put("topic1", new ArrayList<>());
+        expected.put("topic2", new ArrayList<>());
+        expected.put("topic3", new ArrayList<>());
+        expected.put("secondTopic", new ArrayList<>());
+
+        assertEquals(expected, materializedTopics);
+    }
+
+    @Test
+    public void testMaterializeTopicsWithSomePartitions() {
+        Map<String, List<TopicPartition>> materializedTopics = consumeBenchSpec(Arrays.asList("topic[1-3]:[1-5]", "secondTopic", "thirdTopic:1")).materializeTopics();
+        Map<String, List<TopicPartition>> expected = new HashMap<>();
+        expected.put("topic1", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic1", (int) i)).collect(Collectors.toList()));
+        expected.put("topic2", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic2", (int) i)).collect(Collectors.toList()));
+        expected.put("topic3", IntStream.range(1, 6).asLongStream().mapToObj(i -> new TopicPartition("topic3", (int) i)).collect(Collectors.toList()));
+        expected.put("secondTopic", new ArrayList<>());
+        expected.put("thirdTopic", Collections.singletonList(new TopicPartition("thirdTopic", 1)));
+
+        assertEquals(expected, materializedTopics);
+    }
+
+    @Test
+    public void testInvalidTopicNameRaisesExceptionInMaterialize() {
+        for (String invalidName : Arrays.asList("In:valid", "invalid:", ":invalid", "in:valid:1", "invalid:2:2", "invalid::1", "invalid[1-3]:")) {
+            try {
+                consumeBenchSpec(Collections.singletonList(invalidName)).materializeTopics();
+                fail(String.format("Invalid topic name (%s) should have raised an exception.", invalidName));
+            } catch (IllegalArgumentException ignored) { }
+        }
+
+    }
+
+    private ConsumeBenchSpec consumeBenchSpec(List<String> activeTopics) {
+        return new ConsumeBenchSpec(0, 0, "node", "localhost",
+            123, 1234, "cg-1",
+            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), activeTopics);
+    }
+}

Reply via email to