This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new df6962e S3 offloader should throw an error on receiving an empty ledger (#1855) df6962e is described below commit df6962e4ce359a22bcad097ac8cad3b55d975dd2 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Tue May 29 20:07:51 2018 +0200 S3 offloader should throw an error on receiving an empty ledger (#1855) ManagedLedger should never send an empty ledger for offload, as its a waste of resources. This patch adds a defensive check to ensure that if the S3 offload does get an empty ledger, it doesn't even attempt to create any resources on the S3 side. Master issue: #1511 --- .../broker/s3offload/S3ManagedLedgerOffloader.java | 5 +++++ .../s3offload/S3ManagedLedgerOffloaderTest.java | 25 ++++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java index 65ffd37..dcfa9e8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java @@ -114,6 +114,11 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { Map<String, String> extraMetadata) { CompletableFuture<Void> promise = new CompletableFuture<>(); scheduler.chooseThread(readHandle.getId()).submit(() -> { + if (readHandle.getLength() == 0 || !readHandle.isClosed() || readHandle.getLastAddConfirmed() < 0) { + promise.completeExceptionally( + new IllegalArgumentException("An empty or open ledger should never be offloaded")); + return; + } OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() .withLedgerMetadata(readHandle.getLedgerMetadata()) .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java index 9f0d253..1b5ad03 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.s3offload; import static org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.dataBlockOffloadKey; import static org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.indexBlockOffloadKey; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.AmazonS3; @@ -30,6 +31,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Random; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; @@ -422,5 +424,28 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); } } + + @Test + public void testOffloadEmpty() throws Exception { + CompletableFuture<LedgerEntries> noEntries = new CompletableFuture<>(); + noEntries.completeExceptionally(new BKException.BKReadException()); + + ReadHandle readHandle = Mockito.mock(ReadHandle.class); + Mockito.doReturn(-1L).when(readHandle).getLastAddConfirmed(); + Mockito.doReturn(noEntries).when(readHandle).readAsync(anyLong(), anyLong()); + Mockito.doReturn(0L).when(readHandle).getLength(); + Mockito.doReturn(true).when(readHandle).isClosed(); + Mockito.doReturn(1234L).when(readHandle).getId(); + + UUID uuid = UUID.randomUUID(); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + try { + offloader.offload(readHandle, uuid, new HashMap<>()).get(); + Assert.fail("Shouldn't have been able to offload"); + } catch (ExecutionException e) { + Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class); + } + } } -- To stop receiving notification emails like this one, please contact si...@apache.org.