eolivelli closed pull request #649: Pool AddCompletions
URL: https://github.com/apache/bookkeeper/pull/649
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index c544701be..ffa030b56 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -562,8 +562,8 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf
         }
 
         completionObjects.put(completionKey,
-                              new AddCompletion(completionKey,
-                                                cb, ctx, ledgerId, entryId));
+                              acquireAddCompletion(completionKey,
+                                                   cb, ctx, ledgerId, entryId));
         final Channel c = channel;
         if (c == null) {
             // usually checked in writeAndFlush, but we have extra check
@@ -1162,14 +1162,14 @@ public void operationComplete(Future<Channel> future) throws Exception {
 
     // visible for testing
     abstract class CompletionValue {
-        final Object ctx;
-        protected final long ledgerId;
-        protected final long entryId;
-        private final long startTime;
         private final OpStatsLogger opLogger;
         private final OpStatsLogger timeoutOpLogger;
-        protected final Timeout timeout;
         private final String operationName;
+        protected Object ctx;
+        protected long ledgerId;
+        protected long entryId;
+        protected long startTime;
+        protected Timeout timeout;
 
         public CompletionValue(String operationName,
                                Object ctx,
@@ -1192,8 +1192,9 @@ private long latency() {
         }
 
         void cancelTimeoutAndLogOp(int rc) {
-            if (null != timeout) {
-                timeout.cancel();
+            Timeout t = timeout;
+            if (null != t) {
+                t.cancel();
             }
 
             if (rc != BKException.Code.OK) {
@@ -1580,28 +1581,55 @@ public void handleV3Response(BookkeeperProtocol.Response response) {
         }
     }
 
+    private final Recycler<AddCompletion> ADD_COMPLETION_RECYCLER = new Recycler<AddCompletion>() {
+            protected AddCompletion newObject(Recycler.Handle<AddCompletion> handle) {
+                return new AddCompletion(handle);
+            }
+        };
+
+    AddCompletion acquireAddCompletion(final CompletionKey key,
+                                       final WriteCallback originalCallback,
+                                       final Object originalCtx,
+                                       final long ledgerId, final long entryId) {
+        AddCompletion completion = ADD_COMPLETION_RECYCLER.get();
+        completion.reset(key, originalCallback, originalCtx, ledgerId, entryId);
+        return completion;
+    }
+
     // visible for testing
-    class AddCompletion extends CompletionValue {
-        final WriteCallback cb;
-
-        public AddCompletion(final CompletionKey key,
-                             final WriteCallback originalCallback,
-                             final Object originalCtx,
-                             final long ledgerId, final long entryId) {
-            super("Add", originalCtx, ledgerId, entryId,
-                  addEntryOpLogger, addTimeoutOpLogger,
-                  scheduleTimeout(key, addEntryTimeout));
-            this.cb = new WriteCallback() {
-                @Override
-                public void writeComplete(int rc, long ledgerId, long entryId,
-                                          BookieSocketAddress addr,
-                                          Object ctx) {
-                    cancelTimeoutAndLogOp(rc);
-                    originalCallback.writeComplete(rc, ledgerId, entryId,
-                                                   addr, originalCtx);
-                    key.release();
-                }
-            };
+    class AddCompletion extends CompletionValue implements WriteCallback {
+        final Recycler.Handle<AddCompletion> handle;
+
+        CompletionKey key = null;
+        WriteCallback originalCallback = null;
+
+        AddCompletion(Recycler.Handle<AddCompletion> handle) {
+            super("Add", null, -1, -1,
+                  addEntryOpLogger, addTimeoutOpLogger, null);
+            this.handle = handle;
+        }
+
+        void reset(final CompletionKey key,
+                   final WriteCallback originalCallback,
+                   final Object originalCtx,
+                   final long ledgerId, final long entryId) {
+            this.key = key;
+            this.originalCallback = originalCallback;
+            this.ctx = originalCtx;
+            this.ledgerId = ledgerId;
+            this.entryId = entryId;
+            this.startTime = MathUtils.nowInNano();
+            this.timeout = scheduleTimeout(key, addEntryTimeout);
+        }
+
+        @Override
+        public void writeComplete(int rc, long ledgerId, long entryId,
+                                  BookieSocketAddress addr,
+                                  Object ctx) {
+            cancelTimeoutAndLogOp(rc);
+            originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
+            key.release();
+            handle.recycle(this);
         }
 
         @Override
@@ -1612,7 +1640,7 @@ public void errorOut() {
         @Override
         public void errorOut(final int rc) {
             errorOutAndRunCallback(
-                    () -> cb.writeComplete(rc, ledgerId, entryId, addr, ctx));
+                    () -> writeComplete(rc, ledgerId, entryId, addr, ctx));
         }
 
         @Override
@@ -1638,7 +1666,7 @@ private void handleResponse(long ledgerId, long entryId,
                                          BKException.Code.WriteException,
                                          "ledger", ledgerId,
                                          "entry", entryId);
-            cb.writeComplete(rc, ledgerId, entryId, addr, ctx);
+            writeComplete(rc, ledgerId, entryId, addr, ctx);
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to