cshannon commented on code in PR #4049:
URL: https://github.com/apache/accumulo/pull/4049#discussion_r1427961768


##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public abstract class AbstractFateStore<T> implements FateStore<T> {
+
+  private static final Logger log = 
LoggerFactory.getLogger(AbstractFateStore.class);
+
+  protected final Set<Long> reserved;
+  protected final Map<Long,Long> defered;
+
+  // This is incremented each time a transaction was unreserved that was non 
new
+  protected final SignalCount unreservedNonNewCount = new SignalCount();
+
+  // This is incremented each time a transaction is unreserved that was 
runnable
+  protected final SignalCount unreservedRunnableCount = new SignalCount();
+
+  public AbstractFateStore() {
+    this.reserved = new HashSet<>();
+    this.defered = new HashMap<>();
+  }
+
+  public static byte[] serialize(Object o) {
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      ObjectOutputStream oos = new ObjectOutputStream(baos);
+      oos.writeObject(o);
+      oos.close();
+
+      return baos.toByteArray();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION",
+      justification = "unsafe to store arbitrary serialized objects like this, 
but needed for now"
+          + " for backwards compatibility")
+  public static Object deserialize(byte[] ser) {
+    try {
+      ByteArrayInputStream bais = new ByteArrayInputStream(ser);
+      ObjectInputStream ois = new ObjectInputStream(bais);
+      return ois.readObject();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    } catch (ReflectiveOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Attempt to reserve transaction
+   *
+   * @param tid transaction id
+   * @return true if reserved by this call, false if already reserved
+   */
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(long tid) {
+    synchronized (this) {
+      if (!reserved.contains(tid)) {
+        return Optional.of(reserve(tid));
+      }
+      return Optional.empty();
+    }
+  }
+
+  @Override
+  public FateTxStore<T> reserve(long tid) {
+    synchronized (AbstractFateStore.this) {
+      while (reserved.contains(tid)) {
+        try {
+          AbstractFateStore.this.wait(100);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
+        }
+      }
+
+      reserved.add(tid);
+      return newFateTxStore(tid, true);
+    }
+  }
+
+  @Override
+  public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
+
+    while (keepWaiting.get()) {
+      ArrayList<Long> runnableTids = new ArrayList<>();
+
+      final long beforeCount = unreservedRunnableCount.getCount();
+
+      List<String> transactions = getTransactions();

Review Comment:
   Yeah that seems like a good goal, we could change things in a future PR so 
the method an return an iterator and the ZK impl's iterator will just be backed 
the in memory list and Accumulo can be a scanner.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to