This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 93e0341 KAFKA-6771. Make specifying partitions more flexible (#4850) 93e0341 is described below commit 93e03414f72c24854abaa6925149196580cfc691 Author: Colin Patrick McCabe <co...@cmccabe.xyz> AuthorDate: Mon Apr 16 00:55:13 2018 -0700 KAFKA-6771. Make specifying partitions more flexible (#4850) --- .../kafka/trogdor/common/StringExpander.java | 56 +++++++++ .../kafka/trogdor/workload/ConsumeBenchSpec.java | 27 +--- .../kafka/trogdor/workload/ConsumeBenchWorker.java | 22 ++-- .../kafka/trogdor/workload/PartitionsSpec.java | 90 +++++++++++++ .../kafka/trogdor/workload/ProduceBenchSpec.java | 51 ++------ .../kafka/trogdor/workload/ProduceBenchWorker.java | 70 ++++++----- .../kafka/trogdor/workload/RoundTripWorker.java | 140 ++++++++++++++++----- .../trogdor/workload/RoundTripWorkloadSpec.java | 15 +-- .../apache/kafka/trogdor/workload/TopicsSpec.java | 89 +++++++++++++ .../trogdor/common/JsonSerializationTest.java | 8 +- .../kafka/trogdor/common/StringExpanderTest.java | 62 +++++++++ .../kafka/trogdor/workload/TopicsSpecTest.java | 80 ++++++++++++ 12 files changed, 563 insertions(+), 147 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java new file mode 100644 index 0000000..82f5003 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/common/StringExpander.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.common; + +import java.util.HashSet; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Utilities for expanding strings that have range expressions in them. + * + * For example, 'foo[1-3]' would be expanded to foo1, foo2, foo3. + * Strings that have no range expressions will not be expanded. + */ +public class StringExpander { + private final static Pattern NUMERIC_RANGE_PATTERN = + Pattern.compile("(.*?)\\[([0-9]*)\\-([0-9]*)\\](.*?)"); + + public static HashSet<String> expand(String val) { + HashSet<String> set = new HashSet<>(); + Matcher matcher = NUMERIC_RANGE_PATTERN.matcher(val); + if (!matcher.matches()) { + set.add(val); + return set; + } + String prequel = matcher.group(1); + String rangeStart = matcher.group(2); + String rangeEnd = matcher.group(3); + String epilog = matcher.group(4); + int rangeStartInt = Integer.parseInt(rangeStart); + int rangeEndInt = Integer.parseInt(rangeEnd); + if (rangeEndInt < rangeStartInt) { + throw new RuntimeException("Invalid range: start " + rangeStartInt + + " is higher than end " + rangeEndInt); + } + for (int i = rangeStartInt; i <= rangeEndInt; i++) { + set.add(String.format("%s%d%s", prequel, i, epilog)); + } + return set; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java index cef913b..1b429ea 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java +++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchSpec.java @@ -41,10 +41,7 @@ public class ConsumeBenchSpec extends TaskSpec { private final Map<String, String> consumerConf; private final Map<String, String> adminClientConf; private final Map<String, String> commonClientConf; - private final String topicRegex; - private final int startPartition; - private final int endPartition; - + private final TopicsSpec activeTopics; @JsonCreator public ConsumeBenchSpec(@JsonProperty("startMs") long startMs, @@ -56,9 +53,7 @@ public class ConsumeBenchSpec extends TaskSpec { @JsonProperty("consumerConf") Map<String, String> consumerConf, @JsonProperty("commonClientConf") Map<String, String> commonClientConf, @JsonProperty("adminClientConf") Map<String, String> adminClientConf, - @JsonProperty("topicRegex") String topicRegex, - @JsonProperty("startPartition") int startPartition, - @JsonProperty("endPartition") int endPartition) { + @JsonProperty("activeTopics") TopicsSpec activeTopics) { super(startMs, durationMs); this.consumerNode = (consumerNode == null) ? "" : consumerNode; this.bootstrapServers = (bootstrapServers == null) ? "" : bootstrapServers; @@ -67,9 +62,7 @@ public class ConsumeBenchSpec extends TaskSpec { this.consumerConf = configOrEmptyMap(consumerConf); this.commonClientConf = configOrEmptyMap(commonClientConf); this.adminClientConf = configOrEmptyMap(adminClientConf); - this.topicRegex = topicRegex; - this.startPartition = startPartition; - this.endPartition = endPartition; + this.activeTopics = activeTopics == null ? 
TopicsSpec.EMPTY : activeTopics.immutableCopy(); } @JsonProperty @@ -108,18 +101,8 @@ public class ConsumeBenchSpec extends TaskSpec { } @JsonProperty - public String topicRegex() { - return topicRegex; - } - - @JsonProperty - public int startPartition() { - return startPartition; - } - - @JsonProperty - public int endPartition() { - return endPartition; + public TopicsSpec activeTopics() { + return activeTopics; } @Override diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java index 5c74d90..1a85296 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java @@ -24,7 +24,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.Time; @@ -40,6 +39,8 @@ import org.slf4j.LoggerFactory; import org.apache.kafka.trogdor.task.TaskWorker; import java.util.Collection; +import java.util.HashSet; +import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.Executors; @@ -84,19 +85,14 @@ public class ConsumeBenchWorker implements TaskWorker { @Override public void run() { try { - // find topics to consume from based on provided topic regular expression - if (spec.topicRegex() == null) { - throw new ConfigException( - "Must provide topic name or regular expression to match existing topics."); + HashSet<TopicPartition> partitions = new HashSet<>(); + for (Map.Entry<String, PartitionsSpec> entry : 
spec.activeTopics().materialize().entrySet()) { + for (Integer partitionNumber : entry.getValue().partitionNumbers()) { + partitions.add(new TopicPartition(entry.getKey(), partitionNumber)); + } } - Collection<TopicPartition> topicPartitions = - WorkerUtils.getMatchingTopicPartitions( - log, spec.bootstrapServers(), - spec.commonClientConf(), spec.adminClientConf(), - spec.topicRegex(), spec.startPartition(), spec.endPartition()); - log.info("Will consume from {}", topicPartitions); - - executor.submit(new ConsumeMessages(topicPartitions)); + log.info("Will consume from {}", partitions); + executor.submit(new ConsumeMessages(partitions)); } catch (Throwable e) { WorkerUtils.abort(log, "Prepare", e, doneFuture); } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java new file mode 100644 index 0000000..75f85c4 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/PartitionsSpec.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.trogdor.rest.Message; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Describes some partitions. + */ +public class PartitionsSpec extends Message { + private final static short DEFAULT_REPLICATION_FACTOR = 3; + private final static short DEFAULT_NUM_PARTITIONS = 1; + + private final int numPartitions; + private final short replicationFactor; + private final Map<Integer, List<Integer>> partitionAssignments; + + @JsonCreator + public PartitionsSpec(@JsonProperty("numPartitions") int numPartitions, + @JsonProperty("replicationFactor") short replicationFactor, + @JsonProperty("partitionAssignments") Map<Integer, List<Integer>> partitionAssignments) { + this.numPartitions = numPartitions; + this.replicationFactor = replicationFactor; + this.partitionAssignments = partitionAssignments == null ? + new HashMap<Integer, List<Integer>>() : partitionAssignments; + } + + @JsonProperty + public int numPartitions() { + return numPartitions; + } + + public List<Integer> partitionNumbers() { + if (partitionAssignments.isEmpty()) { + ArrayList<Integer> partitionNumbers = new ArrayList<>(); + int effectiveNumPartitions = numPartitions <= 0 ? 
DEFAULT_NUM_PARTITIONS : numPartitions; + for (int i = 0; i < effectiveNumPartitions; i++) { + partitionNumbers.add(i); + } + return partitionNumbers; + } else { + return new ArrayList<>(partitionAssignments.keySet()); + } + } + + @JsonProperty + public short replicationFactor() { + return replicationFactor; + } + + @JsonProperty + public Map<Integer, List<Integer>> partitionAssignmentsap() { + return partitionAssignments; + } + + public NewTopic newTopic(String topicName) { + if (partitionAssignments.isEmpty()) { + int effectiveNumPartitions = numPartitions <= 0 ? + DEFAULT_NUM_PARTITIONS : numPartitions; + short effectiveReplicationFactor = replicationFactor <= 0 ? + DEFAULT_REPLICATION_FACTOR : replicationFactor; + return new NewTopic(topicName, effectiveNumPartitions, effectiveReplicationFactor); + } else { + return new NewTopic(topicName, partitionAssignments); + } + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java index ec6e309..30878bf 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java @@ -32,11 +32,6 @@ import java.util.Set; * The specification for a benchmark that produces messages to a set of topics. 
*/ public class ProduceBenchSpec extends TaskSpec { - - private static final String DEFAULT_TOPIC_PREFIX = "produceBenchTopic"; - private static final int DEFAULT_NUM_PARTITIONS = 1; - private static final short DEFAULT_REPLICATION_FACTOR = 3; - private final String producerNode; private final String bootstrapServers; private final int targetMessagesPerSec; @@ -46,11 +41,8 @@ public class ProduceBenchSpec extends TaskSpec { private final Map<String, String> producerConf; private final Map<String, String> adminClientConf; private final Map<String, String> commonClientConf; - private final int totalTopics; - private final int activeTopics; - private final String topicPrefix; - private final int numPartitions; - private final short replicationFactor; + private final TopicsSpec activeTopics; + private final TopicsSpec inactiveTopics; @JsonCreator public ProduceBenchSpec(@JsonProperty("startMs") long startMs, @@ -64,11 +56,8 @@ public class ProduceBenchSpec extends TaskSpec { @JsonProperty("producerConf") Map<String, String> producerConf, @JsonProperty("commonClientConf") Map<String, String> commonClientConf, @JsonProperty("adminClientConf") Map<String, String> adminClientConf, - @JsonProperty("totalTopics") int totalTopics, - @JsonProperty("activeTopics") int activeTopics, - @JsonProperty("topicPrefix") String topicPrefix, - @JsonProperty("partitionsPerTopic") int partitionsPerTopic, - @JsonProperty("replicationFactor") short replicationFactor) { + @JsonProperty("activeTopics") TopicsSpec activeTopics, + @JsonProperty("inactiveTopics") TopicsSpec inactiveTopics) { super(startMs, durationMs); this.producerNode = (producerNode == null) ? "" : producerNode; this.bootstrapServers = (bootstrapServers == null) ? 
"" : bootstrapServers; @@ -81,13 +70,10 @@ public class ProduceBenchSpec extends TaskSpec { this.producerConf = configOrEmptyMap(producerConf); this.commonClientConf = configOrEmptyMap(commonClientConf); this.adminClientConf = configOrEmptyMap(adminClientConf); - this.totalTopics = totalTopics; - this.activeTopics = activeTopics; - this.topicPrefix = (topicPrefix == null) ? DEFAULT_TOPIC_PREFIX : topicPrefix; - this.numPartitions = (partitionsPerTopic == 0) - ? DEFAULT_NUM_PARTITIONS : partitionsPerTopic; - this.replicationFactor = (replicationFactor == 0) - ? DEFAULT_REPLICATION_FACTOR : replicationFactor; + this.activeTopics = (activeTopics == null) ? + TopicsSpec.EMPTY : activeTopics.immutableCopy(); + this.inactiveTopics = (inactiveTopics == null) ? + TopicsSpec.EMPTY : inactiveTopics.immutableCopy(); } @JsonProperty @@ -136,28 +122,13 @@ public class ProduceBenchSpec extends TaskSpec { } @JsonProperty - public int totalTopics() { - return totalTopics; - } - - @JsonProperty - public int activeTopics() { + public TopicsSpec activeTopics() { return activeTopics; } @JsonProperty - public String topicPrefix() { - return topicPrefix; - } - - @JsonProperty - public int numPartitions() { - return numPartitions; - } - - @JsonProperty - public short replicationFactor() { - return replicationFactor; + public TopicsSpec inactiveTopics() { + return inactiveTopics; } @Override diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java index 4c3095f..dc749eb 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java @@ -26,7 +26,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import 
org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.utils.Time; @@ -40,6 +40,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.concurrent.Callable; @@ -66,16 +68,6 @@ public class ProduceBenchWorker implements TaskWorker { private KafkaFutureImpl<String> doneFuture; - /** - * Generate a topic name based on a topic number. - * - * @param topicIndex The topic number. - * @return The topic name. - */ - public String topicIndexToName(int topicIndex) { - return String.format("%s%05d", spec.topicPrefix(), topicIndex); - } - public ProduceBenchWorker(String id, ProduceBenchSpec spec) { this.id = id; this.spec = spec; @@ -88,7 +80,9 @@ public class ProduceBenchWorker implements TaskWorker { throw new IllegalStateException("ProducerBenchWorker is already running."); } log.info("{}: Activating ProduceBenchWorker with {}", id, spec); - this.executor = Executors.newScheduledThreadPool(1, + // Create an executor with 2 threads. We need the second thread so + // that the StatusUpdater can run in parallel with SendRecords. 
+ this.executor = Executors.newScheduledThreadPool(2, ThreadUtils.createThreadFactory("ProduceBenchWorkerThread%d", false)); this.status = status; this.doneFuture = doneFuture; @@ -99,25 +93,31 @@ public class ProduceBenchWorker implements TaskWorker { @Override public void run() { try { - if (spec.activeTopics() == 0) { - throw new ConfigException("Can't have activeTopics == 0."); + Map<String, NewTopic> newTopics = new HashMap<>(); + HashSet<TopicPartition> active = new HashSet<>(); + for (Map.Entry<String, PartitionsSpec> entry : + spec.activeTopics().materialize().entrySet()) { + String topicName = entry.getKey(); + PartitionsSpec partSpec = entry.getValue(); + newTopics.put(topicName, partSpec.newTopic(topicName)); + for (Integer partitionNumber : partSpec.partitionNumbers()) { + active.add(new TopicPartition(topicName, partitionNumber)); + } } - if (spec.totalTopics() < spec.activeTopics()) { - throw new ConfigException(String.format( - "activeTopics was %d, but totalTopics was only %d. 
activeTopics must " + - "be less than or equal to totalTopics.", spec.activeTopics(), spec.totalTopics())); + if (active.isEmpty()) { + throw new RuntimeException("You must specify at least one active topic."); } - Map<String, NewTopic> newTopics = new HashMap<>(); - for (int i = 0; i < spec.totalTopics(); i++) { - String name = topicIndexToName(i); - newTopics.put(name, new NewTopic(name, spec.numPartitions(), - spec.replicationFactor())); + for (Map.Entry<String, PartitionsSpec> entry : + spec.inactiveTopics().materialize().entrySet()) { + String topicName = entry.getKey(); + PartitionsSpec partSpec = entry.getValue(); + newTopics.put(topicName, partSpec.newTopic(topicName)); } - status.update(new TextNode("Creating " + spec.totalTopics() + " topic(s)")); + status.update(new TextNode("Creating " + newTopics.keySet().size() + " topic(s)")); WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(), spec.adminClientConf(), newTopics, false); - status.update(new TextNode("Created " + spec.totalTopics() + " topic(s)")); - executor.submit(new SendRecords()); + status.update(new TextNode("Created " + newTopics.keySet().size() + " topic(s)")); + executor.submit(new SendRecords(active)); } catch (Throwable e) { WorkerUtils.abort(log, "Prepare", e, doneFuture); } @@ -167,6 +167,8 @@ public class ProduceBenchWorker implements TaskWorker { } public class SendRecords implements Callable<Void> { + private final HashSet<TopicPartition> activePartitions; + private final Histogram histogram; private final Future<?> statusUpdaterFuture; @@ -179,7 +181,8 @@ public class ProduceBenchWorker implements TaskWorker { private final Throttle throttle; - SendRecords() { + SendRecords(HashSet<TopicPartition> activePartitions) { + this.activePartitions = activePartitions; this.histogram = new Histogram(5000); int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS); this.statusUpdaterFuture = executor.scheduleWithFixedDelay( @@ 
-201,13 +204,16 @@ public class ProduceBenchWorker implements TaskWorker { try { Future<RecordMetadata> future = null; try { + Iterator<TopicPartition> iter = activePartitions.iterator(); for (int m = 0; m < spec.maxMessages(); m++) { - for (int i = 0; i < spec.activeTopics(); i++) { - ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>( - topicIndexToName(i), 0, keys.next(), values.next()); - future = producer.send(record, - new SendRecordsCallback(this, Time.SYSTEM.milliseconds())); + if (!iter.hasNext()) { + iter = activePartitions.iterator(); } + TopicPartition partition = iter.next(); + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>( + partition.topic(), partition.partition(), keys.next(), values.next()); + future = producer.send(record, + new SendRecordsCallback(this, Time.SYSTEM.milliseconds())); throttle.increment(); } } finally { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java index 12b0c08..570f6a1 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java @@ -17,6 +17,9 @@ package org.apache.kafka.trogdor.workload; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.node.TextNode; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -27,6 +30,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigException; import 
org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.WakeupException; @@ -35,6 +39,7 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.common.WorkerUtils; @@ -46,33 +51,31 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.TreeSet; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class RoundTripWorker implements TaskWorker { private static final int THROTTLE_PERIOD_MS = 100; - private static final int MESSAGE_SIZE = 512; - private static final int LOG_INTERVAL_MS = 5000; private static final int LOG_NUM_MESSAGES = 10; - private static final String TOPIC_NAME = "round_trip_topic"; - private static final Logger log = LoggerFactory.getLogger(RoundTripWorker.class); private static final PayloadGenerator KEY_GENERATOR = new SequentialPayloadGenerator(4, 0); - private final ToReceiveTracker toReceiveTracker = new ToReceiveTracker(); + private ToReceiveTracker toReceiveTracker; private final String id; @@ -80,18 +83,20 @@ public class RoundTripWorker implements TaskWorker { private final AtomicBoolean running = new AtomicBoolean(false); - private ExecutorService executor; + private 
ScheduledExecutorService executor; + + private WorkerStatusTracker status; private KafkaFutureImpl<String> doneFuture; private KafkaProducer<byte[], byte[]> producer; - private PayloadGenerator payloadGenerator; - private KafkaConsumer<byte[], byte[]> consumer; private CountDownLatch unackedSends; + private ToSendTracker toSendTracker; + public RoundTripWorker(String id, RoundTripWorkloadSpec spec) { this.id = id; this.spec = spec; @@ -104,8 +109,9 @@ public class RoundTripWorker implements TaskWorker { throw new IllegalStateException("RoundTripWorker is already running."); } log.info("{}: Activating RoundTripWorker.", id); - this.executor = Executors.newCachedThreadPool( + this.executor = Executors.newScheduledThreadPool(3, ThreadUtils.createThreadFactory("RoundTripWorker%d", false)); + this.status = status; this.doneFuture = doneFuture; this.producer = null; this.consumer = null; @@ -120,16 +126,31 @@ public class RoundTripWorker implements TaskWorker { if (spec.targetMessagesPerSec() <= 0) { throw new ConfigException("Can't have targetMessagesPerSec <= 0."); } - if ((spec.partitionAssignments() == null) || spec.partitionAssignments().isEmpty()) { - throw new ConfigException("Invalid null or empty partitionAssignments."); + Map<String, NewTopic> newTopics = new HashMap<>(); + HashSet<TopicPartition> active = new HashSet<>(); + for (Map.Entry<String, PartitionsSpec> entry : + spec.activeTopics().materialize().entrySet()) { + String topicName = entry.getKey(); + PartitionsSpec partSpec = entry.getValue(); + newTopics.put(topicName, partSpec.newTopic(topicName)); + for (Integer partitionNumber : partSpec.partitionNumbers()) { + active.add(new TopicPartition(topicName, partitionNumber)); + } + } + if (active.isEmpty()) { + throw new RuntimeException("You must specify at least one active topic."); } - WorkerUtils.createTopics( - log, spec.bootstrapServers(), spec.commonClientConf(), spec.adminClientConf(), - Collections.singletonMap(TOPIC_NAME, - new 
NewTopic(TOPIC_NAME, spec.partitionAssignments())), - true); - executor.submit(new ProducerRunnable()); - executor.submit(new ConsumerRunnable()); + status.update(new TextNode("Creating " + newTopics.keySet().size() + " topic(s)")); + WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(), + spec.adminClientConf(), newTopics, true); + status.update(new TextNode("Created " + newTopics.keySet().size() + " topic(s)")); + toSendTracker = new ToSendTracker(spec.maxMessages()); + toReceiveTracker = new ToReceiveTracker(); + executor.submit(new ProducerRunnable(active)); + executor.submit(new ConsumerRunnable(active)); + executor.submit(new StatusUpdater()); + executor.scheduleWithFixedDelay( + new StatusUpdater(), 30, 30, TimeUnit.SECONDS); } catch (Throwable e) { WorkerUtils.abort(log, "Prepare", e, doneFuture); } @@ -159,6 +180,10 @@ public class RoundTripWorker implements TaskWorker { failed.add(index); } + synchronized int frontier() { + return frontier; + } + synchronized ToSendTrackerResult next() { if (failed.isEmpty()) { if (frontier >= maxMessages) { @@ -173,9 +198,11 @@ public class RoundTripWorker implements TaskWorker { } class ProducerRunnable implements Runnable { + private final HashSet<TopicPartition> partitions; private final Throttle throttle; - ProducerRunnable() { + ProducerRunnable(HashSet<TopicPartition> partitions) { + this.partitions = partitions; Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024); @@ -195,11 +222,11 @@ public class RoundTripWorker implements TaskWorker { @Override public void run() { - final ToSendTracker toSendTracker = new ToSendTracker(spec.maxMessages()); long messagesSent = 0; long uniqueMessagesSent = 0; log.debug("{}: Starting RoundTripWorker#ProducerRunnable.", id); try { + Iterator<TopicPartition> iter = partitions.iterator(); while (true) { final ToSendTrackerResult result = 
toSendTracker.next(); if (result == null) { @@ -212,9 +239,13 @@ public class RoundTripWorker implements TaskWorker { uniqueMessagesSent++; } messagesSent++; + if (!iter.hasNext()) { + iter = partitions.iterator(); + } + TopicPartition partition = iter.next(); // we explicitly specify generator position based on message index - ProducerRecord<byte[], byte[]> record = new ProducerRecord(TOPIC_NAME, 0, - KEY_GENERATOR.generate(messageIndex), + ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(partition.topic(), + partition.partition(), KEY_GENERATOR.generate(messageIndex), spec.valueGenerator().generate(messageIndex)); producer.send(record, new Callback() { @Override @@ -242,12 +273,23 @@ public class RoundTripWorker implements TaskWorker { private class ToReceiveTracker { private final TreeSet<Integer> pending = new TreeSet<>(); + private int totalReceived = 0; + synchronized void addPending(int messageIndex) { pending.add(messageIndex); } synchronized boolean removePending(int messageIndex) { - return pending.remove(messageIndex); + if (pending.remove(messageIndex)) { + totalReceived++; + return true; + } else { + return false; + } + } + + synchronized int totalReceived() { + return totalReceived; } void log() { @@ -269,7 +311,7 @@ public class RoundTripWorker implements TaskWorker { class ConsumerRunnable implements Runnable { private final Properties props; - ConsumerRunnable() { + ConsumerRunnable(HashSet<TopicPartition> partitions) { this.props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer." 
+ id); @@ -281,7 +323,7 @@ public class RoundTripWorker implements TaskWorker { WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.consumerConf()); consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer()); - consumer.subscribe(Collections.singleton(TOPIC_NAME)); + consumer.assign(partitions); } @Override @@ -296,7 +338,8 @@ public class RoundTripWorker implements TaskWorker { try { pollInvoked++; ConsumerRecords<byte[], byte[]> records = consumer.poll(50); - for (ConsumerRecord<byte[], byte[]> record : records.records(TOPIC_NAME)) { + for (Iterator<ConsumerRecord<byte[], byte[]>> iter = records.iterator(); iter.hasNext(); ) { + ConsumerRecord<byte[], byte[]> record = iter.next(); int messageIndex = ByteBuffer.wrap(record.key()).order(ByteOrder.LITTLE_ENDIAN).getInt(); messagesReceived++; if (toReceiveTracker.removePending(messageIndex)) { @@ -306,6 +349,7 @@ public class RoundTripWorker implements TaskWorker { "Waiting for all sends to be acked...", id, spec.maxMessages()); unackedSends.await(); log.info("{}: all sends have been acked.", id); + new StatusUpdater().update(); doneFuture.complete(""); return; } @@ -332,6 +376,46 @@ public class RoundTripWorker implements TaskWorker { } } + public class StatusUpdater implements Runnable { + @Override + public void run() { + try { + update(); + } catch (Exception e) { + WorkerUtils.abort(log, "StatusUpdater", e, doneFuture); + } + } + + StatusData update() { + StatusData statusData = + new StatusData(toSendTracker.frontier(), toReceiveTracker.totalReceived()); + status.update(JsonUtil.JSON_SERDE.valueToTree(statusData)); + return statusData; + } + } + + public static class StatusData { + private final long totalUniqueSent; + private final long totalReceived; + + @JsonCreator + public StatusData(@JsonProperty("totalUniqueSent") long totalUniqueSent, + @JsonProperty("totalReceived") long totalReceived) { + this.totalUniqueSent = totalUniqueSent; + 
this.totalReceived = totalReceived; + } + + @JsonProperty + public long totalUniqueSent() { + return totalUniqueSent; + } + + @JsonProperty + public long totalReceived() { + return totalReceived; + } + } + @Override public void stop(Platform platform) throws Exception { if (!running.compareAndSet(true, false)) { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java index 3d0e3ef..9522e0a 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorkloadSpec.java @@ -25,11 +25,8 @@ import org.apache.kafka.trogdor.task.TaskSpec; import org.apache.kafka.trogdor.task.TaskWorker; import java.util.Collections; -import java.util.List; import java.util.Map; -import java.util.NavigableMap; import java.util.Set; -import java.util.TreeMap; /** * The specification for a workload that sends messages to a broker and then @@ -39,8 +36,8 @@ public class RoundTripWorkloadSpec extends TaskSpec { private final String clientNode; private final String bootstrapServers; private final int targetMessagesPerSec; - private final NavigableMap<Integer, List<Integer>> partitionAssignments; private final PayloadGenerator valueGenerator; + private final TopicsSpec activeTopics; private final int maxMessages; private final Map<String, String> commonClientConf; private final Map<String, String> producerConf; @@ -57,17 +54,17 @@ public class RoundTripWorkloadSpec extends TaskSpec { @JsonProperty("consumerConf") Map<String, String> consumerConf, @JsonProperty("producerConf") Map<String, String> producerConf, @JsonProperty("targetMessagesPerSec") int targetMessagesPerSec, - @JsonProperty("partitionAssignments") NavigableMap<Integer, List<Integer>> partitionAssignments, @JsonProperty("valueGenerator") PayloadGenerator valueGenerator, + @JsonProperty("activeTopics") TopicsSpec 
activeTopics, @JsonProperty("maxMessages") int maxMessages) { super(startMs, durationMs); this.clientNode = clientNode == null ? "" : clientNode; this.bootstrapServers = bootstrapServers == null ? "" : bootstrapServers; this.targetMessagesPerSec = targetMessagesPerSec; - this.partitionAssignments = partitionAssignments == null ? - new TreeMap<Integer, List<Integer>>() : partitionAssignments; this.valueGenerator = valueGenerator == null ? new UniformRandomPayloadGenerator(32, 123, 10) : valueGenerator; + this.activeTopics = activeTopics == null ? + TopicsSpec.EMPTY : activeTopics.immutableCopy(); this.maxMessages = maxMessages; this.commonClientConf = configOrEmptyMap(commonClientConf); this.adminClientConf = configOrEmptyMap(adminClientConf); @@ -91,8 +88,8 @@ public class RoundTripWorkloadSpec extends TaskSpec { } @JsonProperty - public NavigableMap<Integer, List<Integer>> partitionAssignments() { - return partitionAssignments; + public TopicsSpec activeTopics() { + return activeTopics; } @JsonProperty diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java new file mode 100644 index 0000000..a9b550d --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TopicsSpec.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.kafka.trogdor.common.StringExpander; +import org.apache.kafka.trogdor.rest.Message; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * TopicsSpec maps topic names to descriptions of the partitions in them. + * + * In JSON form, this is serialized as a map whose keys are topic names, + * and whose entries are partition descriptions. + * Keys may also refer to multiple topics.
For example, this specification + * refers to 3 topics foo1, foo2, and foo3: + * + * { + * "foo[1-3]" : { + * "numPartitions": 3, + * "replicationFactor": 3 + * } + * } + */ +public class TopicsSpec extends Message { + public static final TopicsSpec EMPTY = new TopicsSpec().immutableCopy(); + + private final Map<String, PartitionsSpec> map; + + @JsonCreator + public TopicsSpec() { + this.map = new HashMap<>(); + } + + private TopicsSpec(Map<String, PartitionsSpec> map) { + this.map = map; + } + + @JsonAnyGetter + public Map<String, PartitionsSpec> get() { + return map; + } + + @JsonAnySetter + public void set(String name, PartitionsSpec value) { + map.put(name, value); + } + + public TopicsSpec immutableCopy() { + HashMap<String, PartitionsSpec> mapCopy = new HashMap<>(); + mapCopy.putAll(map); + return new TopicsSpec(Collections.unmodifiableMap(mapCopy)); + } + + /** + * Expand the topic name keys in this TopicsSpec into concrete topic names. + * + * @return A map from topic names to PartitionsSpec objects. + */ + public Map<String, PartitionsSpec> materialize() { + HashMap<String, PartitionsSpec> all = new HashMap<>(); + for (Map.Entry<String, PartitionsSpec> entry : map.entrySet()) { + for (String topicName : StringExpander.expand(entry.getKey())) { + all.put(topicName, entry.getValue()); + } + } + return all; + } +} diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java index c1f7490..55ecb1a 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java @@ -26,9 +26,10 @@ import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerRunning; import org.apache.kafka.trogdor.rest.WorkerStopping; -import org.apache.kafka.trogdor.task.SampleTaskSpec; +import
org.apache.kafka.trogdor.workload.PartitionsSpec; import org.apache.kafka.trogdor.workload.ProduceBenchSpec; import org.apache.kafka.trogdor.workload.RoundTripWorkloadSpec; +import org.apache.kafka.trogdor.workload.TopicsSpec; import org.junit.Test; import java.lang.reflect.Field; @@ -49,10 +50,11 @@ public class JsonSerializationTest { verify(new WorkerRunning(null, null, 0, null)); verify(new WorkerStopping(null, null, 0, null)); verify(new ProduceBenchSpec(0, 0, null, null, - 0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 3)); + 0, 0, null, null, null, null, null, null, null)); verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null, 0, null, null, 0)); - verify(new SampleTaskSpec(0, 0, null, null)); + verify(new TopicsSpec()); + verify(new PartitionsSpec(0, (short) 0, null)); } private <T> void verify(T val1) throws Exception { diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java new file mode 100644 index 0000000..72e1c20 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/StringExpanderTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.common; + +import org.junit.Rule; +import org.junit.rules.Timeout; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; + +import static org.junit.Assert.assertEquals; + +public class StringExpanderTest { + @Rule + final public Timeout globalTimeout = Timeout.millis(120000); + + @Test + public void testNoExpansionNeeded() throws Exception { + assertEquals(Collections.singleton("foo"), StringExpander.expand("foo")); + assertEquals(Collections.singleton("bar"), StringExpander.expand("bar")); + assertEquals(Collections.singleton(""), StringExpander.expand("")); + } + + @Test + public void testExpansions() throws Exception { + HashSet<String> expected1 = new HashSet<>(Arrays.asList( + "foo1", + "foo2", + "foo3" + )); + assertEquals(expected1, StringExpander.expand("foo[1-3]")); + + HashSet<String> expected2 = new HashSet<>(Arrays.asList( + "foo bar baz 0" + )); + assertEquals(expected2, StringExpander.expand("foo bar baz [0-0]")); + + HashSet<String> expected3 = new HashSet<>(Arrays.asList( + "[[ wow50 ]]", + "[[ wow51 ]]", + "[[ wow52 ]]" + )); + assertEquals(expected3, StringExpander.expand("[[ wow[50-52] ]]")); + } +} diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java new file mode 100644 index 0000000..f86ca0f --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/TopicsSpecTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TopicsSpecTest { + @Rule + final public Timeout globalTimeout = Timeout.millis(120000); + + private final static TopicsSpec FOO; + private final static PartitionsSpec PARTSA; + private final static PartitionsSpec PARTSB; + + static { + FOO = new TopicsSpec(); + + PARTSA = new PartitionsSpec(3, (short) 3, null); + FOO.set("topicA[0-2]", PARTSA); + + Map<Integer, List<Integer>> assignmentsB = new HashMap<>(); + assignmentsB.put(0, Arrays.asList(0, 1, 2)); + assignmentsB.put(1, Arrays.asList(2, 3, 4)); + PARTSB = new PartitionsSpec(0, (short) 0, assignmentsB); + FOO.set("topicB", PARTSB); + } + + @Test + public void testMaterialize() { + Map<String, PartitionsSpec> parts = FOO.materialize(); + assertTrue(parts.containsKey("topicA0")); + assertTrue(parts.containsKey("topicA1")); + assertTrue(parts.containsKey("topicA2")); + assertTrue(parts.containsKey("topicB")); + assertEquals(4, parts.keySet().size()); + assertEquals(PARTSA, parts.get("topicA0")); + assertEquals(PARTSA, parts.get("topicA1")); + assertEquals(PARTSA, parts.get("topicA2")); + 
assertEquals(PARTSB, parts.get("topicB")); + } + + @Test + public void testPartitionNumbers() { + List<Integer> partsANumbers = PARTSA.partitionNumbers(); + assertEquals(Integer.valueOf(0), partsANumbers.get(0)); + assertEquals(Integer.valueOf(1), partsANumbers.get(1)); + assertEquals(Integer.valueOf(2), partsANumbers.get(2)); + assertEquals(3, partsANumbers.size()); + + List<Integer> partsBNumbers = PARTSB.partitionNumbers(); + assertEquals(Integer.valueOf(0), partsBNumbers.get(0)); + assertEquals(Integer.valueOf(1), partsBNumbers.get(1)); + assertEquals(2, partsBNumbers.size()); + } +} -- To stop receiving notification emails like this one, please contact rsiva...@apache.org.