Author: adulceanu
Date: Fri Jun 5 13:10:09 2020
New Revision: 1878509
URL: http://svn.apache.org/viewvc?rev=1878509&view=rev
Log:
OAK-9099 - Improve segment write resiliency for remote segment store
Contribution by Miroslav Smiljanic
Added:
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java
Modified:
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
Modified:
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java?rev=1878509&r1=1878508&r2=1878509&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java
Fri Jun 5 13:10:09 2020
@@ -89,6 +89,11 @@ public class SegmentWriteQueue implement
queue.put(segment);
} catch (InterruptedException e1) {
log.error("Can't re-add the segment {} to the queue. It'll
be dropped.", segment.getUuid(), e1);
+
+ synchronized (segmentsByUUID) {
+ segmentsByUUID.remove(segment.getUuid());
+ segmentsByUUID.notifyAll();
+ }
}
}
}
@@ -109,7 +114,7 @@ public class SegmentWriteQueue implement
private void consume(SegmentWriteAction segment) throws
SegmentConsumeException {
try {
segment.passTo(writer);
- } catch (IOException e) {
+ } catch (IOException | RuntimeException e) {
setBroken(true);
throw new SegmentConsumeException(segment, e);
}
@@ -271,7 +276,7 @@ public class SegmentWriteQueue implement
private final SegmentWriteAction segment;
- public SegmentConsumeException(SegmentWriteAction segment, IOException
cause) {
+ public SegmentConsumeException(SegmentWriteAction segment, Exception
cause) {
super(cause);
this.segment = segment;
}
Added:
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java?rev=1878509&view=auto
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java
(added)
+++
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java
Fri Jun 5 13:10:09 2020
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@Internal(since = "1.0.0")
+@Version("1.0.0")
+package org.apache.jackrabbit.oak.segment.remote.queue;
+
+import org.apache.jackrabbit.oak.commons.annotations.Internal;
+import org.osgi.annotation.versioning.Version;
\ No newline at end of file
Modified:
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java?rev=1878509&r1=1878508&r2=1878509&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
Fri Jun 5 13:10:09 2020
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.Atomi
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -280,6 +281,50 @@ public class SegmentWriteQueueTest {
}
}
+ @Test
+ public void testRuntimeExceptionInSegmentConsumer() throws
InterruptedException, NoSuchFieldException, IOException {
+
+ Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+ AtomicBoolean doBreak = new AtomicBoolean(true);
+ queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
+ //simulate runtime exception that can happen while writing to the
remote repository
+ if (doBreak.get()) {
+ throw new RuntimeException();
+ }
+
+ added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb()));
+ });
+
+ queue.addToQueue(tarEntry(0), EMPTY_DATA, 0, 0);
+ queue.addToQueue(tarEntry(1), EMPTY_DATA, 0, 0);
+ queue.addToQueue(tarEntry(2), EMPTY_DATA, 0, 0);
+
+ AtomicBoolean flushFinished = new AtomicBoolean(false);
+ Thread flusher = new Thread(() -> {
+ try {
+ queue.flush();
+ flushFinished.set(true);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ flusher.start();
+
+ Thread.sleep(100);
+
+ assertFalse("Flush thread should not be finished",
flushFinished.get());
+ assertEquals(0, added.size());
+
+ //Stop throwing runtime exception
+ doBreak.set(false);
+
+ //give enough time to emergency thread to wake up
+ Thread.sleep(1200);
+
+ assertTrue("Segment queue should be empty", flushFinished.get());
+ assertEquals(3, added.size());
+ }
+
private static RemoteSegmentArchiveEntry tarEntry(long i) {
return new RemoteSegmentArchiveEntry(0, i, 0, 0, 0, 0, false);
}