Author: adulceanu
Date: Tue Jan 22 09:25:37 2019
New Revision: 1851789

URL: http://svn.apache.org/viewvc?rev=1851789&view=rev
Log:
OAK-7977 - Add multi-threaded segment transfer to oak-run segment-copy

Added:
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java
    jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java
    jackrabbit/oak/trunk/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java

Modified: 
jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java?rev=1851789&r1=1851788&r2=1851789&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/run/SegmentCopyCommand.java Tue Jan 22 09:25:37 2019
@@ -25,14 +25,12 @@ import java.io.PrintWriter;
 
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
-import joptsimple.OptionSpec;
 
 class SegmentCopyCommand implements Command {
 
     @Override
     public void execute(String... args) throws Exception {
         OptionParser parser = new OptionParser();
-        OptionSpec<?> verbose = parser.accepts("verbose", "print detailed 
output about individual segments transfered");
         OptionSet options = parser.parse(args);
 
         PrintWriter out = new PrintWriter(System.out, true);
@@ -48,7 +46,6 @@ class SegmentCopyCommand implements Comm
         int statusCode = SegmentCopy.builder()
                 .withSource(source)
                 .withDestination(destination)
-                .withVerbose(options.has(verbose))
                 .withOutWriter(out)
                 .withErrWriter(err)
                 .build()
@@ -66,4 +63,4 @@ class SegmentCopyCommand implements Comm
         parser.printHelpOn(err);
         System.exit(1);
     }
-}
+}
\ No newline at end of file

Modified: 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java?rev=1851789&r1=1851788&r2=1851789&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentCopy.java Tue Jan 22 09:25:37 2019
@@ -19,7 +19,6 @@
 package org.apache.jackrabbit.oak.segment.azure.tool;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static 
org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.fetchByteArray;
 import static 
org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.newSegmentNodeStorePersistence;
 import static 
org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printMessage;
 import static 
org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.printableStopwatch;
@@ -29,33 +28,13 @@ import static org.apache.jackrabbit.oak.
 import com.google.common.base.Stopwatch;
 
 import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType;
-import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitor;
-import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
-import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitor;
-import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
-import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
-import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFile;
-import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
-import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
-import org.apache.jackrabbit.oak.segment.spi.persistence.ManifestFile;
 import org.apache.jackrabbit.oak.segment.spi.persistence.RepositoryLock;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
-import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
 import 
org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
 import org.apache.jackrabbit.oak.segment.tool.Check;
