keith-turner closed pull request #773: fixes #518 ignore non rfiles in new bulk 
import
URL: https://github.com/apache/accumulo/pull/773
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java
index 426cf0b2a0..3d8da28b42 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/BulkImport.java
@@ -312,7 +312,7 @@ private static Text nextRow(Text row) {
     }
   }
 
-  private static Map<String,Long> getFileLenMap(FileStatus[] statuses) {
+  private static Map<String,Long> getFileLenMap(List<FileStatus> statuses) {
     HashMap<String,Long> fileLens = new HashMap<>();
     for (FileStatus status : statuses) {
       fileLens.put(status.getPath().getName(), status.getLen());
@@ -322,7 +322,7 @@ private static Text nextRow(Text row) {
     return fileLens;
   }
 
-  private static Cache<String,Long> getPopulatedFileLenCache(Path dir, 
FileStatus[] statuses) {
+  private static Cache<String,Long> getPopulatedFileLenCache(Path dir, 
List<FileStatus> statuses) {
     Map<String,Long> fileLens = getFileLenMap(statuses);
 
     Map<String,Long> absFileLens = new HashMap<>();
@@ -343,8 +343,8 @@ private static Text nextRow(Text row) {
     Map<String,List<Destination>> fileDestinations = 
plan.getDestinations().stream()
         .collect(groupingBy(Destination::getFileName));
 
-    FileStatus[] statuses = fs.listStatus(srcPath,
-        p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING));
+    List<FileStatus> statuses = filterInvalid(
+        fs.listStatus(srcPath, p -> 
!p.getName().equals(Constants.BULK_LOAD_MAPPING)));
 
     Map<String,Long> fileLens = getFileLenMap(statuses);
 
@@ -447,13 +447,47 @@ private Text toText(byte[] row) {
 
   }
 
+  private static List<FileStatus> filterInvalid(FileStatus[] files) {
+    ArrayList<FileStatus> fileList = new ArrayList<>(files.length);
+
+    for (FileStatus fileStatus : files) {
+
+      String fname = fileStatus.getPath().getName();
+
+      if (fname.equals("_SUCCESS") || fname.equals("_logs")) {
+        log.debug("Ignoring file likely created by map reduce : {}", 
fileStatus.getPath());
+        continue;
+      }
+
+      if (fileStatus.isDirectory()) {
+        log.warn("{} is a directory, ignoring.", fileStatus.getPath());
+        continue;
+      }
+
+      String sa[] = fname.split("\\.");
+      String extension = "";
+      if (sa.length > 1) {
+        extension = sa[sa.length - 1];
+      }
+
+      if (!FileOperations.getValidExtensions().contains(extension)) {
+        log.warn("{} does not have a valid extension, ignoring", 
fileStatus.getPath());
+        continue;
+      }
+
+      fileList.add(fileStatus);
+    }
+
+    return fileList;
+  }
+
   public static SortedMap<KeyExtent,Bulk.Files> 
computeFileToTabletMappings(FileSystem fs,
       Table.ID tableId, Path dirPath, Executor executor, ClientContext 
context) throws IOException {
 
     KeyExtentCache extentCache = new ConcurrentKeyExtentCache(tableId, 
context);
 
-    FileStatus[] files = fs.listStatus(dirPath,
-        p -> !p.getName().equals(Constants.BULK_LOAD_MAPPING));
+    List<FileStatus> files = filterInvalid(
+        fs.listStatus(dirPath, p -> 
!p.getName().equals(Constants.BULK_LOAD_MAPPING)));
 
     // we know all of the file lens, so construct a cache and populate it in 
order to avoid later
     // trips to the namenode
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java 
b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
index 34f2707ec6..24fd475522 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkLoadIT.java
@@ -59,6 +59,7 @@
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
@@ -231,6 +232,11 @@ private void testBulkFile(boolean offline, boolean 
usePlan) throws Exception {
         hashes.put(endRow, new HashSet<>());
       }
 
+      // Add a junk file, should be ignored
+      FSDataOutputStream out = fs.create(new Path(dir, "junk"));
+      out.writeChars("ABCDEFG\n");
+      out.close();
+
       // 1 Tablet 0333-null
       String h1 = writeData(dir + "/f1.", aconf, 0, 333);
       hashes.get("0333").add(h1);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to