Repository: bookkeeper
Updated Branches:
  refs/heads/master bbd1eb8d8 -> bf4a4d6a0
BOOKKEEPER-924: addEntry() is susceptible to spurious wakeups

Use Java8 CompletableFuture instead of SyncCounter

Author: eolivelli <eolive...@gmail.com>

Reviewers: si...@apache.org <si...@apache.org>

Closes #60 from eolivelli/BOOKKEEPER-924 and squashes the following commits:

61e6b1a [eolivelli] BOOKKEEPER-924 addEntry() is susceptible to spurious wakeups
7d7eaf7 [eolivelli] BOOKKEEPER-924 addEntry() is susceptible to spurious wakeups
f865610 [eolivelli] BOOKKEEPER-924 addEntry() is susceptible to spurious wakeups
e75569a [eolivelli] BOOKKEEPER-924 addEntry() is susceptible to spurious wakeups
cdd32c3 [eolivelli] BOOKKEEPER-924 addEntry() is susceptible to spurious wakeups

Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/bf4a4d6a
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/bf4a4d6a
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/bf4a4d6a

Branch: refs/heads/master
Commit: bf4a4d6a07f9d615752054c6743035cebd86716e
Parents: bbd1eb8
Author: eolivelli <eolive...@gmail.com>
Authored: Thu Oct 13 00:27:18 2016 -0700
Committer: Sijie Guo <si...@apache.org>
Committed: Thu Oct 13 00:27:18 2016 -0700

