This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-4.8 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.8 by this push: new 432a11b Issue #1584: LedgerHandleAdv should expose asyncAddEntry variant that takes ByteBuf (LedgerHandle exposes it as public) 432a11b is described below commit 432a11b0bf02a44013c0462b22da3687ed7353cd Author: Andrey Yegorov <ayego...@salesforce.com> AuthorDate: Mon Aug 20 23:30:52 2018 -0700 Issue #1584: LedgerHandleAdv should expose asyncAddEntry variant that takes ByteBuf (LedgerHandle exposes it as public) Descriptions of the changes in this PR: - exposed asyncAddEntry as public, similarly to other variants. - fixed ByteBuf retention ### Motivation It's useful to have this exposed as public for clients to make use of netty's allocator and pass ByteBuf directly. ### Changes exposed asyncAddEntry as public, similarly to other variants. Master Issue: #1584 Author: Andrey Yegorov <ayego...@salesforce.com> Reviewers: Enrico Olivelli <eolive...@gmail.com>, Sijie Guo <si...@apache.org> This closes #1585 from dlg99/feature/issue-1584-LedgerHandleAdv-public-addEntry-bytebuff, closes #1584 (cherry picked from commit 299fb58deed3dc284342716d52b7f918cc6cefc4) Signed-off-by: Sijie Guo <si...@apache.org> --- .../org/apache/bookkeeper/client/LedgerHandle.java | 21 ++++- .../apache/bookkeeper/client/LedgerHandleAdv.java | 19 +++- .../org/apache/bookkeeper/client/PendingAddOp.java | 7 +- .../bookkeeper/proto/checksum/DigestManager.java | 2 + .../bookkeeper/client/BookieWriteLedgerTest.java | 101 +++++++++++++++++++++ .../apache/bookkeeper/client/DeferredSyncTest.java | 36 ++++---- .../distributedlog/EnvelopedEntryWriter.java | 2 +- 7 files changed, 165 insertions(+), 23 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index a646651..bd81b73 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -1067,7 +1067,6 @@ public class LedgerHandle implements WriteHandle { } public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) { - data.retain(); PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx); doAsyncAddEntry(op); } @@ -1126,6 +1125,26 @@ public class LedgerHandle implements WriteHandle { } /** + * Add entry asynchronously to an open ledger, using an offset and range. + * This can be used only with {@link LedgerHandleAdv} returned through + * ledgers created with {@link createLedgerAdv(int, int, int, DigestType, byte[])}. + * + * @param entryId + * entryId of the entry to add. + * @param data + * io.netty.buffer.ByteBuf of bytes to be written + * @param cb + * object implementing callbackinterface + * @param ctx + * some control object + */ + public void asyncAddEntry(final long entryId, ByteBuf data, + final AddCallbackWithLatency cb, final Object ctx) { + LOG.error("To use this feature Ledger must be created with createLedgerAdv() interface."); + cb.addCompleteWithLatency(BKException.Code.IllegalOpException, LedgerHandle.this, entryId, 0, ctx); + } + + /** * {@inheritDoc} */ @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java index 153ceeb..48beaa7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java @@ -177,8 +177,23 @@ public class LedgerHandleAdv extends LedgerHandle implements WriteAdvHandle { asyncAddEntry(entryId, Unpooled.wrappedBuffer(data, offset, length), cb, ctx); } - private void asyncAddEntry(final long entryId, ByteBuf data, - final AddCallbackWithLatency cb, final Object ctx) { + /** + * Add entry asynchronously to an open ledger, using an offset and range. + * This can be used only with {@link LedgerHandleAdv} returned through + * ledgers created with {@link createLedgerAdv(int, int, int, DigestType, byte[])}. + * + * @param entryId + * entryId of the entry to add. + * @param data + * io.netty.buffer.ByteBuf of bytes to be written + * @param cb + * object implementing callbackinterface + * @param ctx + * some control object + */ + @Override + public void asyncAddEntry(final long entryId, ByteBuf data, + final AddCallbackWithLatency cb, final Object ctx) { PendingAddOp op = PendingAddOp.create(this, data, writeFlags, cb, ctx); op.setEntryId(entryId); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index 5f17226..afcaed6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -243,6 +243,8 @@ class PendingAddOp extends SafeRunnable implements WriteCallback { this.toSend = lh.macManager.computeDigestAndPackageForSending( entryId, lh.lastAddConfirmed, currentLedgerLength, payload); + // ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending + payload = null; // We are about to send. Check if we need to make an ensemble change // becasue of delayed write errors @@ -456,7 +458,10 @@ class PendingAddOp extends SafeRunnable implements WriteCallback { private void recyclePendAddOpObject() { entryId = LedgerHandle.INVALID_ENTRY_ID; currentLedgerLength = -1; - payload = null; + if (payload != null) { + ReferenceCountUtil.release(payload); + payload = null; + } cb = null; ctx = null; ackSet.recycle(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java index 2627db8..4c174a8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.proto.checksum; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCountUtil; import java.security.GeneralSecurityException; @@ -127,6 +128,7 @@ public abstract class DigestManager { populateValueAndReset(sendBuffer); sendBuffer.writeBytes(data, data.readerIndex(), data.readableBytes()); + ReferenceCountUtil.release(data); return ByteBufList.get(sendBuffer); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index 93683c8..85376ea 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -28,8 +28,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; + +import com.google.common.collect.Lists; +import io.netty.buffer.AbstractByteBufAllocator; import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; +import io.netty.buffer.UnpooledByteBufAllocator; import java.io.IOException; import java.nio.ByteBuffer; @@ -43,6 +48,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.bookkeeper.bookie.Bookie; @@ -1294,6 +1300,101 @@ public class BookieWriteLedgerTest extends lh.close(); } + @Test + @SuppressWarnings("unchecked") + public void testLedgerCreateAdvByteBufRefCnt() throws Exception { + long ledgerId = rng.nextLong(); + ledgerId &= Long.MAX_VALUE; + if (!baseConf.getLedgerManagerFactoryClass().equals(LongHierarchicalLedgerManagerFactory.class)) { + // since LongHierarchicalLedgerManager supports ledgerIds of + // decimal length upto 19 digits but other + // LedgerManagers only upto 10 decimals + ledgerId %= 9999999999L; + } + + final LedgerHandle lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null); + + final List<AbstractByteBufAllocator> allocs = Lists.newArrayList( + new PooledByteBufAllocator(true), + new PooledByteBufAllocator(false), + new UnpooledByteBufAllocator(true), + new UnpooledByteBufAllocator(false)); + + long entryId = 0; + for (AbstractByteBufAllocator alloc: allocs) { + final ByteBuf data = alloc.buffer(10); + data.writeBytes(("fragment0" + entryId).getBytes()); + assertEquals("ref count on ByteBuf should be 1", 1, data.refCnt()); + + CompletableFuture<Integer> cf = new CompletableFuture<>(); + lh.asyncAddEntry(entryId, data, (rc, handle, eId, qwcLatency, ctx) -> { + CompletableFuture<Integer> future = (CompletableFuture<Integer>) ctx; + future.complete(rc); + }, cf); + + int rc = cf.get(); + assertEquals("rc code is OK", BKException.Code.OK, rc); + + for (int i = 0; i < 10; i++) { + if (data.refCnt() == 0) { + break; + } + TimeUnit.MILLISECONDS.sleep(250); // recycler runs asynchronously + } + assertEquals("writing entry with id " + entryId + ", ref count on ByteBuf should be 0 ", + 0, data.refCnt()); + + org.apache.bookkeeper.client.api.LedgerEntry e = lh.read(entryId, entryId).getEntry(entryId); + assertEquals("entry data is correct", "fragment0" + entryId, new String(e.getEntryBytes())); + entryId++; + } + + bkc.deleteLedger(lh.ledgerId); + } + + @Test + @SuppressWarnings("unchecked") + public void testLedgerCreateByteBufRefCnt() throws Exception { + final LedgerHandle lh = bkc.createLedger(5, 3, 2, digestType, ledgerPassword, null); + + final List<AbstractByteBufAllocator> allocs = Lists.newArrayList( + new PooledByteBufAllocator(true), + new PooledByteBufAllocator(false), + new UnpooledByteBufAllocator(true), + new UnpooledByteBufAllocator(false)); + + int entryId = 0; + for (AbstractByteBufAllocator alloc: allocs) { + final ByteBuf data = alloc.buffer(10); + data.writeBytes(("fragment0" + entryId).getBytes()); + assertEquals("ref count on ByteBuf should be 1", 1, data.refCnt()); + + CompletableFuture<Integer> cf = new CompletableFuture<>(); + lh.asyncAddEntry(data, (rc, handle, eId, ctx) -> { + CompletableFuture<Integer> future = (CompletableFuture<Integer>) ctx; + future.complete(rc); + }, cf); + + int rc = cf.get(); + assertEquals("rc code is OK", BKException.Code.OK, rc); + + for (int i = 0; i < 10; i++) { + if (data.refCnt() == 0) { + break; + } + TimeUnit.MILLISECONDS.sleep(250); // recycler runs asynchronously + } + assertEquals("writing entry with id " + entryId + ", ref count on ByteBuf should be 0 ", + 0, data.refCnt()); + + org.apache.bookkeeper.client.api.LedgerEntry e = lh.read(entryId, entryId).getEntry(entryId); + assertEquals("entry data is correct", "fragment0" + entryId, new String(e.getEntryBytes())); + entryId++; + } + + bkc.deleteLedger(lh.ledgerId); + } + private void readEntries(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException { ls = lh.readEntries(0, numEntriesToWrite - 1); int index = 0; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java index aaa7645..95dec9c 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/DeferredSyncTest.java @@ -50,9 +50,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase { .withWriteFlags(WriteFlag.DEFERRED_SYNC) .execute())) { for (int i = 0; i < NUM_ENTRIES - 1; i++) { - result(wh.appendAsync(DATA)); + result(wh.appendAsync(DATA.retainedDuplicate())); } - long lastEntryID = result(wh.appendAsync(DATA)); + long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate())); assertEquals(NUM_ENTRIES - 1, lastEntryID); assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed()); assertEquals(-1, wh.getLastAddConfirmed()); @@ -69,9 +69,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase { .withWriteFlags(WriteFlag.DEFERRED_SYNC) .execute())) { for (int i = 0; i < NUM_ENTRIES - 1; i++) { - result(wh.appendAsync(DATA)); + result(wh.appendAsync(DATA.retainedDuplicate())); } - long lastEntryID = result(wh.appendAsync(DATA)); + long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate())); assertEquals(NUM_ENTRIES - 1, lastEntryID); assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed()); assertEquals(-1, wh.getLastAddConfirmed()); @@ -90,20 +90,20 @@ public class DeferredSyncTest extends MockBookKeeperTestCase { .withWriteFlags(WriteFlag.DEFERRED_SYNC) .makeAdv() .execute())) { - CompletableFuture<Long> w0 = wh.writeAsync(0, DATA); - CompletableFuture<Long> w2 = wh.writeAsync(2, DATA); - CompletableFuture<Long> w3 = wh.writeAsync(3, DATA); + CompletableFuture<Long> w0 = wh.writeAsync(0, DATA.retainedDuplicate()); + CompletableFuture<Long> w2 = wh.writeAsync(2, DATA.retainedDuplicate()); + CompletableFuture<Long> w3 = wh.writeAsync(3, DATA.retainedDuplicate()); result(w0); result(wh.force()); assertEquals(0, wh.getLastAddConfirmed()); - CompletableFuture<Long> w1 = wh.writeAsync(1, DATA); + CompletableFuture<Long> w1 = wh.writeAsync(1, DATA.retainedDuplicate()); result(w3); assertTrue(w1.isDone()); assertTrue(w2.isDone()); - CompletableFuture<Long> w5 = wh.writeAsync(5, DATA); + CompletableFuture<Long> w5 = wh.writeAsync(5, DATA.retainedDuplicate()); result(wh.force()); assertEquals(3, wh.getLastAddConfirmed()); - wh.writeAsync(4, DATA); + wh.writeAsync(4, DATA.retainedDuplicate()); result(w5); result(wh.force()); assertEquals(5, wh.getLastAddConfirmed()); @@ -120,9 +120,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase { .withWriteFlags(WriteFlag.DEFERRED_SYNC) .execute())) { for (int i = 0; i < NUM_ENTRIES - 1; i++) { - result(wh.appendAsync(DATA)); + result(wh.appendAsync(DATA.retainedDuplicate())); } - long lastEntryID = result(wh.appendAsync(DATA)); + long lastEntryID = result(wh.appendAsync(DATA.retainedDuplicate())); assertEquals(NUM_ENTRIES - 1, lastEntryID); assertEquals(NUM_ENTRIES - 1, wh.getLastAddPushed()); assertEquals(-1, wh.getLastAddConfirmed()); @@ -131,7 +131,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase { killBookie(bookieAddress); // write should succeed (we still have 2 bookies out of 3) - result(wh.appendAsync(DATA)); + result(wh.appendAsync(DATA.retainedDuplicate())); // force cannot go, it must be acknowledged by all of the bookies in the ensamble try { @@ -154,9 +154,9 @@ public class DeferredSyncTest extends MockBookKeeperTestCase { .withWriteFlags(WriteFlag.DEFERRED_SYNC) .execute())) { for (int i = 0; i < NUM_ENTRIES - 1; i++) { - result(wh.appendAsync(DATA)); + result(wh.appendAsync(DATA.retainedDuplicate())); } - long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA)); + long lastEntryIdBeforeSuspend = result(wh.appendAsync(DATA.retainedDuplicate())); assertEquals(NUM_ENTRIES - 1, lastEntryIdBeforeSuspend); assertEquals(-1, wh.getLastAddConfirmed()); @@ -170,7 +170,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase { assertEquals(-1, wh.getLastAddConfirmed()); // send an entry and receive ack - long lastEntry = wh.append(DATA); + long lastEntry = wh.append(DATA.retainedDuplicate()); // receive the ack for forceLedger resumeBookieWriteAcks(bookieAddress); @@ -195,7 +195,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase { .withWriteFlags(WriteFlag.DEFERRED_SYNC) .execute())) { for (int i = 0; i < NUM_ENTRIES - 1; i++) { - wh.append(DATA); + wh.append(DATA.retainedDuplicate()); } assertEquals(1, availableBookies.size()); @@ -207,7 +207,7 @@ public class DeferredSyncTest extends MockBookKeeperTestCase { try { // we cannot switch to the new bookie with DEFERRED_SYNC - wh.append(DATA); + wh.append(DATA.retainedDuplicate()); fail("since ensemble change is disable we cannot be able to write any more"); } catch (BKException.BKWriteException ex) { // expected diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java index 98c88f7..733593a 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java @@ -165,7 +165,7 @@ class EnvelopedEntryWriter implements Writer { if (null == finalizedBuffer) { finalizedBuffer = finalizeBuffer(); } - return finalizedBuffer.slice(); + return finalizedBuffer.retainedSlice(); } private ByteBuf finalizeBuffer() {