-import org.apache.jackrabbit.oak.segment.spi.persistence.Buffer;
 
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.text.MessageFormat;
-import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
 
 /**
  * Perform a full-copy of repository data at segment level.
@@ -83,8 +62,6 @@ public class SegmentCopy {
 
         private SegmentNodeStorePersistence destPersistence;
 
-        private boolean verbose;
-
         private PrintWriter outWriter;
 
         private PrintWriter errWriter;
@@ -167,19 +144,6 @@ public class SegmentCopy {
         }
 
         /**
-         * Whether to show detailed output about current copy operation or not.
-         *
-         * @param verbose,
-         *            <code>true</code> to print detailed output, 
<code>false</code>
-         *            otherwise.
-         * @return this builder.
-         */
-        public Builder withVerbose(boolean verbose) {
-            this.verbose = verbose;
-            return this;
-        }
-
-        /**
          * Create an executable version of the {@link Check} command.
          *
          * @return an instance of {@link Runnable}.
@@ -197,8 +161,6 @@ public class SegmentCopy {
 
     private final String destination;
 
-    private final boolean verbose;
-
     private final PrintWriter outWriter;
 
     private final PrintWriter errWriter;
@@ -207,12 +169,12 @@ public class SegmentCopy {
 
     private SegmentNodeStorePersistence destPersistence;
 
+
     public SegmentCopy(Builder builder) {
         this.source = builder.source;
         this.destination = builder.destination;
         this.srcPersistence = builder.srcPersistence;
         this.destPersistence = builder.destPersistence;
-        this.verbose = builder.verbose;
         this.outWriter = builder.outWriter;
         this.errWriter = builder.errWriter;
     }
@@ -224,6 +186,9 @@ public class SegmentCopy {
         SegmentStoreType srcType = storeTypeFromPathOrUri(source);
         SegmentStoreType destType = storeTypeFromPathOrUri(destination);
 
+        String srcDescription = storeDescription(srcType, source);
+        String destDescription = storeDescription(destType, destination);
+
         try {
             if (srcPersistence == null || destPersistence == null) {
                 srcPersistence = newSegmentNodeStorePersistence(srcType, 
source);
@@ -231,48 +196,23 @@ public class SegmentCopy {
             }
 
             printMessage(outWriter, "Started segment-copy transfer!");
-            printMessage(outWriter, "Source: {0}", storeDescription(srcType, 
source));
-            printMessage(outWriter, "Destination: {0}", 
storeDescription(destType, destination));
+            printMessage(outWriter, "Source: {0}", srcDescription);
+            printMessage(outWriter, "Destination: {0}", destDescription);
 
             try {
-                srcPersistence.lockRepository();
+                srcRepositoryLock = srcPersistence.lockRepository();
             } catch (Exception e) {
                 throw new Exception(MessageFormat.format(
                         "Cannot lock source segment store {0} for starting 
copying process. Giving up!",
-                        storeDescription(srcType, source)));
+                        srcDescription));
             }
 
-            printMessage(outWriter, "Copying archives...");
-            // TODO: copy only segments not transfered
-            IOMonitor ioMonitor = new IOMonitorAdapter();
-            FileStoreMonitor fileStoreMonitor = new FileStoreMonitorAdapter();
-
-            SegmentArchiveManager srcArchiveManager = 
srcPersistence.createArchiveManager(false, false, ioMonitor,
-                    fileStoreMonitor);
-            SegmentArchiveManager destArchiveManager = 
destPersistence.createArchiveManager(false, false, ioMonitor,
-                    fileStoreMonitor);
-            copyArchives(srcArchiveManager, destArchiveManager);
-
-            printMessage(outWriter, "Copying journal...");
-            // TODO: delete destination journal file if present
-            JournalFile srcJournal = srcPersistence.getJournalFile();
-            JournalFile destJournal = destPersistence.getJournalFile();
-            copyJournal(srcJournal, destJournal);
-
-            printMessage(outWriter, "Copying gc journal...");
-            // TODO: delete destination gc journal file if present
-            GCJournalFile srcGcJournal = srcPersistence.getGCJournalFile();
-            GCJournalFile destGcJournal = destPersistence.getGCJournalFile();
-            for (String line : srcGcJournal.readLines()) {
-                destGcJournal.writeLine(line);
-            }
+            SegmentStoreMigrator migrator = new SegmentStoreMigrator.Builder()
+                    .withSourcePersistence(srcPersistence, srcDescription)
+                    .withTargetPersistence(destPersistence, destDescription)
+                    .build();
 
-            printMessage(outWriter, "Copying manifest...");
-            // TODO: delete destination manifest file if present
-            ManifestFile srcManifest = srcPersistence.getManifestFile();
-            ManifestFile destManifest = destPersistence.getManifestFile();
-            Properties properties = srcManifest.load();
-            destManifest.save(properties);
+            migrator.migrate();
         } catch (Exception e) {
             watch.stop();
             printMessage(errWriter, "A problem occured while copying archives 
from {0} to {1} ", source, destination);
@@ -283,8 +223,7 @@ public class SegmentCopy {
                 try {
                     srcRepositoryLock.unlock();
                 } catch (IOException e) {
-                    printMessage(errWriter, "A problem occured while unlocking 
source repository {0} ",
-                            storeDescription(srcType, source));
+                    printMessage(errWriter, "A problem occured while unlocking 
source repository {0} ", srcDescription);
                     e.printStackTrace(errWriter);
                 }
             }
@@ -295,76 +234,4 @@ public class SegmentCopy {
 
         return 0;
     }
-
-    private void copyArchives(SegmentArchiveManager srcArchiveManager, 
SegmentArchiveManager destArchiveManager)
-            throws IOException {
-        List<String> srcArchiveNames = srcArchiveManager.listArchives();
-        Collections.sort(srcArchiveNames);
-        int archiveCount = srcArchiveNames.size();
-        int crtCount = 0;
-
-        for (String archiveName : srcArchiveNames) {
-            crtCount++;
-            printMessage(outWriter, "{0} - {1}/{2}", archiveName, crtCount, 
archiveCount);
-            if (verbose) {
-                printMessage(outWriter, "    |");
-            }
-
-            SegmentArchiveWriter archiveWriter = 
destArchiveManager.create(archiveName);
-            SegmentArchiveReader archiveReader = 
srcArchiveManager.open(archiveName);
-            List<SegmentArchiveEntry> segmentEntries = 
archiveReader.listSegments();
-            for (SegmentArchiveEntry segmentEntry : segmentEntries) {
-                writeSegment(segmentEntry, archiveReader, archiveWriter);
-            }
-
-            Buffer binRefBuffer = archiveReader.getBinaryReferences();
-            byte[] binRefData = fetchByteArray(binRefBuffer);
-
-            archiveWriter.writeBinaryReferences(binRefData);
-
-            Buffer graphBuffer = archiveReader.getGraph();
-            byte[] graphData = fetchByteArray(graphBuffer);
-
-            archiveWriter.writeGraph(graphData);
-            archiveWriter.close();
-        }
-    }
-
-    private void writeSegment(SegmentArchiveEntry segmentEntry, 
SegmentArchiveReader archiveReader,
-            SegmentArchiveWriter archiveWriter) throws IOException {
-        long msb = segmentEntry.getMsb();
-        long lsb = segmentEntry.getLsb();
-        if (verbose) {
-            printMessage(outWriter, "    - {0}", new UUID(msb, lsb));
-        }
-
-        int size = segmentEntry.getLength();
-        int offset = 0;
-        int generation = segmentEntry.getGeneration();
-        int fullGeneration = segmentEntry.getFullGeneration();
-        boolean isCompacted = segmentEntry.isCompacted();
-
-        Buffer byteBuffer = archiveReader.readSegment(msb, lsb);
-        byte[] data = fetchByteArray(byteBuffer);
-
-        archiveWriter.writeSegment(msb, lsb, data, offset, size, generation, 
fullGeneration, isCompacted);
-        archiveWriter.flush();
-    }
-
-    private void copyJournal(JournalFile srcJournal, JournalFile destJournal) 
throws IOException {
-        try (JournalFileReader srcJournalReader = 
srcJournal.openJournalReader();
-                JournalFileWriter destJournalWriter = 
destJournal.openJournalWriter()) {
-
-            Deque<String> linesStack = new ArrayDeque<>();
-            String line = null;
-            while ((line = srcJournalReader.readLine()) != null) {
-                linesStack.push(line);
-            }
-
-            while (!linesStack.isEmpty()) {
-                line = linesStack.pop();
-                destJournalWriter.writeLine(line);
-            }
-        }
-    }
 }
\ No newline at end of file

Added: 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java?rev=1851789&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java (added)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java Tue Jan 22 09:25:37 2019
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.segment.azure.tool;
+
+import static 
org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.fetchByteArray;
+import static 
org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.storeDescription;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobDirectory;
+
+import org.apache.jackrabbit.oak.segment.azure.AzurePersistence;
+import org.apache.jackrabbit.oak.segment.azure.tool.ToolUtils.SegmentStoreType;
+import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence;
+import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.Buffer;
+import org.apache.jackrabbit.oak.segment.spi.persistence.GCJournalFile;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.JournalFileWriter;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
+import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveWriter;
+import 
org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SegmentStoreMigrator {
+
+    private static final Logger log = 
LoggerFactory.getLogger(SegmentStoreMigrator.class);
+
+    private static final int READ_THREADS = 20;
+
+    private final SegmentNodeStorePersistence source;
+
+    private final SegmentNodeStorePersistence target;
+
+    private final String sourceName;
+
+    private final String targetName;
+
+    private ExecutorService executor = 
Executors.newFixedThreadPool(READ_THREADS + 1);
+
+    private SegmentStoreMigrator(Builder builder) {
+        this.source = builder.source;
+        this.target = builder.target;
+        this.sourceName = builder.sourceName;
+        this.targetName = builder.targetName;
+    }
+
+    public void migrate() throws IOException, ExecutionException, 
InterruptedException {
+        migrateJournal();
+        migrateGCJournal();
+        migrateManifest();
+        migrateArchives();
+    }
+
+    private void migrateJournal() throws IOException {
+        log.info("{}/journal.log -> {}", sourceName, targetName);
+        if (!source.getJournalFile().exists()) {
+            log.info("No journal at {}; skipping.", sourceName);
+            return;
+        }
+        List<String> journal = new ArrayList<>();
+        try (JournalFileReader reader = 
source.getJournalFile().openJournalReader()) {
+            String line;
+            while ((line = reader.readLine()) != null) {
+                journal.add(line);
+            }
+        }
+        Collections.reverse(journal);
+        try (JournalFileWriter writer = 
target.getJournalFile().openJournalWriter()) {
+            for (String line : journal) {
+                writer.writeLine(line);
+            }
+        }
+    }
+
+    private void migrateGCJournal() throws IOException {
+        log.info("{}/gc.log -> {}", sourceName, targetName);
+        GCJournalFile targetGCJournal = target.getGCJournalFile();
+        for (String line : source.getGCJournalFile().readLines()) {
+            targetGCJournal.writeLine(line);
+        }
+    }
+
+    private void migrateManifest() throws IOException {
+        log.info("{}/manifest -> {}", sourceName, targetName);
+        if (!source.getManifestFile().exists()) {
+            log.info("No manifest at {}; skipping.", sourceName);
+            return;
+        }
+        Properties manifest = source.getManifestFile().load();
+        target.getManifestFile().save(manifest);
+    }
+
+    private void migrateArchives() throws IOException, ExecutionException, 
InterruptedException {
+        if (!source.segmentFilesExist()) {
+            log.info("No segment archives at {}; skipping.", sourceName);
+            return;
+        }
+        SegmentArchiveManager sourceManager = 
source.createArchiveManager(false, false, new IOMonitorAdapter(),
+                new FileStoreMonitorAdapter());
+        SegmentArchiveManager targetManager = 
target.createArchiveManager(false, false, new IOMonitorAdapter(),
+                new FileStoreMonitorAdapter());
+        for (String archiveName : sourceManager.listArchives()) {
+            log.info("{}/{} -> {}", sourceName, archiveName, targetName);
+            try (SegmentArchiveReader reader = 
sourceManager.forceOpen(archiveName)) {
+                SegmentArchiveWriter writer = 
targetManager.create(archiveName);
+                try {
+                    migrateSegments(reader, writer);
+                    migrateBinaryRef(reader, writer);
+                    migrateGraph(reader, writer);
+                } finally {
+                    writer.close();
+                }
+            }
+        }
+    }
+
+    private void migrateSegments(SegmentArchiveReader reader, 
SegmentArchiveWriter writer)
+            throws InterruptedException, ExecutionException {
+        BlockingDeque<Segment> readDeque = new 
LinkedBlockingDeque<>(READ_THREADS);
+        BlockingDeque<Segment> writeDeque = new 
LinkedBlockingDeque<>(READ_THREADS);
+        AtomicBoolean processingFinished = new AtomicBoolean(false);
+        AtomicBoolean exception = new AtomicBoolean(false);
+        List<Future<?>> futures = new ArrayList<>();
+        for (int i = 0; i < READ_THREADS; i++) {
+            futures.add(executor.submit(() -> {
+                try {
+                    while (!exception.get() && !(readDeque.isEmpty() && 
processingFinished.get())) {
+                        Segment segment = readDeque.poll(100, 
TimeUnit.MILLISECONDS);
+                        if (segment != null) {
+                            segment.read(reader);
+                        }
+                    }
+                    return null;
+                } catch (Exception e) {
+                    exception.set(true);
+                    throw e;
+                }
+            }));
+        }
+        futures.add(executor.submit(() -> {
+            try {
+                while (!exception.get() && !(writeDeque.isEmpty() && 
processingFinished.get())) {
+                    Segment segment = writeDeque.poll(100, 
TimeUnit.MILLISECONDS);
+                    if (segment != null) {
+                        while (segment.data == null && !exception.get()) {
+                            Thread.sleep(10);
+                        }
+                        segment.write(writer);
+                    }
+                }
+                return null;
+            } catch (Exception e) {
+                exception.set(true);
+                throw e;
+            }
+        }));
+        for (SegmentArchiveEntry entry : reader.listSegments()) {
+            Segment segment = new Segment(entry);
+            readDeque.putLast(segment);
+            writeDeque.putLast(segment);
+        }
+        processingFinished.set(true);
+        for (Future<?> future : futures) {
+            future.get();
+        }
+    }
+
+    private void migrateBinaryRef(SegmentArchiveReader reader, 
SegmentArchiveWriter writer) throws IOException {
+        Buffer binaryReferences = reader.getBinaryReferences();
+        if (binaryReferences != null) {
+            byte[] array = fetchByteArray(binaryReferences);
+            writer.writeBinaryReferences(array);
+        }
+    }
+
+    private void migrateGraph(SegmentArchiveReader reader, 
SegmentArchiveWriter writer) throws IOException {
+        if (reader.hasGraph()) {
+            Buffer graph = reader.getGraph();
+            byte[] array = fetchByteArray(graph);
+            writer.writeGraph(array);
+        }
+    }
+
+    private static class Segment {
+
+        private final SegmentArchiveEntry entry;
+
+        private volatile Buffer data;
+
+        private Segment(SegmentArchiveEntry entry) {
+            this.entry = entry;
+        }
+
+        private void read(SegmentArchiveReader reader) throws IOException {
+            data = reader.readSegment(entry.getMsb(), entry.getLsb());
+        }
+
+        private void write(SegmentArchiveWriter writer) throws IOException {
+            final byte[] array = data.array();
+            final int offset = 0;
+            writer.writeSegment(entry.getMsb(), entry.getLsb(), array, offset, 
entry.getLength(), entry.getGeneration(),
+                    entry.getFullGeneration(), entry.isCompacted());
+        }
+
+        @Override
+        public String toString() {
+            return new UUID(entry.getMsb(), entry.getLsb()).toString();
+        }
+    }
+
+    public static class Builder {
+
+        private SegmentNodeStorePersistence source;
+
+        private SegmentNodeStorePersistence target;
+
+        private String sourceName;
+
+        private String targetName;
+
+        public Builder withSource(File dir) {
+            this.source = new TarPersistence(dir);
+            this.sourceName = storeDescription(SegmentStoreType.TAR, 
dir.getPath());
+            return this;
+        }
+
+        public Builder withSource(CloudBlobDirectory dir) throws 
URISyntaxException, StorageException {
+            this.source = new AzurePersistence(dir);
+            this.sourceName = storeDescription(SegmentStoreType.AZURE, 
dir.getContainer().getName() + "/" + dir.getPrefix());
+            return this;
+        }
+
+        public Builder withSourcePersistence(SegmentNodeStorePersistence 
source, String sourceName) {
+            this.source = source;
+            this.sourceName = sourceName;
+            return this;
+        }
+
+        public Builder withTargetPersistence(SegmentNodeStorePersistence 
target, String targetName) {
+            this.target = target;
+            this.targetName = targetName;
+            return this;
+        }
+
+        public Builder withTarget(File dir) {
+            this.target = new TarPersistence(dir);
+            this.targetName = storeDescription(SegmentStoreType.TAR, 
dir.getPath());
+            return this;
+        }
+
+        public Builder withTarget(CloudBlobDirectory dir) throws 
URISyntaxException, StorageException {
+            this.target = new AzurePersistence(dir);
+            this.targetName = storeDescription(SegmentStoreType.AZURE, 
dir.getContainer().getName() + "/" + dir.getPrefix());
+            return this;
+        }
+
+        public SegmentStoreMigrator build() {
+            return new SegmentStoreMigrator(this);
+        }
+    }
+}
\ No newline at end of file

Propchange: 
jackrabbit/oak/trunk/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/tool/SegmentStoreMigrator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
jackrabbit/oak/trunk/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java?rev=1851789&r1=1851788&r2=1851789&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java (original)
+++ jackrabbit/oak/trunk/oak-segment-azure/src/test/java/oak/apache/jackrabbit/oak/segment/azure/tool/SegmentCopyTestBase.java Tue Jan 22 09:25:37 2019
@@ -121,7 +121,7 @@ public abstract class SegmentCopyTestBas
 
         SegmentCopy segmentCopy = 
SegmentCopy.builder().withSrcPersistencee(srcPersistence)
                 
.withDestPersistence(destPersistence).withSource(srcPathOrUri).withDestination(destPathOrUri)
-                
.withOutWriter(outWriter).withErrWriter(errWriter).withVerbose(true).build();
+                .withOutWriter(outWriter).withErrWriter(errWriter).build();
         return segmentCopy.run();
     }
 
@@ -229,4 +229,4 @@ public abstract class SegmentCopyTestBas
 
         return uri.toString();
     }
-}
+}
\ No newline at end of file


Reply via email to