Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.queue;
+
+import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+public class SegmentWriteAction {
+
+    private final AzureSegmentArchiveEntry indexEntry;
+
+    private final byte[] buffer;
+
+    private final int offset;
+
+    private final int length;
+
+    public SegmentWriteAction(AzureSegmentArchiveEntry indexEntry, byte[] 
buffer, int offset, int length) {
+        this.indexEntry = indexEntry;
+
+        this.buffer = new byte[length];
+        for (int i = 0; i < length; i++) {
+            this.buffer[i] = buffer[i + offset];
+        }
+        this.offset = 0;
+        this.length = length;
+    }
+
+    public UUID getUuid() {
+        return new UUID(indexEntry.getMsb(), indexEntry.getLsb());
+    }
+
+    public ByteBuffer toByteBuffer() {
+        return ByteBuffer.wrap(buffer, offset, length);
+    }
+
+    void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException 
{
+        consumer.consume(indexEntry, buffer, offset, length);
+    }
+
+    @Override
+    public String toString() {
+        return getUuid().toString();
+    }
+}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.queue;
+
+import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
/**
 * An asynchronous write-behind queue for segments.
 * <p>
 * Segments added via {@link #addToQueue(AzureSegmentArchiveEntry, byte[], int, int)}
 * are buffered in a bounded blocking deque and persisted in the background by a
 * pool of consumer threads, each handing segments to the supplied
 * {@link SegmentConsumer}. Until a segment has been persisted it can still be
 * read back via {@link #read(UUID)}.
 * <p>
 * If a write fails, the queue enters a "broken" state: the regular consumer
 * threads pause and a single emergency thread keeps retrying (sleeping 1s
 * between attempts) until a write succeeds, which clears the broken flag and
 * resumes normal consumption.
 */
public class SegmentWriteQueue implements Closeable {

    /** Number of regular consumer threads; overridable via system property. */
    public static final int THREADS = Integer.getInteger("oak.segment.azure.threads", 5);

    // Maximum number of segments buffered in the queue.
    // NOTE(review): this property name looks accidentally fully qualified
    // ("oak.segment.org.apache.jackrabbit.oak.segment.azure.queue");
    // "oak.segment.azure.queue" was probably intended — confirm before relying on it.
    private static final int QUEUE_SIZE = Integer.getInteger("oak.segment.org.apache.jackrabbit.oak.segment.azure.queue", 20);

    private static final Logger log = LoggerFactory.getLogger(SegmentWriteQueue.class);

    /** Bounded buffer of pending write actions; consumers poll from it. */
    private final BlockingDeque<SegmentWriteAction> queue;

    /**
     * Segments that have been enqueued but not yet persisted, keyed by UUID,
     * so they can be read back while still in flight. Also used as the
     * monitor that {@link #flush()} waits on.
     */
    private final Map<UUID, SegmentWriteAction> segmentsByUUID;

    /** threadNo regular consumer threads plus one emergency-retry thread. */
    private final ExecutorService executor;

    /**
     * Read lock is held while adding segments; {@link #flush()} takes the
     * write lock to block new additions while draining.
     */
    private final ReadWriteLock flushLock;

    /** Destination that actually persists segments. */
    private final SegmentConsumer writer;

    private volatile boolean shutdown;

    /** Monitor guarding transitions of {@link #broken}. */
    private final Object brokenMonitor = new Object();

    /** True while the last write attempt failed; cleared on the next success. */
    private volatile boolean broken;

    public SegmentWriteQueue(SegmentConsumer writer) {
        this(writer, QUEUE_SIZE, THREADS);
    }

    // Package-private for tests: queue capacity and thread count injectable.
    SegmentWriteQueue(SegmentConsumer writer, int queueSize, int threadNo) {
        this.writer = writer;
        segmentsByUUID = new ConcurrentHashMap<>();
        flushLock = new ReentrantReadWriteLock();

        queue = new LinkedBlockingDeque<>(queueSize);
        // +1 slot for the emergency loop submitted below.
        executor = Executors.newFixedThreadPool(threadNo + 1);
        for (int i = 0; i < threadNo; i++) {
            executor.submit(this::mainLoop);
        }
        executor.submit(this::emergencyLoop);
    }

