This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-4.7 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.7 by this push: new 51040cf ISSUE #1578: Fixed deadlock in auditor blocking ZK thread 51040cf is described below commit 51040cf0942615b915720287b8bb3d9378583ec0 Author: Matteo Merli <mme...@apache.org> AuthorDate: Tue Aug 21 11:42:14 2018 -0700 ISSUE #1578: Fixed deadlock in auditor blocking ZK thread ### Motivation Fixes #1578 After getting ZK callback from ZK event thread, we need to jump to a background thread before doing synchronous call to `admin.openLedgerNoRecovery(ledgerId);` which will try to make a ZK request a wait for a response (which would be coming through same ZK event thread currently blocked..) Author: Matteo Merli <mme...@apache.org> Reviewers: Enrico Olivelli <eolive...@gmail.com>, Sijie Guo <si...@apache.org> This closes #1608 from merlimat/fix-auditor-deadlock, closes #1578 (cherry picked from commit f782a9d818a12479d08c580a68b2566715da4c89) Signed-off-by: Sijie Guo <si...@apache.org> --- .../org/apache/bookkeeper/replication/Auditor.java | 83 ++++++++++++---------- 1 file changed, 44 insertions(+), 39 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 15868d9..8578a5b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -640,47 +641,51 @@ public class Auditor { return; } - LedgerHandle lh = null; - try { - lh = admin.openLedgerNoRecovery(ledgerId); - checker.checkLedger(lh, - new ProcessLostFragmentsCb(lh, callback), - conf.getAuditorLedgerVerificationPercentage()); - // we collect the following stats to get a measure of the - // distribution of a single ledger within the bk cluster - // the higher the number of fragments/bookies, the more distributed it is - numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments()); - numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies()); - numLedgersChecked.inc(); - } catch (BKException.BKNoSuchLedgerExistsException bknsle) { - if (LOG.isDebugEnabled()) { - LOG.debug("Ledger was deleted before we could check it", bknsle); - } - callback.processResult(BKException.Code.OK, - null, null); - return; - } catch (BKException bke) { - LOG.error("Couldn't open ledger " + ledgerId, bke); - callback.processResult(BKException.Code.BookieHandleNotAvailableException, - null, null); - return; - } catch (InterruptedException ie) { - LOG.error("Interrupted opening ledger", ie); - Thread.currentThread().interrupt(); - callback.processResult(BKException.Code.InterruptedException, null, null); - return; - } finally { - if (lh != null) { - try { - lh.close(); - } catch (BKException bke) { - LOG.warn("Couldn't close ledger " + ledgerId, bke); - } catch (InterruptedException ie) { - LOG.warn("Interrupted closing ledger " + ledgerId, ie); - Thread.currentThread().interrupt(); + // Do not perform blocking calls that involve making ZK calls from within the ZK + // event thread. Jump to background thread instead to avoid deadlock. + ForkJoinPool.commonPool().execute(() -> { + LedgerHandle lh = null; + try { + lh = admin.openLedgerNoRecovery(ledgerId); + checker.checkLedger(lh, + new ProcessLostFragmentsCb(lh, callback), + conf.getAuditorLedgerVerificationPercentage()); + // we collect the following stats to get a measure of the + // distribution of a single ledger within the bk cluster + // the higher the number of fragments/bookies, the more distributed it is + numFragmentsPerLedger.registerSuccessfulValue(lh.getNumFragments()); + numBookiesPerLedger.registerSuccessfulValue(lh.getNumBookies()); + numLedgersChecked.inc(); + } catch (BKException.BKNoSuchLedgerExistsException bknsle) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ledger was deleted before we could check it", bknsle); + } + callback.processResult(BKException.Code.OK, + null, null); + return; + } catch (BKException bke) { + LOG.error("Couldn't open ledger " + ledgerId, bke); + callback.processResult(BKException.Code.BookieHandleNotAvailableException, + null, null); + return; + } catch (InterruptedException ie) { + LOG.error("Interrupted opening ledger", ie); + Thread.currentThread().interrupt(); + callback.processResult(BKException.Code.InterruptedException, null, null); + return; + } finally { + if (lh != null) { + try { + lh.close(); + } catch (BKException bke) { + LOG.warn("Couldn't close ledger " + ledgerId, bke); + } catch (InterruptedException ie) { + LOG.warn("Interrupted closing ledger " + ledgerId, ie); + Thread.currentThread().interrupt(); + } } } - } + }); } };