Repository: bookkeeper Updated Branches: refs/heads/master 63395a3e3 -> 9c937f5d8
BOOKKEEPER-891: Read entries failure should trigger callback only once When reading multiple entries with `LedgerHandle.asyncReadEntries()`, in case there is a read error, the callback is currently being invoked for each of the requested entries. Since a single "success" callback is expected, we should also have a single "failure" callback invocation. Author: Matteo Merli <mme...@apache.org> Reviewers: Sijie Guo <si...@apache.org> Closes #12 from merlimat/bk-891-read-callbacks Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/9c937f5d Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/9c937f5d Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/9c937f5d Branch: refs/heads/master Commit: 9c937f5d814d4079b6134db361a0e2de1e37cd05 Parents: 63395a3 Author: Matteo Merli <mme...@apache.org> Authored: Mon Feb 8 23:39:50 2016 -0800 Committer: Sijie Guo <si...@apache.org> Committed: Mon Feb 8 23:39:50 2016 -0800 ---------------------------------------------------------------------- .../apache/bookkeeper/client/PendingReadOp.java | 6 +++ .../bookkeeper/client/BookKeeperTest.java | 50 ++++++++++++++++++++ 2 files changed, 56 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9c937f5d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index d9c11d1..cafe8f7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -375,6 +375,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { } private void submitCallback(int code) { + if (cb == null) { + // Callback had already been triggered before + return; + } + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); if (code != BKException.Code.OK) { long firstUnread = LedgerHandle.INVALID_ENTRY_ID; @@ -392,6 +397,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, ReadEntryCallback { } cancelSpeculativeTask(true); cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx); + cb = null; } @Override http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9c937f5d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java index b1a544b..a0eab35 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java @@ -1,5 +1,7 @@ package org.apache.bookkeeper.client; +import java.util.Enumeration; + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -28,6 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.test.BaseTestCase; import org.apache.zookeeper.ZooKeeper; @@ -240,4 +244,50 @@ public class BookKeeperTest extends BaseTestCase { bkc.close(); } + + @Test(timeout = 60000) + public void testReadFailureCallback() throws Exception { + ClientConfiguration conf = new ClientConfiguration().setZkServers(zkUtil.getZooKeeperConnectString()); + + BookKeeper bkc = new BookKeeper(conf); + LedgerHandle lh = bkc.createLedger(digestType, "testPasswd".getBytes()); + + final int numEntries = 10; + for (int i = 0; i < numEntries; i++) { + lh.addEntry(("entry-" + i).getBytes()); + } + + stopBKCluster(); + + try { + lh.readEntries(0, numEntries - 1); + fail("Read operation should have failed"); + } catch (BKBookieHandleNotAvailableException e) { + // expected + } + + final CountDownLatch counter = new CountDownLatch(1); + final AtomicInteger receivedResponses = new AtomicInteger(0); + final AtomicInteger returnCode = new AtomicInteger(); + lh.asyncReadEntries(0, numEntries - 1, new ReadCallback() { + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) { + returnCode.set(rc); + receivedResponses.incrementAndGet(); + counter.countDown(); + } + }, null); + + counter.await(); + + // Wait extra time to ensure no extra responses received + Thread.sleep(1000); + + assertEquals(1, receivedResponses.get()); + assertEquals(BKException.Code.BookieHandleNotAvailableException, returnCode.get()); + + bkc.close(); + + startBKCluster(); + } }