    /**
     * Regular consumer loop: pauses while the queue is broken, otherwise
     * consumes segments. A segment whose write failed is re-added to the
     * queue (at the tail) so the emergency loop can retry it.
     */
    private void mainLoop() {
        while (!shutdown) {
            try {
                waitWhileBroken();
                if (shutdown) {
                    break;
                }
                consume();
            } catch (SegmentConsumeException e) {
                SegmentWriteAction segment = e.segment;
                log.error("Can't persist the segment {}", segment.getUuid(), e.getCause());
                try {
                    queue.put(segment);
                } catch (InterruptedException e1) {
                    // NOTE(review): the interrupt status is not restored here
                    // (Thread.currentThread().interrupt()) — confirm intended.
                    log.error("Can't re-add the segment {} to the queue. It'll be dropped.", segment.getUuid(), e1);
                }
            }
        }
    }

    /**
     * Polls one segment (waiting up to 100 ms) and persists it, if any.
     * The short timeout lets the loop re-check the shutdown flag regularly.
     */
    private void consume() throws SegmentConsumeException {
        SegmentWriteAction segment = null;
        try {
            segment = queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // NOTE(review): interrupt status not restored — confirm intended.
            log.error("Poll from queue interrupted", e);
        }
        if (segment != null) {
            consume(segment);
        }
    }

    /**
     * Persists one segment. On failure the queue is marked broken and the
     * segment is surfaced via {@link SegmentConsumeException}. On success the
     * segment is removed from the in-flight map (waking any flush() waiters)
     * and the broken flag is cleared.
     */
    private void consume(SegmentWriteAction segment) throws SegmentConsumeException {
        try {
            segment.passTo(writer);
        } catch (IOException e) {
            setBroken(true);
            throw new SegmentConsumeException(segment, e);
        }
        synchronized (segmentsByUUID) {
            segmentsByUUID.remove(segment.getUuid());
            segmentsByUUID.notifyAll();
        }
        setBroken(false);
    }

