Author: adulceanu
Date: Fri Jun  5 13:10:09 2020
New Revision: 1878509

URL: http://svn.apache.org/viewvc?rev=1878509&view=rev
Log:
OAK-9099 - Improve segment write resiliency for remote segment store
Contribution by Miroslav Smiljanic

Added:
    
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java
Modified:
    
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java
    
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java

Modified: 
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java?rev=1878509&r1=1878508&r2=1878509&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueue.java
 Fri Jun  5 13:10:09 2020
@@ -89,6 +89,11 @@ public class SegmentWriteQueue implement
                     queue.put(segment);
                 } catch (InterruptedException e1) {
                     log.error("Can't re-add the segment {} to the queue. It'll be dropped.", segment.getUuid(), e1);
+
+                    synchronized (segmentsByUUID) {
+                        segmentsByUUID.remove(segment.getUuid());
+                        segmentsByUUID.notifyAll();
+                    }
                 }
             }
         }
@@ -109,7 +114,7 @@ public class SegmentWriteQueue implement
     private void consume(SegmentWriteAction segment) throws SegmentConsumeException {
         try {
             segment.passTo(writer);
-        } catch (IOException e) {
+        } catch (IOException | RuntimeException e) {
             setBroken(true);
             throw new SegmentConsumeException(segment, e);
         }
@@ -271,7 +276,7 @@ public class SegmentWriteQueue implement
 
         private final SegmentWriteAction segment;
 
-        public SegmentConsumeException(SegmentWriteAction segment, IOException cause) {
+        public SegmentConsumeException(SegmentWriteAction segment, Exception cause) {
             super(cause);
             this.segment = segment;
         }

Added: 
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java?rev=1878509&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-remote/src/main/java/org/apache/jackrabbit/oak/segment/remote/queue/package-info.java
 Fri Jun  5 13:10:09 2020
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@Internal(since = "1.0.0")
+@Version("1.0.0")
+package org.apache.jackrabbit.oak.segment.remote.queue;
+
+import org.apache.jackrabbit.oak.commons.annotations.Internal;
+import org.osgi.annotation.versioning.Version;
\ No newline at end of file

Modified: 
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java?rev=1878509&r1=1878508&r2=1878509&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-remote/src/test/java/org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.java
 Fri Jun  5 13:10:09 2020
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.Atomi
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -280,6 +281,50 @@ public class SegmentWriteQueueTest {
         }
     }
 
+    @Test
+    public void testRuntimeExceptionInSegmentConsumer() throws InterruptedException, NoSuchFieldException, IOException {
+
+        Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+        AtomicBoolean doBreak = new AtomicBoolean(true);
+        queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
+            //simulate runtime exception that can happen while writing to the remote repository
+            if (doBreak.get()) {
+                throw new RuntimeException();
+            }
+
+            added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb()));
+        });
+
+        queue.addToQueue(tarEntry(0), EMPTY_DATA, 0, 0);
+        queue.addToQueue(tarEntry(1), EMPTY_DATA, 0, 0);
+        queue.addToQueue(tarEntry(2), EMPTY_DATA, 0, 0);
+
+        AtomicBoolean flushFinished = new AtomicBoolean(false);
+        Thread flusher = new Thread(() -> {
+            try {
+                queue.flush();
+                flushFinished.set(true);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        });
+        flusher.start();
+
+        Thread.sleep(100);
+
+        assertFalse("Flush thread should not be finished", flushFinished.get());
+        assertEquals(0, added.size());
+
+        //Stop throwing runtime exception
+        doBreak.set(false);
+
+        //give enough time to emergency thread to wake up
+        Thread.sleep(1200);
+
+        assertTrue("Segment queue should be empty", flushFinished.get());
+        assertEquals(3, added.size());
+    }
+
     private static RemoteSegmentArchiveEntry tarEntry(long i) {
         return new RemoteSegmentArchiveEntry(0, i, 0, 0, 0, 0, false);
     }


Reply via email to