Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 463f49727 -> 93bdad0ea


Improve distributedlog benchmark

- use batching interface
- fix the stats provider start issue

Author: Sijie Guo <sij...@twitter.com>

Reviewers: Leigh Stewart <lstew...@twitter.com>

Closes #34 from sijie/sijie/more_dl_benchmark_fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/93bdad0e
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/93bdad0e
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/93bdad0e

Branch: refs/heads/master
Commit: 93bdad0eabc916513d26c6d8ce38c71c572d1c5a
Parents: 463f497
Author: Sijie Guo <sij...@twitter.com>
Authored: Thu Oct 13 00:44:09 2016 -0700
Committer: Sijie Guo <sij...@twitter.com>
Committed: Thu Oct 13 00:44:09 2016 -0700

----------------------------------------------------------------------
 .../distributedlog/benchmark/Benchmarker.java   | 24 +++++-
 .../distributedlog/benchmark/ReaderWorker.java  | 50 +++++++++----
 .../distributedlog/benchmark/WriterWorker.java  | 79 ++++++++++++++------
 .../benchmark/utils/ShiftableRateLimiter.java   | 13 ++++
 .../client/DistributedLogMultiStreamWriter.java | 28 +++++--
 .../TestDistributedLogMultiStreamWriter.java    |  2 +-
 .../service/DistributedLogServerApp.java        |  1 -
 7 files changed, 146 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
 
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
index 7114dbb..ea5757d 100644
--- 
a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
+++ 
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
@@ -79,6 +79,8 @@ public class Benchmarker {
     int sendBufferSize = 1024 * 1024;
     int recvBufferSize = 1024 * 1024;
     boolean enableBatching = false;
+    int batchBufferSize = 256 * 1024;
+    int batchFlushIntervalMicros = 2000;
 
     final DistributedLogConfiguration conf = new DistributedLogConfiguration();
     final StatsReceiver statsReceiver = new OstrichStatsReceiver();
@@ -116,7 +118,9 @@ public class Benchmarker {
         options.addOption("rfh", "read-from-head", false, "Read from head of 
the stream");
         options.addOption("sb", "send-buffer", true, "Channel send buffer 
size, in bytes");
         options.addOption("rb", "recv-buffer", true, "Channel recv buffer 
size, in bytes");
-        options.addOption("bt", "enable-batch", true, "Enable batching on 
writers");
+        options.addOption("bt", "enable-batch", false, "Enable batching on 
writers");
+        options.addOption("bbs", "batch-buffer-size", true, "The batch buffer 
size in bytes");
+        options.addOption("bfi", "batch-flush-interval", true, "The batch 
buffer flush interval in micros");
         options.addOption("h", "help", false, "Print usage.");
     }
 
@@ -217,6 +221,12 @@ public class Benchmarker {
         handshakeWithClientInfo = cmdline.hasOption("hsci");
         readFromHead = cmdline.hasOption("rfh");
         enableBatching = cmdline.hasOption("bt");
+        if (cmdline.hasOption("bbs")) {
+            batchBufferSize = Integer.parseInt(cmdline.getOptionValue("bbs"));
+        }
+        if (cmdline.hasOption("bfi")) {
+            batchFlushIntervalMicros = 
Integer.parseInt(cmdline.getOptionValue("bfi"));
+        }
 
         Preconditions.checkArgument(shardId >= 0, "shardId must be >= 0");
         Preconditions.checkArgument(numStreams > 0, "numStreams must be > 0");
@@ -295,7 +305,9 @@ public class Benchmarker {
                 handshakeWithClientInfo,
                 sendBufferSize,
                 recvBufferSize,
-                enableBatching);
+                enableBatching,
+                batchBufferSize,
+                batchFlushIntervalMicros);
     }
 
     protected WriterWorker createWriteWorker(
@@ -317,7 +329,9 @@ public class Benchmarker {
             boolean handshakeWithClientInfo,
             int sendBufferSize,
             int recvBufferSize,
-            boolean enableBatching) {
+            boolean enableBatching,
+            int batchBufferSize,
+            int batchFlushIntervalMicros) {
         return new WriterWorker(
                 streamPrefix,
                 uri,
@@ -337,7 +351,9 @@ public class Benchmarker {
                 handshakeWithClientInfo,
                 sendBufferSize,
                 recvBufferSize,
-                enableBatching);
+                enableBatching,
+                batchBufferSize,
+                batchFlushIntervalMicros);
     }
 
     Worker runDLWriter() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
 
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
index 5b34939..62cd78f 100644
--- 
a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
+++ 
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
@@ -25,6 +25,7 @@ import com.twitter.distributedlog.AsyncLogReader;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.DistributedLogManager;
+import com.twitter.distributedlog.LogRecordSet;
 import com.twitter.distributedlog.LogRecordWithDLSN;
 import com.twitter.distributedlog.benchmark.thrift.Message;
 import com.twitter.distributedlog.client.serverset.DLZkServerSet;
@@ -55,6 +56,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -69,6 +71,7 @@ public class ReaderWorker implements Worker {
     final int startStreamId;
     final int endStreamId;
     final ScheduledExecutorService executorService;
+    final ExecutorService callbackExecutor;
     final DistributedLogNamespace namespace;
     final DistributedLogManager[] dlms;
     final AsyncLogReader[] logReaders;
@@ -94,7 +97,7 @@ public class ReaderWorker implements Worker {
     final Counter invalidRecordsCounter;
     final Counter outOfOrderSequenceIdCounter;
 
-    class StreamReader implements FutureEventListener<LogRecordWithDLSN>, 
Runnable, Gauge<Number> {
+    class StreamReader implements 
FutureEventListener<List<LogRecordWithDLSN>>, Runnable, Gauge<Number> {
 
         final int streamIdx;
         final String streamName;
@@ -109,7 +112,31 @@ public class ReaderWorker implements Worker {
         }
 
         @Override
-        public void onSuccess(final LogRecordWithDLSN record) {
+        public void onSuccess(final List<LogRecordWithDLSN> records) {
+            for (final LogRecordWithDLSN record : records) {
+                if (record.isRecordSet()) {
+                    try {
+                        processRecordSet(record);
+                    } catch (IOException e) {
+                        onFailure(e);
+                    }
+                } else {
+                    processRecord(record);
+                }
+            }
+            readLoop();
+        }
+
+        public void processRecordSet(final LogRecordWithDLSN record) throws 
IOException {
+            LogRecordSet.Reader reader = LogRecordSet.of(record);
+            LogRecordWithDLSN nextRecord = reader.nextRecord();
+            while (null != nextRecord) {
+                processRecord(nextRecord);
+                nextRecord = reader.nextRecord();
+            }
+        }
+
+        public void processRecord(final LogRecordWithDLSN record) {
             Message msg;
             try {
                 msg = Utils.parseMessage(record.getPayload());
@@ -132,17 +159,8 @@ public class ReaderWorker implements Worker {
             } else {
                 negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency);
             }
-            synchronized (this) {
-                if (record.getSequenceId() <= prevSequenceId
-                        || (prevSequenceId >= 0L && record.getSequenceId() != 
prevSequenceId + 1)) {
-                    outOfOrderSequenceIdCounter.inc();
-                    LOG.warn("Encountered decreasing sequence id for stream {} 
: previous = {}, current = {}",
-                            new Object[]{streamIdx, prevSequenceId, 
record.getSequenceId()});
-                }
-                prevSequenceId = record.getSequenceId();
-            }
+
             prevDLSN = record.getDlsn();
-            readLoop();
         }
 
         @Override
@@ -162,7 +180,7 @@ public class ReaderWorker implements Worker {
             if (!running) {
                 return;
             }
-            logReaders[streamIdx].readNext().addEventListener(this);
+            logReaders[streamIdx].readBulk(10).addEventListener(this);
         }
 
         @Override
@@ -228,9 +246,14 @@ public class ReaderWorker implements Worker {
         this.outOfOrderSequenceIdCounter = 
this.statsLogger.getCounter("out_of_order_seq_id");
         this.executorService = Executors.newScheduledThreadPool(
                 readThreadPoolSize, new 
ThreadFactoryBuilder().setNameFormat("benchmark.reader-%d").build());
+        this.callbackExecutor = Executors.newFixedThreadPool(
+                Runtime.getRuntime().availableProcessors(),
+                new 
ThreadFactoryBuilder().setNameFormat("benchmark.reader-callback-%d").build());
         this.finagleNames = finagleNames;
         this.serverSets = createServerSets(serverSetPaths);
 
+        conf.setDeserializeRecordSetOnReads(false);
+
         if (truncationIntervalInSeconds > 0 && (!finagleNames.isEmpty() || 
!serverSetPaths.isEmpty())) {
             // Construct client for truncation
             DistributedLogClientBuilder builder = 
DistributedLogClientBuilder.newBuilder()
@@ -412,6 +435,7 @@ public class ReaderWorker implements Worker {
         }
         namespace.close();
         SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES);
+        SchedulerUtils.shutdownScheduler(callbackExecutor, 2, 
TimeUnit.MINUTES);
         if (this.dlc != null) {
             this.dlc.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
 
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
index 92fc090..a587375 100644
--- 
a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
+++ 
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
@@ -18,7 +18,6 @@
 package com.twitter.distributedlog.benchmark;
 
 import com.google.common.base.Preconditions;
-
 import com.twitter.common.zookeeper.ServerSet;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter;
@@ -50,6 +49,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class WriterWorker implements Worker {
@@ -77,6 +77,8 @@ public class WriterWorker implements Worker {
     final int sendBufferSize;
     final int recvBufferSize;
     final boolean enableBatching;
+    final int batchBufferSize;
+    final int batchFlushIntervalMicros;
 
     volatile boolean running = true;
 
@@ -86,6 +88,9 @@ public class WriterWorker implements Worker {
     final StatsLogger exceptionsLogger;
     final StatsLogger dlErrorCodeLogger;
 
+    // callback thread
+    final ExecutorService executor;
+
     public WriterWorker(String streamPrefix,
                         URI uri,
                         int startStreamId,
@@ -104,7 +109,9 @@ public class WriterWorker implements Worker {
                         boolean handshakeWithClientInfo,
                         int sendBufferSize,
                         int recvBufferSize,
-                        boolean enableBatching) {
+                        boolean enableBatching,
+                        int batchBufferSize,
+                        int batchFlushIntervalMicros) {
         Preconditions.checkArgument(startStreamId <= endStreamId);
         Preconditions.checkArgument(!finagleNames.isEmpty() || 
!serverSetPaths.isEmpty());
         this.streamPrefix = streamPrefix;
@@ -129,8 +136,11 @@ public class WriterWorker implements Worker {
         this.sendBufferSize = sendBufferSize;
         this.recvBufferSize = recvBufferSize;
         this.enableBatching = enableBatching;
+        this.batchBufferSize = batchBufferSize;
+        this.batchFlushIntervalMicros = batchFlushIntervalMicros;
         this.finagleNames = finagleNames;
         this.serverSets = createServerSets(serverSetPaths);
+        this.executor = 
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 
         // Streams
         streamNames = new ArrayList<String>(endStreamId - startStreamId);
@@ -165,7 +175,7 @@ public class WriterWorker implements Worker {
             .hostConnectionCoresize(hostConnectionCoreSize)
             .tcpConnectTimeout(Duration$.MODULE$.fromMilliseconds(200))
             .connectTimeout(Duration$.MODULE$.fromMilliseconds(200))
-            .requestTimeout(Duration$.MODULE$.fromSeconds(2))
+            .requestTimeout(Duration$.MODULE$.fromSeconds(10))
             .sendBufferSize(sendBufferSize)
             .recvBufferSize(recvBufferSize);
 
@@ -177,7 +187,7 @@ public class WriterWorker implements Worker {
             .thriftmux(thriftmux)
             .redirectBackoffStartMs(100)
             .redirectBackoffMaxMs(500)
-            .requestTimeoutMs(2000)
+            .requestTimeoutMs(10000)
             .statsReceiver(statsReceiver)
             .streamNameRegex("^" + streamPrefix + "_[0-9]+$")
             .handshakeWithClientInfo(handshakeWithClientInfo)
@@ -230,9 +240,12 @@ public class WriterWorker implements Worker {
         return bufferList;
     }
 
-    class TimedRequestHandler implements FutureEventListener<DLSN> {
+    class TimedRequestHandler implements FutureEventListener<DLSN>, Runnable {
         final String streamName;
         final long requestMillis;
+        DLSN dlsn = null;
+        Throwable cause = null;
+
         TimedRequestHandler(String streamName,
                             long requestMillis) {
             this.streamName = streamName;
@@ -240,16 +253,27 @@ public class WriterWorker implements Worker {
         }
         @Override
         public void onSuccess(DLSN value) {
-            requestStat.registerSuccessfulEvent(System.currentTimeMillis() - 
requestMillis);
+            dlsn = value;
+            executor.submit(this);
         }
         @Override
         public void onFailure(Throwable cause) {
-            LOG.error("Failed to publish to {} : ", streamName, cause);
-            requestStat.registerFailedEvent(System.currentTimeMillis() - 
requestMillis);
-            exceptionsLogger.getCounter(cause.getClass().getName()).inc();
-            if (cause instanceof DLException) {
-                DLException dle = (DLException) cause;
-                dlErrorCodeLogger.getCounter(dle.getCode().toString()).inc();
+            this.cause = cause;
+            executor.submit(this);
+        }
+
+        @Override
+        public void run() {
+            if (null != dlsn) {
+                requestStat.registerSuccessfulEvent(System.currentTimeMillis() 
- requestMillis);
+            } else {
+                LOG.error("Failed to publish to {} : ", streamName, cause);
+                requestStat.registerFailedEvent(System.currentTimeMillis() - 
requestMillis);
+                exceptionsLogger.getCounter(cause.getClass().getName()).inc();
+                if (cause instanceof DLException) {
+                    DLException dle = (DLException) cause;
+                    
dlErrorCodeLogger.getCounter(dle.getCode().toString()).inc();
+                }
             }
         }
     }
@@ -259,6 +283,7 @@ public class WriterWorker implements Worker {
         final int idx;
         final DistributedLogClient dlc;
         DistributedLogMultiStreamWriter writer = null;
+        final ShiftableRateLimiter limiter;
 
         Writer(int idx) {
             this.idx = idx;
@@ -268,20 +293,22 @@ public class WriterWorker implements Worker {
                         .client(this.dlc)
                         .streams(streamNames)
                         .compressionCodec(CompressionCodec.Type.NONE)
-                        .flushIntervalMs(20)
-                        .bufferSize(64 * 1024)
-                        .firstSpeculativeTimeoutMs(50)
-                        .maxSpeculativeTimeoutMs(200)
+                        .flushIntervalMicros(batchFlushIntervalMicros)
+                        .bufferSize(batchBufferSize)
+                        .firstSpeculativeTimeoutMs(9000)
+                        .maxSpeculativeTimeoutMs(9000)
+                        .requestTimeoutMs(10000)
                         .speculativeBackoffMultiplier(2)
                         .build();
             }
+            this.limiter = rateLimiter.duplicate();
         }
 
         @Override
         public void run() {
             LOG.info("Started writer {}.", idx);
             while (running) {
-                rateLimiter.getLimiter().acquire();
+                this.limiter.getLimiter().acquire();
                 final String streamName = 
streamNames.get(random.nextInt(numStreams));
                 final long requestMillis = System.currentTimeMillis();
                 final ByteBuffer data = buildBuffer(requestMillis, 
messageSizeBytes);
@@ -337,14 +364,18 @@ public class WriterWorker implements Worker {
     public void run() {
         LOG.info("Starting writer (concurrency = {}, prefix = {}, batchSize = 
{})",
                  new Object[] { writeConcurrency, streamPrefix, batchSize });
-        for (int i = 0; i < writeConcurrency; i++) {
-            Runnable writer = null;
-            if (batchSize > 0) {
-                writer = new BulkWriter(i);
-            } else {
-                writer = new Writer(i);
+        try {
+            for (int i = 0; i < writeConcurrency; i++) {
+                Runnable writer = null;
+                if (batchSize > 0) {
+                    writer = new BulkWriter(i);
+                } else {
+                    writer = new Writer(i);
+                }
+                executorService.submit(writer);
             }
-            executorService.submit(writer);
+        } catch (Throwable t) {
+            LOG.error("Unhandled exception caught", t);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java
 
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java
index 8f4ec10..ba51e81 100644
--- 
a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java
+++ 
b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java
@@ -31,6 +31,8 @@ public class ShiftableRateLimiter implements Runnable {
     private final RateLimiter rateLimiter;
     private final ScheduledExecutorService executor;
     private final double initialRate, maxRate, changeRate;
+    private final long changeInterval;
+    private final TimeUnit changeIntervalUnit;
     private double nextRate;
 
     public ShiftableRateLimiter(double initialRate,
@@ -42,11 +44,22 @@ public class ShiftableRateLimiter implements Runnable {
         this.maxRate = maxRate;
         this.changeRate = changeRate;
         this.nextRate = initialRate;
+        this.changeInterval = changeInterval;
+        this.changeIntervalUnit = changeIntervalUnit;
         this.rateLimiter = RateLimiter.create(initialRate);
         this.executor = Executors.newSingleThreadScheduledExecutor();
         this.executor.scheduleAtFixedRate(this, changeInterval, 
changeInterval, changeIntervalUnit);
     }
 
+    public ShiftableRateLimiter duplicate() {
+        return new ShiftableRateLimiter(
+                initialRate,
+                maxRate,
+                changeRate,
+                changeInterval,
+                changeIntervalUnit);
+    }
+
     @Override
     public void run() {
         this.nextRate = Math.min(nextRate + changeRate, maxRate);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java
 
b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java
index 9682daf..fa3dceb 100644
--- 
a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java
+++ 
b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java
@@ -63,7 +63,7 @@ public class DistributedLogMultiStreamWriter implements 
Runnable {
         private DistributedLogClient _client = null;
         private List<String> _streams = null;
         private int _bufferSize = 16 * 1024; // 16k
-        private int _flushIntervalMs = 10; // 10ms
+        private long _flushIntervalMicros = 2000; // 2ms
         private CompressionCodec.Type _codec = CompressionCodec.Type.NONE;
         private ScheduledExecutorService _executorService = null;
         private long _requestTimeoutMs = 500; // 500ms
@@ -120,7 +120,19 @@ public class DistributedLogMultiStreamWriter implements 
Runnable {
          * @return builder
          */
         public Builder flushIntervalMs(int flushIntervalMs) {
-            this._flushIntervalMs = flushIntervalMs;
+            this._flushIntervalMicros = 
TimeUnit.MILLISECONDS.toMicros(flushIntervalMs);
+            return this;
+        }
+
+        /**
+         * Set the flush interval in microseconds.
+         *
+         * @param flushIntervalMicros
+         *          flush interval in microseconds.
+         * @return builder
+         */
+        public Builder flushIntervalMicros(int flushIntervalMicros) {
+            this._flushIntervalMicros = flushIntervalMicros;
             return this;
         }
 
@@ -247,7 +259,7 @@ public class DistributedLogMultiStreamWriter implements 
Runnable {
                     _streams,
                     _client,
                     Math.min(_bufferSize, MAX_LOGRECORDSET_SIZE),
-                    _flushIntervalMs,
+                    _flushIntervalMicros,
                     _requestTimeoutMs,
                     _firstSpeculativeTimeoutMs,
                     _maxSpeculativeTimeoutMs,
@@ -341,7 +353,7 @@ public class DistributedLogMultiStreamWriter implements 
Runnable {
     private DistributedLogMultiStreamWriter(List<String> streams,
                                             DistributedLogClient client,
                                             int bufferSize,
-                                            int flushIntervalMs,
+                                            long flushIntervalMicros,
                                             long requestTimeoutMs,
                                             int firstSpecultiveTimeoutMs,
                                             int maxSpeculativeTimeoutMs,
@@ -376,12 +388,12 @@ public class DistributedLogMultiStreamWriter implements 
Runnable {
         this.nextStreamId = new AtomicInteger(0);
         this.recordSetWriter = newRecordSetWriter();
 
-        if (flushIntervalMs > 0) {
+        if (flushIntervalMicros > 0) {
             this.scheduler.scheduleAtFixedRate(
                     this,
-                    flushIntervalMs,
-                    flushIntervalMs,
-                    TimeUnit.MILLISECONDS);
+                    flushIntervalMicros,
+                    flushIntervalMicros,
+                    TimeUnit.MICROSECONDS);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java
 
b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java
index 4237377..dd205a6 100644
--- 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java
+++ 
b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java
@@ -156,7 +156,7 @@ public class TestDistributedLogMultiStreamWriter {
                 .flushIntervalMs(1000)
                 .scheduler(executorService)
                 .build();
-        verify(executorService, times(1)).scheduleAtFixedRate(writer, 1000, 
1000, TimeUnit.MILLISECONDS);
+        verify(executorService, times(1)).scheduleAtFixedRate(writer, 1000000, 
1000000, TimeUnit.MICROSECONDS);
     }
 
     @Test(timeout = 20000)

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
index 3a9a987..a339261 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
@@ -118,7 +118,6 @@ public class DistributedLogServerApp {
                         return ReflectionUtils.newInstance(name, 
StatsProvider.class);
                     }
                 }).or(new NullStatsProvider());
-        statsProvider.start(dlConf);
 
         final DistributedLogServer server = DistributedLogServer.runServer(
                 getOptionalStringArg(cmdline, "u"),

Reply via email to