keith-turner commented on a change in pull request #2329:
URL: https://github.com/apache/accumulo/pull/2329#discussion_r741111823
##########
File path:
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +233,82 @@ public boolean tryLock() {
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
- if (!iterator.hasNext())
+ if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
- return iterator.next().getKey().equals(entry);
+ }
+ if (!failBlockers) {
+ return iterator.next().getKey().equals(entry);
+ } else {
+ ZooStore<DistributedReadWriteLock> zs;
+ try {
+ zs = new ZooStore<>(zooPath, zrw);
Review comment:
Need to use the same ZooStore object that the Manager/FATE framework is
using, otherwise we will not get mutual exclusion when reserving transactions.
Look at the implementation of zs.reserve(long): by creating a new object, this
code will be using a different HashSet and synchronizing on a different object
monitor than the Manager/FATE framework.
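The following minimal Java sketch illustrates the mutual-exclusion point. `ToyReservingStore` and `ReservationDemo` are hypothetical classes, not Accumulo code. They model only the property described in the comment: reservations are kept in a per-instance `HashSet` guarded by that instance's own monitor. Under that assumption, two callers sharing one store contend for a txid, while two separately constructed stores each grant it.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in, NOT Accumulo's ZooStore. It models only the property the
// review points out: reservations live in a per-instance HashSet guarded by the
// instance's own monitor. The real reserve(long) waits for the txid to become
// free; this toy returns false instead so the demo stays single-threaded.
class ToyReservingStore {
  private final Set<Long> reserved = new HashSet<>();

  // Returns true if this call reserved txid, false if this same instance already
  // holds a reservation for it. Other instances are never consulted.
  synchronized boolean tryReserve(long txid) {
    return reserved.add(txid);
  }
}

class ReservationDemo {
  // Tries to reserve the same txid through two stores and counts the successes.
  static int reservationsGranted(ToyReservingStore a, ToyReservingStore b, long txid) {
    int granted = 0;
    if (a.tryReserve(txid)) {
      granted++;
    }
    if (b.tryReserve(txid)) {
      granted++;
    }
    return granted;
  }
}
```

With one shared instance, the second reservation of the same txid is refused. With two fresh instances, both succeed, and that is the lost mutual exclusion. One way to avoid this, assuming the lock can be given a reference to the store the Manager already uses, is to pass that instance in rather than calling `new ZooStore<>(...)` inside `tryLock()`.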
##########
File path:
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +233,82 @@ public boolean tryLock() {
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
- if (!iterator.hasNext())
+ if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
- return iterator.next().getKey().equals(entry);
+ }
+ if (!failBlockers) {
+ return iterator.next().getKey().equals(entry);
+ } else {
+ ZooStore<DistributedReadWriteLock> zs;
+ try {
+ zs = new ZooStore<>(zooPath, zrw);
+ } catch (KeeperException | InterruptedException e1) {
+ log.error("Error creating zoo store", e1);
+ return false;
+ }
+ // Loop through all of the prior transactions that are waiting to
+ // acquire this write lock. If the transaction has not succeeded or failed,
+ // then fail it and return false from this method so that Utils.reserveX()
+ // will call this method again and will re-check the prior transactions.
+ boolean result = true;
+ while (iterator.hasNext()) {
+ Entry<Long,byte[]> e = iterator.next();
+ Long txid = e.getKey();
+ if (!txid.equals(entry)) {
+ if (zs.isReserved(txid)) {
+ result &= false;
+ } else {
+ zs.reserve(txid);
+ TStatus status = zs.getStatus(txid);
+ if (status.equals(TStatus.FAILED)) {
+ result = false;
+ continue;
+ } else {
+ switch (status) {
+ case UNKNOWN:
+ // not sure what to do here, we have an invalid txid
+ break;
+ case SUCCESSFUL:
+ // already succeeded, lock should be removed
+ break;
+ case FAILED:
+ case FAILED_IN_PROGRESS:
+ // transaction failed or in process of failing
+ break;
+ default:
+ zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+ result &= false;
+ }
+ }
+ zs.unreserve(txid, 0);
+ }
+ } else {
+ return result && txid.equals(entry);
+ }
+ }
+ return result;
Review comment:
The following high-level structure would be nice: if we are first in the
queue, return true no matter what failBlockers is, and never return true
unless we are first in the queue. This suggestion can't really be applied
as-is; I just used a suggestion block to make clear the scope I was thinking about.
```suggestion
if (iterator.next().getKey().equals(entry)) {
  return true;
}
if (failBlockers) {
  // attempt to fail anything before this lock entry in the queue... but never return true unless ours is the 1st
}
return false;
```
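The following is a runnable sketch of the suggested control flow, under stated assumptions. The queue is modelled as a `SortedMap<Long,byte[]>` of the entries up to and including our own, as returned by `getEarlierEntries` in the diff. `failBlocker` is a hypothetical `LongConsumer` standing in for "attempt to fail whatever holds this earlier entry". How a queue key maps to a FATE transaction is deliberately left out, and `TryLockSketch` is an illustrative name.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.SortedMap;
import java.util.function.LongConsumer;

// Sketch of the suggested tryLock() shape. earlierEntries holds the queue entries
// up to and including ours; failBlocker is a hypothetical callback, not an Accumulo API.
class TryLockSketch {
  static boolean tryLock(SortedMap<Long,byte[]> earlierEntries, long entry,
      boolean failBlockers, LongConsumer failBlocker) {
    Iterator<Map.Entry<Long,byte[]>> iterator = earlierEntries.entrySet().iterator();
    if (!iterator.hasNext()) {
      throw new IllegalStateException("Did not find our own lock in the queue: " + entry);
    }
    long first = iterator.next().getKey();
    // First in the queue: the lock is ours, whatever failBlockers says.
    if (first == entry) {
      return true;
    }
    if (failBlockers) {
      // Try to fail every entry ahead of ours, starting with the current head.
      failBlocker.accept(first);
      while (iterator.hasNext()) {
        long key = iterator.next().getKey();
        if (key == entry) {
          break;
        }
        failBlocker.accept(key);
      }
    }
    // Not first: never report success here; the caller is expected to retry.
    return false;
  }
}
```

Keeping a single success path at the top means the lock is reported as acquired only when a later call finds our entry at the head of the queue. Failing blockers can only ever lead to a `false` return.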
##########
File path:
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +233,82 @@ public boolean tryLock() {
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
- if (!iterator.hasNext())
+ if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
- return iterator.next().getKey().equals(entry);
+ }
+ if (!failBlockers) {
+ return iterator.next().getKey().equals(entry);
+ } else {
+ ZooStore<DistributedReadWriteLock> zs;
+ try {
+ zs = new ZooStore<>(zooPath, zrw);
+ } catch (KeeperException | InterruptedException e1) {
+ log.error("Error creating zoo store", e1);
+ return false;
+ }
+ // Loop through all of the prior transactions that are waiting to
+ // acquire this write lock. If the transaction has not succeeded or failed,
+ // then fail it and return false from this method so that Utils.reserveX()
+ // will call this method again and will re-check the prior transactions.
+ boolean result = true;
+ while (iterator.hasNext()) {
+ Entry<Long,byte[]> e = iterator.next();
+ Long txid = e.getKey();
+ if (!txid.equals(entry)) {
+ if (zs.isReserved(txid)) {
+ result &= false;
+ } else {
+ zs.reserve(txid);
+ TStatus status = zs.getStatus(txid);
+ if (status.equals(TStatus.FAILED)) {
+ result = false;
+ continue;
+ } else {
+ switch (status) {
+ case UNKNOWN:
+ // not sure what to do here, we have an invalid txid
+ break;
+ case SUCCESSFUL:
+ // already succeeded, lock should be removed
+ break;
+ case FAILED:
+ case FAILED_IN_PROGRESS:
Review comment:
Investigation would be needed to confirm this, but I suspect we should
only expect to see IN_PROGRESS and FAILED_IN_PROGRESS transactions actually
holding the lock. Anything else seems like an indication of a more general
problem that we should not attempt to react to here. We could log a warning or
error if we see them, but it may not be a good idea to let the lock be acquired
when the system is in an unexpected state. This ties in with my other comment
about always returning false when we are not first in the queue.
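If the suspicion in this comment holds, the per-blocker decision could reduce to something like the following sketch. `TxStatus` is a hypothetical stand-in listing only the states named in this review; it is not Accumulo's `TStatus` enum. `BlockerPolicy.actionFor` is an illustrative name. In every case the caller would still return `false`, because it is not first in the queue.

```java
import java.util.logging.Logger;

// Hypothetical stand-in listing only the states named in this review;
// it is not Accumulo's TStatus enum.
enum TxStatus { IN_PROGRESS, FAILED_IN_PROGRESS, FAILED, SUCCESSFUL, UNKNOWN }

class BlockerPolicy {
  private static final Logger LOG = Logger.getLogger(BlockerPolicy.class.getName());

  enum Action {
    FAIL,       // blocker is running normally: mark it FAILED_IN_PROGRESS
    WAIT,       // blocker is already rolling back: leave it alone
    UNEXPECTED  // any other state: log it and do not try to "fix" it here
  }

  // Decides how to treat a transaction that holds a queue entry ahead of ours.
  // Whatever the answer, tryLock() still returns false because we are not first.
  static Action actionFor(long txid, TxStatus status) {
    switch (status) {
      case IN_PROGRESS:
        return Action.FAIL;
      case FAILED_IN_PROGRESS:
        return Action.WAIT;
      default:
        LOG.warning("Unexpected status " + status + " for lock holder " + txid);
        return Action.UNEXPECTED;
    }
  }
}
```

Treating every other state as `UNEXPECTED` rather than silently skipping it keeps the lock from being granted while the system is in a state nobody anticipated. The warning leaves a trace for the broader investigation the comment calls for.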
--
This is an automated message from the Apache Git Service.