This is an automated email from the ASF dual-hosted git repository.

domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 8a69501490 Add more conditions to AccumuloStore (#4242)
8a69501490 is described below

commit 8a695014908b3e78197925527be4be37ec01e126
Author: Dom G <domgargu...@apache.org>
AuthorDate: Wed Feb 21 10:39:10 2024 -0500

    Add more conditions to AccumuloStore (#4242)
    
    * Add require status condition to AccumuloStore.pop(), delete() and push()
    * Add tests to make sure statuses are checked
---
 .../accumulo/core/fate/accumulo/AccumuloStore.java |  10 +-
 .../core/fate/accumulo/StatusMappingIterator.java  |   2 +
 .../test/fate/accumulo/AccumuloStoreIT.java        | 108 ++++++++++++++++++++-
 .../accumulo/test/fate/accumulo/FateStoreIT.java   |   5 +
 4 files changed, 121 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
index 7fd4b967cb..8ae222c61c 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
@@ -351,7 +351,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
         throw new StackOverflowException("Repo stack size too large");
       }
 
-      FateMutator<T> fateMutator = newMutator(fateId);
+      FateMutator<T> fateMutator =
+          newMutator(fateId).requireStatus(TStatus.IN_PROGRESS, TStatus.NEW);
       fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate();
     }
 
@@ -360,7 +361,8 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
       verifyReserved(true);
 
       Optional<Integer> top = findTop();
-      top.ifPresent(t -> newMutator(fateId).deleteRepo(t).mutate());
+      top.ifPresent(
+          t -> 
newMutator(fateId).requireStatus(TStatus.FAILED_IN_PROGRESS).deleteRepo(t).mutate());
     }
 
     @Override
@@ -384,7 +386,9 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
     public void delete() {
       verifyReserved(true);
 
-      newMutator(fateId).delete().mutate();
+      var mutator = newMutator(fateId);
+      mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, 
TStatus.SUCCESSFUL, TStatus.FAILED);
+      mutator.delete().mutate();
     }
 
     private Optional<Integer> findTop() {
diff --git 
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
index d7dc4fa22c..073f879318 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/StatusMappingIterator.java
@@ -65,6 +65,8 @@ public class StatusMappingIterator implements 
SortedKeyValueIterator<Key,Value>
     if (options.containsKey(STATUS_SET_KEY)) {
       String[] statuses = decodeStatuses(options.get(STATUS_SET_KEY));
       acceptableStatuses.addAll(Arrays.asList(statuses));
+    } else {
+      throw new IllegalArgumentException("Expected option " + STATUS_SET_KEY + 
" to be set.");
     }
   }
 
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
 
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
index af9280f850..a501526cba 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/AccumuloStoreIT.java
@@ -18,6 +18,7 @@
  */
 package org.apache.accumulo.test.fate.accumulo;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -28,14 +29,28 @@ import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.fate.FateId;
 import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
 import org.apache.accumulo.core.fate.accumulo.AccumuloStore;
+import org.apache.accumulo.core.fate.accumulo.schema.FateSchema;
 import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.fate.FateIT;
+import org.apache.hadoop.io.Text;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +69,7 @@ public class AccumuloStoreIT extends SharedMiniClusterBase {
     SharedMiniClusterBase.stopMiniCluster();
   }
 
-  private static class TestAccumuloStore extends AccumuloStore<String> {
+  private static class TestAccumuloStore extends AccumuloStore<FateIT.TestEnv> 
{
     private final Iterator<FateId> fateIdIterator;
 
     // use the list of fateIds to simulate collisions on fateIds
@@ -71,6 +86,10 @@ public class AccumuloStoreIT extends SharedMiniClusterBase {
         return FateId.from(fateInstanceType, -1L);
       }
     }
+
+    public TStatus getStatus(FateId fateId) {
+      return _getStatus(fateId);
+    }
   }
 
   @Test
@@ -97,4 +116,91 @@ public class AccumuloStoreIT extends SharedMiniClusterBase {
       assertThrows(IllegalStateException.class, store::create);
     }
   }
