Author: ecn Date: Mon Apr 8 17:27:37 2013 New Revision: 1465690 URL: http://svn.apache.org/r1465690 Log: ACCUMULO-1252 remove loaded files outside the threadpool
Modified: accumulo/trunk/ (props changed) accumulo/trunk/assemble/ (props changed) accumulo/trunk/core/ (props changed) accumulo/trunk/examples/ (props changed) accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java (props changed) accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java (props changed) accumulo/trunk/server/ (props changed) accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java accumulo/trunk/src/ (props changed) Propchange: accumulo/trunk/ ------------------------------------------------------------------------------ Merged /accumulo/branches/1.4/src:r1465687 Merged /accumulo/branches/1.5:r1465688 Merged /accumulo/branches/1.4:r1465687 Propchange: accumulo/trunk/assemble/ ------------------------------------------------------------------------------ Merged /accumulo/branches/1.4/src/assemble:r1465687 Merged /accumulo/branches/1.4/assemble:r1465687 Merged /accumulo/branches/1.5/assemble:r1465688 Propchange: accumulo/trunk/core/ ------------------------------------------------------------------------------ Merged /accumulo/branches/1.5/core:r1465688 Merged /accumulo/branches/1.4/core:r1465687 Merged /accumulo/branches/1.4/src/core:r1465687 Propchange: accumulo/trunk/examples/ ------------------------------------------------------------------------------ Merged /accumulo/branches/1.4/src/examples:r1465687 Merged /accumulo/branches/1.5/examples:r1465688 Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java ------------------------------------------------------------------------------ Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1465688 Merged /accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1465687 Merged /accumulo/branches/1.4/src/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1465687 Propchange: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java ------------------------------------------------------------------------------ Merged /accumulo/branches/1.4/src/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1465687 Merged /accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1465688 Merged /accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1465687 Propchange: accumulo/trunk/server/ ------------------------------------------------------------------------------ Merged /accumulo/branches/1.4/server:r1465687 Merged /accumulo/branches/1.5/server:r1465688 Merged /accumulo/branches/1.4/src/server:r1465687 Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1465690&r1=1465689&r2=1465690&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Mon Apr 8 17:27:37 2013 @@ -505,7 +505,7 @@ class LoadFiles extends MasterRepo { } fs.delete(writable, false); - final List<String> filesToLoad = Collections.synchronizedList(new ArrayList<String>()); + final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>()); for (FileStatus f : files) filesToLoad.add(f.getPath().toString()); @@ -521,6 +521,7 @@ class LoadFiles extends MasterRepo { } // Use the threadpool to assign files one-at-a-time to the server + final List<String> loaded = Collections.synchronizedList(new ArrayList<String>()); for (final String file : filesToLoad) { results.add(threadPool.submit(new Callable<List<String>>() { @Override @@ -540,7 +541,7 @@ class LoadFiles extends MasterRepo { log.debug("Asking " + pair.getFirst() + " to bulk import " + file); List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SecurityConstants.getSystemCredentials(), tid, tableId, attempt, errorDir, setTime); if (fail.isEmpty()) { - filesToLoad.remove(file); + loaded.add(file); } else { failures.addAll(fail); } @@ -556,6 +557,7 @@ class LoadFiles extends MasterRepo { Set<String> failures = new HashSet<String>(); for (Future<List<String>> f : results) failures.addAll(f.get()); + filesToLoad.removeAll(loaded); if (filesToLoad.size() > 0) { log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed"); UtilWaitThread.sleep(100); Propchange: accumulo/trunk/src/ ------------------------------------------------------------------------------ Merged /accumulo/branches/1.4/src:r1465687 Merged /accumulo/branches/1.4/src/src:r1465687 Merged /accumulo/branches/1.5/src:r1465688