[ 
https://issues.apache.org/jira/browse/KAFKA-7597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701009#comment-16701009
 ] 

ASF GitHub Bot commented on KAFKA-7597:
---------------------------------------

cmccabe closed pull request #5885: KAFKA-7597: Make Trogdor ProduceBenchWorker 
support transactions
URL: https://github.com/apache/kafka/pull/5885
 
 
   

This is a pull request merged from a forked repository.
Because GitHub hides the original diff of a fork's pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/build.gradle b/build.gradle
index 4d514dfcf97..5ce648adf77 100644
--- a/build.gradle
+++ b/build.gradle
@@ -565,6 +565,7 @@ project(':core') {
   dependencies {
     compile project(':clients')
     compile libs.jacksonDatabind
+    compile libs.jacksonJDK8Datatypes
     compile libs.joptSimple
     compile libs.metrics
     compile libs.scalaLibrary
@@ -830,6 +831,7 @@ project(':clients') {
     compile libs.snappy
     compile libs.slf4jApi
     compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token 
parsing
+    compileOnly libs.jacksonJDK8Datatypes
 
     jacksonDatabindConfig libs.jacksonDatabind // to publish as provided scope 
dependency.
 
@@ -839,6 +841,7 @@ project(':clients') {
 
     testRuntime libs.slf4jlog4j
     testRuntime libs.jacksonDatabind
+    testRuntime libs.jacksonJDK8Datatypes
   }
 
   task determineCommitId {
@@ -918,6 +921,7 @@ project(':tools') {
     compile project(':log4j-appender')
     compile libs.argparse4j
     compile libs.jacksonDatabind
+    compile libs.jacksonJDK8Datatypes
     compile libs.slf4jApi
 
     compile libs.jacksonJaxrsJsonProvider
@@ -1347,6 +1351,7 @@ project(':connect:json') {
   dependencies {
     compile project(':connect:api')
     compile libs.jacksonDatabind
+    compile libs.jacksonJDK8Datatypes
     compile libs.slf4jApi
 
     testCompile libs.easymock
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 7dd3604db08..59f56fcd4ab 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -103,6 +103,7 @@ libs += [
   bcpkix: "org.bouncycastle:bcpkix-jdk15on:$versions.bcpkix",
   easymock: "org.easymock:easymock:$versions.easymock",
   jacksonDatabind: 
"com.fasterxml.jackson.core:jackson-databind:$versions.jackson",
+  jacksonJDK8Datatypes: 
"com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson",
   jacksonJaxrsJsonProvider: 
"com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson",
   jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb",
   jaxrsApi: "javax.ws.rs:javax.ws.rs-api:$versions.jaxrs",
diff --git a/tests/bin/trogdor-run-transactional-produce-bench.sh 
b/tests/bin/trogdor-run-transactional-produce-bench.sh
new file mode 100755
index 00000000000..fd5ff0a01f2
--- /dev/null
+++ b/tests/bin/trogdor-run-transactional-produce-bench.sh
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+COORDINATOR_ENDPOINT="localhost:8889"
+TASK_ID="produce_bench_$RANDOM"
+TASK_SPEC=$(
+cat <<EOF
+{
+    "id": "$TASK_ID",
+    "spec": {
+        "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
+        "durationMs": 10000000,
+        "producerNode": "node0",
+        "bootstrapServers": "localhost:9092",
+        "targetMessagesPerSec": 100,
+        "maxMessages": 500,
+        "transactionGenerator" : {
+          "type" : "uniform",
+          "messagesPerTransaction" : 50
+        },
+        "activeTopics": {
+            "foo[1-3]": {
+                "numPartitions": 3,
+                "replicationFactor": 1
+            }
+        },
+        "inactiveTopics": {
+            "foo[4-5]": {
+                "numPartitions": 3,
+                "replicationFactor": 1
+            }
+        }
+    }
+}
+EOF
+)
+./bin/trogdor.sh client --create-task "${TASK_SPEC}" "${COORDINATOR_ENDPOINT}"
+echo "\$TASK_ID = $TASK_ID"
diff --git a/tests/kafkatest/services/trogdor/produce_bench_workload.py 
b/tests/kafkatest/services/trogdor/produce_bench_workload.py
index cf6a962b055..9afc81462ae 100644
--- a/tests/kafkatest/services/trogdor/produce_bench_workload.py
+++ b/tests/kafkatest/services/trogdor/produce_bench_workload.py
@@ -21,7 +21,8 @@
 class ProduceBenchWorkloadSpec(TaskSpec):
     def __init__(self, start_ms, duration_ms, producer_node, bootstrap_servers,
                  target_messages_per_sec, max_messages, producer_conf, 
admin_client_conf,
-                 common_client_conf, inactive_topics, active_topics):
+                 common_client_conf, inactive_topics, active_topics,
+                 transaction_generator=None):
         super(ProduceBenchWorkloadSpec, self).__init__(start_ms, duration_ms)
         self.message["class"] = 
"org.apache.kafka.trogdor.workload.ProduceBenchSpec"
         self.message["producerNode"] = producer_node
@@ -29,6 +30,7 @@ def __init__(self, start_ms, duration_ms, producer_node, 
bootstrap_servers,
         self.message["targetMessagesPerSec"] = target_messages_per_sec
         self.message["maxMessages"] = max_messages
         self.message["producerConf"] = producer_conf
+        self.message["transactionGenerator"] = transaction_generator
         self.message["adminClientConf"] = admin_client_conf
         self.message["commonClientConf"] = common_client_conf
         self.message["inactiveTopics"] = inactive_topics
diff --git a/tests/kafkatest/tests/core/produce_bench_test.py 
b/tests/kafkatest/tests/core/produce_bench_test.py
index 125ee941eb5..a316520335b 100644
--- a/tests/kafkatest/tests/core/produce_bench_test.py
+++ b/tests/kafkatest/tests/core/produce_bench_test.py
@@ -31,6 +31,8 @@ def __init__(self, test_context):
         self.workload_service = ProduceBenchWorkloadService(test_context, 
self.kafka)
         self.trogdor = TrogdorService(context=self.test_context,
                                       client_services=[self.kafka, 
self.workload_service])
+        self.active_topics = {"produce_bench_topic[0-1]": {"numPartitions": 1, 
"replicationFactor": 3}}
+        self.inactive_topics = {"produce_bench_topic[2-9]": {"numPartitions": 
1, "replicationFactor": 3}}
 
     def setUp(self):
         self.trogdor.start()
@@ -43,8 +45,6 @@ def teardown(self):
         self.zk.stop()
 
     def test_produce_bench(self):
-        active_topics={"produce_bench_topic[0-1]":{"numPartitions":1, 
"replicationFactor":3}}
-        inactive_topics={"produce_bench_topic[2-9]":{"numPartitions":1, 
"replicationFactor":3}}
         spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
                                         self.workload_service.producer_node,
                                         
self.workload_service.bootstrap_servers,
@@ -53,8 +53,29 @@ def test_produce_bench(self):
                                         producer_conf={},
                                         admin_client_conf={},
                                         common_client_conf={},
-                                        inactive_topics=inactive_topics,
-                                        active_topics=active_topics)
+                                        inactive_topics=self.inactive_topics,
+                                        active_topics=self.active_topics)
+        workload1 = self.trogdor.create_task("workload1", spec)
+        workload1.wait_for_done(timeout_sec=360)
+        tasks = self.trogdor.tasks()
+        self.logger.info("TASKS: %s\n" % json.dumps(tasks, sort_keys=True, 
indent=2))
+
+    def test_produce_bench_transactions(self):
+        spec = ProduceBenchWorkloadSpec(0, TaskSpec.MAX_DURATION_MS,
+                                        self.workload_service.producer_node,
+                                        
self.workload_service.bootstrap_servers,
+                                        target_messages_per_sec=1000,
+                                        max_messages=100000,
+                                        producer_conf={},
+                                        admin_client_conf={},
+                                        common_client_conf={},
+                                        inactive_topics=self.inactive_topics,
+                                        active_topics=self.active_topics,
+                                        transaction_generator={
+                                            # 10 transactions with 10k messages
+                                            "type": "uniform",
+                                            "messagesPerTransaction": "10000"
+                                        })
         workload1 = self.trogdor.create_task("workload1", spec)
         workload1.wait_for_done(timeout_sec=360)
         tasks = self.trogdor.tasks()
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java 
b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
index 70193c3beef..ad90ffc6e84 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/JsonUtil.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 
 /**
  * Utilities for working with JSON.
@@ -33,6 +34,7 @@
         JSON_SERDE = new ObjectMapper();
         JSON_SERDE.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
         
JSON_SERDE.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
+        JSON_SERDE.registerModule(new Jdk8Module());
         JSON_SERDE.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
     }
 
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
index 30878bf303f..c0bbd7eb012 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchSpec.java
@@ -26,10 +26,39 @@
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 /**
  * The specification for a benchmark that produces messages to a set of topics.
+ *
+ * To configure a transactional producer, a #{@link TransactionGenerator} must 
be passed in.
+ * Said generator works in lockstep with the producer by instructing it what 
action to take next in regards to a transaction.
+ *
+ * An example JSON representation which will result in a producer that creates 
three topics (foo1, foo2, foo3)
+ * with three partitions each and produces to them:
+ * #{@code
+ *   {
+ *      "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
+ *      "durationMs": 10000000,
+ *      "producerNode": "node0",
+ *      "bootstrapServers": "localhost:9092",
+ *      "targetMessagesPerSec": 10,
+ *      "maxMessages": 100,
+ *      "activeTopics": {
+ *        "foo[1-3]": {
+ *          "numPartitions": 3,
+ *          "replicationFactor": 1
+ *        }
+ *      },
+ *      "inactiveTopics": {
+ *        "foo[4-5]": {
+ *          "numPartitions": 3,
+ *          "replicationFactor": 1
+ *        }
+ *      }
+ *   }
+ * }
  */
 public class ProduceBenchSpec extends TaskSpec {
     private final String producerNode;
@@ -38,6 +67,7 @@
     private final int maxMessages;
     private final PayloadGenerator keyGenerator;
     private final PayloadGenerator valueGenerator;
+    private final Optional<TransactionGenerator> transactionGenerator;
     private final Map<String, String> producerConf;
     private final Map<String, String> adminClientConf;
     private final Map<String, String> commonClientConf;
@@ -53,6 +83,7 @@ public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
                          @JsonProperty("maxMessages") int maxMessages,
                          @JsonProperty("keyGenerator") PayloadGenerator 
keyGenerator,
                          @JsonProperty("valueGenerator") PayloadGenerator 
valueGenerator,
+                         @JsonProperty("transactionGenerator") 
Optional<TransactionGenerator> txGenerator,
                          @JsonProperty("producerConf") Map<String, String> 
producerConf,
                          @JsonProperty("commonClientConf") Map<String, String> 
commonClientConf,
                          @JsonProperty("adminClientConf") Map<String, String> 
adminClientConf,
@@ -67,6 +98,7 @@ public ProduceBenchSpec(@JsonProperty("startMs") long startMs,
             new SequentialPayloadGenerator(4, 0) : keyGenerator;
         this.valueGenerator = valueGenerator == null ?
             new ConstantPayloadGenerator(512, new byte[0]) : valueGenerator;
+        this.transactionGenerator = txGenerator == null ? Optional.empty() : 
txGenerator;
         this.producerConf = configOrEmptyMap(producerConf);
         this.commonClientConf = configOrEmptyMap(commonClientConf);
         this.adminClientConf = configOrEmptyMap(adminClientConf);
@@ -106,6 +138,11 @@ public PayloadGenerator valueGenerator() {
         return valueGenerator;
     }
 
+    @JsonProperty
+    public Optional<TransactionGenerator> transactionGenerator() {
+        return transactionGenerator;
+    }
+
     @JsonProperty
     public Map<String, String> producerConf() {
         return producerConf;
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index dc749eb65a4..abf59768921 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -36,6 +36,7 @@
 import org.apache.kafka.trogdor.common.WorkerUtils;
 import org.apache.kafka.trogdor.task.TaskWorker;
 import org.apache.kafka.trogdor.task.WorkerStatusTracker;
+import 
org.apache.kafka.trogdor.workload.TransactionGenerator.TransactionAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,13 +44,16 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class ProduceBenchWorker implements TaskWorker {
     private static final Logger log = 
LoggerFactory.getLogger(ProduceBenchWorker.class);
@@ -179,18 +183,33 @@ protected synchronized void delay(long amount) throws 
InterruptedException {
 
         private final PayloadIterator values;
 
+        private final Optional<TransactionGenerator> transactionGenerator;
+
         private final Throttle throttle;
 
+        private Iterator<TopicPartition> partitionsIterator;
+        private Future<RecordMetadata> sendFuture;
+        private AtomicLong transactionsCommitted;
+        private boolean enableTransactions;
+
         SendRecords(HashSet<TopicPartition> activePartitions) {
             this.activePartitions = activePartitions;
+            this.partitionsIterator = activePartitions.iterator();
             this.histogram = new Histogram(5000);
+
+            this.transactionGenerator = spec.transactionGenerator();
+            this.enableTransactions = this.transactionGenerator.isPresent();
+            this.transactionsCommitted = new AtomicLong();
+
             int perPeriod = 
WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
             this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
-                new StatusUpdater(histogram), 30, 30, TimeUnit.SECONDS);
+                new StatusUpdater(histogram, transactionsCommitted), 30, 30, 
TimeUnit.SECONDS);
+
             Properties props = new Properties();
             props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
-            // add common client configs to producer properties, and then 
user-specified producer
-            // configs
+            if (enableTransactions)
+                props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
"produce-bench-transaction-id-" + UUID.randomUUID());
+            // add common client configs to producer properties, and then 
user-specified producer configs
             WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.producerConf());
             this.producer = new KafkaProducer<>(props, new 
ByteArraySerializer(), new ByteArraySerializer());
             this.keys = new PayloadIterator(spec.keyGenerator());
@@ -202,23 +221,29 @@ protected synchronized void delay(long amount) throws 
InterruptedException {
         public Void call() throws Exception {
             long startTimeMs = Time.SYSTEM.milliseconds();
             try {
-                Future<RecordMetadata> future = null;
                 try {
-                    Iterator<TopicPartition> iter = 
activePartitions.iterator();
-                    for (int m = 0; m < spec.maxMessages(); m++) {
-                        if (!iter.hasNext()) {
-                            iter = activePartitions.iterator();
+                    if (enableTransactions)
+                        producer.initTransactions();
+
+                    int sentMessages = 0;
+                    while (sentMessages < spec.maxMessages()) {
+                        if (enableTransactions) {
+                            boolean tookAction = takeTransactionAction();
+                            if (tookAction)
+                                continue;
                         }
-                        TopicPartition partition = iter.next();
-                        ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(
-                            partition.topic(), partition.partition(), 
keys.next(), values.next());
-                        future = producer.send(record,
-                            new SendRecordsCallback(this, 
Time.SYSTEM.milliseconds()));
-                        throttle.increment();
+                        sendMessage();
+                        sentMessages++;
                     }
+                    if (enableTransactions)
+                        takeTransactionAction(); // give the 
transactionGenerator a chance to commit if configured evenly
+                } catch (Exception e) {
+                    if (enableTransactions)
+                        producer.abortTransaction();
+                    throw e;
                 } finally {
-                    if (future != null) {
-                        future.get();
+                    if (sendFuture != null) {
+                        sendFuture.get();
                     }
                     producer.close();
                 }
@@ -226,7 +251,7 @@ public Void call() throws Exception {
                 WorkerUtils.abort(log, "SendRecords", e, doneFuture);
             } finally {
                 statusUpdaterFuture.cancel(false);
-                StatusData statusData = new StatusUpdater(histogram).update();
+                StatusData statusData = new StatusUpdater(histogram, 
transactionsCommitted).update();
                 long curTimeMs = Time.SYSTEM.milliseconds();
                 log.info("Sent {} total record(s) in {} ms.  status: {}",
                     histogram.summarize().numSamples(), curTimeMs - 
startTimeMs, statusData);
@@ -235,6 +260,42 @@ public Void call() throws Exception {
             return null;
         }
 
+        private boolean takeTransactionAction() {
+            boolean tookAction = true;
+            TransactionAction nextAction = 
transactionGenerator.get().nextAction();
+            switch (nextAction) {
+                case BEGIN_TRANSACTION:
+                    log.debug("Beginning transaction.");
+                    producer.beginTransaction();
+                    break;
+                case COMMIT_TRANSACTION:
+                    log.debug("Committing transaction.");
+                    producer.commitTransaction();
+                    transactionsCommitted.getAndIncrement();
+                    break;
+                case ABORT_TRANSACTION:
+                    log.debug("Aborting transaction.");
+                    producer.abortTransaction();
+                    break;
+                case NO_OP:
+                    tookAction = false;
+                    break;
+            }
+            return tookAction;
+        }
+
+        private void sendMessage() throws InterruptedException {
+            if (!partitionsIterator.hasNext())
+                partitionsIterator = activePartitions.iterator();
+
+            TopicPartition partition = partitionsIterator.next();
+            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
+                partition.topic(), partition.partition(), keys.next(), 
values.next());
+            sendFuture = producer.send(record,
+                new SendRecordsCallback(this, Time.SYSTEM.milliseconds()));
+            throttle.increment();
+        }
+
         void recordDuration(long durationMs) {
             histogram.add(durationMs);
         }
@@ -242,9 +303,11 @@ void recordDuration(long durationMs) {
 
     public class StatusUpdater implements Runnable {
         private final Histogram histogram;
+        private final AtomicLong transactionsCommitted;
 
-        StatusUpdater(Histogram histogram) {
+        StatusUpdater(Histogram histogram, AtomicLong transactionsCommitted) {
             this.histogram = histogram;
+            this.transactionsCommitted = transactionsCommitted;
         }
 
         @Override
@@ -261,7 +324,8 @@ StatusData update() {
             StatusData statusData = new StatusData(summary.numSamples(), 
summary.average(),
                 summary.percentiles().get(0).value(),
                 summary.percentiles().get(1).value(),
-                summary.percentiles().get(2).value());
+                summary.percentiles().get(2).value(),
+                transactionsCommitted.get());
             status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
             return statusData;
         }
@@ -273,6 +337,7 @@ StatusData update() {
         private final int p50LatencyMs;
         private final int p95LatencyMs;
         private final int p99LatencyMs;
+        private final long transactionsCommitted;
 
         /**
          * The percentiles to use when calculating the histogram data.
@@ -285,12 +350,14 @@ StatusData update() {
                    @JsonProperty("averageLatencyMs") float averageLatencyMs,
                    @JsonProperty("p50LatencyMs") int p50latencyMs,
                    @JsonProperty("p95LatencyMs") int p95latencyMs,
-                   @JsonProperty("p99LatencyMs") int p99latencyMs) {
+                   @JsonProperty("p99LatencyMs") int p99latencyMs,
+                   @JsonProperty("transactionsCommitted") long 
transactionsCommitted) {
             this.totalSent = totalSent;
             this.averageLatencyMs = averageLatencyMs;
             this.p50LatencyMs = p50latencyMs;
             this.p95LatencyMs = p95latencyMs;
             this.p99LatencyMs = p99latencyMs;
+            this.transactionsCommitted = transactionsCommitted;
         }
 
         @JsonProperty
@@ -298,6 +365,11 @@ public long totalSent() {
             return totalSent;
         }
 
+        @JsonProperty
+        public long transactionsCommitted() {
+            return transactionsCommitted;
+        }
+
         @JsonProperty
         public float averageLatencyMs() {
             return averageLatencyMs;
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java
new file mode 100644
index 00000000000..5ec47ec91c1
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * Generates actions that should be taken by a producer that uses transactions.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+    include = JsonTypeInfo.As.PROPERTY,
+    property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(value = UniformTransactionsGenerator.class, name = 
"uniform"),
+})
+public interface TransactionGenerator {
+    enum TransactionAction {
+        BEGIN_TRANSACTION, COMMIT_TRANSACTION, ABORT_TRANSACTION, NO_OP
+    }
+
+    /**
+     * Returns the next action that the producer should take in regards to 
transactions.
+     * This method should be called every time before a producer sends a 
message.
+     * This means that most of the time it should return #{@link 
TransactionAction#NO_OP}
+     * to signal the producer that its next step should be to send a message.
+     */
+    TransactionAction nextAction();
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java
new file mode 100644
index 00000000000..1fbfbc23381
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/UniformTransactionsGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A uniform transactions generator where every N records are grouped in a 
separate transaction
+ */
+public class UniformTransactionsGenerator implements TransactionGenerator {
+
+    private final int messagesPerTransaction;
+    private int messagesInTransaction = -1;
+
+    @JsonCreator
+    public 
UniformTransactionsGenerator(@JsonProperty("messagesPerTransaction") int 
messagesPerTransaction) {
+        if (messagesPerTransaction < 1)
+            throw new IllegalArgumentException("Cannot have less than one 
message per transaction.");
+
+        this.messagesPerTransaction = messagesPerTransaction;
+    }
+
+    @JsonProperty
+    public int messagesPerTransaction() {
+        return messagesPerTransaction;
+    }
+
+    @Override
+    public synchronized TransactionAction nextAction() {
+        if (messagesInTransaction == -1) {
+            messagesInTransaction = 0;
+            return TransactionAction.BEGIN_TRANSACTION;
+        }
+        if (messagesInTransaction == messagesPerTransaction) {
+            messagesInTransaction = -1;
+            return TransactionAction.COMMIT_TRANSACTION;
+        }
+
+        messagesInTransaction += 1;
+        return TransactionAction.NO_OP;
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
 
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 5e6ff81e28b..c324ec4c708 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -37,6 +37,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.Assert.assertNotNull;
 
@@ -54,7 +55,7 @@ public void testDeserializationDoesNotProduceNulls() throws 
Exception {
         verify(new WorkerRunning(null, null, 0, null));
         verify(new WorkerStopping(null, null, 0, null));
         verify(new ProduceBenchSpec(0, 0, null, null,
-            0, 0, null, null, null, null, null, null, null));
+            0, 0, null, null, Optional.empty(), null, null, null, null, null));
         verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, 
null,
             0, null, null, 0));
         verify(new TopicsSpec());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Trogdor - Support transactions in ProduceBenchWorker
> ----------------------------------------------------
>
>                 Key: KAFKA-7597
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7597
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Minor
>
> The `ProduceBenchWorker` allows you to schedule a benchmark of a Kafka 
> Producer.
> It would prove useful if we supported transactions in this producer, so as to 
> allow benchmarks with transactions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to