eolivelli closed pull request #649: Pool AddCompletions. URL: https://github.com/apache/bookkeeper/pull/649
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index c544701be..ffa030b56 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -562,8 +562,8 @@ void addEntry(final long ledgerId, byte[] masterKey, final long entryId, ByteBuf } completionObjects.put(completionKey, - new AddCompletion(completionKey, - cb, ctx, ledgerId, entryId)); + acquireAddCompletion(completionKey, + cb, ctx, ledgerId, entryId)); final Channel c = channel; if (c == null) { // usually checked in writeAndFlush, but we have extra check @@ -1162,14 +1162,14 @@ public void operationComplete(Future<Channel> future) throws Exception { // visible for testing abstract class CompletionValue { - final Object ctx; - protected final long ledgerId; - protected final long entryId; - private final long startTime; private final OpStatsLogger opLogger; private final OpStatsLogger timeoutOpLogger; - protected final Timeout timeout; private final String operationName; + protected Object ctx; + protected long ledgerId; + protected long entryId; + protected long startTime; + protected Timeout timeout; public CompletionValue(String operationName, Object ctx, @@ -1192,8 +1192,9 @@ private long latency() { } void cancelTimeoutAndLogOp(int rc) { - if (null != timeout) { - timeout.cancel(); + Timeout t = timeout; + if (null != t) { + t.cancel(); } if (rc != BKException.Code.OK) { @@ -1580,28 +1581,55 @@ public void handleV3Response(BookkeeperProtocol.Response response) { } } + 
private final Recycler<AddCompletion> ADD_COMPLETION_RECYCLER = new Recycler<AddCompletion>() { + protected AddCompletion newObject(Recycler.Handle<AddCompletion> handle) { + return new AddCompletion(handle); + } + }; + + AddCompletion acquireAddCompletion(final CompletionKey key, + final WriteCallback originalCallback, + final Object originalCtx, + final long ledgerId, final long entryId) { + AddCompletion completion = ADD_COMPLETION_RECYCLER.get(); + completion.reset(key, originalCallback, originalCtx, ledgerId, entryId); + return completion; + } + // visible for testing - class AddCompletion extends CompletionValue { - final WriteCallback cb; - - public AddCompletion(final CompletionKey key, - final WriteCallback originalCallback, - final Object originalCtx, - final long ledgerId, final long entryId) { - super("Add", originalCtx, ledgerId, entryId, - addEntryOpLogger, addTimeoutOpLogger, - scheduleTimeout(key, addEntryTimeout)); - this.cb = new WriteCallback() { - @Override - public void writeComplete(int rc, long ledgerId, long entryId, - BookieSocketAddress addr, - Object ctx) { - cancelTimeoutAndLogOp(rc); - originalCallback.writeComplete(rc, ledgerId, entryId, - addr, originalCtx); - key.release(); - } - }; + class AddCompletion extends CompletionValue implements WriteCallback { + final Recycler.Handle<AddCompletion> handle; + + CompletionKey key = null; + WriteCallback originalCallback = null; + + AddCompletion(Recycler.Handle<AddCompletion> handle) { + super("Add", null, -1, -1, + addEntryOpLogger, addTimeoutOpLogger, null); + this.handle = handle; + } + + void reset(final CompletionKey key, + final WriteCallback originalCallback, + final Object originalCtx, + final long ledgerId, final long entryId) { + this.key = key; + this.originalCallback = originalCallback; + this.ctx = originalCtx; + this.ledgerId = ledgerId; + this.entryId = entryId; + this.startTime = MathUtils.nowInNano(); + this.timeout = scheduleTimeout(key, addEntryTimeout); + } + + @Override 
+ public void writeComplete(int rc, long ledgerId, long entryId, + BookieSocketAddress addr, + Object ctx) { + cancelTimeoutAndLogOp(rc); + originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx); + key.release(); + handle.recycle(this); } @Override @@ -1612,7 +1640,7 @@ public void errorOut() { @Override public void errorOut(final int rc) { errorOutAndRunCallback( - () -> cb.writeComplete(rc, ledgerId, entryId, addr, ctx)); + () -> writeComplete(rc, ledgerId, entryId, addr, ctx)); } @Override @@ -1638,7 +1666,7 @@ private void handleResponse(long ledgerId, long entryId, BKException.Code.WriteException, "ledger", ledgerId, "entry", entryId); - cb.writeComplete(rc, ledgerId, entryId, addr, ctx); + writeComplete(rc, ledgerId, entryId, addr, ctx); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services