keith-turner closed pull request #773: fixes #518 ignore non rfiles in new bulk import
URL: https://github.com/apache/accumulo/pull/773
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java index 426cf0b2a0..3d8da28b42 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java @@ -312,7 +312,7 @@ private static Text nextRow(Text row) { } } - private static Map<String,Long> getFileLenMap(FileStatus[] statuses) { + private static Map<String,Long> getFileLenMap(List<FileStatus> statuses) { HashMap<String,Long> fileLens = new HashMap<>(); for (FileStatus status : statuses) { fileLens.put(status.getPath().getName(), status.getLen()); @@ -322,7 +322,7 @@ private static Text nextRow(Text row) { return fileLens; } - private static Cache<String,Long> getPopulatedFileLenCache(Path dir, FileStatus[] statuses) { + private static Cache<String,Long> getPopulatedFileLenCache(Path dir, List<FileStatus> statuses) { Map<String,Long> fileLens = getFileLenMap(statuses); Map<String,Long> absFileLens = new HashMap<>(); @@ -343,8 +343,8 @@ private static Text nextRow(Text row) { Map<String,List<Destination>> fileDestinations = plan.getDestinations().stream() .collect(groupingBy(Destination::getFileName)); - FileStatus[] statuses = fs.listStatus(srcPath, - p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING)); + List<FileStatus> statuses = filterInvalid( + fs.listStatus(srcPath, p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING))); Map<String,Long> fileLens = getFileLenMap(statuses); @@ -447,13 +447,47 @@ private Text toText(byte[] row) { } + private static List<FileStatus> filterInvalid(FileStatus[] files) { + ArrayList<FileStatus> fileList = new 
ArrayList<>(files.length); + + for (FileStatus fileStatus : files) { + + String fname = fileStatus.getPath().getName(); + + if (fname.equals("_SUCCESS") || fname.equals("_logs")) { + log.debug("Ignoring file likely created by map reduce : {}", fileStatus.getPath()); + continue; + } + + if (fileStatus.isDirectory()) { + log.warn("{} is a directory, ignoring.", fileStatus.getPath()); + continue; + } + + String sa[] = fname.split("\\."); + String extension = ""; + if (sa.length > 1) { + extension = sa[sa.length - 1]; + } + + if (!FileOperations.getValidExtensions().contains(extension)) { + log.warn("{} does not have a valid extension, ignoring", fileStatus.getPath()); + continue; + } + + fileList.add(fileStatus); + } + + return fileList; + } + public static SortedMap<KeyExtent,Bulk.Files> computeFileToTabletMappings(FileSystem fs, Table.ID tableId, Path dirPath, Executor executor, ClientContext context) throws IOException { KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, context); - FileStatus[] files = fs.listStatus(dirPath, - p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING)); + List<FileStatus> files = filterInvalid( + fs.listStatus(dirPath, p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING))); // we know all of the file lens, so construct a cache and populate it in order to avoid later // trips to the namenode diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java index 34f2707ec6..24fd475522 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java @@ -59,6 +59,7 @@ import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; @@ -231,6 +232,11 @@ private void testBulkFile(boolean offline, boolean usePlan) throws Exception { hashes.put(endRow, new HashSet<>()); } + // Add a junk file, should be ignored + FSDataOutputStream out = fs.create(new Path(dir, "junk")); + out.writeChars("ABCDEFG\n"); + out.close(); + // 1 Tablet 0333-null String h1 = writeData(dir + "/f1.", aconf, 0, 333); hashes.get("0333").add(h1); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services