Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteAction.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.jackrabbit.oak.segment.azure.queue;

import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.UUID;

/**
 * A single pending segment write: the archive index entry together with a
 * private copy of the segment bytes.
 * <p>
 * The constructor copies the requested slice of the caller's array, so the
 * caller may reuse or modify its own buffer once the constructor returns.
 */
public class SegmentWriteAction {

    private final AzureSegmentArchiveEntry indexEntry;

    /** Private copy of the segment data; always starts at index 0. */
    private final byte[] buffer;

    /** Offset of the data inside {@link #buffer}; always 0 because the slice is copied. */
    private final int offset;

    private final int length;

    /**
     * Creates the action and copies {@code length} bytes of {@code buffer},
     * starting at {@code offset}, into an internal array.
     *
     * @param indexEntry index entry describing the segment (provides the UUID halves)
     * @param buffer     source array holding the segment data
     * @param offset     position of the first segment byte in {@code buffer}
     * @param length     number of bytes to copy
     * @throws IndexOutOfBoundsException if a non-empty range lies outside {@code buffer}
     */
    public SegmentWriteAction(AzureSegmentArchiveEntry indexEntry, byte[] buffer, int offset, int length) {
        this.indexEntry = indexEntry;

        this.buffer = new byte[length];
        // The guard keeps zero-length calls as lenient as the former element-by-element
        // loop, which never touched the source array when there was nothing to copy.
        if (length > 0) {
            System.arraycopy(buffer, offset, this.buffer, 0, length);
        }
        this.offset = 0;
        this.length = length;
    }

    /**
     * @return the segment UUID built from the index entry's most and least significant bits
     */
    public UUID getUuid() {
        return new UUID(indexEntry.getMsb(), indexEntry.getLsb());
    }

    /**
     * @return a {@link ByteBuffer} wrapping the internal copy (no further copy is made)
     */
    public ByteBuffer toByteBuffer() {
        return ByteBuffer.wrap(buffer, offset, length);
    }

    /**
     * Hands the index entry and the copied data to the given consumer.
     *
     * @param consumer the consumer receiving the segment
     * @throws IOException if the consumer fails
     */
    void passTo(SegmentWriteQueue.SegmentConsumer consumer) throws IOException {
        consumer.consume(indexEntry, buffer, offset, length);
    }

    @Override
    public String toString() {
        return getUuid().toString();
    }
}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueue.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.jackrabbit.oak.segment.azure.queue;

