This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 8a69501490 Add more conditions to AccumuloStore (#4242) 8a69501490 is described below commit 8a695014908b3e78197925527be4be37ec01e126 Author: Dom G <domgargu...@apache.org> AuthorDate: Wed Feb 21 10:39:10 2024 -0500 Add more conditions to AccumuloStore (#4242) * Add require status condition to AccumuloStore.pop(), delete() and push() * Add tests to make sure statuses are checked --- .../accumulo/core/fate/accumulo/AccumuloStore.java | 10 +- .../core/fate/accumulo/StatusMappingIterator.java | 2 + .../test/fate/accumulo/AccumuloStoreIT.java | 108 ++++++++++++++++++++- .../accumulo/test/fate/accumulo/FateStoreIT.java | 5 + 4 files changed, 121 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index 7fd4b967cb..8ae222c61c 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -351,7 +351,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { throw new StackOverflowException("Repo stack size too large"); } - FateMutator<T> fateMutator = newMutator(fateId); + FateMutator<T> fateMutator = + newMutator(fateId).requireStatus(TStatus.IN_PROGRESS, TStatus.NEW); fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate(); } @@ -360,7 +361,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { verifyReserved(true); Optional<Integer> top = findTop(); - top.ifPresent(t -> newMutator(fateId).deleteRepo(t).mutate()); + top.ifPresent( + t -> newMutator(fateId).requireStatus(TStatus.FAILED_IN_PROGRESS).deleteRepo(t).mutate()); } @Override @@ -384,7 +386,9 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { public void delete() { verifyReserved(true); - newMutator(fateId).delete().mutate(); + var mutator 
= newMutator(fateId); + mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED); + mutator.delete().mutate(); } private Optional<Integer> findTop() { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java index d7dc4fa22c..073f879318 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java @@ -65,6 +65,8 @@ public class StatusMappingIterator implements SortedKeyValueIterator<Key,Value> if (options.containsKey(STATUS_SET_KEY)) { String[] statuses = decodeStatuses(options.get(STATUS_SET_KEY)); acceptableStatuses.addAll(Arrays.asList(statuses)); + } else { + throw new IllegalArgumentException("Expected option " + STATUS_SET_KEY + " to be set."); } } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java index af9280f850..a501526cba 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java @@ -18,6 +18,7 @@ */ package org.apache.accumulo.test.fate.accumulo; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -28,14 +29,28 @@ import java.util.TreeSet; import java.util.stream.Collectors; import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.clientImpl.ClientContext; +import 
org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.fate.FateId; import org.apache.accumulo.core.fate.FateInstanceType; +import org.apache.accumulo.core.fate.FateStore; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; import org.apache.accumulo.core.fate.accumulo.AccumuloStore; +import org.apache.accumulo.core.fate.accumulo.schema.FateSchema; import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.test.fate.FateIT; +import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +69,7 @@ public class AccumuloStoreIT extends SharedMiniClusterBase { SharedMiniClusterBase.stopMiniCluster(); } - private static class TestAccumuloStore extends AccumuloStore<String> { + private static class TestAccumuloStore extends AccumuloStore<FateIT.TestEnv> { private final Iterator<FateId> fateIdIterator; // use the list of fateIds to simulate collisions on fateIds @@ -71,6 +86,10 @@ public class AccumuloStoreIT extends SharedMiniClusterBase { return FateId.from(fateInstanceType, -1L); } } + + public TStatus getStatus(FateId fateId) { + return _getStatus(fateId); + } } @Test @@ -97,4 +116,91 @@ public class AccumuloStoreIT extends SharedMiniClusterBase { assertThrows(IllegalStateException.class, store::create); } } + + @Nested + class TestStatusEnforcement { + + String tableName; + ClientContext client; + FateId fateId; + TestAccumuloStore store; + FateStore.FateTxStore<FateIT.TestEnv> txStore; + + @BeforeEach + public void setup() throws Exception { + client = (ClientContext) Accumulo.newClient().from(getClientProps()).build(); + tableName = getUniqueNames(1)[0]; + 
client.tableOperations().create(tableName); + fateId = FateId.from(fateInstanceType, 1L); + store = new TestAccumuloStore(client, tableName, List.of(fateId)); + store.create(); + txStore = store.reserve(fateId); + } + + @AfterEach + public void teardown() throws Exception { + client.close(); + } + + private void testOperationWithStatuses(Runnable beforeOperation, Executable operation, + Set<ReadOnlyFateStore.TStatus> acceptableStatuses) throws Exception { + for (ReadOnlyFateStore.TStatus status : ReadOnlyFateStore.TStatus.values()) { + // Run any needed setup for the operation before each iteration + beforeOperation.run(); + + injectStatus(client, tableName, fateId, status); + assertEquals(status, store.getStatus(fateId)); + if (!acceptableStatuses.contains(status)) { + assertThrows(IllegalStateException.class, operation, + "Expected operation to fail with status " + status + " but it did not"); + } else { + assertDoesNotThrow(operation, + "Expected operation to succeed with status " + status + " but it did not"); + } + } + } + + @Test + public void push() throws Exception { + testOperationWithStatuses(() -> {}, // No special setup needed for push + () -> txStore.push(new FateIT.TestRepo("testOp")), + Set.of(ReadOnlyFateStore.TStatus.IN_PROGRESS, ReadOnlyFateStore.TStatus.NEW)); + } + + @Test + public void pop() throws Exception { + testOperationWithStatuses(() -> { + // Setup for pop: Ensure there something to pop by first pushing + try { + injectStatus(client, tableName, fateId, ReadOnlyFateStore.TStatus.NEW); + txStore.push(new FateIT.TestRepo("testOp")); + } catch (Exception e) { + throw new RuntimeException("Failed to setup for pop", e); + } + }, txStore::pop, Set.of(ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS)); + } + + @Test + public void delete() throws Exception { + testOperationWithStatuses(() -> {}, // No special setup needed for delete + txStore::delete, + Set.of(ReadOnlyFateStore.TStatus.NEW, ReadOnlyFateStore.TStatus.SUBMITTED, + 
ReadOnlyFateStore.TStatus.SUCCESSFUL, ReadOnlyFateStore.TStatus.FAILED)); + } + } + + /** + * Inject a status into the status col of the fate store table for a given transaction id. + */ + private void injectStatus(ClientContext client, String table, FateId fateId, + ReadOnlyFateStore.TStatus status) throws TableNotFoundException { + try (BatchWriter writer = client.createBatchWriter(table)) { + Mutation mutation = new Mutation(new Text("tx_" + fateId.getHexTid())); + FateSchema.TxColumnFamily.STATUS_COLUMN.put(mutation, new Value(status.name())); + writer.addMutation(mutation); + } catch (MutationsRejectedException e) { + throw new RuntimeException(e); + } + } + } diff --git a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java index 181a21b9c6..deda053717 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java @@ -99,6 +99,7 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT assertEquals(1, store.list().count()); // Push a test FATE op and verify we can read it back + txStore.setStatus(TStatus.IN_PROGRESS); txStore.push(new TestRepo("testOp")); TestRepo op = (TestRepo) txStore.top(); assertNotNull(op); @@ -114,6 +115,7 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT // Try setting a second test op to test getStack() // when listing or popping TestOperation2 should be first assertEquals(1, txStore.getStack().size()); + txStore.setStatus(TStatus.IN_PROGRESS); txStore.push(new TestOperation2()); // test top returns TestOperation2 ReadOnlyRepo<TestEnv> top = txStore.top(); @@ -126,6 +128,7 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT assertEquals(TestRepo.class, ops.get(1).getClass()); // test pop, TestOperation should be left + 
txStore.setStatus(TStatus.FAILED_IN_PROGRESS); // needed to satisfy the condition on pop txStore.pop(); ops = txStore.getStack(); assertEquals(1, ops.size()); @@ -136,8 +139,10 @@ public abstract class FateStoreIT extends SharedMiniClusterBase implements FateT assertEquals(2, store.list().count()); // test delete + txStore.setStatus(TStatus.SUCCESSFUL); // needed to satisfy the condition on delete txStore.delete(); assertEquals(1, store.list().count()); + txStore2.setStatus(TStatus.SUCCESSFUL); // needed to satisfy the condition on delete txStore2.delete(); assertEquals(0, store.list().count()); }