    /**
     * Emergency loop: sleeps until the queue becomes broken, then retries the
     * failing segment once a second until a write succeeds (clearing the
     * broken state) or shutdown is requested.
     */
    private void emergencyLoop() {
        while (!shutdown) {
            waitUntilBroken();
            if (shutdown) {
                break;
            }

            boolean success = false;
            SegmentWriteAction segmentToRetry = null;
            do {
                try {
                    if (segmentToRetry == null) {
                        consume();
                    } else {
                        // Keep retrying the same segment rather than polling
                        // a new one, to preserve it across failures.
                        consume(segmentToRetry);
                    }
                    success = true;
                } catch (SegmentConsumeException e) {
                    segmentToRetry = e.segment;
                    log.error("Can't persist the segment {}", segmentToRetry.getUuid(), e.getCause());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        log.warn("Interrupted", e);
                    }
                    if (shutdown) {
                        log.error("Shutdown initiated. The segment {} will be dropped.", segmentToRetry.getUuid());
                    }
                }
            } while (!success && !shutdown);
        }
    }

    /**
     * Enqueues a segment for asynchronous persistence. Blocks while the queue
     * is broken, and up to 1 minute if the queue is full.
     * <p>
     * The segment is registered in the in-flight map before the offer so that
     * {@link #read(UUID)} can see it as soon as possible; on a failed offer it
     * is removed again.
     *
     * @throws IOException           if the queue is full for over a minute or
     *                               the thread is interrupted
     * @throws IllegalStateException if a shutdown is in progress
     */
    public void addToQueue(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
        waitWhileBroken();
        if (shutdown) {
            throw new IllegalStateException("Can't accept the new segment - shutdown in progress");
        }

        SegmentWriteAction action = new SegmentWriteAction(indexEntry, data, offset, size);
        flushLock.readLock().lock();
        try {
            segmentsByUUID.put(action.getUuid(), action);
            if (!queue.offer(action, 1, TimeUnit.MINUTES)) {
                segmentsByUUID.remove(action.getUuid());
                throw new IOException("Can't add segment to the queue");
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        } finally {
            flushLock.readLock().unlock();
        }
    }

    /**
     * Blocks until every enqueued segment has been persisted. Holds the flush
     * write lock, so no new segments can be added while flushing. Logs an
     * error (and keeps waiting) every minute the queue fails to drain.
     *
     * @throws IOException if the waiting thread is interrupted
     */
    public void flush() throws IOException {
        flushLock.writeLock().lock();
        try {
            synchronized (segmentsByUUID) {
                long start = System.currentTimeMillis();
                while (!segmentsByUUID.isEmpty()) {
                    segmentsByUUID.wait(100);
                    if (System.currentTimeMillis() - start > TimeUnit.MINUTES.toMillis(1)) {
                        log.error("Can't flush the queue in 1 minute. Queue: {}. Segment map: {}", queue, segmentsByUUID);
                        start = System.currentTimeMillis();
                    }
                }
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        } finally {
            flushLock.writeLock().unlock();
        }
    }

    /**
     * Returns the still-unpersisted segment with the given id, or null if it
     * has already been written (or was never enqueued).
     */
    public SegmentWriteAction read(UUID id) {
        return segmentsByUUID.get(id);
    }

    /**
     * Signals shutdown and waits up to 1 minute for the worker threads to
     * finish their current iteration.
     * NOTE(review): segments still sitting in the queue at this point are not
     * drained — confirm callers flush() first.
     */
    public void close() throws IOException {
        shutdown = true;
        try {
            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IOException("The write wasn't able to shut down clearly");
            }
        } catch (InterruptedException e) {
            throw new IOException(e);
        }
    }

    /** True when no segments are waiting to be persisted. */
    public boolean isEmpty() {
        return segmentsByUUID.isEmpty();
    }

    /** Package-private for tests: current broken state. */
    boolean isBroken() {
        return broken;
    }

    /** Package-private for tests: number of segments currently queued. */
    int getSize() {
        return queue.size();
    }

    /** Sets the broken flag and wakes all threads waiting on either state. */
    private void setBroken(boolean broken) {
        synchronized (brokenMonitor) {
            this.broken = broken;
            brokenMonitor.notifyAll();
        }
    }

    /** Blocks while the queue is broken (unless shutdown is requested). */
    private void waitWhileBroken() {
        if (!broken) {
            return;
        }
        synchronized (brokenMonitor) {
            while (broken && !shutdown) {
                try {
                    brokenMonitor.wait(100);
                } catch (InterruptedException e) {
                    log.warn("Interrupted", e);
                }
            }
        }
    }

    /** Blocks until the queue becomes broken (unless shutdown is requested). */
    private void waitUntilBroken() {
        if (broken) {
            return;
        }
        synchronized (brokenMonitor) {
            while (!broken && !shutdown) {
                try {
                    brokenMonitor.wait(100);
                } catch (InterruptedException e) {
                    log.warn("Interrupted", e);
                }
            }
        }
    }

    /** Callback that performs the actual (blocking) persistence of a segment. */
    public interface SegmentConsumer {

        void consume(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException;

    }

    /** Carries the segment whose write failed, so callers can retry it. */
    public static class SegmentConsumeException extends Exception {

        private final SegmentWriteAction segment;

        public SegmentConsumeException(SegmentWriteAction segment, IOException cause) {
            super(cause);
            this.segment = segment;
        }
    }
}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.UUID;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static org.junit.Assert.assertEquals;
+
+public class AzureArchiveManagerTest {
+
+    @ClassRule
+    public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+    private CloudBlobContainer container;
+
+    @Before
+    public void setup() throws StorageException, InvalidKeyException, 
URISyntaxException {
+        container = azurite.getContainer("oak-test");
+    }
+
+    @Test
+    public void testRecovery() throws StorageException, URISyntaxException, 
IOException {
+        SegmentArchiveManager manager = new 
AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(false,
 new IOMonitorAdapter(), new FileStoreMonitorAdapter());
+        SegmentArchiveWriter writer = manager.create("data00000a.tar");
+
+        List<UUID> uuids = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            UUID u = UUID.randomUUID();
+            writer.writeSegment(u.getMostSignificantBits(), 
u.getLeastSignificantBits(), new byte[10], 0, 10, 0, 0, false);
+            uuids.add(u);
+        }
+
+        writer.flush();
+        writer.close();
+
+        container.getBlockBlobReference("oak/data00000a.tar/0005." + 
uuids.get(5).toString()).delete();
+
+        LinkedHashMap<UUID, byte[]> recovered = new LinkedHashMap<>();
+        manager.recoverEntries("data00000a.tar", recovered);
+        assertEquals(uuids.subList(0, 5), newArrayList(recovered.keySet()));
+    }
+
+}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,45 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.file.GcJournalTest;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

