This is an automated email from the ASF dual-hosted git repository.

thomasm pushed a commit to branch OAK-12004
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 82a2487b91a36061ee7fee0b217e22588722a1cd
Author: Thomas Mueller <[email protected]>
AuthorDate: Wed Nov 5 12:03:28 2025 +0100

    OAK-12004 Datastore: speed up datastore copy
---
 .../jackrabbit/oak/run/DataStoreCommand.java       |  32 ++++-
 .../jackrabbit/oak/run/DataStoreCopyCommand.java   |  13 +-
 .../org/apache/jackrabbit/oak/run/Downloader.java  | 153 ++++++++++++++++++---
 .../jackrabbit/oak/run/DataStoreCommandTest.java   |  14 +-
 4 files changed, 181 insertions(+), 31 deletions(-)

diff --git 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
index c9e84e29c7..8b7c69ca50 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java
@@ -644,12 +644,8 @@ public class DataStoreCommand implements Command {
 
             String id = list.get(0);
             // Split 
b47b58169f121822cd4a0a0a153ba5910e581ad2bc450b6af7e51e6214c2b173#123311 on # to 
get the id
-            List<String> idLengthSepList = Arrays.stream(id.split(HASH))
-                    .map(String::trim)
-                    .filter(s -> !s.isEmpty())
-                    .collect(Collectors.toList());
-            String blobId = idLengthSepList.get(0);
-
+            String blobId = id.split(HASH)[0];
+            long length = getBlobLengthOrZero(id);
             if (dsType == FAKE || dsType == FDS) {
                 // 0102030405... => 01/02/03/0102030405...
                 blobId = String.join(System.getProperty("file.separator"), 
blobId.substring(0, 2), blobId.substring(2, 4),
@@ -665,7 +661,7 @@ public class DataStoreCommand implements Command {
             // In case of blob ids dump, the list size would be 1 (Consisting 
of just the id)
             if (list.size() > 1) {
                 // Join back the encoded blob ref and the path on which the 
ref is present
-                return String.join(DELIM, blobId, 
EscapeUtils.unescapeLineBreaks(list.get(1)));
+                return String.join(DELIM, blobId, 
EscapeUtils.unescapeLineBreaks(list.get(1)), "" + length);
             } else {
                 // return the encoded blob id
                 return blobId;
@@ -687,6 +683,28 @@ public class DataStoreCommand implements Command {
         }
     }
 
+    /**
+     * Try to read the blob length from the blobId. It is typically after the '#'.
+     * It never throws an exception.
+     *
+     * @param blobId the blob id, which may contain a '#'
+     * @return the length, or 0 if unknown (null id, no '#', a non-numeric
+     *         or a negative value after the '#')
+     */
+    public static long getBlobLengthOrZero(String blobId) {
+        if (blobId == null) {
+            return 0;
+        }
+        int hashIndex = blobId.indexOf('#');
+        if (hashIndex < 0) {
+            return 0;
+        }
+        try {
+            // tolerate surrounding whitespace, e.g. a trailing space or '\r' in a dump line
+            long length = Long.parseLong(blobId.substring(hashIndex + 1).trim());
+            // a negative value is not a valid length: treat it as unknown
+            return Math.max(length, 0);
+        } catch (NumberFormatException e) {
+            return 0;
+        }
+    }
+
     public static void main(String[] args) {
         long timestamp = System.currentTimeMillis();
         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss");
diff --git 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
index 7f54ad96a5..606371b4b5 100644
--- 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
+++ 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java
@@ -76,13 +76,24 @@ public class DataStoreCopyCommand implements Command {
 
             long startNano = System.nanoTime();
 
-            ids.forEach(id -> {
+            ids.forEach(line -> {
+                String[] parts = line.split(",");
+                String id = parts[0];
+                long length = 0;
+                if (parts.length > 2) {
+                    try {
+                        length = Long.parseLong(parts[2]);
+                    } catch (NumberFormatException e) {
+                        // ignore: length 0
+                    }
+                }
                 Downloader.Item item = new Downloader.Item();
                 item.source = sourceRepo + "/" + id;
                 if (sasToken != null) {
                     item.source += "?" + sasToken;
                 }
                 item.destination = getDestinationFromId(id);
+                item.length = length;
                 item.checksum = id.replaceAll("-", "");
                 downloader.offer(item);
             });
diff --git 
a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java 
b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
index 70ce113d1a..bb840b2461 100644
--- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
+++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java
@@ -27,11 +27,14 @@ import java.io.Closeable;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URLConnection;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.security.DigestOutputStream;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
@@ -53,8 +56,10 @@ import java.util.stream.Collectors;
 public class Downloader implements Closeable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(Downloader.class);
+    private static final long MAX_LENGTH_SINGLE_THREADED = 16 * 1024 * 1024;
 
     private final ExecutorService executorService;
+    private final ExecutorService executorServiceForParts;
     private final int connectTimeoutMs;
     private final int readTimeoutMs;
     private final int slowLogThreshold;
@@ -80,7 +85,8 @@ public class Downloader implements Closeable {
         if (maxRetries <= 0 || maxRetries > 100) {
             throw new IllegalArgumentException("maxRetries range must be 
between 1 and 100");
         }
-        LOG.info("Initializing Downloader with max number of concurrent 
requests={}", concurrency);
+        int corePoolSize = (int) Math.ceil(concurrency * .4);
+        LOG.info("Initializing Downloader with max number of concurrent 
requests={}, core pool size {}", concurrency, corePoolSize);
         this.connectTimeoutMs = connectTimeoutMs;
         this.readTimeoutMs = readTimeoutMs;
         this.slowLogThreshold = slowLogThreshold;
@@ -101,16 +107,21 @@ public class Downloader implements Closeable {
         this.bufferSize = bufferSize;
 
         this.executorService = new ThreadPoolExecutor(
-                (int) Math.ceil(concurrency * .1), concurrency, 60L, 
TimeUnit.SECONDS,
+                corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
                 new LinkedBlockingQueue<>(),
                 
BasicThreadFactory.builder().namingPattern("downloader-%d").daemon().build()
         );
+        this.executorServiceForParts = new ThreadPoolExecutor(
+                corePoolSize, concurrency, 60L, TimeUnit.SECONDS,
+                new LinkedBlockingQueue<>(),
+                
BasicThreadFactory.builder().namingPattern("partDownloader-%d").daemon().build()
+        );
         this.responses = new ArrayList<>();
     }
 
     public void offer(Item item) {
+        // Each item is downloaded as one task on executorService; items whose length is at
+        // least MAX_LENGTH_SINGLE_THREADED are further split by DownloaderWorker into range
+        // requests that run on executorServiceForParts.
         responses.add(
-                this.executorService.submit(new RetryingCallable<>(new 
DownloaderWorker(item)))
+                this.executorService.submit(new RetryingCallable<>(new 
DownloaderWorker(executorServiceForParts, item)))
         );
     }
 
@@ -146,9 +157,11 @@ public class Downloader implements Closeable {
 
     private class DownloaderWorker implements Callable<ItemResponse> {
 
+        private final ExecutorService executorService;
         private final Item item;
 
-        public DownloaderWorker(Item item) {
+        public DownloaderWorker(ExecutorService executorService, Item item) {
+            this.executorService = executorService;
             this.item = item;
         }
 
@@ -170,29 +183,86 @@ public class Downloader implements Closeable {
             Path destinationPath = Paths.get(item.destination);
             Files.createDirectories(destinationPath.getParent());
 
+            long segmentSize = MAX_LENGTH_SINGLE_THREADED;
             long size = 0;
-            try (InputStream inputStream = sourceUrl.getInputStream();
-                 FileOutputStream outputStream = new 
FileOutputStream(destinationPath.toFile())) {
-                byte[] buffer = new byte[bufferSize];
-                int bytesRead;
-                while ((bytesRead = inputStream.read(buffer)) != -1) {
-                    if (md != null) {
-                        md.update(buffer, 0, bytesRead);
+            if (item.length >= segmentSize) {
+                size = item.length;
+                LOG.info("Downloading large file {}: {} bytes", 
destinationPath.toString(), item.length);
+                String fileName = destinationPath.getFileName().toString();
+                long numSegments = (item.length + segmentSize - 1) / 
segmentSize;
+                ArrayList<Path> segmentFiles = new ArrayList<>();
+                ArrayList<Future<Boolean>> downloadTasks = new ArrayList<>();
+                for (int i = 0; i < numSegments; i++) {
+                    long startByte = i * segmentSize;
+                    long endByte = Math.min(startByte + segmentSize - 1, 
item.length - 1);
+                    Path segmentFile = 
destinationPath.getParent().resolve(fileName + "_" + i + ".tmp");
+                    segmentFiles.add(segmentFile);
+                    downloadTasks.add(executorService.submit(
+                        new Callable<Boolean>() {
+                            @Override
+                                public Boolean call() throws Exception {
+                                    Exception lastException = null;
+                                    for (int i = 0; i < maxRetries; i++) {
+                                        try {
+                                            return 
tryDownloadRange(item.source, connectTimeoutMs, readTimeoutMs,
+                                                    segmentFile, startByte, 
endByte);
+                                        } catch (Exception e) {
+                                            LOG.warn("Range download try # {} 
failed", i, e);
+                                            lastException = e;
+                                            // retry
+                                        }
+                                    }
+                                    throw lastException;
+                                }
+                        }
+                    ));
+                }
+                // wait for threads
+                boolean allSuccess = true;
+                for (int i = 0; i < downloadTasks.size(); i++) {
+                    try {
+                        boolean success = downloadTasks.get(i).get();
+                        if (!success) {
+                            allSuccess = false;
+                            break;
+                        }
+                    } catch (Exception e) {
+                        allSuccess = false;
+                        break;
+                    }
+                }
+                // merge
+                if (allSuccess) {
+                    try (OutputStream fileOut = 
Files.newOutputStream(destinationPath)) {
+                        OutputStream out = md == null ? fileOut : new 
DigestOutputStream(fileOut, md);
+                        for (Path segmentFile : segmentFiles) {
+                            if (Files.exists(segmentFile)) {
+                                Files.copy(segmentFile, out);
+                                Files.delete(segmentFile);
+                            }
+                        }
+                        LOG.info("Download {} size {}, {} parts", 
destinationPath.toString(), size, downloadTasks.size());
+                    }
+                } else {
+                    LOG.warn("Download {} failed", destinationPath.toString());
+                }
+            } else {
+                try (InputStream inputStream = sourceUrl.getInputStream();
+                     FileOutputStream out = new 
FileOutputStream(destinationPath.toFile())) {
+                    byte[] buffer = new byte[bufferSize];
+                    int bytesRead;
+                    while ((bytesRead = inputStream.read(buffer)) != -1) {
+                        if (md != null) {
+                            md.update(buffer, 0, bytesRead);
+                        }
+                        out.write(buffer, 0, bytesRead);
+                        size += bytesRead;
                     }
-                    outputStream.write(buffer, 0, bytesRead);
-                    size += bytesRead;
                 }
             }
 
             if (md != null) {
-                byte[] checksumBytes = md.digest();
-
-                // Convert the checksum bytes to a hexadecimal string
-                StringBuilder sb = new StringBuilder();
-                for (byte b : checksumBytes) {
-                    sb.append(String.format("%02x", b));
-                }
-                String checksum = sb.toString();
+                String checksum = getMessageDigestString(md);
                 // Warning: most modern checksum algorithms used for 
cryptographic purposes are designed to be case-insensitive,
                 // to ensure that the same checksum value is produced 
regardless of the input's case. There may be some
                 // legacy algorithms that are case-sensitive. Using 
equalsIgnoreCase can be considered safe here.
@@ -219,6 +289,45 @@ public class Downloader implements Closeable {
         }
     }
 
+    /**
+     * Completes the given digest and renders the result as lowercase hexadecimal,
+     * two characters per digest byte.
+     *
+     * @param md the digest to complete
+     * @return the checksum as a lowercase hex string
+     */
+    private static String getMessageDigestString(MessageDigest md) {
+        byte[] digest = md.digest();
+        char[] hex = new char[digest.length * 2];
+        int pos = 0;
+        for (byte value : digest) {
+            int unsigned = value & 0xff;
+            hex[pos++] = Character.forDigit(unsigned >>> 4, 16);
+            hex[pos++] = Character.forDigit(unsigned & 0x0f, 16);
+        }
+        return new String(hex);
+    }
+
+    /**
+     * Downloads the inclusive byte range [startByte, endByte] of the given URL into target.
+     * <p>
+     * The server is expected to honour the Range header and answer 206 (Partial Content).
+     * A plain 200 response means the Range header was ignored and the body starts at
+     * byte 0; it is therefore only accepted when the requested range also starts at 0,
+     * otherwise the segment would silently contain the wrong bytes.
+     * Exactly endByte - startByte + 1 bytes are written; a shorter body is reported as
+     * an exception so that the caller can retry.
+     *
+     * @param sourceURL the URL to download from
+     * @param connectTimeoutMs the connect timeout in milliseconds
+     * @param readTimeoutMs the read timeout in milliseconds
+     * @param target the file the range is written to
+     * @param startByte the first byte to download (inclusive)
+     * @param endByte the last byte to download (inclusive)
+     * @return always true; failures are reported as exceptions
+     * @throws IOException if the request fails, the response code is not usable,
+     *         or the body is shorter than the requested range
+     */
+    private static boolean tryDownloadRange(String sourceURL, int connectTimeoutMs,
+            int readTimeoutMs, Path target, long startByte, long endByte) throws IOException {
+        if (startByte < 0 || endByte < startByte) {
+            throw new IllegalArgumentException("Invalid range: " + startByte + "-" + endByte);
+        }
+        long expectedBytes = endByte - startByte + 1;
+        HttpURLConnection connection = (HttpURLConnection) new URL(sourceURL).openConnection();
+        // disconnect in all cases, including a failed request or a rejected response code
+        try {
+            connection.setConnectTimeout(connectTimeoutMs);
+            connection.setReadTimeout(readTimeoutMs);
+            connection.setRequestProperty("Range", "bytes=" + startByte + "-" + endByte);
+            int responseCode = connection.getResponseCode();
+            // 200 = Range ignored, body starts at byte 0: only usable for a range starting at 0
+            boolean usable = responseCode == HttpURLConnection.HTTP_PARTIAL
+                    || (responseCode == HttpURLConnection.HTTP_OK && startByte == 0);
+            if (!usable) {
+                throw new IOException("Unexpected response code: " + responseCode);
+            }
+            try (InputStream inputStream = connection.getInputStream();
+                    OutputStream outputStream = Files.newOutputStream(target)) {
+                byte[] buffer = new byte[8192];
+                long totalBytesRead = 0;
+                while (totalBytesRead < expectedBytes) {
+                    // never read past the requested range, even if the server sends more
+                    int toRead = (int) Math.min(buffer.length, expectedBytes - totalBytesRead);
+                    int bytesRead = inputStream.read(buffer, 0, toRead);
+                    if (bytesRead == -1) {
+                        throw new IOException("Premature end of stream for range "
+                                + startByte + "-" + endByte + ": got " + totalBytesRead
+                                + " of " + expectedBytes + " bytes");
+                    }
+                    outputStream.write(buffer, 0, bytesRead);
+                    totalBytesRead += bytesRead;
+                }
+            }
+            return true;
+        } finally {
+            connection.disconnect();
+        }
+    }
+
     private class RetryingCallable<V> implements Callable<V> {
         private final Callable<V> callable;
 
@@ -285,12 +394,14 @@ public class Downloader implements Closeable {
         public String source;
         public String destination;
         public String checksum;
+        public long length;
 
         @Override
         public String toString() {
             return "Item{" +
                     "source='" + source + '\'' +
                     ", destination='" + destination + '\'' +
+                    ", length=" + length +
                     (checksum != null ? ", checksum='" + checksum + '\'' : "") 
+
                     '}';
         }
diff --git 
a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java 
b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
index d97d629e19..cdae57317f 100644
--- 
a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
+++ 
b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java
@@ -547,6 +547,16 @@ public class DataStoreCommandTest {
         testConsistency(dump, data, false, false);
     }
 
+    @Test
+    public void getBlobLengthOrZero() {
+        // a numeric suffix after '#' is the blob length
+        assertEquals(1, DataStoreCommand.getBlobLengthOrZero("cafe#1"));
+        assertEquals(10, DataStoreCommand.getBlobLengthOrZero("cafe#10"));
+        // no suffix, an empty suffix, an empty id or a null id all mean "unknown" (0)
+        for (String withoutLength : new String[] {"cafe", "#", "", null}) {
+            assertEquals(0, DataStoreCommand.getBlobLengthOrZero(withoutLength));
+        }
+    }
+
     @Test
     public void testConsistencyMarkOnly() throws Exception {
         File dump = temporaryFolder.newFolder();
@@ -729,7 +739,7 @@ public class DataStoreCommandTest {
         }
         DataStoreCommand cmd = new DataStoreCommand();
         cmd.execute(argsList.toArray(new String[0]));
-        
+
         if (!markOnly) {
             assertFileEquals(dump, "avail-", SetUtils.difference(data.added, 
data.missingDataStore));
         } else {
@@ -743,7 +753,7 @@ public class DataStoreCommandTest {
                 (storeFixture instanceof StoreFixture.MongoStoreFixture) ?
                         encodedIdsAndPath(SetUtils.difference(data.added, 
data.deleted), blobFixture.getType(), data.idToPath, false) :
                         SetUtils.difference(data.added, data.deleted));
-        
+
         if (!markOnly) {
             // Verbose would have paths as well as ids changed but normally 
only DocumentNS would have paths suffixed
             assertFileEquals(dump, "gccand-", verbose ?

Reply via email to