This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a6544bd PIP-17: impl deleteOffloaded() for S3ManagedLedgerOffloader (#1784) a6544bd is described below commit a6544bdb82de3056c6350ae9ad389a0f0e5fd2f6 Author: Jia Zhai <zhaiji...@gmail.com> AuthorDate: Thu May 17 16:35:13 2018 +0800 PIP-17: impl deleteOffloaded() for S3ManagedLedgerOffloader (#1784) * s3 delete * change following @ivan's comments * fix issue after merge master --- .../broker/s3offload/S3ManagedLedgerOffloader.java | 17 +++++-- .../s3offload/S3ManagedLedgerOffloaderTest.java | 55 +++++++++++++++++++--- 2 files changed, 63 insertions(+), 9 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java index 4d5b388..7a73a3b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java @@ -18,13 +18,12 @@ */ package org.apache.pulsar.broker.s3offload; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.SdkClientException; import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; import com.amazonaws.services.s3.model.ObjectMetadata; @@ -212,7 +211,19 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { @Override public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) { CompletableFuture<Void> promise = new CompletableFuture<>(); - promise.completeExceptionally(new UnsupportedOperationException()); + scheduler.submit(() -> { + try { + + s3client.deleteObjects(new DeleteObjectsRequest(bucket) + .withKeys(dataBlockOffloadKey(ledgerId, uid), indexBlockOffloadKey(ledgerId, uid))); + promise.complete(null); + } catch (Throwable t) { + log.error("Failed delete s3 Object ", t); + promise.completeExceptionally(t); + return; + } + }); + return promise; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java index f9a043b..ed44ddc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java @@ -25,12 +25,9 @@ import static org.mockito.Matchers.any; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.AmazonS3; import io.netty.util.concurrent.DefaultThreadFactory; - -import java.io.DataInputStream; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -38,7 +35,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import lombok.extern.slf4j.Slf4j; - import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; @@ -52,9 +48,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; -import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockImpl; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -370,5 +364,54 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { } catch (BKException.BKIncorrectParameterException e) { } } + + @Test + public void testDeleteOffloaded() throws Exception { + int maxBlockSize = 1024; + int entryCount = 3; + ReadHandle readHandle = buildReadHandle(maxBlockSize, entryCount); + UUID uuid = UUID.randomUUID(); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, maxBlockSize, DEFAULT_READ_BUFFER_SIZE); + + // verify object exist after offload + offloader.offload(readHandle, uuid, new HashMap<>()).get(); + Assert.assertTrue(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + + // verify object deleted after delete + offloader.deleteOffloaded(readHandle.getId(), uuid).get(); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + } + + @Test + public void testDeleteOffloadedFail() throws Exception { + int maxBlockSize = 1024; + int entryCount = 3; + ReadHandle readHandle = buildReadHandle(maxBlockSize, entryCount); + UUID uuid = UUID.randomUUID(); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, maxBlockSize, DEFAULT_READ_BUFFER_SIZE); + String failureString = "fail deleteOffloaded"; + AmazonS3 mockS3client = Mockito.spy(s3client); + Mockito + .doThrow(new AmazonServiceException(failureString)) + .when(mockS3client).deleteObjects(any()); + + try { + // verify object exist after offload + offloader.offload(readHandle, uuid, new HashMap<>()).get(); + Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + + offloader.deleteOffloaded(readHandle.getId(), uuid).get(); + } catch (Exception e) { + // expected + Assert.assertTrue(e.getCause() instanceof AmazonServiceException); + Assert.assertTrue(e.getCause().getMessage().contains(failureString)); + // verify object still there. + Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + } + } } -- To stop receiving notification emails like this one, please contact si...@apache.org.