This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new e7d789dc79 Small improvements to AccumuloStore related code (#4121) e7d789dc79 is described below commit e7d789dc7903131ea8682b95ac55a6b954866433 Author: Dom G <domgargu...@apache.org> AuthorDate: Thu Jan 4 14:37:45 2024 -0500 Small improvements to AccumuloStore related code (#4121) --- .../accumulo/core/fate/AbstractFateStore.java | 41 +++++++++------------- .../accumulo/core/fate/accumulo/AccumuloStore.java | 39 +++++++++----------- 2 files changed, 33 insertions(+), 47 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java index 5e840d3247..874b58d8c6 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java @@ -52,7 +52,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class); protected final Set<Long> reserved; - protected final Map<Long,Long> defered; + protected final Map<Long,Long> deferred; // This is incremented each time a transaction was unreserved that was non new protected final SignalCount unreservedNonNewCount = new SignalCount(); @@ -62,16 +62,13 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { public AbstractFateStore() { this.reserved = new HashSet<>(); - this.defered = new HashMap<>(); + this.deferred = new HashMap<>(); } public static byte[] serialize(Object o) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos)) { oos.writeObject(o); - oos.close(); - return baos.toByteArray(); } catch (IOException e) { throw new UncheckedIOException(e); @@ -82,9 +79,8 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { justification = "unsafe to store arbitrary serialized objects like this, but needed for now" + " for backwards compatibility") public static Object deserialize(byte[] ser) { - try { - ByteArrayInputStream bais = new ByteArrayInputStream(ser); - ObjectInputStream ois = new ObjectInputStream(bais); + try (ByteArrayInputStream bais = new ByteArrayInputStream(ser); + ObjectInputStream ois = new ObjectInputStream(bais)) { return ois.readObject(); } catch (IOException e) { throw new UncheckedIOException(e); @@ -97,7 +93,8 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { * Attempt to reserve transaction * * @param tid transaction id - * @return true if reserved by this call, false if already reserved + * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or + * an empty Optional if the transaction was already reserved. */ @Override public Optional<FateTxStore<T>> tryReserve(long tid) { @@ -144,28 +141,24 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { synchronized (this) { runnableTids.removeIf(txid -> { - var deferedTime = defered.get(txid); - if (deferedTime != null) { - if (deferedTime >= System.currentTimeMillis()) { + var deferredTime = deferred.get(txid); + if (deferredTime != null) { + if (deferredTime >= System.currentTimeMillis()) { return true; } else { - defered.remove(txid); + deferred.remove(txid); } } - if (reserved.contains(txid)) { - return true; - } - - return false; + return reserved.contains(txid); }); } if (runnableTids.isEmpty()) { if (beforeCount == unreservedRunnableCount.getCount()) { long waitTime = 5000; - if (!defered.isEmpty()) { - Long minTime = Collections.min(defered.values()); + if (!deferred.isEmpty()) { + Long minTime = Collections.min(deferred.values()); waitTime = minTime - System.currentTimeMillis(); } @@ -180,7 +173,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { } - return List.<Long>of().iterator(); + return Collections.emptyIterator(); } @Override @@ -258,7 +251,7 @@ public abstract class AbstractFateStore<T> implements FateStore<T> { AbstractFateStore.this.notifyAll(); if (deferTime > 0) { - defered.put(tid, System.currentTimeMillis() + deferTime); + deferred.put(tid, System.currentTimeMillis() + deferTime); } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index aa5883a6d8..1c1284696e 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -26,7 +26,6 @@ import java.util.Objects; import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; @@ -75,8 +74,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { return scanTx(scanner -> { scanner.setRange(new Range()); TxColumnFamily.STATUS_COLUMN.fetch(scanner); - return StreamSupport.stream(scanner.spliterator(), false) - .map(e -> e.getKey().getRow().toString()).collect(Collectors.toList()); + return scanner.stream().map(e -> e.getKey().getRow().toString()).collect(Collectors.toList()); }); } @@ -85,8 +83,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { return scanTx(scanner -> { scanner.setRange(getRow(tid)); TxColumnFamily.STATUS_COLUMN.fetch(scanner); - return StreamSupport.stream(scanner.spliterator(), false) - .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN); + return scanner.stream().map(e -> TStatus.valueOf(e.getValue().toString())).findFirst() + .orElse(TStatus.UNKNOWN); }); } @@ -125,7 +123,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { scanner.setRange(getRow(tid)); scanner.setBatchSize(1); scanner.fetchColumnFamily(RepoColumnFamily.NAME); - return StreamSupport.stream(scanner.spliterator(), false).map(e -> { + return scanner.stream().map(e -> { @SuppressWarnings("unchecked") var repo = (Repo<T>) deserialize(e.getValue().get()); return repo; @@ -140,7 +138,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { return scanTx(scanner -> { scanner.setRange(getRow(tid)); scanner.fetchColumnFamily(RepoColumnFamily.NAME); - return StreamSupport.stream(scanner.spliterator(), false).map(e -> { + return scanner.stream().map(e -> { @SuppressWarnings("unchecked") var repo = (ReadOnlyRepo<T>) deserialize(e.getValue().get()); return repo; @@ -174,8 +172,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { } scanner.fetchColumn(cq.getColumnFamily(), cq.getColumnQualifier()); - return StreamSupport.stream(scanner.spliterator(), false) - .map(e -> deserializeTxInfo(txInfo, e.getValue().get())).findFirst().orElse(null); + return scanner.stream().map(e -> deserializeTxInfo(txInfo, e.getValue().get())).findFirst() + .orElse(null); } catch (TableNotFoundException e) { throw new IllegalStateException(tableName + " not found!", e); } @@ -188,8 +186,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { return scanTx(scanner -> { scanner.setRange(getRow(tid)); TxColumnFamily.CREATE_TIME_COLUMN.fetch(scanner); - return StreamSupport.stream(scanner.spliterator(), false) - .map(e -> Long.parseLong(e.getValue().toString())).findFirst().orElse(0L); + return scanner.stream().map(e -> Long.parseLong(e.getValue().toString())).findFirst() + .orElse(0L); }); } @@ -197,18 +195,14 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { public void push(Repo<T> repo) throws StackOverflowException { verifyReserved(true); - try { - Optional<Integer> top = findTop(); - - if (top.filter(t -> t >= maxRepos).isPresent()) { - throw new StackOverflowException("Repo stack size too large"); - } + Optional<Integer> top = findTop(); - FateMutator<T> fateMutator = newMutator(tid); - fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate(); - } catch (StackOverflowException soe) { - throw soe; + if (top.filter(t -> t >= maxRepos).isPresent()) { + throw new StackOverflowException("Repo stack size too large"); } + + FateMutator<T> fateMutator = newMutator(tid); + fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate(); } @Override @@ -266,8 +260,7 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { scanner.setRange(getRow(tid)); scanner.setBatchSize(1); scanner.fetchColumnFamily(RepoColumnFamily.NAME); - return StreamSupport.stream(scanner.spliterator(), false) - .map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst(); + return scanner.stream().map(e -> restoreRepo(e.getKey().getColumnQualifier())).findFirst(); }); } }