This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 48e1131 In S3 offloader, dont use InputStream#available for stream length (#1807) 48e1131 is described below commit 48e11316170138da7d8bd511356bfa468f4f0426 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Tue May 22 18:01:02 2018 +0100 In S3 offloader, dont use InputStream#available for stream length (#1807) According to the javadoc, available() returns the number bytes that can be read without blocking. It is _not_ the total length of the stream. While this currently works for the index as it's backed by a byte buffer, we shouldn't rely on implicit assumptions like that. Master issue: #1511 --- .../pulsar/broker/s3offload/OffloadIndexBlock.java | 26 +++++++++++++++++++--- .../broker/s3offload/S3ManagedLedgerOffloader.java | 4 ++-- .../s3offload/impl/OffloadIndexBlockImpl.java | 12 +++++----- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java index 944edca..c7e71c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java @@ -19,8 +19,9 @@ package org.apache.pulsar.broker.s3offload; import java.io.Closeable; -import java.io.IOException; +import java.io.FilterInputStream; import java.io.InputStream; +import java.io.IOException; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.common.annotation.InterfaceStability.Unstable; @@ -38,7 +39,7 @@ public interface OffloadIndexBlock extends Closeable { * | index_magic_header | index_block_len | index_entry_count | * | data_object_size | segment_metadata_length | segment metadata | index entries ... | */ - InputStream toStream() throws IOException; + IndexInputStream toStream() throws IOException; /** * Get the related OffloadIndexEntry that contains the given messageEntryId. @@ -59,9 +60,28 @@ public interface OffloadIndexBlock extends Closeable { */ LedgerMetadata getLedgerMetadata(); - /** + /* * Get the total size of the data object. */ long getDataObjectLength(); + + /** + * An input stream which knows the size of the stream upfront. + */ + public static class IndexInputStream extends FilterInputStream { + final long streamSize; + + public IndexInputStream(InputStream in, long streamSize) { + super(in); + this.streamSize = streamSize; + } + + /** + * @return the number of bytes in the stream. + */ + public long getStreamSize() { + return streamSize; + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java index ee82532..9cb1486 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java @@ -180,10 +180,10 @@ public class S3ManagedLedgerOffloader implements LedgerOffloader { // upload index block try (OffloadIndexBlock index = indexBuilder.withDataObjectLength(dataObjectLength).build(); - InputStream indexStream = index.toStream()) { + OffloadIndexBlock.IndexInputStream indexStream = index.toStream()) { // write the index block ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentLength(indexStream.available()); + metadata.setContentLength(indexStream.getStreamSize()); s3client.putObject(new PutObjectRequest( bucket, indexBlockKey, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java index 638edc4..3c5f337 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java @@ -28,6 +28,7 @@ import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import java.io.DataInputStream; import java.io.IOException; +import java.io.FilterInputStream; import java.io.InputStream; import java.util.ArrayList; import java.util.List; @@ -158,15 +159,12 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock { * |segment_metadata_len | segment metadata | index entries | */ @Override - public InputStream toStream() throws IOException { - int indexBlockLength; - int segmentMetadataLength; + public OffloadIndexBlock.IndexInputStream toStream() throws IOException { int indexEntryCount = this.indexEntries.size(); - byte[] ledgerMetadataByte = buildLedgerMetadataFormat(this.segmentMetadata); - segmentMetadataLength = ledgerMetadataByte.length; + int segmentMetadataLength = ledgerMetadataByte.length; - indexBlockLength = 4 /* magic header */ + int indexBlockLength = 4 /* magic header */ + 4 /* index block length */ + 8 /* data object length */ + 4 /* segment metadata length */ @@ -191,7 +189,7 @@ public class OffloadIndexBlockImpl implements OffloadIndexBlock { .writeInt(entry.getValue().getPartId()) .writeLong(entry.getValue().getOffset())); - return new ByteBufInputStream(out, true); + return new OffloadIndexBlock.IndexInputStream(new ByteBufInputStream(out, true), indexBlockLength); } static private class InternalLedgerMetadata implements LedgerMetadata { -- To stop receiving notification emails like this one, please contact mme...@apache.org.