This is an automated email from the ASF dual-hosted git repository.

miroslav pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22c03154b9 OAK-11991 Optimize the oak-segment recovery process
22c03154b9 is described below

commit 22c03154b94f1bc82687da9ce0c3987d739be75c
Author: Ieran Bogdan <[email protected]>
AuthorDate: Thu Nov 6 16:05:40 2025 +0200

    OAK-11991 Optimize the oak-segment recovery process
---
 .../oak/segment/azure/AzureArchiveManager.java     | 113 ++++++++++++++++-----
 .../oak/segment/azure/AzureArchiveManagerTest.java |  29 ++++++
 2 files changed, 119 insertions(+), 23 deletions(-)

diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
index 7861ce9fa7..fe1426a617 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.segment.azure;
 
 import com.azure.core.util.BinaryData;
 import com.azure.core.util.polling.PollResponse;
+import com.azure.core.util.polling.SyncPoller;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.models.BlobCopyInfo;
 import com.azure.storage.blob.models.BlobItem;
@@ -61,6 +62,12 @@ public class AzureArchiveManager implements SegmentArchiveManager {
 
     private static final String CLOSED_ARCHIVE_MARKER = "closed";
 
+    public static final String COPY_BATCH_SIZE_PROP = "segment.azure.batch.copy.size";
+
+    private static final int DEFAULT_COPY_BATCH = 1000;
+
+    private final int COPY_BATCH = Integer.getInteger(COPY_BATCH_SIZE_PROP, DEFAULT_COPY_BATCH);
+
     protected final BlobContainerClient readBlobContainerClient;
 
     protected final BlobContainerClient writeBlobContainerClient;
@@ -209,15 +216,7 @@ public class AzureArchiveManager implements SegmentArchiveManager {
 
     @Override
     public void copyFile(String from, String to) throws IOException {
-        String targetDirectory = getDirectory(to);
-        getBlobs(from)
-                .forEach(blobItem -> {
-                    try {
-                        copyBlob(blobItem, targetDirectory);
-                    } catch (IOException e) {
-                        log.error("Can't copy segment {}", blobItem.getName(), e);
-                    }
-                });
+        batchCopyBlobs(getBlobs(from), to);
     }
 
     @Override
@@ -270,18 +269,17 @@ public class AzureArchiveManager implements SegmentArchiveManager {
         }
     }
 
-    private void delete(String archiveName, Set<UUID> recoveredEntries) throws IOException {
-        getBlobs(archiveName)
-                .forEach(blobItem -> {
-                    String name = getName(blobItem);
-                    if (RemoteUtilities.isSegmentName(name) && !recoveredEntries.contains(RemoteUtilities.getSegmentUUID(name))) {
-                        try {
-                            writeBlobContainerClient.getBlobClient(blobItem.getName()).delete();
-                        } catch (BlobStorageException e) {
-                            log.error("Can't delete segment {}", blobItem.getName(), e);
-                        }
-                    }
-                });
+    private void delete(List<BlobItem> from, Set<UUID> recoveredEntries) {
+        from.forEach(blobItem -> {
+            String name = getName(blobItem);
+            if (RemoteUtilities.isSegmentName(name) && !recoveredEntries.contains(RemoteUtilities.getSegmentUUID(name))) {
+                try {
+                    writeBlobContainerClient.getBlobClient(blobItem.getName()).delete();
+                } catch (BlobStorageException e) {
+                    log.error("Can't delete segment {}", blobItem.getName(), e);
+                }
+            }
+        });
     }
 
     /**
@@ -291,8 +289,9 @@ public class AzureArchiveManager implements SegmentArchiveManager {
      */
     @Override
     public void backup(@NotNull String archiveName, @NotNull String backupArchiveName, @NotNull Set<UUID> recoveredEntries) throws IOException {
-        copyFile(archiveName, backupArchiveName);
-        delete(archiveName, recoveredEntries);
+        List<BlobItem> blobItems = getBlobs(archiveName);
+        batchCopyBlobs(blobItems, backupArchiveName);
+        delete(blobItems, recoveredEntries);
     }
 
     /**
@@ -336,6 +335,74 @@ public class AzureArchiveManager implements SegmentArchiveManager {
 
     }
 
+    private void batchCopyBlobs(List<BlobItem> from, String to) {
+        String newParent = getDirectory(to);
+
+        if(from.isEmpty()) {
+            log.info("No blobs to copy to: {}", newParent);
+            return;
+        }
+
+        log.info("Start to copy {} blobs to {}", from.size(), newParent);
+
+        int batches = (int) Math.ceil(from.size() / (double) COPY_BATCH);
+        int start = 0;
+
+        for (int i = 0; i < batches; i++) {
+            int end = Math.min(start + COPY_BATCH, from.size());
+            log.info("Start batch {}/{}: {} to {}", i + 1, batches, start, end);
+            List<BlobItem> blobItemsBatch = new ArrayList<>(from.subList(start, end));
+            copyBlobs(blobItemsBatch, newParent);
+            start = end;
+        }
+    }
+
+    private void copyBlobs(List<BlobItem> blobs, String newParent) {
+        List<CopyBlob> copyBlobs = new ArrayList<>();
+        for (BlobItem blob : blobs) {
+            String destinationBlob = AzureUtilities.asAzurePrefix(newParent) + AzureUtilities.getName(blob);
+            try {
+                BlockBlobClient blobClient = readBlobContainerClient.getBlobClient(blob.getName()).getBlockBlobClient();
+
+                BlockBlobClient destinationBlobClient = writeBlobContainerClient.getBlobClient(destinationBlob).getBlockBlobClient();
+
+                SyncPoller<BlobCopyInfo, Void> copy = destinationBlobClient.beginCopy(blobClient.getBlobUrl(), null);
+
+                copyBlobs.add(new CopyBlob(copy, destinationBlob));
+            } catch (Exception e) {
+                log.error("Failed to start copying of blob {} to {}", blob.getName(), destinationBlob, e);
+            }
+        }
+
+        processBeginCopy(copyBlobs);
+    }
+
+    private void processBeginCopy(List<CopyBlob> copyBlobs) {
+        for (CopyBlob copy : copyBlobs) {
+            try {
+                CopyStatusType statusType = readBlobContainerClient.getBlobClient(copy.blobName).getBlockBlobClient().getProperties().getCopyStatus();
+                if (statusType == CopyStatusType.PENDING) {
+                    statusType = copy.poller.waitForCompletion().getValue().getCopyStatus();
+                }
+                if (statusType != CopyStatusType.SUCCESS) {
+                    log.warn("Failed to copy blob {}, status {}", copy.blobName, statusType.toString());
+                }
+            } catch (Exception e) {
+                log.error("Failed to copy blob {}, status {}", copy.blobName, copy.poller, e);
+            }
+        }
+    }
+
+    private static class CopyBlob {
+        private final SyncPoller<BlobCopyInfo, Void> poller;
+        private final String blobName;
+
+        public CopyBlob(SyncPoller<BlobCopyInfo, Void> poller, String blobName) {
+            this.poller = poller;
+            this.blobName = blobName;
+        }
+    }
+
     private static class RecoveredEntry implements Comparable<RecoveredEntry> {
 
         private final byte[] data;
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
index d220123bc6..c29e49922e 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
@@ -635,6 +635,35 @@ public class AzureArchiveManagerTest {
         assertTrue("Archive should not be deleted", readBlobContainerClient.listBlobs(new ListBlobsOptions().setPrefix("oak/data00000a.tar"), null).iterator().hasNext());
     }
 
+    @Test
+    public void testCopyFile() throws IOException {
+        final String archiveName = "data00003a.tar";
+        final String bakArchiveName = archiveName + ".bak";
+        final String rootPrefix = "oak";
+        final int numberOfBlobs = 25;
+
+        createBlobs(rootPrefix, archiveName, numberOfBlobs);
+
+        System.setProperty(AzureArchiveManager.COPY_BATCH_SIZE_PROP, "10");
+
+        SegmentArchiveManager manager = azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+        manager.copyFile(archiveName, bakArchiveName);
+
+        int numberOfBlobsCopied = (int) readBlobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(rootPrefix + "/" + bakArchiveName), null)
+                .stream().count();
+
+        System.clearProperty(AzureArchiveManager.COPY_BATCH_SIZE_PROP);
+
+        assertEquals("blob from backup tar archive should not be copied", numberOfBlobs, numberOfBlobsCopied);
+    }
+
+    private void createBlobs(String rootPrefix, String archiveName, int numberOfBlobs) {
+        for (int i = 0; i < numberOfBlobs; i++) {
+            writeBlobContainerClient.getBlobClient(rootPrefix + "/" + archiveName + "/0004." + UUID.randomUUID().toString())
+                    .getBlockBlobClient().upload(BinaryData.fromString("test-data-segment-content"));
+        }
+    }
+
     private void createArchive(SegmentArchiveManager manager, String archiveName) throws IOException {
         SegmentArchiveWriter writer = manager.create(archiveName);
         UUID u = UUID.randomUUID();

Reply via email to