import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Bounded queue that passes segments to a {@link SegmentConsumer} on background threads.
 * <p>
 * The constructor starts {@code threadNo} "main loop" workers plus one "emergency loop"
 * worker. When the consumer throws an {@link IOException} the queue is marked as
 * <em>broken</em>: the main loops and {@link #addToQueue} pause, and the emergency loop
 * retries (sleeping one second between attempts) until a write succeeds, which clears
 * the broken flag again.
 * <p>
 * Segments that were queued but not yet consumed remain readable through {@link #read(UUID)}.
 */
public class SegmentWriteQueue implements Closeable {

    public static final int THREADS = Integer.getInteger("oak.segment.azure.threads", 5);

    // NOTE(review): the property name looks like the result of an automated package rename;
    // it is kept unchanged because existing deployments may already set it.
    private static final int QUEUE_SIZE = Integer.getInteger("oak.segment.org.apache.jackrabbit.oak.segment.azure.queue", 20);

    private static final Logger log = LoggerFactory.getLogger(SegmentWriteQueue.class);

    private final BlockingDeque<SegmentWriteAction> queue;

    /** Segments accepted but not yet consumed; also used as the monitor that flush() waits on. */
    private final Map<UUID, SegmentWriteAction> segmentsByUUID;

    private final ExecutorService executor;

    /** Producers hold the read lock while enqueueing; flush() holds the write lock. */
    private final ReadWriteLock flushLock;

    private final SegmentConsumer writer;

    private volatile boolean shutdown;

    private final Object brokenMonitor = new Object();

    private volatile boolean broken;

    /**
     * Creates a queue with the size and thread count taken from the system properties
     * (defaults: 20 entries, 5 threads).
     *
     * @param writer consumer that persists the segments
     */
    public SegmentWriteQueue(SegmentConsumer writer) {
        this(writer, QUEUE_SIZE, THREADS);
    }

    /**
     * @param writer    consumer that persists the segments
     * @param queueSize capacity of the underlying blocking deque
     * @param threadNo  number of main-loop workers (one extra thread runs the emergency loop)
     */
    SegmentWriteQueue(SegmentConsumer writer, int queueSize, int threadNo) {
        this.writer = writer;
        segmentsByUUID = new ConcurrentHashMap<>();
        flushLock = new ReentrantReadWriteLock();

        queue = new LinkedBlockingDeque<>(queueSize);
        executor = Executors.newFixedThreadPool(threadNo + 1);
        for (int i = 0; i < threadNo; i++) {
            executor.submit(this::mainLoop);
        }
        executor.submit(this::emergencyLoop);
    }

    /** Regular worker: consumes segments while the queue is healthy and re-queues a failed one. */
    private void mainLoop() {
        while (!shutdown) {
            try {
                waitWhileBroken();
                if (shutdown) {
                    break;
                }
                consume();
            } catch (SegmentConsumeException e) {
                SegmentWriteAction segment = e.segment;
                log.error("Can't persist the segment {}", segment.getUuid(), e.getCause());
                try {
                    queue.put(segment);
                } catch (InterruptedException e1) {
                    log.error("Can't re-add the segment {} to the queue. It'll be dropped.", segment.getUuid(), e1);
                }
            }
        }
    }

    /** Polls the queue for up to 100 ms and consumes the segment, if one was available. */
    private void consume() throws SegmentConsumeException {
        SegmentWriteAction segment = null;
        try {
            segment = queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("Poll from queue interrupted", e);
        }
        if (segment != null) {
            consume(segment);
        }
    }

    /**
     * Passes one segment to the writer. A failure marks the queue as broken; a success
     * removes the segment from the lookup map, wakes up flush() and clears the broken flag.
     */
    private void consume(SegmentWriteAction segment) throws SegmentConsumeException {
        try {
            segment.passTo(writer);
        } catch (IOException e) {
            setBroken(true);
            throw new SegmentConsumeException(segment, e);
        }
        synchronized (segmentsByUUID) {
            segmentsByUUID.remove(segment.getUuid());
            segmentsByUUID.notifyAll();
        }
        setBroken(false);
    }

    /** Recovery worker: active only while the queue is broken; retries until a write succeeds. */
    private void emergencyLoop() {
        while (!shutdown) {
            waitUntilBroken();
            if (shutdown) {
                break;
            }

            boolean success = false;
            SegmentWriteAction segmentToRetry = null;
            do {
                try {
                    if (segmentToRetry == null) {
                        consume();
                    } else {
                        consume(segmentToRetry);
                    }
                    success = true;
                } catch (SegmentConsumeException e) {
                    segmentToRetry = e.segment;
                    log.error("Can't persist the segment {}", segmentToRetry.getUuid(), e.getCause());
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // Log the interruption itself (previously the outer exception was logged).
                        // The flag is not restored here: the loop sleeps again on the next
                        // failure and would then be interrupted immediately every time.
                        log.warn("Interrupted", e1);
                    }
                    if (shutdown) {
                        log.error("Shutdown initiated. The segment {} will be dropped.", segmentToRetry.getUuid());
                    }
                }
            } while (!success && !shutdown);
        }
    }

    /**
     * Copies the given data into a new {@link SegmentWriteAction} and enqueues it, waiting
     * up to one minute for free capacity. Blocks first while the queue is broken.
     *
     * @param indexEntry index entry of the segment
     * @param data       array holding the segment bytes
     * @param offset     position of the first segment byte in {@code data}
     * @param size       number of segment bytes
     * @throws IOException           if the segment could not be queued in time or the thread was interrupted
     * @throws IllegalStateException if the queue is shutting down
     */
    public void addToQueue(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException {
        waitWhileBroken();
        if (shutdown) {
            throw new IllegalStateException("Can't accept the new segment - shutdown in progress");
        }

        SegmentWriteAction action = new SegmentWriteAction(indexEntry, data, offset, size);
        UUID uuid = action.getUuid();
        flushLock.readLock().lock();
        try {
            segmentsByUUID.put(uuid, action);
            boolean queued = false;
            try {
                queued = queue.offer(action, 1, TimeUnit.MINUTES);
            } finally {
                // Also covers the interrupted case: an entry that never reached the queue
                // would otherwise stay in the map forever and make flush() wait endlessly.
                if (!queued) {
                    segmentsByUUID.remove(uuid);
                }
            }
            if (!queued) {
                throw new IOException("Can't add segment to the queue");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } finally {
            flushLock.readLock().unlock();
        }
    }

    /**
     * Blocks until every accepted segment has been consumed. New segments cannot be added
     * while the flush is in progress. An error is logged for every full minute of waiting.
     *
     * @throws IOException if the waiting thread was interrupted
     */
    public void flush() throws IOException {
        flushLock.writeLock().lock();
        try {
            synchronized (segmentsByUUID) {
                long start = System.currentTimeMillis();
                while (!segmentsByUUID.isEmpty()) {
                    segmentsByUUID.wait(100);
                    if (System.currentTimeMillis() - start > TimeUnit.MINUTES.toMillis(1)) {
                        log.error("Can't flush the queue in 1 minute. Queue: {}. Segment map: {}", queue, segmentsByUUID);
                        start = System.currentTimeMillis();
                    }
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } finally {
            flushLock.writeLock().unlock();
        }
    }

    /**
     * @param id UUID of the segment
     * @return the pending write for that segment, or {@code null} if none is registered
     */
    public SegmentWriteAction read(UUID id) {
        return segmentsByUUID.get(id);
    }

    /**
     * Signals the workers to stop and waits up to one minute for them to finish.
     *
     * @throws IOException if the workers did not terminate in time or the thread was interrupted
     */
    @Override
    public void close() throws IOException {
        shutdown = true;
        try {
            executor.shutdown();
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IOException("The write wasn't able to shut down clearly");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        }
    }

    /** @return {@code true} if no accepted segment is waiting to be consumed */
    public boolean isEmpty() {
        return segmentsByUUID.isEmpty();
    }

    boolean isBroken() {
        return broken;
    }

    int getSize() {
        return queue.size();
    }

    private void setBroken(boolean broken) {
        synchronized (brokenMonitor) {
            this.broken = broken;
            brokenMonitor.notifyAll();
        }
    }

    /** Returns once the queue is no longer broken or shutdown has started. */
    private void waitWhileBroken() {
        if (!broken) {
            return;
        }
        synchronized (brokenMonitor) {
            while (broken && !shutdown) {
                try {
                    brokenMonitor.wait(100);
                } catch (InterruptedException e) {
                    // Flag intentionally left cleared: restoring it would make the next
                    // wait() throw at once and turn this loop into a busy spin.
                    log.warn("Interrupted", e);
                }
            }
        }
    }

    /** Returns once the queue is broken or shutdown has started. */
    private void waitUntilBroken() {
        if (broken) {
            return;
        }
        synchronized (brokenMonitor) {
            while (!broken && !shutdown) {
                try {
                    brokenMonitor.wait(100);
                } catch (InterruptedException e) {
                    // See waitWhileBroken() for why the interrupt flag is not restored.
                    log.warn("Interrupted", e);
                }
            }
        }
    }

    /** Callback that persists a single segment. */
    @FunctionalInterface
    public interface SegmentConsumer {

        void consume(AzureSegmentArchiveEntry indexEntry, byte[] data, int offset, int size) throws IOException;

    }

    /** Carries the segment whose write failed, with the original {@link IOException} as cause. */
    public static class SegmentConsumeException extends Exception {

        private final SegmentWriteAction segment;

        public SegmentConsumeException(SegmentWriteAction segment, IOException cause) {
            super(cause);
            this.segment = segment;
        }
    }
}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;

import static com.google.common.collect.Lists.newArrayList;
import static org.junit.Assert.assertEquals;

/**
 * Checks entry recovery of the Azure archive manager against an Azurite container.
 */
public class AzureArchiveManagerTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    /** Obtains the "oak-test" container from the Azurite rule before each test. */
    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
    }

    /**
     * Writes ten segments, deletes the blob of the sixth one and expects recovery to
     * return exactly the first five UUIDs, in write order.
     */
    @Test
    public void testRecovery() throws StorageException, URISyntaxException, IOException {
        AzurePersistence persistence = new AzurePersistence(container.getDirectoryReference("oak"));
        SegmentArchiveManager archives = persistence.createArchiveManager(false, new IOMonitorAdapter(), new FileStoreMonitorAdapter());
        SegmentArchiveWriter archiveWriter = archives.create("data00000a.tar");

        List<UUID> written = new ArrayList<>();
        while (written.size() < 10) {
            UUID id = UUID.randomUUID();
            long msb = id.getMostSignificantBits();
            long lsb = id.getLeastSignificantBits();
            archiveWriter.writeSegment(msb, lsb, new byte[10], 0, 10, 0, 0, false);
            written.add(id);
        }

        archiveWriter.flush();
        archiveWriter.close();

        // Remove the blob at position 5 so that recovery has to stop in front of it.
        String removedBlob = "oak/data00000a.tar/0005." + written.get(5).toString();
        container.getBlockBlobReference(removedBlob).delete();

        LinkedHashMap<UUID, byte[]> recovered = new LinkedHashMap<>();
        archives.recoverEntries("data00000a.tar", recovered);

        List<UUID> expected = written.subList(0, 5);
        assertEquals(expected, newArrayList(recovered.keySet()));
    }

}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureGCJournalTest.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,45 @@
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
import org.apache.jackrabbit.oak.segment.file.GcJournalTest;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;

import java.net.URISyntaxException;
import java.security.InvalidKeyException;

/**
 * Runs the inherited {@link GcJournalTest} cases on top of {@link AzurePersistence}.
 * The two Oak 1.6 GC log tests are overridden only to mark them as ignored.
 */
public class AzureGCJournalTest extends GcJournalTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    /** Obtains the "oak-test" container from the Azurite rule before each test. */
    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
    }

    /** @return a persistence rooted at the "oak" directory of the test container */
    @Override
    protected SegmentNodeStorePersistence getPersistence() throws Exception {
        AzurePersistence persistence = new AzurePersistence(container.getDirectoryReference("oak"));
        return persistence;
    }

    /** Ignored in this subclass; the body only delegates to the parent test. */
    @Test
    @Ignore
    @Override
    public void testReadOak16GCLog() throws Exception {
        super.testReadOak16GCLog();
    }

    /** Ignored in this subclass; the body only delegates to the parent test. */
    @Test
    @Ignore
    @Override
    public void testUpdateOak16GCLog() throws Exception {
        super.testUpdateOak16GCLog();
    }
}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureJournalFileTest.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Exercises {@link AzureJournalFile} with a small third constructor argument (10) so that
 * writing 200 lines is spread over several underlying journal files.
 */
public class AzureJournalFileTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    private AzureJournalFile journal;

    /** Obtains the "oak-test" container and creates the journal under test. */
    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
        journal = new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log", 10);
    }

    /**
     * Writes lines 0-99 and 100-199 through two separately opened writers and expects
     * the reader to return all 200 lines, newest first.
     */
    @Test
    public void testSplitJournalFiles() throws IOException {
        assertFalse(journal.exists());

        writeLines(journal.openJournalWriter(), 0, 100);
        assertTrue(journal.exists());

        writeLines(journal.openJournalWriter(), 100, 200);

        JournalFileReader reader = journal.openJournalReader();
        for (int remaining = 200; remaining > 0; remaining--) {
            String expected = "line " + (remaining - 1);
            assertEquals(expected, reader.readLine());
        }
    }

    /** Writes "line N" for every N in {@code [from, to)} to the given writer. */
    private static void writeLines(JournalFileWriter target, int from, int to) throws IOException {
        int next = from;
        while (next < to) {
            target.writeLine("line " + next);
            next++;
        }
    }

}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java?rev=1827292&view=auto 
============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureManifestFileTest.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,44 @@
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Properties;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

