This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0d55f0f KAFKA-8102: Add an interval-based Trogdor transaction generator (#6444) 0d55f0f is described below commit 0d55f0f3ec8f97bc250b325481f6f2fa70f52a5c Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com> AuthorDate: Mon Mar 25 09:58:11 2019 -0700 KAFKA-8102: Add an interval-based Trogdor transaction generator (#6444) This patch adds a TimeIntervalTransactionsGenerator class which enables the Trogdor ProduceBench worker to commit transactions based on a configurable millisecond time interval. Also, we now handle 409 create task responses in the coordinator command-line client by printing a more informative message. Reviewers: Colin P. McCabe <cmcc...@apache.org> --- TROGDOR.md | 1 + tests/spec/transactional-produce-bench.json | 2 +- .../trogdor/coordinator/CoordinatorClient.java | 12 +++- .../kafka/trogdor/coordinator/TaskManager.java | 1 + .../TimeIntervalTransactionsGenerator.java | 67 ++++++++++++++++++++++ .../trogdor/workload/TransactionGenerator.java | 1 + .../TimeIntervalTransactionsGeneratorTest.java | 42 ++++++++++++++ 7 files changed, 123 insertions(+), 3 deletions(-) diff --git a/TROGDOR.md b/TROGDOR.md index b551773..8b446b3 100644 --- a/TROGDOR.md +++ b/TROGDOR.md @@ -105,6 +105,7 @@ Trogdor can run several workloads. Workloads perform operations on the cluster ### ProduceBench ProduceBench starts a Kafka producer on a single agent node, producing to several partitions. The workload measures the average produce latency, as well as the median, 95th percentile, and 99th percentile latency. +It can be configured to use a transactional producer which can commit transactions based on a set time interval or number of messages. ### RoundTripWorkload RoundTripWorkload tests both production and consumption. The workload starts a Kafka producer and consumer on a single node. The consumer will read back the messages that were produced by the producer. 
diff --git a/tests/spec/transactional-produce-bench.json b/tests/spec/transactional-produce-bench.json index 40f008b..bf1b6ca 100644 --- a/tests/spec/transactional-produce-bench.json +++ b/tests/spec/transactional-produce-bench.json @@ -15,7 +15,7 @@ // // An example task specification for running a transactional producer benchmark -in Trogdor. See TROGDOR.md for details. +// in Trogdor. See TROGDOR.md for details. // { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java index 476c32d..ba40e7b 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java @@ -44,6 +44,7 @@ import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.task.TaskSpec; +import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.UptimeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -426,8 +427,15 @@ public class CoordinatorClient { TaskSpec taskSpec = JsonUtil. 
objectFromCommandLineArgument(res.getString("taskSpec"), TaskSpec.class); CreateTaskRequest req = new CreateTaskRequest(taskId, taskSpec); - client.createTask(req); - System.out.printf("Sent CreateTaskRequest for task %s.%n", req.id()); + try { + client.createTask(req); + System.out.printf("Sent CreateTaskRequest for task %s.%n", req.id()); + } catch (RequestConflictException rce) { + System.out.printf("CreateTaskRequest for task %s got a 409 status code - " + + "a task with the same ID but a different specification already exists.%nException: %s%n", + req.id(), rce.getMessage()); + Exit.exit(1); + } break; } case "stopTask": { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 941656e..60e2b1e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -302,6 +302,7 @@ public final class TaskManager { * * @param id The ID of the task to create. * @param spec The specification of the task to create. + * @throws RequestConflictException - if a task with the same ID but different spec exists */ public void createTask(final String id, TaskSpec spec) throws Throwable { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGenerator.java new file mode 100644 index 0000000..8d5f05b --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGenerator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.trogdor.workload; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.common.utils.Time; + +/** + * A transactions generator where we commit a transaction every N milliseconds + */ +public class TimeIntervalTransactionsGenerator implements TransactionGenerator { + + private static final long NULL_START_MS = -1; + + private final Time time; + private final int intervalMs; + + private long lastTransactionStartMs = NULL_START_MS; + + @JsonCreator + public TimeIntervalTransactionsGenerator(@JsonProperty("transactionIntervalMs") int intervalMs) { + this(intervalMs, Time.SYSTEM); + } + + TimeIntervalTransactionsGenerator(@JsonProperty("transactionIntervalMs") int intervalMs, + Time time) { + if (intervalMs < 1) { + throw new IllegalArgumentException("Cannot have a negative interval"); + } + this.time = time; + this.intervalMs = intervalMs; + } + + @JsonProperty + public int transactionIntervalMs() { + return intervalMs; + } + + @Override + public synchronized TransactionAction nextAction() { + if (lastTransactionStartMs == NULL_START_MS) { + lastTransactionStartMs = time.milliseconds(); + return TransactionAction.BEGIN_TRANSACTION; + } + if (time.milliseconds() - lastTransactionStartMs >= intervalMs) { + lastTransactionStartMs = NULL_START_MS; + return 
TransactionAction.COMMIT_TRANSACTION; + } + + return TransactionAction.NO_OP; + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java index 5ec47ec..b2e8add 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/TransactionGenerator.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; property = "type") @JsonSubTypes(value = { @JsonSubTypes.Type(value = UniformTransactionsGenerator.class, name = "uniform"), + @JsonSubTypes.Type(value = TimeIntervalTransactionsGenerator.class, name = "interval"), }) public interface TransactionGenerator { enum TransactionAction { diff --git a/tools/src/test/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGeneratorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGeneratorTest.java new file mode 100644 index 0000000..29ed3c8 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/trogdor/workload/TimeIntervalTransactionsGeneratorTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.trogdor.workload; + +import org.apache.kafka.common.utils.MockTime; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TimeIntervalTransactionsGeneratorTest { + @Test + public void testCommitsTransactionAfterIntervalPasses() { + MockTime time = new MockTime(); + TimeIntervalTransactionsGenerator generator = new TimeIntervalTransactionsGenerator(100, time); + + assertEquals(100, generator.transactionIntervalMs()); + assertEquals(TransactionGenerator.TransactionAction.BEGIN_TRANSACTION, generator.nextAction()); + assertEquals(TransactionGenerator.TransactionAction.NO_OP, generator.nextAction()); + time.sleep(50); + assertEquals(TransactionGenerator.TransactionAction.NO_OP, generator.nextAction()); + time.sleep(49); + assertEquals(TransactionGenerator.TransactionAction.NO_OP, generator.nextAction()); + time.sleep(1); + assertEquals(TransactionGenerator.TransactionAction.COMMIT_TRANSACTION, generator.nextAction()); + assertEquals(TransactionGenerator.TransactionAction.BEGIN_TRANSACTION, generator.nextAction()); + } +}