Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2948
Change subject: [NO ISSUE][STO] Ensure First Component ID is Initialized
......................................................................
[NO ISSUE][STO] Ensure First Component ID is Initialized
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Initialize the component id generator from the primary
index checkpoint, if exits, as soon as it is created.
- Ensure the first component id is passed to all indexes.
Change-Id: I246f9373f950e2f9a2c63f86746462e42a3f1c62
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
M
asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
14 files changed, 90 insertions(+), 95 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/48/2948/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index adf9960..946815f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -300,7 +300,6 @@
TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1,
false);
ILogRecord logRecord = null;
- ILSMComponentIdGenerator idGenerator = null;
try {
logReader.setPosition(lowWaterMarkLSN);
logRecord = logReader.next();
@@ -389,8 +388,7 @@
int partition = logRecord.getResourcePartition();
if (partitions.contains(partition)) {
int datasetId = logRecord.getDatasetId();
- idGenerator =
datasetLifecycleManager.getComponentIdGenerator(datasetId, partition);
- if (idGenerator == null) {
+ if
(!datasetLifecycleManager.isRegistered(datasetId)) {
// it's possible this dataset has been dropped
logRecord = logReader.next();
continue;
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 017c59f..703f6d3 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -56,6 +56,7 @@
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -75,10 +76,12 @@
private static ITransactionContext txnCtx;
private static LSMInsertDeleteOperatorNodePushable insertOp;
private static final int PARTITION = 0;
+ private static String indexPath;
@BeforeClass
public static void setUp() throws Exception {
System.out.println("SetUp: ");
+ TestUtils.redirectLoggingToConsole();
TestHelper.deleteExistingInstanceFiles();
String configPath = System.getProperty("user.dir") + File.separator +
"src" + File.separator + "test"
+ File.separator + "resources" + File.separator + "cc.conf";
@@ -109,6 +112,7 @@
txnCtx =
nc.getTransactionManager().beginTransaction(nc.getTxnJobId(ctx),
new
TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL));
insertOp = StorageTestUtils.getInsertPipeline(nc, ctx);
+ indexPath = indexDataflowHelper.getResource().getPath();
}
@After
@@ -148,9 +152,10 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
StorageTestUtils.searchAndAssertCount(nc, PARTITION,
StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor =
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).refresh();
+
+
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).refresh();
ILSMComponentId next =
-
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).getId();
+
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).getId();
long flushLsn =
nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -163,8 +168,8 @@
// rollback the last disk component
lsmAccessor =
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn =
LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).refresh();
- next =
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).getId();
+
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).refresh();
+ next =
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).getId();
flushLsn =
nc.getTransactionSubsystem().getLogManager().getAppendLSN();
flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -216,9 +221,9 @@
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
StorageTestUtils.searchAndAssertCount(nc, PARTITION,
StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor =
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).refresh();
+
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).refresh();
ILSMComponentId next =
-
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).getId();
+
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).getId();
long flushLsn =
nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -248,8 +253,8 @@
// rollback the last disk component
lsmAccessor =
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn =
LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).refresh();
- next =
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).getId();
+
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).refresh();
+ next =
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).getId();
flushLsn =
nc.getTransactionSubsystem().getLogManager().getAppendLSN();
flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -302,9 +307,9 @@
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback
ILSMIndexAccessor lsmAccessor =
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).refresh();
+
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).refresh();
ILSMComponentId next =
-
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).getId();
+
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).getId();
long flushLsn =
nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -329,8 +334,8 @@
secondSearcher.waitUntilEntered();
lsmAccessor =
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn =
LSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
-
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).refresh();
- next =
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).getId();
+
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).refresh();
+ next =
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).getId();
flushLsn =
nc.getTransactionSubsystem().getLogManager().getAppendLSN();
flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
@@ -738,9 +743,10 @@
public void run() {
ILSMIndexAccessor lsmAccessor =
lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
try {
-
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).refresh();
- ILSMComponentId next =
-
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID,
PARTITION).getId();
+
dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath)
+ .refresh();
+ ILSMComponentId next = dsLifecycleMgr
+
.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION,
indexPath).getId();
long flushLsn =
nc.getTransactionSubsystem().getLogManager().getAppendLSN();
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN,
flushLsn);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
index f4c0ea6..c727f52 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmIoOpCallbackFactory.java
@@ -127,7 +127,7 @@
public TestLsmIoOpCallback(DatasetInfo dsInfo, ILSMIndex index,
ILSMComponentIdGenerator idGenerator,
IIndexCheckpointManagerProvider checkpointManagerProvider) {
- super(dsInfo, index, idGenerator, checkpointManagerProvider);
+ super(dsInfo, index, idGenerator.getId(),
checkpointManagerProvider);
lsmBtree = (TestLsmBtree) index;
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 22ee542..c4390fa 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -55,12 +55,13 @@
DatasetLifecycleManager dslcManager = (DatasetLifecycleManager)
appCtx.getDatasetLifecycleManager();
DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId);
int partition =
StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
- PrimaryIndexOperationTracker opTracker =
dslcManager.getOperationTracker(datasetId, partition);
+ PrimaryIndexOperationTracker opTracker =
+ dslcManager.getOperationTracker(datasetId, partition,
resource.getPath());
if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) {
Field opTrackersField =
DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
opTracker = new TestPrimaryIndexOperationTracker(datasetId,
partition,
appCtx.getTransactionSubsystem().getLogManager(),
dsr.getDatasetInfo(),
- dslcManager.getComponentIdGenerator(datasetId,
partition));
+ dslcManager.getComponentIdGenerator(datasetId,
partition, resource.getPath()));
replaceMapEntry(opTrackersField, dsr, partition, opTracker);
}
return opTracker;
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 4441c6e..d18b6ab 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -40,6 +40,15 @@
IIndex getIndex(int datasetId, long indexId) throws HyracksDataException;
/**
+ * Indicates if the dataset with id {@code datasetId} is currently
registered
+ * with this {@link IDatasetLifecycleManager}
+ *
+ * @param datasetId
+ * @return true if the dataset is currently registered. Otherwise false.
+ */
+ boolean isRegistered(int datasetId);
+
+ /**
* Flushes all open datasets synchronously.
*
* @throws HyracksDataException
@@ -76,18 +85,20 @@
*
* @param datasetId
* @param partition
+ * @param path
* @return
*/
- PrimaryIndexOperationTracker getOperationTracker(int datasetId, int
partition);
+ PrimaryIndexOperationTracker getOperationTracker(int datasetId, int
partition, String path);
/**
* creates (if necessary) and returns the component Id generator of a
dataset.
*
* @param datasetId
* @param partition
+ * @param path
* @return
*/
- ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int
partition);
+ ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int
partition, String path);
/**
* creates (if necessary) and returns the dataset virtual buffer caches.
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
index 6476e68..811da78 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
@@ -53,7 +53,7 @@
IDatasetLifecycleManager dslcManager =
((INcApplicationContext)
serviceCtx.getApplicationContext()).getDatasetLifecycleManager();
int partition =
StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
- return dslcManager.getComponentIdGenerator(datasetId, partition);
+ return dslcManager.getComponentIdGenerator(datasetId, partition,
resource.getPath());
}
@Override
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 682eaea..c2e9978 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -38,7 +38,9 @@
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.LogType;
@@ -113,9 +115,6 @@
datasetResource = getDatasetLifecycle(did);
}
datasetResource.register(resource, (ILSMIndex) index);
- if (((ILSMIndex) index).isPrimaryIndex()) {
- initializeDatasetPartitionValidComponentId(datasetResource,
resource);
- }
}
private int getDIDfromResourcePath(String resourcePath) throws
HyracksDataException {
@@ -326,32 +325,36 @@
}
@Override
- public synchronized PrimaryIndexOperationTracker getOperationTracker(int
datasetId, int partition) {
+ public synchronized PrimaryIndexOperationTracker getOperationTracker(int
datasetId, int partition, String path) {
DatasetResource dataset = datasets.get(datasetId);
PrimaryIndexOperationTracker opTracker =
dataset.getOpTracker(partition);
if (opTracker == null) {
- populateOpTrackerAndIdGenerator(dataset, partition);
+ populateOpTrackerAndIdGenerator(dataset, partition, path);
opTracker = dataset.getOpTracker(partition);
}
return opTracker;
}
@Override
- public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int
datasetId, int partition) {
+ public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int
datasetId, int partition, String path) {
DatasetResource dataset = datasets.get(datasetId);
- if (dataset == null) {
- return null;
- }
ILSMComponentIdGenerator generator =
dataset.getComponentIdGenerator(partition);
if (generator == null) {
- populateOpTrackerAndIdGenerator(dataset, partition);
+ populateOpTrackerAndIdGenerator(dataset, partition, path);
generator = dataset.getComponentIdGenerator(partition);
}
return generator;
}
- private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int
partition) {
- ILSMComponentIdGenerator idGenerator = new
LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum());
+ @Override
+ public synchronized boolean isRegistered(int datasetId) {
+ return datasets.containsKey(datasetId);
+ }
+
+ private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int
partition, String path) {
+ final long lastValidId = getDatasetLastValidComponentId(path);
+ ILSMComponentIdGenerator idGenerator =
+ new
LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum(),
lastValidId);
PrimaryIndexOperationTracker opTracker = new
PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
logManager, dataset.getDatasetInfo(), idGenerator);
dataset.setPrimaryIndexOperationTracker(partition, opTracker);
@@ -605,21 +608,15 @@
}
}
- private void initializeDatasetPartitionValidComponentId(DatasetResource
datasetResource,
- LocalResource primaryIndexResource) {
- final IndexInfo indexInfo =
datasetResource.getIndexInfo(primaryIndexResource.getId());
- final int partition = indexInfo.getPartition();
- final ILSMComponentIdGenerator componentIdGenerator =
- getComponentIdGenerator(datasetResource.getDatasetID(),
partition);
- final long indexLastValidComponentId =
getIndexLastValidComponentId(indexInfo.getLocalResource());
- componentIdGenerator.init(indexLastValidComponentId);
- }
-
- private long getIndexLastValidComponentId(LocalResource resource) {
+ private long getDatasetLastValidComponentId(String indexPath) {
try {
- final DatasetResourceReference datasetResource =
DatasetResourceReference.of(resource);
- return
Math.max(indexCheckpointManagerProvider.get(datasetResource).getLatest().getLastComponentId(),
- MIN_VALID_COMPONENT_ID);
+ final ResourceReference indexRef =
ResourceReference.ofIndex(indexPath);
+ final ResourceReference primaryIndexRef =
indexRef.getDatasetReference();
+ final IIndexCheckpointManager iIndexCheckpointManager =
indexCheckpointManagerProvider.get(primaryIndexRef);
+ if (iIndexCheckpointManager.getCheckpointCount() > 0) {
+ return
Math.max(iIndexCheckpointManager.getLatest().getLastComponentId(),
MIN_VALID_COMPONENT_ID);
+ }
+ return MIN_VALID_COMPONENT_ID;
} catch (HyracksDataException e) {
throw new IllegalStateException(e);
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 606d63c..53bc31b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -37,7 +37,6 @@
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
@@ -69,19 +68,18 @@
private final IIndexCheckpointManagerProvider
indexCheckpointManagerProvider;
protected final DatasetInfo dsInfo;
protected final ILSMIndex lsmIndex;
- private final ILSMComponentIdGenerator componentIdGenerator;
private long firstLsnForCurrentMemoryComponent = 0L;
private long persistenceLsn = 0L;
private int pendingFlushes = 0;
private Deque<ILSMComponentId> componentIds = new ArrayDeque<>();
private boolean firstAllocation = true;
- public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex,
ILSMComponentIdGenerator componentIdGenerator,
+ public LSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex lsmIndex,
ILSMComponentId componentId,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
this.dsInfo = dsInfo;
this.lsmIndex = lsmIndex;
- this.componentIdGenerator = componentIdGenerator;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+ componentIds.add(componentId);
}
@Override
@@ -278,9 +276,6 @@
@Override
public void allocated(ILSMMemoryComponent component) throws
HyracksDataException {
- if (firstAllocation) {
- firstAllocation = false;
- componentIds.add(componentIdGenerator.getId());
- }
+ // no op
}
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
index 68ffd6a..25cd8b2 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIndexIOOperationCallbackFactory.java
@@ -70,8 +70,8 @@
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws
HyracksDataException {
- return new
LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
getComponentIdGenerator(),
- getIndexCheckpointManagerProvider());
+ return new
LSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
+ getComponentIdGenerator().getId(),
getIndexCheckpointManagerProvider());
}
@Override
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index 8bd848d..a24bf72 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -38,6 +38,10 @@
protected ResourceReference() {
}
+ public static ResourceReference ofIndex(String indexPath) {
+ return of(new File(indexPath,
StorageConstants.METADATA_FILE_NAME).toString());
+ }
+
public static ResourceReference of(String localResourcePath) {
ResourceReference lrr = new ResourceReference();
parse(lrr, localResourcePath);
@@ -72,6 +76,11 @@
return Paths.get(root, partition, dataverse, dataset, rebalance,
index);
}
+ public ResourceReference getDatasetReference() {
+ return ResourceReference
+ .ofIndex(Paths.get(root, partition, dataverse, dataset,
rebalance, dataset).toFile().getPath());
+ }
+
public Path getFileRelativePath() {
return Paths.get(root, partition, dataverse, dataset, rebalance,
index, name);
}
diff --git
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index 4b18e1d..db0911b 100644
---
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -78,10 +78,9 @@
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(Mockito.mock(AbstractLSMMemoryComponent.class));
DatasetInfo dsInfo = new DatasetInfo(101, null);
- LSMComponentIdGenerator idGenerator = new
LSMComponentIdGenerator(numMemoryComponents);
- idGenerator.init(MIN_VALID_COMPONENT_ID);
- LSMIOOperationCallback callback =
- new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator,
mockIndexCheckpointManagerProvider());
+ LSMComponentIdGenerator idGenerator = new
LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
+ LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo,
mockIndex, idGenerator.getId(),
+ mockIndexCheckpointManagerProvider());
//Flush first
idGenerator.refresh();
long flushLsn = 1L;
@@ -138,19 +137,19 @@
public void testAllocateComponentId() throws HyracksDataException {
int numMemoryComponents = 2;
DatasetInfo dsInfo = new DatasetInfo(101, null);
- ILSMComponentIdGenerator idGenerator = new
LSMComponentIdGenerator(numMemoryComponents);
- idGenerator.init(MIN_VALID_COMPONENT_ID);
+ ILSMComponentIdGenerator idGenerator = new
LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
ILSMMemoryComponent mockComponent =
Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
- LSMIOOperationCallback callback =
- new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator,
mockIndexCheckpointManagerProvider());
- callback.allocated(mockComponent);
+ LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo,
mockIndex, idGenerator.getId(),
+ mockIndexCheckpointManagerProvider());
ILSMComponentId initialId = idGenerator.getId();
+ // simulate a partition is flushed before allocated
idGenerator.refresh();
long flushLsn = 1L;
ILSMComponentId nextComponentId = idGenerator.getId();
+ callback.allocated(mockComponent);
callback.recycled(mockComponent);
checkMemoryComponent(initialId, mockComponent);
}
@@ -159,16 +158,14 @@
public void testRecycleComponentId() throws HyracksDataException {
int numMemoryComponents = 2;
DatasetInfo dsInfo = new DatasetInfo(101, null);
- ILSMComponentIdGenerator idGenerator = new
LSMComponentIdGenerator(numMemoryComponents);
- idGenerator.init(MIN_VALID_COMPONENT_ID);
+ ILSMComponentIdGenerator idGenerator = new
LSMComponentIdGenerator(numMemoryComponents, MIN_VALID_COMPONENT_ID);
ILSMIndex mockIndex = Mockito.mock(ILSMIndex.class);
Mockito.when(mockIndex.getNumberOfAllMemoryComponents()).thenReturn(numMemoryComponents);
ILSMMemoryComponent mockComponent =
Mockito.mock(AbstractLSMMemoryComponent.class);
Mockito.when(mockIndex.getCurrentMemoryComponent()).thenReturn(mockComponent);
- LSMIOOperationCallback callback =
- new LSMIOOperationCallback(dsInfo, mockIndex, idGenerator,
mockIndexCheckpointManagerProvider());
+ LSMIOOperationCallback callback = new LSMIOOperationCallback(dsInfo,
mockIndex, idGenerator.getId(),
+ mockIndexCheckpointManagerProvider());
String indexId = "mockIndexId";
- callback.allocated(mockComponent);
ILSMComponentId id = idGenerator.getId();
callback.recycled(mockComponent);
checkMemoryComponent(id, mockComponent);
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
index e353714..ab84211 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
@@ -49,7 +49,7 @@
IDatasetLifecycleManager dslcManager =
((INcApplicationContext)
ctx.getApplicationContext()).getDatasetLifecycleManager();
int partition =
StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
- return dslcManager.getOperationTracker(datasetId, partition);
+ return dslcManager.getOperationTracker(datasetId, partition,
resource.getPath());
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
index 0af06d7..a5c6360 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMComponentIdGenerator.java
@@ -38,11 +38,4 @@
* @return the index of the current memory component
*/
int getCurrentComponentIndex();
-
- /**
- * Initializes this {@link ILSMComponentIdGenerator} by setting the last
used id
- *
- * @param lastUsedId
- */
- void init(long lastUsedId);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
index e6bf0ab..21a27a9 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMComponentIdGenerator.java
@@ -33,25 +33,16 @@
private int currentComponentIndex;
private long lastUsedId;
private ILSMComponentId componentId;
- private boolean initialized = false;
- public LSMComponentIdGenerator(int numComponents) {
+ public LSMComponentIdGenerator(int numComponents, long lastUsedId) {
this.numComponents = numComponents;
- }
-
- @Override
- public synchronized void init(long lastUsedId) {
this.lastUsedId = lastUsedId;
- initialized = true;
refresh();
currentComponentIndex = 0;
}
@Override
public synchronized void refresh() {
- if (!initialized) {
- throw new IllegalStateException("Attempt to refresh component id
before initialziation.");
- }
final long nextId = ++lastUsedId;
componentId = new LSMComponentId(nextId, nextId);
currentComponentIndex = (currentComponentIndex + 1) % numComponents;
@@ -59,9 +50,6 @@
@Override
public synchronized ILSMComponentId getId() {
- if (!initialized) {
- throw new IllegalStateException("Attempt to get component id
before initialziation.");
- }
return componentId;
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/2948
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I246f9373f950e2f9a2c63f86746462e42a3f1c62
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: stabilization-f69489
Gerrit-Owner: Murtadha Hubail <[email protected]>