/**
 * Round-trip test for the manifest file provided by {@link AzurePersistence}.
 */
public class AzureManifestFileTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    /** Obtains the "oak-test" container from the Azurite rule before each test. */
    @Before
    public void setup() throws StorageException, InvalidKeyException, URISyntaxException {
        container = azurite.getContainer("oak-test");
    }

    /**
     * Expects the manifest to be absent at first, then saves two properties and checks
     * that loading returns an equal {@link Properties} object.
     */
    @Test
    public void testManifest() throws URISyntaxException, IOException {
        AzurePersistence persistence = new AzurePersistence(container.getDirectoryReference("oak"));
        ManifestFile manifest = persistence.getManifestFile();
        assertFalse(manifest.exists());

        Properties saved = new Properties();
        saved.setProperty("xyz", "abc");
        saved.setProperty("version", "123");
        manifest.save(saved);

        assertEquals(saved, manifest.load());
    }

}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java?rev=1827292&view=auto 
============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFileTest.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.file.tar.TarFileTest; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; + +public class AzureTarFileTest extends TarFileTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + @Override + public void setUp() throws IOException { + try { + container = azurite.getContainer("oak-test"); + archiveManager = new AzurePersistence(container.getDirectoryReference("oak")).createArchiveManager(true, new IOMonitorAdapter(), new FileStoreMonitorAdapter()); + } catch (StorageException | InvalidKeyException | URISyntaxException e) { + throw new IOException(e); + } + } + + @Override + protected long getWriteAndReadExpectedSize() { + return 45; + } + + @Test + @Ignore + @Override + public void graphShouldBeTrimmedDownOnSweep() throws Exception { + super.graphShouldBeTrimmedDownOnSweep(); + } +} \ No newline at end of file Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java (added) +++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarFilesTest.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,49 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.TarFiles;
import org.apache.jackrabbit.oak.segment.file.tar.TarFilesTest;
import org.junit.Before;
import org.junit.ClassRule;