+
+  @Nested
+  class TestStatusEnforcement {
+
+    String tableName;
+    ClientContext client;
+    FateId fateId;
+    TestAccumuloStore store;
+    FateStore.FateTxStore<FateIT.TestEnv> txStore;
+
+    @BeforeEach
+    public void setup() throws Exception {
+      client = (ClientContext) 
Accumulo.newClient().from(getClientProps()).build();
+      tableName = getUniqueNames(1)[0];
+      client.tableOperations().create(tableName);
+      fateId = FateId.from(fateInstanceType, 1L);
+      store = new TestAccumuloStore(client, tableName, List.of(fateId));
+      store.create();
+      txStore = store.reserve(fateId);
+    }
+
+    @AfterEach
+    public void teardown() throws Exception {
+      client.close();
+    }
+
+    private void testOperationWithStatuses(Runnable beforeOperation, 
Executable operation,
+        Set<ReadOnlyFateStore.TStatus> acceptableStatuses) throws Exception {
+      for (ReadOnlyFateStore.TStatus status : 
ReadOnlyFateStore.TStatus.values()) {
+        // Run any needed setup for the operation before each iteration
+        beforeOperation.run();
+
+        injectStatus(client, tableName, fateId, status);
+        assertEquals(status, store.getStatus(fateId));
+        if (!acceptableStatuses.contains(status)) {
+          assertThrows(IllegalStateException.class, operation,
+              "Expected operation to fail with status " + status + " but it 
did not");
+        } else {
+          assertDoesNotThrow(operation,
+              "Expected operation to succeed with status " + status + " but it 
did not");
+        }
+      }
+    }
+
+    @Test
+    public void push() throws Exception {
+      testOperationWithStatuses(() -> {}, // No special setup needed for push
+          () -> txStore.push(new FateIT.TestRepo("testOp")),
+          Set.of(ReadOnlyFateStore.TStatus.IN_PROGRESS, 
ReadOnlyFateStore.TStatus.NEW));
+    }
+
+    @Test
+    public void pop() throws Exception {
+      testOperationWithStatuses(() -> {
+        // Setup for pop: Ensure there is something to pop by first pushing
+        try {
+          injectStatus(client, tableName, fateId, 
ReadOnlyFateStore.TStatus.NEW);
+          txStore.push(new FateIT.TestRepo("testOp"));
+        } catch (Exception e) {
+          throw new RuntimeException("Failed to setup for pop", e);
+        }
+      }, txStore::pop, Set.of(ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS));
+    }
+
+    @Test
+    public void delete() throws Exception {
+      testOperationWithStatuses(() -> {}, // No special setup needed for delete
+          txStore::delete,
+          Set.of(ReadOnlyFateStore.TStatus.NEW, 
ReadOnlyFateStore.TStatus.SUBMITTED,
+              ReadOnlyFateStore.TStatus.SUCCESSFUL, 
ReadOnlyFateStore.TStatus.FAILED));
+    }
+  }
+
+  /**
+   * Inject a status into the status col of the fate store table for a given 
transaction id.
+   */
+  private void injectStatus(ClientContext client, String table, FateId fateId,
+      ReadOnlyFateStore.TStatus status) throws TableNotFoundException {
+    try (BatchWriter writer = client.createBatchWriter(table)) {
+      Mutation mutation = new Mutation(new Text("tx_" + fateId.getHexTid()));
+      FateSchema.TxColumnFamily.STATUS_COLUMN.put(mutation, new 
Value(status.name()));
+      writer.addMutation(mutation);
+    } catch (MutationsRejectedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java 
b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
index 181a21b9c6..deda053717 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
@@ -99,6 +99,7 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
     assertEquals(1, store.list().count());
 
     // Push a test FATE op and verify we can read it back
+    txStore.setStatus(TStatus.IN_PROGRESS);
     txStore.push(new TestRepo("testOp"));
     TestRepo op = (TestRepo) txStore.top();
     assertNotNull(op);
@@ -114,6 +115,7 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
     // Try setting a second test op to test getStack()
     // when listing or popping TestOperation2 should be first
     assertEquals(1, txStore.getStack().size());
+    txStore.setStatus(TStatus.IN_PROGRESS);
     txStore.push(new TestOperation2());
     // test top returns TestOperation2
     ReadOnlyRepo<TestEnv> top = txStore.top();
@@ -126,6 +128,7 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
     assertEquals(TestRepo.class, ops.get(1).getClass());
 
     // test pop, TestOperation should be left
+    txStore.setStatus(TStatus.FAILED_IN_PROGRESS); // needed to satisfy the 
condition on pop
     txStore.pop();
     ops = txStore.getStack();
     assertEquals(1, ops.size());
@@ -136,8 +139,10 @@ public abstract class FateStoreIT extends 
SharedMiniClusterBase implements FateT
     assertEquals(2, store.list().count());
 
     // test delete
+    txStore.setStatus(TStatus.SUCCESSFUL); // needed to satisfy the condition 
on delete
     txStore.delete();
     assertEquals(1, store.list().count());
+    txStore2.setStatus(TStatus.SUCCESSFUL); // needed to satisfy the condition 
on delete
     txStore2.delete();
     assertEquals(0, store.list().count());
   }

Reply via email to