sijie closed pull request #1784: PIP-17: impl deleteOffloaded() for S3ManagedLedgerOffloader URL: https://github.com/apache/incubator-pulsar/pull/1784
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java index 4d5b388a59..7a73a3bbc8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java @@ -18,13 +18,12 @@ */ package org.apache.pulsar.broker.s3offload; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.SdkClientException; import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; import com.amazonaws.services.s3.model.ObjectMetadata; @@ -212,7 +211,19 @@ static String indexBlockOffloadKey(long ledgerId, UUID uuid) { @Override public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) { CompletableFuture<Void> promise = new CompletableFuture<>(); - promise.completeExceptionally(new UnsupportedOperationException()); + scheduler.submit(() -> { + try { + + s3client.deleteObjects(new DeleteObjectsRequest(bucket) + .withKeys(dataBlockOffloadKey(ledgerId, uid), indexBlockOffloadKey(ledgerId, uid))); + promise.complete(null); + } catch (Throwable t) { + log.error("Failed delete s3 Object 
", t); + promise.completeExceptionally(t); + return; + } + }); + return promise; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java index f9a043b179..ed44ddc586 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java @@ -25,12 +25,9 @@ import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.AmazonS3; import io.netty.util.concurrent.DefaultThreadFactory; - -import java.io.DataInputStream; import java.lang.reflect.Method; import java.util.HashMap; import java.util.Iterator; -import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -38,7 +35,6 @@ import java.util.concurrent.ScheduledExecutorService; import lombok.extern.slf4j.Slf4j; - import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; @@ -52,9 +48,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; -import org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; import org.apache.pulsar.broker.s3offload.impl.DataBlockHeaderImpl; -import org.apache.pulsar.broker.s3offload.impl.OffloadIndexBlockImpl; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -370,5 +364,54 @@ public void testOffloadReadInvalidEntryIds() throws Exception { } catch (BKException.BKIncorrectParameterException e) { } } + + @Test + public void testDeleteOffloaded() throws Exception { + int maxBlockSize = 1024; + int entryCount = 3; + ReadHandle readHandle = 
buildReadHandle(maxBlockSize, entryCount); + UUID uuid = UUID.randomUUID(); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, maxBlockSize, DEFAULT_READ_BUFFER_SIZE); + + // verify object exist after offload + offloader.offload(readHandle, uuid, new HashMap<>()).get(); + Assert.assertTrue(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + + // verify object deleted after delete + offloader.deleteOffloaded(readHandle.getId(), uuid).get(); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertFalse(s3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + } + + @Test + public void testDeleteOffloadedFail() throws Exception { + int maxBlockSize = 1024; + int entryCount = 3; + ReadHandle readHandle = buildReadHandle(maxBlockSize, entryCount); + UUID uuid = UUID.randomUUID(); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, maxBlockSize, DEFAULT_READ_BUFFER_SIZE); + String failureString = "fail deleteOffloaded"; + AmazonS3 mockS3client = Mockito.spy(s3client); + Mockito + .doThrow(new AmazonServiceException(failureString)) + .when(mockS3client).deleteObjects(any()); + + try { + // verify object exist after offload + offloader.offload(readHandle, uuid, new HashMap<>()).get(); + Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + + offloader.deleteOffloaded(readHandle.getId(), uuid).get(); + } catch (Exception e) { + // expected + Assert.assertTrue(e.getCause() instanceof AmazonServiceException); + Assert.assertTrue(e.getCause().getMessage().contains(failureString)); + // verify object still there. 
+ Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, dataBlockOffloadKey(readHandle.getId(), uuid))); + Assert.assertTrue(mockS3client.doesObjectExist(BUCKET, indexBlockOffloadKey(readHandle.getId(), uuid))); + } + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services