import java.net.URISyntaxException;
import java.security.InvalidKeyException;

/**
 * Runs the generic {@link GcJournalTest} suite against the Azure-backed
 * persistence, using an Azurite Docker container as the storage emulator.
 */
public class AzureGCJournalTest extends GcJournalTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
    }

    /** Supplies the Azure persistence under test to the inherited suite. */
    @Override
    protected SegmentNodeStorePersistence getPersistence() throws Exception {
        return new AzurePersistence(container.getDirectoryReference("oak"));
    }

    // NOTE(review): the two Oak 1.6 compatibility tests below are disabled for
    // the Azure backend; the reason is not stated here — presumably the legacy
    // on-disk fixtures don't apply to blob storage. Confirm and document.
    @Test
    @Ignore
    @Override
    public void testReadOak16GCLog() throws Exception {
        super.testReadOak16GCLog();
    }

    @Test
    @Ignore
    @Override
    public void testUpdateOak16GCLog() throws Exception {
        super.testUpdateOak16GCLog();
    }
}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class AzureJournalFileTest {
+
+    @ClassRule
+    public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+    private CloudBlobContainer container;
+
+    private AzureJournalFile journal;
+
+    @Before
+    public void setup() throws StorageException, InvalidKeyException, 
URISyntaxException {
+        container = azurite.getContainer("oak-test");
+        journal = new 
AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", 10);
+    }
+
+    @Test
+    public void testSplitJournalFiles() throws IOException {
+        assertFalse(journal.exists());
+
+        JournalFileWriter writer = journal.openJournalWriter();
+        for (int i = 0; i < 100; i++) {
+            writer.writeLine("line " + i);
+        }
+
+        assertTrue(journal.exists());
+
+        writer = journal.openJournalWriter();
+        for (int i = 100; i < 200; i++) {
+            writer.writeLine("line " + i);
+        }
+
+        JournalFileReader reader = journal.openJournalReader();
+        for (int i = 199; i >= 0; i--) {
+            assertEquals("line " + i, reader.readLine());
+        }
+
+        
+    }
+
+}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,44 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Properties;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

/**
 * Integration test for the Azure-backed {@link ManifestFile}, running against
 * an Azurite Docker container.
 */
public class AzureManifestFileTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
    }

    /**
     * Saves a properties set to a fresh manifest and verifies a round-trip
     * load returns an equal set.
     */
    @Test
    public void testManifest() throws URISyntaxException, IOException {
        ManifestFile manifestFile = new AzurePersistence(container.getDirectoryReference("oak")).getManifestFile();
        assertFalse(manifestFile.exists());

        Properties props = new Properties();
        props.setProperty("xyz", "abc");
        props.setProperty("version", "123");
        manifestFile.save(props);

        Properties loaded = manifestFile.load();
        assertEquals(props, loaded);
    }

}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFileTest;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
/**
 * Runs the generic {@link TarFileTest} suite against the Azure-backed archive
 * manager, using an Azurite Docker container as the storage emulator.
 */
public class AzureTarFileTest extends TarFileTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    @Override
    public void setUp() throws IOException {
        try {
            container = azurite.getContainer("oak-test");
            archiveManager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(true, new IOMonitorAdapter(), new FileStoreMonitorAdapter());
        } catch (StorageException | InvalidKeyException | URISyntaxException e) {
            // The superclass signature only allows IOException; wrap the
            // Azure/setup failures to preserve their cause.
            throw new IOException(e);
        }
    }

    /**
     * Expected on-storage size for the write-and-read test.
     * NOTE(review): 45 differs from the tar-file default — presumably the
     * Azure archive layout has different per-entry overhead. Confirm.
     */
    @Override
    protected long getWriteAndReadExpectedSize() {
        return 45;
    }

    // NOTE(review): disabled for the Azure backend; the reason is not stated
    // here — confirm whether graph trimming on sweep applies to this layout.
    @Test
    @Ignore
    @Override
    public void graphShouldBeTrimmedDownOnSweep() throws Exception {
        super.graphShouldBeTrimmedDownOnSweep();
    }
}
\ No newline at end of file

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
+import org.apache.jackrabbit.oak.segment.file.tar.TarFilesTest;
+import org.junit.Before;
+import org.junit.ClassRule;
+
/**
 * Runs the generic {@link TarFilesTest} suite with {@link TarFiles} built on
 * the Azure persistence, using an Azurite Docker container.
 */
