This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/distributedlog.git
The following commit(s) were added to refs/heads/master by this push: new 25090fc Make distributedlog compiled with latest bookkeeper version 25090fc is described below commit 25090fc94788948f2c6b1218ad63aa0e4db9c09b Author: Sijie Guo <si...@apache.org> AuthorDate: Wed Nov 29 23:30:32 2017 -0800 Make distributedlog compiled with latest bookkeeper version Descriptions of the changes in this PR: There are code changes on PendingReadOp for new bookkeeper api in current master. DistributedLog uses PendingReadOp for some administration tools. So the current master doesn't compile with the latest bookkeeper version. This code change is to fix that. The change here includes: - bump bk to 4.7.0-SNAPSHOT (will switch to 4.6.0 after it is released) - change to use the latest CompletableFuture in latest PendingReadOp. (this change doesn't target at making distributedlog work with new API) Author: Sijie Guo <si...@apache.org> Reviewers: Jia Zhai <None> This closes #240 from sijie/sijie/use_new_ledger_api --- .../org/apache/bookkeeper/client/LedgerReader.java | 67 ++++++++++++++-------- pom.xml | 2 +- 2 files changed, 43 insertions(+), 26 deletions(-) diff --git a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java index ccdc52b..6c6bd4a 100644 --- a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java +++ b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java @@ -21,27 +21,27 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.net.InetSocketAddress; import java.util.ArrayList; -import java.util.Enumeration; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.SortedMap; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.client.BKException.BKNoSuchEntryException; +import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.DistributionSchedule.WriteSet; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.common.concurrent.FutureEventListener; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Reader used for DL tools to read entries. */ public class LedgerReader { - private static final Logger logger = LoggerFactory.getLogger(LedgerReader.class); - /** * Read Result Holder. */ @@ -137,21 +137,42 @@ public class LedgerReader { final GenericCallback<List<LedgerEntry>> callback) { final List<LedgerEntry> resultList = new ArrayList<LedgerEntry>(); - final AsyncCallback.ReadCallback readCallback = new AsyncCallback.ReadCallback() { + final FutureEventListener<LedgerEntries> readListener = new FutureEventListener<LedgerEntries>() { + + private void readNext(long entryId) { + PendingReadOp op = new PendingReadOp(lh, lh.bk.scheduler, entryId, entryId, false); + op.future().whenComplete(this); + op.submit(); + } + @Override - public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) { - if (BKException.Code.NoSuchEntryException == rc) { - callback.operationComplete(BKException.Code.OK, resultList); - } else if (BKException.Code.OK == rc) { - while (entries.hasMoreElements()) { - resultList.add(entries.nextElement()); - } - long entryId = (Long) ctx; - ++entryId; - PendingReadOp readOp = new PendingReadOp(lh, lh.bk.scheduler, entryId, entryId, this, entryId); - readOp.initiate(); + public void onSuccess(LedgerEntries ledgerEntries) { + long entryId = -1L; + for (org.apache.bookkeeper.client.api.LedgerEntry entry : ledgerEntries) { + resultList.add(new LedgerEntry((LedgerEntryImpl) entry)); + entryId = entry.getEntryId(); + } + try { + ledgerEntries.close(); + } catch (Exception e) { + // bk should not throw any exceptions here + } + ++entryId; + readNext(entryId); + } + + @Override + public void onFailure(Throwable throwable) { + if (throwable instanceof BKNoSuchEntryException) { + callback.operationComplete(Code.OK, resultList); } else { - callback.operationComplete(rc, resultList); + int retCode; + if (throwable instanceof BKException) { + retCode = ((BKException) throwable).getCode(); + } else { + retCode = Code.UnexpectedConditionException; + } + callback.operationComplete(retCode, resultList); } } }; @@ -169,13 +190,9 @@ public class LedgerReader { } long entryId = recoveryData.lastAddConfirmed; - PendingReadOp readOp = new PendingReadOp(lh, lh.bk.scheduler, entryId, entryId, readCallback, entryId); - try { - readOp.initiate(); - } catch (Throwable t) { - logger.error("Failed to initialize pending read entry {} for ledger {} : ", - new Object[] { entryId, lh.getLedgerMetadata(), t }); - } + PendingReadOp op = new PendingReadOp(lh, lh.bk.scheduler, entryId, entryId, false); + op.future().whenComplete(readListener); + op.submit(); }; // Read Last AddConfirmed new ReadLastConfirmedOp(lh, readLACCallback).initiate(); diff --git a/pom.xml b/pom.xml index 616d755..738ef81 100644 --- a/pom.xml +++ b/pom.xml @@ -100,7 +100,7 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <!-- dependencies --> - <bookkeeper.version>4.6.0-SNAPSHOT</bookkeeper.version> + <bookkeeper.version>4.7.0-SNAPSHOT</bookkeeper.version> <codahale.metrics.version>3.0.1</codahale.metrics.version> <commons-cli.version>1.1</commons-cli.version> <commons-codec.version>1.6</commons-codec.version> -- To stop receiving notification emails like this one, please contact ['"distributedlog-commits@bookkeeper.apache.org" <distributedlog-commits@bookkeeper.apache.org>'].