This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 79b0e28  Store offloaded data object size in index (#1810)
79b0e28 is described below

commit 79b0e28d15e1e8c3fcdb4b29aee0956a3a31ebc8
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Mon May 21 09:54:28 2018 +0200

    Store offloaded data object size in index (#1810)
    
    We need the size of the data object to set a bound on the stream we
    read from S3. Without the size in the index we need to do an extra
    call to S3 which is undesirable.
    
    Master Issue: #1511
---
 .../apache/pulsar/broker/s3offload/OffloadIndexBlock.java  |  6 +++++-
 .../pulsar/broker/s3offload/OffloadIndexBlockBuilder.java  |  8 +++++++-
 .../pulsar/broker/s3offload/S3ManagedLedgerOffloader.java  |  7 +++++--
 .../s3offload/impl/OffloadIndexBlockBuilderImpl.java       | 12 ++++++++++--
 .../broker/s3offload/impl/OffloadIndexBlockImpl.java       | 14 +++++++++++++-
 .../broker/s3offload/impl/S3BackedReadHandleImpl.java      |  3 +--
 .../pulsar/broker/s3offload/impl/OffloadIndexTest.java     |  4 +++-
 7 files changed, 44 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
index 8f9d3ce..944edca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
@@ -36,7 +36,7 @@ public interface OffloadIndexBlock extends Closeable {
      * Get the content of the index block as InputStream.
      * Read out in format:
      *   | index_magic_header | index_block_len | index_entry_count |
-     *   |segment_metadata_length | segment metadata | index entries |
+     *   | data_object_size | segment_metadata_length | segment metadata | index entries ... |
      */
     InputStream toStream() throws IOException;
 
@@ -59,5 +59,9 @@ public interface OffloadIndexBlock extends Closeable {
      */
     LedgerMetadata getLedgerMetadata();
 
+    /**
+     * Get the total size of the data object.
+     */
+    long getDataObjectLength();
 }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
index 8ec0395..c60ce88 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
@@ -37,7 +37,7 @@ public interface OffloadIndexBlockBuilder {
      *
      * @param metadata the ledger metadata
      */
-    OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata);
+    OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata);
 
     /**
      * Add one payload block related information into index block.
@@ -52,6 +52,12 @@ public interface OffloadIndexBlockBuilder {
     OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, int blockSize);
 
     /**
+     * Specify the length of data object this index is associated with.
+     * @param dataObjectLength the length of the data object
+     */
+    OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength);
+
+    /**
      * Finalize the immutable OffloadIndexBlock
      */
     OffloadIndexBlock build();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 7a73a3b..276488d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -110,7 +110,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.submit(() -> {
             OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create()
-                .withMetadata(readHandle.getLedgerMetadata());
+                .withLedgerMetadata(readHandle.getLedgerMetadata());
             String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), uuid);
             String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), uuid);
             InitiateMultipartUploadRequest dataBlockReq = new InitiateMultipartUploadRequest(bucket, dataBlockKey);
@@ -124,6 +124,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
                 return;
             }
 
+            long dataObjectLength = 0;
             // start multi part upload for data block.
             try {
                 long startEntry = 0;
@@ -157,6 +158,8 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
                         entryBytesWritten += blockStream.getBlockEntryBytesCount();
                         partId++;
                     }
+
+                    dataObjectLength += blockSize;
                 }
 
                 s3client.completeMultipartUpload(new CompleteMultipartUploadRequest()
@@ -171,7 +174,7 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader {
             }
 
             // upload index block
-            try (OffloadIndexBlock index = indexBuilder.build();
+            try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build();
                  InputStream indexStream = index.toStream()) {
                 // write the index block
                 ObjectMetadata metadata = new ObjectMetadata();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
index 766083d..3b0f899 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.broker.s3offload.OffloadIndexBlockBuilder;
 public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
 
     private LedgerMetadata ledgerMetadata;
+    private long dataObjectLength;
     private List<OffloadIndexEntryImpl> entries;
     private int lastBlockSize;
 
@@ -42,7 +43,13 @@ public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
     }
 
     @Override
-    public OffloadIndexBlockBuilder withMetadata(LedgerMetadata metadata) {
+    public OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength) {
+        this.dataObjectLength = dataObjectLength;
+        return this;
+    }
+
+    @Override
+    public OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata metadata) {
         this.ledgerMetadata = metadata;
         return this;
     }
@@ -73,7 +80,8 @@ public class OffloadIndexBlockBuilderImpl implements OffloadIndexBlockBuilder {
     public OffloadIndexBlock build() {
         checkState(ledgerMetadata != null);
         checkState(!entries.isEmpty());
-        return OffloadIndexBlockImpl.get(ledgerMetadata, entries);
+        checkState(dataObjectLength > 0);
+        return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, entries);
     }
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
index f43c503..638edc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
@@ -54,6 +54,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
     private static final int INDEX_MAGIC_WORD = 0xDE47DE47;
 
     private LedgerMetadata segmentMetadata;
