This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 162b8effb1 Replace long + TimeUnit with Duration in 
ReadOnlyTStore.unreserve() (#4358)
162b8effb1 is described below

commit 162b8effb13aacf864c0db6ccefb53bd3f652fa3
Author: Dom G <domgargu...@apache.org>
AuthorDate: Tue Mar 12 09:24:57 2024 -0400

    Replace long + TimeUnit with Duration in ReadOnlyTStore.unreserve() (#4358)
---
 .../org/apache/accumulo/core/fate/AdminUtil.java   |  8 ++++----
 .../org/apache/accumulo/core/fate/AgeOffStore.java | 10 ++++-----
 .../java/org/apache/accumulo/core/fate/Fate.java   | 14 ++++++-------
 .../apache/accumulo/core/fate/ReadOnlyTStore.java  |  5 ++---
 .../org/apache/accumulo/core/fate/ZooStore.java    | 24 ++++++++++------------
 .../apache/accumulo/core/logging/FateLogger.java   |  6 +++---
 .../apache/accumulo/core/fate/AgeOffStoreTest.java | 18 ++++++++--------
 .../org/apache/accumulo/core/fate/TestStore.java   |  4 ++--
 8 files changed, 43 insertions(+), 46 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java 
b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index 858e6e6998..7cc0a9c004 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.core.fate;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
+import java.time.Duration;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
@@ -32,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
 import org.apache.accumulo.core.fate.zookeeper.FateLock;
@@ -368,7 +368,7 @@ public class AdminUtil<T> {
 
       long timeCreated = zs.timeCreated(tid);
 
-      zs.unreserve(tid, 0, TimeUnit.MILLISECONDS);
+      zs.unreserve(tid, Duration.ZERO);
 
       if (includeByStatus(status, filterStatus) && includeByTxid(tid, 
filterTxid)) {
         statuses.add(new TransactionStatus(tid, status, txName, hlocks, 
wlocks, top, timeCreated));
@@ -451,7 +451,7 @@ public class AdminUtil<T> {
         break;
     }
 
-    zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
+    zs.unreserve(txid, Duration.ZERO);
     return state;
   }
 
@@ -495,7 +495,7 @@ public class AdminUtil<T> {
         break;
     }
 
-    zs.unreserve(txid, 0, TimeUnit.MILLISECONDS);
+    zs.unreserve(txid, Duration.ZERO);
     return state;
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
index ca016d0c9c..bd2bd5208b 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AgeOffStore.java
@@ -19,13 +19,13 @@
 package org.apache.accumulo.core.fate;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,7 +108,7 @@ public class AgeOffStore<T> implements TStore<T> {
           }
 
         } finally {
-          store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
+          store.unreserve(txid, Duration.ZERO);
         }
       } catch (Exception e) {
         log.warn("Failed to age off FATE tx " + FateTxId.formatTid(txid), e);
@@ -138,7 +138,7 @@ public class AgeOffStore<T> implements TStore<T> {
             break;
         }
       } finally {
-        store.unreserve(txid, 0, TimeUnit.MILLISECONDS);
+        store.unreserve(txid, Duration.ZERO);
       }
     }
   }
@@ -166,8 +166,8 @@ public class AgeOffStore<T> implements TStore<T> {
   }
 
   @Override
-  public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
-    store.unreserve(tid, deferTime, deferTimeUnit);
+  public void unreserve(long tid, Duration deferTime) {
+    store.unreserve(tid, deferTime);
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java 
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 1a14418b1a..4fe07bb8b2 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -30,12 +30,12 @@ import static 
org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.SUCCESSFUL;
 import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.UNKNOWN;
 import static org.apache.accumulo.core.util.ShutdownUtil.isIOException;
 
+import java.time.Duration;
 import java.util.EnumSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
@@ -133,7 +133,7 @@ public class Fate<T> {
           runnerLog.error("Uncaught exception in FATE runner thread.", e);
         } finally {
           if (tid != null) {
-            store.unreserve(tid, deferTime, TimeUnit.MILLISECONDS);
+            store.unreserve(tid, Duration.ofMillis(deferTime));
           }
         }
       }
@@ -289,7 +289,7 @@ public class Fate<T> {
         store.setStatus(tid, SUBMITTED);
       }
     } finally {
-      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
+      store.unreserve(tid, Duration.ZERO);
     }
 
   }
