This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9a62eb8 Fixed count of messages per thread in managed ledger writer (#1438) 9a62eb8 is described below commit 9a62eb801c9e2b7412ab75d4091027cd68fce971 Author: Matteo Merli <mme...@apache.org> AuthorDate: Sun Mar 25 16:57:24 2018 -0700 Fixed count of messages per thread in managed ledger writer (#1438) --- .../pulsar/testclient/ManagedLedgerWriter.java | 62 ++++++++++++---------- 1 file changed, 35 insertions(+), 27 deletions(-) diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java index bf82907..e975b37 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java @@ -26,10 +26,10 @@ import com.beust.jcommander.ParameterException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.util.concurrent.RateLimiter; import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.util.concurrent.DefaultThreadFactory; @@ -37,11 +37,11 @@ import java.text.DecimalFormat; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; @@ -153,7 +153,7 @@ public class ManagedLedgerWriter { log.info("Starting Pulsar managed-ledger perf writer with config: {}", w.writeValueAsString(arguments)); byte[] payloadData = new byte[arguments.msgSize]; - ByteBuf payloadBuffer = Unpooled.directBuffer(arguments.msgSize); + ByteBuf payloadBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(arguments.msgSize); payloadBuffer.writerIndex(arguments.msgSize); // Now processing command line arguments @@ -216,40 +216,46 @@ public class ManagedLedgerWriter { Collections.shuffle(managedLedgers); AtomicBoolean isDone = new AtomicBoolean(); - AddEntryCallback addEntryCallback = new AddEntryCallback() { - @Override - public void addComplete(Position position, Object ctx) { - long sendTime = (Long) (ctx); - messagesSent.increment(); - bytesSent.add(payloadData.length); - - long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - sendTime); - recorder.recordValue(latencyMicros); - cumulativeRecorder.recordValue(latencyMicros); - } - - @Override - public void addFailed(ManagedLedgerException exception, Object ctx) { - log.warn("Write error on message", exception); - System.exit(-1); - } - }; - List<List<ManagedLedger>> managedLedgersPerThread = Lists.partition(managedLedgers, Math.max(1, managedLedgers.size() / arguments.numThreads)); for (int i = 0; i < arguments.numThreads; i++) { List<ManagedLedger> managedLedgersForThisThread = managedLedgersPerThread.get(i); int nunManagedLedgersForThisThread = managedLedgersForThisThread.size(); + long numMessagesForThisThread = arguments.numMessages / arguments.numThreads; + int maxOutstandingForThisThread = arguments.maxOutstanding; executor.submit(() -> { try { - double msgRate = arguments.msgRate / (double) arguments.numThreads; - RateLimiter rateLimiter = RateLimiter.create(msgRate); + final double msgRate = arguments.msgRate / (double) arguments.numThreads; + final RateLimiter rateLimiter = RateLimiter.create(msgRate); // Acquire 1 sec worth of messages to have a slower ramp-up rateLimiter.acquire((int) msgRate); - long startTime = System.currentTimeMillis(); + final long startTime = System.currentTimeMillis(); + + final Semaphore semaphore = new Semaphore(maxOutstandingForThisThread); + + final AddEntryCallback addEntryCallback = new AddEntryCallback() { + @Override + public void addComplete(Position position, Object ctx) { + long sendTime = (Long) (ctx); + messagesSent.increment(); + bytesSent.add(payloadData.length); + + long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - sendTime); + recorder.recordValue(latencyMicros); + cumulativeRecorder.recordValue(latencyMicros); + + semaphore.release(); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + log.warn("Write error on message", exception); + System.exit(-1); + } + }; // Send messages on all topics/producers long totalSent = 0; @@ -265,8 +271,8 @@ public class ManagedLedgerWriter { } } - if (arguments.numMessages > 0) { - if (totalSent++ >= arguments.numMessages) { + if (numMessagesForThisThread > 0) { + if (totalSent++ >= numMessagesForThisThread) { log.info("------------------- DONE -----------------------"); printAggregatedStats(); isDone.set(true); @@ -274,6 +280,8 @@ public class ManagedLedgerWriter { System.exit(0); } } + + semaphore.acquire(); rateLimiter.acquire(); final long sendTime = System.nanoTime(); -- To stop receiving notification emails like this one, please contact mme...@apache.org.