EdColeman commented on code in PR #3639: URL: https://github.com/apache/accumulo/pull/3639#discussion_r1270896502
########## test/src/main/java/org/apache/accumulo/test/ScanConsistencyIT.java: ########## @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.harness.AccumuloITBase.SUNNY_DAY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchScanner; +import 
org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.admin.CompactionConfig; +import org.apache.accumulo.core.client.rfile.RFile; +import org.apache.accumulo.core.client.rfile.RFileWriter; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.Filter; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Collections2; +import com.google.common.collect.MoreCollectors; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; + +/** + * This test verifies that scans will always see data written before the scan started even when + * there are concurrent scans, writes, and table operations running. 
+ */ +@Tag(SUNNY_DAY) +public class ScanConsistencyIT extends AccumuloClusterHarness { + + private static final Logger log = LoggerFactory.getLogger(ScanConsistencyIT.class); + + @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"}, + justification = "predictable random is ok for testing") + @Test + public void testConcurrentScanConsistency() throws Exception { + final String table = this.getUniqueNames(1)[0]; + + /** + * Tips for debugging this test when it sees a row that should not exist or does not see a row + * that should exist. + * + * 1. Disable the GC from running for the test. + * + * 2. Modify the test code to print some of the offending rows, just need a few to start + * investigating. + * + * 3. After the test fails, somehow run the static function findRow() passing it the Accumulo + * table directory that the failed test used and one of the problem rows. + + * 4. Once the files containing the row are found, analyze what happened with those files in the + * server logs. 
+ */ + // getClusterControl().stopAllServers(ServerType.GARBAGE_COLLECTOR); + + var executor = Executors.newCachedThreadPool(); + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + client.tableOperations().create(table); + + TestContext testContext = new TestContext(client, table, getCluster().getFileSystem(), + getCluster().getTemporaryPath().toString()); + + List<Future<WriteStats>> writeTasks = new ArrayList<>(); + List<Future<ScanStats>> scanTasks = new ArrayList<>(); + + Random random = new Random(); + + int numWriteTask = random.nextInt(10) + 1; + int numsScanTask = random.nextInt(10) + 1; + + for (int i = 0; i < numWriteTask; i++) { + writeTasks.add(executor.submit(new WriteTask(testContext))); + } + + for (int i = 0; i < numsScanTask; i++) { + scanTasks.add(executor.submit(new ScanTask(testContext))); + } + + var tableOpsTask = executor.submit(new TableOpsTask(testContext)); + + // let the concurrent mayhem run for a bit + Thread.sleep(60000); + + // let the threads know to exit + testContext.keepRunning.set(false); + + for (Future<WriteStats> writeTask : writeTasks) { + var stats = writeTask.get(); + log.debug(String.format("Wrote:%,d Bulk imported:%,d Deleted:%,d Bulk deleted:%,d", + stats.written, stats.bulkImported, stats.deleted, stats.bulkDeleted)); + assertTrue(stats.written + stats.bulkImported > 0); + assertTrue(stats.deleted + stats.bulkDeleted > 0); + } + + for (Future<ScanStats> scanTask : scanTasks) { + var stats = scanTask.get(); + log.debug(String.format("Scanned:%,d verified:%,d", stats.scanned, stats.verified)); + assertTrue(stats.verified > 0); + // These scans were running concurrently with writes, so a scan will see more data than what + // was written before the scan started. 
+ assertTrue(stats.scanned > stats.verified); + } + + log.debug(tableOpsTask.get()); + + var stats1 = scanData(testContext, new Range(), false); + var stats2 = scanData(testContext, new Range(), true); + log.debug( + String.format("Final scan, scanned:%,d verified:%,d", stats1.scanned, stats1.verified)); + assertTrue(stats1.verified > 0); + // Should see all expected data now that there are no concurrent writes happening + assertEquals(stats1.scanned, stats1.verified); + assertEquals(stats2.scanned, stats1.scanned); + assertEquals(stats2.verified, stats1.verified); + } finally { + executor.shutdownNow(); + } + } + + /** + * Tracks what data has been written and deleted in an Accumulo table. + */ + private static class DataTracker { + // ConcurrentLinkedQueue was chosen because its safe to iterate over concurrently w/o locking + private final Queue<DataSet> dataSets = new ConcurrentLinkedQueue<>(); + + /** + * Reserves data for scan so that it will not be deleted and returns the data that is expected + * to exist in the table at this point. 
+ */ + public ExpectedScanData beginScan() { + List<DataSet> reservedData = new ArrayList<>(); + + for (var dataSet : dataSets) { + if (dataSet.reserveForScan()) { + reservedData.add(dataSet); + } + } + + return new ExpectedScanData(reservedData); + } + + /** + * add new data that scans should expect to see + */ + public void addExpectedData(List<Mutation> data) { + dataSets.add(new DataSet(data)); + } + + /** + * @return data to delete from the table that is not reserved for scans + */ + public Collection<Mutation> getDeletes() { + DataSet dataSet = dataSets.poll(); + + if (dataSet == null) { + return List.of(); + } + + dataSet.reserveForDelete(); + + return Collections2.transform(dataSet.data, m -> { + Mutation delMutation = new Mutation(m.getRow()); + m.getUpdates() + .forEach(cu -> delMutation.putDelete(cu.getColumnFamily(), cu.getColumnQualifier())); + return delMutation; + }); + } + + public long estimatedRows() { + return dataSets.stream().mapToLong(ds -> ds.data.size()).sum(); + } + } + + private static class DataSet { + private final List<Mutation> data; + + private int activeScans = 0; + + private boolean deleting = false; + + public DataSet(List<Mutation> data) { + this.data = data; + } + + synchronized boolean reserveForScan() { + if (deleting) { + return false; + } + + activeScans++; + + return true; + } + + synchronized void unreserveForScan() { + activeScans--; + Preconditions.checkState(activeScans >= 0); + if (activeScans == 0) { + notify(); + } + } + + synchronized void reserveForDelete() { + Preconditions.checkState(!deleting); + deleting = true; + while (activeScans > 0) { + try { + wait(50); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + Stream<Key> getExpectedData(Range range) { + return data.stream().flatMap(ScanConsistencyIT::toKeys).filter(range::contains); + } + } + + private static Stream<Key> toKeys(Mutation m) { + return m.getUpdates().stream().map(cu -> new Key(m.getRow(), cu.getColumnFamily(), + 
cu.getColumnQualifier(), cu.getColumnVisibility(), 0L, cu.isDeleted(), false)); + } + + private static class ExpectedScanData implements AutoCloseable { + + private final List<DataSet> reservedData; + + public ExpectedScanData(List<DataSet> reservedData) { + this.reservedData = reservedData; + } + + /** + * @return keys that are expected to exist in the accumulo table + */ + Stream<Key> getExpectedData(Range range) { + return reservedData.stream().flatMap(ds -> ds.getExpectedData(range)); + } + + @Override + public void close() { + reservedData.forEach(DataSet::unreserveForScan); + } + } + + private static class TestContext { + final DataTracker dataTracker = new DataTracker(); + final AccumuloClient client; + final String table; + final AtomicBoolean keepRunning = new AtomicBoolean(true); + final AtomicLong generationCounter = new AtomicLong(0); + final FileSystem fileSystem; + private final String tmpDir; + + private TestContext(AccumuloClient client, String table, FileSystem fs, String tmpDir) { + this.client = client; + this.table = table; + this.fileSystem = fs; + this.tmpDir = tmpDir; + } + } + + private static class ScanStats { + long scanned; + long verified; + + public void add(ScanStats stats) { + scanned += stats.scanned; + verified += stats.verified; + } + } + + private static ScanStats scan(ScannerBase scanner, Set<Key> expected) { + ScanStats stats = new ScanStats(); + for (Map.Entry<Key,Value> entry : scanner) { + stats.scanned++; + Key key = entry.getKey(); + key.setTimestamp(0); + if (expected.remove(key)) { + stats.verified++; + } + } + + assertTrue(expected.isEmpty()); + return stats; + } + + // TODO create multiple ranges for batch scanner + private static ScanStats batchScanData(TestContext tctx, Range range) throws Exception { + try (ExpectedScanData expectedScanData = tctx.dataTracker.beginScan(); + BatchScanner scanner = tctx.client.createBatchScanner(tctx.table)) { + Set<Key> expected = 
expectedScanData.getExpectedData(range).collect(Collectors.toSet()); + scanner.setRanges(List.of(range)); + return scan(scanner, expected); + } + } + + private static ScanStats scanData(TestContext tctx, Range range, boolean scanIsolated) + throws Exception { + try (ExpectedScanData expectedScanData = tctx.dataTracker.beginScan(); + Scanner scanner = tctx.client.createScanner(tctx.table)) { + Set<Key> expected = expectedScanData.getExpectedData(range).collect(Collectors.toSet()); + scanner.setRange(range); + + Scanner s = scanner; + if (scanIsolated) { + s = new IsolatedScanner(scanner); + } + + return scan(s, expected); + } + } + + private static class ScanTask implements Callable<ScanStats> { + + private final TestContext tctx; + + private ScanTask(TestContext testContext) { + this.tctx = testContext; + } + + @SuppressFBWarnings(value = {"PREDICTABLE_RANDOM", "DMI_RANDOM_USED_ONLY_ONCE"}, + justification = "predictable random is ok for testing") + @Override + public ScanStats call() throws Exception { + ScanStats allStats = new ScanStats(); + + Random random = new Random(); + + while (tctx.keepRunning.get()) { + + Range range; + if (random.nextInt(10) == 0) { + // 1 in 10 chance of doing a full table scan + range = new Range(); + } else { + long start = nextLongAbs(random); + long end = nextLongAbs(random); + + while (end <= start) { + end = nextLongAbs(random); + } + + range = new Range(String.format("%016x", start), String.format("%016x", end)); + } + + int scanChance = random.nextInt(3); + if (scanChance == 0) { + allStats.add(scanData(tctx, range, false)); + } else if (scanChance == 1) { + allStats.add(scanData(tctx, range, true)); + } else { + allStats.add(batchScanData(tctx, range)); + } + } + + return allStats; + } + } + + private static class WriteStats { + long written; + long deleted; + long bulkImported; + long bulkDeleted; + } + + private static class WriteTask implements Callable<WriteStats> { + + private final TestContext tctx; + + private 
WriteTask(TestContext testContext) { + this.tctx = testContext; + } + + private long bulkImport(Random random, Collection<Mutation> mutations) throws Exception { + + if (mutations.isEmpty()) { + return 0; + } + + Path bulkDir = new Path(tctx.tmpDir + "/" + "bulkimport_" + nextLongAbs(random)); Review Comment: ```suggestion Path bulkDir = new Path(tctx.tmpDir + "/bulkimport_" + nextLongAbs(random)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
