This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a6544bd  PIP-17: impl deleteOffloaded() for S3ManagedLedgerOffloader (#1784)
a6544bd is described below

commit a6544bdb82de3056c6350ae9ad389a0f0e5fd2f6
Author: Jia Zhai <zhaiji...@gmail.com>
AuthorDate: Thu May 17 16:35:13 2018 +0800

    PIP-17: impl deleteOffloaded() for S3ManagedLedgerOffloader (#1784)
    
    * s3 delete
    
    * change following @ivan's comments
    
    * fix issue after merge master
---
 .../broker/s3offload/S3ManagedLedgerOffloader.java | 17 +++++--
 .../s3offload/S3ManagedLedgerOffloaderTest.java    | 55 +++++++++++++++++++---
 2 files changed, 63 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 4d5b388..7a73a3b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -18,13 +18,12 @@
  */
 package org.apache.pulsar.broker.s3offload;
 
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.SdkClientException;
 import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
 import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -212,7 +211,19 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
     @Override
     public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        promise.completeExceptionally(new UnsupportedOperationException());
+        scheduler.submit(() -> {
+            try {
+
+                s3client.deleteObjects(new DeleteObjectsRequest(bucket)
+                    .withKeys(dataBlockOffloadKey(ledgerId, uid), 
indexBlockOffloadKey(ledgerId, uid)));
+                promise.complete(null);
+            } catch (Throwable t) {
+                log.error("Failed delete s3 Object ", t);
+                promise.completeExceptionally(t);
+                return;
+            }
+        });
+
         return promise;
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index f9a043b..ed44ddc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -25,12 +25,9 @@ import static org.mockito.Matchers.any;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.io.DataInputStream;
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -38,7 +35,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -52,9 +48,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import 
org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
 import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
-import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -370,5 +364,54 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase {
         } catch (BKException.BKIncorrectParameterException e) {
         }
     }
+
+    @Test
+    public void testDeleteOffloaded() throws Exception {
+        int maxBlockSize = 1024;
+        int entryCount = 3;
+        ReadHandle readHandle = buildReadHandle(maxBlockSize, entryCount);
+        UUID uuid = UUID.randomUUID();
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler, maxBlockSize, DEFAULT_READ_BUFFER_SIZE);
+
+        // verify object exist after offload
+        offloader.offload(readHandle, uuid, new HashMap<>()).get();
+        Assert.assertTrue(s3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertTrue(s3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+
+        // verify object deleted after delete
+        offloader.deleteOffloaded(readHandle.getId(), uuid).get();
+        Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+    }
+
+    @Test
+    public void testDeleteOffloadedFail() throws Exception {
+        int maxBlockSize = 1024;
+        int entryCount = 3;
+        ReadHandle readHandle = buildReadHandle(maxBlockSize, entryCount);
+        UUID uuid = UUID.randomUUID();
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler, maxBlockSize, DEFAULT_READ_BUFFER_SIZE);
+        String failureString = "fail deleteOffloaded";
+        AmazonS3 mockS3client = Mockito.spy(s3client);
+        Mockito
+            .doThrow(new AmazonServiceException(failureString))
+            .when(mockS3client).deleteObjects(any());
+
+        try {
+            // verify object exist after offload
+            offloader.offload(readHandle, uuid, new HashMap<>()).get();
+            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+
+            offloader.deleteOffloaded(readHandle.getId(), uuid).get();
+        } catch (Exception e) {
+            // expected
+            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+            
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
+            // verify object still there.
+            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+        }
+    }
 }
 

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to