keith-turner commented on a change in pull request #2329:
URL: https://github.com/apache/accumulo/pull/2329#discussion_r752497136



##########
File path: core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +229,63 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (iterator.next().getKey().equals(entry)) {
+        return true;
+      }
+      if (preemptBlockingTransactions) {
+        log.info("Preempting transactions that are blocking this transaction");
+        // Loop through all of the prior transactions that are waiting to
+        // acquire this write lock. If the transaction has not succeeded or failed,
+        // then fail it and return false from this method so that Utils.reserveX()
+        // will call this method again and will re-check the prior transactions.
+        iterator = entries.entrySet().iterator();
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          Long priorEntry = e.getKey();
+          ParsedLock priorLock = new ParsedLock(e.getValue());
+          long priorTxid = Long.valueOf(new String(priorLock.getUserData(), UTF_8), 16);
+          if (!priorEntry.equals(entry)) {
+            log.info("Attempting to preempt blocking tx: {}", Long.toHexString(priorTxid));
+            // If the prior tx id is reserved, then it's running. We can't reserve it and set its
+            // status, we have to interrupt the thread.
+            if (store.isReserved(priorTxid)) {
+              store.cancel(priorTxid);
+            } else {
+              try {
+                store.reserve(priorTxid);
+                store.setStatus(priorTxid, TStatus.FAILED_IN_PROGRESS);
+              } finally {
+                store.unreserve(priorTxid, 0);
+              }
+            }

Review comment:
       I see three possible problems with this code.
   
   1. It does not seem to check the current status before changing it. If the current status were FAILED or SUCCEEDED (because things changed after we read from ZK, before the loop), then we would not want to set it to FAILED_IN_PROGRESS. It seems like previous versions of this PR did this type of check. We could reserve, get the status, and change it depending on what it is.
   2. When isReserved() returns false and we then call reserve(), the transaction could become reserved between those two calls. The reserve() call would then block, which could cause problems. We may still want the tryReserve() call.
   3. Thinking more about the cancel call, I realized it may cancel/interrupt a transaction that is undoing, and I am not completely sure what the implications of this are. I suspect the intention is that it only cancels transactions that are in the IN_PROGRESS state and not in the FAILED_IN_PROGRESS state. Interrupting something in the FAILED_IN_PROGRESS state (that is, while it unwinds the stack executing undo ops) seems counterproductive.
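
To make the three points concrete, here is a rough sketch of the shape I have in mind. It is not code from this PR: `Store`, `Status`, and `preempt()` are hypothetical in-memory stand-ins (the status names follow this comment, not necessarily the real enum), and the `tryReserve()` signature is an assumption. It only illustrates the ordering: try to reserve without blocking, read the status while reserved, change it only if the transaction has not already finished or started failing, and cancel only something that looks IN_PROGRESS.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class PreemptSketch {

  // Hypothetical stand-in for the FATE transaction status.
  enum Status { NEW, IN_PROGRESS, FAILED_IN_PROGRESS, FAILED, SUCCEEDED }

  // Hypothetical in-memory store; NOT the real store API.
  static class Store {
    private final Map<Long,Status> statuses = new HashMap<>();
    private final Set<Long> reserved = new HashSet<>();
    final Set<Long> cancelled = new HashSet<>();

    // Non-blocking: returns false if the transaction is already reserved.
    synchronized boolean tryReserve(long txid) {
      return reserved.add(txid);
    }

    synchronized void unreserve(long txid) {
      reserved.remove(txid);
    }

    synchronized Status getStatus(long txid) {
      return statuses.get(txid);
    }

    synchronized void setStatus(long txid, Status status) {
      statuses.put(txid, status);
    }

    // Records the cancel request; a real store would interrupt the running thread.
    synchronized void cancel(long txid) {
      cancelled.add(txid);
    }
  }

  static void preempt(Store store, long priorTxid) {
    if (store.tryReserve(priorTxid)) {
      // Point 2: we never block waiting for someone else's reservation.
      try {
        // Point 1: only overwrite a status that is not already terminal or failing.
        Status current = store.getStatus(priorTxid);
        if (current == Status.NEW || current == Status.IN_PROGRESS) {
          store.setStatus(priorTxid, Status.FAILED_IN_PROGRESS);
        }
      } finally {
        store.unreserve(priorTxid);
      }
    } else if (store.getStatus(priorTxid) == Status.IN_PROGRESS) {
      // Point 3: only interrupt a running tx that is not already undoing.
      // This read happens without a reservation, so it is still racy.
      store.cancel(priorTxid);
    }
  }
}
```

The status read in the `else` branch is still unprotected, so this narrows the window for interrupting an undo rather than closing it. Closing it fully would probably need the cancel path itself to check the status.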




