This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push: new 8c10e56 V2 decoder marks the reads index of the payload on write 8c10e56 is described below commit 8c10e56db285788eeb17a4290a417a0745114a77 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Mon Feb 19 18:26:03 2018 -0800 V2 decoder marks the reads index of the payload on write This means that any storage implementation that receives the payload can reset the reader index without jumping into bytes it knows nothing about. This was the source of an issue when using the V2 protocol with DbLedgerStorage, where the data could not be successfully read back from the ledger. Author: Ivan Kelly <iv...@apache.org> Reviewers: Matteo Merli <mme...@apache.org>, Sijie Guo <si...@apache.org> This closes #1180 from ivankelly/broken-v2-rocks --- .../bookie/storage/ldb/DbLedgerStorage.java | 5 ++--- .../bookkeeper/proto/BookieProtoEncoding.java | 3 +++ .../storage/ldb/DbLedgerStorageBookieTest.java | 23 ++++++++++++++++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index ec7a3b7..59d2b1b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -308,9 +308,8 @@ public class DbLedgerStorage implements CompactableLedgerStorage { public long addEntry(ByteBuf entry) throws IOException { long startTime = MathUtils.nowInNano(); - long ledgerId = entry.readLong(); - long entryId = entry.readLong(); - entry.resetReaderIndex(); + long ledgerId = entry.getLong(entry.readerIndex()); + long entryId = entry.getLong(entry.readerIndex() + 8); if (log.isDebugEnabled()) { log.debug("Add entry. 
{}@{}", ledgerId, entryId); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 24ef117..375caba 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -167,6 +167,9 @@ public class BookieProtoEncoding { // Read ledger and entry id without advancing the reader index ledgerId = packet.getLong(packet.readerIndex()); entryId = packet.getLong(packet.readerIndex() + 8); + // mark the reader index so that any resets will return to the + // start of the payload + packet.markReaderIndex(); return BookieProtocol.ParsedAddRequest.create( version, ledgerId, entryId, flags, masterKey, packet.retain()); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java index da7f32a..a2162bf 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageBookieTest.java @@ -22,15 +22,21 @@ package org.apache.bookkeeper.bookie.storage.ldb; import static org.junit.Assert.assertEquals; +import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Unit test for {@link DbLedgerStorageBookieTest}. 
*/ public class DbLedgerStorageBookieTest extends BookKeeperClusterTestCase { + static final Logger LOG = LoggerFactory.getLogger(DbLedgerStorageBookieTest.class); public DbLedgerStorageBookieTest() { super(1); @@ -49,4 +55,21 @@ public class DbLedgerStorageBookieTest extends BookKeeperClusterTestCase { assertEquals(0, lh2.getLength()); assertEquals(-1, lh2.getLastAddConfirmed()); } + + @Test + public void testV2ReadWrite() throws Exception { + ClientConfiguration conf = new ClientConfiguration(); + conf.setUseV2WireProtocol(true); + conf.setZkServers(zkUtil.getZooKeeperConnectString()); + + BookKeeper bkc = new BookKeeper(conf); + LedgerHandle lh1 = bkc.createLedger(1, 1, DigestType.CRC32, new byte[0]); + lh1.addEntry("Foobar".getBytes()); + lh1.close(); + + LedgerHandle lh2 = bkc.openLedger(lh1.getId(), DigestType.CRC32, new byte[0]); + assertEquals(0, lh2.getLastAddConfirmed()); + assertEquals(new String(lh2.readEntries(0, 0).nextElement().getEntry()), + "Foobar"); + } } -- To stop receiving notification emails like this one, please contact si...@apache.org.