This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a62eb8  Fixed count of messages per thread in managed ledger writer (#1438)
9a62eb8 is described below

commit 9a62eb801c9e2b7412ab75d4091027cd68fce971
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sun Mar 25 16:57:24 2018 -0700

    Fixed count of messages per thread in managed ledger writer (#1438)
---
 .../pulsar/testclient/ManagedLedgerWriter.java     | 62 ++++++++++++----------
 1 file changed, 35 insertions(+), 27 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index bf82907..e975b37 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -26,10 +26,10 @@ import com.beust.jcommander.ParameterException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.RateLimiter;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
@@ -37,11 +37,11 @@ import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.LongAdder;
@@ -153,7 +153,7 @@ public class ManagedLedgerWriter {
         log.info("Starting Pulsar managed-ledger perf writer with config: {}", 
w.writeValueAsString(arguments));
 
         byte[] payloadData = new byte[arguments.msgSize];
-        ByteBuf payloadBuffer = Unpooled.directBuffer(arguments.msgSize);
+        ByteBuf payloadBuffer = 
PooledByteBufAllocator.DEFAULT.directBuffer(arguments.msgSize);
         payloadBuffer.writerIndex(arguments.msgSize);
 
         // Now processing command line arguments
@@ -216,40 +216,46 @@ public class ManagedLedgerWriter {
         Collections.shuffle(managedLedgers);
         AtomicBoolean isDone = new AtomicBoolean();
 
-        AddEntryCallback addEntryCallback = new AddEntryCallback() {
-            @Override
-            public void addComplete(Position position, Object ctx) {
-                long sendTime = (Long) (ctx);
-                messagesSent.increment();
-                bytesSent.add(payloadData.length);
-
-                long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - 
sendTime);
-                recorder.recordValue(latencyMicros);
-                cumulativeRecorder.recordValue(latencyMicros);
-            }
-
-            @Override
-            public void addFailed(ManagedLedgerException exception, Object 
ctx) {
-                log.warn("Write error on message", exception);
-                System.exit(-1);
-            }
-        };
-
         List<List<ManagedLedger>> managedLedgersPerThread = 
Lists.partition(managedLedgers,
                 Math.max(1, managedLedgers.size() / arguments.numThreads));
 
         for (int i = 0; i < arguments.numThreads; i++) {
             List<ManagedLedger> managedLedgersForThisThread = 
managedLedgersPerThread.get(i);
             int nunManagedLedgersForThisThread = 
managedLedgersForThisThread.size();
+            long numMessagesForThisThread = arguments.numMessages / 
arguments.numThreads;
+            int maxOutstandingForThisThread = arguments.maxOutstanding;
 
             executor.submit(() -> {
                 try {
-                    double msgRate = arguments.msgRate / (double) 
arguments.numThreads;
-                    RateLimiter rateLimiter = RateLimiter.create(msgRate);
+                    final double msgRate = arguments.msgRate / (double) 
arguments.numThreads;
+                    final RateLimiter rateLimiter = 
RateLimiter.create(msgRate);
 
                     // Acquire 1 sec worth of messages to have a slower ramp-up
                     rateLimiter.acquire((int) msgRate);
-                    long startTime = System.currentTimeMillis();
+                    final long startTime = System.currentTimeMillis();
+
+                    final Semaphore semaphore = new 
Semaphore(maxOutstandingForThisThread);
+
+                    final AddEntryCallback addEntryCallback = new 
AddEntryCallback() {
+                        @Override
+                        public void addComplete(Position position, Object ctx) 
{
+                            long sendTime = (Long) (ctx);
+                            messagesSent.increment();
+                            bytesSent.add(payloadData.length);
+
+                            long latencyMicros = 
NANOSECONDS.toMicros(System.nanoTime() - sendTime);
+                            recorder.recordValue(latencyMicros);
+                            cumulativeRecorder.recordValue(latencyMicros);
+
+                            semaphore.release();
+                        }
+
+                        @Override
+                        public void addFailed(ManagedLedgerException 
exception, Object ctx) {
+                            log.warn("Write error on message", exception);
+                            System.exit(-1);
+                        }
+                    };
 
                     // Send messages on all topics/producers
                     long totalSent = 0;
@@ -265,8 +271,8 @@ public class ManagedLedgerWriter {
                                 }
                             }
 
-                            if (arguments.numMessages > 0) {
-                                if (totalSent++ >= arguments.numMessages) {
+                            if (numMessagesForThisThread > 0) {
+                                if (totalSent++ >= numMessagesForThisThread) {
                                     log.info("------------------- DONE 
-----------------------");
                                     printAggregatedStats();
                                     isDone.set(true);
@@ -274,6 +280,8 @@ public class ManagedLedgerWriter {
                                     System.exit(0);
                                 }
                             }
+
+                            semaphore.acquire();
                             rateLimiter.acquire();
 
                             final long sendTime = System.nanoTime();

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to