Repository: accumulo
Updated Branches:
  refs/heads/master f51949924 -> d8019702b


ACCUMULO-4664: Add property to allow users to specify which servers should be assigned to manage bulk imports


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d8019702
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d8019702
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d8019702

Branch: refs/heads/master
Commit: d8019702bfae7679b7df8a70ed172639bfce9ba2
Parents: f519499
Author: Dave Marion <dlmar...@apache.org>
Authored: Thu Jun 22 15:21:22 2017 -0400
Committer: Dave Marion <dlmar...@apache.org>
Committed: Thu Jun 22 15:21:22 2017 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  2 +
 .../accumulo/master/tableOps/LoadFiles.java     | 79 ++++++++++++--------
 2 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/d8019702/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f161d48..210c04d 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -219,6 +219,8 @@ public enum Property {
   MASTER_BULK_TIMEOUT("master.bulk.timeout", "5m", PropertyType.TIMEDURATION, 
"The time to wait for a tablet server to process a bulk import request"),
   MASTER_BULK_RENAME_THREADS("master.bulk.rename.threadpool.size", "20", 
PropertyType.COUNT,
       "The number of threads to use when moving user files to bulk ingest 
directories under accumulo control"),
+  MASTER_BULK_TSERVER_REGEX("master.bulk.tserver.regex", "", 
PropertyType.STRING,
+      "Regular expression that defines the set of Tablet Servers that will 
perform bulk imports"),
   MASTER_MINTHREADS("master.server.threads.minimum", "20", PropertyType.COUNT, 
"The minimum number of threads to use to handle incoming requests."),
   MASTER_THREADCHECK("master.server.threadcheck.time", "1s", 
PropertyType.TIMEDURATION, "The time between adjustments of the server thread 
pool."),
   MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", 
PropertyType.TIMEDURATION,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d8019702/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index ff285fa..8ed7451 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import 
org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
@@ -135,38 +136,56 @@ class LoadFiles extends MasterRepo {
       // Use the threadpool to assign files one-at-a-time to the server
       final List<String> loaded = Collections.synchronizedList(new 
ArrayList<String>());
       final Random random = new Random();
-      final TServerInstance[] servers = 
master.onlineTabletServers().toArray(new TServerInstance[0]);
-      for (final String file : filesToLoad) {
-        results.add(executor.submit(new Callable<List<String>>() {
-          @Override
-          public List<String> call() {
-            List<String> failures = new ArrayList<>();
-            ClientService.Client client = null;
-            HostAndPort server = null;
-            try {
-              // get a connection to a random tablet server, do not prefer 
cached connections because
-              // this is running on the master and there are lots of 
connections to tablet servers
-              // serving the metadata tablets
-              long timeInMillis = 
master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
-              // Pair<String,Client> pair = ServerClient.getConnection(master, 
false, timeInMillis);
-              server = servers[random.nextInt(servers.length)].getLocation();
-              client = ThriftUtil.getTServerClient(server, master, 
timeInMillis);
-              List<String> attempt = Collections.singletonList(file);
-              log.debug("Asking " + server + " to bulk import " + file);
-              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), 
master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
-              if (fail.isEmpty()) {
-                loaded.add(file);
-              } else {
-                failures.addAll(fail);
+      final TServerInstance[] servers;
+      String prop = conf.get(Property.MASTER_BULK_TSERVER_REGEX);
+      if (null == prop || "".equals(prop)) {
+        servers = master.onlineTabletServers().toArray(new TServerInstance[0]);
+      } else {
+        Pattern regex = Pattern.compile(prop);
+        List<TServerInstance> subset = new ArrayList<>();
+        master.onlineTabletServers().forEach(t -> {
+          if (regex.matcher(t.host()).matches()) {
+            subset.add(t);
+          }
+        });
+        if (0 == subset.size()) {
+          log.warn("There are no tablet servers online that match supplied 
regex: {}", conf.get(Property.MASTER_BULK_TSERVER_REGEX));
+        }
+        servers = subset.toArray(new TServerInstance[0]);
+      }
+      if (servers.length > 0) {
+        for (final String file : filesToLoad) {
+          results.add(executor.submit(new Callable<List<String>>() {
+            @Override
+            public List<String> call() {
+              List<String> failures = new ArrayList<>();
+              ClientService.Client client = null;
+              HostAndPort server = null;
+              try {
+                // get a connection to a random tablet server, do not prefer 
cached connections because
+                // this is running on the master and there are lots of 
connections to tablet servers
+                // serving the metadata tablets
+                long timeInMillis = 
master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
+                // Pair<String,Client> pair = 
ServerClient.getConnection(master, false, timeInMillis);
+                server = servers[random.nextInt(servers.length)].getLocation();
+                client = ThriftUtil.getTServerClient(server, master, 
timeInMillis);
+                List<String> attempt = Collections.singletonList(file);
+                log.debug("Asking " + server + " to bulk import " + file);
+                List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), 
master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
+                if (fail.isEmpty()) {
+                  loaded.add(file);
+                } else {
+                  failures.addAll(fail);
+                }
+              } catch (Exception ex) {
+                log.error("rpc failed server:" + server + ", tid:" + tid + " " 
+ ex);
+              } finally {
+                ThriftUtil.returnClient(client);
               }
-            } catch (Exception ex) {
-              log.error("rpc failed server:" + server + ", tid:" + tid + " " + 
ex);
-            } finally {
-              ThriftUtil.returnClient(client);
+              return failures;
             }
-            return failures;
-          }
-        }));
+          }));
+        }
       }
       Set<String> failures = new HashSet<>();
       for (Future<List<String>> f : results)

Reply via email to