merlimat commented on a change in pull request #1513: Managed ledger uses 
ReadHandle in read path
URL: https://github.com/apache/incubator-pulsar/pull/1513#discussion_r179807644
 
 

 ##########
 File path: 
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
 ##########
 @@ -239,43 +243,44 @@ public void asyncReadEntry(LedgerHandle lh, long 
firstEntry, long lastEntry, boo
             }
 
             // Read all the entries from bookkeeper
-            lh.asyncReadEntries(firstEntry, lastEntry, (rc, lh1, sequence, cb) 
-> {
-
-                if (rc != BKException.Code.OK) {
-                    if (rc == BKException.Code.TooManyRequestsException) {
-                        
callback.readEntriesFailed(createManagedLedgerException(rc), ctx);
-                    } else {
-                        ml.invalidateLedgerHandle(lh1, rc);
-                        ManagedLedgerException mlException = 
createManagedLedgerException(rc);
-                        callback.readEntriesFailed(mlException, ctx);
-                    }
-                    return;
-                }
-
-                checkNotNull(ml.getName());
-                checkNotNull(ml.getExecutor());
-                ml.getExecutor().executeOrdered(ml.getName(), safeRun(() -> {
-                    // We got the entries, we need to transform them to a 
List<> type
-                    long totalSize = 0;
-                    final List<EntryImpl> entriesToReturn = 
Lists.newArrayListWithExpectedSize(entriesToRead);
-                    while (sequence.hasMoreElements()) {
-                        // Insert the entries at the end of the list (they 
will be unsorted for now)
-                        LedgerEntry ledgerEntry = sequence.nextElement();
-                        EntryImpl entry = EntryImpl.create(ledgerEntry);
-                        ledgerEntry.getEntryBuffer().release();
-
-                        entriesToReturn.add(entry);
-
-                        totalSize += entry.getLength();
-
-                    }
-
-                    
manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
-                    ml.getMBean().addReadEntriesSample(entriesToReturn.size(), 
totalSize);
-
-                    callback.readEntriesComplete((List) entriesToReturn, ctx);
-                }));
-            }, callback);
+            lh.readAsync(firstEntry, lastEntry).whenComplete(
+                    (ledgerEntries, exception) -> {
+                        if (exception != null) {
+                            if (exception instanceof BKException
+                                && ((BKException)exception).getCode() == 
BKException.Code.TooManyRequestsException) {
+                                
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
+                            } else {
+                                ml.invalidateLedgerHandle(lh, exception);
+                                ManagedLedgerException mlException = 
createManagedLedgerException(exception);
+                                callback.readEntriesFailed(mlException, ctx);
+                            }
+                            return;
+                        }
+
+                        checkNotNull(ml.getName());
+                        checkNotNull(ml.getExecutor());
+                        ml.getExecutor().executeOrdered(ml.getName(), 
safeRun(() -> {
 
 Review comment:
   As you did in some other part of code, even here we could use 
`whenCompleteAsync()` with the `ml.executor().chooseThread(ml.getName()`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to