keith-turner commented on a change in pull request #2329:
URL: https://github.com/apache/accumulo/pull/2329#discussion_r745060098
##########
File path:
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +228,69 @@ public boolean tryLock() {
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
- if (!iterator.hasNext())
+ if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the
queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " +
lockType());
- return iterator.next().getKey().equals(entry);
+ }
+ if (iterator.next().getKey().equals(entry)) {
+ return true;
+ }
+ if (failBlockers) {
+ // Loop through all of the prior transactions that are waiting to
+ // acquire this write lock. If the transaction has not succeeded or
failed,
+ // then fail it and return false from this method so that
Utils.reserveX()
+ // will call this method again and will re-check the prior
transactions.
+ boolean result = true;
+ while (iterator.hasNext()) {
+ Entry<Long,byte[]> e = iterator.next();
+ Long txid = e.getKey();
+ if (!txid.equals(entry)) {
+ if (store.isReserved(txid)) {
+ result &= false;
+ } else {
+ store.reserve(txid);
+ TStatus status = store.getStatus(txid);
+ if (status.equals(TStatus.FAILED)) {
+ result = false;
+ continue;
+ } else {
+ switch (status) {
+ case FAILED_IN_PROGRESS:
+ case IN_PROGRESS:
+ store.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+ result &= false;
+ break;
+ default:
+ log.error("Attempting to fail a transaction in an unhandled
state: {}", status);
+ }
+ }
+ store.unreserve(txid, 0);
+ }
+ } else {
+ return result && txid.equals(entry);
Review comment:
```suggestion
break;
```
##########
File path:
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +228,69 @@ public boolean tryLock() {
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
- if (!iterator.hasNext())
+ if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the
queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " +
lockType());
- return iterator.next().getKey().equals(entry);
+ }
+ if (iterator.next().getKey().equals(entry)) {
+ return true;
+ }
+ if (failBlockers) {
+ // Loop through all of the prior transactions that are waiting to
+ // acquire this write lock. If the transaction has not succeeded or
failed,
+ // then fail it and return false from this method so that
Utils.reserveX()
+ // will call this method again and will re-check the prior
transactions.
+ boolean result = true;
+ while (iterator.hasNext()) {
+ Entry<Long,byte[]> e = iterator.next();
+ Long txid = e.getKey();
+ if (!txid.equals(entry)) {
+ if (store.isReserved(txid)) {
+ result &= false;
+ } else {
+ store.reserve(txid);
+ TStatus status = store.getStatus(txid);
+ if (status.equals(TStatus.FAILED)) {
+ result = false;
+ continue;
+ } else {
+ switch (status) {
+ case FAILED_IN_PROGRESS:
+ case IN_PROGRESS:
+ store.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+ result &= false;
+ break;
+ default:
+ log.error("Attempting to fail a transaction in an unhandled
state: {}", status);
+ }
+ }
+ store.unreserve(txid, 0);
+ }
+ } else {
+ return result && txid.equals(entry);
+ }
+ }
+ return result;
Review comment:
```suggestion
```
##########
File path:
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +228,69 @@ public boolean tryLock() {
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
- if (!iterator.hasNext())
+ if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the
queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " +
lockType());
- return iterator.next().getKey().equals(entry);
+ }
+ if (iterator.next().getKey().equals(entry)) {
+ return true;
+ }
+ if (failBlockers) {
+ // Loop through all of the prior transactions that are waiting to
+ // acquire this write lock. If the transaction has not succeeded or
failed,
+ // then fail it and return false from this method so that
Utils.reserveX()
+ // will call this method again and will re-check the prior
transactions.
+ boolean result = true;
Review comment:
I was thinking we should drop this and always return false if we are not
the first; this seems the safest way to avoid concurrency issues.
```suggestion
```
##########
File path:
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +228,69 @@ public boolean tryLock() {
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
- if (!iterator.hasNext())
+ if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the
queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " +
lockType());
- return iterator.next().getKey().equals(entry);
+ }
+ if (iterator.next().getKey().equals(entry)) {
+ return true;
+ }
+ if (failBlockers) {
+ // Loop through all of the prior transactions that are waiting to
+ // acquire this write lock. If the transaction has not succeeded or
failed,
+ // then fail it and return false from this method so that
Utils.reserveX()
+ // will call this method again and will re-check the prior
transactions.
+ boolean result = true;
+ while (iterator.hasNext()) {
+ Entry<Long,byte[]> e = iterator.next();
+ Long txid = e.getKey();
+ if (!txid.equals(entry)) {
+ if (store.isReserved(txid)) {
+ result &= false;
+ } else {
+ store.reserve(txid);
+ TStatus status = store.getStatus(txid);
+ if (status.equals(TStatus.FAILED)) {
+ result = false;
+ continue;
+ } else {
+ switch (status) {
+ case FAILED_IN_PROGRESS:
+ case IN_PROGRESS:
+ store.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+ result &= false;
+ break;
+ default:
+ log.error("Attempting to fail a transaction in an unhandled
state: {}", status);
+ }
+ }
+ store.unreserve(txid, 0);
Review comment:
should probably do something like
```java
if(store.tryReserve(txid)){
try{
// attempt to cancel
} finally {
// always unreserve even if there is an exception
store.unreserve(txid, 0);
}
}
```
##########
File path:
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +228,69 @@ public boolean tryLock() {
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
- if (!iterator.hasNext())
+ if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the
queue: " + this.entry
+ " userData " + new String(this.userData, UTF_8) + " lockType " +
lockType());
- return iterator.next().getKey().equals(entry);
+ }
+ if (iterator.next().getKey().equals(entry)) {
+ return true;
+ }
+ if (failBlockers) {
+ // Loop through all of the prior transactions that are waiting to
+ // acquire this write lock. If the transaction has not succeeded or
failed,
+ // then fail it and return false from this method so that
Utils.reserveX()
+ // will call this method again and will re-check the prior
transactions.
+ boolean result = true;
+ while (iterator.hasNext()) {
+ Entry<Long,byte[]> e = iterator.next();
+ Long txid = e.getKey();
+ if (!txid.equals(entry)) {
+ if (store.isReserved(txid)) {
+ result &= false;
+ } else {
+ store.reserve(txid);
Review comment:
The check for reserved and then reserving is racy. Could add a new
method that tries to reserve atomically.
```suggestion
if (store.tryReserve(txid)) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]