This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 93e0341  KAFKA-6771. Make specifying partitions more flexible (#4850)
93e0341 is described below

commit 93e03414f72c24854abaa6925149196580cfc691
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Mon Apr 16 00:55:13 2018 -0700

    KAFKA-6771. Make specifying partitions more flexible (#4850)
---
 .../kafka/trogdor/common/StringExpander.java       |  56 +++++++++
 .../kafka/trogdor/workload/ConsumeBenchSpec.java   |  27 +---
 .../kafka/trogdor/workload/ConsumeBenchWorker.java |  22 ++--
 .../kafka/trogdor/workload/PartitionsSpec.java     |  90 +++++++++++++
 .../kafka/trogdor/workload/ProduceBenchSpec.java   |  51 ++------
 .../kafka/trogdor/workload/ProduceBenchWorker.java |  70 ++++++-----
 .../kafka/trogdor/workload/RoundTripWorker.java    | 140 ++++++++++++++++-----
 .../trogdor/workload/RoundTripWorkloadSpec.java    |  15 +--
 .../apache/kafka/trogdor/workload/TopicsSpec.java  |  89 +++++++++++++
 .../trogdor/common/JsonSerializationTest.java      |   8 +-
 .../kafka/trogdor/common/StringExpanderTest.java   |  62 +++++++++
 .../kafka/trogdor/workload/TopicsSpecTest.java     |  80 ++++++++++++
 12 files changed, 563 insertions(+), 147 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java 
b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java
new file mode 100644
index 0000000..82f5003
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import java.util.HashSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Utilities for expanding strings that have range expressions in them.
+ *
+ * For example, 'foo[1-3]' would be expaneded to foo1, foo2, foo3.
+ * Strings that have no range expressions will not be expanded.
+ */
+public class StringExpander {
+    private final static Pattern NUMERIC_RANGE_PATTERN =
+        Pattern.compile("(.*?)\\[([0-9]*)\\-([0-9]*)\\](.*?)");
+
+    public static HashSet<String> expand(String val) {
+        HashSet<String> set = new HashSet<>();
+        Matcher matcher = NUMERIC_RANGE_PATTERN.matcher(val);
+        if (!matcher.matches()) {
+            set.add(val);
+            return set;
+        }
+        String prequel = matcher.group(1);
+        String rangeStart = matcher.group(2);
+        String rangeEnd = matcher.group(3);
+        String epilog = matcher.group(4);
+        int rangeStartInt = Integer.parseInt(rangeStart);
+        int rangeEndInt = Integer.parseInt(rangeEnd);
+        if (rangeEndInt < rangeStartInt) {
+            throw new RuntimeException("Invalid range: start " + rangeStartInt 
+
+                    " is higher than end " + rangeEndInt);
+        }
+        for (int i = rangeStartInt; i <= rangeEndInt; i++) {
+            set.add(String.format("%s%d%s", prequel, i, epilog));
+        }
+        return set;
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
index cef913b..1b429ea 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java
@@ -41,10 +41,7 @@ public class ConsumeBenchSpec extends TaskSpec {
     private final Map<String, String> consumerConf;
     private final Map<String, String> adminClientConf;
     private final Map<String, String> commonClientConf;
-    private final String topicRegex;
-    private final int startPartition;
-    private final int endPartition;
-
+    private final TopicsSpec activeTopics;
 
     @JsonCreator
     public ConsumeBenchSpec(@JsonProperty("startMs") long startMs,
@@ -56,9 +53,7 @@ public class ConsumeBenchSpec extends TaskSpec {
                             @JsonProperty("consumerConf") Map<String, String> 
consumerConf,
                             @JsonProperty("commonClientConf") Map<String, 
String> commonClientConf,
                             @JsonProperty("adminClientConf") Map<String, 
String> adminClientConf,
-                            @JsonProperty("topicRegex") String topicRegex,
-                            @JsonProperty("startPartition") int startPartition,
-                            @JsonProperty("endPartition") int endPartition) {
+                            @JsonProperty("activeTopics") TopicsSpec 
activeTopics) {
         super(startMs, durationMs);
         this.consumerNode = (consumerNode == null) ? "" : consumerNode;
         this.bootstrapServers = (bootstrapServers == null) ? "" : 
bootstrapServers;
@@ -67,9 +62,7 @@ public class ConsumeBenchSpec extends TaskSpec {
         this.consumerConf = configOrEmptyMap(consumerConf);
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.adminClientConf = configOrEmptyMap(adminClientConf);
-        this.topicRegex = topicRegex;
-        this.startPartition = startPartition;
-        this.endPartition = endPartition;
+        this.activeTopics = activeTopics == null ? TopicsSpec.EMPTY : 
activeTopics.immutableCopy();
     }
 
     @JsonProperty
@@ -108,18 +101,8 @@ public class ConsumeBenchSpec extends TaskSpec {
     }
 
     @JsonProperty
-    public String topicRegex() {
-        return topicRegex;
-    }
-
-    @JsonProperty
-    public int startPartition() {
-        return startPartition;
-    }
-
-    @JsonProperty
-    public int endPartition() {
-        return endPartition;
+    public TopicsSpec activeTopics() {
+        return activeTopics;
     }
 
     @Override
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
index 5c74d90..1a85296 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java
@@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Time;
@@ -40,6 +39,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
@@ -84,19 +85,14 @@ public class ConsumeBenchWorker implements TaskWorker {
         @Override
         public void run() {
             try {
-                // find topics to consume from based on provided topic regular 
expression
-                if (spec.topicRegex() == null) {
-                    throw new ConfigException(
-                        "Must provide topic name or regular expression to 
match existing topics.");
+                HashSet<TopicPartition> partitions = new HashSet<>();
+                for (Map.Entry<String, PartitionsSpec> entry : 
spec.activeTopics().materialize().entrySet()) {
+                    for (Integer partitionNumber : 
entry.getValue().partitionNumbers()) {
+                        partitions.add(new TopicPartition(entry.getKey(), 
partitionNumber));
+                    }
                 }
-                Collection<TopicPartition> topicPartitions =
-                    WorkerUtils.getMatchingTopicPartitions(
-                        log, spec.bootstrapServers(),
-                        spec.commonClientConf(), spec.adminClientConf(),
-                        spec.topicRegex(), spec.startPartition(), 
spec.endPartition());
-                log.info("Will consume from {}", topicPartitions);
-
-                executor.submit(new ConsumeMessages(topicPartitions));
+                log.info("Will consume from {}", partitions);
+                executor.submit(new ConsumeMessages(partitions));
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
             }
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java
new file mode 100644
index 0000000..75f85c4
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.trogdor.rest.Message;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes some partitions.
+ */
+public class PartitionsSpec extends Message {
+    private final static short DEFAULT_REPLICATION_FACTOR = 3;
+    private final static short DEFAULT_NUM_PARTITIONS = 1;
+
+    private final int numPartitions;
+    private final short replicationFactor;
+    private final Map<Integer, List<Integer>> partitionAssignments;
+
+    @JsonCreator
+    public PartitionsSpec(@JsonProperty("numPartitions") int numPartitions,
+            @JsonProperty("replicationFactor") short replicationFactor,
+            @JsonProperty("partitionAssignments") Map<Integer, List<Integer>> 
partitionAssignments) {
+        this.numPartitions = numPartitions;
+        this.replicationFactor = replicationFactor;
+        this.partitionAssignments = partitionAssignments == null ?
+            new HashMap<Integer, List<Integer>>() : partitionAssignments;
+    }
+
+    @JsonProperty
+    public int numPartitions() {
+        return numPartitions;
+    }
+
+    public List<Integer> partitionNumbers() {
+        if (partitionAssignments.isEmpty()) {
+            ArrayList<Integer> partitionNumbers = new ArrayList<>();
+            int effectiveNumPartitions = numPartitions <= 0 ? 
DEFAULT_NUM_PARTITIONS : numPartitions;
+            for (int i = 0; i < effectiveNumPartitions; i++) {
+                partitionNumbers.add(i);
+            }
+            return partitionNumbers;
+        } else {
+            return new ArrayList<>(partitionAssignments.keySet());
+        }
+    }
+
+    @JsonProperty
+    public short replicationFactor() {
+        return replicationFactor;
+    }
+
+    @JsonProperty
+    public Map<Integer, List<Integer>> partitionAssignmentsap() {
+        return partitionAssignments;
+    }
+
+    public NewTopic newTopic(String topicName) {
+        if (partitionAssignments.isEmpty()) {
+            int effectiveNumPartitions = numPartitions <= 0 ?
+                DEFAULT_NUM_PARTITIONS : numPartitions;
+            short effectiveReplicationFactor = replicationFactor <= 0 ?
+                DEFAULT_REPLICATION_FACTOR : replicationFactor;
+            return new NewTopic(topicName, effectiveNumPartitions, 
effectiveReplicationFactor);
+        } else {
+            return new NewTopic(topicName, partitionAssignments);
+        }
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index ec6e309..30878bf 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -32,11 +32,6 @@ import java.util.Set;
  * The specification for a benchmark that produces messages to a set of topics.
  */
 public class ProduceBenchSpec extends TaskSpec {
-
-    private static final String DEFAULT_TOPIC_PREFIX = "produceBenchTopic";
-    private static final int DEFAULT_NUM_PARTITIONS = 1;
-    private static final short DEFAULT_REPLICATION_FACTOR = 3;
-
     private final String producerNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
@@ -46,11 +41,8 @@ public class ProduceBenchSpec extends TaskSpec {
     private final Map<String, String> producerConf;
     private final Map<String, String> adminClientConf;
     private final Map<String, String> commonClientConf;
-    private final int totalTopics;
-    private final int activeTopics;
-    private final String topicPrefix;
-    private final int numPartitions;
-    private final short replicationFactor;
+    private final TopicsSpec activeTopics;
+    private final TopicsSpec inactiveTopics;
 
     @JsonCreator
     public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
@@ -64,11 +56,8 @@ public class ProduceBenchSpec extends TaskSpec {
                          @JsonProperty("producerConf") Map<String, String> 
producerConf,
                          @JsonProperty("commonClientConf") Map<String, String> 
commonClientConf,
                          @JsonProperty("adminClientConf") Map<String, String> 
adminClientConf,
-                         @JsonProperty("totalTopics") int totalTopics,
-                         @JsonProperty("activeTopics") int activeTopics,
-                         @JsonProperty("topicPrefix") String topicPrefix,
-                         @JsonProperty("partitionsPerTopic") int 
partitionsPerTopic,
-                         @JsonProperty("replicationFactor") short 
replicationFactor) {
+                         @JsonProperty("activeTopics") TopicsSpec activeTopics,
+                         @JsonProperty("inactiveTopics") TopicsSpec 
inactiveTopics) {
         super(startMs, durationMs);
         this.producerNode = (producerNode == null) ? "" : producerNode;
         this.bootstrapServers = (bootstrapServers == null) ? "" : 
bootstrapServers;
@@ -81,13 +70,10 @@ public class ProduceBenchSpec extends TaskSpec {
         this.producerConf = configOrEmptyMap(producerConf);
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.adminClientConf = configOrEmptyMap(adminClientConf);
-        this.totalTopics = totalTopics;
-        this.activeTopics = activeTopics;
-        this.topicPrefix = (topicPrefix == null) ? DEFAULT_TOPIC_PREFIX : 
topicPrefix;
-        this.numPartitions = (partitionsPerTopic == 0)
-                             ? DEFAULT_NUM_PARTITIONS : partitionsPerTopic;
-        this.replicationFactor = (replicationFactor == 0)
-                                 ? DEFAULT_REPLICATION_FACTOR : 
replicationFactor;
+        this.activeTopics = (activeTopics == null) ?
+            TopicsSpec.EMPTY : activeTopics.immutableCopy();
+        this.inactiveTopics = (inactiveTopics == null) ?
+            TopicsSpec.EMPTY : inactiveTopics.immutableCopy();
     }
 
     @JsonProperty
@@ -136,28 +122,13 @@ public class ProduceBenchSpec extends TaskSpec {
     }
 
     @JsonProperty
-    public int totalTopics() {
-        return totalTopics;
-    }
-
-    @JsonProperty
-    public int activeTopics() {
+    public TopicsSpec activeTopics() {
         return activeTopics;
     }
 
     @JsonProperty
-    public String topicPrefix() {
-        return topicPrefix;
-    }
-
-    @JsonProperty
-    public int numPartitions() {
-        return numPartitions;
-    }
-
-    @JsonProperty
-    public short replicationFactor() {
-        return replicationFactor;
+    public TopicsSpec inactiveTopics() {
+        return inactiveTopics;
     }
 
     @Override
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index 4c3095f..dc749eb 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -26,7 +26,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Time;
@@ -40,6 +40,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Callable;
@@ -66,16 +68,6 @@ public class ProduceBenchWorker implements TaskWorker {
 
     private KafkaFutureImpl<String> doneFuture;
 
-    /**
-     * Generate a topic name based on a topic number.
-     *
-     * @param topicIndex        The topic number.
-     * @return                  The topic name.
-     */
-    public String topicIndexToName(int topicIndex) {
-        return String.format("%s%05d", spec.topicPrefix(), topicIndex);
-    }
-
     public ProduceBenchWorker(String id, ProduceBenchSpec spec) {
         this.id = id;
         this.spec = spec;
@@ -88,7 +80,9 @@ public class ProduceBenchWorker implements TaskWorker {
             throw new IllegalStateException("ProducerBenchWorker is already 
running.");
         }
         log.info("{}: Activating ProduceBenchWorker with {}", id, spec);
-        this.executor = Executors.newScheduledThreadPool(1,
+        // Create an executor with 2 threads.  We need the second thread so
+        // that the StatusUpdater can run in parallel with SendRecords.
+        this.executor = Executors.newScheduledThreadPool(2,
             ThreadUtils.createThreadFactory("ProduceBenchWorkerThread%d", 
false));
         this.status = status;
         this.doneFuture = doneFuture;
@@ -99,25 +93,31 @@ public class ProduceBenchWorker implements TaskWorker {
         @Override
         public void run() {
             try {
-                if (spec.activeTopics() == 0) {
-                    throw new ConfigException("Can't have activeTopics == 0.");
+                Map<String, NewTopic> newTopics = new HashMap<>();
+                HashSet<TopicPartition> active = new HashSet<>();
+                for (Map.Entry<String, PartitionsSpec> entry :
+                        spec.activeTopics().materialize().entrySet()) {
+                    String topicName = entry.getKey();
+                    PartitionsSpec partSpec = entry.getValue();
+                    newTopics.put(topicName, partSpec.newTopic(topicName));
+                    for (Integer partitionNumber : 
partSpec.partitionNumbers()) {
+                        active.add(new TopicPartition(topicName, 
partitionNumber));
+                    }
                 }
-                if (spec.totalTopics() < spec.activeTopics()) {
-                    throw new ConfigException(String.format(
-                        "activeTopics was %d, but totalTopics was only %d.  
activeTopics must " +
-                            "be less than or equal to totalTopics.", 
spec.activeTopics(), spec.totalTopics()));
+                if (active.isEmpty()) {
+                    throw new RuntimeException("You must specify at least one 
active topic.");
                 }
-                Map<String, NewTopic> newTopics = new HashMap<>();
-                for (int i = 0; i < spec.totalTopics(); i++) {
-                    String name = topicIndexToName(i);
-                    newTopics.put(name, new NewTopic(name, 
spec.numPartitions(),
-                                                     
spec.replicationFactor()));
+                for (Map.Entry<String, PartitionsSpec> entry :
+                        spec.inactiveTopics().materialize().entrySet()) {
+                    String topicName = entry.getKey();
+                    PartitionsSpec partSpec = entry.getValue();
+                    newTopics.put(topicName, partSpec.newTopic(topicName));
                 }
-                status.update(new TextNode("Creating " + spec.totalTopics() + 
" topic(s)"));
+                status.update(new TextNode("Creating " + 
newTopics.keySet().size() + " topic(s)"));
                 WorkerUtils.createTopics(log, spec.bootstrapServers(), 
spec.commonClientConf(),
                                          spec.adminClientConf(), newTopics, 
false);
-                status.update(new TextNode("Created " + spec.totalTopics() + " 
topic(s)"));
-                executor.submit(new SendRecords());
+                status.update(new TextNode("Created " + 
newTopics.keySet().size() + " topic(s)"));
+                executor.submit(new SendRecords(active));
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
             }
@@ -167,6 +167,8 @@ public class ProduceBenchWorker implements TaskWorker {
     }
 
     public class SendRecords implements Callable<Void> {
+        private final HashSet<TopicPartition> activePartitions;
+
         private final Histogram histogram;
 
         private final Future<?> statusUpdaterFuture;
@@ -179,7 +181,8 @@ public class ProduceBenchWorker implements TaskWorker {
 
         private final Throttle throttle;
 
-        SendRecords() {
+        SendRecords(HashSet<TopicPartition> activePartitions) {
+            this.activePartitions = activePartitions;
             this.histogram = new Histogram(5000);
             int perPeriod = 
WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
@@ -201,13 +204,16 @@ public class ProduceBenchWorker implements TaskWorker {
             try {
                 Future<RecordMetadata> future = null;
                 try {
+                    Iterator<TopicPartition> iter = 
activePartitions.iterator();
                     for (int m = 0; m < spec.maxMessages(); m++) {
-                        for (int i = 0; i < spec.activeTopics(); i++) {
-                            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<byte[], byte[]>(
-                                topicIndexToName(i), 0, keys.next(), 
values.next());
-                            future = producer.send(record,
-                                new SendRecordsCallback(this, 
Time.SYSTEM.milliseconds()));
+                        if (!iter.hasNext()) {
+                            iter = activePartitions.iterator();
                         }
+                        TopicPartition partition = iter.next();
+                        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(
+                            partition.topic(), partition.partition(), 
keys.next(), values.next());
+                        future = producer.send(record,
+                            new SendRecordsCallback(this, 
Time.SYSTEM.milliseconds()));
                         throttle.increment();
                     }
                 } finally {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 12b0c08..570f6a1 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -17,6 +17,9 @@
 
 package org.apache.kafka.trogdor.workload;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.node.TextNode;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -27,6 +30,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
@@ -35,6 +39,7 @@ import 
org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.common.WorkerUtils;
@@ -46,33 +51,31 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.ArrayList;
-import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class RoundTripWorker implements TaskWorker {
     private static final int THROTTLE_PERIOD_MS = 100;
 
-    private static final int MESSAGE_SIZE = 512;
-
     private static final int LOG_INTERVAL_MS = 5000;
 
     private static final int LOG_NUM_MESSAGES = 10;
 
-    private static final String TOPIC_NAME = "round_trip_topic";
-
     private static final Logger log = 
LoggerFactory.getLogger(RoundTripWorker.class);
 
     private static final PayloadGenerator KEY_GENERATOR = new 
SequentialPayloadGenerator(4, 0);
 
-    private final ToReceiveTracker toReceiveTracker = new ToReceiveTracker();
+    private ToReceiveTracker toReceiveTracker;
 
     private final String id;
 
@@ -80,18 +83,20 @@ public class RoundTripWorker implements TaskWorker {
 
     private final AtomicBoolean running = new AtomicBoolean(false);
 
-    private ExecutorService executor;
+    private ScheduledExecutorService executor;
+
+    private WorkerStatusTracker status;
 
     private KafkaFutureImpl<String> doneFuture;
 
     private KafkaProducer<byte[], byte[]> producer;
 
-    private PayloadGenerator payloadGenerator;
-
     private KafkaConsumer<byte[], byte[]> consumer;
 
     private CountDownLatch unackedSends;
 
+    private ToSendTracker toSendTracker;
+
     public RoundTripWorker(String id, RoundTripWorkloadSpec spec) {
         this.id = id;
         this.spec = spec;
@@ -104,8 +109,9 @@ public class RoundTripWorker implements TaskWorker {
             throw new IllegalStateException("RoundTripWorker is already 
running.");
         }
         log.info("{}: Activating RoundTripWorker.", id);
-        this.executor = Executors.newCachedThreadPool(
+        this.executor = Executors.newScheduledThreadPool(3,
             ThreadUtils.createThreadFactory("RoundTripWorker%d", false));
+        this.status = status;
         this.doneFuture = doneFuture;
         this.producer = null;
         this.consumer = null;
@@ -120,16 +126,31 @@ public class RoundTripWorker implements TaskWorker {
                 if (spec.targetMessagesPerSec() <= 0) {
                     throw new ConfigException("Can't have targetMessagesPerSec 
<= 0.");
                 }
-                if ((spec.partitionAssignments() == null) || 
spec.partitionAssignments().isEmpty()) {
-                    throw new ConfigException("Invalid null or empty 
partitionAssignments.");
+                Map<String, NewTopic> newTopics = new HashMap<>();
+                HashSet<TopicPartition> active = new HashSet<>();
+                for (Map.Entry<String, PartitionsSpec> entry :
+                    spec.activeTopics().materialize().entrySet()) {
+                    String topicName = entry.getKey();
+                    PartitionsSpec partSpec = entry.getValue();
+                    newTopics.put(topicName, partSpec.newTopic(topicName));
+                    for (Integer partitionNumber : 
partSpec.partitionNumbers()) {
+                        active.add(new TopicPartition(topicName, 
partitionNumber));
+                    }
+                }
+                if (active.isEmpty()) {
+                    throw new RuntimeException("You must specify at least one 
active topic.");
                 }
-                WorkerUtils.createTopics(
-                    log, spec.bootstrapServers(), spec.commonClientConf(), 
spec.adminClientConf(),
-                    Collections.singletonMap(TOPIC_NAME,
-                                             new NewTopic(TOPIC_NAME, 
spec.partitionAssignments())),
-                    true);
-                executor.submit(new ProducerRunnable());
-                executor.submit(new ConsumerRunnable());
+                status.update(new TextNode("Creating " + 
newTopics.keySet().size() + " topic(s)"));
+                WorkerUtils.createTopics(log, spec.bootstrapServers(), 
spec.commonClientConf(),
+                    spec.adminClientConf(), newTopics, true);
+                status.update(new TextNode("Created " + 
newTopics.keySet().size() + " topic(s)"));
+                toSendTracker = new ToSendTracker(spec.maxMessages());
+                toReceiveTracker = new ToReceiveTracker();
+                executor.submit(new ProducerRunnable(active));
+                executor.submit(new ConsumerRunnable(active));
+                executor.submit(new StatusUpdater());
+                executor.scheduleWithFixedDelay(
+                    new StatusUpdater(), 30, 30, TimeUnit.SECONDS);
             } catch (Throwable e) {
                 WorkerUtils.abort(log, "Prepare", e, doneFuture);
             }
@@ -159,6 +180,10 @@ public class RoundTripWorker implements TaskWorker {
             failed.add(index);
         }
 
+        synchronized int frontier() {
+            return frontier;
+        }
+
         synchronized ToSendTrackerResult next() {
             if (failed.isEmpty()) {
                 if (frontier >= maxMessages) {
@@ -173,9 +198,11 @@ public class RoundTripWorker implements TaskWorker {
     }
 
     class ProducerRunnable implements Runnable {
+        private final HashSet<TopicPartition> partitions;
         private final Throttle throttle;
 
-        ProducerRunnable() {
+        ProducerRunnable(HashSet<TopicPartition> partitions) {
+            this.partitions = partitions;
             Properties props = new Properties();
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
             props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
@@ -195,11 +222,11 @@ public class RoundTripWorker implements TaskWorker {
 
         @Override
         public void run() {
-            final ToSendTracker toSendTracker = new 
ToSendTracker(spec.maxMessages());
             long messagesSent = 0;
             long uniqueMessagesSent = 0;
             log.debug("{}: Starting RoundTripWorker#ProducerRunnable.", id);
             try {
+                Iterator<TopicPartition> iter = partitions.iterator();
                 while (true) {
                     final ToSendTrackerResult result = toSendTracker.next();
                     if (result == null) {
@@ -212,9 +239,13 @@ public class RoundTripWorker implements TaskWorker {
                         uniqueMessagesSent++;
                     }
                     messagesSent++;
+                    if (!iter.hasNext()) {
+                        iter = partitions.iterator();
+                    }
+                    TopicPartition partition = iter.next();
                     // we explicitly specify generator position based on 
message index
-                    ProducerRecord<byte[], byte[]> record = new 
ProducerRecord(TOPIC_NAME, 0,
-                        KEY_GENERATOR.generate(messageIndex),
+                    ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(partition.topic(),
+                        partition.partition(), 
KEY_GENERATOR.generate(messageIndex),
                         spec.valueGenerator().generate(messageIndex));
                     producer.send(record, new Callback() {
                         @Override
@@ -242,12 +273,23 @@ public class RoundTripWorker implements TaskWorker {
     private class ToReceiveTracker {
         private final TreeSet<Integer> pending = new TreeSet<>();
 
+        private int totalReceived = 0;
+
         synchronized void addPending(int messageIndex) {
             pending.add(messageIndex);
         }
 
         synchronized boolean removePending(int messageIndex) {
-            return pending.remove(messageIndex);
+            if (pending.remove(messageIndex)) {
+                totalReceived++;
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        synchronized int totalReceived() {
+            return totalReceived;
         }
 
         void log() {
@@ -269,7 +311,7 @@ public class RoundTripWorker implements TaskWorker {
     class ConsumerRunnable implements Runnable {
         private final Properties props;
 
-        ConsumerRunnable() {
+        ConsumerRunnable(HashSet<TopicPartition> partitions) {
             this.props = new Properties();
             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
             props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." + id);
@@ -281,7 +323,7 @@ public class RoundTripWorker implements TaskWorker {
             WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.consumerConf());
             consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
-            consumer.subscribe(Collections.singleton(TOPIC_NAME));
+            consumer.assign(partitions);
         }
 
         @Override
@@ -296,7 +338,8 @@ public class RoundTripWorker implements TaskWorker {
                     try {
                         pollInvoked++;
                         ConsumerRecords<byte[], byte[]> records = 
consumer.poll(50);
-                        for (ConsumerRecord<byte[], byte[]> record : 
records.records(TOPIC_NAME)) {
+                        for (Iterator<ConsumerRecord<byte[], byte[]>> iter = 
records.iterator(); iter.hasNext(); ) {
+                            ConsumerRecord<byte[], byte[]> record = 
iter.next();
                             int messageIndex = 
ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt();
                             messagesReceived++;
                             if (toReceiveTracker.removePending(messageIndex)) {
@@ -306,6 +349,7 @@ public class RoundTripWorker implements TaskWorker {
                                         "Waiting for all sends to be 
acked...", id, spec.maxMessages());
                                     unackedSends.await();
                                     log.info("{}: all sends have been acked.", 
id);
+                                    new StatusUpdater().update();
                                     doneFuture.complete("");
                                     return;
                                 }
@@ -332,6 +376,46 @@ public class RoundTripWorker implements TaskWorker {
         }
     }
 
+    public class StatusUpdater implements Runnable {
+        @Override
+        public void run() {
+            try {
+                update();
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
+            }
+        }
+
+        StatusData update() {
+            StatusData statusData =
+                new StatusData(toSendTracker.frontier(), 
toReceiveTracker.totalReceived());
+            status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
+            return statusData;
+        }
+    }
+
+    public static class StatusData {
+        private final long totalUniqueSent;
+        private final long totalReceived;
+
+        @JsonCreator
+        public StatusData(@JsonProperty("totalUniqueSent") long 
totalUniqueSent,
+                          @JsonProperty("totalReceived") long totalReceived) {
+            this.totalUniqueSent = totalUniqueSent;
+            this.totalReceived = totalReceived;
+        }
+
+        @JsonProperty
+        public long totalUniqueSent() {
+            return totalUniqueSent;
+        }
+
+        @JsonProperty
+        public long totalReceived() {
+            return totalReceived;
+        }
+    }
+
     @Override
     public void stop(Platform platform) throws Exception {
         if (!running.compareAndSet(true, false)) {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
index 3d0e3ef..9522e0a 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java
@@ -25,11 +25,8 @@ import org.apache.kafka.trogdor.task.TaskSpec;
 import org.apache.kafka.trogdor.task.TaskWorker;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
 import java.util.Set;
-import java.util.TreeMap;
 
 /**
  * The specification for a workload that sends messages to a broker and then
@@ -39,8 +36,8 @@ public class RoundTripWorkloadSpec extends TaskSpec {
     private final String clientNode;
     private final String bootstrapServers;
     private final int targetMessagesPerSec;
-    private final NavigableMap<Integer, List<Integer>> partitionAssignments;
     private final PayloadGenerator valueGenerator;
+    private final TopicsSpec activeTopics;
     private final int maxMessages;
     private final Map<String, String> commonClientConf;
     private final Map<String, String> producerConf;
@@ -57,17 +54,17 @@ public class RoundTripWorkloadSpec extends TaskSpec {
              @JsonProperty("consumerConf") Map<String, String> consumerConf,
              @JsonProperty("producerConf") Map<String, String> producerConf,
              @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec,
-             @JsonProperty("partitionAssignments") NavigableMap<Integer, 
List<Integer>> partitionAssignments,
              @JsonProperty("valueGenerator") PayloadGenerator valueGenerator,
+             @JsonProperty("activeTopics") TopicsSpec activeTopics,
              @JsonProperty("maxMessages") int maxMessages) {
         super(startMs, durationMs);
         this.clientNode = clientNode == null ? "" : clientNode;
         this.bootstrapServers = bootstrapServers == null ? "" : 
bootstrapServers;
         this.targetMessagesPerSec = targetMessagesPerSec;
-        this.partitionAssignments = partitionAssignments == null ?
-            new TreeMap<Integer, List<Integer>>() : partitionAssignments;
         this.valueGenerator = valueGenerator == null ?
             new UniformRandomPayloadGenerator(32, 123, 10) : valueGenerator;
+        this.activeTopics = activeTopics == null ?
+            TopicsSpec.EMPTY : activeTopics.immutableCopy();
         this.maxMessages = maxMessages;
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.adminClientConf = configOrEmptyMap(adminClientConf);
@@ -91,8 +88,8 @@ public class RoundTripWorkloadSpec extends TaskSpec {
     }
 
     @JsonProperty
-    public NavigableMap<Integer, List<Integer>> partitionAssignments() {
-        return partitionAssignments;
+    public TopicsSpec activeTopics() {
+        return activeTopics;
     }
 
     @JsonProperty
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java
new file mode 100644
index 0000000..a9b550d
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.kafka.trogdor.common.StringExpander;
+import org.apache.kafka.trogdor.rest.Message;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * TopicsSpec maps topic names to descriptions of the partitions in them.
+ *
+ * In JSON form, this is serialized as a map whose keys are topic names,
+ * and whose entries are partition descriptions.
+ * Keys may also refer to multiple partitions.  For example, this specification
+ * refers to 3 topics foo1, foo2, and foo3:
+ *
+ * {
+ *   "foo[1-3]" : {
+ *      "numPartitions": 3
+ *      "replicationFactor": 3
+ *    }
+ * }
+ */
+public class TopicsSpec extends Message {
+    public static final TopicsSpec EMPTY = new TopicsSpec().immutableCopy();
+
+    private final Map<String, PartitionsSpec> map;
+
+    @JsonCreator
+    public TopicsSpec() {
+        this.map = new HashMap<>();
+    }
+
+    private TopicsSpec(Map<String, PartitionsSpec> map) {
+        this.map = map;
+    }
+
+    @JsonAnyGetter
+    public Map<String, PartitionsSpec> get() {
+        return map;
+    }
+
+    @JsonAnySetter
+    public void set(String name, PartitionsSpec value) {
+        map.put(name, value);
+    }
+
+    public TopicsSpec immutableCopy() {
+        HashMap<String, PartitionsSpec> mapCopy = new HashMap<>();
+        mapCopy.putAll(map);
+        return new TopicsSpec(Collections.unmodifiableMap(mapCopy));
+    }
+
+    /**
+     * Enumerate the partitions inside this TopicsSpec.
+     *
+     * @return      A map from topic names to PartitionsSpec objects.
+     */
+    public Map<String, PartitionsSpec> materialize() {
+        HashMap<String, PartitionsSpec> all = new HashMap<>();
+        for (Map.Entry<String, PartitionsSpec> entry : map.entrySet()) {
+            for (String topicName : StringExpander.expand(entry.getKey())) {
+                all.put(topicName, entry.getValue());
+            }
+        }
+        return all;
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
 
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index c1f7490..55ecb1a 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -26,9 +26,10 @@ import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.rest.WorkerStopping;
-import org.apache.kafka.trogdor.task.SampleTaskSpec;
+import org.apache.kafka.trogdor.workload.PartitionsSpec;
 import org.apache.kafka.trogdor.workload.ProduceBenchSpec;
 import org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec;
+import org.apache.kafka.trogdor.workload.TopicsSpec;
 import org.junit.Test;
 
 import java.lang.reflect.Field;
@@ -49,10 +50,11 @@ public class JsonSerializationTest {
         verify(new WorkerRunning(null, null, 0, null));
         verify(new WorkerStopping(null, null, 0, null));
         verify(new ProduceBenchSpec(0, 0, null, null,
-            0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 
3));
+            0, 0, null, null, null, null, null, null, null));
         verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, 
null,
             0, null, null, 0));
-        verify(new SampleTaskSpec(0, 0, null, null));
+        verify(new TopicsSpec());
+        verify(new PartitionsSpec(0, (short) 0, null));
     }
 
     private <T> void verify(T val1) throws Exception {
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java
new file mode 100644
index 0000000..72e1c20
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import org.junit.Rule;
+import org.junit.rules.Timeout;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.junit.Assert.assertEquals;
+
+public class StringExpanderTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    @Test
+    public void testNoExpansionNeeded() throws Exception {
+        assertEquals(Collections.singleton("foo"), 
StringExpander.expand("foo"));
+        assertEquals(Collections.singleton("bar"), 
StringExpander.expand("bar"));
+        assertEquals(Collections.singleton(""), StringExpander.expand(""));
+    }
+
+    @Test
+    public void testExpansions() throws Exception {
+        HashSet<String> expected1 = new HashSet<>(Arrays.asList(
+            "foo1",
+            "foo2",
+            "foo3"
+        ));
+        assertEquals(expected1, StringExpander.expand("foo[1-3]"));
+
+        HashSet<String> expected2 = new HashSet<>(Arrays.asList(
+            "foo bar baz 0"
+        ));
+        assertEquals(expected2, StringExpander.expand("foo bar baz [0-0]"));
+
+        HashSet<String> expected3 = new HashSet<>(Arrays.asList(
+            "[[ wow50 ]]",
+            "[[ wow51 ]]",
+            "[[ wow52 ]]"
+        ));
+        assertEquals(expected3, StringExpander.expand("[[ wow[50-52] ]]"));
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java
new file mode 100644
index 0000000..f86ca0f
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TopicsSpecTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    private final static TopicsSpec FOO;
+    private final static PartitionsSpec PARTSA;
+    private final static PartitionsSpec PARTSB;
+
+    static {
+        FOO = new TopicsSpec();
+
+        PARTSA = new PartitionsSpec(3, (short) 3, null);
+        FOO.set("topicA[0-2]", PARTSA);
+
+        Map<Integer, List<Integer>> assignmentsB = new HashMap<>();
+        assignmentsB.put(0, Arrays.asList(0, 1, 2));
+        assignmentsB.put(1, Arrays.asList(2, 3, 4));
+        PARTSB = new PartitionsSpec(0, (short) 0, assignmentsB);
+        FOO.set("topicB", PARTSB);
+    }
+
+    @Test
+    public void testMaterialize() {
+        Map<String, PartitionsSpec> parts = FOO.materialize();
+        assertTrue(parts.containsKey("topicA0"));
+        assertTrue(parts.containsKey("topicA1"));
+        assertTrue(parts.containsKey("topicA2"));
+        assertTrue(parts.containsKey("topicB"));
+        assertEquals(4, parts.keySet().size());
+        assertEquals(PARTSA, parts.get("topicA0"));
+        assertEquals(PARTSA, parts.get("topicA1"));
+        assertEquals(PARTSA, parts.get("topicA2"));
+        assertEquals(PARTSB, parts.get("topicB"));
+    }
+
+    @Test
+    public void testPartitionNumbers() {
+        List<Integer> partsANumbers = PARTSA.partitionNumbers();
+        assertEquals(Integer.valueOf(0), partsANumbers.get(0));
+        assertEquals(Integer.valueOf(1), partsANumbers.get(1));
+        assertEquals(Integer.valueOf(2), partsANumbers.get(2));
+        assertEquals(3, partsANumbers.size());
+
+        List<Integer> partsBNumbers = PARTSB.partitionNumbers();
+        assertEquals(Integer.valueOf(0), partsBNumbers.get(0));
+        assertEquals(Integer.valueOf(1), partsBNumbers.get(1));
+        assertEquals(2, partsBNumbers.size());
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
rsiva...@apache.org.

Reply via email to