This is an automated email from the ASF dual-hosted git repository. domgarguilo pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 8b3cc0874b Add support for checking WALs in condition mutations (#3948) 8b3cc0874b is described below commit 8b3cc0874beb43246a152ff3a5ddeabf54a1824f Author: Dom G <domgargu...@apache.org> AuthorDate: Thu Nov 16 13:28:30 2023 -0500 Add support for checking WALs in condition mutations (#3948) * Add support for checking WALs in condition mutations --------- Co-authored-by: Keith Turner <ktur...@apache.org> --- .../accumulo/core/metadata/schema/Ample.java | 2 +- .../metadata/ConditionalTabletMutatorImpl.java | 9 +++ .../test/functional/AmpleConditionalWriterIT.java | 90 ++++++++++++++++++++++ 3 files changed, 100 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index adaa0324dc..74c493236b 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -311,7 +311,7 @@ public interface Ample { /** * This can only be called when {@link #getStatus()} returns something other than * {@link Status#ACCEPTED}. It reads that tablets metadata for a failed conditional mutation. - * This can used used to see why it was rejected. + * This can be used to see why it was not accepted. */ TabletMetadata readMetadata(); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java index a77a357d1f..83a988ca38 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java @@ -20,6 +20,7 @@ package org.apache.accumulo.server.metadata; import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.HostingColumnFamily.GOAL_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.COMPACT_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.FLUSH_COLUMN; @@ -29,6 +30,7 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN; import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.encodePrevEndRow; +import java.util.HashSet; import java.util.Objects; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -170,6 +172,13 @@ public class ConditionalTabletMutatorImpl extends TabletMutatorBase<Ample.Condit mutation.addCondition(c); } break; + case LOGS: { + Condition c = SetEqualityIterator.createCondition(new HashSet<>(tabletMetadata.getLogs()), + logEntry -> logEntry.getColumnQualifier().toString().getBytes(UTF_8), + LogColumnFamily.NAME); + mutation.addCondition(c); + } + break; case FILES: { // ELASTICITY_TODO compare values? Condition c = SetEqualityIterator.createCondition(tabletMetadata.getFiles(), diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java index 3a7e2d3c9d..e81e6856e5 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/AmpleConditionalWriterIT.java @@ -26,6 +26,7 @@ import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FLUSH_ID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOADED; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOCATION; +import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.LOGS; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW; import static org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.SELECTED; @@ -40,11 +41,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -75,9 +78,11 @@ import org.apache.accumulo.core.metadata.schema.SelectedFiles; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; +import org.apache.accumulo.core.metadata.schema.TabletMetadataBuilder; import org.apache.accumulo.core.metadata.schema.TabletOperationId; import org.apache.accumulo.core.metadata.schema.TabletOperationType; import org.apache.accumulo.core.security.TablePermission; +import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl; import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl; @@ -86,6 +91,8 @@ import org.apache.hadoop.io.Text; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.google.common.collect.Sets; + public class AmpleConditionalWriterIT extends AccumuloClusterHarness { // ELASTICITY_TODO ensure that all conditional updates are tested @@ -321,6 +328,89 @@ public class AmpleConditionalWriterIT extends AccumuloClusterHarness { } } + @Test + public void testWALs() { + var context = cluster.getServerContext(); + + // Test adding a WAL to a tablet and verifying its presence + String walFilePath = + java.nio.file.Path.of("tserver:8080", UUID.randomUUID().toString()).toString(); + LogEntry originalLogEntry = new LogEntry(walFilePath); + ConditionalTabletsMutatorImpl ctmi = new ConditionalTabletsMutatorImpl(context); + // create a tablet metadata with no write ahead logs + var tmEmptySet = TabletMetadata.builder(e1).build(LOGS); + // tablet should not have any logs to start with so requireSame with the empty logs should pass + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmEmptySet, LOGS) + .putWal(originalLogEntry).submit(tm -> false); + var results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + Set<LogEntry> expectedLogs = new HashSet<>(); + expectedLogs.add(originalLogEntry); + assertEquals(expectedLogs, new HashSet<>(context.getAmple().readTablet(e1).getLogs()), + "The original LogEntry should be present."); + + // Test adding another WAL and verifying the update + String walFilePath2 = + java.nio.file.Path.of("tserver:8080", UUID.randomUUID().toString()).toString(); + LogEntry newLogEntry = new LogEntry(walFilePath2); + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().putWal(newLogEntry).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus()); + + // Verify that both the original and new WALs are present + expectedLogs.add(newLogEntry); + HashSet<LogEntry> actualLogs = new HashSet<>(context.getAmple().readTablet(e1).getLogs()); + assertEquals(expectedLogs, actualLogs, "Both original and new LogEntry should be present."); + + String walFilePath3 = + java.nio.file.Path.of("tserver:8080", UUID.randomUUID().toString()).toString(); + LogEntry otherLogEntry = new LogEntry(walFilePath3); + + // create a powerset to ensure all possible subsets fail when using requireSame except the + // expected current state + Set<LogEntry> allLogs = Set.of(originalLogEntry, newLogEntry, otherLogEntry); + Set<Set<LogEntry>> allSubsets = Sets.powerSet(allLogs); + + for (Set<LogEntry> subset : allSubsets) { + // Skip the subset that matches the current state of the tablet + if (subset.equals(expectedLogs)) { + continue; + } + + final TabletMetadataBuilder builder = TabletMetadata.builder(e1); + subset.forEach(builder::putWal); + TabletMetadata tmSubset = builder.build(LOGS); + + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tmSubset, LOGS) + .deleteWal(originalLogEntry).submit(t -> false); + results = ctmi.process(); + + assertEquals(Status.REJECTED, results.get(e1).getStatus()); + + // ensure the operation did not go through + actualLogs = new HashSet<>(context.getAmple().readTablet(e1).getLogs()); + assertEquals(expectedLogs, actualLogs, "Both original and new LogEntry should be present."); + } + + // Test that requiring the current WALs gets accepted when making an update (deleting a WAL in + // this example) + TabletMetadata tm2 = + TabletMetadata.builder(e1).putWal(originalLogEntry).putWal(newLogEntry).build(LOGS); + ctmi = new ConditionalTabletsMutatorImpl(context); + ctmi.mutateTablet(e1).requireAbsentOperation().requireSame(tm2, LOGS) + .deleteWal(originalLogEntry).submit(tm -> false); + results = ctmi.process(); + assertEquals(Status.ACCEPTED, results.get(e1).getStatus(), + "Requiring the current WALs should result in acceptance when making an update."); + + // Verify that the update went through as expected + assertEquals(List.of(newLogEntry), context.getAmple().readTablet(e1).getLogs(), + "Only the new LogEntry should remain after deleting the original."); + } + @Test public void testSelectedFiles() throws Exception { var context = cluster.getServerContext();