This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new 3b9dadc  Fix idempotency bug in importtable (#1555)
3b9dadc is described below

commit 3b9dadce9d62d3432fabd995892568f63816420f
Author: Arvind Shyamsundar <arvin...@apache.org>
AuthorDate: Thu Mar 12 11:02:45 2020 -0700

    Fix idempotency bug in importtable (#1555)
    
    * Fix idempotency bug in importtable
    The previous implementation failed when FATE retried an interrupted
    importtable operation, because files that had already been moved were
    no longer present in the export directory. This commit also removes a
    chatty per-file fs.exists() check.
    
    * Use Set operations to streamline code
    This code uses set operations to check whether any files referenced by
    the mapping file are missing from both the exported (source) directory
    and the imported (target) directory. Using sets also improves performance
    compared to a lambda scanning the file status arrays.
    
    * Fix formatting and use the built-in Java joiner (Collectors.joining)
---
 .../master/tableOps/MoveExportedFiles.java         | 46 +++++++++++++++++-----
 1 file changed, 36 insertions(+), 10 deletions(-)

diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java
 
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java
index 667ccfc..f13e1c5 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java
@@ -17,7 +17,12 @@
 package org.apache.accumulo.master.tableOps;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import 
org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
 import org.apache.accumulo.core.client.impl.thrift.TableOperation;
@@ -30,6 +35,8 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Sets;
+
 class MoveExportedFiles extends MasterRepo {
   private static final Logger log = 
LoggerFactory.getLogger(MoveExportedFiles.class);
 
@@ -48,21 +55,40 @@ class MoveExportedFiles extends MasterRepo {
 
       Map<String,String> fileNameMappings = 
PopulateMetadataTable.readMappingFile(fs, tableInfo);
 
-      for (String oldFileName : fileNameMappings.keySet()) {
-        if (!fs.exists(new Path(tableInfo.exportDir, oldFileName))) {
-          throw new AcceptableThriftTableOperationException(tableInfo.tableId, 
tableInfo.tableName,
-              TableOperation.IMPORT, TableOperationExceptionType.OTHER,
-              "File referenced by exported table does not exists " + 
oldFileName);
-        }
+      FileStatus[] exportedFiles = fs.listStatus(new 
Path(tableInfo.exportDir));
+      FileStatus[] importedFiles = fs.listStatus(new 
Path(tableInfo.importDir));
+
+      Function<FileStatus,String> fileStatusName = fstat -> 
fstat.getPath().getName();
+
+      Set<String> importing = Arrays.stream(exportedFiles).map(fileStatusName)
+          .map(fileNameMappings::get).collect(Collectors.toSet());
+
+      Set<String> imported =
+          
Arrays.stream(importedFiles).map(fileStatusName).collect(Collectors.toSet());
+
+      if (log.isDebugEnabled()) {
+        log.debug("Files already present in imported (target) directory: {}",
+            imported.stream().collect(Collectors.joining(",")));
       }
 
-      FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir));
+      Set<String> missingFiles = Sets.difference(new 
HashSet<String>(fileNameMappings.values()),
+          new HashSet<String>(Sets.union(importing, imported)));
+
+      if (!missingFiles.isEmpty()) {
+        throw new AcceptableThriftTableOperationException(tableInfo.tableId, 
tableInfo.tableName,
+            TableOperation.IMPORT, TableOperationExceptionType.OTHER,
+            "Missing source files corresponding to files "
+                + missingFiles.stream().collect(Collectors.joining(",")));
+      }
 
-      for (FileStatus fileStatus : files) {
+      for (FileStatus fileStatus : exportedFiles) {
         String newName = fileNameMappings.get(fileStatus.getPath().getName());
 
-        if (newName != null)
-          fs.rename(fileStatus.getPath(), new Path(tableInfo.importDir, 
newName));
+        if (newName != null) {
+          Path newPath = new Path(tableInfo.importDir, newName);
+          log.debug("Renaming file {} to {}", fileStatus.getPath(), newPath);
+          fs.rename(fileStatus.getPath(), newPath);
+        }
       }
 
       return new FinishImportTable(tableInfo);

Reply via email to