Author: adulceanu
Date: Wed Aug  9 12:47:41 2017
New Revision: 1804512

URL: http://svn.apache.org/viewvc?rev=1804512&view=rev
Log:
OAK-5902 - Cold standby should allow syncing of blobs bigger than 2.2 GB
Fixed old unit tests and added additional tests for chunking

Modified:
    
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestUtils.java
    
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java
    
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java
    
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReaderTest.java
    
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestUtils.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestUtils.java?rev=1804512&r1=1804511&r2=1804512&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestUtils.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/StandbyTestUtils.java
 Wed Aug  9 12:47:41 2017
@@ -22,7 +22,10 @@ import static org.mockito.Mockito.mock;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 
+import com.google.common.base.Charsets;
 import com.google.common.hash.Hashing;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.Segment;
 import org.apache.jackrabbit.oak.segment.SegmentId;
@@ -54,5 +57,37 @@ public class StandbyTestUtils {
     public static long hash(byte[] data) {
         return 
Hashing.murmur3_32().newHasher().putBytes(data).hash().padToLong();
     }
+    
+    public static long hash(byte mask, byte[] data) {
+        return 
Hashing.murmur3_32().newHasher().putByte(mask).putBytes(data).hash().padToLong();
+    }
+    
+    public static byte createMask(int currentChunk, int totalChunks) {
+        byte mask = 0;
+        if (currentChunk == 1) {
+            mask = (byte) (mask | (1 << 0));
+        }
+
+        if (currentChunk == totalChunks) {
+            mask = (byte) (mask | (1 << 1));
+        }
+
+        return mask;
+    }
+    
+    public static ByteBuf createBlobChunkBuffer(byte header,String blobId, 
byte[] data, byte mask) {
+        byte[] blobIdBytes = blobId.getBytes(Charsets.UTF_8);
+        
+        ByteBuf buf = Unpooled.buffer();
+        buf.writeInt(1 + 1 + 4 + blobIdBytes.length + 8 + data.length);
+        buf.writeByte(header);
+        buf.writeByte(mask);
+        buf.writeInt(blobIdBytes.length);
+        buf.writeBytes(blobIdBytes);
+        buf.writeLong(hash(mask, data));
+        buf.writeBytes(data);
+        
+        return buf;
+    }
 
 }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java?rev=1804512&r1=1804511&r2=1804512&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/GetBlobResponseEncoderTest.java
 Wed Aug  9 12:47:41 2017
@@ -17,37 +17,54 @@
 
 package org.apache.jackrabbit.oak.segment.standby.codec;
 
-import static org.apache.jackrabbit.oak.segment.standby.StandbyTestUtils.hash;
+import static 
org.apache.jackrabbit.oak.segment.standby.StandbyTestUtils.createBlobChunkBuffer;
+import static 
org.apache.jackrabbit.oak.segment.standby.StandbyTestUtils.createMask;
 import static org.junit.Assert.assertEquals;
 
