Repository: incubator-distributedlog Updated Branches: refs/heads/master 463f49727 -> 93bdad0ea
Improve distributedlog benchmark - use batching interface - fix the stats provider start issue Author: Sijie Guo <sij...@twitter.com> Reviewers: Leigh Stewart <lstew...@twitter.com> Closes #34 from sijie/sijie/more_dl_benchmark_fix Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/93bdad0e Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/93bdad0e Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/93bdad0e Branch: refs/heads/master Commit: 93bdad0eabc916513d26c6d8ce38c71c572d1c5a Parents: 463f497 Author: Sijie Guo <sij...@twitter.com> Authored: Thu Oct 13 00:44:09 2016 -0700 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Oct 13 00:44:09 2016 -0700 ---------------------------------------------------------------------- .../distributedlog/benchmark/Benchmarker.java | 24 +++++- .../distributedlog/benchmark/ReaderWorker.java | 50 +++++++++---- .../distributedlog/benchmark/WriterWorker.java | 79 ++++++++++++++------ .../benchmark/utils/ShiftableRateLimiter.java | 13 ++++ .../client/DistributedLogMultiStreamWriter.java | 28 +++++-- .../TestDistributedLogMultiStreamWriter.java | 2 +- .../service/DistributedLogServerApp.java | 1 - 7 files changed, 146 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java index 7114dbb..ea5757d 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java @@ -79,6 +79,8 @@ public class Benchmarker { int sendBufferSize = 1024 * 1024; int recvBufferSize = 1024 * 1024; boolean enableBatching = false; + int batchBufferSize = 256 * 1024; + int batchFlushIntervalMicros = 2000; final DistributedLogConfiguration conf = new DistributedLogConfiguration(); final StatsReceiver statsReceiver = new OstrichStatsReceiver(); @@ -116,7 +118,9 @@ public class Benchmarker { options.addOption("rfh", "read-from-head", false, "Read from head of the stream"); options.addOption("sb", "send-buffer", true, "Channel send buffer size, in bytes"); options.addOption("rb", "recv-buffer", true, "Channel recv buffer size, in bytes"); - options.addOption("bt", "enable-batch", true, "Enable batching on writers"); + options.addOption("bt", "enable-batch", false, "Enable batching on writers"); + options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in bytes"); + options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros"); options.addOption("h", "help", false, "Print usage."); } @@ -217,6 +221,12 @@ public class Benchmarker { handshakeWithClientInfo = cmdline.hasOption("hsci"); readFromHead = cmdline.hasOption("rfh"); enableBatching = cmdline.hasOption("bt"); + if (cmdline.hasOption("bbs")) { + batchBufferSize = Integer.parseInt(cmdline.getOptionValue("bbs")); + } + if (cmdline.hasOption("bfi")) { + batchFlushIntervalMicros = Integer.parseInt(cmdline.getOptionValue("bfi")); + } Preconditions.checkArgument(shardId >= 0, "shardId must be >= 0"); Preconditions.checkArgument(numStreams > 0, "numStreams must be > 0"); @@ -295,7 +305,9 @@ public class Benchmarker { handshakeWithClientInfo, sendBufferSize, recvBufferSize, - enableBatching); + enableBatching, + batchBufferSize, + batchFlushIntervalMicros); } protected WriterWorker createWriteWorker( @@ -317,7 +329,9 @@ public class Benchmarker { boolean handshakeWithClientInfo, int sendBufferSize, int recvBufferSize, - boolean enableBatching) { + boolean enableBatching, + int batchBufferSize, + int batchFlushIntervalMicros) { return new WriterWorker( streamPrefix, uri, @@ -337,7 +351,9 @@ public class Benchmarker { handshakeWithClientInfo, sendBufferSize, recvBufferSize, - enableBatching); + enableBatching, + batchBufferSize, + batchFlushIntervalMicros); } Worker runDLWriter() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java index 5b34939..62cd78f 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java @@ -25,6 +25,7 @@ import com.twitter.distributedlog.AsyncLogReader; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.DistributedLogManager; +import com.twitter.distributedlog.LogRecordSet; import com.twitter.distributedlog.LogRecordWithDLSN; import com.twitter.distributedlog.benchmark.thrift.Message; import com.twitter.distributedlog.client.serverset.DLZkServerSet; @@ -55,6 +56,7 @@ import java.io.IOException; import java.net.URI; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -69,6 +71,7 @@ public class ReaderWorker implements Worker { final int startStreamId; final int endStreamId; final ScheduledExecutorService executorService; + final ExecutorService callbackExecutor; final DistributedLogNamespace namespace; final DistributedLogManager[] dlms; final AsyncLogReader[] logReaders; @@ -94,7 +97,7 @@ public class ReaderWorker implements Worker { final Counter invalidRecordsCounter; final Counter outOfOrderSequenceIdCounter; - class StreamReader implements FutureEventListener<LogRecordWithDLSN>, Runnable, Gauge<Number> { + class StreamReader implements FutureEventListener<List<LogRecordWithDLSN>>, Runnable, Gauge<Number> { final int streamIdx; final String streamName; @@ -109,7 +112,31 @@ public class ReaderWorker implements Worker { } @Override - public void onSuccess(final LogRecordWithDLSN record) { + public void onSuccess(final List<LogRecordWithDLSN> records) { + for (final LogRecordWithDLSN record : records) { + if (record.isRecordSet()) { + try { + processRecordSet(record); + } catch (IOException e) { + onFailure(e); + } + } else { + processRecord(record); + } + } + readLoop(); + } + + public void processRecordSet(final LogRecordWithDLSN record) throws IOException { + LogRecordSet.Reader reader = LogRecordSet.of(record); + LogRecordWithDLSN nextRecord = reader.nextRecord(); + while (null != nextRecord) { + processRecord(nextRecord); + nextRecord = reader.nextRecord(); + } + } + + public void processRecord(final LogRecordWithDLSN record) { Message msg; try { msg = Utils.parseMessage(record.getPayload()); @@ -132,17 +159,8 @@ public class ReaderWorker implements Worker { } else { negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency); } - synchronized (this) { - if (record.getSequenceId() <= prevSequenceId - || (prevSequenceId >= 0L && record.getSequenceId() != prevSequenceId + 1)) { - outOfOrderSequenceIdCounter.inc(); - LOG.warn("Encountered decreasing sequence id for stream {} : previous = {}, current = {}", - new Object[]{streamIdx, prevSequenceId, record.getSequenceId()}); - } - prevSequenceId = record.getSequenceId(); - } + prevDLSN = record.getDlsn(); - readLoop(); } @Override @@ -162,7 +180,7 @@ public class ReaderWorker implements Worker { if (!running) { return; } - logReaders[streamIdx].readNext().addEventListener(this); + logReaders[streamIdx].readBulk(10).addEventListener(this); } @Override @@ -228,9 +246,14 @@ public class ReaderWorker implements Worker { this.outOfOrderSequenceIdCounter = this.statsLogger.getCounter("out_of_order_seq_id"); this.executorService = Executors.newScheduledThreadPool( readThreadPoolSize, new ThreadFactoryBuilder().setNameFormat("benchmark.reader-%d").build()); + this.callbackExecutor = Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), + new ThreadFactoryBuilder().setNameFormat("benchmark.reader-callback-%d").build()); this.finagleNames = finagleNames; this.serverSets = createServerSets(serverSetPaths); + conf.setDeserializeRecordSetOnReads(false); + if (truncationIntervalInSeconds > 0 && (!finagleNames.isEmpty() || !serverSetPaths.isEmpty())) { // Construct client for truncation DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder() @@ -412,6 +435,7 @@ public class ReaderWorker implements Worker { } namespace.close(); SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES); + SchedulerUtils.shutdownScheduler(callbackExecutor, 2, TimeUnit.MINUTES); if (this.dlc != null) { this.dlc.close(); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java index 92fc090..a587375 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java @@ -18,7 +18,6 @@ package com.twitter.distributedlog.benchmark; import com.google.common.base.Preconditions; - import com.twitter.common.zookeeper.ServerSet; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter; @@ -50,6 +49,7 @@ import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class WriterWorker implements Worker { @@ -77,6 +77,8 @@ public class WriterWorker implements Worker { final int sendBufferSize; final int recvBufferSize; final boolean enableBatching; + final int batchBufferSize; + final int batchFlushIntervalMicros; volatile boolean running = true; @@ -86,6 +88,9 @@ public class WriterWorker implements Worker { final StatsLogger exceptionsLogger; final StatsLogger dlErrorCodeLogger; + // callback thread + final ExecutorService executor; + public WriterWorker(String streamPrefix, URI uri, int startStreamId, @@ -104,7 +109,9 @@ public class WriterWorker implements Worker { boolean handshakeWithClientInfo, int sendBufferSize, int recvBufferSize, - boolean enableBatching) { + boolean enableBatching, + int batchBufferSize, + int batchFlushIntervalMicros) { Preconditions.checkArgument(startStreamId <= endStreamId); Preconditions.checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty()); this.streamPrefix = streamPrefix; @@ -129,8 +136,11 @@ public class WriterWorker implements Worker { this.sendBufferSize = sendBufferSize; this.recvBufferSize = recvBufferSize; this.enableBatching = enableBatching; + this.batchBufferSize = batchBufferSize; + this.batchFlushIntervalMicros = batchFlushIntervalMicros; this.finagleNames = finagleNames; this.serverSets = createServerSets(serverSetPaths); + this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // Streams streamNames = new ArrayList<String>(endStreamId - startStreamId); @@ -165,7 +175,7 @@ public class WriterWorker implements Worker { .hostConnectionCoresize(hostConnectionCoreSize) .tcpConnectTimeout(Duration$.MODULE$.fromMilliseconds(200)) .connectTimeout(Duration$.MODULE$.fromMilliseconds(200)) - .requestTimeout(Duration$.MODULE$.fromSeconds(2)) + .requestTimeout(Duration$.MODULE$.fromSeconds(10)) .sendBufferSize(sendBufferSize) .recvBufferSize(recvBufferSize); @@ -177,7 +187,7 @@ public class WriterWorker implements Worker { .thriftmux(thriftmux) .redirectBackoffStartMs(100) .redirectBackoffMaxMs(500) - .requestTimeoutMs(2000) + .requestTimeoutMs(10000) .statsReceiver(statsReceiver) .streamNameRegex("^" + streamPrefix + "_[0-9]+$") .handshakeWithClientInfo(handshakeWithClientInfo) @@ -230,9 +240,12 @@ public class WriterWorker implements Worker { return bufferList; } - class TimedRequestHandler implements FutureEventListener<DLSN> { + class TimedRequestHandler implements FutureEventListener<DLSN>, Runnable { final String streamName; final long requestMillis; + DLSN dlsn = null; + Throwable cause = null; + TimedRequestHandler(String streamName, long requestMillis) { this.streamName = streamName; @@ -240,16 +253,27 @@ public class WriterWorker implements Worker { } @Override public void onSuccess(DLSN value) { - requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis); + dlsn = value; + executor.submit(this); } @Override public void onFailure(Throwable cause) { - LOG.error("Failed to publish to {} : ", streamName, cause); - requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis); - exceptionsLogger.getCounter(cause.getClass().getName()).inc(); - if (cause instanceof DLException) { - DLException dle = (DLException) cause; - dlErrorCodeLogger.getCounter(dle.getCode().toString()).inc(); + this.cause = cause; + executor.submit(this); + } + + @Override + public void run() { + if (null != dlsn) { + requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis); + } else { + LOG.error("Failed to publish to {} : ", streamName, cause); + requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis); + exceptionsLogger.getCounter(cause.getClass().getName()).inc(); + if (cause instanceof DLException) { + DLException dle = (DLException) cause; + dlErrorCodeLogger.getCounter(dle.getCode().toString()).inc(); + } } } } @@ -259,6 +283,7 @@ public class WriterWorker implements Worker { final int idx; final DistributedLogClient dlc; DistributedLogMultiStreamWriter writer = null; + final ShiftableRateLimiter limiter; Writer(int idx) { this.idx = idx; @@ -268,20 +293,22 @@ public class WriterWorker implements Worker { .client(this.dlc) .streams(streamNames) .compressionCodec(CompressionCodec.Type.NONE) - .flushIntervalMs(20) - .bufferSize(64 * 1024) - .firstSpeculativeTimeoutMs(50) - .maxSpeculativeTimeoutMs(200) + .flushIntervalMicros(batchFlushIntervalMicros) + .bufferSize(batchBufferSize) + .firstSpeculativeTimeoutMs(9000) + .maxSpeculativeTimeoutMs(9000) + .requestTimeoutMs(10000) .speculativeBackoffMultiplier(2) .build(); } + this.limiter = rateLimiter.duplicate(); } @Override public void run() { LOG.info("Started writer {}.", idx); while (running) { - rateLimiter.getLimiter().acquire(); + this.limiter.getLimiter().acquire(); final String streamName = streamNames.get(random.nextInt(numStreams)); final long requestMillis = System.currentTimeMillis(); final ByteBuffer data = buildBuffer(requestMillis, messageSizeBytes); @@ -337,14 +364,18 @@ public class WriterWorker implements Worker { public void run() { LOG.info("Starting writer (concurrency = {}, prefix = {}, batchSize = {})", new Object[] { writeConcurrency, streamPrefix, batchSize }); - for (int i = 0; i < writeConcurrency; i++) { - Runnable writer = null; - if (batchSize > 0) { - writer = new BulkWriter(i); - } else { - writer = new Writer(i); + try { + for (int i = 0; i < writeConcurrency; i++) { + Runnable writer = null; + if (batchSize > 0) { + writer = new BulkWriter(i); + } else { + writer = new Writer(i); + } + executorService.submit(writer); } - executorService.submit(writer); + } catch (Throwable t) { + LOG.error("Unhandled exception caught", t); } } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java index 8f4ec10..ba51e81 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java @@ -31,6 +31,8 @@ public class ShiftableRateLimiter implements Runnable { private final RateLimiter rateLimiter; private final ScheduledExecutorService executor; private final double initialRate, maxRate, changeRate; + private final long changeInterval; + private final TimeUnit changeIntervalUnit; private double nextRate; public ShiftableRateLimiter(double initialRate, @@ -42,11 +44,22 @@ public class ShiftableRateLimiter implements Runnable { this.maxRate = maxRate; this.changeRate = changeRate; this.nextRate = initialRate; + this.changeInterval = changeInterval; + this.changeIntervalUnit = changeIntervalUnit; this.rateLimiter = RateLimiter.create(initialRate); this.executor = Executors.newSingleThreadScheduledExecutor(); this.executor.scheduleAtFixedRate(this, changeInterval, changeInterval, changeIntervalUnit); } + public ShiftableRateLimiter duplicate() { + return new ShiftableRateLimiter( + initialRate, + maxRate, + changeRate, + changeInterval, + changeIntervalUnit); + } + @Override public void run() { this.nextRate = Math.min(nextRate + changeRate, maxRate); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java index 9682daf..fa3dceb 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogMultiStreamWriter.java @@ -63,7 +63,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { private DistributedLogClient _client = null; private List<String> _streams = null; private int _bufferSize = 16 * 1024; // 16k - private int _flushIntervalMs = 10; // 10ms + private long _flushIntervalMicros = 2000; // 2ms private CompressionCodec.Type _codec = CompressionCodec.Type.NONE; private ScheduledExecutorService _executorService = null; private long _requestTimeoutMs = 500; // 500ms @@ -120,7 +120,19 @@ public class DistributedLogMultiStreamWriter implements Runnable { * @return builder */ public Builder flushIntervalMs(int flushIntervalMs) { - this._flushIntervalMs = flushIntervalMs; + this._flushIntervalMicros = TimeUnit.MILLISECONDS.toMicros(flushIntervalMs); + return this; + } + + /** + * Set the flush interval in microseconds. + * + * @param flushIntervalMicros + * flush interval in microseconds. + * @return builder + */ + public Builder flushIntervalMicros(int flushIntervalMicros) { + this._flushIntervalMicros = flushIntervalMicros; return this; } @@ -247,7 +259,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { _streams, _client, Math.min(_bufferSize, MAX_LOGRECORDSET_SIZE), - _flushIntervalMs, + _flushIntervalMicros, _requestTimeoutMs, _firstSpeculativeTimeoutMs, _maxSpeculativeTimeoutMs, @@ -341,7 +353,7 @@ public class DistributedLogMultiStreamWriter implements Runnable { private DistributedLogMultiStreamWriter(List<String> streams, DistributedLogClient client, int bufferSize, - int flushIntervalMs, + long flushIntervalMicros, long requestTimeoutMs, int firstSpecultiveTimeoutMs, int maxSpeculativeTimeoutMs, @@ -376,12 +388,12 @@ public class DistributedLogMultiStreamWriter implements Runnable { this.nextStreamId = new AtomicInteger(0); this.recordSetWriter = newRecordSetWriter(); - if (flushIntervalMs > 0) { + if (flushIntervalMicros > 0) { this.scheduler.scheduleAtFixedRate( this, - flushIntervalMs, - flushIntervalMs, - TimeUnit.MILLISECONDS); + flushIntervalMicros, + flushIntervalMicros, + TimeUnit.MICROSECONDS); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java index 4237377..dd205a6 100644 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java +++ b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/TestDistributedLogMultiStreamWriter.java @@ -156,7 +156,7 @@ public class TestDistributedLogMultiStreamWriter { .flushIntervalMs(1000) .scheduler(executorService) .build(); - verify(executorService, times(1)).scheduleAtFixedRate(writer, 1000, 1000, TimeUnit.MILLISECONDS); + verify(executorService, times(1)).scheduleAtFixedRate(writer, 1000000, 1000000, TimeUnit.MICROSECONDS); } @Test(timeout = 20000) http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/93bdad0e/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java index 3a9a987..a339261 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java @@ -118,7 +118,6 @@ public class DistributedLogServerApp { return ReflectionUtils.newInstance(name, StatsProvider.class); } }).or(new NullStatsProvider()); - statsProvider.start(dlConf); final DistributedLogServer server = DistributedLogServer.runServer( getOptionalStringArg(cmdline, "u"),