public class AzureTarFilesTest extends TarFilesTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    @Override
    public void setUp() throws Exception {
        container = azurite.getContainer("oak-test");
        // Build TarFiles against the Azure persistence. withDirectory() is
        // still required by the builder even though storage is remote —
        // presumably only used for local bookkeeping; confirm.
        tarFiles = TarFiles.builder()
                .withDirectory(folder.newFolder())
                .withTarRecovery((id, data, recovery) -> {
                    // Intentionally left blank
                })
                .withIOMonitor(new IOMonitorAdapter())
                .withFileStoreMonitor(new FileStoreMonitorAdapter())
                .withMaxFileSize(MAX_FILE_SIZE)
                .withPersistence(new AzurePersistence(container.getDirectoryReference("oak")))
                .build();
    }
}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.file.tar.TarWriterTest;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
/**
 * Runs the generic {@link TarWriterTest} suite against the Azure-backed
 * archive manager, using an Azurite Docker container.
 */
public class AzureTarWriterTest extends TarWriterTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    @Before
    @Override
    public void setUp() throws IOException {
        try {
            // The superclass asserts against this monitor, so it must be
            // assigned before the archive manager is created with it.
            monitor = new TestFileStoreMonitor();
            container = azurite.getContainer("oak-test");
            archiveManager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(true, new IOMonitorAdapter(), monitor);
        } catch (StorageException | InvalidKeyException | URISyntaxException e) {
            // Wrap to match the superclass signature while keeping the cause.
            throw new IOException(e);
        }
    }
}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure;
+
+import com.arakelian.docker.junit.DockerRule;
+import com.arakelian.docker.junit.model.ImmutableDockerConfig;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.spotify.docker.client.DefaultDockerClient;
+import org.junit.Assume;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+public class AzuriteDockerRule implements TestRule {
+
+    private final DockerRule wrappedRule;
+
+    public AzuriteDockerRule() {
+        wrappedRule = new DockerRule(ImmutableDockerConfig.builder()
+                .image("trekawek/azurite")
+                .name("oak-test-azurite")
+                .ports("10000")
+                .addStartedListener(container -> {
+                    container.waitForPort("10000/tcp");
+                    container.waitForLog("Azure Blob Storage Emulator 
listening on port 10000");
+                })
+                .addContainerConfigurer(builder -> 
builder.env("executable=blob"))
+                .alwaysRemoveContainer(true)
+                .build());
+
+    }
+
+    public CloudBlobContainer getContainer(String name) throws 
URISyntaxException, StorageException, InvalidKeyException {
+        int mappedPort = 
wrappedRule.getContainer().getPortBinding("10000/tcp").getPort();
+        CloudStorageAccount cloud = 
CloudStorageAccount.parse("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:";
 + mappedPort + "/devstoreaccount1;");
+        CloudBlobContainer container = 
cloud.createCloudBlobClient().getContainerReference(name);
+        container.deleteIfExists();
+        container.create();
+        return container;
+    }
+
+    @Override
+    public Statement apply(Statement statement, Description description) {
+        try {
+            DefaultDockerClient client = 
DefaultDockerClient.fromEnv().connectTimeoutMillis(5000L).readTimeoutMillis(20000L).build();
+            client.ping();
+            client.close();
+        } catch (Exception e) {
+            Assume.assumeNoException(e);
+        }
+
+        return wrappedRule.apply(statement, description);
+    }
+}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.azure.fixture;
+
+import com.google.common.io.Files;
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
+import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders;
+import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
+import org.apache.jackrabbit.oak.segment.file.FileStore;
+import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder;
+import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class SegmentAzureFixture extends NodeStoreFixture {
+
+    private static final String AZURE_CONNECTION_STRING = 
System.getProperty("oak.segment.azure.connection", 
"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;";);
+
+    private static final String AZURE_CONTAINER = 
System.getProperty("oak.segment.azure.container", "oak");
+
+    private static final String AZURE_ROOT_PATH = 
System.getProperty("oak.segment.azure.rootPath", "/oak");
+
+    private Map<NodeStore, FileStore> fileStoreMap = new HashMap<>();
+
+    private Map<NodeStore, CloudBlobContainer> containerMap = new HashMap<>();
+
+    @Override
+    public NodeStore createNodeStore() {
+        AzurePersistence persistence;
+        CloudBlobContainer container;
+        try {
+            CloudStorageAccount cloud = 
CloudStorageAccount.parse(AZURE_CONNECTION_STRING);
+
+            int i = 1;
+            while (true) {
+                String containerName;
+                if (i == 1) {
+                    containerName = AZURE_CONTAINER;
+                } else {
+                    containerName = AZURE_CONTAINER + "_" + i;
+                }
+                container = 
cloud.createCloudBlobClient().getContainerReference(containerName);
+                if (!container.exists()) {
+                    container.create();
+                    break;
+                }
+                i++;
+            }
+            CloudBlobDirectory directory = 
container.getDirectoryReference(AZURE_ROOT_PATH);
+            persistence = new AzurePersistence(directory);
+        } catch (StorageException | URISyntaxException | InvalidKeyException 
e) {
+            throw new RuntimeException(e);
+        }
+
+        try {
+            FileStore fileStore = 
FileStoreBuilder.fileStoreBuilder(Files.createTempDir()).withCustomPersistence(persistence).build();
+            NodeStore nodeStore = 
SegmentNodeStoreBuilders.builder(fileStore).build();
+            fileStoreMap.put(nodeStore, fileStore);
+            containerMap.put(nodeStore, container);
+            return nodeStore;
+        } catch (IOException | InvalidFileStoreVersionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void dispose(NodeStore nodeStore) {
+        FileStore fs = fileStoreMap.remove(nodeStore);
+        if (fs != null) {
+            fs.close();
+        }
+        try {
+            CloudBlobContainer container = containerMap.remove(nodeStore);
+            if (container != null) {
+                container.deleteIfExists();
+            }
+        } catch (StorageException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "SegmentAzure";
+    }
+}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.journal;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.file.JournalReader;
+import org.apache.jackrabbit.oak.segment.file.JournalReaderTest;
+import org.apache.jackrabbit.oak.segment.azure.AzureJournalFile;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+public class AzureJournalReaderTest extends JournalReaderTest {
+
+    @ClassRule
+    public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+    private CloudBlobContainer container;
+
+    @Before
+    public void setup() throws StorageException, InvalidKeyException, 
URISyntaxException {
+        container = azurite.getContainer("oak-test");
+    }
+
+    protected JournalReader createJournalReader(String s) throws IOException {
+        try {
+            CloudAppendBlob blob = 
container.getAppendBlobReference("journal/journal.log.001");
+            blob.createOrReplace();
+            blob.appendText(s);
+            return new JournalReader(new 
AzureJournalFile(container.getDirectoryReference("journal"), "journal.log"));
+        } catch (StorageException | URISyntaxException e) {
+            throw new IOException(e);
+        }
+    }
+}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.journal;
+
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import 
org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.file.TarRevisionsTest;
+import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+public class AzureTarRevisionsTest extends TarRevisionsTest {
+
+    @ClassRule
+    public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+    private CloudBlobContainer container;
+
+    @Before
+    public void setup() throws Exception {
+        container = azurite.getContainer("oak-test");
+        super.setup();
+    }
+
+    @Override
+    protected SegmentNodeStorePersistence getPersistence() throws IOException {
+        try {
+            return new 
AzurePersistence(container.getDirectoryReference("oak"));
+        } catch (URISyntaxException e) {
+            throw new IOException(e);
+        }
+    }
+}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.journal;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudAppendBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule;
+import org.apache.jackrabbit.oak.segment.azure.ReverseFileReader;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+public class ReverseFileReaderTest {
+
+    @ClassRule
+    public static AzuriteDockerRule azurite = new AzuriteDockerRule();
+
+    private CloudBlobContainer container;
+
+    @Before
+    public void setup() throws StorageException, InvalidKeyException, 
URISyntaxException {
+        container = azurite.getContainer("oak-test");
+        getBlob().createOrReplace();
+    }
+
+    private CloudAppendBlob getBlob() throws URISyntaxException, 
StorageException {
+        return container.getAppendBlobReference("test-blob");
+    }
+
+    @Test
+    public void testReverseReader() throws IOException, URISyntaxException, 
StorageException {
+        List<String> entries = createFile( 1024, 80);
+        ReverseFileReader reader = new ReverseFileReader(getBlob(), 256);
+        assertEquals(entries, reader);
+    }
+
+    @Test
+    public void testEmptyFile() throws IOException, URISyntaxException, 
StorageException {
+        List<String> entries = createFile( 0, 80);
+        ReverseFileReader reader = new ReverseFileReader(getBlob(), 256);
+        assertEquals(entries, reader);
+    }
+
+    @Test
+    public void test1ByteBlock() throws IOException, URISyntaxException, 
StorageException {
+        List<String> entries = createFile( 10, 16);
+        ReverseFileReader reader = new ReverseFileReader(getBlob(), 1);
+        assertEquals(entries, reader);
+    }
+
+
+    private List<String> createFile(int lines, int maxLineLength) throws 
IOException, URISyntaxException, StorageException {
+        Random random = new Random();
+        List<String> entries = new ArrayList<>();
+        CloudAppendBlob blob = getBlob();
+        for (int i = 0; i < lines; i++) {
+            int entrySize = random.nextInt(maxLineLength) + 1;
+            String entry = randomString(entrySize);
+            try {
+                blob.appendText(entry + '\n');
+            } catch (StorageException e) {
+                throw new IOException(e);
+            }
+            entries.add(entry);
+        }
+
+        entries.add("");
+        Collections.reverse(entries);
+        return entries;
+    }
+
+    private static void assertEquals(List<String> entries, ReverseFileReader 
reader) throws IOException {
+        int i = entries.size();
+        for (String e : entries) {
+            Assert.assertEquals("line " + (--i), e, reader.readLine());
+        }
+        Assert.assertNull(reader.readLine());
+    }
+
+    private static String randomString(int entrySize) {
+        Random r = new Random();
+
+        StringBuilder result = new StringBuilder();
+        for (int i = 0; i < entrySize; i++) {
+            result.append((char) ('a' + r.nextInt('z' - 'a')));
+        }
+
+        return result.toString();
+    }
+}

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java?rev=1827292&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java
 Tue Mar 20 10:54:09 2018
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.queue;
+
+import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class SegmentWriteQueueTest {
+
+    private static final byte[] EMPTY_DATA = new byte[0];
+
+    private SegmentWriteQueue queue;
+
+    @After
+    public void shutdown() throws IOException {
+        if (queue != null) {
+            queue.close();
+        }
+    }
+
+    @Test
+    public void testQueue() throws IOException, InterruptedException {
+        Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+        Semaphore semaphore = new Semaphore(0);
+        queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
+            try {
+                semaphore.acquire();
+            } catch (InterruptedException e) {
+            }
+            added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb()));
+        });
+
+        for (int i = 0; i < 10; i++) {
+            queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
+        }
+
+        for (int i = 0; i < 10; i++) {
+            assertNotNull("Segments should be available for read", 
queue.read(uuid(i)));
+        }
+        assertFalse("Queue shouldn't be empty", queue.isEmpty());
+
+        semaphore.release(Integer.MAX_VALUE);
+        while (!queue.isEmpty()) {
+            Thread.sleep(10);
+        }
+
+        assertEquals("There should be 10 segments consumed",10, added.size());
+        for (int i = 0; i < 10; i++) {
+            assertTrue("Missing consumed segment", added.contains(uuid(i)));
+        }
+    }
+
+    @Test(timeout = 1000)
+    public void testFlush() throws IOException, InterruptedException {
+        Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+        Semaphore semaphore = new Semaphore(0);
+        queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
+            try {
+                semaphore.acquire();
+            } catch (InterruptedException e) {
+            }
+            added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb()));
+        });
+
+        for (int i = 0; i < 3; i++) {
+            queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
+        }
+
+        AtomicBoolean flushFinished = new AtomicBoolean(false);
+        Set<UUID> addedAfterFlush = new HashSet<>();
+        new Thread(() -> {
+            try {
+                queue.flush();
+                flushFinished.set(true);
+                addedAfterFlush.addAll(added);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }).start();
+
+        Thread.sleep(100);
+        assertFalse("Flush should be blocked", flushFinished.get());
+
+        AtomicBoolean addFinished = new AtomicBoolean(false);
+        new Thread(() -> {
+            try {
+                queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
+                addFinished.set(true);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }).start();
+
+        Thread.sleep(100);
+        assertFalse("Adding segments should be blocked until the flush is 
finished", addFinished.get());
+
+        semaphore.release(Integer.MAX_VALUE);
+
+        while (!addFinished.get()) {
+            Thread.sleep(10);
+        }
+        assertTrue("Flush should be finished once the ", flushFinished.get());
+        assertTrue("Adding segments should be blocked until the flush is 
finished", addFinished.get());
+
+        for (int i = 0; i < 3; i++) {
+            assertTrue(addedAfterFlush.contains(uuid(i)));
+        }
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testClose() throws IOException, InterruptedException {
+        queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {});
+        queue.close();
+        queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
+    }
+
+    @Test
+    public void testRecoveryMode() throws IOException, InterruptedException {
+        Set<UUID> added = Collections.synchronizedSet(new HashSet<>());
+        Semaphore semaphore = new Semaphore(0);
+        AtomicBoolean doBreak = new AtomicBoolean(true);
+        List<Long> writeAttempts = Collections.synchronizedList(new 
ArrayList<>());
+        queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {
+            writeAttempts.add(System.currentTimeMillis());
+            try {
+                semaphore.acquire();
+            } catch (InterruptedException e) {
+            }
+            if (doBreak.get()) {
+                throw new IOException();
+            }
+            added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb()));
+        });
+
+        for (int i = 0; i < 10; i++) {
+            queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
+        }
+
+        semaphore.release(Integer.MAX_VALUE);
+        Thread.sleep(100);
+
+        assertTrue(queue.isBroken());
+        assertEquals(9, queue.getSize()); // the 10th segment is handled by 
the recovery thread
+
+        writeAttempts.clear();
+        while (writeAttempts.size() < 5) {
+            Thread.sleep(100);
+        }
+        long lastAttempt = writeAttempts.get(0);
+        for (int i = 1; i < 5; i++) {
+            long delay = writeAttempts.get(i) - lastAttempt;
+            assertTrue("The delay between attempts to persist segment should 
be larger than 1s. Actual: " + delay, delay >= 1000);
+            lastAttempt = writeAttempts.get(i);
+        }
+
+        AtomicBoolean addFinished = new AtomicBoolean(false);
+        new Thread(() -> {
+            try {
+                queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0);
+                addFinished.set(true);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }).start();
+
+        Thread.sleep(100);
+        assertFalse("Adding segments should be blocked until the recovery mode 
is finished", addFinished.get());
+
+        doBreak.set(false);
+        while (queue.isBroken()) {
+            Thread.sleep(10);
+        }
+        assertFalse("Queue shouldn't be broken anymore", queue.isBroken());
+
+        while (added.size() < 11) {
+            Thread.sleep(10);
+        }
+        assertEquals("All segments should be consumed",11, added.size());
+        for (int i = 0; i < 11; i++) {
+            assertTrue("All segments should be consumed", 
added.contains(uuid(i)));
+        }
+
+        int i = writeAttempts.size() - 10;
+        lastAttempt = writeAttempts.get(i);
+        for (; i < writeAttempts.size(); i++) {
+            long delay = writeAttempts.get(i) - lastAttempt;
+            assertTrue("Segments should be persisted immediately", delay < 
1000);
+            lastAttempt = writeAttempts.get(i);
+        }
+    }
+
+    private static AzureSegmentArchiveEntry tarEntry(long i) {
+        return new AzureSegmentArchiveEntry(0, i, 0, 0, 0, 0, false);
+    }
+
+    private static UUID uuid(long i) {
+        return new UUID(0, i);
+    }
+
+}

Added: jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh?rev=1827292&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh Tue Mar 20 10:54:09 
2018
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+docker run -e executable=blob --rm -t -p 10000:10000 trekawek/azurite

Propchange: jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh
------------------------------------------------------------------------------
    svn:executable = *

Modified: jackrabbit/oak/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/pom.xml?rev=1827292&r1=1827291&r2=1827292&view=diff
==============================================================================
--- jackrabbit/oak/trunk/pom.xml (original)
+++ jackrabbit/oak/trunk/pom.xml Tue Mar 20 10:54:09 2018
@@ -70,6 +70,7 @@
     <module>oak-examples</module>
     <module>oak-it</module>
     <module>oak-segment-tar</module>
+    <module>oak-segment-azure</module>
     <module>oak-benchmarks</module>
   </modules>
 


Reply via email to