Repository: bookkeeper
Updated Branches:
  refs/heads/master 63395a3e3 -> 9c937f5d8


BOOKKEEPER-891: Read entries failure should trigger callback only once

When reading multiple entries with `LedgerHandle.asyncReadEntries()`, a read 
error currently causes the callback to be invoked once for each of the 
requested entries.
Since a successful read triggers a single "success" callback, a failed read 
should likewise trigger a single "failure" callback invocation.

Author: Matteo Merli <mme...@apache.org>

Reviewers: Sijie Guo <si...@apache.org>

Closes #12 from merlimat/bk-891-read-callbacks


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/9c937f5d
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/9c937f5d
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/9c937f5d

Branch: refs/heads/master
Commit: 9c937f5d814d4079b6134db361a0e2de1e37cd05
Parents: 63395a3
Author: Matteo Merli <mme...@apache.org>
Authored: Mon Feb 8 23:39:50 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Mon Feb 8 23:39:50 2016 -0800

----------------------------------------------------------------------
 .../apache/bookkeeper/client/PendingReadOp.java |  6 +++
 .../bookkeeper/client/BookKeeperTest.java       | 50 ++++++++++++++++++++
 2 files changed, 56 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9c937f5d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
index d9c11d1..cafe8f7 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java
@@ -375,6 +375,11 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
     }
 
     private void submitCallback(int code) {
+        if (cb == null) {
+            // Callback had already been triggered before
+            return;
+        }
+
         long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos);
         if (code != BKException.Code.OK) {
             long firstUnread = LedgerHandle.INVALID_ENTRY_ID;
@@ -392,6 +397,7 @@ class PendingReadOp implements Enumeration<LedgerEntry>, 
ReadEntryCallback {
         }
         cancelSpeculativeTask(true);
         cb.readComplete(code, lh, PendingReadOp.this, PendingReadOp.this.ctx);
+        cb = null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9c937f5d/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
----------------------------------------------------------------------
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index b1a544b..a0eab35 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -1,5 +1,7 @@
 package org.apache.bookkeeper.client;
 
+import java.util.Enumeration;
+
 /*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
@@ -28,6 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
+import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
+import 
org.apache.bookkeeper.client.BKException.BKBookieHandleNotAvailableException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.test.BaseTestCase;
 import org.apache.zookeeper.ZooKeeper;
@@ -240,4 +244,50 @@ public class BookKeeperTest extends BaseTestCase {
 
         bkc.close();
     }
+
+    @Test(timeout = 60000) // BOOKKEEPER-891 regression test: a failed multi-entry read must fire the callback once
+    public void testReadFailureCallback() throws Exception {
+        ClientConfiguration conf = new 
ClientConfiguration().setZkServers(zkUtil.getZooKeeperConnectString());
+
+        BookKeeper bkc = new BookKeeper(conf);
+        LedgerHandle lh = bkc.createLedger(digestType, 
"testPasswd".getBytes());
+
+        final int numEntries = 10;
+        for (int i = 0; i < numEntries; i++) {
+            lh.addEntry(("entry-" + i).getBytes()); // write entries 0..9 while the cluster is still up
+        }
+
+        stopBKCluster(); // presumably takes all bookies down so the reads below cannot succeed -- confirm in base class
+
+        try {
+            lh.readEntries(0, numEntries - 1); // synchronous read of the full range; expected to throw
+            fail("Read operation should have failed");
+        } catch (BKBookieHandleNotAvailableException e) {
+            // expected
+        }
+
+        final CountDownLatch counter = new CountDownLatch(1); // released by the first callback invocation
+        final AtomicInteger receivedResponses = new AtomicInteger(0); // total number of callback invocations
+        final AtomicInteger returnCode = new AtomicInteger(); // rc from the most recent invocation
+        lh.asyncReadEntries(0, numEntries - 1, new ReadCallback() {
+            @Override
+            public void readComplete(int rc, LedgerHandle lh, 
Enumeration<LedgerEntry> seq, Object ctx) {
+                returnCode.set(rc);
+                receivedResponses.incrementAndGet(); // counted on every call so duplicate callbacks are detectable
+                counter.countDown();
+            }
+        }, null);
+
+        counter.await(); // block until the first callback arrives
+
+        // Wait extra time to ensure no extra responses received
+        Thread.sleep(1000);
+
+        assertEquals(1, receivedResponses.get()); // one callback for the whole range, not one per entry
+        assertEquals(BKException.Code.BookieHandleNotAvailableException, 
returnCode.get());
+
+        bkc.close(); // NOTE(review): skipped, along with startBKCluster(), if an assertion above fails; consider try/finally
+
+        startBKCluster(); // presumably restores the cluster for teardown and later tests -- confirm in base class
+    }
 }

Reply via email to