Author: ecn
Date: Mon Apr  8 17:25:56 2013
New Revision: 1465688

URL: http://svn.apache.org/r1465688
Log:
ACCUMULO-1252 remove loaded files outside the threadpool

Modified:
    accumulo/branches/1.5/   (props changed)
    accumulo/branches/1.5/assemble/   (props changed)
    accumulo/branches/1.5/core/   (props changed)
    accumulo/branches/1.5/examples/   (props changed)
    
accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java 
  (props changed)
    
accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
   (props changed)
    accumulo/branches/1.5/server/   (props changed)
    
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
    accumulo/branches/1.5/src/   (props changed)

Propchange: accumulo/branches/1.5/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src:r1465687
  Merged /accumulo/branches/1.4:r1465687

Propchange: accumulo/branches/1.5/assemble/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src/assemble:r1465687
  Merged /accumulo/branches/1.4/assemble:r1465687

Propchange: accumulo/branches/1.5/core/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/core:r1465687
  Merged /accumulo/branches/1.4/src/core:r1465687

Propchange: accumulo/branches/1.5/examples/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src/examples:r1465687

Propchange: 
accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java
------------------------------------------------------------------------------
  Merged 
/accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1465687
  Merged 
/accumulo/branches/1.4/src/fate/src/main/java/org/apache/accumulo/fate/ZooStore.java:r1465687

Propchange: 
accumulo/branches/1.5/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java
------------------------------------------------------------------------------
  Merged 
/accumulo/branches/1.4/src/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1465687
  Merged 
/accumulo/branches/1.4/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooSession.java:r1465687

Propchange: accumulo/branches/1.5/server/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/server:r1465687
  Merged /accumulo/branches/1.4/src/server:r1465687

Modified: 
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
URL: 
http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1465688&r1=1465687&r2=1465688&view=diff
==============================================================================
--- 
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
 (original)
+++ 
accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java
 Mon Apr  8 17:25:56 2013
@@ -505,7 +505,7 @@ class LoadFiles extends MasterRepo {
     }
     fs.delete(writable, false);
     
-    final List<String> filesToLoad = Collections.synchronizedList(new 
ArrayList<String>());
+    final Set<String> filesToLoad = Collections.synchronizedSet(new 
HashSet<String>());
     for (FileStatus f : files)
       filesToLoad.add(f.getPath().toString());
     
@@ -521,6 +521,7 @@ class LoadFiles extends MasterRepo {
       }
       
       // Use the threadpool to assign files one-at-a-time to the server
+      final List<String> loaded = Collections.synchronizedList(new 
ArrayList<String>());
       for (final String file : filesToLoad) {
         results.add(threadPool.submit(new Callable<List<String>>() {
           @Override
@@ -540,7 +541,7 @@ class LoadFiles extends MasterRepo {
               log.debug("Asking " + pair.getFirst() + " to bulk import " + 
file);
               List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), 
SecurityConstants.getSystemCredentials(), tid, tableId, attempt, errorDir, 
setTime);
               if (fail.isEmpty()) {
-                filesToLoad.remove(file);
+                loaded.add(file);
               } else {
                 failures.addAll(fail);
               }
@@ -556,6 +557,7 @@ class LoadFiles extends MasterRepo {
       Set<String> failures = new HashSet<String>();
       for (Future<List<String>> f : results)
         failures.addAll(f.get());
+      filesToLoad.removeAll(loaded);
       if (filesToLoad.size() > 0) {
         log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + 
sampleList(filesToLoad, 10) + " failed");
         UtilWaitThread.sleep(100);

Propchange: accumulo/branches/1.5/src/
------------------------------------------------------------------------------
  Merged /accumulo/branches/1.4/src:r1465687
  Merged /accumulo/branches/1.4/src/src:r1465687


Reply via email to