keith-turner commented on code in PR #4049:
URL: https://github.com/apache/accumulo/pull/4049#discussion_r1427283947


##########
core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.accumulo;
+
+import static java.util.Collections.reverseOrder;
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.apache.accumulo.core.fate.ReadOnlyRepo;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.StackOverflowException;
+import 
org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily;
+import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily;
+import 
org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.FastFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AccumuloStore<T> extends AbstractFateStore<T> {
+
+  private static Logger log = LoggerFactory.getLogger(AccumuloStore.class);
+
+  private static final Comparator<Entry<Key,Value>> repoComparator =
+      Comparator.comparing(o -> o.getKey().getColumnQualifier(), 
reverseOrder());
+
+  private final ClientContext context;
+  private final String tableName;
+
+  public AccumuloStore(ClientContext context, String tableName) {
+    this.context = Objects.requireNonNull(context);
+    this.tableName = Objects.requireNonNull(tableName);
+  }
+
+  @Override
+  public long create() {
+    long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL;
+
+    // TODO = conditional mutation and retry if exists?
+    
newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate();
+
+    return tid;
+  }
+
+  @Override
+  protected List<String> getTransactions() {
+    return scanTx(scanner -> {
+      scanner.setRange(new Range());
+      scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
+          TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .map(e -> 
e.getKey().getRow().toString()).collect(Collectors.toList());
+    });
+  }
+
+  @Override
+  protected TStatus _getStatus(long tid) {
+    return scanTx(scanner -> {
+      scanner.setRange(getRow(tid));
+      scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
+          TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .map(e -> 
TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN);
+    });
+  }
+
+  @Override
+  protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) {
+    return new FateTxStoreImpl(tid, isReserved);
+  }
+
+  static Range getRow(long tid) {
+    return new Range("tx_" + FastFormat.toHexString(tid));
+  }
+
+  private <T> FateMutatorImpl<T> newMutator(long tid) {
+    return new FateMutatorImpl<>(context, tableName, tid);
+  }
+
+  private <R> R scanTx(Function<Scanner,R> func) {
+    try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+      return func.apply(scanner);
+    } catch (TableNotFoundException e) {
+      throw new IllegalStateException(tableName + " not found!", e);
+    }
+  }
+
+  private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {
+
+    private FateTxStoreImpl(long tid, boolean isReserved) {
+      super(tid, isReserved);
+    }
+
+    @Override
+    public Repo<T> top() {
+      verifyReserved(false);
+
+      return scanTx(scanner -> {
+        scanner.setRange(getRow(tid));
+        scanner.fetchColumnFamily(RepoColumnFamily.NAME);
+        return StreamSupport.stream(scanner.spliterator(), 
false).sorted(repoComparator).map(e -> {
+          @SuppressWarnings("unchecked")
+          var repo = (Repo<T>) deserialize(e.getValue().get());
+          return repo;
+        }).findFirst().orElse(null);
+      });
+    }
+
+    @Override
+    public List<ReadOnlyRepo<T>> getStack() {
+      verifyReserved(false);
+
+      return scanTx(scanner -> {
+        scanner.setRange(getRow(tid));
+        scanner.fetchColumnFamily(RepoColumnFamily.NAME);
+        return StreamSupport.stream(scanner.spliterator(), 
false).sorted(repoComparator).map(e -> {
+          @SuppressWarnings("unchecked")
+          var repo = (ReadOnlyRepo<T>) deserialize(e.getValue().get());
+          return repo;
+        }).collect(Collectors.toList());
+      });
+    }
+
+    @Override
+    public Serializable getTransactionInfo(TxInfo txInfo) {
+      verifyReserved(false);
+
+      try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+        scanner.setRange(new Range("tx_" + FastFormat.toHexString(tid)));

Review Comment:
   Seems like the following could be done.
   
   ```suggestion
           scanner.setRange(getRow(tid));
   ```



##########
test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.accumulo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.List;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.apache.accumulo.core.fate.FateStore.FateTxStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.ReadOnlyRepo;
+import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.test.fate.zookeeper.FateIT.TestOperation;
+import org.junit.jupiter.api.Test;
+
+public class AccumuloStoreReadWriteIT extends AccumuloClusterHarness {
+
+  private static final NamespaceId NS = NamespaceId.of("testNameSpace");
+  private static final TableId TID = TableId.of("testTable");
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(1);
+  }
+
+  @Test
+  public void testReadWrite() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table);
+
+      AccumuloStore<Manager> store = new AccumuloStore<>(client, table);
+      // Verify no transactions
+      assertEquals(0, store.list().size());
+
+      // Create a new transaction and get the store for it
+      long tid = store.create();
+      FateTxStore<Manager> txStore = store.reserve(tid);
+      assertTrue(txStore.timeCreated() > 0);
+      assertEquals(1, store.list().size());
+
+      // Push a test FATE op and verify we can read it back
+      txStore.push(new TestOperation(NS, TID));
+      TestOperation op = (TestOperation) txStore.top();
+      assertNotNull(op);
+
+      // Test status
+      txStore.setStatus(TStatus.SUBMITTED);
+      assertEquals(TStatus.SUBMITTED, txStore.getStatus());
+
+      // Set a name to test setTransactionInfo()
+      txStore.setTransactionInfo(TxInfo.TX_NAME, "name");
+      assertEquals("name", txStore.getTransactionInfo(TxInfo.TX_NAME));
+
+      // Try setting a second test op to test getStack()
+      // when listing or popping TestOperation2 should be first
+      assertEquals(1, txStore.getStack().size());
+      txStore.push(new TestOperation2(NS, TID));
+      // test top returns TestOperation2
+      ReadOnlyRepo<Manager> top = txStore.top();
+      assertInstanceOf(TestOperation2.class, top);
+
+      // test get stack
+      List<ReadOnlyRepo<Manager>> ops = txStore.getStack();

Review Comment:
   Could be in another test method.  Would be nice to push a lot of things on 
the stack, like ~100 to ensure it maintains the order.  Could have a large 
stack test.
   
   Would probably be good to have a set of test that can run against both store 
impls.  Like this is a nice test could probably also run against the zoostore 
impl.  That could be a follow on issue.
   
   Wonder if we could make some of the existing fate ITs use this new store and 
the zoostore.



##########
core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.accumulo;
+
+import static java.util.Collections.reverseOrder;
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.apache.accumulo.core.fate.ReadOnlyRepo;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.StackOverflowException;
+import 
org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily;
+import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily;
+import 
org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.FastFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AccumuloStore<T> extends AbstractFateStore<T> {
+
+  private static Logger log = LoggerFactory.getLogger(AccumuloStore.class);
+
+  private static final Comparator<Entry<Key,Value>> repoComparator =
+      Comparator.comparing(o -> o.getKey().getColumnQualifier(), 
reverseOrder());
+
+  private final ClientContext context;
+  private final String tableName;
+
+  public AccumuloStore(ClientContext context, String tableName) {
+    this.context = Objects.requireNonNull(context);
+    this.tableName = Objects.requireNonNull(tableName);
+  }
+
+  @Override
+  public long create() {
+    long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL;
+
+    // TODO = conditional mutation and retry if exists?
+    
newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate();
+
+    return tid;
+  }
+
+  @Override
+  protected List<String> getTransactions() {
+    return scanTx(scanner -> {
+      scanner.setRange(new Range());
+      scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
+          TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .map(e -> 
e.getKey().getRow().toString()).collect(Collectors.toList());
+    });
+  }
+
+  @Override
+  protected TStatus _getStatus(long tid) {
+    return scanTx(scanner -> {
+      scanner.setRange(getRow(tid));
+      scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
+          TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .map(e -> 
TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN);
+    });
+  }
+
+  @Override
+  protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) {
+    return new FateTxStoreImpl(tid, isReserved);
+  }
+
+  static Range getRow(long tid) {
+    return new Range("tx_" + FastFormat.toHexString(tid));
+  }
+
+  private <T> FateMutatorImpl<T> newMutator(long tid) {
+    return new FateMutatorImpl<>(context, tableName, tid);
+  }
+
+  private <R> R scanTx(Function<Scanner,R> func) {
+    try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+      return func.apply(scanner);
+    } catch (TableNotFoundException e) {
+      throw new IllegalStateException(tableName + " not found!", e);
+    }
+  }
+
+  private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {
+
+    private FateTxStoreImpl(long tid, boolean isReserved) {
+      super(tid, isReserved);
+    }
+
+    @Override
+    public Repo<T> top() {
+      verifyReserved(false);
+
+      return scanTx(scanner -> {
+        scanner.setRange(getRow(tid));
+        scanner.fetchColumnFamily(RepoColumnFamily.NAME);
+        return StreamSupport.stream(scanner.spliterator(), 
false).sorted(repoComparator).map(e -> {

Review Comment:
   We could change how the column qualifier is encoded such that the latest 
entry on the stack sorts first. Then we would not need to read everything here 
and sort it.  Encoding could be that we start with 99 for the first thing on 
the stack and decrement each time we add something.  So for a stack with three 
things on it it would have 97,98,99 and 97 would be the newest entry and would 
sort first.  Would need to use fixed width encoding of 2 with zero padding so 
that 01,02, etc would sort correctly.
   
   Eventually we could have a specialized iterator for reading the top entry.  
The scanner always reads a batch of key values, so even if we only read the 
first thing from the scanner it will read more.  With an iterator it could read 
the first entry only and stop so the scanner is not reading uneeded data for 
get top.  Alternatively we could set the batch size on the scanner to a small 
number to avoid reading too much data for get top.



##########
test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate.accumulo;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.List;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.apache.accumulo.core.fate.FateStore.FateTxStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
+import org.apache.accumulo.core.fate.ReadOnlyRepo;
+import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.test.fate.zookeeper.FateIT.TestOperation;
+import org.junit.jupiter.api.Test;
+
+public class AccumuloStoreReadWriteIT extends AccumuloClusterHarness {
+
+  private static final NamespaceId NS = NamespaceId.of("testNameSpace");
+  private static final TableId TID = TableId.of("testTable");
+
+  @Override
+  protected Duration defaultTimeout() {
+    return Duration.ofMinutes(1);
+  }
+
+  @Test
+  public void testReadWrite() throws Exception {
+    final String table = getUniqueNames(1)[0];
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      client.tableOperations().create(table);
+
+      AccumuloStore<Manager> store = new AccumuloStore<>(client, table);
+      // Verify no transactions
+      assertEquals(0, store.list().size());
+
+      // Create a new transaction and get the store for it
+      long tid = store.create();
+      FateTxStore<Manager> txStore = store.reserve(tid);
+      assertTrue(txStore.timeCreated() > 0);
+      assertEquals(1, store.list().size());
+
+      // Push a test FATE op and verify we can read it back
+      txStore.push(new TestOperation(NS, TID));
+      TestOperation op = (TestOperation) txStore.top();
+      assertNotNull(op);
+
+      // Test status
+      txStore.setStatus(TStatus.SUBMITTED);
+      assertEquals(TStatus.SUBMITTED, txStore.getStatus());
+
+      // Set a name to test setTransactionInfo()
+      txStore.setTransactionInfo(TxInfo.TX_NAME, "name");
+      assertEquals("name", txStore.getTransactionInfo(TxInfo.TX_NAME));
+
+      // Try setting a second test op to test getStack()
+      // when listing or popping TestOperation2 should be first
+      assertEquals(1, txStore.getStack().size());
+      txStore.push(new TestOperation2(NS, TID));
+      // test top returns TestOperation2
+      ReadOnlyRepo<Manager> top = txStore.top();
+      assertInstanceOf(TestOperation2.class, top);
+
+      // test get stack
+      List<ReadOnlyRepo<Manager>> ops = txStore.getStack();
+      assertEquals(2, ops.size());
+      assertInstanceOf(TestOperation2.class, ops.get(0));
+      assertEquals(TestOperation.class, ops.get(1).getClass());
+
+      // test pop, TestOperation should be left
+      txStore.pop();
+      ops = txStore.getStack();
+      assertEquals(1, ops.size());
+      assertEquals(TestOperation.class, ops.get(0).getClass());
+
+      // create second
+      FateTxStore<Manager> txStore2 = store.reserve(store.create());
+      assertEquals(2, store.list().size());
+
+      // test delete
+      txStore.delete();
+      assertEquals(1, store.list().size());
+      txStore2.delete();
+      assertEquals(0, store.list().size());
+    }
+  }
+
+  private static class TestOperation2 extends TestOperation {
+
+    private static final long serialVersionUID = 1L;
+
+    public TestOperation2(NamespaceId namespaceId, TableId tableId) {
+      super(namespaceId, tableId);
+    }
+  }
+

Review Comment:
   I experimented with making Fate use this this new store by adding the 
following to the test and it worked.  
   
   ```java
   public static class TestEnv {
   
     }
   
     public static class TestRepo implements Repo<TestEnv> {
   
       private final String data;
   
       TestRepo(String data) {
         this.data = data;
       }
   
       @Override
       public long isReady(long tid, TestEnv environment) throws Exception {
         System.out.println("Im ready "+data);
         return 0;
       }
   
       @Override
       public String getName() {
         return "TestRepo_"+data;
       }
   
       @Override
       public Repo<TestEnv> call(long tid, TestEnv environment) throws 
Exception {
         System.out.println("hello world : "+data);
   
         if(data.endsWith("_2")) {
           return null;
         }
   
         return new TestRepo(data+"_2");
       }
   
       @Override
       public void undo(long tid, TestEnv environment) throws Exception {
   
       }
   
       @Override
       public String getReturn() {
         return data+"_ret";
       }
     }
   
     @Test
     public void testFate() throws Exception {
       final String table = getUniqueNames(1)[0];
       try (ClientContext client =
                    (ClientContext) 
Accumulo.newClient().from(getClientProps()).build()) {
         client.tableOperations().create(table);
   
         AccumuloStore<TestEnv> store = new AccumuloStore<>(client, table);
   
         TestEnv testEnv = new TestEnv();
   
         Fate<TestEnv> fate = new Fate<>(testEnv, store, r->r+"", 
DefaultConfiguration.getInstance());
   
         long txid1 = fate.startTransaction();
         fate.seedTransaction("hw", txid1, new TestRepo("test_a"), false, "test 
accumulo store");
   
         fate.waitForCompletion(txid1);
   
         System.out.println("Return : "+fate.getReturn(txid1));
   
         fate.delete(txid1);
   
         fate.shutdown();
       }
     }
   ```
   
   Saw the following output.
   
   ```
   2023-12-14T17:20:10,104 [fate.Fate] INFO : Seeding FATE[21f670d0f198ad56] 
test accumulo store
   Im ready test_a
   hello world : test_a
   Im ready test_a_2
   hello world : test_a_2
   Return : test_a_2_ret
   
   ```



##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+public abstract class AbstractFateStore<T> implements FateStore<T> {
+
+  private static final Logger log = 
LoggerFactory.getLogger(AbstractFateStore.class);
+
+  protected final Set<Long> reserved;
+  protected final Map<Long,Long> defered;
+
+  // This is incremented each time a transaction was unreserved that was non 
new
+  protected final SignalCount unreservedNonNewCount = new SignalCount();
+
+  // This is incremented each time a transaction is unreserved that was 
runnable
+  protected final SignalCount unreservedRunnableCount = new SignalCount();
+
+  public AbstractFateStore() {
+    this.reserved = new HashSet<>();
+    this.defered = new HashMap<>();
+  }
+
+  public static byte[] serialize(Object o) {
+    try {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      ObjectOutputStream oos = new ObjectOutputStream(baos);
+      oos.writeObject(o);
+      oos.close();
+
+      return baos.toByteArray();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION",
+      justification = "unsafe to store arbitrary serialized objects like this, 
but needed for now"
+          + " for backwards compatibility")
+  public static Object deserialize(byte[] ser) {
+    try {
+      ByteArrayInputStream bais = new ByteArrayInputStream(ser);
+      ObjectInputStream ois = new ObjectInputStream(bais);
+      return ois.readObject();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    } catch (ReflectiveOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /**
+   * Attempt to reserve transaction
+   *
+   * @param tid transaction id
+   * @return true if reserved by this call, false if already reserved
+   */
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(long tid) {
+    synchronized (this) {
+      if (!reserved.contains(tid)) {
+        return Optional.of(reserve(tid));
+      }
+      return Optional.empty();
+    }
+  }
+
+  @Override
+  public FateTxStore<T> reserve(long tid) {
+    synchronized (AbstractFateStore.this) {
+      while (reserved.contains(tid)) {
+        try {
+          AbstractFateStore.this.wait(100);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException(e);
+        }
+      }
+
+      reserved.add(tid);
+      return newFateTxStore(tid, true);
+    }
+  }
+
+  @Override
+  public Iterator<Long> runnable(AtomicBoolean keepWaiting) {
+
+    while (keepWaiting.get()) {
+      ArrayList<Long> runnableTids = new ArrayList<>();
+
+      final long beforeCount = unreservedRunnableCount.getCount();
+
+      List<String> transactions = getTransactions();

Review Comment:
   Not something to worry about in this PR.  Eventually, I hope we can avoid 
buffering all of the txid in memory and the Iterator this method returns is 
directly linked to a filtering stream backed by an accumulo scanner.



##########
core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.accumulo;
+
+import static java.util.Collections.reverseOrder;
+import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.Fate.TxInfo;
+import org.apache.accumulo.core.fate.ReadOnlyRepo;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.StackOverflowException;
+import 
org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily;
+import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily;
+import 
org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.FastFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AccumuloStore<T> extends AbstractFateStore<T> {
+
+  private static Logger log = LoggerFactory.getLogger(AccumuloStore.class);
+
+  private static final Comparator<Entry<Key,Value>> repoComparator =
+      Comparator.comparing(o -> o.getKey().getColumnQualifier(), 
reverseOrder());
+
+  private final ClientContext context;
+  private final String tableName;
+
+  public AccumuloStore(ClientContext context, String tableName) {
+    this.context = Objects.requireNonNull(context);
+    this.tableName = Objects.requireNonNull(tableName);
+  }
+
+  @Override
+  public long create() {
+    long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL;
+
+    // TODO = conditional mutation and retry if exists?
+    
newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate();
+
+    return tid;
+  }
+
+  @Override
+  protected List<String> getTransactions() {
+    return scanTx(scanner -> {
+      scanner.setRange(new Range());
+      scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
+          TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .map(e -> 
e.getKey().getRow().toString()).collect(Collectors.toList());
+    });
+  }
+
+  @Override
+  protected TStatus _getStatus(long tid) {
+    return scanTx(scanner -> {
+      scanner.setRange(getRow(tid));
+      scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
+          TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .map(e -> 
TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN);
+    });
+  }
+
+  @Override
+  protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) {
+    return new FateTxStoreImpl(tid, isReserved);
+  }
+
+  static Range getRow(long tid) {
+    return new Range("tx_" + FastFormat.toHexString(tid));
+  }
+
+  private <T> FateMutatorImpl<T> newMutator(long tid) {
+    return new FateMutatorImpl<>(context, tableName, tid);
+  }
+
+  private <R> R scanTx(Function<Scanner,R> func) {
+    try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+      return func.apply(scanner);
+    } catch (TableNotFoundException e) {
+      throw new IllegalStateException(tableName + " not found!", e);
+    }
+  }
+
+  private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {
+
+    private FateTxStoreImpl(long tid, boolean isReserved) {
+      super(tid, isReserved);
+    }
+
+    @Override
+    public Repo<T> top() {
+      verifyReserved(false);
+
+      return scanTx(scanner -> {
+        scanner.setRange(getRow(tid));
+        scanner.fetchColumnFamily(RepoColumnFamily.NAME);
+        return StreamSupport.stream(scanner.spliterator(), 
false).sorted(repoComparator).map(e -> {
+          @SuppressWarnings("unchecked")
+          var repo = (Repo<T>) deserialize(e.getValue().get());
+          return repo;
+        }).findFirst().orElse(null);
+      });
+    }
+
+    @Override
+    public List<ReadOnlyRepo<T>> getStack() {
+      verifyReserved(false);
+
+      return scanTx(scanner -> {
+        scanner.setRange(getRow(tid));
+        scanner.fetchColumnFamily(RepoColumnFamily.NAME);
+        return StreamSupport.stream(scanner.spliterator(), 
false).sorted(repoComparator).map(e -> {
+          @SuppressWarnings("unchecked")
+          var repo = (ReadOnlyRepo<T>) deserialize(e.getValue().get());
+          return repo;
+        }).collect(Collectors.toList());
+      });
+    }
+
+    @Override
+    public Serializable getTransactionInfo(TxInfo txInfo) {
+      verifyReserved(false);
+
+      try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) {
+        scanner.setRange(new Range("tx_" + FastFormat.toHexString(tid)));
+
+        final ColumnFQ cq;
+        switch (txInfo) {
+          case TX_NAME:
+            cq = TxInfoColumnFamily.TX_NAME_COLUMN;
+            break;
+          case AUTO_CLEAN:
+            cq = TxInfoColumnFamily.AUTO_CLEAN_COLUMN;
+            break;
+          case EXCEPTION:
+            cq = TxInfoColumnFamily.EXCEPTION_COLUMN;
+            break;
+          case RETURN_VALUE:
+            cq = TxInfoColumnFamily.RETURN_VALUE_COLUMN;
+            break;
+          default:
+            throw new IllegalArgumentException("Unexpected TxInfo type " + 
txInfo);
+        }
+        scanner.fetchColumn(cq.getColumnFamily(), cq.getColumnQualifier());
+
+        return StreamSupport.stream(scanner.spliterator(), false)
+            .map(e -> deserializeTxInfo(txInfo, 
e.getValue().get())).findFirst().orElse(null);
+      } catch (TableNotFoundException e) {
+        throw new IllegalStateException(tableName + " not found!", e);
+      }
+    }
+
+    @Override
+    public long timeCreated() {
+      verifyReserved(false);
+
+      return scanTx(scanner -> {
+        scanner.setRange(getRow(tid));
+        
scanner.fetchColumn(TxColumnFamily.CREATE_TIME_COLUMN.getColumnFamily(),
+            TxColumnFamily.CREATE_TIME_COLUMN.getColumnQualifier());

Review Comment:
   Can do the following to shorten the code.
   
   ```suggestion
           TxColumnFamily.CREATE_TIME_COLUMN.fetch(scanner);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to