keith-turner commented on a change in pull request #2329:
URL: https://github.com/apache/accumulo/pull/2329#discussion_r745060098



##########
File path: 
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +228,69 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the 
queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + 
lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (iterator.next().getKey().equals(entry)) {
+        return true;
+      }
+      if (failBlockers) {
+        // Loop through all of the prior transactions that are waiting to
+        // acquire this write lock. If the transaction has not succeeded or 
failed,
+        // then fail it and return false from this method so that 
Utils.reserveX()
+        // will call this method again and will re-check the prior 
transactions.
+        boolean result = true;
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          Long txid = e.getKey();
+          if (!txid.equals(entry)) {
+            if (store.isReserved(txid)) {
+              result &= false;
+            } else {
+              store.reserve(txid);
+              TStatus status = store.getStatus(txid);
+              if (status.equals(TStatus.FAILED)) {
+                result = false;
+                continue;
+              } else {
+                switch (status) {
+                  case FAILED_IN_PROGRESS:
+                  case IN_PROGRESS:
+                    store.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+                    result &= false;
+                    break;
+                  default:
+                   log.error("Attempting to fail a transaction in an unhandled 
state: {}", status);
+                }
+              }
+              store.unreserve(txid, 0);
+            }
+          } else {
+            return result && txid.equals(entry);

Review comment:
       ```suggestion
              break;
   ```

##########
File path: 
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +228,69 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the 
queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + 
lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (iterator.next().getKey().equals(entry)) {
+        return true;
+      }
+      if (failBlockers) {
+        // Loop through all of the prior transactions that are waiting to
+        // acquire this write lock. If the transaction has not succeeded or 
failed,
+        // then fail it and return false from this method so that 
Utils.reserveX()
+        // will call this method again and will re-check the prior 
transactions.
+        boolean result = true;
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          Long txid = e.getKey();
+          if (!txid.equals(entry)) {
+            if (store.isReserved(txid)) {
+              result &= false;
+            } else {
+              store.reserve(txid);
+              TStatus status = store.getStatus(txid);
+              if (status.equals(TStatus.FAILED)) {
+                result = false;
+                continue;
+              } else {
+                switch (status) {
+                  case FAILED_IN_PROGRESS:
+                  case IN_PROGRESS:
+                    store.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+                    result &= false;
+                    break;
+                  default:
+                   log.error("Attempting to fail a transaction in an unhandled 
state: {}", status);
+                }
+              }
+              store.unreserve(txid, 0);
+            }
+          } else {
+            return result && txid.equals(entry);
+          }
+        }
+        return result;

Review comment:
       ```suggestion
   ```

##########
File path: 
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +228,69 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the 
queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + 
lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (iterator.next().getKey().equals(entry)) {
+        return true;
+      }
+      if (failBlockers) {
+        // Loop through all of the prior transactions that are waiting to
+        // acquire this write lock. If the transaction has not succeeded or 
failed,
+        // then fail it and return false from this method so that 
Utils.reserveX()
+        // will call this method again and will re-check the prior 
transactions.
+        boolean result = true;

Review comment:
       I was thinking we should drop this and always return false if we are not 
the first, this seems the safest to avoid concurrency issues.
   
   ```suggestion
   ```

##########
File path: 
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +228,69 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the 
queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + 
lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (iterator.next().getKey().equals(entry)) {
+        return true;
+      }
+      if (failBlockers) {
+        // Loop through all of the prior transactions that are waiting to
+        // acquire this write lock. If the transaction has not succeeded or 
failed,
+        // then fail it and return false from this method so that 
Utils.reserveX()
+        // will call this method again and will re-check the prior 
transactions.
+        boolean result = true;
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          Long txid = e.getKey();
+          if (!txid.equals(entry)) {
+            if (store.isReserved(txid)) {
+              result &= false;
+            } else {
+              store.reserve(txid);
+              TStatus status = store.getStatus(txid);
+              if (status.equals(TStatus.FAILED)) {
+                result = false;
+                continue;
+              } else {
+                switch (status) {
+                  case FAILED_IN_PROGRESS:
+                  case IN_PROGRESS:
+                    store.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+                    result &= false;
+                    break;
+                  default:
+                   log.error("Attempting to fail a transaction in an unhandled 
state: {}", status);
+                }
+              }
+              store.unreserve(txid, 0);

Review comment:
       should probably do something like
   
   ```java
   if(store.tryReserve(txid)){
        try{
              // attempt to cancel
        } finally {
             // always unresrve even if there is an exception
             store.unreserve(txid, 0);
        }
   }
   ```

##########
File path: 
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +228,69 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the 
queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + 
lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (iterator.next().getKey().equals(entry)) {
+        return true;
+      }
+      if (failBlockers) {
+        // Loop through all of the prior transactions that are waiting to
+        // acquire this write lock. If the transaction has not succeeded or 
failed,
+        // then fail it and return false from this method so that 
Utils.reserveX()
+        // will call this method again and will re-check the prior 
transactions.
+        boolean result = true;
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          Long txid = e.getKey();
+          if (!txid.equals(entry)) {
+            if (store.isReserved(txid)) {
+              result &= false;
+            } else {
+              store.reserve(txid);

Review comment:
       The check for reserved and then reserving is a racy.  Could add a new 
method that tries to reserve atomically.
   
   ```suggestion
               if (store.tryReserve(txid)) {
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to