Repository: accumulo
Updated Branches:
  refs/heads/master f51949924 -> d8019702b
ACCUMULO-4664: Add property to allow users to specify which servers should be assigned to manage bulk imports

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d8019702
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d8019702
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d8019702

Branch: refs/heads/master
Commit: d8019702bfae7679b7df8a70ed172639bfce9ba2
Parents: f519499
Author: Dave Marion <dlmar...@apache.org>
Authored: Thu Jun 22 15:21:22 2017 -0400
Committer: Dave Marion <dlmar...@apache.org>
Committed: Thu Jun 22 15:21:22 2017 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  2 +
 .../accumulo/master/tableOps/LoadFiles.java     | 79 ++++++++++++--------
 2 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d8019702/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f161d48..210c04d 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -219,6 +219,8 @@ public enum Property {
   MASTER_BULK_TIMEOUT("master.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request"),
   MASTER_BULK_RENAME_THREADS("master.bulk.rename.threadpool.size", "20", PropertyType.COUNT,
       "The number of threads to use when moving user files to bulk ingest directories under accumulo control"),
+  MASTER_BULK_TSERVER_REGEX("master.bulk.tserver.regex", "", PropertyType.STRING,
+      "Regular expression that defines the set of Tablet Servers that will perform bulk imports"),
   MASTER_MINTHREADS("master.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."),
   MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."),
   MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,

http://git-wip-us.apache.org/repos/asf/accumulo/blob/d8019702/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
index ff285fa..8ed7451 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java
@@ -33,6 +33,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
 import org.apache.accumulo.core.client.impl.thrift.ClientService;
@@ -135,38 +136,56 @@ class LoadFiles extends MasterRepo {
       // Use the threadpool to assign files one-at-a-time to the server
       final List<String> loaded = Collections.synchronizedList(new ArrayList<String>());
       final Random random = new Random();
-      final TServerInstance[] servers = master.onlineTabletServers().toArray(new TServerInstance[0]);
-      for (final String file : filesToLoad) {
-        results.add(executor.submit(new Callable<List<String>>() {
-          @Override
-          public List<String> call() {
-            List<String> failures = new ArrayList<>();
-            ClientService.Client client = null;
-            HostAndPort server = null;
-            try {
-              // get a connection to a random tablet server, do not prefer cached connections because
-              // this is running on the master and there are lots of connections to tablet servers
-              // serving the metadata tablets
-              long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
-              // Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
-              server = servers[random.nextInt(servers.length)].getLocation();
-              client = ThriftUtil.getTServerClient(server, master, timeInMillis);
-              List<String> attempt = Collections.singletonList(file);
-              log.debug("Asking " + server + " to bulk import " + file);
-              List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
-              if (fail.isEmpty()) {
-                loaded.add(file);
-              } else {
-                failures.addAll(fail);
+      final TServerInstance[] servers;
+      String prop = conf.get(Property.MASTER_BULK_TSERVER_REGEX);
+      if (null == prop || "".equals(prop)) {
+        servers = master.onlineTabletServers().toArray(new TServerInstance[0]);
+      } else {
+        Pattern regex = Pattern.compile(prop);
+        List<TServerInstance> subset = new ArrayList<>();
+        master.onlineTabletServers().forEach(t -> {
+          if (regex.matcher(t.host()).matches()) {
+            subset.add(t);
+          }
+        });
+        if (0 == subset.size()) {
+          log.warn("There are no tablet servers online that match supplied regex: {}", conf.get(Property.MASTER_BULK_TSERVER_REGEX));
+        }
+        servers = subset.toArray(new TServerInstance[0]);
+      }
+      if (servers.length > 0) {
+        for (final String file : filesToLoad) {
+          results.add(executor.submit(new Callable<List<String>>() {
+            @Override
+            public List<String> call() {
+              List<String> failures = new ArrayList<>();
+              ClientService.Client client = null;
+              HostAndPort server = null;
+              try {
+                // get a connection to a random tablet server, do not prefer cached connections because
+                // this is running on the master and there are lots of connections to tablet servers
+                // serving the metadata tablets
+                long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT);
+                // Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis);
+                server = servers[random.nextInt(servers.length)].getLocation();
+                client = ThriftUtil.getTServerClient(server, master, timeInMillis);
+                List<String> attempt = Collections.singletonList(file);
+                log.debug("Asking " + server + " to bulk import " + file);
+                List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime);
+                if (fail.isEmpty()) {
+                  loaded.add(file);
+                } else {
+                  failures.addAll(fail);
+                }
+              } catch (Exception ex) {
+                log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
+              } finally {
+                ThriftUtil.returnClient(client);
               }
-            } catch (Exception ex) {
-              log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex);
-            } finally {
-              ThriftUtil.returnClient(client);
+              return failures;
             }
-            return failures;
-          }
-        }));
+          }));
+        }
       }
       Set<String> failures = new HashSet<>();
       for (Future<List<String>> f : results)