DomGarguilo commented on code in PR #4162:
URL: https://github.com/apache/accumulo/pull/4162#discussion_r1454046137
##########
test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java:
##########
@@ -621,6 +623,65 @@ public void testManyFiles() throws Exception {
}
}
+  @Test
+  public void testConcurrentCompactions() throws Exception {
+    // Tests compactions running concurrently with bulk import to ensure that data is not bulk
+    // imported twice. Doing a large number of bulk imports should naturally cause compactions to
+    // happen. This test ensures that compactions running concurrently with bulk import do not
+    // cause duplicate imports of a file. For example, if a file is imported into a tablet and then
+    // compacted away, the file should not be imported again by the FATE operation doing the
+    // bulk import. The test is structured in such a way that duplicate imports would be detected.
+
+    try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
+      c.tableOperations().delete(tableName);
+      // Create table without versioning iterator. This is done to detect the same file being
+      // imported more than once.
+      c.tableOperations().create(tableName, new NewTableConfiguration().withoutDefaultIterators());
+
+      addSplits(c, tableName, "0999 1999 2999 3999 4999 5999 6999 7999 8999");
+
+      String dir = getDir("/testBulkFile-");
+
+      final int N = 100;
+
+      // Do N bulk imports of the exact same data.
+      for (int i = 0; i < N; i++) {
+        // Create 10 files for the bulk import.
+        for (int f = 0; f < 10; f++) {
+          writeData(dir + "/f" + f + ".", aconf, f * 1000, (f + 1) * 1000 - 1);
+        }
+
+        c.tableOperations().importDirectory(dir).to(tableName).tableTime(true).load();
+        getCluster().getFileSystem().delete(new Path(dir), true);
+      }
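For context on the duplicate-detection idea in the test comment above, here is a minimal sketch (not part of the PR diff) of how a duplicate import would surface once the versioning iterator is removed. It assumes the test's `c` client and `tableName`, the usual `Scanner`/`Authorizations`/`Key`/`Value` Accumulo imports, and JUnit 5's `assertTrue`; the `seen`-set approach is purely illustrative.

```java
// Hypothetical check (not in the PR): without the versioning iterator,
// importing the same file twice leaves two versions of the same
// row/column, so every row/column pair should appear exactly once.
try (Scanner scanner = c.createScanner(tableName, Authorizations.EMPTY)) {
  Set<String> seen = new HashSet<>();
  for (Map.Entry<Key,Value> entry : scanner) {
    Key key = entry.getKey();
    String id = key.getRow() + ":" + key.getColumnFamily() + ":" + key.getColumnQualifier();
    // A repeated row/column means a file was bulk imported more than once.
    assertTrue(seen.add(id), "duplicate import detected for " + id);
  }
}
```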
Review Comment:
```suggestion
      // Do N bulk imports of the exact same data in parallel.
      IntStream.range(0, N).boxed().parallel().forEach(i -> {
        String iterationDir = dir + "/iteration" + i;
        try {
          // Create 10 files for the bulk import.
          for (int f = 0; f < 10; f++) {
            writeData(iterationDir + "/f" + f + ".", aconf, f * 1000, (f + 1) * 1000 - 1);
          }
          c.tableOperations().importDirectory(iterationDir).to(tableName).tableTime(true).load();
          getCluster().getFileSystem().delete(new Path(iterationDir), true);
        } catch (Exception e) {
          throw new RuntimeException(e);
        }
      });
```
Not sure if it's a good idea or not, but you could do something like this to kick off the bulk imports in parallel.
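If explicit control over the degree of parallelism is preferred over a parallel stream, a rough alternative sketch using an `ExecutorService` could look like the following. The pool size of 8 and the per-iteration directory naming are arbitrary assumptions here, reusing the test's `dir`, `aconf`, `N`, and `c` from above.

```java
// Sketch only: an ExecutorService makes the thread count explicit and
// lets the test surface worker failures via Future.get().
ExecutorService pool = Executors.newFixedThreadPool(8);
try {
  List<Future<?>> futures = new ArrayList<>();
  for (int i = 0; i < N; i++) {
    final int iteration = i;
    futures.add(pool.submit(() -> {
      String iterationDir = dir + "/iteration" + iteration;
      try {
        // Create 10 files for the bulk import.
        for (int f = 0; f < 10; f++) {
          writeData(iterationDir + "/f" + f + ".", aconf, f * 1000, (f + 1) * 1000 - 1);
        }
        c.tableOperations().importDirectory(iterationDir).to(tableName).tableTime(true).load();
        getCluster().getFileSystem().delete(new Path(iterationDir), true);
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }));
  }
  // Propagate the first failure, if any, to the test thread.
  for (Future<?> future : futures) {
    future.get();
  }
} finally {
  pool.shutdown();
}
```

Unlike `parallel()`, which runs on the common ForkJoinPool, this keeps the import concurrency independent of the host's CPU count.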