This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
     new 6890ce1  Created PIP-17: Tiered storage for Pulsar topics (markdown)
6890ce1 is described below

commit 6890ce1ce86695fc3216e9434b138fc29f177d3b
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Thu Apr 5 09:26:37 2018 -0700

    Created PIP-17: Tiered storage for Pulsar topics (markdown)
---
 PIP-17:-Tiered-storage-for-Pulsar-topics.md | 213 ++++++++++++++++++++++++++++
 1 file changed, 213 insertions(+)

diff --git a/PIP-17:-Tiered-storage-for-Pulsar-topics.md 
b/PIP-17:-Tiered-storage-for-Pulsar-topics.md
new file mode 100644
index 0000000..e6d20a9
--- /dev/null
+++ b/PIP-17:-Tiered-storage-for-Pulsar-topics.md
@@ -0,0 +1,213 @@
+
+## Motivation
+
+Pulsar stores topics in BookKeeper ledgers, henceforth referred to as segments to avoid confusion with “ManagedLedger”. BookKeeper segments are normally replicated to three nodes. This can get expensive if you want to store data for a long time, even more so if the nodes use fast disks.
+
+BookKeeper segments have three primary access patterns.
+ * Writes: need low latency
+ * Tailing reads: need low latency
+ * Catchup reads: latency is unimportant; throughput is important, but how important depends on the use case
+
+BookKeeper uses the same type of storage for all three patterns. Moreover, BookKeeper uses the simplest form of replication available: it simply makes 3 (or N) copies and puts them on different disks. The copies are important while writing, as they keep the voting protocol simple.
+
+However, once a segment has been sealed it is immutable; that is, the set of entries, the content of those entries, and the order of the entries can never change. As a result, once a segment is sealed you don’t need to store it on your expensive 3x-replicated SSD cluster. It can be transferred to some object store that uses Reed-Solomon encoding on HDDs.
+
+The remainder of this document covers how we will do this (in the context of 
Pulsar). 
+
+## Background
+
+Pulsar topics are stored in managed ledgers. Each topic (or each partition, if the topic is partitioned) is stored in a single managed ledger. A managed ledger consists of a list of log segments, in a fixed order, oldest first. All segments in this list, apart from the most recent, are sealed. The most recent segment is the one to which messages are currently being written.
+
+For tiered storage, the unit of storage will be a segment.
+
+## Proposal
+
+### High level
+
+To implement tiered storage, Pulsar will copy segments as a whole from 
bookkeeper to an object store. Once the segment has been copied, we tag the 
segment in the managed ledger’s segment list with a key that identifies the 
segment in the object store. Once the tag has been added to the segment list, 
the segment can be deleted from bookkeeper.
+
+For Pulsar to read back and serve messages from the object store, we will 
provide a ReadHandle implementation which reads data from the object store. 
With this, Pulsar only needs to know whether it is reading from bookkeeper or 
the object store when it is constructing the ReadHandle.
+
+### Detail
+
+Tiered storage requires the following changes:
+
+ * An interface for offloading
+ * Modifications to managed ledger to use offloading
+ * Triggering mechanism
+ * Offloading implementation for S3
+
+#### SegmentOffloader interface
+
+```java
+interface SegmentOffloader {
+    // Copy all entries of the segment to the object store; the returned
+    // bytes are an opaque context used to locate the object later.
+    CompletableFuture<byte[]> offload(ReadHandle segment,
+                                      Map<String, String> extraMetadata);
+    // Open a ReadHandle backed by the offloaded object.
+    CompletableFuture<ReadHandle> readOffloaded(long ledgerId,
+                                                byte[] offloadContext);
+    // Remove the offloaded object from the object store.
+    CompletableFuture<Void> deleteOffloaded(long ledgerId,
+                                            byte[] offloadContext);
+}
+```
+
+The *offload* method should take a `ReadHandle` to a segment, and copy all the data in that segment to an object store. The method returns a context containing extra information which can be used to look up the object on read. For example, in the case of S3, this context could be used to specify a region or a bucket. From Pulsar’s point of view, this data is opaque. It is up to the implementation of the `SegmentOffloader` interface to serialize and deserialize the context. The context is [...]
+
+The `extraMetadata` parameter is used to add some extra information to the 
object in the data store. For example, it can be used to carry the name of the 
managed ledger the segment is part of and the version of the software doing the 
offloading. The metadata is not intended to be read back by Pulsar, but rather 
to be used in inspection and debugging of the object store.
+
+The `readOffloaded` method takes a ledger ID and a context and returns a 
`ReadHandle` which is backed by an object in the object store. Reading from 
this handle is identical to reading from the bookkeeper-backed `ReadHandle` 
passed into offload.
+
+The `deleteOffloaded` method deletes the segment from the object store.
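+
+To make the contract concrete, here is a minimal sketch of the expected call sequence; the variable names, the metadata keys and the way the context is persisted are illustrative assumptions, not part of the interface.
+
+```java
+// Illustrative offload lifecycle; `offloader` is some SegmentOffloader
+// implementation and `segment` is a ReadHandle to a sealed segment.
+Map<String, String> metadata = Map.of(
+        "ManagedLedgerName", "my-tenant/my-ns/my-topic",  // placeholder
+        "PulsarVersion", "x.y.z");                        // placeholder
+
+byte[] context = offloader.offload(segment, metadata).get();
+// The opaque context is stored by the caller (in the managed ledger
+// metadata) and later used to read the segment back, or to delete it.
+ReadHandle offloaded = offloader.readOffloaded(segment.getId(), context).get();
+offloader.deleteOffloaded(segment.getId(), context).get();
+```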
+
+A `SegmentOffloader` implementation is passed as a parameter in a 
`ManagedLedgerConfig` object when opening a managed ledger via the managed 
ledger factory. If no implementation is specified, a default implementation is 
used which simply throws an exception on any usage.
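+
+For illustration, wiring an offloader into a managed ledger might look like the sketch below; the setter name `setSegmentOffloader` and the `MyS3Offloader` class are hypothetical, only the general shape is intended.
+
+```java
+// Hypothetical wiring of a SegmentOffloader when opening a managed ledger.
+ManagedLedgerConfig config = new ManagedLedgerConfig();
+config.setSegmentOffloader(new MyS3Offloader(s3Config));   // hypothetical setter
+ManagedLedger ml = managedLedgerFactory.open("my-tenant/my-ns/my-topic", config);
+```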
+
+Segment offloading is implemented as an interface rather than directly in 
managed ledger to allow implementations to inspect and massage the data before 
it goes to the object store. For example, it may make sense for Pulsar to break 
out batch messages into individual messages before storing in the object store, 
so that the object can be used directly by some other system. Managed ledgers 
know nothing of these batches, nor should they, so the interface allows these 
concerns to be kept separate.
+
+
+### Modifications to ManagedLedger
+
+From the perspective of ManagedLedger, offloading looks very similar to 
trimming the head of the log, so it will follow the same pattern.
+
+However, trimming occurs in the background, after a specified retention time, 
whereas offloading is triggered directly by the client. Offloading after a 
specified retention time will likely be added in a later iteration.
+
+A new managed ledger call is provided.
+
+```java
+interface ManagedLedger {
+    void asyncOffloadPrefix(Position pos, OffloadCallback cb, Object ctx);
+}
+```
+
+This call selects all sealed segments in the managed ledger before the passed-in position, offloads them to the object store, updates the metadata, and deletes the segments from bookkeeper.
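+
+For illustration, a call site might look like the sketch below. The `OffloadCallback` method names and signatures are assumptions chosen by analogy with the other managed ledger async callbacks, not a defined API.
+
+```java
+// Hypothetical invocation of the new call; callback method names are
+// illustrative only.
+managedLedger.asyncOffloadPrefix(offloadUpToPosition, new OffloadCallback() {
+    @Override
+    public void offloadComplete(Position pos, Object ctx) {
+        log.info("Offloaded segments up to {}", pos);
+    }
+
+    @Override
+    public void offloadFailed(ManagedLedgerException e, Object ctx) {
+        log.warn("Offload failed", e);
+    }
+}, null);
+```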
+
+Updating the metadata involves setting a string field, `offloadKey`, in the 
`LedgerInfo` object associated with the segment. At the end of the operation, 
the metadata is written to zookeeper.
+
+The offload prefix operation is mutually exclusive with trimming operations.
+
+The `offloadKey` is used when opening segments for reading. If an `offloadKey` exists, then `readOffloaded` is called with the key. Otherwise the segment is opened from bookkeeper, as is currently always the case.
+
+It is also used when an offloaded segment is trimmed from the head of the log.
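+
+As a rough sketch of the read path described above (the `hasOffloadKey`/`getOffloadKey` accessors, the `openFromBookKeeper` helper, and the string-to-bytes conversion of the key are hypothetical):
+
+```java
+// Sketch of opening a segment for reads, based on whether it was offloaded.
+CompletableFuture<ReadHandle> openForRead(LedgerInfo info) {
+    if (info.hasOffloadKey()) {
+        // Segment was offloaded: serve reads from the object store.
+        // How the string key maps to the byte[] context is an assumption here.
+        return offloader.readOffloaded(info.getLedgerId(),
+                info.getOffloadKey().getBytes(StandardCharsets.UTF_8));
+    } else {
+        // Segment still lives in bookkeeper.
+        return openFromBookKeeper(info.getLedgerId());
+    }
+}
+```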
+
+### Triggering an offload
+
+There will be an admin REST endpoint on the broker, to which a managed ledger position can be passed; this will call `asyncOffloadPrefix` on the managed ledger.
+
+Offloading can also be enabled to occur automatically for all topics within a namespace. There are two new configurations on a namespace, `offloadTimeInMinutes` and `offloadSizeInMB`, similar to the corresponding configurations for retention.
+
+Setting `offloadTimeInMinutes` will cause a segment to be offloaded to long-term storage once it has been sealed for that many minutes. This value must be lower than `retentionTimeInMinutes`, as otherwise the segment would be deleted before being offloaded.
+
+Setting `offloadSizeInMB` will cause segments to be offloaded to long-term storage when the tail of the log reaches the specified size in MB. This value must be lower than `retentionSizeInMB`, as otherwise the segments would be deleted before being offloaded.
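+
+A minimal sketch of the constraint this implies when the namespace policies are set; the method and its use are purely illustrative, the field names simply mirror the configuration names above.
+
+```java
+// Illustrative validation of the proposed namespace offload policies.
+void validateOffloadPolicies(long offloadTimeInMinutes, long retentionTimeInMinutes,
+                             long offloadSizeInMB, long retentionSizeInMB) {
+    if (offloadTimeInMinutes >= retentionTimeInMinutes) {
+        throw new IllegalArgumentException(
+                "offloadTimeInMinutes must be lower than retentionTimeInMinutes");
+    }
+    if (offloadSizeInMB >= retentionSizeInMB) {
+        throw new IllegalArgumentException(
+                "offloadSizeInMB must be lower than retentionSizeInMB");
+    }
+}
+```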
+
+### S3 Implementation of SegmentOffloader
+
+A single segment in bookkeeper maps to a single object in S3. The object has two parts: an index and a payload. The payload contains all the entries of the segment.
+
+The schema version is stored as part of the S3 object user metadata.
+
+The object is written to S3 using a [multipart 
upload](https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadInitiate.html). 
The index is stored in part 1. The payload is split into 64MB blocks, and each 
block is uploaded as a “part”.
+
+S3’s API requires that the size of an uploaded part be specified before sending the data. Therefore, all blocks apart from the final block must be exactly 64MB. Blocks can only contain complete entries, so if an entry doesn’t fit into the end of a block, the remaining bytes must be padded with the pattern 0xDEAD1234.
+
+The size of the final block can be calculated exactly before writing. The 
calculation is as follows.
+
+```
+(block header size)
+ + ((length of segment) - (sum of length of already written entries))
+ + ((number of entries to be written) * 4)
+```
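+
+A sketch of this calculation in Java, matching the formula above; the constant `BLOCK_HEADER_SIZE`, the parameter names, and the use of the `ReadHandle` length/LAC accessors are assumptions for illustration.
+
+```java
+// Sketch of the final-block size calculation.
+long finalBlockSize(ReadHandle segment, long entryBytesAlreadyWritten,
+                    long entriesAlreadyWritten) {
+    long remainingEntryBytes = segment.getLength() - entryBytesAlreadyWritten;
+    long remainingEntries = segment.getLastAddConfirmed() + 1 - entriesAlreadyWritten;
+    return BLOCK_HEADER_SIZE                // fixed block header
+            + remainingEntryBytes           // payload bytes of the remaining entries
+            + remainingEntries * 4;         // 4-byte entry_len per remaining entry
+}
+```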
+
+#### Payload format
+
+A block has a short header, followed by payload data.
+```
+[ magic_word ][ block_len ][ block_entry_count ][ first_entry_id ]
+```
+
+ * `magic_word` : 4 bytes, a sequence of bytes used to identify the start of a 
block
+ * `block_len` : 4 bytes, the length of the block, including the header
+ * `block_entry_count` : 4 bytes, the number of entries contained in the block
+ * `first_entry_id` : 8 bytes, Entry ID of first entry in the block
+
+The payload data is a sequence of entries of the format
+
+```
+[ entry_len ][ entry_data ]
+```
+
+ * `entry_len` : 4 bytes, the length of the entry
+ * `entry_data` : as many bytes as specified in entry_len
+
+Padding may be added at the end to bring the block to the correct size.
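+
+To make the layout concrete, below is a simplified, non-streaming sketch of assembling one block in memory; the magic word constant and the entry iteration are illustrative assumptions.
+
+```java
+// Simplified sketch of building one payload block.
+ByteBuffer buildBlock(List<LedgerEntry> entries, long firstEntryId, int blockSize) {
+    ByteBuffer block = ByteBuffer.allocate(blockSize);
+    // Header: [ magic_word ][ block_len ][ block_entry_count ][ first_entry_id ]
+    block.putInt(BLOCK_MAGIC_WORD);     // illustrative constant
+    block.putInt(blockSize);            // block length, including the header
+    block.putInt(entries.size());
+    block.putLong(firstEntryId);
+    // Entries: [ entry_len ][ entry_data ] repeated
+    for (LedgerEntry entry : entries) {
+        byte[] data = entry.getEntryBytes();
+        block.putInt(data.length);
+        block.put(data);
+    }
+    // Pad the remainder of the block with the 0xDEAD1234 pattern.
+    byte[] padding = {(byte) 0xDE, (byte) 0xAD, 0x12, 0x34};
+    for (int i = 0; block.hasRemaining(); i++) {
+        block.put(padding[i % padding.length]);
+    }
+    return block;
+}
+```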
+
+#### Index format
+
+The index is a short header followed by a sequence of mappings.
+
+The index header is
+```
+[ index_magic_word ][ index_len ][ index_entry_count ][ segment_metadata_len ][ segment_metadata ]
+```
+
+ * `index_magic_word` : 4 bytes, a sequence of bytes to identify the index
+ * `index_len` : 4 bytes, the total length of the index in bytes, including 
the header
+ * `index_entry_count` : 4 bytes, the total number of mappings in the index
+ * `segment_metadata_len` : 4 bytes, the length of the segment metadata
+ * `segment_metadata` : the binary representation of the segment metadata, 
stored in protobuf format
+
+The body of the index contains mappings from the entry id of the first entry 
in a block to the block id, and the offset of the block within the S3 object. 
For example:
+
+```
+[ entryId:   0 ] -> [ block: 2, offset: 0]
+[ entryId: 100 ] -> [ block: 3, offset: 12345 ]
+...
+```
+
+ * `entryId` is 8 bytes
+ * `block` is 4 bytes
+ * `offset` is 8 bytes
+
+The block id corresponds to the S3 object part number. Payload blocks start at part 2, since the index is stored in part 1. The offset is the byte offset of the block from the start of the object.
+
+The offset for a block can be calculated as
+
+```
+(index length) + ((number of preceding blocks) * (block size, 64MB in this case))
+```
+
+#### Writing to S3
+
+We use the S3 multipart API to upload a segment to S3. Entries are written to 
2MB byte buffers, and these are uploaded as parts, with part id starting from 
2. The first entry id of each block is recorded locally along with the part id 
and the length of the block.
+
+```java
+void writeToS3(ReadHandle segment) {
+  var uploadId = initiateS3MultiPartUpload();
+
+  var bytesWritten = 0;
+  var nextEntry = 0;
+  while (nextEntry <= segment.getLastAddConfirmed()) {
+    // 64MB for all blocks except the final one, which uses the
+    // calculation described above.
+    var blockSize = calcBlockSizeForRemainingEntries(
+            segment, bytesWritten, nextEntry);
+
+    var firstEntry = nextEntry;  // first entry id of this block
+    var is = new BlockAwareSegmentInputStream(
+            segment, firstEntry, blockSize);
+    var partId = pushPartToS3(uploadId, blockSize, is);
+
+    addToIndex(firstEntry, partId, blockSize);
+
+    nextEntry = is.lastEntryWritten() + 1;
+    bytesWritten += is.entryBytesWritten();
+  }
+
+  buildAndWriteIndex(uploadId);
+  finalizeUpload(uploadId);
+}
+```
+
+The algorithm has roughly the form of the pseudocode above. As S3 doesn’t seem to have an asynchronous API which gives fine-grained access to multipart uploads, the upload should run in a dedicated threadpool.
+
+The most important component when writing is `BlockAwareSegmentInputStream`. This allows the reader to read exactly `blockSize` bytes. It first returns the header, followed by complete entries from the segment in the format specified above. Once it can no longer write a complete entry due to the `blockSize` limit, it outputs the padding pattern `0xDEAD1234` until it reaches the limit. It tracks the ID of the last entry it has written as well as the cumulative number of entry bytes written [...]
+
+#### Reading from S3
+
+To read back from S3, we make a read request for part 1 of the object to load the index. Then, when a client makes a request to read entries, we do a binary search on the index to find which blocks need to be loaded, and request those blocks.
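+
+As a sketch of that lookup, assuming the index mappings are held in a sorted map keyed by the first entry id of each block (the `BlockLocation` type and the field names are illustrative):
+
+```java
+// Illustrative index lookup: find the block containing a given entry id.
+class OffloadIndex {
+    // firstEntryId of each block -> (part number, byte offset in the object)
+    private final NavigableMap<Long, BlockLocation> blocks = new TreeMap<>();
+
+    BlockLocation locate(long entryId) {
+        // floorEntry returns the block whose first entry id is <= entryId,
+        // i.e. the block that contains the entry.
+        Map.Entry<Long, BlockLocation> e = blocks.floorEntry(entryId);
+        if (e == null) {
+            throw new IllegalArgumentException("Entry " + entryId + " not in index");
+        }
+        return e.getValue();
+    }
+}
+```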
+
+We don’t load the whole block each time while reading, as this would mean holding many 64MB blocks in memory when only a fraction of that data is currently useful. Instead we read 1MB at a time from S3 by setting the range on the get object request. To read an entry in the middle of a block, we read from the start of the block until we find that entry. The offsets of some of the read entries are cached and used to enhance the index, so we can quickly find these entries, or ne [...]
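+
+For reference, a ranged read with the AWS SDK looks roughly like the sketch below; the bucket name, object key and offsets are placeholders.
+
+```java
+// Ranged GET of 1MB from the middle of the object (AWS SDK v1 style).
+GetObjectRequest req = new GetObjectRequest("my-bucket", "segment-object-key")
+        .withRange(blockOffset, blockOffset + 1024 * 1024 - 1);  // inclusive range
+S3Object rangeObject = s3Client.getObject(req);
+try (InputStream in = rangeObject.getObjectContent()) {
+    // parse the block header and entries from the returned stream
+}
+```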

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.
