Author: adulceanu
Date: Wed Aug 9 12:47:32 2017
New Revision: 1804511

URL: http://svn.apache.org/viewvc?rev=1804511&view=rev
Log:
OAK-5902 - Cold standby should allow syncing of blobs bigger than 2.2 GB
Implemented chunking algorithm on client and server

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponse.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java?rev=1804511&r1=1804510&r2=1804511&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java (original) +++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java Wed Aug 9 12:47:32 2017 @@ -17,6 +17,7 @@ package org.apache.jackrabbit.oak.segment.standby.client; +import java.io.InputStream; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; @@ -117,10 +118,7 @@ class StandbyClient implements AutoClose p.addLast(new SnappyFramedDecoder(true)); - // Such a big max frame length is needed because blob - // values are sent in one big message. In future - // versions of the protocol, sending binaries in chunks - // should be considered instead. + // The frame length limits the chunk size to max. 2.2GB p.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4)); p.addLast(new ResponseDecoder()); @@ -189,7 +187,7 @@ class StandbyClient implements AutoClose } @Nullable - byte[] getBlob(String blobId) throws InterruptedException { + InputStream getBlob(String blobId) throws InterruptedException { channel.writeAndFlush(new GetBlobRequest(clientId, blobId)); GetBlobResponse response = blobQueue.poll(readTimeoutMs, TimeUnit.MILLISECONDS); @@ -198,7 +196,7 @@ class StandbyClient implements AutoClose return null; } - return response.getBlobData(); + return response.getInputStream(); } @Nullable Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java?rev=1804511&r1=1804510&r2=1804511&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java 
Wed Aug 9 12:47:32 2017 @@ -22,7 +22,7 @@ package org.apache.jackrabbit.oak.segmen import static org.apache.jackrabbit.oak.api.Type.BINARIES; import static org.apache.jackrabbit.oak.api.Type.BINARY; -import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.io.IOException; import com.google.common.base.Supplier; @@ -164,16 +164,17 @@ class StandbyDiff implements NodeStateDi } private void readBlob(String blobId, String pName) throws InterruptedException { - byte[] data = client.getBlob(blobId); + InputStream in = client.getBlob(blobId); - if (data == null) { + if (in == null) { throw new IllegalStateException("Unable to load remote blob " + blobId + " at " + path + "#" + pName); } try { BlobStore blobStore = store.getBlobStore(); assert blobStore != null : "Blob store must not be null"; - blobStore.writeBlob(new ByteArrayInputStream(data)); + blobStore.writeBlob(in); + in.close(); } catch (IOException f) { throw new IllegalStateException("Unable to persist blob " + blobId + " at " + path + "#" + pName, f); } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponse.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponse.java?rev=1804511&r1=1804510&r2=1804511&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponse.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponse.java Wed Aug 9 12:47:32 2017 @@ -17,18 +17,23 @@ package org.apache.jackrabbit.oak.segment.standby.codec; +import java.io.InputStream; + public class GetBlobResponse { private final String clientId; private final String blobId; - private final byte[] blobData; + private final InputStream in; + 
+ private final long length; - public GetBlobResponse(String clientId, String blobId, byte[] blobData) { + public GetBlobResponse(String clientId, String blobId, InputStream in, long length) { this.clientId = clientId; this.blobId = blobId; - this.blobData = blobData; + this.in = in; + this.length = length; } public String getClientId() { @@ -39,8 +44,11 @@ public class GetBlobResponse { return blobId; } - public byte[] getBlobData() { - return blobData; + public InputStream getInputStream() { + return in; } + public long getLength() { + return length; + } } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java?rev=1804511&r1=1804510&r2=1804511&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java Wed Aug 9 12:47:32 2017 @@ -17,38 +17,97 @@ package org.apache.jackrabbit.oak.segment.standby.codec; +import static java.lang.Math.min; + +import java.io.InputStream; import java.nio.charset.Charset; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class GetBlobResponseEncoder extends MessageToByteEncoder<GetBlobResponse> { +public class GetBlobResponseEncoder extends ChannelOutboundHandlerAdapter { private static 
final Logger log = LoggerFactory.getLogger(GetBlobResponseEncoder.class); + + private final int blobChunkSize; - @Override - protected void encode(ChannelHandlerContext ctx, GetBlobResponse msg, ByteBuf out) throws Exception { - log.debug("Sending blob {} to client {}", msg.getBlobId(), msg.getClientId()); - encode(msg.getBlobId(), msg.getBlobData(), out); + public GetBlobResponseEncoder(final int blobChunkSize) { + this.blobChunkSize = blobChunkSize; + } + + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + if (msg instanceof GetBlobResponse) { + GetBlobResponse response = (GetBlobResponse) msg; + log.debug("Sending blob {} to client {}", response.getBlobId(), response.getClientId()); + + String blobId = response.getBlobId(); + long length = response.getLength(); + InputStream in = response.getInputStream(); + byte[] blobIdBytes = blobId.getBytes(Charset.forName("UTF-8")); + + try { + byte[] buffer = new byte[blobChunkSize]; + int l = 0; + int totalChunks = (int) (length / (long) blobChunkSize); + if (length % blobChunkSize != 0) { + totalChunks++; + } + int currentChunk = 0; + + while (currentChunk < totalChunks) { + l = in.read(buffer, 0, (int) min(buffer.length, length)); + byte[] data = new byte[l]; + System.arraycopy(buffer, 0, data, 0, l); + + currentChunk++; + ByteBuf out = createChunk(ctx, blobIdBytes, data, currentChunk, totalChunks); + log.info("Sending chunk {}/{} of size {} to client {}", currentChunk, totalChunks, data.length, + response.getClientId()); + ctx.writeAndFlush(out); + } + } finally { + in.close(); + } + } else { + ctx.write(msg, promise); + } } - private static void encode(String blobId, byte[] data, ByteBuf out) { - byte[] blobIdBytes = blobId.getBytes(Charset.forName("UTF-8")); + private ByteBuf createChunk(ChannelHandlerContext ctx, byte[] blobIdBytes, byte[] data, int currentChunk, + int totalChunks) { + byte mask = createMask(currentChunk, totalChunks); Hasher hasher = 
Hashing.murmur3_32().newHasher(); - long hash = hasher.putBytes(data).hash().padToLong(); + long hash = hasher.putByte(mask).putBytes(data).hash().padToLong(); - out.writeInt(1 + 4 + blobIdBytes.length + 8 + data.length); + ByteBuf out = ctx.alloc().buffer(); + out.writeInt(1 + 1 + 4 + blobIdBytes.length + 8 + data.length); out.writeByte(Messages.HEADER_BLOB); + out.writeByte(mask); out.writeInt(blobIdBytes.length); out.writeBytes(blobIdBytes); out.writeLong(hash); out.writeBytes(data); + + return out; } + + private byte createMask(int currentChunk, int totalChunks) { + byte mask = 0; + if (currentChunk == 1) { + mask = (byte) (mask | (1 << 0)); + } + + if (currentChunk == totalChunks) { + mask = (byte) (mask | (1 << 1)); + } + return mask; + } } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java?rev=1804511&r1=1804510&r2=1804511&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java Wed Aug 9 12:47:32 2017 @@ -20,6 +20,11 @@ package org.apache.jackrabbit.oak.segmen import static java.util.Arrays.asList; import static java.util.Collections.emptyList; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; import java.util.List; import java.util.UUID; @@ -33,6 +38,7 @@ import org.slf4j.LoggerFactory; public class ResponseDecoder extends ByteToMessageDecoder { + private static final String TMP_DIR = System.getProperty("java.io.tmpdir"); private static final 
Logger log = LoggerFactory.getLogger(ResponseDecoder.class); @Override @@ -87,25 +93,46 @@ public class ResponseDecoder extends Byt out.add(new GetSegmentResponse(null, segmentId, data)); } - private static void decodeGetBlobResponse(int length, ByteBuf in, List<Object> out) { - int blobIdLength = in.readInt(); + private static void decodeGetBlobResponse(int length, ByteBuf in, List<Object> out) throws IOException { + byte mask = in.readByte(); + int blobIdLength = in.readInt(); byte[] blobIdBytes = new byte[blobIdLength]; in.readBytes(blobIdBytes); - String blobId = new String(blobIdBytes, Charsets.UTF_8); + File tempFile = new File(TMP_DIR, blobId + ".tmp"); + + // START_CHUNK flag enabled + if ((mask & (1 << 0)) != 0) { + if (tempFile.exists()) { + log.debug("Detected previous incomplete transfer for {}. Cleaning up...", blobId); + tempFile.delete(); + } + } long hash = in.readLong(); - byte[] blobData = new byte[length - 1 - 4 - blobIdBytes.length - 8]; - in.readBytes(blobData); + log.debug("Received chunk of size {} from blob {} ", in.readableBytes(), blobId); + byte[] chunkData = new byte[in.readableBytes()]; + in.readBytes(chunkData); - if (hash(blobData) != hash) { - log.debug("Invalid checksum, discarding blob {}", blobId); + if (hash(mask, chunkData) != hash) { + log.debug("Invalid checksum, discarding current chunk from {}", blobId); return; + } else { + log.debug("All checks OK. 
Appending chunk to disk to {} ", tempFile.getAbsolutePath()); + OutputStream outStream = new FileOutputStream(tempFile, true); + outStream.write(chunkData); + outStream.close(); } - out.add(new GetBlobResponse(null, blobId, blobData)); + // END_CHUNK flag enabled + if ((mask & (1 << 1)) != 0) { + log.debug("Received entire blob {}", blobId); + + FileInputStream fis = new FileInputStream(tempFile); + out.add(new GetBlobResponse(null, blobId, fis, fis.getChannel().size())); + } } private static void decodeGetReferencesResponse(int length, ByteBuf in, List<Object> out) { @@ -139,4 +166,8 @@ public class ResponseDecoder extends Byt return Hashing.murmur3_32().newHasher().putBytes(data).hash().padToLong(); } + private static long hash(byte mask, byte[] data) { + return Hashing.murmur3_32().newHasher().putByte(mask).putBytes(data).hash().padToLong(); + } + } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java?rev=1804511&r1=1804510&r2=1804511&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java Wed Aug 9 12:47:32 2017 @@ -20,7 +20,6 @@ package org.apache.jackrabbit.oak.segmen import java.io.IOException; import java.io.InputStream; -import org.apache.commons.io.IOUtils; import org.apache.jackrabbit.oak.spi.blob.BlobStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,20 +35,32 @@ class DefaultStandbyBlobReader implement } @Override - public byte[] readBlob(String blobId) { + public InputStream 
readBlob(String blobId) { if (store == null) { return null; } - byte[] bytes = null; - - try (InputStream s = store.getInputStream(blobId)) { - bytes = IOUtils.toByteArray(s); + try { + return store.getInputStream(blobId); } catch (IOException e) { log.warn("Error while reading blob content", e); } - return bytes; + return null; + } + + @Override + public long getBlobLength(String blobId) { + if (store == null) { + return -1L; + } + + try { + return store.getBlobLength(blobId); + } catch (IOException e) { + log.warn("Error while reading blob content", e); + } + + return -1L; } - } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java?rev=1804511&r1=1804510&r2=1804511&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java Wed Aug 9 12:47:32 2017 @@ -17,6 +17,8 @@ package org.apache.jackrabbit.oak.segment.standby.server; +import java.io.InputStream; + import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobRequest; @@ -38,14 +40,15 @@ class GetBlobRequestHandler extends Simp protected void channelRead0(ChannelHandlerContext ctx, GetBlobRequest msg) throws Exception { log.debug("Reading blob {} for client {}", msg.getBlobId(), msg.getClientId()); - byte[] data = reader.readBlob(msg.getBlobId()); + InputStream in = reader.readBlob(msg.getBlobId()); + long length = reader.getBlobLength(msg.getBlobId()); - if (data == null) { 
+ if (in == null || length == -1L) { log.debug("Blob {} not found, discarding request from client {}", msg.getBlobId(), msg.getClientId()); return; } - ctx.writeAndFlush(new GetBlobResponse(msg.getClientId(), msg.getBlobId(), data)); + ctx.writeAndFlush(new GetBlobResponse(msg.getClientId(), msg.getBlobId(), in, length)); } } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java?rev=1804511&r1=1804510&r2=1804511&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java Wed Aug 9 12:47:32 2017 @@ -52,7 +52,7 @@ class ResponseObserverHandler extends Ch } private void onGetBlobResponse(GetBlobResponse response) { - observer.didSendBinariesBytes(response.getClientId(), Math.max(0, response.getBlobData().length)); + observer.didSendBinariesBytes(response.getClientId(), Math.max(0, response.getLength())); } } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java?rev=1804511&r1=1804510&r2=1804511&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java (original) +++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java Wed Aug 9 12:47:32 2017 @@ -17,8 +17,11 @@ package org.apache.jackrabbit.oak.segment.standby.server; -interface StandbyBlobReader { +import java.io.InputStream; - byte[] readBlob(String blobId); +interface StandbyBlobReader { + InputStream readBlob(String blobId); + + long getBlobLength(String blobId); } Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java?rev=1804511&r1=1804510&r2=1804511&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java Wed Aug 9 12:47:32 2017 @@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory; class StandbyServer implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(StandbyServer.class); + private static final int BLOB_CHUNK_SIZE = 1024 * 1024; static Builder builder(int port, StoreProvider provider) { return new Builder(port, provider); @@ -164,7 +165,7 @@ class StandbyServer implements AutoClose p.addLast(new SnappyFramedEncoder()); p.addLast(new GetHeadResponseEncoder()); p.addLast(new GetSegmentResponseEncoder()); - p.addLast(new GetBlobResponseEncoder()); + p.addLast(new GetBlobResponseEncoder(BLOB_CHUNK_SIZE)); p.addLast(new GetReferencesResponseEncoder()); p.addLast(new ResponseObserverHandler(builder.observer)); Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java?rev=1804511&r1=1804510&r2=1804511&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java (original) +++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java Wed Aug 9 12:47:32 2017 @@ -101,7 +101,7 @@ public class CommunicationObserver { partnerDetails.put(client, m); } - public void didSendBinariesBytes(String client, int size) { + public void didSendBinariesBytes(String client, long size) { log.debug("Binary with size {} sent to client {}", size, client); CommunicationPartnerMBean m = partnerDetails.get(client); m.onBinarySent(size);