keith-turner commented on a change in pull request #2329:
URL: https://github.com/apache/accumulo/pull/2329#discussion_r741111823



##########
File path: core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +233,82 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (!failBlockers) {
+        return iterator.next().getKey().equals(entry);
+      } else {
+        ZooStore<DistributedReadWriteLock> zs;
+        try {
+          zs = new ZooStore<>(zooPath, zrw);

Review comment:
       Need to use the same ZooStore object that the manager/Fate framework is using, or we will not get mutual exclusion when reserving transactions. Look at the impl of zs.reserve(long): by creating a new object, this code will use a different HashSet and synchronize on a different object monitor than the manager/Fate framework.
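To illustrate the mutual-exclusion point, here is a minimal sketch assuming, as the comment describes for `zs.reserve(long)`, that the reserved-id set and the monitor guarding it are per instance. `ToyTxStore` and `tryReserve` are hypothetical names, not Accumulo's `ZooStore` API: a reservation is only visible to callers that share the same instance.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a FATE transaction store. The reserved-id set and the
// monitor (this) belong to the instance, so only callers sharing one instance
// exclude each other.
class ToyTxStore {
  private final Set<Long> reserved = new HashSet<>();

  // Non-blocking variant for illustration: returns false if tid is already
  // reserved through THIS instance.
  boolean tryReserve(long tid) {
    synchronized (this) {
      return reserved.add(tid);
    }
  }

  void unreserve(long tid) {
    synchronized (this) {
      reserved.remove(tid);
      notifyAll(); // wakes only threads waiting on this instance's monitor
    }
  }

  public static void main(String[] args) {
    ToyTxStore shared = new ToyTxStore();
    System.out.println(shared.tryReserve(42L)); // true: first reservation
    System.out.println(shared.tryReserve(42L)); // false: same instance sees it

    ToyTxStore other = new ToyTxStore();
    System.out.println(other.tryReserve(42L)); // true: separate instance, no exclusion
  }
}
```

The third call succeeds even though transaction 42 is already reserved, which is the failure mode of constructing a fresh store inside `tryLock()`.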

##########
File path: core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +233,82 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (!failBlockers) {
+        return iterator.next().getKey().equals(entry);
+      } else {
+        ZooStore<DistributedReadWriteLock> zs;
+        try {
+          zs = new ZooStore<>(zooPath, zrw);
+        } catch (KeeperException | InterruptedException e1) {
+          log.error("Error creating zoo store", e1);
+          return false;
+        }
+        // Loop through all of the prior transactions that are waiting to
+        // acquire this write lock. If the transaction has not succeeded or failed,
+        // then fail it and return false from this method so that Utils.reserveX()
+        // will call this method again and will re-check the prior transactions.
+        boolean result = true;
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          Long txid = e.getKey();
+          if (!txid.equals(entry)) {
+            if (zs.isReserved(txid)) {
+              result &= false;
+            } else {
+              zs.reserve(txid);
+              TStatus status = zs.getStatus(txid);
+              if (status.equals(TStatus.FAILED)) {
+                result = false;
+                continue;
+              } else {
+                switch (status) {
+                  case UNKNOWN:
+                    // not sure what to do here, we have an invalid txid
+                    break;
+                  case SUCCESSFUL:
+                    // already succeeded, lock should be removed
+                    break;
+                  case FAILED:
+                  case FAILED_IN_PROGRESS:
+                    // transaction failed or in process of failing
+                    break;
+                  default:
+                    zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+                    result &= false;
+                }
+              }
+              zs.unreserve(txid, 0);
+            }
+          } else {
+            return result && txid.equals(entry);
+          }
+        }
+        return result;

Review comment:
       The following high-level structure would be nice: if we are first in the queue, return true no matter what failBlockers is set to, and never return true unless we are first in the queue. This suggestion can't really be applied as-is; I just used the suggestion format to make clear the scope I was thinking about.
   
   ```suggestion
         if (iterator.next().getKey().equals(entry)) {
           return true;
         }
   
         if (failBlockers) {
           // attempt to fail anything before this lock entry in the queue, but never return true unless ours is the 1st
         }
         return false;
   ```
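A self-contained sketch of that structure, under stated assumptions: `earlier` stands in for `qlock.getEarlierEntries(entry)` (our entry plus everything queued ahead of it, keyed by queue sequence number), its values are hypothetical transaction ids, and `failTx` is a hypothetical hook for marking a blocker as failing. None of these names are Accumulo APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.LongConsumer;

class LockQueueSketch {
  static boolean tryLock(SortedMap<Long, Long> earlier, long entry, boolean failBlockers,
      LongConsumer failTx) {
    if (earlier.isEmpty()) {
      throw new IllegalStateException("Did not find our own lock in the queue: " + entry);
    }
    // First in the queue: the lock is ours, whatever failBlockers says.
    if (earlier.firstKey() == entry) {
      return true;
    }
    if (failBlockers) {
      // Try to fail every entry strictly ahead of ours (headMap excludes `entry`),
      // but still report "not acquired"; the caller retries until we are first.
      for (long txid : earlier.headMap(entry).values()) {
        failTx.accept(txid);
      }
    }
    return false;
  }

  public static void main(String[] args) {
    SortedMap<Long, Long> queue = new TreeMap<>();
    queue.put(1L, 100L); // blocker queued ahead of us
    queue.put(2L, 200L); // our own entry
    List<Long> failed = new ArrayList<>();
    System.out.println(tryLock(queue, 2L, true, txid -> failed.add(txid))); // false
    System.out.println(failed); // [100]
    queue.remove(1L); // the blocker eventually releases its entry
    System.out.println(tryLock(queue, 2L, true, txid -> failed.add(txid))); // true
  }
}
```

Keeping the "am I first?" check separate from the blocker-failing logic means `failBlockers` can only speed up acquisition; it can never let the lock be taken out of queue order.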

##########
File path: core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +233,82 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (!failBlockers) {
+        return iterator.next().getKey().equals(entry);
+      } else {
+        ZooStore<DistributedReadWriteLock> zs;
+        try {
+          zs = new ZooStore<>(zooPath, zrw);
+        } catch (KeeperException | InterruptedException e1) {
+          log.error("Error creating zoo store", e1);
+          return false;
+        }
+        // Loop through all of the prior transactions that are waiting to
+        // acquire this write lock. If the transaction has not succeeded or failed,
+        // then fail it and return false from this method so that Utils.reserveX()
+        // will call this method again and will re-check the prior transactions.
+        boolean result = true;
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          Long txid = e.getKey();
+          if (!txid.equals(entry)) {
+            if (zs.isReserved(txid)) {
+              result &= false;
+            } else {
+              zs.reserve(txid);
+              TStatus status = zs.getStatus(txid);
+              if (status.equals(TStatus.FAILED)) {
+                result = false;
+                continue;
+              } else {
+                switch (status) {
+                  case UNKNOWN:
+                    // not sure what to do here, we have an invalid txid
+                    break;
+                  case SUCCESSFUL:
+                    // already succeeded, lock should be removed
+                    break;
+                  case FAILED:
+                  case FAILED_IN_PROGRESS:

Review comment:
       Investigation would be needed to confirm this, but I suspect we should only expect to see IN_PROGRESS and FAILED_IN_PROGRESS transactions actually holding the lock. Anything else seems like an indication of a more general problem that we should not attempt to react to here. We could log a warning or error if we see them, but it may not be a good idea to let the lock be acquired when the system is in an unexpected state. This goes along with my other comment about always returning false when we are not first in the queue.



