From Murtadha Al Hubail <[email protected]>:

Murtadha Al Hubail has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17633 )


Change subject: [NO ISSUE][OTH] Allow waiting for IO on specific dataset 
partition
......................................................................

[NO ISSUE][OTH] Allow waiting for IO on specific dataset partition

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:

- Add API to wait for IO on a specific dataset partition.
- When waiting for a partition replica's IO ops to finish, only wait
  for that replica's partition rather than for all partitions.

Change-Id: I90f311f602b3c8526556f64d7b25672981fac320
---
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
M 
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
M 
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
8 files changed, 68 insertions(+), 16 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/33/17633/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 1c2a047..c7eee21 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -161,12 +161,14 @@
     void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate 
partitions) throws HyracksDataException;

     /**
-     * Waits for all ongoing IO operations on all open datasets that are 
matching {@code replicationStrategy}.
+     * Waits for all ongoing IO operations on all open datasets that are 
matching {@code replicationStrategy} and
+     * {@code partition}.
      *
      * @param replicationStrategy
+     * @param partition
      * @throws HyracksDataException
      */
-    void waitForIO(IReplicationStrategy replicationStrategy) throws 
HyracksDataException;
+    void waitForIO(IReplicationStrategy replicationStrategy, int partition) 
throws HyracksDataException;

     /**
      * @return the current datasets io stats
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 5964bb4..7d3dba4 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -33,17 +33,19 @@
     private static final Logger LOGGER = LogManager.getLogger();
     protected final int datasetID;
     protected final DatasetInfo dsInfo;
+    protected final int partition;

-    public BaseOperationTracker(int datasetID, DatasetInfo dsInfo) {
+    public BaseOperationTracker(int datasetID, DatasetInfo dsInfo, int 
partition) {
         this.datasetID = datasetID;
         this.dsInfo = dsInfo;
+        this.partition = partition;
     }

     @Override
     public void beforeOperation(ILSMIndex index, LSMOperationType opType, 
ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws 
HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.declareActiveIOOperation(REPLICATE);
+            dsInfo.declareActiveIOOperation(REPLICATE, partition);
         }
     }

@@ -59,7 +61,7 @@
     public void completeOperation(ILSMIndex index, LSMOperationType opType, 
ISearchOperationCallback searchCallback,
             IModificationOperationCallback modificationCallback) throws 
HyracksDataException {
         if (opType == LSMOperationType.REPLICATE) {
-            dsInfo.undeclareActiveIOOperation(REPLICATE);
+            dsInfo.undeclareActiveIOOperation(REPLICATE, partition);
         }
     }

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index d15d9be..87a3c2f 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -33,12 +33,16 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
 public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
     private static final Logger LOGGER = LogManager.getLogger();
     // partition -> index
     private final Map<Integer, Set<IndexInfo>> partitionIndexes;
     // resourceID -> index
     private final Map<Long, IndexInfo> indexes;
+    private final Int2IntMap partitionPendingIO;
     private final int datasetID;
     private final ILogManager logManager;
     private final LogRecord waitLog = new LogRecord();
@@ -54,6 +58,7 @@
     public DatasetInfo(int datasetID, ILogManager logManager) {
         this.partitionIndexes = new HashMap<>();
         this.indexes = new HashMap<>();
+        this.partitionPendingIO = new Int2IntOpenHashMap();
         this.setLastAccess(-1);
         this.datasetID = datasetID;
         this.setRegistered(false);
@@ -74,7 +79,8 @@
         setLastAccess(System.currentTimeMillis());
     }

-    public synchronized void 
declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+    public synchronized void 
declareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int 
partition) {
+        partitionPendingIO.put(partition, 
partitionPendingIO.getOrDefault(partition, 0) + 1);
         numActiveIOOps++;
         switch (opType) {
             case FLUSH:
@@ -91,7 +97,8 @@
         }
     }

-    public synchronized void 
undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType) {
+    public synchronized void 
undeclareActiveIOOperation(ILSMIOOperation.LSMIOOperationType opType, int 
partition) {
+        partitionPendingIO.put(partition, 
partitionPendingIO.getOrDefault(partition, 0) - 1);
         numActiveIOOps--;
         switch (opType) {
             case FLUSH:
@@ -253,6 +260,26 @@
         }
     }

+    public void waitForIO(int partition) throws HyracksDataException {
+        logManager.log(waitLog);
+        synchronized (this) {
+            while (partitionPendingIO.getOrDefault(partition, 0) > 0) {
+                try {
+                    wait();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw HyracksDataException.create(e);
+                }
+            }
+            if (partitionPendingIO.getOrDefault(partition, 0) < 0) {
+                LOGGER.error("number of IO operations cannot be negative for 
dataset {}, partition {}", this,
+                        partition);
+                throw new IllegalStateException(
+                        "Number of IO operations cannot be negative: " + this 
+ ", partition " + partition);
+            }
+        }
+    }
+
     public synchronized int getPendingFlushes() {
         return pendingFlushes;
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 117b4fc..4fc9dd6 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -552,10 +552,10 @@
     }

     @Override
-    public void waitForIO(IReplicationStrategy replicationStrategy) throws 
HyracksDataException {
+    public void waitForIO(IReplicationStrategy replicationStrategy, int 
partition) throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
             if (dsr.isOpen() && 
replicationStrategy.isMatch(dsr.getDatasetID())) {
-                dsr.getDatasetInfo().waitForIO();
+                dsr.getDatasetInfo().waitForIO(partition);
             }
         }
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b0d8e02..a4ad7cf 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -59,7 +59,6 @@
 @NotThreadSafe
 public class PrimaryIndexOperationTracker extends BaseOperationTracker 
implements IoOperationCompleteListener {
     private static final Logger LOGGER = LogManager.getLogger();
-    private final int partition;
     // Number of active operations on an ILSMIndex instance.
     private final AtomicInteger numActiveOperations;
     private final ILogManager logManager;
@@ -71,8 +70,7 @@

     public PrimaryIndexOperationTracker(int datasetID, int partition, 
ILogManager logManager, DatasetInfo dsInfo,
             ILSMComponentIdGenerator idGenerator) {
-        super(datasetID, dsInfo);
-        this.partition = partition;
+        super(datasetID, dsInfo, partition);
         this.logManager = logManager;
         this.numActiveOperations = new AtomicInteger();
         this.idGenerator = idGenerator;
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 1189b51..f56e5c0 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -74,6 +74,7 @@
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
     protected final DatasetInfo dsInfo;
     protected final ILSMIndex lsmIndex;
+    private final int partition;
     private long firstLsnForCurrentMemoryComponent = 0L;
     private long persistenceLsn = 0L;
     private int pendingFlushes = 0;
@@ -84,6 +85,7 @@
         this.dsInfo = dsInfo;
         this.lsmIndex = lsmIndex;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
+        this.partition = 
ResourceReference.ofIndex(lsmIndex.getIndexIdentifier()).getPartitionNum();
         componentIds.add(componentId);
     }

@@ -259,7 +261,7 @@

     @Override
     public synchronized void scheduled(ILSMIOOperation operation) throws 
HyracksDataException {
-        dsInfo.declareActiveIOOperation(operation.getIOOpertionType());
+        dsInfo.declareActiveIOOperation(operation.getIOOpertionType(), 
partition);
         if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
             pendingFlushes++;
             FlushOperation flush = (FlushOperation) operation;
@@ -282,7 +284,7 @@
                         pendingFlushes == 0 ? 
firstLsnForCurrentMemoryComponent : (Long) map.get(KEY_FLUSH_LOG_LSN);
             }
         }
-        dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType());
+        dsInfo.undeclareActiveIOOperation(operation.getIOOpertionType(), 
partition);
     }

     public synchronized boolean hasPendingFlush() {
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 459ff01..68ccd54 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -103,6 +103,6 @@
     private void waitForReplicatedDatasetsIO() throws HyracksDataException {
         // wait for IO operations to ensure replicated datasets files won't 
change during replica sync
         final IReplicationStrategy replStrategy = 
appCtx.getReplicationManager().getReplicationStrategy();
-        appCtx.getDatasetLifecycleManager().waitForIO(replStrategy);
+        appCtx.getDatasetLifecycleManager().waitForIO(replStrategy, 
replica.getIdentifier().getPartition());
     }
 }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
index a104ae3..827b713 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexOperationTrackerFactory.java
@@ -21,6 +21,7 @@
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.context.BaseOperationTracker;
+import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IJsonSerializable;
@@ -46,7 +47,8 @@
     public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, 
IResource resource) {
         IDatasetLifecycleManager dslcManager =
                 ((INcApplicationContext) 
ctx.getApplicationContext()).getDatasetLifecycleManager();
-        return new BaseOperationTracker(datasetId, 
dslcManager.getDatasetInfo(datasetId));
+        int partition = 
StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+        return new BaseOperationTracker(datasetId, 
dslcManager.getDatasetInfo(datasetId), partition);
     }

     @Override

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17633
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: neo
Gerrit-Change-Id: I90f311f602b3c8526556f64d7b25672981fac320
Gerrit-Change-Number: 17633
Gerrit-PatchSet: 1
Gerrit-Owner: Murtadha Al Hubail <[email protected]>
Gerrit-MessageType: newchange

Reply via email to