+    private long dataObjectLength;
     private TreeMap<Long, OffloadIndexEntryImpl> indexEntries;
 
     private final Handle<OffloadIndexBlockImpl> recyclerHandle;
@@ -69,12 +70,14 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         this.recyclerHandle = recyclerHandle;
     }
 
-    public static OffloadIndexBlockImpl get(LedgerMetadata metadata, List<OffloadIndexEntryImpl> entries) {
+    public static OffloadIndexBlockImpl get(LedgerMetadata metadata, long dataObjectLength,
+                                            List<OffloadIndexEntryImpl> entries) {
         OffloadIndexBlockImpl block = RECYCLER.get();
         block.indexEntries = Maps.newTreeMap();
         entries.forEach(entry -> block.indexEntries.putIfAbsent(entry.getEntryId(), entry));
         checkState(entries.size() == block.indexEntries.size());
         block.segmentMetadata = metadata;
+        block.dataObjectLength = dataObjectLength;
         return block;
     }
 
@@ -86,6 +89,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
     }
 
     public void recycle() {
+        dataObjectLength = -1;
         segmentMetadata = null;
         indexEntries.clear();
         indexEntries = null;
@@ -116,6 +120,11 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
         return this.segmentMetadata;
     }
 
+    @Override
+    public long getDataObjectLength() {
+        return this.dataObjectLength;
+    }
+
     private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
         LedgerMetadataFormat.Builder builder = LedgerMetadataFormat.newBuilder();
         builder.setQuorumSize(metadata.getWriteQuorumSize())
@@ -159,6 +168,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
 
         indexBlockLength = 4 /* magic header */
             + 4 /* index block length */
+            + 8 /* data object length */
             + 4 /* segment metadata length */
             + 4 /* index entry count */
             + segmentMetadataLength
@@ -168,6 +178,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
 
         out.writeInt(INDEX_MAGIC_WORD)
             .writeInt(indexBlockLength)
+            .writeLong(dataObjectLength)
             .writeInt(segmentMetadataLength)
             .writeInt(indexEntryCount);
 
@@ -306,6 +317,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock {
                                                 magic, INDEX_MAGIC_WORD));
         }
         int indexBlockLength = dis.readInt();
+        this.dataObjectLength = dis.readLong();
         int segmentMetadataLength = dis.readInt();
         int indexEntryCount = dis.readInt();
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
index 037ea67..984af59 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/S3BackedReadHandleImpl.java
@@ -197,9 +197,8 @@ public class S3BackedReadHandleImpl implements ReadHandle {
             OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create();
             OffloadIndexBlock index = indexBuilder.fromStream(obj.getObjectContent());
 
-            ObjectMetadata dataMetadata = s3client.getObjectMetadata(bucket, key); // FIXME: this should be part of index
             S3BackedInputStream inputStream = new S3BackedInputStreamImpl(s3client, bucket, key,
-                                                                          dataMetadata.getContentLength(),
+                                                                          index.getDataObjectLength(),
                                                                           readBufferSize);
             return new S3BackedReadHandleImpl(ledgerId, index, inputStream, executor);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
index 916c4cd..af9d6e4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
@@ -108,7 +108,7 @@ public class OffloadIndexTest {
         LedgerMetadata metadata = createLedgerMetadata();
         log.debug("created metadata: {}", metadata.toString());
 
-        blockBuilder.withMetadata(metadata);
+        blockBuilder.withLedgerMetadata(metadata).withDataObjectLength(1);
 
         blockBuilder.addBlock(0, 2, 64 * 1024 * 1024);
         blockBuilder.addBlock(1000, 3, 64 * 1024 * 1024);
@@ -161,6 +161,7 @@ public class OffloadIndexTest {
         ByteBuf wrapper = Unpooled.wrappedBuffer(b);
         int magic = wrapper.readInt();
         int indexBlockLength = wrapper.readInt();
+        long dataObjectLength = wrapper.readLong();
         int segmentMetadataLength = wrapper.readInt();
         int indexEntryCount = wrapper.readInt();
 
@@ -168,6 +169,7 @@ public class OffloadIndexTest {
         assertEquals(magic, OffloadIndexBlockImpl.getIndexMagicWord());
         assertEquals(indexBlockLength, readoutLen);
         assertEquals(indexEntryCount, 3);
+        assertEquals(dataObjectLength, 1);
 
         wrapper.readBytes(segmentMetadataLength);
         log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: {}",

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to