keith-turner commented on code in PR #4049: URL: https://github.com/apache/accumulo/pull/4049#discussion_r1427283947
########## core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.accumulo; + +import static java.util.Collections.reverseOrder; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.StackOverflowException; +import 
org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.FastFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AccumuloStore<T> extends AbstractFateStore<T> { + + private static Logger log = LoggerFactory.getLogger(AccumuloStore.class); + + private static final Comparator<Entry<Key,Value>> repoComparator = + Comparator.comparing(o -> o.getKey().getColumnQualifier(), reverseOrder()); + + private final ClientContext context; + private final String tableName; + + public AccumuloStore(ClientContext context, String tableName) { + this.context = Objects.requireNonNull(context); + this.tableName = Objects.requireNonNull(tableName); + } + + @Override + public long create() { + long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; + + // TODO = conditional mutation and retry if exists? 
+ newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate(); + + return tid; + } + + @Override + protected List<String> getTransactions() { + return scanTx(scanner -> { + scanner.setRange(new Range()); + scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(), + TxColumnFamily.STATUS_COLUMN.getColumnQualifier()); + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> e.getKey().getRow().toString()).collect(Collectors.toList()); + }); + } + + @Override + protected TStatus _getStatus(long tid) { + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(), + TxColumnFamily.STATUS_COLUMN.getColumnQualifier()); + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN); + }); + } + + @Override + protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) { + return new FateTxStoreImpl(tid, isReserved); + } + + static Range getRow(long tid) { + return new Range("tx_" + FastFormat.toHexString(tid)); + } + + private <T> FateMutatorImpl<T> newMutator(long tid) { + return new FateMutatorImpl<>(context, tableName, tid); + } + + private <R> R scanTx(Function<Scanner,R> func) { + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + return func.apply(scanner); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> { + + private FateTxStoreImpl(long tid, boolean isReserved) { + super(tid, isReserved); + } + + @Override + public Repo<T> top() { + verifyReserved(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.fetchColumnFamily(RepoColumnFamily.NAME); + return StreamSupport.stream(scanner.spliterator(), false).sorted(repoComparator).map(e -> { + 
@SuppressWarnings("unchecked") + var repo = (Repo<T>) deserialize(e.getValue().get()); + return repo; + }).findFirst().orElse(null); + }); + } + + @Override + public List<ReadOnlyRepo<T>> getStack() { + verifyReserved(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.fetchColumnFamily(RepoColumnFamily.NAME); + return StreamSupport.stream(scanner.spliterator(), false).sorted(repoComparator).map(e -> { + @SuppressWarnings("unchecked") + var repo = (ReadOnlyRepo<T>) deserialize(e.getValue().get()); + return repo; + }).collect(Collectors.toList()); + }); + } + + @Override + public Serializable getTransactionInfo(TxInfo txInfo) { + verifyReserved(false); + + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(new Range("tx_" + FastFormat.toHexString(tid))); Review Comment: Seems like the following could be done. ```suggestion scanner.setRange(getRow(tid)); ``` ########## test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.test.fate.accumulo; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.List; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.accumulo.AccumuloStore; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.test.fate.zookeeper.FateIT.TestOperation; +import org.junit.jupiter.api.Test; + +public class AccumuloStoreReadWriteIT extends AccumuloClusterHarness { + + private static final NamespaceId NS = NamespaceId.of("testNameSpace"); + private static final TableId TID = TableId.of("testTable"); + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(1); + } + + @Test + public void testReadWrite() throws Exception { + final String table = getUniqueNames(1)[0]; + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table); + + AccumuloStore<Manager> store = new AccumuloStore<>(client, table); + // Verify no transactions + assertEquals(0, store.list().size()); + + // Create a new transaction and get the store for it + long tid = store.create(); + FateTxStore<Manager> txStore = store.reserve(tid); + assertTrue(txStore.timeCreated() > 0); + assertEquals(1, store.list().size()); + + // Push a test 
FATE op and verify we can read it back + txStore.push(new TestOperation(NS, TID)); + TestOperation op = (TestOperation) txStore.top(); + assertNotNull(op); + + // Test status + txStore.setStatus(TStatus.SUBMITTED); + assertEquals(TStatus.SUBMITTED, txStore.getStatus()); + + // Set a name to test setTransactionInfo() + txStore.setTransactionInfo(TxInfo.TX_NAME, "name"); + assertEquals("name", txStore.getTransactionInfo(TxInfo.TX_NAME)); + + // Try setting a second test op to test getStack() + // when listing or popping TestOperation2 should be first + assertEquals(1, txStore.getStack().size()); + txStore.push(new TestOperation2(NS, TID)); + // test top returns TestOperation2 + ReadOnlyRepo<Manager> top = txStore.top(); + assertInstanceOf(TestOperation2.class, top); + + // test get stack + List<ReadOnlyRepo<Manager>> ops = txStore.getStack(); Review Comment: Could be in another test method. Would be nice to push a lot of things on the stack, like ~100, to ensure it maintains the order. Could have a large stack test. Would probably be good to have a set of tests that can run against both store impls. For example, this is a nice test that could probably also run against the zoostore impl. That could be a follow-on issue. Wonder if we could make some of the existing fate ITs use this new store and the zoostore. ########## core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.accumulo; + +import static java.util.Collections.reverseOrder; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.StackOverflowException; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.FastFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AccumuloStore<T> extends AbstractFateStore<T> { + + private 
static Logger log = LoggerFactory.getLogger(AccumuloStore.class); + + private static final Comparator<Entry<Key,Value>> repoComparator = + Comparator.comparing(o -> o.getKey().getColumnQualifier(), reverseOrder()); + + private final ClientContext context; + private final String tableName; + + public AccumuloStore(ClientContext context, String tableName) { + this.context = Objects.requireNonNull(context); + this.tableName = Objects.requireNonNull(tableName); + } + + @Override + public long create() { + long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; + + // TODO = conditional mutation and retry if exists? + newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate(); + + return tid; + } + + @Override + protected List<String> getTransactions() { + return scanTx(scanner -> { + scanner.setRange(new Range()); + scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(), + TxColumnFamily.STATUS_COLUMN.getColumnQualifier()); + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> e.getKey().getRow().toString()).collect(Collectors.toList()); + }); + } + + @Override + protected TStatus _getStatus(long tid) { + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(), + TxColumnFamily.STATUS_COLUMN.getColumnQualifier()); + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN); + }); + } + + @Override + protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) { + return new FateTxStoreImpl(tid, isReserved); + } + + static Range getRow(long tid) { + return new Range("tx_" + FastFormat.toHexString(tid)); + } + + private <T> FateMutatorImpl<T> newMutator(long tid) { + return new FateMutatorImpl<>(context, tableName, tid); + } + + private <R> R scanTx(Function<Scanner,R> func) { + try (Scanner scanner = context.createScanner(tableName, 
Authorizations.EMPTY)) { + return func.apply(scanner); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> { + + private FateTxStoreImpl(long tid, boolean isReserved) { + super(tid, isReserved); + } + + @Override + public Repo<T> top() { + verifyReserved(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.fetchColumnFamily(RepoColumnFamily.NAME); + return StreamSupport.stream(scanner.spliterator(), false).sorted(repoComparator).map(e -> { Review Comment: We could change how the column qualifier is encoded such that the latest entry on the stack sorts first. Then we would not need to read everything here and sort it. Encoding could be that we start with 99 for the first thing on the stack and decrement each time we add something. So for a stack with three things on it, it would have 97,98,99 and 97 would be the newest entry and would sort first. Would need to use fixed-width encoding of 2 with zero padding so that 01,02, etc. would sort correctly. Eventually we could have a specialized iterator for reading the top entry. The scanner always reads a batch of key values, so even if we only read the first thing from the scanner it will read more. With an iterator it could read the first entry only and stop, so the scanner is not reading unneeded data for get top. Alternatively we could set the batch size on the scanner to a small number to avoid reading too much data for get top. ########## test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreReadWriteIT.java: ########## @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.fate.accumulo; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.List; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.NamespaceId; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.FateStore.FateTxStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus; +import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.accumulo.AccumuloStore; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.test.fate.zookeeper.FateIT.TestOperation; +import org.junit.jupiter.api.Test; + +public class AccumuloStoreReadWriteIT extends AccumuloClusterHarness { + + private static final NamespaceId NS = NamespaceId.of("testNameSpace"); + private static final TableId TID = TableId.of("testTable"); + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(1); + } + + @Test + public void 
testReadWrite() throws Exception { + final String table = getUniqueNames(1)[0]; + try (ClientContext client = + (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table); + + AccumuloStore<Manager> store = new AccumuloStore<>(client, table); + // Verify no transactions + assertEquals(0, store.list().size()); + + // Create a new transaction and get the store for it + long tid = store.create(); + FateTxStore<Manager> txStore = store.reserve(tid); + assertTrue(txStore.timeCreated() > 0); + assertEquals(1, store.list().size()); + + // Push a test FATE op and verify we can read it back + txStore.push(new TestOperation(NS, TID)); + TestOperation op = (TestOperation) txStore.top(); + assertNotNull(op); + + // Test status + txStore.setStatus(TStatus.SUBMITTED); + assertEquals(TStatus.SUBMITTED, txStore.getStatus()); + + // Set a name to test setTransactionInfo() + txStore.setTransactionInfo(TxInfo.TX_NAME, "name"); + assertEquals("name", txStore.getTransactionInfo(TxInfo.TX_NAME)); + + // Try setting a second test op to test getStack() + // when listing or popping TestOperation2 should be first + assertEquals(1, txStore.getStack().size()); + txStore.push(new TestOperation2(NS, TID)); + // test top returns TestOperation2 + ReadOnlyRepo<Manager> top = txStore.top(); + assertInstanceOf(TestOperation2.class, top); + + // test get stack + List<ReadOnlyRepo<Manager>> ops = txStore.getStack(); + assertEquals(2, ops.size()); + assertInstanceOf(TestOperation2.class, ops.get(0)); + assertEquals(TestOperation.class, ops.get(1).getClass()); + + // test pop, TestOperation should be left + txStore.pop(); + ops = txStore.getStack(); + assertEquals(1, ops.size()); + assertEquals(TestOperation.class, ops.get(0).getClass()); + + // create second + FateTxStore<Manager> txStore2 = store.reserve(store.create()); + assertEquals(2, store.list().size()); + + // test delete + txStore.delete(); + assertEquals(1, store.list().size()); + 
txStore2.delete(); + assertEquals(0, store.list().size()); + } + } + + private static class TestOperation2 extends TestOperation { + + private static final long serialVersionUID = 1L; + + public TestOperation2(NamespaceId namespaceId, TableId tableId) { + super(namespaceId, tableId); + } + } + Review Comment: I experimented with making Fate use this new store by adding the following to the test and it worked. ```java public static class TestEnv { } public static class TestRepo implements Repo<TestEnv> { private final String data; TestRepo(String data) { this.data = data; } @Override public long isReady(long tid, TestEnv environment) throws Exception { System.out.println("Im ready "+data); return 0; } @Override public String getName() { return "TestRepo_"+data; } @Override public Repo<TestEnv> call(long tid, TestEnv environment) throws Exception { System.out.println("hello world : "+data); if(data.endsWith("_2")) { return null; } return new TestRepo(data+"_2"); } @Override public void undo(long tid, TestEnv environment) throws Exception { } @Override public String getReturn() { return data+"_ret"; } } @Test public void testFate() throws Exception { final String table = getUniqueNames(1)[0]; try (ClientContext client = (ClientContext) Accumulo.newClient().from(getClientProps()).build()) { client.tableOperations().create(table); AccumuloStore<TestEnv> store = new AccumuloStore<>(client, table); TestEnv testEnv = new TestEnv(); Fate<TestEnv> fate = new Fate<>(testEnv, store, r->r+"", DefaultConfiguration.getInstance()); long txid1 = fate.startTransaction(); fate.seedTransaction("hw", txid1, new TestRepo("test_a"), false, "test accumulo store"); fate.waitForCompletion(txid1); System.out.println("Return : "+fate.getReturn(txid1)); fate.delete(txid1); fate.shutdown(); } } ``` Saw the following output. 
``` 2023-12-14T17:20:10,104 [fate.Fate] INFO : Seeding FATE[21f670d0f198ad56] test accumulo store Im ready test_a hello world : test_a Im ready test_a_2 hello world : test_a_2 Return : test_a_2_ret ``` ########## core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java: ########## @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.core.fate; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +public abstract class AbstractFateStore<T> implements FateStore<T> { + + private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class); + + protected final Set<Long> reserved; + protected final Map<Long,Long> defered; + + // This is incremented each time a transaction was unreserved that was non new + protected final SignalCount unreservedNonNewCount = new SignalCount(); + + // This is incremented each time a transaction is unreserved that was runnable + protected final SignalCount unreservedRunnableCount = new SignalCount(); + + public AbstractFateStore() { + this.reserved = new HashSet<>(); + this.defered = new HashMap<>(); + } + + public static byte[] serialize(Object o) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(o); + oos.close(); + + return baos.toByteArray(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION", + justification = "unsafe to store arbitrary serialized objects like this, 
but needed for now" + + " for backwards compatibility") + public static Object deserialize(byte[] ser) { + try { + ByteArrayInputStream bais = new ByteArrayInputStream(ser); + ObjectInputStream ois = new ObjectInputStream(bais); + return ois.readObject(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (ReflectiveOperationException e) { + throw new IllegalStateException(e); + } + } + + /** + * Attempt to reserve transaction + * + * @param tid transaction id + * @return true if reserved by this call, false if already reserved + */ + @Override + public Optional<FateTxStore<T>> tryReserve(long tid) { + synchronized (this) { + if (!reserved.contains(tid)) { + return Optional.of(reserve(tid)); + } + return Optional.empty(); + } + } + + @Override + public FateTxStore<T> reserve(long tid) { + synchronized (AbstractFateStore.this) { + while (reserved.contains(tid)) { + try { + AbstractFateStore.this.wait(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + } + + reserved.add(tid); + return newFateTxStore(tid, true); + } + } + + @Override + public Iterator<Long> runnable(AtomicBoolean keepWaiting) { + + while (keepWaiting.get()) { + ArrayList<Long> runnableTids = new ArrayList<>(); + + final long beforeCount = unreservedRunnableCount.getCount(); + + List<String> transactions = getTransactions(); Review Comment: Not something to worry about in this PR. Eventually, I hope we can avoid buffering all of the txids in memory, so that the Iterator this method returns is directly linked to a filtering stream backed by an Accumulo scanner. ########## core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java: ########## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.fate.accumulo; + +import static java.util.Collections.reverseOrder; +import static org.apache.accumulo.core.util.LazySingletons.RANDOM; + +import java.io.Serializable; +import java.util.Comparator; +import java.util.List; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.fate.AbstractFateStore; +import org.apache.accumulo.core.fate.Fate.TxInfo; +import org.apache.accumulo.core.fate.ReadOnlyRepo; +import org.apache.accumulo.core.fate.Repo; +import org.apache.accumulo.core.fate.StackOverflowException; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.RepoColumnFamily; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxColumnFamily; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema.TxInfoColumnFamily; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.ColumnFQ; +import 
org.apache.accumulo.core.util.FastFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AccumuloStore<T> extends AbstractFateStore<T> { + + private static Logger log = LoggerFactory.getLogger(AccumuloStore.class); + + private static final Comparator<Entry<Key,Value>> repoComparator = + Comparator.comparing(o -> o.getKey().getColumnQualifier(), reverseOrder()); + + private final ClientContext context; + private final String tableName; + + public AccumuloStore(ClientContext context, String tableName) { + this.context = Objects.requireNonNull(context); + this.tableName = Objects.requireNonNull(tableName); + } + + @Override + public long create() { + long tid = RANDOM.get().nextLong() & 0x7fffffffffffffffL; + + // TODO = conditional mutation and retry if exists? + newMutator(tid).putStatus(TStatus.NEW).putCreateTime(System.currentTimeMillis()).mutate(); + + return tid; + } + + @Override + protected List<String> getTransactions() { + return scanTx(scanner -> { + scanner.setRange(new Range()); + scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(), + TxColumnFamily.STATUS_COLUMN.getColumnQualifier()); + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> e.getKey().getRow().toString()).collect(Collectors.toList()); + }); + } + + @Override + protected TStatus _getStatus(long tid) { + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(), + TxColumnFamily.STATUS_COLUMN.getColumnQualifier()); + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> TStatus.valueOf(e.getValue().toString())).findFirst().orElse(TStatus.UNKNOWN); + }); + } + + @Override + protected FateTxStore<T> newFateTxStore(long tid, boolean isReserved) { + return new FateTxStoreImpl(tid, isReserved); + } + + static Range getRow(long tid) { + return new Range("tx_" + FastFormat.toHexString(tid)); + } + + private <T> FateMutatorImpl<T> newMutator(long tid) { + 
return new FateMutatorImpl<>(context, tableName, tid); + } + + private <R> R scanTx(Function<Scanner,R> func) { + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + return func.apply(scanner); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> { + + private FateTxStoreImpl(long tid, boolean isReserved) { + super(tid, isReserved); + } + + @Override + public Repo<T> top() { + verifyReserved(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.fetchColumnFamily(RepoColumnFamily.NAME); + return StreamSupport.stream(scanner.spliterator(), false).sorted(repoComparator).map(e -> { + @SuppressWarnings("unchecked") + var repo = (Repo<T>) deserialize(e.getValue().get()); + return repo; + }).findFirst().orElse(null); + }); + } + + @Override + public List<ReadOnlyRepo<T>> getStack() { + verifyReserved(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.fetchColumnFamily(RepoColumnFamily.NAME); + return StreamSupport.stream(scanner.spliterator(), false).sorted(repoComparator).map(e -> { + @SuppressWarnings("unchecked") + var repo = (ReadOnlyRepo<T>) deserialize(e.getValue().get()); + return repo; + }).collect(Collectors.toList()); + }); + } + + @Override + public Serializable getTransactionInfo(TxInfo txInfo) { + verifyReserved(false); + + try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) { + scanner.setRange(new Range("tx_" + FastFormat.toHexString(tid))); + + final ColumnFQ cq; + switch (txInfo) { + case TX_NAME: + cq = TxInfoColumnFamily.TX_NAME_COLUMN; + break; + case AUTO_CLEAN: + cq = TxInfoColumnFamily.AUTO_CLEAN_COLUMN; + break; + case EXCEPTION: + cq = TxInfoColumnFamily.EXCEPTION_COLUMN; + break; + case RETURN_VALUE: + cq = TxInfoColumnFamily.RETURN_VALUE_COLUMN; + break; + default: + throw new 
IllegalArgumentException("Unexpected TxInfo type " + txInfo); + } + scanner.fetchColumn(cq.getColumnFamily(), cq.getColumnQualifier()); + + return StreamSupport.stream(scanner.spliterator(), false) + .map(e -> deserializeTxInfo(txInfo, e.getValue().get())).findFirst().orElse(null); + } catch (TableNotFoundException e) { + throw new IllegalStateException(tableName + " not found!", e); + } + } + + @Override + public long timeCreated() { + verifyReserved(false); + + return scanTx(scanner -> { + scanner.setRange(getRow(tid)); + scanner.fetchColumn(TxColumnFamily.CREATE_TIME_COLUMN.getColumnFamily(), + TxColumnFamily.CREATE_TIME_COLUMN.getColumnQualifier()); Review Comment: Can do the following to shorten the code. ```suggestion TxColumnFamily.CREATE_TIME_COLUMN.fetch(scanner); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
