Repository: kafka
Updated Branches:
  refs/heads/trunk fc4ef4791 -> 2e4aed707


KAFKA-2516: Rename o.a.k.clients.tools to o.a.k.tools

Author: Grant Henke <granthe...@gmail.com>

Reviewers: Gwen Shapira, Ewen Cheslack-Postava

Closes #310 from granthenke/tools-packaging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2e4aed70
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2e4aed70
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2e4aed70

Branch: refs/heads/trunk
Commit: 2e4aed7070f0283e2c1e0e563fdb3324482463a5
Parents: fc4ef47
Author: Grant Henke <granthe...@gmail.com>
Authored: Tue Oct 27 07:44:32 2015 -0700
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Tue Oct 27 07:44:32 2015 -0700

----------------------------------------------------------------------
 bin/kafka-verifiable-producer.sh                |   2 +-
 checkstyle/import-control.xml                   |  14 +-
 .../kafkatest/services/kafka_log4j_appender.py  |   2 +-
 .../performance/producer_performance.py         |   2 +-
 .../clients/tools/ProducerPerformance.java      | 201 ------------
 .../clients/tools/ThroughputThrottler.java      | 118 -------
 .../clients/tools/VerifiableLog4jAppender.java  | 162 ----------
 .../kafka/clients/tools/VerifiableProducer.java | 324 -------------------
 .../apache/kafka/tools/ProducerPerformance.java | 201 ++++++++++++
 .../apache/kafka/tools/ThroughputThrottler.java | 117 +++++++
 .../kafka/tools/VerifiableLog4jAppender.java    | 162 ++++++++++
 .../apache/kafka/tools/VerifiableProducer.java  | 324 +++++++++++++++++++
 12 files changed, 814 insertions(+), 815 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/bin/kafka-verifiable-producer.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-verifiable-producer.sh b/bin/kafka-verifiable-producer.sh
index d0aa6c5..98fe557 100755
--- a/bin/kafka-verifiable-producer.sh
+++ b/bin/kafka-verifiable-producer.sh
@@ -17,4 +17,4 @@
 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
     export KAFKA_HEAP_OPTS="-Xmx512M"
 fi
-exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.clients.tools.VerifiableProducer $@
+exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e1ea93c..187bee8 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -105,14 +105,14 @@
     <subpackage name="producer">
       <allow pkg="org.apache.kafka.clients.producer" />
     </subpackage>
+  </subpackage>
 
-    <subpackage name="tools">
-      <allow pkg="org.apache.kafka.clients.producer" />
-      <allow pkg="org.apache.kafka.clients.consumer" />
-      <allow pkg="com.fasterxml.jackson" />
-      <allow pkg="net.sourceforge.argparse4j" />
-      <allow pkg="org.apache.log4j" />
-    </subpackage>
+  <subpackage name="tools">
+    <allow pkg="org.apache.kafka.clients.producer" />
+    <allow pkg="org.apache.kafka.clients.consumer" />
+    <allow pkg="com.fasterxml.jackson" />
+    <allow pkg="net.sourceforge.argparse4j" />
+    <allow pkg="org.apache.log4j" />
   </subpackage>
 
   <subpackage name="streams">

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tests/kafkatest/services/kafka_log4j_appender.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py
index 11369aa..ff6bb18 100644
--- a/tests/kafkatest/services/kafka_log4j_appender.py
+++ b/tests/kafkatest/services/kafka_log4j_appender.py
@@ -38,7 +38,7 @@ class KafkaLog4jAppender(BackgroundThreadService):
 
     @property
     def start_cmd(self):
-        cmd = "/opt/kafka/bin/kafka-run-class.sh 
org.apache.kafka.clients.tools.VerifiableLog4jAppender" \
+        cmd = "/opt/kafka/bin/kafka-run-class.sh 
org.apache.kafka.tools.VerifiableLog4jAppender" \
               " --topic %s --broker-list %s" % (self.topic, 
self.kafka.bootstrap_servers())
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tests/kafkatest/services/performance/producer_performance.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py
index f842026..25911af 100644
--- a/tests/kafkatest/services/performance/producer_performance.py
+++ b/tests/kafkatest/services/performance/producer_performance.py
@@ -46,7 +46,7 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
     def _worker(self, idx, node):
         args = self.args.copy()
         args.update({'bootstrap_servers': self.kafka.bootstrap_servers(), 'jmx_port': self.jmx_port, 'client_id': self.client_id})
-        cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance " \
+        cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
               "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
 
         self.security_config.setup_node(node)

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
deleted file mode 100644
index 1a9cf04..0000000
--- a/tools/src/main/java/org/apache/kafka/clients/tools/ProducerPerformance.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.clients.tools;
-
-import java.util.Arrays;
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.*;
-
-public class ProducerPerformance {
-
-    public static void main(String[] args) throws Exception {
-        if (args.length < 4) {
-            System.err.println("USAGE: java " + 
ProducerPerformance.class.getName() +
-                               " topic_name num_records record_size 
target_records_sec [prop_name=prop_value]*");
-            System.exit(1);
-        }
-
-        /* parse args */
-        String topicName = args[0];
-        long numRecords = Long.parseLong(args[1]);
-        int recordSize = Integer.parseInt(args[2]);
-        int throughput = Integer.parseInt(args[3]);
-
-        Properties props = new Properties();
-        for (int i = 4; i < args.length; i++) {
-            String[] pieces = args[i].split("=");
-            if (pieces.length != 2)
-                throw new IllegalArgumentException("Invalid property: " + args[i]);
-            props.put(pieces[0], pieces[1]);
-        }
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
-
-        /* setup perf test */
-        byte[] payload = new byte[recordSize];
-        Arrays.fill(payload, (byte) 1);
-        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], 
byte[]>(topicName, payload);
-        Stats stats = new Stats(numRecords, 5000);
-        long startMs = System.currentTimeMillis();
-
-        ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
-        for (int i = 0; i < numRecords; i++) {
-            long sendStartMs = System.currentTimeMillis();
-            Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
-            producer.send(record, cb);
-
-            if (throttler.shouldThrottle(i, sendStartMs)) {
-                throttler.throttle();
-            }
-        }
-
-        /* print final results */
-        producer.close();
-        stats.printTotal();
-    }
-
-    private static class Stats {
-        private long start;
-        private long windowStart;
-        private int[] latencies;
-        private int sampling;
-        private int iteration;
-        private int index;
-        private long count;
-        private long bytes;
-        private int maxLatency;
-        private long totalLatency;
-        private long windowCount;
-        private int windowMaxLatency;
-        private long windowTotalLatency;
-        private long windowBytes;
-        private long reportingInterval;
-
-        public Stats(long numRecords, int reportingInterval) {
-            this.start = System.currentTimeMillis();
-            this.windowStart = System.currentTimeMillis();
-            this.index = 0;
-            this.iteration = 0;
-            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
-            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
-            this.index = 0;
-            this.maxLatency = 0;
-            this.totalLatency = 0;
-            this.windowCount = 0;
-            this.windowMaxLatency = 0;
-            this.windowTotalLatency = 0;
-            this.windowBytes = 0;
-            this.totalLatency = 0;
-            this.reportingInterval = reportingInterval;
-        }
-
-        public void record(int iter, int latency, int bytes, long time) {
-            this.count++;
-            this.bytes += bytes;
-            this.totalLatency += latency;
-            this.maxLatency = Math.max(this.maxLatency, latency);
-            this.windowCount++;
-            this.windowBytes += bytes;
-            this.windowTotalLatency += latency;
-            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
-            if (iter % this.sampling == 0) {
-                this.latencies[index] = latency;
-                this.index++;
-            }
-            /* maybe report the recent perf */
-            if (time - windowStart >= reportingInterval) {
-                printWindow();
-                newWindow();
-            }
-        }
-
-        public Callback nextCompletion(long start, int bytes, Stats stats) {
-            Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
-            this.iteration++;
-            return cb;
-        }
-
-        public void printWindow() {
-            long ellapsed = System.currentTimeMillis() - windowStart;
-            double recsPerSec = 1000.0 * windowCount / (double) ellapsed;
-            double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0);
-            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n",
-                              windowCount,
-                              recsPerSec,
-                              mbPerSec,
-                              windowTotalLatency / (double) windowCount,
-                              (double) windowMaxLatency);
-        }
-
-        public void newWindow() {
-            this.windowStart = System.currentTimeMillis();
-            this.windowCount = 0;
-            this.windowMaxLatency = 0;
-            this.windowTotalLatency = 0;
-            this.windowBytes = 0;
-        }
-
-        public void printTotal() {
-            long elapsed = System.currentTimeMillis() - start;
-            double recsPerSec = 1000.0 * count / (double) elapsed;
-            double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
-            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
-            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
-                              count,
-                              recsPerSec,
-                              mbPerSec,
-                              totalLatency / (double) count,
-                              (double) maxLatency,
-                              percs[0],
-                              percs[1],
-                              percs[2],
-                              percs[3]);
-        }
-
-        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
-            int size = Math.min(count, latencies.length);
-            Arrays.sort(latencies, 0, size);
-            int[] values = new int[percentiles.length];
-            for (int i = 0; i < percentiles.length; i++) {
-                int index = (int) (percentiles[i] * size);
-                values[i] = latencies[index];
-            }
-            return values;
-        }
-    }
-
-    private static final class PerfCallback implements Callback {
-        private final long start;
-        private final int iteration;
-        private final int bytes;
-        private final Stats stats;
-
-        public PerfCallback(int iter, long start, int bytes, Stats stats) {
-            this.start = start;
-            this.stats = stats;
-            this.iteration = iter;
-            this.bytes = bytes;
-        }
-
-        public void onCompletion(RecordMetadata metadata, Exception exception) {
-            long now = System.currentTimeMillis();
-            int latency = (int) (now - start);
-            this.stats.record(iteration, latency, bytes, now);
-            if (exception != null)
-                exception.printStackTrace();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java b/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java
deleted file mode 100644
index 06c443f..0000000
--- a/tools/src/main/java/org/apache/kafka/clients/tools/ThroughputThrottler.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.tools;
-
-
-/**
- * This class helps producers throttle throughput.
- * 
- * If targetThroughput >= 0, the resulting average throughput will be 
approximately
- * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
- * no throttling will occur. 
- * 
- * To use, do this between successive send attempts:
- * <pre>
- *     {@code     
- *      if (throttler.shouldThrottle(...)) {
- *          throttler.throttle();
- *      } 
- *     } 
- * </pre>
- * 
- * Note that this can be used to throttle message throughput or data throughput.
- */
-public class ThroughputThrottler {
-    
-    private static final long NS_PER_MS = 1000000L;
-    private static final long NS_PER_SEC = 1000 * NS_PER_MS;
-    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
-
-    long sleepTimeNs;
-    long sleepDeficitNs = 0;
-    long targetThroughput = -1;
-    long startMs;
-
-    /**
-     * @param targetThroughput Can be messages/sec or bytes/sec
-     * @param startMs          When the very first message is sent
-     */
-    public ThroughputThrottler(long targetThroughput, long startMs) {
-        this.startMs = startMs;
-        this.targetThroughput = targetThroughput;
-        this.sleepTimeNs = targetThroughput > 0 ?
-                           NS_PER_SEC / targetThroughput : 
-                           Long.MAX_VALUE;
-    }
-
-    /**
-     * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
-     *                    messages produced so far if you want to throttle message throughput.
-     * @param sendStartMs timestamp of the most recently sent message
-     * @return
-     */
-    public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
-        if (this.targetThroughput < 0) {
-            // No throttling in this case
-            return false;
-        }
-
-        float elapsedMs = (sendStartMs - startMs) / 1000.f;
-        return elapsedMs > 0 && (amountSoFar / elapsedMs) > 
this.targetThroughput;
-    }
-
-    /**
-     * Occasionally blocks for small amounts of time to achieve targetThroughput.
-     *
-     * Note that if targetThroughput is 0, this will block extremely aggressively.
-     */
-    public void throttle() {
-        if (targetThroughput == 0) {
-            try {
-                Thread.sleep(Long.MAX_VALUE);
-            } catch (InterruptedException e) {
-                // do nothing
-            }
-            return;
-        }
-        
-        // throttle throughput by sleeping, on average,
-        // (1 / this.throughput) seconds between "things sent"
-        sleepDeficitNs += sleepTimeNs;
-
-        // If enough sleep deficit has accumulated, sleep a little
-        if (sleepDeficitNs >= MIN_SLEEP_NS) {
-            long sleepMs = sleepDeficitNs / 1000000;
-            long sleepNs = sleepDeficitNs - sleepMs * 1000000;
-
-            long sleepStartNs = System.nanoTime();
-            try {
-                Thread.sleep(sleepMs, (int) sleepNs);
-                sleepDeficitNs = 0;
-            } catch (InterruptedException e) {
-                // If sleep is cut short, reduce deficit by the amount of
-                // time we actually spent sleeping
-                long sleepElapsedNs = System.nanoTime() - sleepStartNs;
-                if (sleepElapsedNs <= sleepDeficitNs) {
-                    sleepDeficitNs -= sleepElapsedNs;
-                }
-            }
-        }
-    }
-}
-    
-    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java
deleted file mode 100644
index bf289b7..0000000
--- a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableLog4jAppender.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.tools;
-
-
-import net.sourceforge.argparse4j.ArgumentParsers;
-import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
-import net.sourceforge.argparse4j.inf.Namespace;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PropertyConfigurator;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import static net.sourceforge.argparse4j.impl.Arguments.store;
-
-/**
- * Primarily intended for use with system testing, this appender produces message
- * to Kafka on each "append" request. For example, this helps with end-to-end tests
- * of KafkaLog4jAppender.
- *
- * When used as a command-line tool, it appends increasing integers. It will produce a
- * fixed number of messages unless the default max-messages -1 is used, in which case
- * it appends indefinitely.
- */
-
-public class VerifiableLog4jAppender {
-    Logger logger = Logger.getLogger(VerifiableLog4jAppender.class);
-
-    // If maxMessages < 0, log until the process is killed externally
-    private long maxMessages = -1;
-
-    // Hook to trigger logging thread to stop logging messages
-    private volatile boolean stopLogging = false;
-
-    /** Get the command-line argument parser. */
-    private static ArgumentParser argParser() {
-        ArgumentParser parser = ArgumentParsers
-            .newArgumentParser("verifiable-log4j-appender")
-            .defaultHelp(true)
-            .description("This tool produces increasing integers to the 
specified topic using KafkaLog4jAppender.");
-
-        parser.addArgument("--topic")
-            .action(store())
-            .required(true)
-            .type(String.class)
-            .metavar("TOPIC")
-            .help("Produce messages to this topic.");
-
-        parser.addArgument("--broker-list")
-            .action(store())
-            .required(true)
-            .type(String.class)
-            .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
-            .dest("brokerList")
-            .help("Comma-separated list of Kafka brokers in the form 
HOST1:PORT1,HOST2:PORT2,...");
-
-        parser.addArgument("--max-messages")
-            .action(store())
-            .required(false)
-            .setDefault(-1)
-            .type(Integer.class)
-            .metavar("MAX-MESSAGES")
-            .dest("maxMessages")
-            .help("Produce this many messages. If -1, produce messages until 
the process is killed externally.");
-
-        parser.addArgument("--acks")
-            .action(store())
-            .required(false)
-            .setDefault("-1")
-            .type(String.class)
-            .choices("0", "1", "-1")
-            .metavar("ACKS")
-            .help("Acks required on each produced message. See Kafka docs on 
request.required.acks for details.");
-
-        return parser;
-    }
-
-    /** Construct a VerifiableLog4jAppender object from command-line arguments. */
-    public static VerifiableLog4jAppender createFromArgs(String[] args) {
-        ArgumentParser parser = argParser();
-        VerifiableLog4jAppender producer = null;
-
-        try {
-            Namespace res = parser.parseArgs(args);
-
-            int maxMessages = res.getInt("maxMessages");
-            String topic = res.getString("topic");
-
-
-            Properties props = new Properties();
-            props.setProperty("log4j.rootLogger", "INFO, KAFKA");
-            props.setProperty("log4j.appender.KAFKA", 
"org.apache.kafka.log4jappender.KafkaLog4jAppender");
-            props.setProperty("log4j.appender.KAFKA.layout", 
"org.apache.log4j.PatternLayout");
-            props.setProperty("log4j.appender.KAFKA.layout.ConversionPattern", 
"%-5p: %c - %m%n");
-            props.setProperty("log4j.appender.KAFKA.BrokerList", 
res.getString("brokerList"));
-            props.setProperty("log4j.appender.KAFKA.Topic", topic);
-            props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", 
res.getString("acks"));
-            props.setProperty("log4j.appender.KAFKA.SyncSend", "true");
-            props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA");
-
-            producer = new VerifiableLog4jAppender(props, maxMessages);
-        } catch (ArgumentParserException e) {
-            if (args.length == 0) {
-                parser.printHelp();
-                System.exit(0);
-            } else {
-                parser.handleError(e);
-                System.exit(1);
-            }
-        }
-
-        return producer;
-    }
-
-
-    public VerifiableLog4jAppender(Properties props, int maxMessages) {
-        this.maxMessages = maxMessages;
-        PropertyConfigurator.configure(props);
-    }
-
-    public static void main(String[] args) throws IOException {
-
-        final VerifiableLog4jAppender appender = createFromArgs(args);
-        boolean infinite = appender.maxMessages < 0;
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                // Trigger main thread to stop producing messages
-                appender.stopLogging = true;
-            }
-        });
-
-        long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages;
-        for (long i = 0; i < maxMessages; i++) {
-            if (appender.stopLogging) {
-                break;
-            }
-            appender.append(String.format("%d", i));
-        }
-    }
-
-    private void append(String msg) {
-        logger.info(msg);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
deleted file mode 100644
index a79f78e..0000000
--- a/tools/src/main/java/org/apache/kafka/clients/tools/VerifiableProducer.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.tools;
-
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.utils.Utils;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static net.sourceforge.argparse4j.impl.Arguments.store;
-
-import net.sourceforge.argparse4j.ArgumentParsers;
-import net.sourceforge.argparse4j.inf.ArgumentParser;
-import net.sourceforge.argparse4j.inf.ArgumentParserException;
-import net.sourceforge.argparse4j.inf.Namespace;
-
-/**
- * Primarily intended for use with system testing, this producer prints metadata
- * in the form of JSON to stdout on each "send" request. For example, this helps
- * with end-to-end correctness tests by making externally visible which messages have been
- * acked and which have not.
- *
- * When used as a command-line tool, it produces increasing integers. It will produce a
- * fixed number of messages unless the default max-messages -1 is used, in which case
- * it produces indefinitely.
- *
- * If logging is left enabled, log output on stdout can be easily ignored by checking
- * whether a given line is valid JSON.
- */
-public class VerifiableProducer {
-    
-    String topic;
-    private Producer<String, String> producer;
-    // If maxMessages < 0, produce until the process is killed externally
-    private long maxMessages = -1;
-    
-    // Number of messages for which acks were received
-    private long numAcked = 0;
-    
-    // Number of send attempts
-    private long numSent = 0;
-    
-    // Throttle message throughput if this is set >= 0
-    private long throughput;
-    
-    // Hook to trigger producing thread to stop sending messages
-    private boolean stopProducing = false;
-
-    public VerifiableProducer(
-            Properties producerProps, String topic, int throughput, int maxMessages) {
-
-        this.topic = topic;
-        this.throughput = throughput;
-        this.maxMessages = maxMessages;
-        this.producer = new KafkaProducer<String, String>(producerProps);
-    }
-
-    /** Get the command-line argument parser. */
-    private static ArgumentParser argParser() {
-        ArgumentParser parser = ArgumentParsers
-                .newArgumentParser("verifiable-producer")
-                .defaultHelp(true)
-                .description("This tool produces increasing integers to the 
specified topic and prints JSON metadata to stdout on each \"send\" request, 
making externally visible which messages have been acked and which have not.");
-
-        parser.addArgument("--topic")
-                .action(store())
-                .required(true)
-                .type(String.class)
-                .metavar("TOPIC")
-                .help("Produce messages to this topic.");
-
-        parser.addArgument("--broker-list")
-                .action(store())
-                .required(true)
-                .type(String.class)
-                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
-                .dest("brokerList")
-                .help("Comma-separated list of Kafka brokers in the form 
HOST1:PORT1,HOST2:PORT2,...");
-        
-        parser.addArgument("--max-messages")
-                .action(store())
-                .required(false)
-                .setDefault(-1)
-                .type(Integer.class)
-                .metavar("MAX-MESSAGES")
-                .dest("maxMessages")
-                .help("Produce this many messages. If -1, produce messages 
until the process is killed externally.");
-
-        parser.addArgument("--throughput")
-                .action(store())
-                .required(false)
-                .setDefault(-1)
-                .type(Integer.class)
-                .metavar("THROUGHPUT")
-                .help("If set >= 0, throttle maximum message throughput to 
*approximately* THROUGHPUT messages/sec.");
-
-        parser.addArgument("--acks")
-                .action(store())
-                .required(false)
-                .setDefault(-1)
-                .type(Integer.class)
-                .choices(0, 1, -1)
-                .metavar("ACKS")
-                .help("Acks required on each produced message. See Kafka docs 
on request.required.acks for details.");
-
-        parser.addArgument("--producer.config")
-                .action(store())
-                .required(false)
-                .type(String.class)
-                .metavar("CONFIG_FILE")
-                .help("Producer config properties file.");
-
-        return parser;
-    }
-  
-    /** Construct a VerifiableProducer object from command-line arguments. */
-    public static VerifiableProducer createFromArgs(String[] args) {
-        ArgumentParser parser = argParser();
-        VerifiableProducer producer = null;
-        
-        try {
-            Namespace res;
-            res = parser.parseArgs(args);
-
-            int maxMessages = res.getInt("maxMessages");
-            String topic = res.getString("topic");
-            int throughput = res.getInt("throughput");
-            String configFile = res.getString("producer.config");
-
-            Properties producerProps = new Properties();
-            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
-            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                              "org.apache.kafka.common.serialization.StringSerializer");
-            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                              "org.apache.kafka.common.serialization.StringSerializer");
-            producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
-            // No producer retries
-            producerProps.put("retries", "0");
-            if (configFile != null) {
-                try {
-                    producerProps.putAll(Utils.loadProps(configFile));
-                } catch (IOException e) {
-                    throw new ArgumentParserException(e.getMessage(), parser);
-                }
-            }
-
-            producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
-        } catch (ArgumentParserException e) {
-            if (args.length == 0) {
-                parser.printHelp();
-                System.exit(0);
-            } else {
-                parser.handleError(e);
-                System.exit(1);
-            }
-        }
-        
-        return producer;
-    }
-  
-    /** Produce a message with given key and value. */
-    public void send(String key, String value) {
-        ProducerRecord<String, String> record = new ProducerRecord<String, 
String>(topic, key, value);
-        numSent++;
-        try {
-            producer.send(record, new PrintInfoCallback(key, value));
-        } catch (Exception e) {
-
-            synchronized (System.out) {
-                System.out.println(errorString(e, key, value, System.currentTimeMillis()));
-            }
-        }
-    }
-  
-    /** Close the producer to flush any remaining messages. */
-    public void close() {
-        producer.close();
-    }
-  
-    /**
-     * Return JSON string encapsulating basic information about the exception, as well
-     * as the key and value which triggered the exception.
-     */
-    String errorString(Exception e, String key, String value, Long nowMs) {
-        assert e != null : "Expected non-null exception.";
-
-        Map<String, Object> errorData = new HashMap<>();
-        errorData.put("class", this.getClass().toString());
-        errorData.put("name", "producer_send_error");
-
-        errorData.put("time_ms", nowMs);
-        errorData.put("exception", e.getClass().toString());
-        errorData.put("message", e.getMessage());
-        errorData.put("topic", this.topic);
-        errorData.put("key", key);
-        errorData.put("value", value);
-        
-        return toJsonString(errorData);
-    }
-  
-    String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) {
-        assert recordMetadata != null : "Expected non-null recordMetadata object.";
-
-        Map<String, Object> successData = new HashMap<>();
-        successData.put("class", this.getClass().toString());
-        successData.put("name", "producer_send_success");
-
-        successData.put("time_ms", nowMs);
-        successData.put("topic", this.topic);
-        successData.put("partition", recordMetadata.partition());
-        successData.put("offset", recordMetadata.offset());
-        successData.put("key", key);
-        successData.put("value", value);
-        
-        return toJsonString(successData);
-    }
-    
-    private String toJsonString(Map<String, Object> data) {
-        String json;
-        try {
-            ObjectMapper mapper = new ObjectMapper();
-            json = mapper.writeValueAsString(data);
-        } catch (JsonProcessingException e) {
-            json = "Bad data can't be written as json: " + e.getMessage();
-        }
-        return json;
-    }
-  
-    /** Callback which prints errors to stdout when the producer fails to send. */
-    private class PrintInfoCallback implements Callback {
-        
-        private String key;
-        private String value;
-    
-        PrintInfoCallback(String key, String value) {
-            this.key = key;
-            this.value = value;
-        }
-    
-        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
-            synchronized (System.out) {
-                if (e == null) {
-                    VerifiableProducer.this.numAcked++;
-                    System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis()));
-                } else {
-                    System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis()));
-                }
-            }
-        }
-    }
-  
-    public static void main(String[] args) throws IOException {
-        
-        final VerifiableProducer producer = createFromArgs(args);
-        final long startMs = System.currentTimeMillis();
-        boolean infinite = producer.maxMessages < 0;
-
-        Runtime.getRuntime().addShutdownHook(new Thread() {
-            @Override
-            public void run() {
-                // Trigger main thread to stop producing messages
-                producer.stopProducing = true;
-                
-                // Flush any remaining messages
-                producer.close();
-
-                // Print a summary
-                long stopMs = System.currentTimeMillis();
-                double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
-                
-                Map<String, Object> data = new HashMap<>();
-                data.put("class", producer.getClass().toString());
-                data.put("name", "tool_data");
-                data.put("sent", producer.numSent);
-                data.put("acked", producer.numAcked);
-                data.put("target_throughput", producer.throughput);
-                data.put("avg_throughput", avgThroughput);
-                
-                System.out.println(producer.toJsonString(data));
-            }
-        });
-
-        ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
-        long maxMessages = infinite ? Long.MAX_VALUE : producer.maxMessages;
-        for (long i = 0; i < maxMessages; i++) {
-            if (producer.stopProducing) {
-                break;
-            }
-            long sendStartMs = System.currentTimeMillis();
-            producer.send(null, String.format("%d", i));
-            
-            if (throttler.shouldThrottle(i, sendStartMs)) {
-                throttler.throttle();
-            }
-        }
-    }
-        
-}
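
For reference, a minimal sketch (not part of the commit) of driving this tool programmatically. It assumes the renamed org.apache.kafka.tools.VerifiableProducer keeps the public API shown above (the new file appears in the diffstat); the broker address and counts are illustrative:

    import org.apache.kafka.tools.VerifiableProducer;

    public class VerifiableProducerExample {
        public static void main(String[] args) {
            // Same argparse4j options as in argParser() above; values are illustrative.
            VerifiableProducer producer = VerifiableProducer.createFromArgs(new String[] {
                "--topic", "test-topic",
                "--broker-list", "localhost:9092",
                "--max-messages", "1000",
                "--throughput", "500"
            });
            producer.send(null, "0");  // prints a JSON success/error line on stdout via the callback
            producer.close();          // flush any remaining messages
        }
    }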

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
new file mode 100644
index 0000000..3a90a10
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.*;
+
+public class ProducerPerformance {
+
+    public static void main(String[] args) throws Exception {
+        if (args.length < 4) {
+            System.err.println("USAGE: java " + 
ProducerPerformance.class.getName() +
+                               " topic_name num_records record_size 
target_records_sec [prop_name=prop_value]*");
+            System.exit(1);
+        }
+
+        /* parse args */
+        String topicName = args[0];
+        long numRecords = Long.parseLong(args[1]);
+        int recordSize = Integer.parseInt(args[2]);
+        int throughput = Integer.parseInt(args[3]);
+
+        Properties props = new Properties();
+        for (int i = 4; i < args.length; i++) {
+            String[] pieces = args[i].split("=");
+            if (pieces.length != 2)
+                throw new IllegalArgumentException("Invalid property: " + args[i]);
+            props.put(pieces[0], pieces[1]);
+        }
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
+
+        /* setup perf test */
+        byte[] payload = new byte[recordSize];
+        Arrays.fill(payload, (byte) 1);
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], 
byte[]>(topicName, payload);
+        Stats stats = new Stats(numRecords, 5000);
+        long startMs = System.currentTimeMillis();
+
+        ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
+        for (int i = 0; i < numRecords; i++) {
+            long sendStartMs = System.currentTimeMillis();
+            Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
+            producer.send(record, cb);
+
+            if (throttler.shouldThrottle(i, sendStartMs)) {
+                throttler.throttle();
+            }
+        }
+
+        /* print final results */
+        producer.close();
+        stats.printTotal();
+    }
+
+    private static class Stats {
+        private long start;
+        private long windowStart;
+        private int[] latencies;
+        private int sampling;
+        private int iteration;
+        private int index;
+        private long count;
+        private long bytes;
+        private int maxLatency;
+        private long totalLatency;
+        private long windowCount;
+        private int windowMaxLatency;
+        private long windowTotalLatency;
+        private long windowBytes;
+        private long reportingInterval;
+
+        public Stats(long numRecords, int reportingInterval) {
+            this.start = System.currentTimeMillis();
+            this.windowStart = System.currentTimeMillis();
+            this.index = 0;
+            this.iteration = 0;
+            this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
+            this.latencies = new int[(int) (numRecords / this.sampling) + 1];
+            this.index = 0;
+            this.maxLatency = 0;
+            this.totalLatency = 0;
+            this.windowCount = 0;
+            this.windowMaxLatency = 0;
+            this.windowTotalLatency = 0;
+            this.windowBytes = 0;
+            this.totalLatency = 0;
+            this.reportingInterval = reportingInterval;
+        }
+
+        public void record(int iter, int latency, int bytes, long time) {
+            this.count++;
+            this.bytes += bytes;
+            this.totalLatency += latency;
+            this.maxLatency = Math.max(this.maxLatency, latency);
+            this.windowCount++;
+            this.windowBytes += bytes;
+            this.windowTotalLatency += latency;
+            this.windowMaxLatency = Math.max(windowMaxLatency, latency);
+            if (iter % this.sampling == 0) {
+                this.latencies[index] = latency;
+                this.index++;
+            }
+            /* maybe report the recent perf */
+            if (time - windowStart >= reportingInterval) {
+                printWindow();
+                newWindow();
+            }
+        }
+
+        public Callback nextCompletion(long start, int bytes, Stats stats) {
+            Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
+            this.iteration++;
+            return cb;
+        }
+
+        public void printWindow() {
+            long ellapsed = System.currentTimeMillis() - windowStart;
+            double recsPerSec = 1000.0 * windowCount / (double) ellapsed;
+            double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0);
+            System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n",
+                              windowCount,
+                              recsPerSec,
+                              mbPerSec,
+                              windowTotalLatency / (double) windowCount,
+                              (double) windowMaxLatency);
+        }
+
+        public void newWindow() {
+            this.windowStart = System.currentTimeMillis();
+            this.windowCount = 0;
+            this.windowMaxLatency = 0;
+            this.windowTotalLatency = 0;
+            this.windowBytes = 0;
+        }
+
+        public void printTotal() {
+            long elapsed = System.currentTimeMillis() - start;
+            double recsPerSec = 1000.0 * count / (double) elapsed;
+            double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
+            int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
+            System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
+                              count,
+                              recsPerSec,
+                              mbPerSec,
+                              totalLatency / (double) count,
+                              (double) maxLatency,
+                              percs[0],
+                              percs[1],
+                              percs[2],
+                              percs[3]);
+        }
+
+        private static int[] percentiles(int[] latencies, int count, double... percentiles) {
+            int size = Math.min(count, latencies.length);
+            Arrays.sort(latencies, 0, size);
+            int[] values = new int[percentiles.length];
+            for (int i = 0; i < percentiles.length; i++) {
+                int index = (int) (percentiles[i] * size);
+                values[i] = latencies[index];
+            }
+            return values;
+        }
+    }
+
+    private static final class PerfCallback implements Callback {
+        private final long start;
+        private final int iteration;
+        private final int bytes;
+        private final Stats stats;
+
+        public PerfCallback(int iter, long start, int bytes, Stats stats) {
+            this.start = start;
+            this.stats = stats;
+            this.iteration = iter;
+            this.bytes = bytes;
+        }
+
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            long now = System.currentTimeMillis();
+            int latency = (int) (now - start);
+            this.stats.record(iteration, latency, bytes, now);
+            if (exception != null)
+                exception.printStackTrace();
+        }
+    }
+
+}
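
For reference, a minimal sketch (not part of the commit) of invoking the renamed class: the USAGE string above maps to these positional args. The topic, counts, and broker address are illustrative, and -1 disables throttling (ThroughputThrottler skips throttling for negative targets):

    public class ProducerPerformanceExample {
        public static void main(String[] args) throws Exception {
            org.apache.kafka.tools.ProducerPerformance.main(new String[] {
                "test-topic",                        // topic_name
                "50000",                             // num_records
                "100",                               // record_size in bytes
                "-1",                                // target_records_sec (-1 = no throttling)
                "bootstrap.servers=localhost:9092"   // [prop_name=prop_value]*
            });
        }
    }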

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
new file mode 100644
index 0000000..d8deb22
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/ThroughputThrottler.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+
+/**
+ * This class helps producers throttle throughput.
+ *
+ * If targetThroughput >= 0, the resulting average throughput will be 
approximately
+ * min(targetThroughput, maximumPossibleThroughput). If targetThroughput < 0,
+ * no throttling will occur.
+ *
+ * To use, do this between successive send attempts:
+ * <pre>
+ *     {@code
+ *      if (throttler.shouldThrottle(...)) {
+ *          throttler.throttle();
+ *      }
+ *     }
+ * </pre>
+ *
+ * Note that this can be used to throttle message throughput or data throughput.
+ */
+public class ThroughputThrottler {
+
+    private static final long NS_PER_MS = 1000000L;
+    private static final long NS_PER_SEC = 1000 * NS_PER_MS;
+    private static final long MIN_SLEEP_NS = 2 * NS_PER_MS;
+
+    long sleepTimeNs;
+    long sleepDeficitNs = 0;
+    long targetThroughput = -1;
+    long startMs;
+
+    /**
+     * @param targetThroughput Can be messages/sec or bytes/sec
+     * @param startMs          When the very first message is sent
+     */
+    public ThroughputThrottler(long targetThroughput, long startMs) {
+        this.startMs = startMs;
+        this.targetThroughput = targetThroughput;
+        this.sleepTimeNs = targetThroughput > 0 ?
+                           NS_PER_SEC / targetThroughput :
+                           Long.MAX_VALUE;
+    }
+
+    /**
+     * @param amountSoFar bytes produced so far if you want to throttle data throughput, or
+     *                    messages produced so far if you want to throttle message throughput.
+     * @param sendStartMs timestamp of the most recently sent message
+     * @return
+     */
+    public boolean shouldThrottle(long amountSoFar, long sendStartMs) {
+        if (this.targetThroughput < 0) {
+            // No throttling in this case
+            return false;
+        }
+
+        float elapsedMs = (sendStartMs - startMs) / 1000.f;
+        return elapsedMs > 0 && (amountSoFar / elapsedMs) > 
this.targetThroughput;
+    }
+
+    /**
+     * Occasionally blocks for small amounts of time to achieve targetThroughput.
+     *
+     * Note that if targetThroughput is 0, this will block extremely aggressively.
+     */
+    public void throttle() {
+        if (targetThroughput == 0) {
+            try {
+                Thread.sleep(Long.MAX_VALUE);
+            } catch (InterruptedException e) {
+                // do nothing
+            }
+            return;
+        }
+
+        // throttle throughput by sleeping, on average,
+        // (1 / this.throughput) seconds between "things sent"
+        sleepDeficitNs += sleepTimeNs;
+
+        // If enough sleep deficit has accumulated, sleep a little
+        if (sleepDeficitNs >= MIN_SLEEP_NS) {
+            long sleepMs = sleepDeficitNs / 1000000;
+            long sleepNs = sleepDeficitNs - sleepMs * 1000000;
+
+            long sleepStartNs = System.nanoTime();
+            try {
+                Thread.sleep(sleepMs, (int) sleepNs);
+                sleepDeficitNs = 0;
+            } catch (InterruptedException e) {
+                // If sleep is cut short, reduce deficit by the amount of
+                // time we actually spent sleeping
+                long sleepElapsedNs = System.nanoTime() - sleepStartNs;
+                if (sleepElapsedNs <= sleepDeficitNs) {
+                    sleepDeficitNs -= sleepElapsedNs;
+                }
+            }
+        }
+    }
+}
+
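
For reference, a minimal sketch (not part of the commit) of the usage pattern described in the Javadoc above; the 1000 msgs/sec target and loop bound are illustrative:

    import org.apache.kafka.tools.ThroughputThrottler;

    public class ThrottlerExample {
        public static void main(String[] args) {
            long startMs = System.currentTimeMillis();
            ThroughputThrottler throttler = new ThroughputThrottler(1000, startMs);
            for (long i = 0; i < 10000; i++) {
                long sendStartMs = System.currentTimeMillis();
                // ... send one message here ...
                if (throttler.shouldThrottle(i, sendStartMs)) {
                    throttler.throttle();  // sleeps just enough to stay near the target rate
                }
            }
        }
    }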

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
new file mode 100644
index 0000000..64a099f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PropertyConfigurator;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+/**
+ * Primarily intended for use with system testing, this appender produces message
+ * to Kafka on each "append" request. For example, this helps with end-to-end tests
+ * of KafkaLog4jAppender.
+ *
+ * When used as a command-line tool, it appends increasing integers. It will produce a
+ * fixed number of messages unless the default max-messages -1 is used, in which case
+ * it appends indefinitely.
+ */
+
+public class VerifiableLog4jAppender {
+    Logger logger = Logger.getLogger(VerifiableLog4jAppender.class);
+
+    // If maxMessages < 0, log until the process is killed externally
+    private long maxMessages = -1;
+
+    // Hook to trigger logging thread to stop logging messages
+    private volatile boolean stopLogging = false;
+
+    /** Get the command-line argument parser. */
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+            .newArgumentParser("verifiable-log4j-appender")
+            .defaultHelp(true)
+            .description("This tool produces increasing integers to the 
specified topic using KafkaLog4jAppender.");
+
+        parser.addArgument("--topic")
+            .action(store())
+            .required(true)
+            .type(String.class)
+            .metavar("TOPIC")
+            .help("Produce messages to this topic.");
+
+        parser.addArgument("--broker-list")
+            .action(store())
+            .required(true)
+            .type(String.class)
+            .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+            .dest("brokerList")
+            .help("Comma-separated list of Kafka brokers in the form 
HOST1:PORT1,HOST2:PORT2,...");
+
+        parser.addArgument("--max-messages")
+            .action(store())
+            .required(false)
+            .setDefault(-1)
+            .type(Integer.class)
+            .metavar("MAX-MESSAGES")
+            .dest("maxMessages")
+            .help("Produce this many messages. If -1, produce messages until 
the process is killed externally.");
+
+        parser.addArgument("--acks")
+            .action(store())
+            .required(false)
+            .setDefault("-1")
+            .type(String.class)
+            .choices("0", "1", "-1")
+            .metavar("ACKS")
+            .help("Acks required on each produced message. See Kafka docs on 
request.required.acks for details.");
+
+        return parser;
+    }
+
+    /** Construct a VerifiableLog4jAppender object from command-line arguments. */
+    public static VerifiableLog4jAppender createFromArgs(String[] args) {
+        ArgumentParser parser = argParser();
+        VerifiableLog4jAppender producer = null;
+
+        try {
+            Namespace res = parser.parseArgs(args);
+
+            int maxMessages = res.getInt("maxMessages");
+            String topic = res.getString("topic");
+
+            Properties props = new Properties();
+            props.setProperty("log4j.rootLogger", "INFO, KAFKA");
+            props.setProperty("log4j.appender.KAFKA", 
"org.apache.kafka.log4jappender.KafkaLog4jAppender");
+            props.setProperty("log4j.appender.KAFKA.layout", 
"org.apache.log4j.PatternLayout");
+            props.setProperty("log4j.appender.KAFKA.layout.ConversionPattern", 
"%-5p: %c - %m%n");
+            props.setProperty("log4j.appender.KAFKA.BrokerList", 
res.getString("brokerList"));
+            props.setProperty("log4j.appender.KAFKA.Topic", topic);
+            props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", 
res.getString("acks"));
+            props.setProperty("log4j.appender.KAFKA.SyncSend", "true");
+            props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA");
+
+            producer = new VerifiableLog4jAppender(props, maxMessages);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                System.exit(0);
+            } else {
+                parser.handleError(e);
+                System.exit(1);
+            }
+        }
+
+        return producer;
+    }
+
+
+    public VerifiableLog4jAppender(Properties props, int maxMessages) {
+        this.maxMessages = maxMessages;
+        PropertyConfigurator.configure(props);
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        final VerifiableLog4jAppender appender = createFromArgs(args);
+        boolean infinite = appender.maxMessages < 0;
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                // Trigger main thread to stop producing messages
+                appender.stopLogging = true;
+            }
+        });
+
+        long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages;
+        for (long i = 0; i < maxMessages; i++) {
+            if (appender.stopLogging) {
+                break;
+            }
+            appender.append(String.format("%d", i));
+        }
+    }
+
+    private void append(String msg) {
+        logger.info(msg);
+    }
+}
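
For reference, the appender can also be driven programmatically with the same
flags the command line accepts, via the public createFromArgs above; a minimal
sketch (the topic, broker address, and class name AppenderExample are
illustrative, and note that --max-messages only affects the tool's own main
loop):

    import org.apache.kafka.tools.VerifiableLog4jAppender;
    import org.apache.log4j.Logger;

    public class AppenderExample {
        public static void main(String[] args) {
            // createFromArgs configures log4j from these flags, so anything logged
            // afterwards at INFO is routed through KafkaLog4jAppender.
            VerifiableLog4jAppender.createFromArgs(new String[] {
                "--topic", "test-topic",
                "--broker-list", "localhost:9092",
                "--max-messages", "100",
                "--acks", "1"
            });
            Logger.getLogger(AppenderExample.class).info("hello from AppenderExample");
        }
    }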

http://git-wip-us.apache.org/repos/asf/kafka/blob/2e4aed70/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
new file mode 100644
index 0000000..dd695cf
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.utils.Utils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.Namespace;
+
+/**
+ * Primarily intended for use with system testing, this producer prints metadata
+ * in the form of JSON to stdout on each "send" request. For example, this helps
+ * with end-to-end correctness tests by making externally visible which messages
+ * have been acked and which have not.
+ *
+ * When used as a command-line tool, it produces increasing integers. It will produce a
+ * fixed number of messages unless the default max-messages -1 is used, in which case
+ * it produces indefinitely.
+ *
+ * If logging is left enabled, log output on stdout can be easily ignored by checking
+ * whether a given line is valid JSON.
+ */
+public class VerifiableProducer {
+
+    String topic;
+    private Producer<String, String> producer;
+    // If maxMessages < 0, produce until the process is killed externally
+    private long maxMessages = -1;
+
+    // Number of messages for which acks were received
+    private long numAcked = 0;
+
+    // Number of send attempts
+    private long numSent = 0;
+
+    // Throttle message throughput if this is set >= 0
+    private long throughput;
+
+    // Hook to trigger producing thread to stop sending messages; volatile because
+    // it is set from the shutdown hook thread and read by the main loop
+    private volatile boolean stopProducing = false;
+
+    public VerifiableProducer(
+            Properties producerProps, String topic, int throughput, int maxMessages) {
+
+        this.topic = topic;
+        this.throughput = throughput;
+        this.maxMessages = maxMessages;
+        this.producer = new KafkaProducer<String, String>(producerProps);
+    }
+
+    /** Get the command-line argument parser. */
+    private static ArgumentParser argParser() {
+        ArgumentParser parser = ArgumentParsers
+                .newArgumentParser("verifiable-producer")
+                .defaultHelp(true)
+                .description("This tool produces increasing integers to the 
specified topic and prints JSON metadata to stdout on each \"send\" request, 
making externally visible which messages have been acked and which have not.");
+
+        parser.addArgument("--topic")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("TOPIC")
+                .help("Produce messages to this topic.");
+
+        parser.addArgument("--broker-list")
+                .action(store())
+                .required(true)
+                .type(String.class)
+                .metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
+                .dest("brokerList")
+                .help("Comma-separated list of Kafka brokers in the form 
HOST1:PORT1,HOST2:PORT2,...");
+
+        parser.addArgument("--max-messages")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("MAX-MESSAGES")
+                .dest("maxMessages")
+                .help("Produce this many messages. If -1, produce messages 
until the process is killed externally.");
+
+        parser.addArgument("--throughput")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .metavar("THROUGHPUT")
+                .help("If set >= 0, throttle maximum message throughput to 
*approximately* THROUGHPUT messages/sec.");
+
+        parser.addArgument("--acks")
+                .action(store())
+                .required(false)
+                .setDefault(-1)
+                .type(Integer.class)
+                .choices(0, 1, -1)
+                .metavar("ACKS")
+                .help("Acks required on each produced message. See Kafka docs 
on request.required.acks for details.");
+
+        parser.addArgument("--producer.config")
+                .action(store())
+                .required(false)
+                .type(String.class)
+                .metavar("CONFIG_FILE")
+                .help("Producer config properties file.");
+
+        return parser;
+    }
+
+    /** Construct a VerifiableProducer object from command-line arguments. */
+    public static VerifiableProducer createFromArgs(String[] args) {
+        ArgumentParser parser = argParser();
+        VerifiableProducer producer = null;
+
+        try {
+            Namespace res = parser.parseArgs(args);
+
+            int maxMessages = res.getInt("maxMessages");
+            String topic = res.getString("topic");
+            int throughput = res.getInt("throughput");
+            String configFile = res.getString("producer.config");
+
+            Properties producerProps = new Properties();
+            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
+            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                              "org.apache.kafka.common.serialization.StringSerializer");
+            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                              "org.apache.kafka.common.serialization.StringSerializer");
+            producerProps.put(ProducerConfig.ACKS_CONFIG, Integer.toString(res.getInt("acks")));
+            // No producer retries
+            producerProps.put("retries", "0");
+            if (configFile != null) {
+                try {
+                    producerProps.putAll(Utils.loadProps(configFile));
+                } catch (IOException e) {
+                    throw new ArgumentParserException(e.getMessage(), parser);
+                }
+            }
+
+            producer = new VerifiableProducer(producerProps, topic, throughput, maxMessages);
+        } catch (ArgumentParserException e) {
+            if (args.length == 0) {
+                parser.printHelp();
+                System.exit(0);
+            } else {
+                parser.handleError(e);
+                System.exit(1);
+            }
+        }
+
+        return producer;
+    }
+
+    /** Produce a message with given key and value. */
+    public void send(String key, String value) {
+        ProducerRecord<String, String> record = new ProducerRecord<String, 
String>(topic, key, value);
+        numSent++;
+        try {
+            producer.send(record, new PrintInfoCallback(key, value));
+        } catch (Exception e) {
+
+            synchronized (System.out) {
+                System.out.println(errorString(e, key, value, 
System.currentTimeMillis()));
+            }
+        }
+    }
+
+    /** Close the producer to flush any remaining messages. */
+    public void close() {
+        producer.close();
+    }
+
+    /**
+     * Return a JSON string encapsulating basic information about the exception, as well
+     * as the key and value which triggered the exception.
+     */
+    String errorString(Exception e, String key, String value, Long nowMs) {
+        assert e != null : "Expected non-null exception.";
+
+        Map<String, Object> errorData = new HashMap<>();
+        errorData.put("class", this.getClass().toString());
+        errorData.put("name", "producer_send_error");
+
+        errorData.put("time_ms", nowMs);
+        errorData.put("exception", e.getClass().toString());
+        errorData.put("message", e.getMessage());
+        errorData.put("topic", this.topic);
+        errorData.put("key", key);
+        errorData.put("value", value);
+
+        return toJsonString(errorData);
+    }
+
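+    /** Return a JSON string with metadata for a successfully acked message. */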
+    String successString(RecordMetadata recordMetadata, String key, String value, Long nowMs) {
+        assert recordMetadata != null : "Expected non-null recordMetadata object.";
+
+        Map<String, Object> successData = new HashMap<>();
+        successData.put("class", this.getClass().toString());
+        successData.put("name", "producer_send_success");
+
+        successData.put("time_ms", nowMs);
+        successData.put("topic", this.topic);
+        successData.put("partition", recordMetadata.partition());
+        successData.put("offset", recordMetadata.offset());
+        successData.put("key", key);
+        successData.put("value", value);
+
+        return toJsonString(successData);
+    }
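+
+    // Example record emitted by successString (field order and values are
+    // illustrative; HashMap does not guarantee key order):
+    //   {"name":"producer_send_success","topic":"test","partition":0,"offset":42,
+    //    "key":null,"value":"7","time_ms":1445956800000,
+    //    "class":"class org.apache.kafka.tools.VerifiableProducer"}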
+
+    private String toJsonString(Map<String, Object> data) {
+        String json;
+        try {
+            ObjectMapper mapper = new ObjectMapper();
+            json = mapper.writeValueAsString(data);
+        } catch (JsonProcessingException e) {
+            json = "Bad data can't be written as json: " + e.getMessage();
+        }
+        return json;
+    }
+
+    /** Callback which prints success or error metadata to stdout when a send completes. */
+    private class PrintInfoCallback implements Callback {
+
+        private String key;
+        private String value;
+
+        PrintInfoCallback(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+            synchronized (System.out) {
+                if (e == null) {
+                    VerifiableProducer.this.numAcked++;
+                    System.out.println(successString(recordMetadata, this.key, this.value, System.currentTimeMillis()));
+                } else {
+                    System.out.println(errorString(e, this.key, this.value, System.currentTimeMillis()));
+                }
+            }
+        }
+    }
+
+    public static void main(String[] args) throws IOException {
+
+        final VerifiableProducer producer = createFromArgs(args);
+        final long startMs = System.currentTimeMillis();
+        boolean infinite = producer.maxMessages < 0;
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                // Trigger main thread to stop producing messages
+                producer.stopProducing = true;
+
+                // Flush any remaining messages
+                producer.close();
+
+                // Print a summary
+                long stopMs = System.currentTimeMillis();
+                double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs));
+
+                Map<String, Object> data = new HashMap<>();
+                data.put("class", producer.getClass().toString());
+                data.put("name", "tool_data");
+                data.put("sent", producer.numSent);
+                data.put("acked", producer.numAcked);
+                data.put("target_throughput", producer.throughput);
+                data.put("avg_throughput", avgThroughput);
+
+                System.out.println(producer.toJsonString(data));
+            }
+        });
+
+        ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs);
+        long maxMessages = infinite ? Long.MAX_VALUE : producer.maxMessages;
+        for (long i = 0; i < maxMessages; i++) {
+            if (producer.stopProducing) {
+                break;
+            }
+            long sendStartMs = System.currentTimeMillis();
+            producer.send(null, String.format("%d", i));
+
+            if (throttler.shouldThrottle(i, sendStartMs)) {
+                throttler.throttle();
+            }
+        }
+    }
+
+}
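
Since the class javadoc notes that stray log lines can be separated from tool
output by checking whether each stdout line is valid JSON, here is a minimal
sketch of a consumer of that output, using the same Jackson ObjectMapper the
tool already depends on (reading the piped output from stdin is an assumption
about how the tool is run, and the class name JsonLineFilter is illustrative):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    public class JsonLineFilter {
        public static void main(String[] args) throws IOException {
            ObjectMapper mapper = new ObjectMapper();
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            String line;
            while ((line = in.readLine()) != null) {
                try {
                    JsonNode node = mapper.readTree(line);
                    // Keep only acked sends, as reported by PrintInfoCallback above.
                    if (node != null && "producer_send_success".equals(node.path("name").asText())) {
                        System.out.println("acked: partition " + node.path("partition").asInt()
                                + ", offset " + node.path("offset").asLong());
                    }
                } catch (IOException e) {
                    // Not valid JSON: a log4j line or other noise; skip it.
                }
            }
        }
    }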
