>From Wail Alkowaileet <[email protected]>:

Wail Alkowaileet has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18387 )

Change subject: [ASTERIXDB-3438][STO] Ensure masks are written to cloud
......................................................................

[ASTERIXDB-3438][STO] Ensure masks are written to cloud

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Masks are not written to the cloud, which could lead to
incomplete merged components being considered valid.

Change-Id: I13afb363de3fe6dafabffd0d72c3cb037780f633
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18387
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
M 
asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
M 
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
8 files changed, 57 insertions(+), 19 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Wail Alkowaileet: Looks good to me, but someone else must approve
  Jenkins: Verified; Verified




diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
index 246983d..e4f5ed9 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/MetadataBootstrapTask.java
@@ -60,7 +60,7 @@
         IIOManager persistenceIoManager = appContext.getPersistenceIoManager();
         FileReference bootstrapMarker = persistenceIoManager
                 
.resolve(StoragePathUtil.getBootstrapMarkerRelativePath(appContext.getNamespacePathResolver()));
-        persistenceIoManager.overwrite(bootstrapMarker, new byte[0]);
+        persistenceIoManager.create(bootstrapMarker);
     }

     private void deleteBootstrapMarker(INcApplicationContext appContext) 
throws HyracksDataException {
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
index 0d8677f..fa25f6d 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
@@ -25,6 +25,7 @@
 import 
org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
@@ -65,7 +66,7 @@
         // However, the counters for the failed operations are never reset 
since we expect them
         // To be always 0
         return new 
TestLsmIoOpCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index, 
getComponentIdGenerator(),
-                getIndexCheckpointManagerProvider());
+                getIndexCheckpointManagerProvider(), getIOManager());
     }

     public int getTotalFlushes() {
@@ -126,8 +127,8 @@
         private final TestLsmBtree lsmBtree;

         public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index, 
ILSMComponentIdGenerator idGenerator,
-                IIndexCheckpointManagerProvider checkpointManagerProvider) {
-            super(dsInfo, index, idGenerator.getId(), 
checkpointManagerProvider);
+                IIndexCheckpointManagerProvider checkpointManagerProvider, 
IIOManager ioManager) {
+            super(dsInfo, index, idGenerator.getId(), 
checkpointManagerProvider, ioManager);
             lsmBtree = (TestLsmBtree) index;
         }

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index c0823af..4dfcd5f 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -60,6 +60,7 @@

 public abstract class AbstractCloudIOManager extends IOManager implements 
IPartitionBootstrapper, ICloudIOManager {
     private static final Logger LOGGER = LogManager.getLogger();
+    private static final byte[] EMPTY_FILE_BYTES = "empty".getBytes();
     protected final ICloudClient cloudClient;
     protected final ICloudGuardian guardian;
     protected final IWriteBufferProvider writeBufferProvider;
@@ -302,6 +303,7 @@
     public final void create(FileReference fileRef) throws 
HyracksDataException {
         // We need to delete the local file on create as the cloud storage 
didn't complete the upload
         // In other words, both cloud files and the local files are not in sync
+        overwrite(fileRef, EMPTY_FILE_BYTES);
         localIoManager.delete(fileRef);
         localIoManager.create(fileRef);
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
index 08273a7..9ac0410 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
@@ -22,7 +22,7 @@
 import org.apache.asterix.common.context.DatasetInfo;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -33,8 +33,8 @@
 public class AtomicLSMIOOperationCallback extends LSMIOOperationCallback {

     public AtomicLSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex 
lsmIndex, ILSMComponentId componentId,
-            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
-        super(dsInfo, lsmIndex, componentId, indexCheckpointManagerProvider);
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider, 
IIOManager ioManager) {
+        super(dsInfo, lsmIndex, componentId, indexCheckpointManagerProvider, 
ioManager);
     }

     @Override
@@ -48,7 +48,7 @@
         } else if (operation.getIOOperationType() == LSMIOOperationType.LOAD) {
             addComponentToCheckpoint(operation);
         } else if (isMerge(operation)) {
-            IoUtil.delete(getOperationMaskFilePath(operation));
+            ioManager.delete(getOperationMaskFilePath(operation));
         }
     }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
index 3829c54..259235b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
@@ -40,6 +40,7 @@
         super(idGeneratorFactory, datasetInfoProvider);
     }

+    @Override
     protected IIndexCheckpointManagerProvider 
getIndexCheckpointManagerProvider() {
         return ((INcApplicationContext) 
ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
     }
@@ -47,7 +48,7 @@
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws 
HyracksDataException {
         return new 
AtomicLSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
-                getComponentIdGenerator().getId(), 
getIndexCheckpointManagerProvider());
+                getComponentIdGenerator().getId(), 
getIndexCheckpointManagerProvider(), getIOManager());
     }

     @SuppressWarnings("squid:S1172") // unused parameter
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index fc895bc..87aa3bd 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -33,7 +33,7 @@
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -69,6 +69,7 @@
     public static final long INVALID_LSN = -1L;
     private final ArrayBackedValueStorage buffer = new 
