This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0d55f0f  KAFKA-8102: Add an interval-based Trogdor transaction 
generator (#6444)
0d55f0f is described below

commit 0d55f0f3ec8f97bc250b325481f6f2fa70f52a5c
Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com>
AuthorDate: Mon Mar 25 09:58:11 2019 -0700

    KAFKA-8102: Add an interval-based Trogdor transaction generator (#6444)
    
    This patch adds a TimeIntervalTransactionsGenerator class which enables the 
Trogdor ProduceBench worker to commit transactions based on a configurable 
millisecond time interval.
    
    Also, we now handle 409 (Conflict) create-task responses in the coordinator 
command-line client by printing a more informative message.
    
    Reviewers: Colin P. McCabe <cmcc...@apache.org>
---
 TROGDOR.md                                         |  1 +
 tests/spec/transactional-produce-bench.json        |  2 +-
 .../trogdor/coordinator/CoordinatorClient.java     | 12 +++-
 .../kafka/trogdor/coordinator/TaskManager.java     |  1 +
 .../TimeIntervalTransactionsGenerator.java         | 67 ++++++++++++++++++++++
 .../trogdor/workload/TransactionGenerator.java     |  1 +
 .../TimeIntervalTransactionsGeneratorTest.java     | 42 ++++++++++++++
 7 files changed, 123 insertions(+), 3 deletions(-)

diff --git a/TROGDOR.md b/TROGDOR.md
index b551773..8b446b3 100644
--- a/TROGDOR.md
+++ b/TROGDOR.md
@@ -105,6 +105,7 @@ Trogdor can run several workloads.  Workloads perform 
operations on the cluster
 
 ### ProduceBench
 ProduceBench starts a Kafka producer on a single agent node, producing to 
several partitions.  The workload measures the average produce latency, as well 
as the median, 95th percentile, and 99th percentile latency.
+It can be configured to use a transactional producer which can commit 
transactions based on a set time interval or number of messages.
 
 ### RoundTripWorkload
 RoundTripWorkload tests both production and consumption.  The workload starts 
a Kafka producer and consumer on a single node.  The consumer will read back 
the messages that were produced by the producer.
diff --git a/tests/spec/transactional-produce-bench.json 
b/tests/spec/transactional-produce-bench.json
index 40f008b..bf1b6ca 100644
--- a/tests/spec/transactional-produce-bench.json
+++ b/tests/spec/transactional-produce-bench.json
@@ -15,7 +15,7 @@
 
 //
 // An example task specification for running a transactional producer benchmark
-in Trogdor.  See TROGDOR.md for details.
+// in Trogdor.  See TROGDOR.md for details.
 //
 
 {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 476c32d..ba40e7b 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -44,6 +44,7 @@ import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.UptimeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -426,8 +427,15 @@ public class CoordinatorClient {
                 TaskSpec taskSpec = JsonUtil.
                     objectFromCommandLineArgument(res.getString("taskSpec"), 
TaskSpec.class);
                 CreateTaskRequest req = new CreateTaskRequest(taskId, 
taskSpec);
-                client.createTask(req);
-                System.out.printf("Sent CreateTaskRequest for task %s.%n", 
req.id());
+                try {
+                    client.createTask(req);
+                    System.out.printf("Sent CreateTaskRequest for task %s.%n", 
req.id());
+                } catch (RequestConflictException rce) {
+                    System.out.printf("CreateTaskRequest for task %s got a 409 
status code - " +
+                        "a task with the same ID but a different specification 
already exists.%nException: %s%n",
+                        req.id(), rce.getMessage());
+                    Exit.exit(1);
+                }
                 break;
             }
             case "stopTask": {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 941656e..60e2b1e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -302,6 +302,7 @@ public final class TaskManager {
      *
      * @param id                    The ID of the task to create.
      * @param spec                  The specification of the task to create.
+     * @throws RequestConflictException if a task with the same ID but a 
different spec exists
      */
     public void createTask(final String id, TaskSpec spec)
             throws Throwable {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGenerator.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGenerator.java
new file mode 100644
index 0000000..8d5f05b
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGenerator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * A transactions generator where we commit a transaction every N milliseconds
+ */
+public class TimeIntervalTransactionsGenerator implements TransactionGenerator 
{
+
+    private static final long NULL_START_MS = -1;
+
+    private final Time time;
+    private final int intervalMs;
+
+    private long lastTransactionStartMs = NULL_START_MS;
+
+    @JsonCreator
+    public 
TimeIntervalTransactionsGenerator(@JsonProperty("transactionIntervalMs") int 
intervalMs) {
+        this(intervalMs, Time.SYSTEM);
+    }
+
+    TimeIntervalTransactionsGenerator(@JsonProperty("transactionIntervalMs") 
int intervalMs,
+                                      Time time) {
+        if (intervalMs < 1) {
+            throw new IllegalArgumentException("Cannot have a negative 
interval");
+        }
+        this.time = time;
+        this.intervalMs = intervalMs;
+    }
+
+    @JsonProperty
+    public int transactionIntervalMs() {
+        return intervalMs;
+    }
+
+    @Override
+    public synchronized TransactionAction nextAction() {
+        if (lastTransactionStartMs == NULL_START_MS) {
+            lastTransactionStartMs = time.milliseconds();
+            return TransactionAction.BEGIN_TRANSACTION;
+        }
+        if (time.milliseconds() - lastTransactionStartMs >= intervalMs) {
+            lastTransactionStartMs = NULL_START_MS;
+            return TransactionAction.COMMIT_TRANSACTION;
+        }
+
+        return TransactionAction.NO_OP;
+    }
+}
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java
index 5ec47ec..b2e8add 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
     property = "type")
 @JsonSubTypes(value = {
     @JsonSubTypes.Type(value = UniformTransactionsGenerator.class, name = 
"uniform"),
+    @JsonSubTypes.Type(value = TimeIntervalTransactionsGenerator.class, name = 
"interval"),
 })
 public interface TransactionGenerator {
     enum TransactionAction {
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGeneratorTest.java
 
b/tools/src/test/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGeneratorTest.java
new file mode 100644
index 0000000..29ed3c8
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGeneratorTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimeIntervalTransactionsGeneratorTest {
+    @Test
+    public void testCommitsTransactionAfterIntervalPasses() {
+        MockTime time = new MockTime();
+        TimeIntervalTransactionsGenerator generator = new 
TimeIntervalTransactionsGenerator(100, time);
+
+        assertEquals(100, generator.transactionIntervalMs());
+        assertEquals(TransactionGenerator.TransactionAction.BEGIN_TRANSACTION, 
generator.nextAction());
+        assertEquals(TransactionGenerator.TransactionAction.NO_OP, 
generator.nextAction());
+        time.sleep(50);
+        assertEquals(TransactionGenerator.TransactionAction.NO_OP, 
generator.nextAction());
+        time.sleep(49);
+        assertEquals(TransactionGenerator.TransactionAction.NO_OP, 
generator.nextAction());
+        time.sleep(1);
+        
assertEquals(TransactionGenerator.TransactionAction.COMMIT_TRANSACTION, 
generator.nextAction());
+        assertEquals(TransactionGenerator.TransactionAction.BEGIN_TRANSACTION, 
generator.nextAction());
+    }
+}

Reply via email to