/**
 * Runs the inherited {@link TarFilesTest} cases with {@link TarFiles} configured to use
 * {@link AzurePersistence}.
 */
public class AzureTarFilesTest extends TarFilesTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    /**
     * Builds the {@code tarFiles} instance used by the parent tests: a fresh temporary
     * folder, a no-op recovery callback, adapter monitors, the parent's maximum file
     * size and an Azure persistence rooted at the "oak" directory.
     */
    @Before
    @Override
    public void setUp() throws Exception {
        container = azurite.getContainer("oak-test");
        tarFiles = TarFiles.builder()
                .withDirectory(folder.newFolder())
                .withTarRecovery((segmentId, segmentData, entryRecovery) -> {
                    // Intentionally left blank
                })
                .withIOMonitor(new IOMonitorAdapter())
                .withFileStoreMonitor(new FileStoreMonitorAdapter())
                .withMaxFileSize(MAX_FILE_SIZE)
                .withPersistence(new AzurePersistence(container.getDirectoryReference("oak")))
                .build();
    }
}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureTarWriterTest.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.jackrabbit.oak.segment.azure;

import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
import org.apache.jackrabbit.oak.segment.file.tar.TarWriterTest;
import org.junit.Before;
import org.junit.ClassRule;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;

/**
 * Runs the inherited {@link TarWriterTest} cases with an archive manager created by
 * {@link AzurePersistence}.
 */