ArrayBackedValueStorage(Long.BYTES);
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
+    protected final IIOManager ioManager;
     protected final DatasetInfo dsInfo;
     protected final ILSMIndex lsmIndex;
     private final int partition;
@@ -78,11 +79,12 @@
     private final Deque<ILSMComponentId> componentIds = new ArrayDeque<>();

     public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex, 
ILSMComponentId componentId,
-            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider, 
IIOManager ioManager) {
         this.dsInfo = dsInfo;
         this.lsmIndex = lsmIndex;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
         this.partition = 
ResourceReference.ofIndex(lsmIndex.getIndexIdentifier()).getPartitionNum();
+        this.ioManager = ioManager;
         componentIds.add(componentId);
     }

@@ -90,9 +92,9 @@
     public void beforeOperation(ILSMIOOperation operation) throws 
HyracksDataException {
         if (isMerge(operation)) {
             FileReference operationMaskFilePath = 
getOperationMaskFilePath(operation);
-            // if a merge operation is attempted after a failure, its mask 
file may already exists
-            if (!operationMaskFilePath.getFile().exists()) {
-                IoUtil.create(operationMaskFilePath);
+            // if a merge operation is attempted after a failure, its mask 
file may already exist
+            if (!ioManager.exists(operationMaskFilePath)) {
+                ioManager.create(operationMaskFilePath);
             } else {
                 LOGGER.warn("merge operation mask file {} already exists", 
operationMaskFilePath);
             }
@@ -135,7 +137,7 @@
                 || operation.getIOOperationType() == LSMIOOperationType.LOAD) {
             addComponentToCheckpoint(operation);
         } else if (isMerge(operation)) {
-            IoUtil.delete(getOperationMaskFilePath(operation));
+            ioManager.delete(getOperationMaskFilePath(operation));
         }
     }

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
index 25cd8b2..2bb4c89 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
@@ -25,6 +25,7 @@
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -68,10 +69,14 @@
         return ((INcApplicationContext) 
ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
     }

+    protected IIOManager getIOManager() {
+        return ((INcApplicationContext) 
ncCtx.getApplicationContext()).getPersistenceIoManager();
+    }
+
     @Override
     public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws 
HyracksDataException {
         return new 
LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
-                getComponentIdGenerator().getId(), 
getIndexCheckpointManagerProvider());
+                getComponentIdGenerator().getId(), 
getIndexCheckpointManagerProvider(), getIOManager());
     }

     @Override
diff --git 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index befdd1a..9963659 100644
--- 
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ 
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -31,6 +31,7 @@
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -82,7 +83,7 @@
         DatasetInfo dsInfo = new DatasetInfo(101, null);
         LSMComponentIdGenerator idGenerator = new 
LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, 
mockIndex, idGenerator.getId(),
-                mockIndexCheckpointManagerProvider());
+                mockIndexCheckpointManagerProvider(), mockIOManager());
         //Flush first
         idGenerator.refresh();
         long flushLsn = 1L;
@@ -148,7 +149,7 @@
         ILSMMemoryComponent mockComponent = 
Mockito.mock(AbstractLSMMemoryComponent.class);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, 
mockIndex, idGenerator.getId(),
-                mockIndexCheckpointManagerProvider());
+                mockIndexCheckpointManagerProvider(), mockIOManager());
         ILSMComponentId initialId = idGenerator.getId();
         // simulate a partition is flushed before allocated
         idGenerator.refresh();
@@ -170,7 +171,7 @@
         ILSMMemoryComponent mockComponent = 
Mockito.mock(AbstractLSMMemoryComponent.class);
         
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
         LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo, 
mockIndex, idGenerator.getId(),
-                mockIndexCheckpointManagerProvider());
+                mockIndexCheckpointManagerProvider(), mockIOManager());
         String indexId = "mockIndexId";
         ILSMComponentId id = idGenerator.getId();
         callback.recycled(mockComponent);
@@ -230,6 +231,10 @@
         return indexCheckpointManagerProvider;
     }

+    private IIOManager mockIOManager() {
+        return Mockito.mock(IIOManager.class);
+    }
+
     private static String getIndexPath() {
         return "storage/partition_0/dataverse/dataset/0/index";
     }

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18387
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I13afb363de3fe6dafabffd0d72c3cb037780f633
Gerrit-Change-Number: 18387
Gerrit-PatchSet: 3
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Murtadha Hubail <[email protected]>
Gerrit-Reviewer: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: merged

Reply via email to