Author: adulceanu
Date: Thu Sep 28 12:27:29 2017
New Revision: 1809982

URL: http://svn.apache.org/viewvc?rev=1809982&view=rev
Log:
OAK-6678 - Syncing big blobs fails since StandbyServer sends persisted head

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java Thu Sep 28 12:27:29 2017
@@ -64,7 +64,8 @@ class StandbyClientSyncExecution {
         RecordId remoteHead = getHead();
 
         if (remoteHead == null) {
-            throw new IllegalStateException("Unable to fetch remote head");
+            log.error("Unable to fetch remote head");
+            return;
         }
 
         if (remoteHead.equals(store.getHead().getRecordId())) {

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java Thu Sep 28 12:27:29 2017
@@ -17,19 +17,25 @@
 
 package org.apache.jackrabbit.oak.segment.standby.server;
 
+import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readPersistedHeadWithRetry;
+
+import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 
 class DefaultStandbyHeadReader implements StandbyHeadReader {
 
     private final FileStore store;
+    private final long timeout;
 
-    DefaultStandbyHeadReader(FileStore store) {
+    DefaultStandbyHeadReader(FileStore store, long timeout) {
         this.store = store;
+        this.timeout = timeout;
     }
 
     @Override
     public String readHeadRecordId() {
-        return store.getRevisions().getPersistedHead().toString();
+        RecordId persistedHead = readPersistedHeadWithRetry(store, timeout);
+        return persistedHead != null ? persistedHead.toString() : null;
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java Thu Sep 28 12:27:29 2017
@@ -18,12 +18,12 @@
 package org.apache.jackrabbit.oak.segment.standby.server;
 
 import static com.google.common.collect.Lists.newArrayList;
-import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readSegmentWithRetry;
 
 import java.util.List;
 import java.util.UUID;
 
 import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 
 class DefaultStandbyReferencesReader implements StandbyReferencesReader {
@@ -40,20 +40,21 @@ class DefaultStandbyReferencesReader imp
 
         long msb = uuid.getMostSignificantBits();
         long lsb = uuid.getLeastSignificantBits();
+        SegmentId segmentId = store.getSegmentIdProvider().newSegmentId(msb, lsb);
 
-        Segment segment = readSegmentWithRetry(store, store.getSegmentIdProvider().newSegmentId(msb, lsb));
+        if (store.containsSegment(segmentId)) {
+            Segment segment = store.readSegment(segmentId);
 
-        if (segment == null) {
-            return null;
-        }
+            List<String> references = newArrayList();
 
-        List<String> references = newArrayList();
+            for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
+                references.add(segment.getReferencedSegmentId(i).toString());
+            }
 
-        for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
-            references.add(segment.getReferencedSegmentId(i).toString());
+            return references;
         }
 
-        return references;
+        return null;
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java Thu Sep 28 12:27:29 2017
@@ -17,13 +17,12 @@
 
 package org.apache.jackrabbit.oak.segment.standby.server;
 
-import static org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readSegmentWithRetry;
-
 import java.io.IOException;
 import java.util.UUID;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,24 +38,24 @@ class DefaultStandbySegmentReader implem
     }
 
     @Override
-    public byte[] readSegment(String segmentId) {
-        UUID uuid = UUID.fromString(segmentId);
+    public byte[] readSegment(String id) {
+        UUID uuid = UUID.fromString(id);
         long msb = uuid.getMostSignificantBits();
         long lsb = uuid.getLeastSignificantBits();
+        SegmentId segmentId = store.getSegmentIdProvider().newSegmentId(msb, lsb);
 
-        Segment segment = readSegmentWithRetry(store, store.getSegmentIdProvider().newSegmentId(msb, lsb));
-
-        if (segment == null) {
-            return null;
-        }
-
-        try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
-            segment.writeTo(stream);
-            return stream.toByteArray();
-        } catch (IOException e) {
-            log.warn("Error while reading segment content", e);
-            return null;
+        if (store.containsSegment(segmentId)) {
+            Segment segment = store.readSegment(segmentId);
+            try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+                segment.writeTo(stream);
+                return stream.toByteArray();
+            } catch (IOException e) {
+                log.warn("Error while reading segment content", e);
+                return null;
+            }
         }
+        
+        return null;
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java Thu Sep 28 12:27:29 2017
@@ -18,9 +18,9 @@
 package org.apache.jackrabbit.oak.segment.standby.server;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
-import org.apache.jackrabbit.oak.segment.Segment;
-import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.RecordId;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
 public final class FileStoreUtil {
 
     private static final Logger log = LoggerFactory.getLogger(FileStoreUtil.class);
+    private static final long DEFAULT_SLEEP_TIME = 125L;
 
     private FileStoreUtil() {
         // Prevent instantiation
@@ -37,14 +38,27 @@ public final class FileStoreUtil {
         return (int) Math.ceil((double) x / (double) y);
     }
     
-    static Segment readSegmentWithRetry(FileStore store, SegmentId id) {
-        for (int i = 0; i < 160; i++) {
-            if (store.containsSegment(id)) {
-                return store.readSegment(id);
+    static RecordId readPersistedHeadWithRetry(FileStore store, long timeout) {
+        Supplier<RecordId> headSupplier = () -> {
+            return store.getRevisions().getPersistedHead();
+        };
+
+        if (timeout > DEFAULT_SLEEP_TIME) {
+            return readWithRetry(headSupplier, "persisted head", timeout);
+        } else {
+            return headSupplier.get();
+        }
+    }
+    
+    private static <T> T readWithRetry(Supplier<T> supplier, String supplied, long timeout) {
+        for (int i = 0; i < timeout / DEFAULT_SLEEP_TIME; i++) {
+            if (supplier.get() != null) {
+                return supplier.get();
             }
+            
             try {
-                log.trace("Unable to read segment, waiting...");
-                TimeUnit.MILLISECONDS.sleep(125);
+                log.trace("Unable to read {}, waiting...", supplied);
+                TimeUnit.MILLISECONDS.sleep(DEFAULT_SLEEP_TIME);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 return null;
@@ -52,5 +66,4 @@ public final class FileStoreUtil {
         }
         return null;
     }
-
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java Thu Sep 28 12:27:29 2017
@@ -55,6 +55,13 @@ import org.slf4j.LoggerFactory;
 class StandbyServer implements AutoCloseable {
 
     private static final Logger log = LoggerFactory.getLogger(StandbyServer.class);
+    
+    /**
+     * If a persisted head state cannot be acquired in less than this timeout,
+     * the 'get head' request from the client will be discarded.
+     */
+    static final long READ_HEAD_TIMEOUT =
+            Long.getLong("standby.server.timeout", 10_000L);
 
     static Builder builder(int port, StoreProvider provider, int blobChunkSize) {
         return new Builder(port, provider, blobChunkSize);
@@ -183,7 +190,7 @@ class StandbyServer implements AutoClose
 
                 FileStore store = builder.storeProvider.provideStore();
 
-                p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store)));
+                p.addLast(new GetHeadRequestHandler(new DefaultStandbyHeadReader(store, READ_HEAD_TIMEOUT)));
                 p.addLast(new GetSegmentRequestHandler(new DefaultStandbySegmentReader(store)));
                 p.addLast(new GetBlobRequestHandler(new DefaultStandbyBlobReader(store.getBlobStore())));
                 p.addLast(new GetReferencesRequestHandler(new DefaultStandbyReferencesReader(store)));

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java Thu Sep 28 12:27:29 2017
@@ -22,9 +22,9 @@ package org.apache.jackrabbit.oak.segmen
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -37,7 +37,6 @@ import org.apache.jackrabbit.oak.api.Blo
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.commons.CIHelper;
 import org.apache.jackrabbit.oak.commons.junit.TemporaryPort;
 import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
 import org.apache.jackrabbit.oak.segment.file.FileStore;
@@ -125,6 +124,58 @@ public abstract class DataStoreTestBase
     public void after() {
         proxy.close();
     }
+    
+    @Test
+    public void testResilientSync() throws Exception {
+        final int blobSize = 5 * MB;
+        FileStore primary = getPrimary();
+        FileStore secondary = getSecondary();
+
+        NodeStore store = SegmentNodeStoreBuilders.builder(primary).build();
+        byte[] data = addTestContent(store, "server", blobSize);
+
+        // run 1: unsuccessful
+        try (
+                StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
+                StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000)
+        ) {
+            serverSync.start();
+            // no persisted head on primary
+            // sync shouldn't be successful, but shouldn't throw exception either,
+            // timeout too low for TarMK flush thread to kick-in
+            cl.run();
+            assertNotEquals(primary.getHead(), secondary.getHead());
+        }
+        
+        // run 2: successful
+        try (
+                StandbyServerSync serverSync = new StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
+                StandbyClientSync cl = newStandbyClientSync(secondary, serverPort.getPort(), 4_000)
+        ) {
+            serverSync.start();
+            // this time persisted head will be available on primary
+            // waited at least 4s + 4s > 5s (TarMK flush thread run frequency)
+            cl.run();
+            assertEquals(primary.getHead(), secondary.getHead());
+        }
+
+        assertTrue(primary.getStats().getApproximateSize() < MB);
+        assertTrue(secondary.getStats().getApproximateSize() < MB);
+
+        PropertyState ps = secondary.getHead().getChildNode("root")
+                .getChildNode("server").getProperty("testBlob");
+        assertNotNull(ps);
+        assertEquals(Type.BINARY.tag(), ps.getType().tag());
+        Blob b = ps.getValue(Type.BINARY);
+        assertEquals(blobSize, b.length());
+        byte[] testData = new byte[blobSize];
+        try (
+                InputStream blobInputStream = b.getNewStream()
+        ) {
+            ByteStreams.readFully(blobInputStream, testData);
+            assertArrayEquals(data, testData);
+        }
+    }
 
     @Test
     public void testSync() throws Exception {
@@ -167,8 +218,6 @@ public abstract class DataStoreTestBase
      */
     @Test
     public void testSyncBigBlob() throws Exception {
-        assumeFalse(CIHelper.jenkins());  // FIXME OAK-6678: fails on Jenkins
-        
         final long blobSize = (long) (1 * GB);
         final int seed = 13;
 

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java Thu Sep 28 12:27:29 2017
@@ -29,7 +29,7 @@ import org.junit.BeforeClass;
 
 public class TestBase {
     static final int MB = 1024 * 1024;
-    private static final int timeout = Integer.getInteger("standby.test.timeout", 500);
+    private static final int timeout = Integer.getInteger("standby.test.timeout", 5000);
 
     // Java 6 on Windows doesn't support dual IP stacks, so we will skip our
     // IPv6 tests.

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java Thu Sep 28 12:27:29 2017
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.segmen
 
 import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
 
@@ -40,7 +41,16 @@ public class DefaultStandbyHeadReaderTes
     public void shouldReturnHeadSegmentId() throws Exception {
         try (FileStore store = newFileStore()) {
             store.flush();
-            DefaultStandbyHeadReader reader = new DefaultStandbyHeadReader(store);
+            DefaultStandbyHeadReader reader = new DefaultStandbyHeadReader(store, 0L);
+            assertEquals(store.getRevisions().getPersistedHead().toString(), reader.readHeadRecordId());
+        }
+    }
+    
+    @Test
+    public void shouldWaitForFlushAndReturnHeadSegmentId() throws Exception {
+        try (FileStore store = newFileStore()) {
+            DefaultStandbyHeadReader reader = new DefaultStandbyHeadReader(store, 10_000L);
+            assertNotNull(reader.readHeadRecordId());
             assertEquals(store.getRevisions().getPersistedHead().toString(), reader.readHeadRecordId());
         }
     }


Reply via email to