public class AzureTarWriterTest extends TarWriterTest {

    @ClassRule
    public static AzuriteDockerRule azurite = new AzuriteDockerRule();

    private CloudBlobContainer container;

    /**
     * Installs a fresh {@code TestFileStoreMonitor} and an Azure-backed archive manager
     * that reports to it. Storage, key and URI failures are rethrown wrapped in an
     * {@link IOException}.
     */
    @Before
    @Override
    public void setUp() throws IOException {
        try {
            monitor = new TestFileStoreMonitor();
            container = azurite.getContainer("oak-test");
            AzurePersistence persistence = new AzurePersistence(container.getDirectoryReference("oak"));
            archiveManager = persistence.createArchiveManager(true, new IOMonitorAdapter(), monitor);
        } catch (StorageException | InvalidKeyException | URISyntaxException cause) {
            throw new IOException(cause);
        }
    }
}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzuriteDockerRule.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.jackrabbit.oak.segment.azure;

import com.arakelian.docker.junit.DockerRule;
import com.arakelian.docker.junit.model.ImmutableDockerConfig;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.spotify.docker.client.DefaultDockerClient;
import org.junit.Assume;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.net.URISyntaxException;
import java.security.InvalidKeyException;

/**
 * JUnit rule that runs the "trekawek/azurite" Docker image and hands out blob containers
 * from it. If no Docker daemon answers a ping, the tests using this rule are skipped
 * through a JUnit assumption instead of failing.
 */
public class AzuriteDockerRule implements TestRule {

    private final DockerRule wrappedRule;

    /**
     * Configures (but does not start) the wrapped Docker rule: container port 10000 is
     * published, startup waits for the port and for the emulator's log line, and the
     * container is always removed afterwards.
     */
    public AzuriteDockerRule() {
        wrappedRule = new DockerRule(ImmutableDockerConfig.builder()
                .image("trekawek/azurite")
                .name("oak-test-azurite")
                .ports("10000")
                .addStartedListener(container -> {
                    container.waitForPort("10000/tcp");
                    container.waitForLog("Azure Blob Storage Emulator listening on port 10000");
                })
                .addContainerConfigurer(builder -> builder.env("executable=blob"))
                .alwaysRemoveContainer(true)
                .build());

    }

    /**
     * Returns a freshly created blob container with the given name; an existing container
     * of that name is deleted first.
     *
     * @param name name of the blob container
     * @return the newly created, empty container
     */
    public CloudBlobContainer getContainer(String name) throws URISyntaxException, StorageException, InvalidKeyException {
        int mappedPort = wrappedRule.getContainer().getPortBinding("10000/tcp").getPort();
        CloudStorageAccount cloud = CloudStorageAccount.parse("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:" + mappedPort + "/devstoreaccount1;");
        CloudBlobContainer container = cloud.createCloudBlobClient().getContainerReference(name);
        container.deleteIfExists();
        container.create();
        return container;
    }

