Author: adulceanu
Date: Wed Aug  9 12:47:32 2017
New Revision: 1804511

URL: http://svn.apache.org/viewvc?rev=1804511&view=rev
Log:
OAK-5902 - Cold standby should allow syncing of blobs bigger than 2.2 GB
Implemented chunking algorithm on client and server

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponse.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java?rev=1804511&r1=1804510&r2=1804511&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClient.java
 Wed Aug  9 12:47:32 2017
@@ -17,6 +17,7 @@
 
 package org.apache.jackrabbit.oak.segment.standby.client;
 
+import java.io.InputStream;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
@@ -117,10 +118,7 @@ class StandbyClient implements AutoClose
 
                         p.addLast(new SnappyFramedDecoder(true));
 
-                        // Such a big max frame length is needed because blob
-                        // values are sent in one big message. In future
-                        // versions of the protocol, sending binaries in chunks
-                        // should be considered instead.
+                        // The frame length limits the chunk size to max. 2.2GB
 
                         p.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4));
                         p.addLast(new ResponseDecoder());
@@ -189,7 +187,7 @@ class StandbyClient implements AutoClose
     }
 
     @Nullable
-    byte[] getBlob(String blobId) throws InterruptedException {
+    InputStream getBlob(String blobId) throws InterruptedException {
         channel.writeAndFlush(new GetBlobRequest(clientId, blobId));
 
         GetBlobResponse response = blobQueue.poll(readTimeoutMs, TimeUnit.MILLISECONDS);
@@ -198,7 +196,7 @@ class StandbyClient implements AutoClose
             return null;
         }
 
-        return response.getBlobData();
+        return response.getInputStream();
     }
 
     @Nullable

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java?rev=1804511&r1=1804510&r2=1804511&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyDiff.java
 Wed Aug  9 12:47:32 2017
@@ -22,7 +22,7 @@ package org.apache.jackrabbit.oak.segmen
 import static org.apache.jackrabbit.oak.api.Type.BINARIES;
 import static org.apache.jackrabbit.oak.api.Type.BINARY;
 
-import java.io.ByteArrayInputStream;
+import java.io.InputStream;
 import java.io.IOException;
 
 import com.google.common.base.Supplier;
