keith-turner commented on a change in pull request #2329:
URL: https://github.com/apache/accumulo/pull/2329#discussion_r740333841



##########
File path: core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +237,54 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (!failBlockers) {
+        return iterator.next().getKey().equals(entry);
+      } else {
+        ZooStore<DistributedReadWriteLock> zs;
+        try {
+          zs = new ZooStore<>(zooPath, zrw);
+        } catch (KeeperException | InterruptedException e1) {
+          log.error("Error creating zoo store", e1);
+          return false;
+        }
+        final AdminUtil<DistributedReadWriteLock> util = new AdminUtil<>();
+        boolean result = true;
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          if (!e.getKey().equals(entry)) {
+            result &= util.prepFail(zs, zrw, zooManagerPath, Long.toString(e.getKey(), 16));

Review comment:
       @dlmarion could possibly do something like the following, where the
code in `failPrecedingOperations()` would go through and transition operations
to FAILED_IN_PROGRESS. I'm also thinking that code could transition NEW things
straight to FAILED instead of to FAILED_IN_PROGRESS, but I'm not sure. However,
there is no waiting on these other operations. The FATE framework will reserve
a txid, take the top repo off the stack, and then call isReady(). If isReady()
returns non-zero, it will unreserve the txid and come back to it later. So if
isReady() transitions things to FAILED_IN_PROGRESS, then hopefully a later call
to isReady() can get the lock sooner.
   
   ```java
      class SomeRepo extends Repo {
        public long isReady(long tid, ...) {
          long waitTime = tryToGetTableWriteLock(...);

          if (waitTime > 0 && failBlockers) {
            // We did not get the table write lock, so let's initiate the failure of all
            // operations that precede us in the lock queue. Hopefully, after these fail,
            // they will release the lock and we will get it on a later call to isReady().
            failPrecedingOperations(tid);
          }

          return waitTime;
        }
      }
   ```
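The reserve / isReady() / unreserve cycle described above can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions: `TxStatus`, `SimpleLockQueue`, `BlockingAwareRepo`, `finishFailures()` and the 100 ms retry value are hypothetical stand-ins, not the real FATE, ZooStore or DistributedReadWriteLock APIs. It shows the key property of the proposal: isReady() never blocks on the other operations, it only requests that earlier queue entries move to FAILED_IN_PROGRESS and returns a non-zero wait time, and a later call picks up the lock once they have released it. NEW entries are treated like IN_PROGRESS ones here; moving NEW straight to FAILED, as floated above, is not shown.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified transaction states (names borrowed from this review).
enum TxStatus { NEW, IN_PROGRESS, FAILED_IN_PROGRESS, FAILED, SUCCESSFUL }

// Hypothetical in-memory lock queue: a lower key means earlier in the queue.
class SimpleLockQueue {
  final TreeMap<Long,TxStatus> queue = new TreeMap<>();

  boolean isFirst(long tid) {
    return !queue.isEmpty() && queue.firstKey() == tid;
  }

  // Request (but never wait for) the failure of every entry ahead of tid.
  void failPrecedingOperations(long tid) {
    for (Map.Entry<Long,TxStatus> e : queue.headMap(tid, false).entrySet()) {
      TxStatus s = e.getValue();
      if (s == TxStatus.NEW || s == TxStatus.IN_PROGRESS) {
        e.setValue(TxStatus.FAILED_IN_PROGRESS);
      }
    }
  }

  // Stand-in for the other transactions finishing their rollback and releasing the lock.
  void finishFailures() {
    queue.values().removeIf(s -> s == TxStatus.FAILED_IN_PROGRESS);
  }
}

// Hypothetical repo: isReady() returns 0 once the lock is held, else a retry delay in ms.
class BlockingAwareRepo {
  static final long RETRY_MS = 100;
  private final long tid;
  private final boolean failBlockers;
  private final SimpleLockQueue lock;

  BlockingAwareRepo(long tid, boolean failBlockers, SimpleLockQueue lock) {
    this.tid = tid;
    this.failBlockers = failBlockers;
    this.lock = lock;
  }

  long isReady() {
    if (lock.isFirst(tid)) {
      return 0; // lock acquired; FATE may proceed to call()
    }
    if (failBlockers) {
      lock.failPrecedingOperations(tid); // initiate failures, do not wait on them
    }
    return RETRY_MS; // FATE unreserves the txid and comes back later
  }
}

class FateRetryDemo {
  public static void main(String[] args) {
    SimpleLockQueue lock = new SimpleLockQueue();
    lock.queue.put(1L, TxStatus.IN_PROGRESS);
    lock.queue.put(2L, TxStatus.NEW);
    lock.queue.put(3L, TxStatus.IN_PROGRESS); // our transaction
    BlockingAwareRepo repo = new BlockingAwareRepo(3L, true, lock);

    System.out.println(repo.isReady()); // 100: blocked; 1 and 2 are now FAILED_IN_PROGRESS
    lock.finishFailures();              // the blockers roll back and leave the queue
    System.out.println(repo.isReady()); // 0: we are now first in the queue
  }
}
```

Because each isReady() call returns immediately, the FATE thread is free to work on other transactions (including the ones being failed) between retries.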

##########
File path: core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +237,54 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (!failBlockers) {
+        return iterator.next().getKey().equals(entry);
+      } else {
+        ZooStore<DistributedReadWriteLock> zs;
+        try {
+          zs = new ZooStore<>(zooPath, zrw);
+        } catch (KeeperException | InterruptedException e1) {
+          log.error("Error creating zoo store", e1);
+          return false;
+        }
+        final AdminUtil<DistributedReadWriteLock> util = new AdminUtil<>();
+        boolean result = true;
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          if (!e.getKey().equals(entry)) {
+            result &= util.prepFail(zs, zrw, zooManagerPath, Long.toString(e.getKey(), 16));

Review comment:
       For the previous comment, the failPrecedingOperations() code could possibly
be pushed into the lock code. I teased it out in the example above to show how
it could work with the isReady() call to avoid waiting for transitions to happen.

##########
File path: core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +233,82 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (!failBlockers) {
+        return iterator.next().getKey().equals(entry);
+      } else {
+        ZooStore<DistributedReadWriteLock> zs;
+        try {
+          zs = new ZooStore<>(zooPath, zrw);

Review comment:
       Need to use the same ZooStore object that the manager/FATE framework is
using, or we will not get mutual exclusion when reserving transactions. Look at
the impl of zs.reserve(long): by creating a new object, this code will be using
a different HashSet and synchronizing on different object monitors than the
manager/FATE framework.
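To see why a second store instance breaks mutual exclusion, here is a deliberately simplified model. `InMemoryTxStore` and `tryReserve()` are hypothetical: the real ZooStore reserve(long) blocks rather than returning a boolean, and the internals below are only modeled on the description in this comment (a per-instance HashSet plus synchronization on the instance's monitor).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, heavily simplified store. As described above for zs.reserve(long), it keeps
// reservations in a per-instance HashSet guarded by the instance's own monitor.
class InMemoryTxStore {
  private final Set<Long> reserved = new HashSet<>();

  // Non-blocking variant for illustration: returns false if tid is already reserved
  // through this same instance.
  synchronized boolean tryReserve(long tid) {
    return reserved.add(tid);
  }

  synchronized void unreserve(long tid) {
    reserved.remove(tid);
  }
}

class SharedStoreDemo {
  public static void main(String[] args) {
    InMemoryTxStore managerStore = new InMemoryTxStore();
    managerStore.tryReserve(42L); // the FATE runner is working on txid 42

    // Same instance: the second reservation is refused, so exclusion holds.
    System.out.println(managerStore.tryReserve(42L)); // false

    // Fresh instance, analogous to creating a new store inside tryLock(): it has its own
    // HashSet and monitor, so it also reserves txid 42 while the runner still holds it.
    InMemoryTxStore lockStore = new InMemoryTxStore();
    System.out.println(lockStore.tryReserve(42L)); // true
  }
}
```

Assuming the lock can be handed a reference, passing the manager's existing store instance in, rather than constructing one in tryLock(), would keep both sides on the same set and monitor; that wiring is not shown here.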

##########
File path: core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +233,82 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (!failBlockers) {
+        return iterator.next().getKey().equals(entry);
+      } else {
+        ZooStore<DistributedReadWriteLock> zs;
+        try {
+          zs = new ZooStore<>(zooPath, zrw);
+        } catch (KeeperException | InterruptedException e1) {
+          log.error("Error creating zoo store", e1);
+          return false;
+        }
+        // Loop through all of the prior transactions that are waiting to
+        // acquire this write lock. If the transaction has not succeeded or failed,
+        // then fail it and return false from this method so that Utils.reserveX()
+        // will call this method again and will re-check the prior transactions.
+        boolean result = true;
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          Long txid = e.getKey();
+          if (!txid.equals(entry)) {
+            if (zs.isReserved(txid)) {
+              result &= false;
+            } else {
+              zs.reserve(txid);
+              TStatus status = zs.getStatus(txid);
+              if (status.equals(TStatus.FAILED)) {
+                result = false;
+                continue;
+              } else {
+                switch (status) {
+                  case UNKNOWN:
+                    // not sure what to do here, we have an invalid txid
+                    break;
+                  case SUCCESSFUL:
+                    // already succeeded, lock should be removed
+                    break;
+                  case FAILED:
+                  case FAILED_IN_PROGRESS:
+                    // transaction failed or in process of failing
+                    break;
+                  default:
+                    zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
+                    result &= false;
+                }
+              }
+              zs.unreserve(txid, 0);
+            }
+          } else {
+            return result && txid.equals(entry);
+          }
+        }
+        return result;

Review comment:
       The following high-level structure would be nice, where if we are first
in the queue we return true no matter what failBlockers is, and we never return
true unless we are first in the queue. This suggestion can't really be applied
as-is; I just used a suggestion to make clear the scope I was thinking about.
   
   ```suggestion
         if (iterator.next().getKey().equals(entry)) {
           return true;
         }
   
         if (failBlockers) {
           // attempt to fail anything before this lock entry in the queue, but never
           // return true unless ours is the first
         }
         return false;
   ```
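The suggested shape can be made concrete with a small, self-contained sketch. It is hypothetical: `QueueLockSketch` and the `failEntry` callback are invented for illustration, the map passed to tryLock() stands in for `qlock.getEarlierEntries(entry)` and is assumed to hold the entries up to and including ours in queue order (as the code under review implies), and how a queue entry maps back to the FATE operation that owns it is deliberately left out. It shows the two invariants from the suggestion: the first entry always gets the lock regardless of failBlockers, and no other entry ever does.

```java
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.LongConsumer;

// Hypothetical sketch of the suggested tryLock() shape, not the real DistributedReadWriteLock.
class QueueLockSketch {
  private final long entry;             // our entry in the lock queue
  private final boolean failBlockers;
  private final LongConsumer failEntry; // hypothetical hook: start failing an earlier entry's operation

  QueueLockSketch(long entry, boolean failBlockers, LongConsumer failEntry) {
    this.entry = entry;
    this.failBlockers = failBlockers;
    this.failEntry = failEntry;
  }

  // earlierEntries: every queue entry up to and including ours, in queue order.
  boolean tryLock(SortedMap<Long,byte[]> earlierEntries) {
    Iterator<Entry<Long,byte[]>> iterator = earlierEntries.entrySet().iterator();
    if (!iterator.hasNext()) {
      throw new IllegalStateException("Did not find our own lock in the queue: " + entry);
    }
    // First in the queue: we hold the lock, whatever failBlockers is.
    if (iterator.next().getKey() == entry) {
      return true;
    }
    if (failBlockers) {
      // Ask everything ahead of us to fail, but do not wait for it to happen.
      for (long key : earlierEntries.keySet()) {
        if (key == entry) {
          break;
        }
        failEntry.accept(key);
      }
    }
    // Never return true unless ours is the first entry.
    return false;
  }
}

class TryLockDemo {
  public static void main(String[] args) {
    TreeMap<Long,byte[]> queue = new TreeMap<>();
    queue.put(5L, new byte[0]);
    queue.put(7L, new byte[0]);
    queue.put(9L, new byte[0]);
    LongConsumer log = key -> System.out.println("failing entry " + key);

    System.out.println(new QueueLockSketch(9L, true, log).tryLock(queue.headMap(10L)));
    System.out.println(new QueueLockSketch(5L, true, log).tryLock(queue.headMap(6L)));
  }
}
```

Keeping the "first entry wins" check ahead of the failBlockers branch means the failure path can only ever add retries, never grant the lock early.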

##########
File path: core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +233,82 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (!failBlockers) {
+        return iterator.next().getKey().equals(entry);
+      } else {
+        ZooStore<DistributedReadWriteLock> zs;
+        try {
+          zs = new ZooStore<>(zooPath, zrw);
+        } catch (KeeperException | InterruptedException e1) {
+          log.error("Error creating zoo store", e1);
+          return false;
+        }
+        // Loop through all of the prior transactions that are waiting to
+        // acquire this write lock. If the transaction has not succeeded or failed,
+        // then fail it and return false from this method so that Utils.reserveX()
+        // will call this method again and will re-check the prior transactions.
+        boolean result = true;
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          Long txid = e.getKey();
+          if (!txid.equals(entry)) {
+            if (zs.isReserved(txid)) {
+              result &= false;
+            } else {
+              zs.reserve(txid);
+              TStatus status = zs.getStatus(txid);
+              if (status.equals(TStatus.FAILED)) {
+                result = false;
+                continue;
+              } else {
+                switch (status) {
+                  case UNKNOWN:
+                    // not sure what to do here, we have an invalid txid
+                    break;
+                  case SUCCESSFUL:
+                    // already succeeded, lock should be removed
+                    break;
+                  case FAILED:
+                  case FAILED_IN_PROGRESS:

Review comment:
       Investigation would be needed to confirm this, but I suspect we should
only expect to see IN_PROGRESS and FAILED_IN_PROGRESS transactions actually
holding the lock. Anything else seems like an indication of a more general
problem that we should not attempt to react to here. We could log a warn/error
if we see them, but it may not be a good idea to let the lock be acquired when
the system is in an unexpected state. This goes along with my other comment
about always returning false when we are not first in the queue.
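If this holds up, the per-entry handling could be narrowed to the two expected states. The sketch below is hypothetical: `TxState`, `handleBlocker()` and the in-memory map stand in for the real TStatus and ZooStore calls, and it assumes, as the comment does pending investigation, that only IN_PROGRESS and FAILED_IN_PROGRESS operations should be holding the lock. Any other state is logged (here with java.util.logging) and is never treated as a reason to hand out the lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical mirror of the transaction states discussed in this review.
enum TxState { NEW, IN_PROGRESS, FAILED_IN_PROGRESS, FAILED, SUCCESSFUL, UNKNOWN }

class BlockerHandler {
  private static final Logger LOG = Logger.getLogger(BlockerHandler.class.getName());

  // Handles one operation ahead of us in the lock queue. Returns true only if this call
  // moved it to FAILED_IN_PROGRESS. The caller still returns false from tryLock().
  static boolean handleBlocker(long id, Map<Long,TxState> states) {
    TxState state = states.getOrDefault(id, TxState.UNKNOWN);
    switch (state) {
      case IN_PROGRESS:
        states.put(id, TxState.FAILED_IN_PROGRESS); // initiate the failure
        return true;
      case FAILED_IN_PROGRESS:
        return false; // already rolling back; wait for it to release the lock
      default:
        // NEW, FAILED, SUCCESSFUL, UNKNOWN: not expected for a lock holder. Report it,
        // but do not try to repair it here.
        LOG.warning("Unexpected state " + state + " for blocking operation " + id);
        return false;
    }
  }
}

class BlockerDemo {
  public static void main(String[] args) {
    Map<Long,TxState> states = new HashMap<>();
    states.put(1L, TxState.IN_PROGRESS);
    states.put(2L, TxState.SUCCESSFUL);
    System.out.println(BlockerHandler.handleBlocker(1L, states)); // true
    System.out.println(states.get(1L));                           // FAILED_IN_PROGRESS
    System.out.println(BlockerHandler.handleBlocker(2L, states)); // false, and a warning is logged
  }
}
```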




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
