This is an automated email from the ASF dual-hosted git repository. thomasm pushed a commit to branch OAK-12004 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit 82a2487b91a36061ee7fee0b217e22588722a1cd Author: Thomas Mueller <[email protected]> AuthorDate: Wed Nov 5 12:03:28 2025 +0100 OAK-12004 Datastore: speedup datastore copy --- .../jackrabbit/oak/run/DataStoreCommand.java | 32 ++++- .../jackrabbit/oak/run/DataStoreCopyCommand.java | 13 +- .../org/apache/jackrabbit/oak/run/Downloader.java | 153 ++++++++++++++++++--- .../jackrabbit/oak/run/DataStoreCommandTest.java | 14 +- 4 files changed, 181 insertions(+), 31 deletions(-) diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java index c9e84e29c7..8b7c69ca50 100644 --- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java +++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCommand.java @@ -644,12 +644,8 @@ public class DataStoreCommand implements Command { String id = list.get(0); // Split b47b58169f121822cd4a0a0a153ba5910e581ad2bc450b6af7e51e6214c2b173#123311 on # to get the id - List<String> idLengthSepList = Arrays.stream(id.split(HASH)) - .map(String::trim) - .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()); - String blobId = idLengthSepList.get(0); - + String blobId = id.split(HASH)[0]; + long length = getBlobLengthOrZero(id); if (dsType == FAKE || dsType == FDS) { // 0102030405... => 01/02/03/0102030405... blobId = String.join(System.getProperty("file.separator"), blobId.substring(0, 2), blobId.substring(2, 4), @@ -665,7 +661,7 @@ public class DataStoreCommand implements Command { // In case of blob ids dump, the list size would be 1 (Consisting of just the id) if (list.size() > 1) { // Join back the encoded blob ref and the path on which the ref is present - return String.join(DELIM, blobId, EscapeUtils.unescapeLineBreaks(list.get(1))); + return String.join(DELIM, blobId, EscapeUtils.unescapeLineBreaks(list.get(1)), "" + length); } else { // return the encoded blob id return blobId; @@ -687,6 +683,28 @@ public class DataStoreCommand implements Command { } } + /** + * Try to read the blob length from the blobId. It is typically after the '#'. + * It never throws an exception. + * + * @param blobId the blob id, which may contain a '#' + * @return the length, or 0 if unknown. + */ + public static long getBlobLengthOrZero(String blobId) { + if (blobId == null) { + return 0; + } + int hashIndex = blobId.indexOf('#'); + if (hashIndex < 0) { + return 0; + } + try { + return Long.parseLong(blobId.substring(hashIndex + 1)); + } catch (NumberFormatException e) { + return 0; + } + } + public static void main(String[] args) { long timestamp = System.currentTimeMillis(); SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java index 7f54ad96a5..606371b4b5 100644 --- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java +++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/DataStoreCopyCommand.java @@ -76,13 +76,24 @@ public class DataStoreCopyCommand implements Command { long startNano = System.nanoTime(); - ids.forEach(id -> { + ids.forEach(line -> { + String[] parts = line.split(","); + String id = parts[0]; + long length = 0; + if (parts.length > 2) { + try { + length = Long.parseLong(parts[2]); + } catch (NumberFormatException e) { + // ignore: length 0 + } + } Downloader.Item item = new Downloader.Item(); item.source = sourceRepo + "/" + id; if (sasToken != null) { item.source += "?" + sasToken; } item.destination = getDestinationFromId(id); + item.length = length; item.checksum = id.replaceAll("-", ""); downloader.offer(item); }); diff --git a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java index 70ce113d1a..bb840b2461 100644 --- a/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java +++ b/oak-run/src/main/java/org/apache/jackrabbit/oak/run/Downloader.java @@ -27,11 +27,14 @@ import java.io.Closeable; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; import java.net.URL; import java.net.URLConnection; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.security.DigestOutputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; @@ -53,8 +56,10 @@ import java.util.stream.Collectors; public class Downloader implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(Downloader.class); + private static final long MAX_LENGTH_SINGLE_THREADED = 16 * 1024 * 1024; private final ExecutorService executorService; + private final ExecutorService executorServiceForParts; private final int connectTimeoutMs; private final int readTimeoutMs; private final int slowLogThreshold; @@ -80,7 +85,8 @@ public class Downloader implements Closeable { if (maxRetries <= 0 || maxRetries > 100) { throw new IllegalArgumentException("maxRetries range must be between 1 and 100"); } - LOG.info("Initializing Downloader with max number of concurrent requests={}", concurrency); + int corePoolSize = (int) Math.ceil(concurrency * .4); + LOG.info("Initializing Downloader with max number of concurrent requests={}, core pool size {}", concurrency, corePoolSize); this.connectTimeoutMs = connectTimeoutMs; this.readTimeoutMs = readTimeoutMs; this.slowLogThreshold = slowLogThreshold; @@ -101,16 +107,21 @@ public class Downloader implements Closeable { this.bufferSize = bufferSize; this.executorService = new ThreadPoolExecutor( - (int) Math.ceil(concurrency * .1), concurrency, 60L, TimeUnit.SECONDS, + corePoolSize, concurrency, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), BasicThreadFactory.builder().namingPattern("downloader-%d").daemon().build() ); + this.executorServiceForParts = new ThreadPoolExecutor( + corePoolSize, concurrency, 60L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + BasicThreadFactory.builder().namingPattern("partDownloader-%d").daemon().build() + ); this.responses = new ArrayList<>(); } public void offer(Item item) { responses.add( - this.executorService.submit(new RetryingCallable<>(new DownloaderWorker(item))) + this.executorService.submit(new RetryingCallable<>(new DownloaderWorker(executorServiceForParts, item))) ); } @@ -146,9 +157,11 @@ public class Downloader implements Closeable { private class DownloaderWorker implements Callable<ItemResponse> { + private final ExecutorService executorService; private final Item item; - public DownloaderWorker(Item item) { + public DownloaderWorker(ExecutorService executorService, Item item) { + this.executorService = executorService; this.item = item; } @@ -170,29 +183,86 @@ public class Downloader implements Closeable { Path destinationPath = Paths.get(item.destination); Files.createDirectories(destinationPath.getParent()); + long segmentSize = MAX_LENGTH_SINGLE_THREADED; long size = 0; - try (InputStream inputStream = sourceUrl.getInputStream(); - FileOutputStream outputStream = new FileOutputStream(destinationPath.toFile())) { - byte[] buffer = new byte[bufferSize]; - int bytesRead; - while ((bytesRead = inputStream.read(buffer)) != -1) { - if (md != null) { - md.update(buffer, 0, bytesRead); + if (item.length >= segmentSize) { + size = item.length; + LOG.info("Downloading large file {}: {} bytes", destinationPath.toString(), item.length); + String fileName = destinationPath.getFileName().toString(); + long numSegments = (item.length + segmentSize - 1) / segmentSize; + ArrayList<Path> segmentFiles = new ArrayList<>(); + ArrayList<Future<Boolean>> downloadTasks = new ArrayList<>(); + for (int i = 0; i < numSegments; i++) { + long startByte = i * segmentSize; + long endByte = Math.min(startByte + segmentSize - 1, item.length - 1); + Path segmentFile = destinationPath.getParent().resolve(fileName + "_" + i + ".tmp"); + segmentFiles.add(segmentFile); + downloadTasks.add(executorService.submit( + new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + Exception lastException = null; + for (int i = 0; i < maxRetries; i++) { + try { + return tryDownloadRange(item.source, connectTimeoutMs, readTimeoutMs, + segmentFile, startByte, endByte); + } catch (Exception e) { + LOG.warn("Range download try # {} failed", i, e); + lastException = e; + // retry + } + } + throw lastException; + } + } + )); + } + // wait for threads + boolean allSuccess = true; + for (int i = 0; i < downloadTasks.size(); i++) { + try { + boolean success = downloadTasks.get(i).get(); + if (!success) { + allSuccess = false; + break; + } + } catch (Exception e) { + allSuccess = false; + break; + } + } + // merge + if (allSuccess) { + try (OutputStream fileOut = Files.newOutputStream(destinationPath)) { + OutputStream out = md == null ? fileOut : new DigestOutputStream(fileOut, md); + for (Path segmentFile : segmentFiles) { + if (Files.exists(segmentFile)) { + Files.copy(segmentFile, out); + Files.delete(segmentFile); + } + } + LOG.info("Download {} size {}, {} parts", destinationPath.toString(), size, downloadTasks.size()); + } + } else { + LOG.warn("Download {} failed", destinationPath.toString()); + } + } else { + try (InputStream inputStream = sourceUrl.getInputStream(); + FileOutputStream out = new FileOutputStream(destinationPath.toFile())) { + byte[] buffer = new byte[bufferSize]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + if (md != null) { + md.update(buffer, 0, bytesRead); + } + out.write(buffer, 0, bytesRead); + size += bytesRead; } - outputStream.write(buffer, 0, bytesRead); - size += bytesRead; } } if (md != null) { - byte[] checksumBytes = md.digest(); - - // Convert the checksum bytes to a hexadecimal string - StringBuilder sb = new StringBuilder(); - for (byte b : checksumBytes) { - sb.append(String.format("%02x", b)); - } - String checksum = sb.toString(); + String checksum = getMessageDigestString(md); // Warning: most modern checksum algorithms used for cryptographic purposes are designed to be case-insensitive, // to ensure that the same checksum value is produced regardless of the input's case. There may be some // legacy algorithms that are case-sensitive. Using equalsIgnoreCase can be considered safe here. @@ -219,6 +289,45 @@ public class Downloader implements Closeable { } } + private static String getMessageDigestString(MessageDigest md) { + byte[] checksumBytes = md.digest(); + // Convert the checksum bytes to a hexadecimal string + StringBuilder sb = new StringBuilder(); + for (byte b : checksumBytes) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); + } + + private static boolean tryDownloadRange(String sourceURL, int connectTimeoutMs, + int readTimeoutMs, Path target, long startByte, long endByte) throws IOException { + HttpURLConnection connection = (HttpURLConnection) new URL(sourceURL).openConnection(); + connection.setConnectTimeout(connectTimeoutMs); + connection.setReadTimeout(readTimeoutMs); + connection.setRequestProperty("Range", "bytes=" + startByte + "-" + endByte); + int responseCode = connection.getResponseCode(); + if (responseCode != HttpURLConnection.HTTP_PARTIAL && responseCode != HttpURLConnection.HTTP_OK) { + throw new IOException("Unexpected response code: " + responseCode); + } + try (InputStream inputStream = connection.getInputStream(); + OutputStream outputStream = Files.newOutputStream(target)) { + byte[] buffer = new byte[8192]; + int bytesRead; + long totalBytesRead = 0; + long expectedBytes = endByte - startByte + 1; + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + totalBytesRead += bytesRead; + if (totalBytesRead >= expectedBytes) { + break; + } + } + return true; + } finally { + connection.disconnect(); + } + } + private class RetryingCallable<V> implements Callable<V> { private final Callable<V> callable; @@ -285,12 +394,14 @@ public class Downloader implements Closeable { public String source; public String destination; public String checksum; + public long length; @Override public String toString() { return "Item{" + "source='" + source + '\'' + ", destination='" + destination + '\'' + + ", length=" + length + (checksum != null ? ", checksum='" + checksum + '\'' : "") + '}'; } diff --git a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java index d97d629e19..cdae57317f 100644 --- a/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java +++ b/oak-run/src/test/java/org/apache/jackrabbit/oak/run/DataStoreCommandTest.java @@ -547,6 +547,16 @@ public class DataStoreCommandTest { testConsistency(dump, data, false, false); } + @Test + public void getBlobLengthOrZero() { + assertEquals(1, DataStoreCommand.getBlobLengthOrZero("cafe#1")); + assertEquals(10, DataStoreCommand.getBlobLengthOrZero("cafe#10")); + assertEquals(0, DataStoreCommand.getBlobLengthOrZero("cafe")); + assertEquals(0, DataStoreCommand.getBlobLengthOrZero("#")); + assertEquals(0, DataStoreCommand.getBlobLengthOrZero("")); + assertEquals(0, DataStoreCommand.getBlobLengthOrZero(null)); + } + @Test public void testConsistencyMarkOnly() throws Exception { File dump = temporaryFolder.newFolder(); @@ -729,7 +739,7 @@ public class DataStoreCommandTest { } DataStoreCommand cmd = new DataStoreCommand(); cmd.execute(argsList.toArray(new String[0])); - + if (!markOnly) { assertFileEquals(dump, "avail-", SetUtils.difference(data.added, data.missingDataStore)); } else { @@ -743,7 +753,7 @@ public class DataStoreCommandTest { (storeFixture instanceof StoreFixture.MongoStoreFixture) ? encodedIdsAndPath(SetUtils.difference(data.added, data.deleted), blobFixture.getType(), data.idToPath, false) : SetUtils.difference(data.added, data.deleted)); - + if (!markOnly) { // Verbose would have paths as well as ids changed but normally only DocumentNS would have paths suffixed assertFileEquals(dump, "gccand-", verbose ?
