sijie closed pull request #1784: PIP-17: impl deleteOffloaded() for 
S3ManagedLedgerOffloader
URL: https://github.com/apache/incubator-pulsar/pull/1784
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 4d5b388a59..7a73a3bbc8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -18,13 +18,12 @@
  */
 package org.apache.pulsar.broker.s3offload;
 
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.SdkClientException;
 import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
 import com.amazonaws.services.s3.model.ObjectMetadata;
@@ -212,7 +211,19 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
     @Override
     public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
-        promise.completeExceptionally(new UnsupportedOperationException());
+        scheduler.submit(() -> {
+            try {
+
+                s3client.deleteObjects(new DeleteObjectsRequest(bucket)
+                    .withKeys(dataBlockOffloadKey(ledgerId, uid), 
indexBlockOffloadKey(ledgerId, uid)));
+                promise.complete(null);
+            } catch (Throwable t) {
+                log.error("Failed delete s3 Object ", t);
+                promise.completeExceptionally(t);
+                return;
+            }
+        });
+
         return promise;
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
index f9a043b179..ed44ddc586 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java
@@ -25,12 +25,9 @@
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.AmazonS3;
 import io.netty.util.concurrent.DefaultThreadFactory;
-
-import java.io.DataInputStream;
 import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
@@ -38,7 +35,6 @@
 import java.util.concurrent.ScheduledExecutorService;
 
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -52,9 +48,7 @@
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import 
org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl;
 import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl;
-import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -370,5 +364,54 @@ public void testOffloadReadInvalidEntryIds() throws 
Exception {
         } catch (BKException.BKIncorrectParameterException e) {
         }
     }
+
+    @Test
+    public void testDeleteOffloaded() throws Exception {
+        int maxBlockSize = 1024;
+        int entryCount = 3;
+        ReadHandle readHandle = buildReadHandle(maxBlockSize, entryCount);
+        UUID uuid = UUID.randomUUID();
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler, maxBlockSize, DEFAULT_READ_BUFFER_SIZE);
+
+        // verify object exist after offload
+        offloader.offload(readHandle, uuid, new HashMap<>()).get();
+        Assert.assertTrue(s3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertTrue(s3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+
+        // verify object deleted after delete
+        offloader.deleteOffloaded(readHandle.getId(), uuid).get();
+        Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+        Assert.assertFalse(s3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+    }
+
+    @Test
+    public void testDeleteOffloadedFail() throws Exception {
+        int maxBlockSize = 1024;
+        int entryCount = 3;
+        ReadHandle readHandle = buildReadHandle(maxBlockSize, entryCount);
+        UUID uuid = UUID.randomUUID();
+        LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, 
BUCKET, scheduler, maxBlockSize, DEFAULT_READ_BUFFER_SIZE);
+        String failureString = "fail deleteOffloaded";
+        AmazonS3 mockS3client = Mockito.spy(s3client);
+        Mockito
+            .doThrow(new AmazonServiceException(failureString))
+            .when(mockS3client).deleteObjects(any());
+
+        try {
+            // verify object exist after offload
+            offloader.offload(readHandle, uuid, new HashMap<>()).get();
+            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+
+            offloader.deleteOffloaded(readHandle.getId(), uuid).get();
+        } catch (Exception e) {
+            // expected
+            Assert.assertTrue(e.getCause() instanceof AmazonServiceException);
+            
Assert.assertTrue(e.getCause().getMessage().contains(failureString));
+            // verify object still there.
+            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
dataBlockOffloadKey(readHandle.getId(), uuid)));
+            Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, 
indexBlockOffloadKey(readHandle.getId(), uuid)));
+        }
+    }
 }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to