-import com.google.common.base.Charsets;
+import java.io.ByteArrayInputStream;
+
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
 import org.junit.Test;
 
 public class GetBlobResponseEncoderTest {
 
     @Test
-    public void encodeResponse() throws Exception {
+    public void shouldEncodeOneChunkResponse() throws Exception {
         byte[] blobData = new byte[] {1, 2, 3};
 
         String blobId = "blobId";
-        byte[] blobIdBytes = blobId.getBytes(Charsets.UTF_8);
+        byte mask = createMask(1, 1);
 
-        EmbeddedChannel channel = new EmbeddedChannel(new 
GetBlobResponseEncoder());
-        channel.writeOutbound(new GetBlobResponse("clientId", blobId, 
blobData));
+        EmbeddedChannel channel = new EmbeddedChannel(new 
GetBlobResponseEncoder(3));
+        channel.writeOutbound(new GetBlobResponse("clientId", blobId,new 
ByteArrayInputStream(blobData), blobData.length));
         ByteBuf buffer = (ByteBuf) channel.readOutbound();
-
-        ByteBuf expected = Unpooled.buffer();
-        expected.writeInt(1 + 4 + blobIdBytes.length + 8 + blobData.length);
-        expected.writeByte(Messages.HEADER_BLOB);
-        expected.writeInt(blobIdBytes.length);
-        expected.writeBytes(blobIdBytes);
-        expected.writeLong(hash(blobData));
-        expected.writeBytes(blobData);
-
+        ByteBuf expected = createBlobChunkBuffer(Messages.HEADER_BLOB, blobId, 
blobData, mask);
+        
         assertEquals(expected, buffer);
     }
 
+    @Test
+    public void shouldEncodeTwoChunksResponse() throws Exception {
+        byte[] blobData = new byte[] {1, 2, 3, 4};
+        byte[] firstChunkData = new byte[] {1, 2};
+        byte[] secondChunkbData = new byte[] {3, 4};
+
+        String blobId = "blobId";
+
+        EmbeddedChannel channel = new EmbeddedChannel(new 
GetBlobResponseEncoder(2));
+        channel.writeOutbound(new GetBlobResponse("clientId", blobId,new 
ByteArrayInputStream(blobData), blobData.length));
+        
+        ByteBuf firstBuffer = (ByteBuf) channel.readOutbound();
+        byte firstMask = createMask(1, 2);
+        ByteBuf firstExpected = createBlobChunkBuffer(Messages.HEADER_BLOB, 
blobId, firstChunkData, firstMask);
+
+        assertEquals(firstExpected, firstBuffer);
+        
+        ByteBuf secondBuffer = (ByteBuf) channel.readOutbound();
+        byte secondMask = createMask(2, 2);
+        ByteBuf secondExpected = createBlobChunkBuffer(Messages.HEADER_BLOB, 
blobId, secondChunkbData, secondMask);
+
+        assertEquals(secondExpected, secondBuffer);
+    }
 }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java?rev=1804512&r1=1804511&r2=1804512&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoderTest.java
 Wed Aug  9 12:47:41 2017
@@ -21,19 +21,21 @@ import static com.google.common.collect.
 import static com.google.common.collect.Lists.newArrayList;
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
+import static 
org.apache.jackrabbit.oak.segment.standby.StandbyTestUtils.createBlobChunkBuffer;
+import static 
org.apache.jackrabbit.oak.segment.standby.StandbyTestUtils.createMask;
 import static org.apache.jackrabbit.oak.segment.standby.StandbyTestUtils.hash;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.Collections;
 import java.util.UUID;
 
 import com.google.common.base.Charsets;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.commons.io.IOUtils;
 import org.junit.Test;
 
 public class ResponseDecoderTest {
@@ -50,25 +52,45 @@ public class ResponseDecoderTest {
     }
 
     @Test
-    public void shouldDecodeValidGetBlobResponses() throws Exception {
+    public void shouldDecodeValidOneChunkGetBlobResponses() throws Exception {
         byte[] blobData = new byte[] {1, 2, 3};
 
         String blobId = "blobId";
-        byte[] blobIdBytes = blobId.getBytes(Charsets.UTF_8);
-
-        ByteBuf buf = Unpooled.buffer();
-        buf.writeInt(1 + 4 + blobIdBytes.length + 8 + blobData.length);
-        buf.writeByte(Messages.HEADER_BLOB);
-        buf.writeInt(blobIdBytes.length);
-        buf.writeBytes(blobIdBytes);
-        buf.writeLong(hash(blobData));
-        buf.writeBytes(blobData);
+        byte mask = createMask(1, 1);
+        ByteBuf buf = createBlobChunkBuffer(Messages.HEADER_BLOB, blobId, 
blobData, mask);
 
         EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder());
         channel.writeInbound(buf);
         GetBlobResponse response = (GetBlobResponse) channel.readInbound();
         assertEquals("blobId", response.getBlobId());
-        assertArrayEquals(blobData, response.getBlobData());
+        assertEquals(blobData.length, response.getLength());
+        byte[] receivedData = IOUtils.toByteArray(response.getInputStream());
+        assertArrayEquals(blobData, receivedData);
+    }
+    
+    @Test
+    public void shouldDecodeValidTwoChunksGetBlobResponses() throws Exception {
+        byte[] blobData = new byte[] {1, 2, 3, 4};
+        byte[] firstChunkData = new byte[] {1, 2};
+        byte[] secondChunkbData = new byte[] {3, 4};
+
+        String blobId = "blobId";
+        
+        byte firstMask = createMask(1, 2);
+        ByteBuf firstBuf = createBlobChunkBuffer(Messages.HEADER_BLOB, blobId, 
firstChunkData, firstMask);
+        
+        byte secondMask = createMask(2, 2);
+        ByteBuf secondBuf = createBlobChunkBuffer(Messages.HEADER_BLOB, 
blobId, secondChunkbData, secondMask);
+
+        EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder());
+        channel.writeInbound(firstBuf);
+        channel.writeInbound(secondBuf);
+        
+        GetBlobResponse response = (GetBlobResponse) channel.readInbound();
+        assertEquals("blobId", response.getBlobId());
+        assertEquals(blobData.length, response.getLength());
+        byte[] receivedData = IOUtils.toByteArray(response.getInputStream());
+        assertArrayEquals(blobData, receivedData);
     }
 
     @Test
@@ -77,13 +99,15 @@ public class ResponseDecoderTest {
 
         String blobId = "blobId";
         byte[] blobIdBytes = blobId.getBytes(Charsets.UTF_8);
+        byte mask = createMask(1, 1);
 
         ByteBuf buf = Unpooled.buffer();
-        buf.writeInt(1 + 4 + blobIdBytes.length + 8 + blobData.length);
+        buf.writeInt(1 + 1 + 4 + blobIdBytes.length + 8 + blobData.length);
         buf.writeByte(Messages.HEADER_BLOB);
+        buf.writeByte(mask);
         buf.writeInt(blobIdBytes.length);
         buf.writeBytes(blobIdBytes);
-        buf.writeLong(hash(blobData) + 1);
+        buf.writeLong(hash(mask, blobData) + 1);
         buf.writeBytes(blobData);
 
         EmbeddedChannel channel = new EmbeddedChannel(new ResponseDecoder());

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReaderTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReaderTest.java?rev=1804512&r1=1804511&r2=1804512&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReaderTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyBlobReaderTest.java
 Wed Aug  9 12:47:41 2017
@@ -18,30 +18,20 @@
 package org.apache.jackrabbit.oak.segment.standby.server;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.junit.Test;
 
 public class DefaultStandbyBlobReaderTest {
 
-    private static InputStream newFailingInputStream() {
-        return new InputStream() {
-
-            @Override
-            public int read() throws IOException {
-                throw new IOException("generic error");
-            }
-
-        };
-    }
-
     @Test
     public void shouldAlwaysReturnNullWithoutBlobStore() throws Exception {
         DefaultStandbyBlobReader r = new DefaultStandbyBlobReader(null);
@@ -57,19 +47,21 @@ public class DefaultStandbyBlobReaderTes
     }
 
     @Test
-    public void shouldReturnNullIfBlobIsUnreadable() throws Exception {
+    public void shouldReturnNegativeLengthIfBlobIsUnreadable() throws 
Exception {
         BlobStore s = mock(BlobStore.class);
-        when(s.getInputStream("id")).thenReturn(newFailingInputStream());
+        when(s.getBlobLength("id")).thenReturn(-1L);
         DefaultStandbyBlobReader r = new DefaultStandbyBlobReader(s);
-        assertNull(r.readBlob("id"));
+        assertEquals(-1L, r.getBlobLength("id"));
     }
 
     @Test
     public void shouldReturnBlobContent() throws Exception {
         BlobStore s = mock(BlobStore.class);
         when(s.getInputStream("id")).thenReturn(new ByteArrayInputStream(new 
byte[]{1, 2, 3}));
+        when(s.getBlobLength("id")).thenReturn(3L);
         DefaultStandbyBlobReader r = new DefaultStandbyBlobReader(s);
-        assertArrayEquals(new byte[]{1, 2, 3}, r.readBlob("id"));
+        assertEquals(3, r.getBlobLength("id"));
+        assertArrayEquals(new byte[]{1, 2, 3}, 
IOUtils.toByteArray(r.readBlob("id")));
     }
 
 }

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java?rev=1804512&r1=1804511&r2=1804512&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/GetBlobRequestHandlerTest.java
 Wed Aug  9 12:47:41 2017
@@ -23,7 +23,10 @@ import static org.junit.Assert.assertNul
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
+
 import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.commons.io.IOUtils;
 import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobRequest;
 import org.apache.jackrabbit.oak.segment.standby.codec.GetBlobResponse;
 import org.junit.Test;
@@ -35,14 +38,17 @@ public class GetBlobRequestHandlerTest {
         byte[] blobData = new byte[] {99, 114, 97, 112};
 
         StandbyBlobReader reader = mock(StandbyBlobReader.class);
-        when(reader.readBlob("blobId")).thenReturn(blobData);
+        when(reader.readBlob("blobId")).thenReturn(new 
ByteArrayInputStream(blobData));
+        when(reader.getBlobLength("blobId")).thenReturn(4L);
 
         EmbeddedChannel channel = new EmbeddedChannel(new 
GetBlobRequestHandler(reader));
         channel.writeInbound(new GetBlobRequest("clientId", "blobId"));
         GetBlobResponse response = (GetBlobResponse) channel.readOutbound();
         assertEquals("clientId", response.getClientId());
         assertEquals("blobId", response.getBlobId());
-        assertArrayEquals(blobData, response.getBlobData());
+        assertEquals(blobData.length, response.getLength());
+        byte[] receivedData = IOUtils.toByteArray(response.getInputStream());
+        assertArrayEquals(blobData, receivedData);
     }
 
     @Test


Reply via email to