From Wail Alkowaileet <[email protected]>:
Wail Alkowaileet has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18387 )
Change subject: [ASTERIXDB-3438][STO] Ensure masks are written to cloud
......................................................................
[ASTERIXDB-3438][STO] Ensure masks are written to cloud
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Masks are not written to the cloud, which could cause
incomplete merged components to be considered valid.
Change-Id: I13afb363de3fe6dafabffd0d72c3cb037780f633
---
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
M
asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
6 files changed, 50 insertions(+), 18 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/87/18387/1
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
index 0d8677f..fa25f6d 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
@@ -25,6 +25,7 @@
import
org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IJsonSerializable;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
@@ -65,7 +66,7 @@
// However, the counters for the failed operations are never reset
since we expect them
// To be always 0
return new
TestLsmIoOpCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
getComponentIdGenerator(),
- getIndexCheckpointManagerProvider());
+ getIndexCheckpointManagerProvider(), getIOManager());
}
public int getTotalFlushes() {
@@ -126,8 +127,8 @@
private final TestLsmBtree lsmBtree;
public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index,
ILSMComponentIdGenerator idGenerator,
- IIndexCheckpointManagerProvider checkpointManagerProvider) {
- super(dsInfo, index, idGenerator.getId(),
checkpointManagerProvider);
+ IIndexCheckpointManagerProvider checkpointManagerProvider,
IIOManager ioManager) {
+ super(dsInfo, index, idGenerator.getId(),
checkpointManagerProvider, ioManager);
lsmBtree = (TestLsmBtree) index;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
index 08273a7..9ac0410 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
@@ -22,7 +22,7 @@
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
@@ -33,8 +33,8 @@
public class AtomicLSMIOOperationCallback extends LSMIOOperationCallback {
public AtomicLSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex
lsmIndex, ILSMComponentId componentId,
- IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
- super(dsInfo, lsmIndex, componentId, indexCheckpointManagerProvider);
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
IIOManager ioManager) {
+ super(dsInfo, lsmIndex, componentId, indexCheckpointManagerProvider,
ioManager);
}
@Override
@@ -48,7 +48,7 @@
} else if (operation.getIOOperationType() == LSMIOOperationType.LOAD) {
addComponentToCheckpoint(operation);
} else if (isMerge(operation)) {
- IoUtil.delete(getOperationMaskFilePath(operation));
+ ioManager.delete(getOperationMaskFilePath(operation));
}
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
index 3829c54..259235b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
@@ -40,6 +40,7 @@
super(idGeneratorFactory, datasetInfoProvider);
}
+ @Override
protected IIndexCheckpointManagerProvider
getIndexCheckpointManagerProvider() {
return ((INcApplicationContext)
ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
}
@@ -47,7 +48,7 @@
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws
HyracksDataException {
return new
AtomicLSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
- getComponentIdGenerator().getId(),
getIndexCheckpointManagerProvider());
+ getComponentIdGenerator().getId(),
getIndexCheckpointManagerProvider(), getIOManager());
}
@SuppressWarnings("squid:S1172") // unused parameter
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index fc895bc..4011c95 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -33,7 +33,7 @@
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -60,6 +60,7 @@
// A single LSMIOOperationCallback per LSM index used to perform actions
around Flush and Merge operations
public class LSMIOOperationCallback implements ILSMIOOperationCallback {
private static final Logger LOGGER = LogManager.getLogger();
+ private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
public static final String KEY_FLUSH_LOG_LSN = "FlushLogLsn";
public static final String KEY_NEXT_COMPONENT_ID = "NextComponentId";
public static final String KEY_FLUSHED_COMPONENT_ID = "FlushedComponentId";
@@ -69,6 +70,7 @@
public static final long INVALID_LSN = -1L;
private final ArrayBackedValueStorage buffer = new
ArrayBackedValueStorage(Long.BYTES);
private final IIndexCheckpointManagerProvider
indexCheckpointManagerProvider;
+ protected final IIOManager ioManager;
protected final DatasetInfo dsInfo;
protected final ILSMIndex lsmIndex;
private final int partition;
@@ -78,11 +80,12 @@
private final Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex,
ILSMComponentId componentId,
- IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
IIOManager ioManager) {
this.dsInfo = dsInfo;
this.lsmIndex = lsmIndex;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
this.partition =
ResourceReference.ofIndex(lsmIndex.getIndexIdentifier()).getPartitionNum();
+ this.ioManager = ioManager;
componentIds.add(componentId);
}
@@ -90,9 +93,9 @@
public void beforeOperation(ILSMIOOperation operation) throws
HyracksDataException {
if (isMerge(operation)) {
FileReference operationMaskFilePath =
getOperationMaskFilePath(operation);
- // if a merge operation is attempted after a failure, its mask
file may already exists
- if (!operationMaskFilePath.getFile().exists()) {
- IoUtil.create(operationMaskFilePath);
+ // if a merge operation is attempted after a failure, its mask
file may already exist
+ if (!ioManager.exists(operationMaskFilePath)) {
+ ioManager.overwrite(operationMaskFilePath, EMPTY_BYTE_ARRAY);
} else {
LOGGER.warn("merge operation mask file {} already exists",
operationMaskFilePath);
}
@@ -135,7 +138,7 @@
|| operation.getIOOperationType() == LSMIOOperationType.LOAD) {
addComponentToCheckpoint(operation);
} else if (isMerge(operation)) {
- IoUtil.delete(getOperationMaskFilePath(operation));
+ ioManager.delete(getOperationMaskFilePath(operation));
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
index 25cd8b2..2bb4c89 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
@@ -25,6 +25,7 @@
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IJsonSerializable;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -68,10 +69,14 @@
return ((INcApplicationContext)
ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
}
+ protected IIOManager getIOManager() {
+ return ((INcApplicationContext)
ncCtx.getApplicationContext()).getPersistenceIoManager();
+ }
+
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws
HyracksDataException {
return new
LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
- getComponentIdGenerator().getId(),
getIndexCheckpointManagerProvider());
+ getComponentIdGenerator().getId(),
getIndexCheckpointManagerProvider(), getIOManager());
}
@Override
diff --git
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index befdd1a..9963659 100644
---
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -31,6 +31,7 @@
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -82,7 +83,7 @@
DatasetInfo dsInfo = new DatasetInfo(101, null);
LSMComponentIdGenerator idGenerator = new
LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo,
mockIndex, idGenerator.getId(),
- mockIndexCheckpointManagerProvider());
+ mockIndexCheckpointManagerProvider(), mockIOManager());
//Flush first
idGenerator.refresh();
long flushLsn = 1L;
@@ -148,7 +149,7 @@
ILSMMemoryComponent mockComponent =
Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo,
mockIndex, idGenerator.getId(),
- mockIndexCheckpointManagerProvider());
+ mockIndexCheckpointManagerProvider(), mockIOManager());
ILSMComponentId initialId = idGenerator.getId();
// simulate a partition is flushed before allocated
idGenerator.refresh();
@@ -170,7 +171,7 @@
ILSMMemoryComponent mockComponent =
Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo,
mockIndex, idGenerator.getId(),
- mockIndexCheckpointManagerProvider());
+ mockIndexCheckpointManagerProvider(), mockIOManager());
String indexId = "mockIndexId";
ILSMComponentId id = idGenerator.getId();
callback.recycled(mockComponent);
@@ -230,6 +231,10 @@
return indexCheckpointManagerProvider;
}
+ private IIOManager mockIOManager() {
+ return Mockito.mock(IIOManager.class);
+ }
+
private static String getIndexPath() {
return "storage/partition_0/dataverse/dataset/0/index";
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18387
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I13afb363de3fe6dafabffd0d72c3cb037780f633
Gerrit-Change-Number: 18387
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange