This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 77d936e When offloading to S3 add version information (#1876) 77d936e is described below commit 77d936e65b1bf6a8f6fc52e1a51670851fe90e20 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Thu May 31 19:51:22 2018 +0200 When offloading to S3 add version information (#1876) When we offload ledgers to S3, we create 2 objects, an index and a data object. This patch adds version information for the format, the version of the software and the gitsha of the software to each, for debugging purposes. On reads, if the format version is missing or is not one the broker knows about (for example, a higher version), the read is rejected. Master Issue: #1511 --- .../broker/s3offload/S3ManagedLedgerOffloader.java | 36 ++++++++++- .../s3offload/impl/S3BackedInputStreamImpl.java | 7 +++ .../s3offload/impl/S3BackedReadHandleImpl.java | 5 ++ .../broker/s3offload/S3BackedInputStreamTest.java | 20 +++++-- .../s3offload/S3ManagedLedgerOffloaderTest.java | 69 ++++++++++++++++++++++ 5 files changed, 131 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java index dcfa9e8..73bb78a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java @@ -33,6 +33,7 @@ import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; import com.google.common.base.Strings; import java.io.InputStream; +import java.io.IOException; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -45,6 +46,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import 
org.apache.pulsar.broker.s3offload.impl.BlockAwareSegmentInputStreamImpl; import org.apache.pulsar.broker.s3offload.impl.S3BackedReadHandleImpl; +import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +54,20 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { private static final Logger log = LoggerFactory.getLogger(S3ManagedLedgerOffloader.class); public static final String DRIVER_NAME = "S3"; + + static final String METADATA_FORMAT_VERSION_KEY = "S3ManagedLedgerOffloaderFormatVersion"; + static final String METADATA_SOFTWARE_VERSION_KEY = "S3ManagedLedgerOffloaderSoftwareVersion"; + static final String METADATA_SOFTWARE_GITSHA_KEY = "S3ManagedLedgerOffloaderSoftwareGitSha"; + static final String CURRENT_VERSION = String.valueOf(1); + + private final VersionCheck VERSION_CHECK = (key, metadata) -> { + String version = metadata.getUserMetadata().get(METADATA_FORMAT_VERSION_KEY); + if (version == null || !version.equals(CURRENT_VERSION)) { + throw new IOException(String.format("Invalid object version %s for %s, expect %s", + version, key, CURRENT_VERSION)); + } + }; + private final OrderedScheduler scheduler; private final AmazonS3 s3client; private final String bucket; @@ -124,7 +140,11 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { .withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize()); String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid); String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid); - InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey, new ObjectMetadata()); + + ObjectMetadata dataMetadata = new ObjectMetadata(); + addVersionInfo(dataMetadata); + + InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey, dataMetadata); InitiateMultipartUploadResult dataBlockRes = null; // init multi part 
upload for data block. @@ -195,6 +215,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { // write the index block ObjectMetadata metadata = new ObjectMetadata(); metadata.setContentLength(indexStream.getStreamSize()); + addVersionInfo(metadata); + s3client.putObject(new PutObjectRequest( bucket, indexBlockKey, @@ -225,6 +247,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { promise.complete(S3BackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), s3client, bucket, key, indexKey, + VERSION_CHECK, ledgerId, readBufferSize)); } catch (Throwable t) { promise.completeExceptionally(t); @@ -233,6 +256,13 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { return promise; } + private static void addVersionInfo(ObjectMetadata metadata) { + metadata.getUserMetadata().put(METADATA_FORMAT_VERSION_KEY, CURRENT_VERSION); + metadata.getUserMetadata().put(METADATA_SOFTWARE_VERSION_KEY, + PulsarBrokerVersionStringUtils.getNormalizedVersionString()); + metadata.getUserMetadata().put(METADATA_SOFTWARE_GITSHA_KEY, PulsarBrokerVersionStringUtils.getGitSha()); + } + @Override public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uid) { CompletableFuture<Void> promise = new CompletableFuture<>(); @@ -249,6 +279,10 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { return promise; } + + public interface VersionCheck { + void check(String key, ObjectMetadata md) throws IOException; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java index 0c5e3df..912a1d5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedInputStreamImpl.java @@ -30,6 +30,8 @@ import java.io.InputStream; import 
java.io.IOException; import org.apache.pulsar.broker.s3offload.S3BackedInputStream; +import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.VersionCheck; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +41,7 @@ public class S3BackedInputStreamImpl extends S3BackedInputStream { private final AmazonS3 s3client; private final String bucket; private final String key; + private final VersionCheck versionCheck; private final ByteBuf buffer; private final long objectLen; private final int bufferSize; @@ -46,10 +49,12 @@ public class S3BackedInputStreamImpl extends S3BackedInputStream { private long cursor; public S3BackedInputStreamImpl(AmazonS3 s3client, String bucket, String key, + VersionCheck versionCheck, long objectLen, int bufferSize) { this.s3client = s3client; this.bucket = bucket; this.key = key; + this.versionCheck = versionCheck; this.buffer = PooledByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize); this.objectLen = objectLen; this.bufferSize = bufferSize; @@ -72,6 +77,8 @@ public class S3BackedInputStreamImpl extends S3BackedInputStream { .withRange(startRange, endRange); log.debug("Reading range {}-{} from {}/{}", startRange, endRange, bucket, key); try (S3Object obj = s3client.getObject(req)) { + versionCheck.check(key, obj.getObjectMetadata()); + Long[] range = obj.getObjectMetadata().getContentRange(); long bytesRead = range[1] - range[0] + 1; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java index 984af59..65acbb8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java @@ -48,6 +48,7 @@ import org.apache.pulsar.broker.s3offload.OffloadIndexBlock; import 
org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder; import org.apache.pulsar.broker.s3offload.OffloadIndexEntry; import org.apache.pulsar.broker.s3offload.S3BackedInputStream; +import org.apache.pulsar.broker.s3offload.S3ManagedLedgerOffloader.VersionCheck; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -190,14 +191,18 @@ public class S3BackedReadHandleImpl implements ReadHandle { public static ReadHandle open(ScheduledExecutorService executor, AmazonS3 s3client, String bucket, String key, String indexKey, + VersionCheck versionCheck, long ledgerId, int readBufferSize) throws AmazonClientException, IOException { GetObjectRequest req = new GetObjectRequest(bucket, indexKey); try (S3Object obj = s3client.getObject(req)) { + versionCheck.check(indexKey, obj.getObjectMetadata()); + OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent()); S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key, + versionCheck, index.getDataObjectLength(), readBufferSize); return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java index 1155c2b..9f06ee8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3BackedInputStreamTest.java @@ -91,7 +91,9 @@ class S3BackedInputStreamTest extends S3TestBase { metadata.setContentLength(objectSize); s3client.putObject(BUCKET, objectKey, toWrite, metadata); - S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000); + S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + (key, md) -> {}, + 
objectSize, 1000); assertStreamsMatch(toTest, toCompare); } @@ -106,13 +108,17 @@ class S3BackedInputStreamTest extends S3TestBase { metadata.setContentLength(objectSize); s3client.putObject(BUCKET, objectKey, toWrite, metadata); - S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000); + S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + (key, md) -> {}, + objectSize, 1000); assertStreamsMatchByBytes(toTest, toCompare); } @Test(expectedExceptions = IOException.class) public void testErrorOnS3Read() throws Exception { - S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, "doesn't exist", 1234, 1000); + S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, "doesn't exist", + (key, md) -> {}, + 1234, 1000); toTest.read(); } @@ -136,7 +142,9 @@ class S3BackedInputStreamTest extends S3TestBase { metadata.setContentLength(objectSize); s3client.putObject(BUCKET, objectKey, toWrite, metadata); - S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000); + S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + (key, md) -> {}, + objectSize, 1000); for (Map.Entry<Integer, InputStream> e : seeks.entrySet()) { toTest.seek(e.getKey()); assertStreamsMatch(toTest, e.getValue()); @@ -153,7 +161,9 @@ class S3BackedInputStreamTest extends S3TestBase { metadata.setContentLength(objectSize); s3client.putObject(BUCKET, objectKey, toWrite, metadata); - S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, objectSize, 1000); + S3BackedInputStream toTest = new S3BackedInputStreamImpl(s3client, BUCKET, objectKey, + (key, md) -> {}, + objectSize, 1000); // seek forward to middle long middle = objectSize/2; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java index 1b5ad03..ab78f07 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloaderTest.java @@ -25,8 +25,11 @@ import static org.mockito.Matchers.anyLong; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Method; +import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Random; @@ -447,5 +450,71 @@ class S3ManagedLedgerOffloaderTest extends S3TestBase { Assert.assertEquals(e.getCause().getClass(), IllegalArgumentException.class); } } + + @Test + public void testReadUnknownDataVersion() throws Exception { + ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + UUID uuid = UUID.randomUUID(); + offloader.offload(toWrite, uuid, new HashMap<>()).get(); + + String dataKey = dataBlockOffloadKey(toWrite.getId(), uuid); + ObjectMetadata md = s3client.getObjectMetadata(BUCKET, dataKey); + md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345)); + s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md)); + + try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) { + toRead.readAsync(0, 0).get(); + Assert.fail("Shouldn't have been able to read"); + } catch (ExecutionException e) { + Assert.assertEquals(e.getCause().getClass(), IOException.class); + Assert.assertTrue(e.getCause().getMessage().contains("Invalid 
object version")); + } + + md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345)); + s3client.copyObject(new CopyObjectRequest(BUCKET, dataKey, BUCKET, dataKey).withNewObjectMetadata(md)); + + try (ReadHandle toRead = offloader.readOffloaded(toWrite.getId(), uuid).get()) { + toRead.readAsync(0, 0).get(); + Assert.fail("Shouldn't have been able to read"); + } catch (ExecutionException e) { + Assert.assertEquals(e.getCause().getClass(), IOException.class); + Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version")); + } + } + + @Test + public void testReadUnknownIndexVersion() throws Exception { + ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1); + LedgerOffloader offloader = new S3ManagedLedgerOffloader(s3client, BUCKET, scheduler, + DEFAULT_BLOCK_SIZE, DEFAULT_READ_BUFFER_SIZE); + UUID uuid = UUID.randomUUID(); + offloader.offload(toWrite, uuid, new HashMap<>()).get(); + + String indexKey = indexBlockOffloadKey(toWrite.getId(), uuid); + ObjectMetadata md = s3client.getObjectMetadata(BUCKET, indexKey); + md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(-12345)); + s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md)); + + try { + offloader.readOffloaded(toWrite.getId(), uuid).get(); + Assert.fail("Shouldn't have been able to open"); + } catch (ExecutionException e) { + Assert.assertEquals(e.getCause().getClass(), IOException.class); + Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version")); + } + + md.getUserMetadata().put(S3ManagedLedgerOffloader.METADATA_FORMAT_VERSION_KEY, String.valueOf(12345)); + s3client.copyObject(new CopyObjectRequest(BUCKET, indexKey, BUCKET, indexKey).withNewObjectMetadata(md)); + + try { + offloader.readOffloaded(toWrite.getId(), uuid).get(); + Assert.fail("Shouldn't have been able to open"); + } catch (ExecutionException e) { + 
Assert.assertEquals(e.getCause().getClass(), IOException.class); + Assert.assertTrue(e.getCause().getMessage().contains("Invalid object version")); + } + } } -- To stop receiving notification emails like this one, please contact si...@apache.org.