This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.9 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.9 by this push:
     new 3b9dadc  Fix idempotency bug in importtable (#1555)
3b9dadc is described below

commit 3b9dadce9d62d3432fabd995892568f63816420f
Author: Arvind Shyamsundar <arvin...@apache.org>
AuthorDate: Thu Mar 12 11:02:45 2020 -0700

    Fix idempotency bug in importtable (#1555)

    * Fix idempotency bug in importtable

    The previous implementation would fail when an interrupted importtable
    operation was retried by FATE, as it would not find (already moved)
    files in the exported directory. Additionally, this commit also removes
    a chatty fs.exists() check.

    * Use Set operations to streamline code

    This code uses set operations to validate if there are missing files in
    the exported (source) directory. In addition the usage of sets improves
    perf compared to the lambda scanning the file status arrays.

    * Fix formatting and use inbuilt Java joiner
---
 .../master/tableOps/MoveExportedFiles.java | 46 +++++++++++++++++-----
 1 file changed, 36 insertions(+), 10 deletions(-)

diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java index 667ccfc..f13e1c5 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/MoveExportedFiles.java @@ -17,7 +17,12 @@ package org.apache.accumulo.master.tableOps; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException; import org.apache.accumulo.core.client.impl.thrift.TableOperation; @@ -30,6 +35,8 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Sets; +
class MoveExportedFiles extends MasterRepo { private static final Logger log = LoggerFactory.getLogger(MoveExportedFiles.class); @@ -48,21 +55,40 @@ class MoveExportedFiles extends MasterRepo { Map<String,String> fileNameMappings = PopulateMetadataTable.readMappingFile(fs, tableInfo); - for (String oldFileName : fileNameMappings.keySet()) { - if (!fs.exists(new Path(tableInfo.exportDir, oldFileName))) { - throw new AcceptableThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, - TableOperation.IMPORT, TableOperationExceptionType.OTHER, - "File referenced by exported table does not exists " + oldFileName); - } + FileStatus[] exportedFiles = fs.listStatus(new Path(tableInfo.exportDir)); + FileStatus[] importedFiles = fs.listStatus(new Path(tableInfo.importDir)); + + Function<FileStatus,String> fileStatusName = fstat -> fstat.getPath().getName(); + + Set<String> importing = Arrays.stream(exportedFiles).map(fileStatusName) + .map(fileNameMappings::get).collect(Collectors.toSet()); + + Set<String> imported = + Arrays.stream(importedFiles).map(fileStatusName).collect(Collectors.toSet()); + + if (log.isDebugEnabled()) { + log.debug("Files already present in imported (target) directory: {}", + imported.stream().collect(Collectors.joining(","))); } - FileStatus[] files = fs.listStatus(new Path(tableInfo.exportDir)); + Set<String> missingFiles = Sets.difference(new HashSet<String>(fileNameMappings.values()), + new HashSet<String>(Sets.union(importing, imported))); + + if (!missingFiles.isEmpty()) { + throw new AcceptableThriftTableOperationException(tableInfo.tableId, tableInfo.tableName, + TableOperation.IMPORT, TableOperationExceptionType.OTHER, + "Missing source files corresponding to files " + + missingFiles.stream().collect(Collectors.joining(","))); + } - for (FileStatus fileStatus : files) { + for (FileStatus fileStatus : exportedFiles) { String newName = fileNameMappings.get(fileStatus.getPath().getName()); - if (newName != null) - 
fs.rename(fileStatus.getPath(), new Path(tableInfo.importDir, newName)); + if (newName != null) { + Path newPath = new Path(tableInfo.importDir, newName); + log.debug("Renaming file {} to {}", fileStatus.getPath(), newPath); + fs.rename(fileStatus.getPath(), newPath); + } } return new FinishImportTable(tableInfo);