This is an automated email from the ASF dual-hosted git repository.
miroslav pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22c03154b9 OAK-11991 Optimize the oak-segment recovery process
22c03154b9 is described below
commit 22c03154b94f1bc82687da9ce0c3987d739be75c
Author: Ieran Bogdan <[email protected]>
AuthorDate: Thu Nov 6 16:05:40 2025 +0200
OAK-11991 Optimize the oak-segment recovery process
---
.../oak/segment/azure/AzureArchiveManager.java | 113 ++++++++++++++++-----
.../oak/segment/azure/AzureArchiveManagerTest.java | 29 ++++++
2 files changed, 119 insertions(+), 23 deletions(-)
diff --git a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
index 7861ce9fa7..fe1426a617 100644
--- a/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
+++ b/oak-segment-azure/src/main/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManager.java
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.oak.segment.azure;
import com.azure.core.util.BinaryData;
import com.azure.core.util.polling.PollResponse;
+import com.azure.core.util.polling.SyncPoller;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobCopyInfo;
import com.azure.storage.blob.models.BlobItem;
@@ -61,6 +62,12 @@ public class AzureArchiveManager implements SegmentArchiveManager {
private static final String CLOSED_ARCHIVE_MARKER = "closed";
+ public static final String COPY_BATCH_SIZE_PROP =
"segment.azure.batch.copy.size";
+
+ private static final int DEFAULT_COPY_BATCH = 1000;
+
+ private final int COPY_BATCH = Integer.getInteger(COPY_BATCH_SIZE_PROP,
DEFAULT_COPY_BATCH);
+
protected final BlobContainerClient readBlobContainerClient;
protected final BlobContainerClient writeBlobContainerClient;
@@ -209,15 +216,7 @@ public class AzureArchiveManager implements SegmentArchiveManager {
@Override
public void copyFile(String from, String to) throws IOException {
- String targetDirectory = getDirectory(to);
- getBlobs(from)
- .forEach(blobItem -> {
- try {
- copyBlob(blobItem, targetDirectory);
- } catch (IOException e) {
- log.error("Can't copy segment {}", blobItem.getName(),
e);
- }
- });
+ batchCopyBlobs(getBlobs(from), to);
}
@Override
@@ -270,18 +269,17 @@ public class AzureArchiveManager implements SegmentArchiveManager {
}
}
- private void delete(String archiveName, Set<UUID> recoveredEntries) throws
IOException {
- getBlobs(archiveName)
- .forEach(blobItem -> {
- String name = getName(blobItem);
- if (RemoteUtilities.isSegmentName(name) &&
!recoveredEntries.contains(RemoteUtilities.getSegmentUUID(name))) {
- try {
-
writeBlobContainerClient.getBlobClient(blobItem.getName()).delete();
- } catch (BlobStorageException e) {
- log.error("Can't delete segment {}",
blobItem.getName(), e);
- }
- }
- });
+ private void delete(List<BlobItem> from, Set<UUID> recoveredEntries) {
+ from.forEach(blobItem -> {
+ String name = getName(blobItem);
+ if (RemoteUtilities.isSegmentName(name) &&
!recoveredEntries.contains(RemoteUtilities.getSegmentUUID(name))) {
+ try {
+
writeBlobContainerClient.getBlobClient(blobItem.getName()).delete();
+ } catch (BlobStorageException e) {
+ log.error("Can't delete segment {}", blobItem.getName(),
e);
+ }
+ }
+ });
}
/**
@@ -291,8 +289,9 @@ public class AzureArchiveManager implements SegmentArchiveManager {
*/
@Override
public void backup(@NotNull String archiveName, @NotNull String
backupArchiveName, @NotNull Set<UUID> recoveredEntries) throws IOException {
- copyFile(archiveName, backupArchiveName);
- delete(archiveName, recoveredEntries);
+ List<BlobItem> blobItems = getBlobs(archiveName);
+ batchCopyBlobs(blobItems, backupArchiveName);
+ delete(blobItems, recoveredEntries);
}
/**
@@ -336,6 +335,74 @@ public class AzureArchiveManager implements SegmentArchiveManager {
}
+ private void batchCopyBlobs(List<BlobItem> from, String to) {
+ String newParent = getDirectory(to);
+
+ if(from.isEmpty()) {
+ log.info("No blobs to copy to: {}", newParent);
+ return;
+ }
+
+ log.info("Start to copy {} blobs to {}", from.size(), newParent);
+
+ int batches = (int) Math.ceil(from.size() / (double) COPY_BATCH);
+ int start = 0;
+
+ for (int i = 0; i < batches; i++) {
+ int end = Math.min(start + COPY_BATCH, from.size());
+ log.info("Start batch {}/{}: {} to {}", i + 1, batches, start,
end);
+ List<BlobItem> blobItemsBatch = new
ArrayList<>(from.subList(start, end));
+ copyBlobs(blobItemsBatch, newParent);
+ start = end;
+ }
+ }
+
+ private void copyBlobs(List<BlobItem> blobs, String newParent) {
+ List<CopyBlob> copyBlobs = new ArrayList<>();
+ for (BlobItem blob : blobs) {
+ String destinationBlob = AzureUtilities.asAzurePrefix(newParent) +
AzureUtilities.getName(blob);
+ try {
+ BlockBlobClient blobClient =
readBlobContainerClient.getBlobClient(blob.getName()).getBlockBlobClient();
+
+ BlockBlobClient destinationBlobClient =
writeBlobContainerClient.getBlobClient(destinationBlob).getBlockBlobClient();
+
+ SyncPoller<BlobCopyInfo, Void> copy =
destinationBlobClient.beginCopy(blobClient.getBlobUrl(), null);
+
+ copyBlobs.add(new CopyBlob(copy, destinationBlob));
+ } catch (Exception e) {
+ log.error("Failed to start copying of blob {} to {}",
blob.getName(), destinationBlob, e);
+ }
+ }
+
+ processBeginCopy(copyBlobs);
+ }
+
+ private void processBeginCopy(List<CopyBlob> copyBlobs) {
+ for (CopyBlob copy : copyBlobs) {
+ try {
+ CopyStatusType statusType =
readBlobContainerClient.getBlobClient(copy.blobName).getBlockBlobClient().getProperties().getCopyStatus();
+ if (statusType == CopyStatusType.PENDING) {
+ statusType =
copy.poller.waitForCompletion().getValue().getCopyStatus();
+ }
+ if (statusType != CopyStatusType.SUCCESS) {
+ log.warn("Failed to copy blob {}, status {}",
copy.blobName, statusType.toString());
+ }
+ } catch (Exception e) {
+ log.error("Failed to copy blob {}, status {}", copy.blobName,
copy.poller, e);
+ }
+ }
+ }
+
+ private static class CopyBlob {
+ private final SyncPoller<BlobCopyInfo, Void> poller;
+ private final String blobName;
+
+ public CopyBlob(SyncPoller<BlobCopyInfo, Void> poller, String
blobName) {
+ this.poller = poller;
+ this.blobName = blobName;
+ }
+ }
+
private static class RecoveredEntry implements Comparable<RecoveredEntry> {
private final byte[] data;
diff --git a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
index d220123bc6..c29e49922e 100644
--- a/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
+++ b/oak-segment-azure/src/test/java/org/apache/jackrabbit/oak/segment/azure/AzureArchiveManagerTest.java
@@ -635,6 +635,35 @@ public class AzureArchiveManagerTest {
assertTrue("Archive should not be deleted",
readBlobContainerClient.listBlobs(new
ListBlobsOptions().setPrefix("oak/data00000a.tar"), null).iterator().hasNext());
}
+ @Test
+ public void testCopyFile() throws IOException {
+ final String archiveName = "data00003a.tar";
+ final String bakArchiveName = archiveName + ".bak";
+ final String rootPrefix = "oak";
+ final int numberOfBlobs = 25;
+
+ createBlobs(rootPrefix, archiveName, numberOfBlobs);
+
+ System.setProperty(AzureArchiveManager.COPY_BATCH_SIZE_PROP, "10");
+
+ SegmentArchiveManager manager =
azurePersistence.createArchiveManager(false, false, new IOMonitorAdapter(), new
FileStoreMonitorAdapter(), new RemoteStoreMonitorAdapter());
+ manager.copyFile(archiveName, bakArchiveName);
+
+ int numberOfBlobsCopied = (int) readBlobContainerClient.listBlobs(new
ListBlobsOptions().setPrefix(rootPrefix + "/" + bakArchiveName), null)
+ .stream().count();
+
+ System.clearProperty(AzureArchiveManager.COPY_BATCH_SIZE_PROP);
+
+ assertEquals("blob from backup tar archive should not be copied",
numberOfBlobs, numberOfBlobsCopied);
+ }
+
+ private void createBlobs(String rootPrefix, String archiveName, int
numberOfBlobs) {
+ for (int i = 0; i < numberOfBlobs; i++) {
+ writeBlobContainerClient.getBlobClient(rootPrefix + "/" +
archiveName + "/0004." + UUID.randomUUID().toString())
+
.getBlockBlobClient().upload(BinaryData.fromString("test-data-segment-content"));
+ }
+ }
+
private void createArchive(SegmentArchiveManager manager, String
archiveName) throws IOException {
SegmentArchiveWriter writer = manager.create(archiveName);
UUID u = UUID.randomUUID();