keith-turner commented on code in PR #4452:
URL: https://github.com/apache/accumulo/pull/4452#discussion_r1569348973
##########
core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java:
##########
@@ -1555,5 +1561,42 @@ private void setInterruptFlagInternal(AtomicBoolean
flag) {
public void setCacheProvider(CacheProvider cacheProvider) {
reader.setCacheProvider(cacheProvider);
}
+
+ /**
+ * Returns an estimate of the number of entries that overlap the given
extent. This is an
+ * estimate because the extent may or may not entirely overlap with each
of the index entries
+ * included in the count. Will never underestimate but may overestimate.
+ *
+ * @param extent the key extent
+ * @return the estimate
+ */
+ @Override
+ public long estimateEntries(KeyExtent extent) throws IOException {
+ long totalEntries = 0;
+ Key startKey = extent.toDataRange().getStartKey();
+ IndexEntry indexEntry;
+
+ for (LocalityGroupReader lgr : currentReaders) {
+ boolean prevEntryOverlapped = false;
+ var indexIter = startKey == null ? lgr.getIndex() :
lgr.index.lookup(startKey);
+
+ while (indexIter.hasNext()) {
+ indexEntry = indexIter.next();
+ if (extent.contains(indexEntry.getKey().getRow())) {
+ totalEntries += indexEntry.getNumEntries();
+ prevEntryOverlapped = true;
+ } else if (prevEntryOverlapped) {
+ // The last index entry included in the count is the one after the
last contained by the
+ // extent. This is because it is possible for the extent to
overlap this index entry
+ // but there is no way to check whether it does or not. The index
entry only contains
+ // info about the last key, but the extent may overlap but not
with the last key.
+ totalEntries += indexEntry.getNumEntries();
+ prevEntryOverlapped = false;
Review Comment:
Could break out of the loop early here.
```suggestion
prevEntryOverlapped = false;
break;
```
##########
test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java:
##########
@@ -97,14 +104,90 @@ public void testProgress() throws Exception {
client.tableOperations().attachIterator(table1, setting,
EnumSet.of(IteratorUtil.IteratorScope.majc));
log.info("Compacting table");
- compact(client, table1, 2, "DCQ1", true);
+ compact(client, table1, 2, QUEUE1, true);
verify(client, table1, 2, ROWS);
log.info("Done Compacting table");
compactionFinished.set(true);
checkerThread.join();
verifyProgress();
+ } finally {
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+ }
+ }
+
+ @Test
+ public void testProgressWithBulkImport() throws Exception {
+ /*
+ * Tests the progress of an external compaction done on a table with bulk
imported files.
+ * Progress should stay 0-100. There was previously a bug with the
Compactor showing a >100%
+ * progress for compactions with bulk import files.
+ */
+ String[] tableNames = getUniqueNames(2);
+ String tableName1 = tableNames[0];
+ String tableName2 = tableNames[1];
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ log.info("Creating table " + tableName1);
+ createTable(client, tableName1, "cs1");
+ log.info("Creating table " + tableName2);
+ createTable(client, tableName2, "cs1");
+ log.info("Writing " + ROWS + " rows to table " + tableName1);
+ writeData(client, tableName1, ROWS);
+ log.info("Writing " + ROWS + " rows to table " + tableName2);
+ writeData(client, tableName2, ROWS);
+ // This is done to avoid system compactions
+ client.tableOperations().setProperty(tableName1,
Property.TABLE_MAJC_RATIO.getKey(), "1000");
+ client.tableOperations().setProperty(tableName2,
Property.TABLE_MAJC_RATIO.getKey(), "1000");
+
+
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
+ getCluster().getClusterControl().startCompactors(Compactor.class, 1,
QUEUE1);
+
+ String dir = getDir(client, tableName1);
+
+ log.info("Bulk importing files in dir " + dir + " to table " +
tableName2);
+ client.tableOperations().importDirectory(dir).to(tableName2).load();
Review Comment:
What is the intent of this? It seems like it will bulk import files from a
dir in table 1 into table 2. Bulk import moves files, so that would delete
files from table1, but table1 would still have refs to those files in the
metadata table. So it seems like it will leave table1 broken. Not a change for
this PR, but I think we should update bulk import to throw an exception if the
bulk source dir is under a configured accumulo volume dir.
##########
core/src/main/java/org/apache/accumulo/core/file/rfile/RFile.java:
##########
@@ -1555,5 +1561,42 @@ private void setInterruptFlagInternal(AtomicBoolean
flag) {
public void setCacheProvider(CacheProvider cacheProvider) {
reader.setCacheProvider(cacheProvider);
}
+
+ /**
+ * Returns an estimate of the number of entries that overlap the given
extent. This is an
Review Comment:
Could move this javadoc up to FileSKVIterator.estimateEntries(). It's nice
javadoc, and putting it there would make it more widely visible.
##########
test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java:
##########
@@ -97,14 +104,90 @@ public void testProgress() throws Exception {
client.tableOperations().attachIterator(table1, setting,
EnumSet.of(IteratorUtil.IteratorScope.majc));
log.info("Compacting table");
- compact(client, table1, 2, "DCQ1", true);
+ compact(client, table1, 2, QUEUE1, true);
verify(client, table1, 2, ROWS);
log.info("Done Compacting table");
compactionFinished.set(true);
checkerThread.join();
verifyProgress();
+ } finally {
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+ }
+ }
+
+ @Test
+ public void testProgressWithBulkImport() throws Exception {
+ /*
+ * Tests the progress of an external compaction done on a table with bulk
imported files.
+ * Progress should stay 0-100. There was previously a bug with the
Compactor showing a >100%
+ * progress for compactions with bulk import files.
+ */
+ String[] tableNames = getUniqueNames(2);
+ String tableName1 = tableNames[0];
+ String tableName2 = tableNames[1];
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ log.info("Creating table " + tableName1);
+ createTable(client, tableName1, "cs1");
+ log.info("Creating table " + tableName2);
+ createTable(client, tableName2, "cs1");
+ log.info("Writing " + ROWS + " rows to table " + tableName1);
+ writeData(client, tableName1, ROWS);
+ log.info("Writing " + ROWS + " rows to table " + tableName2);
+ writeData(client, tableName2, ROWS);
+ // This is done to avoid system compactions
+ client.tableOperations().setProperty(tableName1,
Property.TABLE_MAJC_RATIO.getKey(), "1000");
+ client.tableOperations().setProperty(tableName2,
Property.TABLE_MAJC_RATIO.getKey(), "1000");
+
+
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
+ getCluster().getClusterControl().startCompactors(Compactor.class, 1,
QUEUE1);
+
+ String dir = getDir(client, tableName1);
+
+ log.info("Bulk importing files in dir " + dir + " to table " +
tableName2);
+ client.tableOperations().importDirectory(dir).to(tableName2).load();
+ log.info("Finished bulk import");
+
+ log.info("Starting a compaction progress checker thread");
+ Thread checkerThread = startChecker();
+ checkerThread.start();
+
+ log.info("Attaching a slow iterator to table " + tableName2);
+ IteratorSetting setting = new IteratorSetting(50, "Slow",
SlowIterator.class);
+ SlowIterator.setSleepTime(setting, 1);
+ client.tableOperations().attachIterator(tableName2, setting,
Review Comment:
Instead of adding the iterator to the table, you could set it on the
CompactionConfig object. Setting it on the table is eventually consistent, so
it's not certain the compaction will see the iterator. If the iterator is set
on the compaction config, then it will be seen.
##########
test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompactionProgressIT.java:
##########
@@ -97,14 +104,90 @@ public void testProgress() throws Exception {
client.tableOperations().attachIterator(table1, setting,
EnumSet.of(IteratorUtil.IteratorScope.majc));
log.info("Compacting table");
- compact(client, table1, 2, "DCQ1", true);
+ compact(client, table1, 2, QUEUE1, true);
verify(client, table1, 2, ROWS);
log.info("Done Compacting table");
compactionFinished.set(true);
checkerThread.join();
verifyProgress();
+ } finally {
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+ }
+ }
+
+ @Test
+ public void testProgressWithBulkImport() throws Exception {
+ /*
+ * Tests the progress of an external compaction done on a table with bulk
imported files.
+ * Progress should stay 0-100. There was previously a bug with the
Compactor showing a >100%
+ * progress for compactions with bulk import files.
+ */
+ String[] tableNames = getUniqueNames(2);
+ String tableName1 = tableNames[0];
+ String tableName2 = tableNames[1];
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
Review Comment:
Does this test fail w/o the new changes?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]