keith-turner commented on a change in pull request #2329:
URL: https://github.com/apache/accumulo/pull/2329#discussion_r750691497



##########
File path: 
core/src/main/java/org/apache/accumulo/fate/zookeeper/DistributedReadWriteLock.java
##########
@@ -218,22 +228,52 @@ public boolean tryLock() {
       }
       SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
       Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
-      if (!iterator.hasNext())
+      if (!iterator.hasNext()) {
         throw new IllegalStateException("Did not find our own lock in the 
queue: " + this.entry
             + " userData " + new String(this.userData, UTF_8) + " lockType " + 
lockType());
-      return iterator.next().getKey().equals(entry);
+      }
+      if (iterator.next().getKey().equals(entry)) {
+        return true;
+      }
+      if (preemptBlockingTransactions) {
+        log.debug("Preempting transactions that are blocking this 
transaction");
+        // Loop through all of the prior transactions that are waiting to
+        // acquire this write lock. If the transaction has not succeeded or 
failed,
+        // then fail it and return false from this method so that 
Utils.reserveX()
+        // will call this method again and will re-check the prior 
transactions.
+        iterator = entries.entrySet().iterator();
+        while (iterator.hasNext()) {
+          Entry<Long,byte[]> e = iterator.next();
+          Long priorEntry = e.getKey();
+          ParsedLock priorLock = new ParsedLock(e.getValue());
+          long priorTxid = Long.valueOf(new String(priorLock.getUserData(), 
UTF_8), 16);
+          if (!priorEntry.equals(entry)) {
+            log.info("Attempting to preempt blocking tx: {}", 
Long.toHexString(priorTxid));
+            store.cancel(priorTxid);

Review comment:
       If its not currently reserved, could possibly reserved it and change the 
state to FAILED_IN_PROGRESS.
   
   ```java
     if(reserved)
        interrupt()
     else
       changeToFailedInProgress()
   ``` 

##########
File path: core/src/main/java/org/apache/accumulo/fate/ZooStore.java
##########
@@ -529,4 +527,16 @@ public long timeCreated(long tid) {
       return dops;
     }
   }
+
+  /**
+   * Attempt to cancel the reserved transaction by interrupting the Thread 
that is running it
+   *
+   * @param tid
+   *          transaction id
+   */
+  public void cancel(long tid) {
+    if (reserved.containsKey(tid)) {

Review comment:
       synchronization is needed in this method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to