@@ -164,16 +164,17 @@ class StandbyDiff implements NodeStateDi
     }
 
     private void readBlob(String blobId, String pName) throws InterruptedException {
-        byte[] data = client.getBlob(blobId);
+        InputStream in = client.getBlob(blobId);
 
-        if (data == null) {
+        if (in == null) {
             throw new IllegalStateException("Unable to load remote blob " + blobId + " at " + path + "#" + pName);
         }
 
         try {
             BlobStore blobStore = store.getBlobStore();
             assert blobStore != null : "Blob store must not be null";
-            blobStore.writeBlob(new ByteArrayInputStream(data));
+            blobStore.writeBlob(in);
+            in.close();
         } catch (IOException f) {
             throw new IllegalStateException("Unable to persist blob " + blobId + " at " + path + "#" + pName, f);
         }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponse.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponse.java?rev=1804511&r1=1804510&r2=1804511&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponse.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponse.java
 Wed Aug  9 12:47:32 2017
@@ -17,18 +17,23 @@
 
 package org.apache.jackrabbit.oak.segment.standby.codec;
 
+import java.io.InputStream;
+
 public class GetBlobResponse {
 
     private final String clientId;
 
     private final String blobId;
 
-    private final byte[] blobData;
+    private final InputStream in;
+    
+    private final long length;
 
-    public GetBlobResponse(String clientId, String blobId, byte[] blobData) {
+    public GetBlobResponse(String clientId, String blobId, InputStream in, long length) {
         this.clientId = clientId;
         this.blobId = blobId;
-        this.blobData = blobData;
+        this.in = in;
+        this.length = length;
     }
 
     public String getClientId() {
@@ -39,8 +44,11 @@ public class GetBlobResponse {
         return blobId;
     }
 
-    public byte[] getBlobData() {
-        return blobData;
+    public InputStream getInputStream() {
+        return in;
     }
 
+    public long getLength() {
+        return length;
+    }
 }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java?rev=1804511&r1=1804510&r2=1804511&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoder.java
 Wed Aug  9 12:47:32 2017
@@ -17,38 +17,97 @@
 
 package org.apache.jackrabbit.oak.segment.standby.codec;
 
+import static java.lang.Math.min;
+
+import java.io.InputStream;
 import java.nio.charset.Charset;
 
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.MessageToByteEncoder;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class GetBlobResponseEncoder extends MessageToByteEncoder<GetBlobResponse> {
+public class GetBlobResponseEncoder extends ChannelOutboundHandlerAdapter {
 
     private static final Logger log = LoggerFactory.getLogger(GetBlobResponseEncoder.class);
+    
+    private final int blobChunkSize;
 
-    @Override
-    protected void encode(ChannelHandlerContext ctx, GetBlobResponse msg, ByteBuf out) throws Exception {
-        log.debug("Sending blob {} to client {}", msg.getBlobId(), msg.getClientId());
-        encode(msg.getBlobId(), msg.getBlobData(), out);
+    public GetBlobResponseEncoder(final int blobChunkSize) {
+        this.blobChunkSize = blobChunkSize;
+    }
+    
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+        if (msg instanceof GetBlobResponse) {
+            GetBlobResponse response = (GetBlobResponse) msg;
+            log.debug("Sending blob {} to client {}", response.getBlobId(), response.getClientId());
+
+            String blobId = response.getBlobId();
+            long length = response.getLength();
+            InputStream in = response.getInputStream();
+            byte[] blobIdBytes = blobId.getBytes(Charset.forName("UTF-8"));
+
+            try {
+                byte[] buffer = new byte[blobChunkSize];
+                int l = 0;
+                int totalChunks = (int) (length / (long) blobChunkSize);
+                if (length % blobChunkSize != 0) {
+                    totalChunks++;
+                }
+                int currentChunk = 0;
+
+                while (currentChunk < totalChunks) {
+                    l = in.read(buffer, 0, (int) min(buffer.length, length));
+                    byte[] data = new byte[l];
+                    System.arraycopy(buffer, 0, data, 0, l);
+
+                    currentChunk++;
+                    ByteBuf out = createChunk(ctx, blobIdBytes, data, currentChunk, totalChunks);
+                    log.info("Sending chunk {}/{} of size {} to client {}", currentChunk, totalChunks, data.length,
+                            response.getClientId());
+                    ctx.writeAndFlush(out);
+                }
+            } finally {
+                in.close();
+            }
+        } else {
+            ctx.write(msg, promise);
+        }
     }
 
-    private static void encode(String blobId, byte[] data, ByteBuf out) {
-        byte[] blobIdBytes = blobId.getBytes(Charset.forName("UTF-8"));
+    private ByteBuf createChunk(ChannelHandlerContext ctx, byte[] blobIdBytes, byte[] data, int currentChunk,
+            int totalChunks) {
+        byte mask = createMask(currentChunk, totalChunks);
 
         Hasher hasher = Hashing.murmur3_32().newHasher();
-        long hash = hasher.putBytes(data).hash().padToLong();
+        long hash = hasher.putByte(mask).putBytes(data).hash().padToLong();
 
-        out.writeInt(1 + 4 + blobIdBytes.length + 8 + data.length);
+        ByteBuf out = ctx.alloc().buffer();
+        out.writeInt(1 + 1 + 4 + blobIdBytes.length + 8 + data.length);
         out.writeByte(Messages.HEADER_BLOB);
+        out.writeByte(mask);
         out.writeInt(blobIdBytes.length);
         out.writeBytes(blobIdBytes);
         out.writeLong(hash);
         out.writeBytes(data);
+
+        return out;
     }
+    
+    private byte createMask(int currentChunk, int totalChunks) {
+        byte mask = 0;
+        if (currentChunk == 1) {
+            mask = (byte) (mask | (1 << 0));
+        }
+
+        if (currentChunk == totalChunks) {
+            mask = (byte) (mask | (1 << 1));
+        }
 
+        return mask;
+    }
 }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java?rev=1804511&r1=1804510&r2=1804511&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
 Wed Aug  9 12:47:32 2017
@@ -20,6 +20,11 @@ package org.apache.jackrabbit.oak.segmen
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
 
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.UUID;
 
@@ -33,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 public class ResponseDecoder extends ByteToMessageDecoder {
 
+    private static final String TMP_DIR = System.getProperty("java.io.tmpdir");
     private static final Logger log = LoggerFactory.getLogger(ResponseDecoder.class);
 
     @Override
@@ -87,25 +93,46 @@ public class ResponseDecoder extends Byt
         out.add(new GetSegmentResponse(null, segmentId, data));
     }
 
-    private static void decodeGetBlobResponse(int length, ByteBuf in, List<Object> out) {
-        int blobIdLength = in.readInt();
+    private static void decodeGetBlobResponse(int length, ByteBuf in, List<Object> out) throws IOException {
+        byte mask = in.readByte();
 
+        int blobIdLength = in.readInt();
         byte[] blobIdBytes = new byte[blobIdLength];
         in.readBytes(blobIdBytes);
-
         String blobId = new String(blobIdBytes, Charsets.UTF_8);
+        File tempFile = new File(TMP_DIR, blobId + ".tmp");
+        
+        // START_CHUNK flag enabled
+        if ((mask & (1 << 0)) != 0) {
+            if (tempFile.exists()) {
+                log.debug("Detected previous incomplete transfer for {}. Cleaning up...", blobId);
+                tempFile.delete();
+            }
+        }
 
         long hash = in.readLong();
 
-        byte[] blobData = new byte[length - 1 - 4 - blobIdBytes.length - 8];
-        in.readBytes(blobData);
+        log.debug("Received chunk of size {} from blob {} ", in.readableBytes(), blobId);
+        byte[] chunkData = new byte[in.readableBytes()];
+        in.readBytes(chunkData);
 
-        if (hash(blobData) != hash) {
-            log.debug("Invalid checksum, discarding blob {}", blobId);
+        if (hash(mask, chunkData) != hash) {
+            log.debug("Invalid checksum, discarding current chunk from {}", blobId);
             return;
+        } else {
+            log.debug("All checks OK. Appending chunk to disk to {} ", tempFile.getAbsolutePath());
+            OutputStream outStream = new FileOutputStream(tempFile, true);
+            outStream.write(chunkData);
+            outStream.close();
         }
 
-        out.add(new GetBlobResponse(null, blobId, blobData));
+        // END_CHUNK flag enabled
+        if ((mask & (1 << 1)) != 0) {
+            log.debug("Received entire blob {}", blobId);
+
+            FileInputStream fis = new FileInputStream(tempFile);
+            out.add(new GetBlobResponse(null, blobId, fis, fis.getChannel().size()));
+        }
     }
 
     private static void decodeGetReferencesResponse(int length, ByteBuf in, List<Object> out) {
@@ -139,4 +166,8 @@ public class ResponseDecoder extends Byt
         return Hashing.murmur3_32().newHasher().putBytes(data).hash().padToLong();
     }
 
+    private static long hash(byte mask, byte[] data) {
+        return Hashing.murmur3_32().newHasher().putByte(mask).putBytes(data).hash().padToLong();
+    }
+    
 }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java?rev=1804511&r1=1804510&r2=1804511&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReader.java
 Wed Aug  9 12:47:32 2017
@@ -20,7 +20,6 @@ package org.apache.jackrabbit.oak.segmen
 import java.io.IOException;
 import java.io.InputStream;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,20 +35,32 @@ class DefaultStandbyBlobReader implement
     }
 
     @Override
-    public byte[] readBlob(String blobId) {
+    public InputStream readBlob(String blobId) {
         if (store == null) {
             return null;
         }
 
-        byte[] bytes = null;
-
-        try (InputStream s = store.getInputStream(blobId)) {
-            bytes = IOUtils.toByteArray(s);
+        try {
+            return store.getInputStream(blobId);
         } catch (IOException e) {
             log.warn("Error while reading blob content", e);
         }
 
-        return bytes;
+        return null;
+    }
+    
+    @Override
+    public long getBlobLength(String blobId) {
+        if (store == null) {
+            return -1L;
+        }
+        
+        try {
+            return store.getBlobLength(blobId);
+        } catch (IOException e) {
+            log.warn("Error while reading blob content", e);
+        }
+     
+        return -1L;
     }
-
 }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java?rev=1804511&r1=1804510&r2=1804511&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandler.java
 Wed Aug  9 12:47:32 2017
@@ -17,6 +17,8 @@
 
 package org.apache.jackrabbit.oak.segment.standby.server;
 
+import java.io.InputStream;
+
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobRequest;
@@ -38,14 +40,15 @@ class GetBlobRequestHandler extends Simp
     protected void channelRead0(ChannelHandlerContext ctx, GetBlobRequest msg) throws Exception {
         log.debug("Reading blob {} for client {}", msg.getBlobId(), msg.getClientId());
 
-        byte[] data = reader.readBlob(msg.getBlobId());
+        InputStream in = reader.readBlob(msg.getBlobId());
+        long length = reader.getBlobLength(msg.getBlobId());
 
-        if (data == null) {
+        if (in == null || length == -1L) {
             log.debug("Blob {} not found, discarding request from client {}", msg.getBlobId(), msg.getClientId());
             return;
         }
 
-        ctx.writeAndFlush(new GetBlobResponse(msg.getClientId(), msg.getBlobId(), data));
+        ctx.writeAndFlush(new GetBlobResponse(msg.getClientId(), msg.getBlobId(), in, length));
     }
 
 }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java?rev=1804511&r1=1804510&r2=1804511&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/ResponseObserverHandler.java
 Wed Aug  9 12:47:32 2017
@@ -52,7 +52,7 @@ class ResponseObserverHandler extends Ch
     }
 
     private void onGetBlobResponse(GetBlobResponse response) {
-        observer.didSendBinariesBytes(response.getClientId(), Math.max(0, response.getBlobData().length));
+        observer.didSendBinariesBytes(response.getClientId(), Math.max(0, response.getLength()));
     }
 
 }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java?rev=1804511&r1=1804510&r2=1804511&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyBlobReader.java
 Wed Aug  9 12:47:32 2017
@@ -17,8 +17,11 @@
 
 package org.apache.jackrabbit.oak.segment.standby.server;
 
-interface StandbyBlobReader {
+import java.io.InputStream;
 
-    byte[] readBlob(String blobId);
+interface StandbyBlobReader {
 
+    InputStream readBlob(String blobId);
+    
+    long getBlobLength(String blobId);
 }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java?rev=1804511&r1=1804510&r2=1804511&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
 Wed Aug  9 12:47:32 2017
@@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
 class StandbyServer implements AutoCloseable {
 
     private static final Logger log = LoggerFactory.getLogger(StandbyServer.class);
+    private static final int BLOB_CHUNK_SIZE = 1024 * 1024;
 
     static Builder builder(int port, StoreProvider provider) {
         return new Builder(port, provider);
@@ -164,7 +165,7 @@ class StandbyServer implements AutoClose
                 p.addLast(new SnappyFramedEncoder());
                 p.addLast(new GetHeadResponseEncoder());
                 p.addLast(new GetSegmentResponseEncoder());
-                p.addLast(new GetBlobResponseEncoder());
+                p.addLast(new GetBlobResponseEncoder(BLOB_CHUNK_SIZE));
                 p.addLast(new GetReferencesResponseEncoder());
                 p.addLast(new ResponseObserverHandler(builder.observer));
 

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java?rev=1804511&r1=1804510&r2=1804511&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/store/CommunicationObserver.java
 Wed Aug  9 12:47:32 2017
@@ -101,7 +101,7 @@ public class CommunicationObserver {
         partnerDetails.put(client, m);
     }
 
-    public void didSendBinariesBytes(String client, int size) {
+    public void didSendBinariesBytes(String client, long size) {
         log.debug("Binary with size {} sent to client {}", size, client);
         CommunicationPartnerMBean m = partnerDetails.get(client);
         m.onBinarySent(size);


Reply via email to