This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push: new d0d7b585ea Prevents bulk import from hanging. (#3044) d0d7b585ea is described below commit d0d7b585ea7bace2f86d5168058aaf5f33eda69c Author: Keith Turner <ktur...@apache.org> AuthorDate: Tue Oct 25 23:24:24 2022 +0100 Prevents bulk import from hanging. (#3044) Fixes a bug where bulk import would hang when the first row of the file was equal to the last row of the first tablet. --- .../clientImpl/bulk/ConcurrentKeyExtentCache.java | 6 +++-- .../core/metadata/schema/TabletsMetadata.java | 24 ++++++++++++++++++-- .../apache/accumulo/test/functional/BulkNewIT.java | 26 ++++++++++++++++++++++ 3 files changed, 52 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java index ec154e7dbe..3f5763c10b 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/bulk/ConcurrentKeyExtentCache.java @@ -88,13 +88,15 @@ class ConcurrentKeyExtentCache implements KeyExtentCache { @VisibleForTesting protected Stream<KeyExtent> lookupExtents(Text row) { - return TabletsMetadata.builder(ctx).forTable(tableId).overlapping(row, null).checkConsistency() - .fetch(PREV_ROW).build().stream().limit(100).map(TabletMetadata::getExtent); + return TabletsMetadata.builder(ctx).forTable(tableId).overlapping(row, true, null) + .checkConsistency().fetch(PREV_ROW).build().stream().limit(100) + .map(TabletMetadata::getExtent); } @Override public KeyExtent lookup(Text row) { while (true) { + KeyExtent ke = getFromCache(row); if (ke != null) return ke; diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java index 36caa39e90..e586b1a6ea 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletsMetadata.java @@ -356,12 +356,19 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable } @Override - public Options overlapping(Text startRow, Text endRow) { - this.range = new KeyExtent(tableId, null, startRow).toMetaRange(); + public Options overlapping(Text startRow, boolean startInclusive, Text endRow) { + var encRow = TabletsSection.encodeRow(tableId, startRow == null ? new Text("") : startRow); + this.range = new Range(encRow, startRow == null ? true : startInclusive, null, true); this.endRow = endRow; + return this; } + @Override + public Options overlapping(Text startRow, Text endRow) { + return overlapping(startRow, false, endRow); + } + @Override public Options saveKeyValues() { this.saveKeyValues = true; @@ -465,8 +472,21 @@ public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable * Limit to tablets that overlap the range {@code (startRow, endRow]}. Can pass null * representing -inf and +inf. The impl creates open ended ranges which may be problematic, see * #813. + * + * <p> + * This method is equivalent to calling {@link #overlapping(Text, boolean, Text)} as + * {@code overlapping(startRow, false, endRow)} + * </p> */ Options overlapping(Text startRow, Text endRow); + + /** + * When {@code startRowInclusive} is true limits to tablets that overlap the range + * {@code [startRow,endRow]}. When {@code startRowInclusive} is false limits to tablets that + * overlap the range {@code (startRow, endRow]}. Can pass null for start and end row + * representing -inf and +inf. + */ + Options overlapping(Text startRow, boolean startRowInclusive, Text endRow); } private static class TabletMetadataIterator implements Iterator<TabletMetadata> { diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java index 927f17993c..afa0afc677 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java @@ -480,6 +480,32 @@ public class BulkNewIT extends SharedMiniClusterBase { } } + /* + * This test imports a file where the first row of the file is equal to the last row of the first + * tablet. There was a bug where this scenario would cause bulk import to hang forever. + */ + @Test + public void testEndOfFirstTablet() throws Exception { + try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) { + String dir = getDir("/testBulkFile-"); + FileSystem fs = getCluster().getFileSystem(); + fs.mkdirs(new Path(dir)); + + addSplits(c, tableName, "0333"); + + var h1 = writeData(dir + "/f1.", aconf, 333, 333); + + c.tableOperations().importDirectory(dir).to(tableName).load(); + + verifyData(c, tableName, 333, 333, false); + + Map<String,Set<String>> hashes = new HashMap<>(); + hashes.put("0333", Set.of(h1)); + hashes.put("null", Set.of()); + verifyMetadata(c, tableName, hashes); + } + } + private void addSplits(AccumuloClient client, String tableName, String splitString) throws Exception { SortedSet<Text> splits = new TreeSet<>();