    /**
     * Pings the Docker daemon before delegating to the wrapped rule. Any failure is turned
     * into a failed assumption, so the tests are reported as skipped.
     */
    @Override
    public Statement apply(Statement statement, Description description) {
        try {
            DefaultDockerClient client = DefaultDockerClient.fromEnv().connectTimeoutMillis(5000L).readTimeoutMillis(20000L).build();
            try {
                client.ping();
            } finally {
                // Close the probe client even when the ping fails; previously it was
                // left open in that case.
                client.close();
            }
        } catch (Exception e) {
            Assume.assumeNoException(e);
        }

        return wrappedRule.apply(statement, description);
    }
}
Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/fixture/SegmentAzureFixture.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment.azure.fixture; + +import com.google.common.io.Files; +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlobDirectory; +import org.apache.jackrabbit.oak.fixture.NodeStoreFixture; +import org.apache.jackrabbit.oak.segment.SegmentNodeStoreBuilders; +import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; +import org.apache.jackrabbit.oak.segment.file.FileStore; +import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; +import org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; +import org.apache.jackrabbit.oak.spi.state.NodeStore; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.HashMap; +import java.util.Map; + +public class SegmentAzureFixture extends NodeStoreFixture { + + private static final String AZURE_CONNECTION_STRING = System.getProperty("oak.segment.azure.connection", "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"); + + private static final String AZURE_CONTAINER = System.getProperty("oak.segment.azure.container", "oak"); + + private static final String AZURE_ROOT_PATH = System.getProperty("oak.segment.azure.rootPath", "/oak"); + + private 
Map<NodeStore, FileStore> fileStoreMap = new HashMap<>(); + + private Map<NodeStore, CloudBlobContainer> containerMap = new HashMap<>(); + + @Override + public NodeStore createNodeStore() { + AzurePersistence persistence; + CloudBlobContainer container; + try { + CloudStorageAccount cloud = CloudStorageAccount.parse(AZURE_CONNECTION_STRING); + + int i = 1; + while (true) { + String containerName; + if (i == 1) { + containerName = AZURE_CONTAINER; + } else { + containerName = AZURE_CONTAINER + "_" + i; + } + container = cloud.createCloudBlobClient().getContainerReference(containerName); + if (!container.exists()) { + container.create(); + break; + } + i++; + } + CloudBlobDirectory directory = container.getDirectoryReference(AZURE_ROOT_PATH); + persistence = new AzurePersistence(directory); + } catch (StorageException | URISyntaxException | InvalidKeyException e) { + throw new RuntimeException(e); + } + + try { + FileStore fileStore = FileStoreBuilder.fileStoreBuilder(Files.createTempDir()).withCustomPersistence(persistence).build(); + NodeStore nodeStore = SegmentNodeStoreBuilders.builder(fileStore).build(); + fileStoreMap.put(nodeStore, fileStore); + containerMap.put(nodeStore, container); + return nodeStore; + } catch (IOException | InvalidFileStoreVersionException e) { + throw new RuntimeException(e); + } + } + + public void dispose(NodeStore nodeStore) { + FileStore fs = fileStoreMap.remove(nodeStore); + if (fs != null) { + fs.close(); + } + try { + CloudBlobContainer container = containerMap.remove(nodeStore); + if (container != null) { + container.deleteIfExists(); + } + } catch (StorageException e) { + throw new RuntimeException(e); + } + } + + @Override + public String toString() { + return "SegmentAzure"; + } +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureJournalReaderTest.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure.journal; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudAppendBlob; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule; +import org.apache.jackrabbit.oak.segment.file.JournalReader; +import org.apache.jackrabbit.oak.segment.file.JournalReaderTest; +import org.apache.jackrabbit.oak.segment.azure.AzureJournalFile; +import org.junit.Before; +import org.junit.ClassRule; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; + +public class AzureJournalReaderTest extends JournalReaderTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + public void setup() throws StorageException, InvalidKeyException, URISyntaxException { + container = azurite.getContainer("oak-test"); + } + + protected JournalReader createJournalReader(String s) throws IOException { + try { + CloudAppendBlob blob = container.getAppendBlobReference("journal/journal.log.001"); + blob.createOrReplace(); + blob.appendText(s); + return new JournalReader(new AzureJournalFile(container.getDirectoryReference("journal"), "journal.log")); + } catch (StorageException | URISyntaxException e) { + throw new IOException(e); + } + } +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java (added) +++ 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/AzureTarRevisionsTest.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure.journal; + +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule; +import org.apache.jackrabbit.oak.segment.file.TarRevisionsTest; +import org.apache.jackrabbit.oak.segment.azure.AzurePersistence; +import org.junit.Before; +import org.junit.ClassRule; + +import java.io.IOException; +import java.net.URISyntaxException; + +public class AzureTarRevisionsTest extends TarRevisionsTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + public void setup() throws Exception { + container = azurite.getContainer("oak-test"); + super.setup(); + } + + @Override + protected SegmentNodeStorePersistence getPersistence() throws IOException { + try { + return new AzurePersistence(container.getDirectoryReference("oak")); + } 
catch (URISyntaxException e) { + throw new IOException(e); + } + } +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/journal/ReverseFileReaderTest.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.jackrabbit.oak.segment.azure.journal; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudAppendBlob; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import org.apache.jackrabbit.oak.segment.azure.AzuriteDockerRule; +import org.apache.jackrabbit.oak.segment.azure.ReverseFileReader; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +public class ReverseFileReaderTest { + + @ClassRule + public static AzuriteDockerRule azurite = new AzuriteDockerRule(); + + private CloudBlobContainer container; + + @Before + public void setup() throws StorageException, InvalidKeyException, URISyntaxException { + container = azurite.getContainer("oak-test"); + getBlob().createOrReplace(); + } + + private CloudAppendBlob getBlob() throws URISyntaxException, StorageException { + return container.getAppendBlobReference("test-blob"); + } + + @Test + public void testReverseReader() throws IOException, URISyntaxException, StorageException { + List<String> entries = createFile( 1024, 80); + ReverseFileReader reader = new ReverseFileReader(getBlob(), 256); + assertEquals(entries, reader); + } + + @Test + public void testEmptyFile() throws IOException, URISyntaxException, StorageException { + List<String> entries = createFile( 0, 80); + ReverseFileReader reader = new ReverseFileReader(getBlob(), 256); + assertEquals(entries, reader); + } + + @Test + public void test1ByteBlock() throws IOException, URISyntaxException, StorageException { + List<String> entries = createFile( 10, 16); + ReverseFileReader reader = new ReverseFileReader(getBlob(), 1); + assertEquals(entries, reader); + } + + + private List<String> createFile(int lines, int 
maxLineLength) throws IOException, URISyntaxException, StorageException { + Random random = new Random(); + List<String> entries = new ArrayList<>(); + CloudAppendBlob blob = getBlob(); + for (int i = 0; i < lines; i++) { + int entrySize = random.nextInt(maxLineLength) + 1; + String entry = randomString(entrySize); + try { + blob.appendText(entry + '\n'); + } catch (StorageException e) { + throw new IOException(e); + } + entries.add(entry); + } + + entries.add(""); + Collections.reverse(entries); + return entries; + } + + private static void assertEquals(List<String> entries, ReverseFileReader reader) throws IOException { + int i = entries.size(); + for (String e : entries) { + Assert.assertEquals("line " + (--i), e, reader.readLine()); + } + Assert.assertNull(reader.readLine()); + } + + private static String randomString(int entrySize) { + Random r = new Random(); + + StringBuilder result = new StringBuilder(); + for (int i = 0; i < entrySize; i++) { + result.append((char) ('a' + r.nextInt('z' - 'a'))); + } + + return result.toString(); + } +} Added: jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java (added) +++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/queue/SegmentWriteQueueTest.java Tue Mar 20 10:54:09 2018 @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.jackrabbit.oak.segment.azure.queue; + +import org.apache.jackrabbit.oak.segment.azure.AzureSegmentArchiveEntry; +import org.junit.After; +import org.junit.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class SegmentWriteQueueTest { + + private static final byte[] EMPTY_DATA = new byte[0]; + + private SegmentWriteQueue queue; + + @After + public void shutdown() throws IOException { + if (queue != null) { + queue.close(); + } + } + + @Test + public void testQueue() throws IOException, InterruptedException { + Set<UUID> added = Collections.synchronizedSet(new HashSet<>()); + Semaphore semaphore = new Semaphore(0); + queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + } + added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb())); + }); + + for (int i = 0; i < 10; i++) { + queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0); + } 
+ + for (int i = 0; i < 10; i++) { + assertNotNull("Segments should be available for read", queue.read(uuid(i))); + } + assertFalse("Queue shouldn't be empty", queue.isEmpty()); + + semaphore.release(Integer.MAX_VALUE); + while (!queue.isEmpty()) { + Thread.sleep(10); + } + + assertEquals("There should be 10 segments consumed",10, added.size()); + for (int i = 0; i < 10; i++) { + assertTrue("Missing consumed segment", added.contains(uuid(i))); + } + } + + @Test(timeout = 1000) + public void testFlush() throws IOException, InterruptedException { + Set<UUID> added = Collections.synchronizedSet(new HashSet<>()); + Semaphore semaphore = new Semaphore(0); + queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> { + try { + semaphore.acquire(); + } catch (InterruptedException e) { + } + added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb())); + }); + + for (int i = 0; i < 3; i++) { + queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0); + } + + AtomicBoolean flushFinished = new AtomicBoolean(false); + Set<UUID> addedAfterFlush = new HashSet<>(); + new Thread(() -> { + try { + queue.flush(); + flushFinished.set(true); + addedAfterFlush.addAll(added); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).start(); + + Thread.sleep(100); + assertFalse("Flush should be blocked", flushFinished.get()); + + AtomicBoolean addFinished = new AtomicBoolean(false); + new Thread(() -> { + try { + queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0); + addFinished.set(true); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).start(); + + Thread.sleep(100); + assertFalse("Adding segments should be blocked until the flush is finished", addFinished.get()); + + semaphore.release(Integer.MAX_VALUE); + + while (!addFinished.get()) { + Thread.sleep(10); + } + assertTrue("Flush should be finished once the ", flushFinished.get()); + assertTrue("Adding segments should be blocked until the flush is finished", addFinished.get()); + + for (int i = 0; i < 
3; i++) { + assertTrue(addedAfterFlush.contains(uuid(i))); + } + } + + @Test(expected = IllegalStateException.class) + public void testClose() throws IOException, InterruptedException { + queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> {}); + queue.close(); + queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0); + } + + @Test + public void testRecoveryMode() throws IOException, InterruptedException { + Set<UUID> added = Collections.synchronizedSet(new HashSet<>()); + Semaphore semaphore = new Semaphore(0); + AtomicBoolean doBreak = new AtomicBoolean(true); + List<Long> writeAttempts = Collections.synchronizedList(new ArrayList<>()); + queue = new SegmentWriteQueue((tarEntry, data, offset, size) -> { + writeAttempts.add(System.currentTimeMillis()); + try { + semaphore.acquire(); + } catch (InterruptedException e) { + } + if (doBreak.get()) { + throw new IOException(); + } + added.add(new UUID(tarEntry.getMsb(), tarEntry.getLsb())); + }); + + for (int i = 0; i < 10; i++) { + queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0); + } + + semaphore.release(Integer.MAX_VALUE); + Thread.sleep(100); + + assertTrue(queue.isBroken()); + assertEquals(9, queue.getSize()); // the 10th segment is handled by the recovery thread + + writeAttempts.clear(); + while (writeAttempts.size() < 5) { + Thread.sleep(100); + } + long lastAttempt = writeAttempts.get(0); + for (int i = 1; i < 5; i++) { + long delay = writeAttempts.get(i) - lastAttempt; + assertTrue("The delay between attempts to persist segment should be larger than 1s. 
Actual: " + delay, delay >= 1000); + lastAttempt = writeAttempts.get(i); + } + + AtomicBoolean addFinished = new AtomicBoolean(false); + new Thread(() -> { + try { + queue.addToQueue(tarEntry(10), EMPTY_DATA, 0, 0); + addFinished.set(true); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }).start(); + + Thread.sleep(100); + assertFalse("Adding segments should be blocked until the recovery mode is finished", addFinished.get()); + + doBreak.set(false); + while (queue.isBroken()) { + Thread.sleep(10); + } + assertFalse("Queue shouldn't be broken anymore", queue.isBroken()); + + while (added.size() < 11) { + Thread.sleep(10); + } + assertEquals("All segments should be consumed",11, added.size()); + for (int i = 0; i < 11; i++) { + assertTrue("All segments should be consumed", added.contains(uuid(i))); + } + + int i = writeAttempts.size() - 10; + lastAttempt = writeAttempts.get(i); + for (; i < writeAttempts.size(); i++) { + long delay = writeAttempts.get(i) - lastAttempt; + assertTrue("Segments should be persisted immediately", delay < 1000); + lastAttempt = writeAttempts.get(i); + } + } + + private static AzureSegmentArchiveEntry tarEntry(long i) { + return new AzureSegmentArchiveEntry(0, i, 0, 0, 0, 0, false); + } + + private static UUID uuid(long i) { + return new UUID(0, i); + } + +} Added: jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh?rev=1827292&view=auto ============================================================================== --- jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh (added) +++ jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh Tue Mar 20 10:54:09 2018 @@ -0,0 +1,3 @@ +#!/bin/bash + +docker run -e executable=blob --rm -t -p 10000:10000 trekawek/azurite Propchange: jackrabbit/oak/trunk/oak-segment-azure/start-azurite.sh ------------------------------------------------------------------------------ svn:executable 
= * Modified: jackrabbit/oak/trunk/pom.xml URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/pom.xml?rev=1827292&r1=1827291&r2=1827292&view=diff ============================================================================== --- jackrabbit/oak/trunk/pom.xml (original) +++ jackrabbit/oak/trunk/pom.xml Tue Mar 20 10:54:09 2018 @@ -70,6 +70,7 @@ <module>oak-examples</module> <module>oak-it</module> <module>oak-segment-tar</module> + <module>oak-segment-azure</module> <module>oak-benchmarks</module> </modules>
