abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2145
Change subject: [NO ISSUE][TX] Fix DatasetLock for Multiple Index Builds
......................................................................
[NO ISSUE][TX] Fix DatasetLock for Multiple Index Builds
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- The mechanism used for allowing multiple concurrent index
builds does not work if the first index build finishes
before other index builds. It relied on a write lock
obtained by the first index builder and released by
the last index builder. This is not allowed when using
ReentrantReadWriteLock and will lead to an
IllegalMonitorStateException since the last thread to exit
did not hold the lock.
- A potential deadlock between modify and exclusive modify can
happen when an exclusive modify attempts to upgrade its
lock to a write lock while the modify lock waits for the
exclusive lock to be released. This has been fixed too.
- Test cases were added.
Change-Id: I3bea3ff2075d952ab13402b0c445c464b431c0f5
---
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
A
asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
4 files changed, 320 insertions(+), 38 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/45/2145/1
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
index 31f2089..935cdec 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/lock/DatasetLock.java
@@ -30,17 +30,24 @@
public class DatasetLock implements IMetadataLock {
private final String key;
+ // The lock
private final ReentrantReadWriteLock lock;
- private final ReentrantReadWriteLock dsReadLock;
- private final ReentrantReadWriteLock dsModifyLock;
+ // Used for lock upgrade operation
+ private final ReentrantReadWriteLock upgradeLock;
+ // Used for exclusive modification
+ private final ReentrantReadWriteLock modifyLock;
+ // The two counters below are used to ensure mutual exclusivity between
index builds and modifications
+ // order of entry indexBuildCounter -> dsModifyCounter
private final MutableInt indexBuildCounter;
+ private final MutableInt dsModifyCounter;
public DatasetLock(String key) {
this.key = key;
lock = new ReentrantReadWriteLock(true);
- dsReadLock = new ReentrantReadWriteLock(true);
- dsModifyLock = new ReentrantReadWriteLock(true);
+ upgradeLock = new ReentrantReadWriteLock(true);
+ modifyLock = new ReentrantReadWriteLock(true);
indexBuildCounter = new MutableInt(0);
+ dsModifyCounter = new MutableInt(0);
}
private void readLock() {
@@ -71,63 +78,110 @@
lock.writeLock().unlock();
}
- private void readReadLock() {
- dsReadLock.readLock().lock();
+ private void upgradeReadLock() {
+ upgradeLock.readLock().lock();
}
private void modifyReadLock() {
// insert
- dsModifyLock.readLock().lock();
+ modifyLock.readLock().lock();
+ incrementModifyCounter();
+ }
+
+ private void incrementModifyCounter() {
+ synchronized (indexBuildCounter) {
+ boolean interrupted = false;
+ while (indexBuildCounter.getValue() > 0) {
+ try {
+ indexBuildCounter.wait();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ synchronized (dsModifyCounter) {
+ dsModifyCounter.increment();
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private void decrementModifyCounter() {
+ synchronized (indexBuildCounter) {
+ synchronized (dsModifyCounter) {
+ dsModifyCounter.decrement();
+ }
+ indexBuildCounter.notifyAll();
+ }
}
private void modifyReadUnlock() {
// insert
- dsModifyLock.readLock().unlock();
+ decrementModifyCounter();
+ modifyLock.readLock().unlock();
}
- private void readReadUnlock() {
- dsReadLock.readLock().unlock();
+ private void upgradeReadUnlock() {
+ upgradeLock.readLock().unlock();
}
- private void readWriteUnlock() {
- dsReadLock.writeLock().unlock();
+ private void upgradeWriteUnlock() {
+ upgradeLock.writeLock().unlock();
}
- private void modifySharedWriteLock() {
+ private void BuildIndexLock() {
// Build index statement
synchronized (indexBuildCounter) {
if (indexBuildCounter.getValue() > 0) {
indexBuildCounter.setValue(indexBuildCounter.getValue() + 1);
} else {
- dsModifyLock.writeLock().lock();
- indexBuildCounter.setValue(1);
+ boolean locked = false;
+ boolean interrupted = false;
+ while (!locked) {
+ synchronized (dsModifyCounter) {
+ if (dsModifyCounter.getValue() == 0) {
+
indexBuildCounter.setValue(indexBuildCounter.getValue() + 1);
+ locked = true;
+ }
+ }
+ if (!locked) {
+ try {
+ indexBuildCounter.wait();
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ }
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
}
}
}
- private void modifySharedWriteUnlock() {
+ private void BuildIndexUnlock() {
// Build index statement
synchronized (indexBuildCounter) {
- if (indexBuildCounter.getValue() == 1) {
- dsModifyLock.writeLock().unlock();
- }
indexBuildCounter.setValue(indexBuildCounter.getValue() - 1);
+ indexBuildCounter.notifyAll();
}
}
private void modifyExclusiveWriteLock() {
- dsModifyLock.writeLock().lock();
+ modifyLock.writeLock().lock();
+ incrementModifyCounter();
}
private void modifyExclusiveWriteUnlock() {
- dsModifyLock.writeLock().unlock();
+ decrementModifyCounter();
+ modifyLock.writeLock().unlock();
}
@Override
public void upgrade(IMetadataLock.Mode from, IMetadataLock.Mode to) throws
AlgebricksException {
if (from == IMetadataLock.Mode.EXCLUSIVE_MODIFY && to ==
IMetadataLock.Mode.UPGRADED_WRITE) {
- dsReadLock.readLock().unlock();
- dsReadLock.writeLock().lock();
+ upgradeLock.writeLock().lock();
} else {
throw new
MetadataException(ErrorCode.ILLEGAL_LOCK_UPGRADE_OPERATION, from, to);
}
@@ -136,8 +190,7 @@
@Override
public void downgrade(IMetadataLock.Mode from, IMetadataLock.Mode to)
throws AlgebricksException {
if (from == IMetadataLock.Mode.UPGRADED_WRITE && to ==
IMetadataLock.Mode.EXCLUSIVE_MODIFY) {
- dsReadLock.writeLock().unlock();
- dsReadLock.readLock().lock();
+ upgradeLock.writeLock().unlock();
} else {
throw new
MetadataException(ErrorCode.ILLEGAL_LOCK_DOWNGRADE_OPERATION, from, to);
}
@@ -148,16 +201,14 @@
switch (mode) {
case INDEX_BUILD:
readLock();
- modifySharedWriteLock();
+ BuildIndexLock();
break;
case MODIFY:
readLock();
- readReadLock();
modifyReadLock();
break;
case EXCLUSIVE_MODIFY:
readLock();
- readReadLock();
modifyExclusiveWriteLock();
break;
case WRITE:
@@ -165,7 +216,7 @@
break;
case READ:
readLock();
- readReadLock();
+ upgradeReadLock();
break;
default:
throw new IllegalStateException("locking mode " + mode + " is
not supported");
@@ -176,28 +227,26 @@
public void unlock(IMetadataLock.Mode mode) {
switch (mode) {
case INDEX_BUILD:
- modifySharedWriteUnlock();
+ BuildIndexUnlock();
readUnlock();
break;
case MODIFY:
modifyReadUnlock();
- readReadUnlock();
readUnlock();
break;
case EXCLUSIVE_MODIFY:
modifyExclusiveWriteUnlock();
- readReadUnlock();
readUnlock();
break;
case WRITE:
writeUnlock();
break;
case READ:
- readReadUnlock();
+ upgradeReadUnlock();
readUnlock();
break;
case UPGRADED_WRITE:
- readWriteUnlock();
+ upgradeWriteUnlock();
modifyExclusiveWriteUnlock();
readUnlock();
break;
diff --git
a/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java
b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java
new file mode 100644
index 0000000..6697448
--- /dev/null
+++
b/asterixdb/asterix-metadata/src/test/java/org/apache/asterix/metadata/lock/MetadataLockManagerTest.java
@@ -0,0 +1,228 @@
+package org.apache.asterix.metadata.lock;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+import org.apache.asterix.common.metadata.LockList;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class MetadataLockManagerTest {
+
+ static final int REPREAT_TEST_COUNT = 10;
+
+ @Parameterized.Parameters
+ public static List<Object[]> data() {
+ return Arrays.asList(new Object[REPREAT_TEST_COUNT][0]);
+ }
+
+ private static class Request {
+ private enum Statement {
+ INDEX,
+ MODIFY,
+ EXCLUSIVE_MODIFY_UPGRADE_DOWNGRADE,
+ EXCLUSIVE_MODIFY_UPGRADE,
+ }
+
+ private final Statement statement;
+ private final String dataset;
+ private boolean done;
+ private int step = 0;
+
+ public Request(Statement statement, String dataset) {
+ this.statement = statement;
+ this.dataset = dataset;
+ done = false;
+ }
+
+ Statement statement() {
+ return statement;
+ }
+
+ String dataset() {
+ return dataset;
+ }
+
+ synchronized void complete() {
+ done = true;
+ notifyAll();
+ }
+
+ synchronized void await() throws InterruptedException {
+ while (!done) {
+ wait();
+ }
+ }
+
+ synchronized void step() {
+ step++;
+ notifyAll();
+ }
+
+ synchronized int getSteps() {
+ return step;
+ }
+
+ synchronized void await(int step) throws InterruptedException {
+ while (this.step < step) {
+ wait();
+ }
+ }
+ }
+
+ public class User extends SingleThreadEventProcessor<Request> {
+
+ private MetadataLockManager lockManager;
+ private Semaphore step = new Semaphore(0);
+ private final LockList locks = new LockList();
+
+ public User(String username, MetadataLockManager lockManager) {
+ super(username);
+ this.lockManager = lockManager;
+ }
+
+ public void step() {
+ step.release();
+ }
+
+ @Override
+ protected void handle(Request req) throws Exception {
+ try {
+ step.acquire();
+ switch (req.statement()) {
+ case INDEX:
+ lockManager.acquireDatasetCreateIndexLock(locks,
req.dataset());
+ break;
+ case MODIFY:
+ lockManager.acquireDatasetModifyLock(locks,
req.dataset());
+ break;
+ case EXCLUSIVE_MODIFY_UPGRADE:
+
lockManager.acquireDatasetExclusiveModificationLock(locks, req.dataset());
+ step.acquire();
+ lockManager.upgradeDatasetLockToWrite(locks,
req.dataset());
+ break;
+ case EXCLUSIVE_MODIFY_UPGRADE_DOWNGRADE:
+
lockManager.acquireDatasetExclusiveModificationLock(locks, req.dataset());
+ step.acquire();
+ lockManager.upgradeDatasetLockToWrite(locks,
req.dataset());
+ step.acquire();
+
lockManager.downgradeDatasetLockToExclusiveModify(locks, req.dataset());
+ break;
+ default:
+ break;
+ }
+ req.step();
+ step.acquire();
+ } finally {
+ locks.reset();
+ req.step();
+ req.complete();
+ }
+ }
+
+ }
+
+ @Test
+ public void testDatasetLockMultipleIndexBuilds() throws Exception {
+ MetadataLockManager lockManager = new MetadataLockManager();
+ String dataset = "Dataset";
+ User[] users = new User[3];
+ Request[] requests = new Request[3];
+ users[0] = new User("till", lockManager);
+ requests[0] = new Request(Request.Statement.INDEX, dataset);
+ users[1] = new User("dmitry", lockManager);
+ requests[1] = new Request(Request.Statement.INDEX, dataset);
+ users[2] = new User("mike", lockManager);
+ requests[2] = new Request(Request.Statement.MODIFY, dataset);
+ // Till builds an index
+ users[0].add(requests[0]);
+ // Dmitry builds an index
+ users[1].add(requests[1]);
+ // Mike Modifies
+ users[2].add(requests[2]);
+ // Till starts
+ users[0].step();
+ // Ensure lock acquired
+ requests[0].await(1);
+ // Dmitry starts
+ users[1].step();
+ // Ensure lock acquired
+ requests[1].await(1);
+ // Mike starts and is allowed to go all the way
+ users[2].step();
+ users[2].step();
+ // Ensure that Mike still could not acquire locks
+ Assert.assertEquals(0, requests[2].getSteps());
+ // Till finishes first
+ users[0].step();
+ // Ensure the request has been completed and lock has been released
+ requests[0].await();
+ // Ensure that Mike still could not acquire locks
+ Assert.assertEquals(0, requests[2].getSteps());
+ // Dmitry finishes second
+ users[1].step();
+ // Ensure the request has been completed and lock has been released
+ requests[1].await();
+ // Ensure that Mike could proceed and request has been completed
+ requests[2].await();
+ // Stop users
+ for (int i = 0; i < users.length; i++) {
+ users[i].stop();
+ }
+ }
+
+ @Test
+ public void testDatasetLockMultipleModifiers() throws Exception {
+ MetadataLockManager lockManager = new MetadataLockManager();
+ String dataset = "Dataset";
+ User[] users = new User[3];
+ Request[] requests = new Request[3];
+ users[0] = new User("till", lockManager);
+ requests[0] = new Request(Request.Statement.MODIFY, dataset);
+ users[1] = new User("dmitry", lockManager);
+ requests[1] = new Request(Request.Statement.MODIFY, dataset);
+ users[2] = new User("mike", lockManager);
+ requests[2] = new Request(Request.Statement.INDEX, dataset);
+ // Till modifies
+ users[0].add(requests[0]);
+ // Dmitry modifies
+ users[1].add(requests[1]);
+ // Mike builds an index
+ users[2].add(requests[2]);
+ // Till starts
+ users[0].step();
+ // Ensure lock acquired
+ requests[0].await(1);
+ // Dmitry starts
+ users[1].step();
+ // Ensure lock acquired
+ requests[1].await(1);
+ // Mike starts and is allowed to go all the way
+ users[2].step();
+ users[2].step();
+ // Ensure that Mike still could not acquire locks
+ Assert.assertEquals(0, requests[2].getSteps());
+ // Till finishes first
+ users[0].step();
+ // Ensure the request has been completed and lock has been released
+ requests[0].await();
+ // Ensure that Mike still could not acquire locks
+ Assert.assertEquals(0, requests[2].getSteps());
+ // Dmitry finishes second
+ users[1].step();
+ // Ensure the request has been completed and lock has been released
+ requests[1].await();
+ // Ensure that Mike could proceed and request has been completed
+ requests[2].await();
+ // Stop users
+ for (int i = 0; i < users.length; i++) {
+ users[i].stop();
+ }
+ }
+
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
index 4517730..7ae7cbf 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java
@@ -38,7 +38,7 @@
return (HyracksDataException) cause;
} else if (cause instanceof Error) {
// don't wrap errors, allow them to propagate
- throw (Error)cause;
+ throw (Error) cause;
} else if (cause instanceof InterruptedException &&
!Thread.currentThread().isInterrupted()) {
// TODO(mblow): why not force interrupt on current thread?
LOGGER.log(Level.WARNING,
@@ -76,6 +76,12 @@
public HyracksDataException(String component, int errorCode, String
message, Throwable cause, String nodeId,
Serializable... params) {
super(component, errorCode, message, cause, nodeId, params);
+ }
+
+ public HyracksDataException(String component, int errorCode, String
message, Throwable cause, String nodeId,
+ StackTraceElement[] stackTrace, Serializable... params) {
+ super(component, errorCode, message, cause, nodeId, params);
+ setStackTrace(stackTrace);
}
/**
@@ -141,6 +147,6 @@
public static HyracksDataException create(HyracksDataException e, String
nodeId) {
return new HyracksDataException(e.getComponent(), e.getErrorCode(),
e.getMessage(), e.getCause(), nodeId,
- e.getParams());
+ e.getStackTrace(), e.getParams());
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
index 350984c..af5c102 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java
@@ -72,8 +72,7 @@
break;
case NODE_HEARTBEAT:
CCNCFunctions.NodeHeartbeatFunction nhf =
(CCNCFunctions.NodeHeartbeatFunction) fn;
- ccs.getWorkQueue().schedule(new NodeHeartbeatWork(ccs,
nhf.getNodeId(),
- nhf.getHeartbeatData()));
+ ccs.getExecutor().execute(new NodeHeartbeatWork(ccs,
nhf.getNodeId(), nhf.getHeartbeatData()));
break;
case NOTIFY_JOBLET_CLEANUP:
CCNCFunctions.NotifyJobletCleanupFunction njcf =
(CCNCFunctions.NotifyJobletCleanupFunction) fn;
--
To view, visit https://asterix-gerrit.ics.uci.edu/2145
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I3bea3ff2075d952ab13402b0c445c464b431c0f5
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>