@@ -325,7 +325,7 @@ public class Fate<T> {
             return false;
           }
         } finally {
-          store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
+          store.unreserve(tid, Duration.ZERO);
         }
       } else {
         // reserved, lets retry.
@@ -356,7 +356,7 @@ public class Fate<T> {
           break;
       }
     } finally {
-      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
+      store.unreserve(tid, Duration.ZERO);
     }
   }
 
@@ -369,7 +369,7 @@ public class Fate<T> {
       }
       return (String) store.getTransactionInfo(tid, TxInfo.RETURN_VALUE);
     } finally {
-      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
+      store.unreserve(tid, Duration.ZERO);
     }
   }
 
@@ -383,7 +383,7 @@ public class Fate<T> {
       }
       return (Exception) store.getTransactionInfo(tid, TxInfo.EXCEPTION);
     } finally {
-      store.unreserve(tid, 0, TimeUnit.MILLISECONDS);
+      store.unreserve(tid, Duration.ZERO);
     }
   }
 
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java
index 4a216f1e36..0b48c3b823 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyTStore.java
@@ -19,9 +19,9 @@
 package org.apache.accumulo.core.fate;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Read only access to a Transaction Store.
@@ -79,9 +79,8 @@ public interface ReadOnlyTStore<T> {
    * @param tid transaction id, previously reserved.
    * @param deferTime time to keep this transaction out of the pool used in 
the {@link #reserve()
    *        reserve} method. must be non-negative.
-   * @param deferTimeUnit the time unit of deferTime
    */
-  void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit);
+  void unreserve(long tid, Duration deferTime);
 
   /**
    * Get the current operation for the given transaction id.
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
index feecda66a7..941c04c241 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
@@ -30,6 +30,7 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -63,7 +64,7 @@ public class ZooStore<T> implements TStore<T> {
   private ZooReaderWriter zk;
   private String lastReserved = "";
   private Set<Long> reserved;
-  private Map<Long,Long> deferred;
+  private Map<Long,Long> deferred; // use Long here to properly handle 
System.nanoTime()
   private long statusChangeEvents = 0;
   private int reservationsWaiting = 0;
 
@@ -163,7 +164,7 @@ public class ZooStore<T> implements TStore<T> {
             }
 
             if (deferred.containsKey(tid)) {
-              if ((deferred.get(tid) - System.nanoTime()) < 0) {
+              if (deferred.get(tid) - System.nanoTime() < 0) {
                 deferred.remove(tid);
               } else {
                 continue;
@@ -202,12 +203,10 @@ public class ZooStore<T> implements TStore<T> {
             if (deferred.isEmpty()) {
               this.wait(5000);
             } else {
-              long currTime = System.nanoTime();
-              long minWait =
-                  deferred.values().stream().mapToLong(l -> l - 
currTime).min().getAsLong();
-              long waitTime = TimeUnit.MILLISECONDS.convert(minWait, 
TimeUnit.NANOSECONDS);
-              if (waitTime > 0) {
-                this.wait(Math.min(waitTime, 5000));
+              final long now = System.nanoTime();
+              long minWait = deferred.values().stream().mapToLong(l -> l - 
now).min().orElseThrow();
+              if (minWait > 0) {
+                this.wait(Math.min(TimeUnit.NANOSECONDS.toMillis(minWait), 
5000));
               }
             }
           }
@@ -272,10 +271,9 @@ public class ZooStore<T> implements TStore<T> {
   }
 
   @Override
-  public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
-    deferTime = TimeUnit.NANOSECONDS.convert(deferTime, deferTimeUnit);
+  public void unreserve(long tid, Duration deferTime) {
 
-    if (deferTime < 0) {
+    if (deferTime.isNegative()) {
       throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
     }
 
@@ -285,8 +283,8 @@ public class ZooStore<T> implements TStore<T> {
             "Tried to unreserve id that was not reserved " + 
FateTxId.formatTid(tid));
       }
 
-      if (deferTime > 0) {
-        deferred.put(tid, System.nanoTime() + deferTime);
+      if (deferTime.compareTo(Duration.ZERO) > 0) {
+        deferred.put(tid, deferTime.toNanos() + System.nanoTime());
       }
 
       this.notifyAll();
diff --git 
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java 
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index ccad01a7f1..fa7ed86e08 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -21,9 +21,9 @@ package org.apache.accumulo.core.logging;
 import static org.apache.accumulo.core.fate.FateTxId.formatTid;
 
 import java.io.Serializable;
+import java.time.Duration;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import org.apache.accumulo.core.fate.Fate;
@@ -62,8 +62,8 @@ public class FateLogger {
       }
 
       @Override
-      public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
-        store.unreserve(tid, deferTime, deferTimeUnit);
+      public void unreserve(long tid, Duration deferTime) {
+        store.unreserve(tid, deferTime);
       }
 
       @Override
diff --git 
a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java 
b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
index c2b086ee34..42adc60bef 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/AgeOffStoreTest.java
@@ -20,9 +20,9 @@ package org.apache.accumulo.core.fate;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.fate.AgeOffStore.TimeSource;
 import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
@@ -53,7 +53,7 @@ public class AgeOffStoreTest {
     long txid1 = aoStore.create();
     aoStore.reserve(txid1);
     aoStore.setStatus(txid1, TStatus.IN_PROGRESS);
-    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
+    aoStore.unreserve(txid1, Duration.ZERO);
 
     aoStore.ageOff();
 
@@ -61,7 +61,7 @@ public class AgeOffStoreTest {
     aoStore.reserve(txid2);
     aoStore.setStatus(txid2, TStatus.IN_PROGRESS);
     aoStore.setStatus(txid2, TStatus.FAILED);
-    aoStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);
+    aoStore.unreserve(txid2, Duration.ZERO);
 
     tts.time = 6;
 
@@ -69,7 +69,7 @@ public class AgeOffStoreTest {
     aoStore.reserve(txid3);
     aoStore.setStatus(txid3, TStatus.IN_PROGRESS);
     aoStore.setStatus(txid3, TStatus.SUCCESSFUL);
-    aoStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);
+    aoStore.unreserve(txid3, Duration.ZERO);
 
     Long txid4 = aoStore.create();
 
@@ -102,19 +102,19 @@ public class AgeOffStoreTest {
     long txid1 = testStore.create();
     testStore.reserve(txid1);
     testStore.setStatus(txid1, TStatus.IN_PROGRESS);
-    testStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
+    testStore.unreserve(txid1, Duration.ZERO);
 
     long txid2 = testStore.create();
     testStore.reserve(txid2);
     testStore.setStatus(txid2, TStatus.IN_PROGRESS);
     testStore.setStatus(txid2, TStatus.FAILED);
-    testStore.unreserve(txid2, 0, TimeUnit.MILLISECONDS);
+    testStore.unreserve(txid2, Duration.ZERO);
 
     long txid3 = testStore.create();
     testStore.reserve(txid3);
     testStore.setStatus(txid3, TStatus.IN_PROGRESS);
     testStore.setStatus(txid3, TStatus.SUCCESSFUL);
-    testStore.unreserve(txid3, 0, TimeUnit.MILLISECONDS);
+    testStore.unreserve(txid3, Duration.ZERO);
 
     Long txid4 = testStore.create();
 
@@ -137,7 +137,7 @@ public class AgeOffStoreTest {
 
     aoStore.reserve(txid1);
     aoStore.setStatus(txid1, TStatus.FAILED_IN_PROGRESS);
-    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
+    aoStore.unreserve(txid1, Duration.ZERO);
 
     tts.time = 30;
 
@@ -148,7 +148,7 @@ public class AgeOffStoreTest {
 
     aoStore.reserve(txid1);
     aoStore.setStatus(txid1, TStatus.FAILED);
-    aoStore.unreserve(txid1, 0, TimeUnit.MILLISECONDS);
+    aoStore.unreserve(txid1, Duration.ZERO);
 
     aoStore.ageOff();
 
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java 
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 3253c41a90..ba06a51ff9 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -18,13 +18,13 @@
  */
 package org.apache.accumulo.core.fate;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Transient in memory store for transactions.
@@ -62,7 +62,7 @@ public class TestStore extends ZooStore<String> {
   }
 
   @Override
-  public void unreserve(long tid, long deferTime, TimeUnit deferTimeUnit) {
+  public void unreserve(long tid, Duration deferTime) {
     if (!reserved.remove(tid)) {
       throw new IllegalStateException();
     }

Reply via email to