This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 79b0e28 Store offloaded data object size in index (#1810) 79b0e28 is described below commit 79b0e28d15e1e8c3fcdb4b29aee0956a3a31ebc8 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Mon May 21 09:54:28 2018 +0200 Store offloaded data object size in index (#1810) We need the size of the data object to set a bound on the stream we read from S3. Without the size in the index we need to do an extra call to S3 which is undesirable. Master Issue: #1511 --- .../apache/pulsar/broker/s3offload/OffloadIndexBlock.java | 6 +++++- .../pulsar/broker/s3offload/OffloadIndexBlockBuilder.java | 8 +++++++- .../pulsar/broker/s3offload/S3ManagedLedgerOffloader.java | 7 +++++-- .../s3offload/impl/OffloadIndexBlockBuilderImpl.java | 12 ++++++++++-- .../broker/s3offload/impl/OffloadIndexBlockImpl.java | 14 +++++++++++++- .../broker/s3offload/impl/S3BackedReadHandleImpl.java | 3 +-- .../pulsar/broker/s3offload/impl/OffloadIndexTest.java | 4 +++- 7 files changed, 44 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java index 8f9d3ce..944edca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java @@ -36,7 +36,7 @@ public interface OffloadIndexBlock extends Closeable { * Get the content of the index block as InputStream. * Read out in format: * | index_magic_header | index_block_len | index_entry_count | - * |segment_metadata_length | segment metadata | index entries | + * | data_object_size | segment_metadata_length | segment metadata | index entries ... | */ InputStream toStream() throws IOException; @@ -59,5 +59,9 @@ public interface OffloadIndexBlock extends Closeable { */ LedgerMetadata getLedgerMetadata(); + /** + * Get the total size of the data object. + */ + long getDataObjectLength(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java index 8ec0395..c60ce88 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java @@ -37,7 +37,7 @@ public interface OffloadIndexBlockBuilder { * * @param metadata the ledger metadata */ - OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata); + OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata); /** * Add one payload block related information into index block. @@ -52,6 +52,12 @@ public interface OffloadIndexBlockBuilder { OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize); /** + * Specify the length of data object this index is associated with. + * @param dataObjectLength the length of the data object + */ + OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength); + + /** * Finalize the immutable OffloadIndexBlock */ OffloadIndexBlock build(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java index 7a73a3b..276488d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java @@ -110,7 +110,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { CompletableFuture<Void> promise = new CompletableFuture<>(); scheduler.submit(() -> { OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create() - .withMetadata(readHandle.getLedgerMetadata()); + .withLedgerMetadata(readHandle.getLedgerMetadata()); String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid); String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid); InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey); @@ -124,6 +124,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { return; } + long dataObjectLength = 0; // start multi part upload for data block. try { long startEntry = 0; @@ -157,6 +158,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { entryBytesWritten += blockStream.getBlockEntryBytesCount(); partId++; } + + dataObjectLength += blockSize; } s3client.completeMultipartUpload(new CompleteMultipartUploadRequest() @@ -171,7 +174,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { } // upload index block - try (OffloadIndexBlock index = indexBuilder.build(); + try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build(); InputStream indexStream = index.toStream()) { // write the index block ObjectMetadata metadata = new ObjectMetadata(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java index 766083d..3b0f899 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java @@ -34,6 +34,7 @@ import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder; public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder { private LedgerMetadata ledgerMetadata; + private long dataObjectLength; private List<OffloadIndexEntryImpl> entries; private int lastBlockSize; @@ -42,7 +43,13 @@ public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder { } @Override - public OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata) { + public OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength) { + this.dataObjectLength = dataObjectLength; + return this; + } + + @Override + public OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata) { this.ledgerMetadata = metadata; return this; } @@ -73,7 +80,8 @@ public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder { public OffloadIndexBlock build() { checkState(ledgerMetadata != null); checkState(!entries.isEmpty()); - return OffloadIndexBlockImpl.get(ledgerMetadata, entries); + checkState(dataObjectLength > 0); + return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, entries); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java index f43c503..638edc4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java @@ -54,6 +54,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock { private static final int INDEX_MAGIC_WORD = 0xDE47DE47; private LedgerMetadata segmentMetadata; + private long dataObjectLength; private TreeMap<Long, OffloadIndexEntryImpl> indexEntries; private final Handle<OffloadIndexBlockImpl> recyclerHandle; @@ -69,12 +70,14 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock { this.recyclerHandle = recyclerHandle; } - public static OffloadIndexBlockImpl get(LedgerMetadata metadata, List<OffloadIndexEntryImpl> entries) { + public static OffloadIndexBlockImpl get(LedgerMetadata metadata, long dataObjectLength, + List<OffloadIndexEntryImpl> entries) { OffloadIndexBlockImpl block = RECYCLER.get(); block.indexEntries = Maps.newTreeMap(); entries.forEach(entry -> block.indexEntries.putIfAbsent(entry.getEntryId(), entry)); checkState(entries.size() == block.indexEntries.size()); block.segmentMetadata = metadata; + block.dataObjectLength = dataObjectLength; return block; } @@ -86,6 +89,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock { } public void recycle() { + dataObjectLength = -1; segmentMetadata = null; indexEntries.clear(); indexEntries = null; @@ -116,6 +120,11 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock { return this.segmentMetadata; } + @Override + public long getDataObjectLength() { + return this.dataObjectLength; + } + private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) { LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder(); builder.setQuorumSize(metadata.getWriteQuorumSize()) @@ -159,6 +168,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock { indexBlockLength = 4 /* magic header */ + 4 /* index block length */ + + 8 /* data object length */ + 4 /* segment metadata length */ + 4 /* index entry count */ + segmentMetadataLength @@ -168,6 +178,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock { out.writeInt(INDEX_MAGIC_WORD) .writeInt(indexBlockLength) + .writeLong(dataObjectLength) .writeInt(segmentMetadataLength) .writeInt(indexEntryCount); @@ -306,6 +317,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock { magic, INDEX_MAGIC_WORD)); } int indexBlockLength = dis.readInt(); + this.dataObjectLength = dis.readLong(); int segmentMetadataLength = dis.readInt(); int indexEntryCount = dis.readInt(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java index 037ea67..984af59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java @@ -197,9 +197,8 @@ public class S3BackedReadHandleImpl implements ReadHandle { OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent()); - ObjectMetadata dataMetadata = s3client.getObjectMetadata(bucket, key); // FIXME: this should be part of index S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key, - dataMetadata.getContentLength(), + index.getDataObjectLength(), readBufferSize); return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java index 916c4cd..af9d6e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java @@ -108,7 +108,7 @@ public class OffloadIndexTest { LedgerMetadata metadata = createLedgerMetadata(); log.debug("created metadata: {}", metadata.toString()); - blockBuilder.withMetadata(metadata); + blockBuilder.withLedgerMetadata(metadata).withDataObjectLength(1); blockBuilder.addBlock(0, 2, 64 * 1024 * 1024); blockBuilder.addBlock(1000, 3, 64 * 1024 * 1024); @@ -161,6 +161,7 @@ public class OffloadIndexTest { ByteBuf wrapper = Unpooled.wrappedBuffer(b); int magic = wrapper.readInt(); int indexBlockLength = wrapper.readInt(); + long dataObjectLength = wrapper.readLong(); int segmentMetadataLength = wrapper.readInt(); int indexEntryCount = wrapper.readInt(); @@ -168,6 +169,7 @@ public class OffloadIndexTest { assertEquals(magic, OffloadIndexBlockImpl.getIndexMagicWord()); assertEquals(indexBlockLength, readoutLen); assertEquals(indexEntryCount, 3); + assertEquals(dataObjectLength, 1); wrapper.readBytes(segmentMetadataLength); log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}", -- To stop receiving notification emails like this one, please contact si...@apache.org.