Author: adulceanu
Date: Thu Sep 28 12:27:29 2017
New Revision: 1809982
URL: http://svn.apache.org/viewvc?rev=1809982&view=rev
Log:
OAK-6678 - Syncing big blobs fails since StandbyServer sends persisted head
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/client/StandbyClientSyncExecution.java
Thu Sep 28 12:27:29 2017
@@ -64,7 +64,8 @@ class StandbyClientSyncExecution {
RecordId remoteHead = getHead();
if (remoteHead == null) {
- throw new IllegalStateException("Unable to fetch remote head");
+ log.error("Unable to fetch remote head");
+ return;
}
if (remoteHead.equals(store.getHead().getRecordId())) {
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReader.java
Thu Sep 28 12:27:29 2017
@@ -17,19 +17,25 @@
package org.apache.jackrabbit.oak.segment.standby.server;
+import static
org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readPersistedHeadWithRetry;
+
+import org.apache.jackrabbit.oak.segment.RecordId;
import org.apache.jackrabbit.oak.segment.file.FileStore;
class DefaultStandbyHeadReader implements StandbyHeadReader {
private final FileStore store;
+ private final long timeout;
- DefaultStandbyHeadReader(FileStore store) {
+ DefaultStandbyHeadReader(FileStore store, long timeout) {
this.store = store;
+ this.timeout = timeout;
}
@Override
public String readHeadRecordId() {
- return store.getRevisions().getPersistedHead().toString();
+ RecordId persistedHead = readPersistedHeadWithRetry(store, timeout);
+ return persistedHead != null ? persistedHead.toString() : null;
}
}
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyReferencesReader.java
Thu Sep 28 12:27:29 2017
@@ -18,12 +18,12 @@
package org.apache.jackrabbit.oak.segment.standby.server;
import static com.google.common.collect.Lists.newArrayList;
-import static
org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readSegmentWithRetry;
import java.util.List;
import java.util.UUID;
import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
import org.apache.jackrabbit.oak.segment.file.FileStore;
class DefaultStandbyReferencesReader implements StandbyReferencesReader {
@@ -40,20 +40,21 @@ class DefaultStandbyReferencesReader imp
long msb = uuid.getMostSignificantBits();
long lsb = uuid.getLeastSignificantBits();
+ SegmentId segmentId = store.getSegmentIdProvider().newSegmentId(msb,
lsb);
- Segment segment = readSegmentWithRetry(store,
store.getSegmentIdProvider().newSegmentId(msb, lsb));
+ if (store.containsSegment(segmentId)) {
+ Segment segment = store.readSegment(segmentId);
- if (segment == null) {
- return null;
- }
+ List<String> references = newArrayList();
- List<String> references = newArrayList();
+ for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
+ references.add(segment.getReferencedSegmentId(i).toString());
+ }
- for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
- references.add(segment.getReferencedSegmentId(i).toString());
+ return references;
}
- return references;
+ return null;
}
}
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbySegmentReader.java
Thu Sep 28 12:27:29 2017
@@ -17,13 +17,12 @@
package org.apache.jackrabbit.oak.segment.standby.server;
-import static
org.apache.jackrabbit.oak.segment.standby.server.FileStoreUtil.readSegmentWithRetry;
-
import java.io.IOException;
import java.util.UUID;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.jackrabbit.oak.segment.Segment;
+import org.apache.jackrabbit.oak.segment.SegmentId;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,24 +38,24 @@ class DefaultStandbySegmentReader implem
}
@Override
- public byte[] readSegment(String segmentId) {
- UUID uuid = UUID.fromString(segmentId);
+ public byte[] readSegment(String id) {
+ UUID uuid = UUID.fromString(id);
long msb = uuid.getMostSignificantBits();
long lsb = uuid.getLeastSignificantBits();
+ SegmentId segmentId = store.getSegmentIdProvider().newSegmentId(msb,
lsb);
- Segment segment = readSegmentWithRetry(store,
store.getSegmentIdProvider().newSegmentId(msb, lsb));
-
- if (segment == null) {
- return null;
- }
-
- try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
- segment.writeTo(stream);
- return stream.toByteArray();
- } catch (IOException e) {
- log.warn("Error while reading segment content", e);
- return null;
+ if (store.containsSegment(segmentId)) {
+ Segment segment = store.readSegment(segmentId);
+ try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+ segment.writeTo(stream);
+ return stream.toByteArray();
+ } catch (IOException e) {
+ log.warn("Error while reading segment content", e);
+ return null;
+ }
}
+
+ return null;
}
}
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/FileStoreUtil.java
Thu Sep 28 12:27:29 2017
@@ -18,9 +18,9 @@
package org.apache.jackrabbit.oak.segment.standby.server;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
-import org.apache.jackrabbit.oak.segment.Segment;
-import org.apache.jackrabbit.oak.segment.SegmentId;
+import org.apache.jackrabbit.oak.segment.RecordId;
import org.apache.jackrabbit.oak.segment.file.FileStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
public final class FileStoreUtil {
private static final Logger log =
LoggerFactory.getLogger(FileStoreUtil.class);
+ private static final long DEFAULT_SLEEP_TIME = 125L;
private FileStoreUtil() {
// Prevent instantiation
@@ -37,14 +38,27 @@ public final class FileStoreUtil {
return (int) Math.ceil((double) x / (double) y);
}
- static Segment readSegmentWithRetry(FileStore store, SegmentId id) {
- for (int i = 0; i < 160; i++) {
- if (store.containsSegment(id)) {
- return store.readSegment(id);
+ static RecordId readPersistedHeadWithRetry(FileStore store, long timeout) {
+ Supplier<RecordId> headSupplier = () -> {
+ return store.getRevisions().getPersistedHead();
+ };
+
+ if (timeout > DEFAULT_SLEEP_TIME) {
+ return readWithRetry(headSupplier, "persisted head", timeout);
+ } else {
+ return headSupplier.get();
+ }
+ }
+
+ private static <T> T readWithRetry(Supplier<T> supplier, String supplied,
long timeout) {
+ for (int i = 0; i < timeout / DEFAULT_SLEEP_TIME; i++) {
+ if (supplier.get() != null) {
+ return supplier.get();
}
+
try {
- log.trace("Unable to read segment, waiting...");
- TimeUnit.MILLISECONDS.sleep(125);
+ log.trace("Unable to read {}, waiting...", supplied);
+ TimeUnit.MILLISECONDS.sleep(DEFAULT_SLEEP_TIME);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
@@ -52,5 +66,4 @@ public final class FileStoreUtil {
}
return null;
}
-
}
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/server/StandbyServer.java
Thu Sep 28 12:27:29 2017
@@ -55,6 +55,13 @@ import org.slf4j.LoggerFactory;
class StandbyServer implements AutoCloseable {
private static final Logger log =
LoggerFactory.getLogger(StandbyServer.class);
+
+ /**
+ * If a persisted head state cannot be acquired in less than this timeout,
+ * the 'get head' request from the client will be discarded.
+ */
+ static final long READ_HEAD_TIMEOUT =
+ Long.getLong("standby.server.timeout", 10_000L);
static Builder builder(int port, StoreProvider provider, int
blobChunkSize) {
return new Builder(port, provider, blobChunkSize);
@@ -183,7 +190,7 @@ class StandbyServer implements AutoClose
FileStore store = builder.storeProvider.provideStore();
- p.addLast(new GetHeadRequestHandler(new
DefaultStandbyHeadReader(store)));
+ p.addLast(new GetHeadRequestHandler(new
DefaultStandbyHeadReader(store, READ_HEAD_TIMEOUT)));
p.addLast(new GetSegmentRequestHandler(new
DefaultStandbySegmentReader(store)));
p.addLast(new GetBlobRequestHandler(new
DefaultStandbyBlobReader(store.getBlobStore())));
p.addLast(new GetReferencesRequestHandler(new
DefaultStandbyReferencesReader(store)));
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/DataStoreTestBase.java
Thu Sep 28 12:27:29 2017
@@ -22,9 +22,9 @@ package org.apache.jackrabbit.oak.segmen
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -37,7 +37,6 @@ import org.apache.jackrabbit.oak.api.Blo
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.commons.CIHelper;
import org.apache.jackrabbit.oak.commons.junit.TemporaryPort;
import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
import org.apache.jackrabbit.oak.segment.file.FileStore;
@@ -125,6 +124,58 @@ public abstract class DataStoreTestBase
public void after() {
proxy.close();
}
+
+ @Test
+ public void testResilientSync() throws Exception {
+ final int blobSize = 5 * MB;
+ FileStore primary = getPrimary();
+ FileStore secondary = getSecondary();
+
+ NodeStore store = SegmentNodeStoreBuilders.builder(primary).build();
+ byte[] data = addTestContent(store, "server", blobSize);
+
+ // run 1: unsuccessful
+ try (
+ StandbyServerSync serverSync = new
StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
+ StandbyClientSync cl = newStandbyClientSync(secondary,
serverPort.getPort(), 4_000)
+ ) {
+ serverSync.start();
+ // no persisted head on primary
+ // sync shouldn't be successful, but shouldn't throw exception
either,
+ // timeout too low for TarMK flush thread to kick-in
+ cl.run();
+ assertNotEquals(primary.getHead(), secondary.getHead());
+ }
+
+ // run 2: successful
+ try (
+ StandbyServerSync serverSync = new
StandbyServerSync(serverPort.getPort(), primary, 1 * MB);
+ StandbyClientSync cl = newStandbyClientSync(secondary,
serverPort.getPort(), 4_000)
+ ) {
+ serverSync.start();
+ // this time persisted head will be available on primary
+ // waited at least 4s + 4s > 5s (TarMK flush thread run frequency)
+ cl.run();
+ assertEquals(primary.getHead(), secondary.getHead());
+ }
+
+ assertTrue(primary.getStats().getApproximateSize() < MB);
+ assertTrue(secondary.getStats().getApproximateSize() < MB);
+
+ PropertyState ps = secondary.getHead().getChildNode("root")
+ .getChildNode("server").getProperty("testBlob");
+ assertNotNull(ps);
+ assertEquals(Type.BINARY.tag(), ps.getType().tag());
+ Blob b = ps.getValue(Type.BINARY);
+ assertEquals(blobSize, b.length());
+ byte[] testData = new byte[blobSize];
+ try (
+ InputStream blobInputStream = b.getNewStream()
+ ) {
+ ByteStreams.readFully(blobInputStream, testData);
+ assertArrayEquals(data, testData);
+ }
+ }
@Test
public void testSync() throws Exception {
@@ -167,8 +218,6 @@ public abstract class DataStoreTestBase
*/
@Test
public void testSyncBigBlob() throws Exception {
- assumeFalse(CIHelper.jenkins()); // FIXME OAK-6678: fails on Jenkins
-
final long blobSize = (long) (1 * GB);
final int seed = 13;
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/TestBase.java
Thu Sep 28 12:27:29 2017
@@ -29,7 +29,7 @@ import org.junit.BeforeClass;
public class TestBase {
static final int MB = 1024 * 1024;
- private static final int timeout =
Integer.getInteger("standby.test.timeout", 500);
+ private static final int timeout =
Integer.getInteger("standby.test.timeout", 5000);
// Java 6 on Windows doesn't support dual IP stacks, so we will skip our
// IPv6 tests.
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java?rev=1809982&r1=1809981&r2=1809982&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/server/DefaultStandbyHeadReaderTest.java
Thu Sep 28 12:27:29 2017
@@ -19,6 +19,7 @@ package org.apache.jackrabbit.oak.segmen
import static
org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import java.io.File;
@@ -40,7 +41,16 @@ public class DefaultStandbyHeadReaderTes
public void shouldReturnHeadSegmentId() throws Exception {
try (FileStore store = newFileStore()) {
store.flush();
- DefaultStandbyHeadReader reader = new
DefaultStandbyHeadReader(store);
+ DefaultStandbyHeadReader reader = new
DefaultStandbyHeadReader(store, 0L);
+ assertEquals(store.getRevisions().getPersistedHead().toString(),
reader.readHeadRecordId());
+ }
+ }
+
+ @Test
+ public void shouldWaitForFlushAndReturnHeadSegmentId() throws Exception {
+ try (FileStore store = newFileStore()) {
+ DefaultStandbyHeadReader reader = new
DefaultStandbyHeadReader(store, 10_000L);
+ assertNotNull(reader.readHeadRecordId());
assertEquals(store.getRevisions().getPersistedHead().toString(),
reader.readHeadRecordId());
}
}