----------------------------------------------------------------------
 .../apache/bookkeeper/client/BookKeeper.java    | 91 ++++++--------------
 .../bookkeeper/client/BookKeeperAdmin.java      | 64 ++++++--------
 .../apache/bookkeeper/client/LedgerHandle.java  | 56 +++---------
 .../bookkeeper/client/LedgerHandleAdv.java      | 14 +--
 .../bookkeeper/client/SynchCallbackUtils.java   | 67 ++++++++++++++
 5 files changed, 139 insertions(+), 153 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf4a4d6a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index b683ca4..08c24b0 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -65,6 +65,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; /** * BookKeeper client. We assume there is one single writer to a ledger at any @@ -616,27 +618,20 @@ public class BookKeeper implements AutoCloseable { public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte passwd[], final Map<String, byte[]> customMetadata) throws InterruptedException, BKException { - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<LedgerHandle> counter = new CompletableFuture<>(); + /* * Calls asynchronous version */ asyncCreateLedger(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, new SyncCreateCallback(), counter, customMetadata); - /* - * Wait - */ - counter.block(0); - if (counter.getrc() != BKException.Code.OK) { - LOG.error("Error while creating ledger : {}", counter.getrc()); - throw BKException.create(counter.getrc()); - } else if (counter.getLh() == null) { + LedgerHandle lh = SynchCallbackUtils.waitForResult(counter); + if (lh == null) { LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation"); throw BKException.create(BKException.Code.UnexpectedConditionException); } - - return counter.getLh(); + return lh; } /** @@ -682,27 +677,20 @@ public class BookKeeper implements AutoCloseable { public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte passwd[], final 
Map<String, byte[]> customMetadata) throws InterruptedException, BKException { - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<LedgerHandle> counter = new CompletableFuture<>(); + /* * Calls asynchronous version */ asyncCreateLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, new SyncCreateCallback(), counter, customMetadata); - /* - * Wait - */ - counter.block(0); - if (counter.getrc() != BKException.Code.OK) { - LOG.error("Error while creating ledger : {}", counter.getrc()); - throw BKException.create(counter.getrc()); - } else if (counter.getLh() == null) { + LedgerHandle lh = SynchCallbackUtils.waitForResult(counter); + if (lh == null) { LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation"); throw BKException.create(BKException.Code.UnexpectedConditionException); } - - return counter.getLh(); + return lh; } /** @@ -855,22 +843,14 @@ public class BookKeeper implements AutoCloseable { public LedgerHandle openLedger(long lId, DigestType digestType, byte passwd[]) throws BKException, InterruptedException { - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<LedgerHandle> counter = new CompletableFuture<>(); /* * Calls async open ledger */ asyncOpenLedger(lId, digestType, passwd, new SyncOpenCallback(), counter); - /* - * Wait - */ - counter.block(0); - if (counter.getrc() != BKException.Code.OK) - throw BKException.create(counter.getrc()); - - return counter.getLh(); + return SynchCallbackUtils.waitForResult(counter); } /** @@ -890,8 +870,7 @@ public class BookKeeper implements AutoCloseable { public LedgerHandle openLedgerNoRecovery(long lId, DigestType digestType, byte passwd[]) throws BKException, InterruptedException { - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<LedgerHandle> counter = new CompletableFuture<>(); /* * Calls async open ledger @@ -899,14 +878,7 @@ public class BookKeeper implements AutoCloseable 
{ asyncOpenLedgerNoRecovery(lId, digestType, passwd, new SyncOpenCallback(), counter); - /* - * Wait - */ - counter.block(0); - if (counter.getrc() != BKException.Code.OK) - throw BKException.create(counter.getrc()); - - return counter.getLh(); + return SynchCallbackUtils.waitForResult(counter); } /** @@ -944,16 +916,11 @@ public class BookKeeper implements AutoCloseable { * @throws BKException */ public void deleteLedger(long lId) throws InterruptedException, BKException { - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<Void> counter = new CompletableFuture<>(); // Call asynchronous version asyncDeleteLedger(lId, new SyncDeleteCallback(), counter); - // Wait - counter.block(0); - if (counter.getrc() != BKException.Code.OK) { - LOG.error("Error deleting ledger " + lId + " : " + counter.getrc()); - throw BKException.create(counter.getrc()); - } + + SynchCallbackUtils.waitForResult(counter); } /** @@ -1079,11 +1046,9 @@ public class BookKeeper implements AutoCloseable { * optional control object */ @Override + @SuppressWarnings("unchecked") public void createComplete(int rc, LedgerHandle lh, Object ctx) { - SyncCounter counter = (SyncCounter) ctx; - counter.setLh(lh); - counter.setrc(rc); - counter.dec(); + SynchCallbackUtils.finish(rc, lh, (CompletableFuture<LedgerHandle>) ctx); } } @@ -1099,14 +1064,9 @@ public class BookKeeper implements AutoCloseable { * optional control object */ @Override + @SuppressWarnings("unchecked") public void openComplete(int rc, LedgerHandle lh, Object ctx) { - SyncCounter counter = (SyncCounter) ctx; - counter.setLh(lh); - - LOG.debug("Open complete: {}", rc); - - counter.setrc(rc); - counter.dec(); + SynchCallbackUtils.finish(rc, lh, (CompletableFuture<LedgerHandle>) ctx); } } @@ -1120,10 +1080,9 @@ public class BookKeeper implements AutoCloseable { * optional control object */ @Override + @SuppressWarnings("unchecked") public void deleteComplete(int rc, Object ctx) { - SyncCounter counter = 
(SyncCounter) ctx; - counter.setrc(rc); - counter.dec(); + SynchCallbackUtils.finish(rc, null, (CompletableFuture<Void>) ctx); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf4a4d6a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java index 022d4da..dd8fde4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperAdmin.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; +import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -35,6 +36,8 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Random; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.AsyncCallback.RecoverCallback; @@ -260,18 +263,11 @@ public class BookKeeperAdmin { */ public LedgerHandle openLedger(final long lId) throws InterruptedException, BKException { - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<LedgerHandle> counter = new CompletableFuture<>(); + new LedgerOpenOp(bkc, lId, new SyncOpenCallback(), counter).initiate(); - /* - * Wait - */ - counter.block(0); - if (counter.getrc() != BKException.Code.OK) { - throw BKException.create(counter.getrc()); - } - return counter.getLh(); + return SynchCallbackUtils.waitForResult(counter); } /** @@ -303,19 +299,12 @@ public class BookKeeperAdmin { */ public LedgerHandle 
openLedgerNoRecovery(final long lId) throws InterruptedException, BKException { - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<LedgerHandle> counter = new CompletableFuture<>(); + new LedgerOpenOp(bkc, lId, new SyncOpenCallback(), counter) .initiateWithoutRecovery(); - /* - * Wait - */ - counter.block(0); - if (counter.getrc() != BKException.Code.OK) { - throw BKException.create(counter.getrc()); - } - return counter.getLh(); + return SynchCallbackUtils.waitForResult(counter); } /** @@ -384,16 +373,13 @@ public class BookKeeperAdmin { } if (lastEntryId == -1 || nextEntryId <= lastEntryId) { try { - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<Enumeration<LedgerEntry>> counter = new CompletableFuture<>(); handle.asyncReadEntriesInternal(nextEntryId, nextEntryId, new LedgerHandle.SyncReadCallback(), counter); - counter.block(0); - if (counter.getrc() != BKException.Code.OK) { - throw BKException.create(counter.getrc()); - } - currentEntry = counter.getSequence().nextElement(); + + currentEntry = SynchCallbackUtils.waitForResult(counter).nextElement(); + return true; } catch (Exception e) { if (e instanceof BKException.BKNoSuchEntryException && lastEntryId == -1) { @@ -862,31 +848,33 @@ public class BookKeeperAdmin { final LedgerFragment ledgerFragment, final BookieSocketAddress targetBookieAddress) throws InterruptedException, BKException { - SyncCounter syncCounter = new SyncCounter(); - ResultCallBack resultCallBack = new ResultCallBack(syncCounter); + CompletableFuture<Void> counter = new CompletableFuture<>(); + ResultCallBack resultCallBack = new ResultCallBack(counter); SingleFragmentCallback cb = new SingleFragmentCallback(resultCallBack, lh, ledgerFragment.getFirstEntryId(), ledgerFragment .getAddress(), targetBookieAddress); - syncCounter.inc(); + asyncRecoverLedgerFragment(lh, ledgerFragment, cb, targetBookieAddress); - syncCounter.block(0); - if (syncCounter.getrc() != BKException.Code.OK) { 
- throw BKException.create(bkc.getReturnRc(syncCounter.getrc())); + + try { + SynchCallbackUtils.waitForResult(counter); + } catch (BKException err) { + throw BKException.create(bkc.getReturnRc(err.getCode())); } } /** This is the class for getting the replication result */ static class ResultCallBack implements AsyncCallback.VoidCallback { - private SyncCounter sync; + private final CompletableFuture<Void> sync; - public ResultCallBack(SyncCounter sync) { + public ResultCallBack(CompletableFuture<Void> sync) { this.sync = sync; } @Override - public void processResult(int rc, String s, Object obj) { - sync.setrc(rc); - sync.dec(); + @SuppressWarnings("unchecked") + public void processResult(int rc, String s, Object ctx) { + SynchCallbackUtils.finish(rc, null, sync); } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf4a4d6a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 06f84eb..5c33929 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -55,6 +55,8 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; /** * Ledger handle contains ledger metadata and is used to access the read and @@ -266,15 +268,11 @@ public class LedgerHandle implements AutoCloseable { */ public void close() throws InterruptedException, BKException { - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<Void> counter = new CompletableFuture<>(); asyncClose(new SyncCloseCallback(), counter); - 
counter.block(0); - if (counter.getrc() != BKException.Code.OK) { - throw BKException.create(counter.getrc()); - } + SynchCallbackUtils.waitForResult(counter); } /** @@ -461,17 +459,11 @@ public class LedgerHandle implements AutoCloseable { */ public Enumeration<LedgerEntry> readEntries(long firstEntry, long lastEntry) throws InterruptedException, BKException { - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<Enumeration<LedgerEntry>> counter = new CompletableFuture<>(); asyncReadEntries(firstEntry, lastEntry, new SyncReadCallback(), counter); - counter.block(0); - if (counter.getrc() != BKException.Code.OK) { - throw BKException.create(counter.getrc()); - } - - return counter.getSequence(); + return SynchCallbackUtils.waitForResult(counter); } /** @@ -550,18 +542,12 @@ public class LedgerHandle implements AutoCloseable { throws InterruptedException, BKException { LOG.debug("Adding entry {}", data); - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<Long> counter = new CompletableFuture<>(); SyncAddCallback callback = new SyncAddCallback(); asyncAddEntry(data, offset, length, callback, counter); - counter.block(0); - - if (counter.getrc() != BKException.Code.OK) { - throw BKException.create(counter.getrc()); - } - return callback.entryId; + return SynchCallbackUtils.waitForResult(counter); } /** @@ -1353,21 +1339,14 @@ public class LedgerHandle implements AutoCloseable { * control object */ @Override + @SuppressWarnings("unchecked") public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) { - - SyncCounter counter = (SyncCounter) ctx; - synchronized (counter) { - counter.setSequence(seq); - counter.setrc(rc); - counter.dec(); - counter.notify(); - } + SynchCallbackUtils.finish(rc, seq, (CompletableFuture<Enumeration<LedgerEntry>>)ctx); } } static class SyncAddCallback implements AddCallback { - long entryId = -1; /** * Implementation of callback interface for synchronous 
read method. @@ -1382,12 +1361,9 @@ public class LedgerHandle implements AutoCloseable { * control object */ @Override + @SuppressWarnings("unchecked") public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) { - SyncCounter counter = (SyncCounter) ctx; - - this.entryId = entry; - counter.setrc(rc); - counter.dec(); + SynchCallbackUtils.finish(rc, entry, (CompletableFuture<Long>)ctx); } } @@ -1416,13 +1392,9 @@ public class LedgerHandle implements AutoCloseable { * @param ctx */ @Override + @SuppressWarnings("unchecked") public void closeComplete(int rc, LedgerHandle lh, Object ctx) { - SyncCounter counter = (SyncCounter) ctx; - counter.setrc(rc); - synchronized (counter) { - counter.dec(); - counter.notify(); - } + SynchCallbackUtils.finish(rc, null, (CompletableFuture<Void>)ctx); } } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf4a4d6a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java index c69a0e5..4a7de57 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java @@ -24,6 +24,8 @@ package org.apache.bookkeeper.client; import java.io.Serializable; import java.security.GeneralSecurityException; import java.util.Comparator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; @@ -90,18 +92,16 @@ public class LedgerHandleAdv extends LedgerHandle { BKException { LOG.debug("Adding entry {}", data); - SyncCounter counter = new SyncCounter(); - counter.inc(); + CompletableFuture<Long> counter = 
new CompletableFuture<>(); SyncAddCallback callback = new SyncAddCallback(); asyncAddEntry(entryId, data, offset, length, callback, counter); - counter.block(0); - - if (counter.getrc() != BKException.Code.OK) { - throw BKException.create(counter.getrc()); + try { + return counter.get(); + } catch (ExecutionException err) { + throw (BKException) err.getCause(); } - return callback.entryId; } /** http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bf4a4d6a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SynchCallbackUtils.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SynchCallbackUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SynchCallbackUtils.java new file mode 100644 index 0000000..d1ef9e4 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/SynchCallbackUtils.java @@ -0,0 +1,67 @@ +/* + * Copyright 2016 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.client; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * Utility for callbacks + * + */ +public class SynchCallbackUtils { + + /** + * Wait for a result. 
This is convenience method to implement callbacks + * + * @param <T> + * @param future + * @return + * @throws InterruptedException + * @throws BKException + */ + public static <T> T waitForResult(CompletableFuture<T> future) throws InterruptedException, BKException { + try { + return future.get(); + } catch (ExecutionException err) { + if (err.getCause() instanceof BKException) { + throw (BKException) err.getCause(); + } else { + BKException unexpectedConditionException + = BKException.create(BKException.Code.UnexpectedConditionException); + unexpectedConditionException.initCause(err.getCause()); + throw unexpectedConditionException; + } + + } + } + + /** + * Handle the Response Code and transform it to a BKException + * @param <T> + * @param rc + * @param result + * @param future + */ + public static <T> void finish(int rc, T result, CompletableFuture<T> future) { + if (rc != BKException.Code.OK) { + future.completeExceptionally(BKException.create(rc).fillInStackTrace()); + } else { + future.complete(result); + } + } + +}