ACCUMULO-4028 use known information to pick a random server, merge to 1.7
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/78e2c65e Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/78e2c65e Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/78e2c65e Branch: refs/heads/master Commit: 78e2c65e282abbf36fdd502688a8eabf96f6a9a2 Parents: 91b161a 0212f2f Author: Eric C. Newton <eric.new...@gmail.com> Authored: Wed Oct 14 13:57:12 2015 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Wed Oct 14 13:57:12 2015 -0400 ---------------------------------------------------------------------- .../accumulo/master/tableOps/BulkImport.java | 1 + .../accumulo/master/tableOps/LoadFiles.java | 24 ++++++++++++-------- 2 files changed, 15 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/78e2c65e/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java ---------------------------------------------------------------------- diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java index ad20473,5320aae..7001fdd --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java @@@ -266,3 -286,337 +266,4 @@@ public class BulkImport extends MasterR ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid); } } + -class CleanUpBulkImport extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private static final Logger log = Logger.getLogger(CleanUpBulkImport.class); - - private String tableId; - private String source; - private String bulk; - private String error; - - public CleanUpBulkImport(String tableId, String source, String bulk, String error) { - this.tableId = tableId; - this.source = source; - this.bulk = bulk; - this.error = error; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - log.debug("removing the bulk processing flag file in " + bulk); - Path bulkDir = new Path(bulk); - MetadataTableUtil.removeBulkLoadInProgressFlag("/" + bulkDir.getParent().getName() + "/" + bulkDir.getName()); - MetadataTableUtil.addDeleteEntry(tableId, bulkDir.toString()); - log.debug("removing the metadata table markers for loaded files"); - Connector conn = master.getConnector(); - MetadataTableUtil.removeBulkLoadEntries(conn, tableId, tid); - log.debug("releasing HDFS reservations for " + source + " and " + error); - Utils.unreserveHdfsDirectory(source, tid); - Utils.unreserveHdfsDirectory(error, tid); - Utils.getReadLock(tableId, tid).unlock(); - log.debug("completing bulk import transaction " + tid); - ZooArbitrator.cleanup(Constants.BULK_ARBITRATOR_TYPE, tid); - return null; - } -} - -class CompleteBulkImport extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private String tableId; - private String source; - private String bulk; - private String error; - - public CompleteBulkImport(String tableId, String source, String bulk, String error) { - this.tableId = tableId; - this.source = source; - this.bulk = bulk; - this.error = error; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid); - return new CopyFailed(tableId, source, bulk, error); - } -} - -class CopyFailed extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private String tableId; - private String source; - private String bulk; - private String error; - - public CopyFailed(String tableId, String source, String bulk, String error) { - this.tableId = tableId; - this.source = source; - this.bulk = bulk; - this.error = error; - } - - @Override - public long isReady(long tid, Master master) throws Exception { - Set<TServerInstance> finished = new HashSet<TServerInstance>(); - Set<TServerInstance> running = master.onlineTabletServers(); - for (TServerInstance server : running) { - try { - TServerConnection client = master.getConnection(server); - if (client != null && !client.isActive(tid)) - finished.add(server); - } catch (TException ex) { - log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex); - } - } - if (finished.containsAll(running)) - return 0; - return 500; - } - - @Override - public Repo<Master> call(long tid, Master master) throws Exception { - // This needs to execute after the arbiter is stopped - - VolumeManager fs = master.getFileSystem(); - - if (!fs.exists(new Path(error, BulkImport.FAILURES_TXT))) - return new CleanUpBulkImport(tableId, source, bulk, error); - - HashMap<FileRef,String> failures = new HashMap<FileRef,String>(); - HashMap<FileRef,String> loadedFailures = new HashMap<FileRef,String>(); - - FSDataInputStream failFile = fs.open(new Path(error, BulkImport.FAILURES_TXT)); - BufferedReader in = new BufferedReader(new InputStreamReader(failFile, UTF_8)); - try { - String line = null; - while ((line = in.readLine()) != null) { - Path path = new Path(line); - if (!fs.exists(new Path(error, path.getName()))) - failures.put(new FileRef(line, path), line); - } - } finally { - failFile.close(); - } - - /* - * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that - * have no loaded markers. - */ - - // determine which failed files were loaded - Connector conn = master.getConnector(); - Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)); - mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange()); - mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); - - for (Entry<Key,Value> entry : mscanner) { - if (Long.parseLong(entry.getValue().toString()) == tid) { - FileRef loadedFile = new FileRef(fs, entry.getKey()); - String absPath = failures.remove(loadedFile); - if (absPath != null) { - loadedFailures.put(loadedFile, absPath); - } - } - } - - // move failed files that were not loaded - for (String failure : failures.values()) { - Path orig = new Path(failure); - Path dest = new Path(error, orig.getName()); - fs.rename(orig, dest); - log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": import failed"); - } - - if (loadedFailures.size() > 0) { - DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() - + Constants.ZBULK_FAILED_COPYQ); - - HashSet<String> workIds = new HashSet<String>(); - - for (String failure : loadedFailures.values()) { - Path orig = new Path(failure); - Path dest = new Path(error, orig.getName()); - - if (fs.exists(dest)) - continue; - - bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes(UTF_8)); - workIds.add(orig.getName()); - log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed"); - } - - bifCopyQueue.waitUntilDone(workIds); - } - - fs.deleteRecursively(new Path(error, BulkImport.FAILURES_TXT)); - return new CleanUpBulkImport(tableId, source, bulk, error); - } - -} - -class LoadFiles extends MasterRepo { - - private static final long serialVersionUID = 1L; - - private static ExecutorService threadPool = null; - private static final Logger log = Logger.getLogger(BulkImport.class); - - private String tableId; - private String source; - private String bulk; - private String errorDir; - private boolean setTime; - - public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) { - this.tableId = tableId; - this.source = source; - this.bulk = bulk; - this.errorDir = errorDir; - this.setTime = setTime; - } - - @Override - public long isReady(long tid, Master master) throws Exception { - if (master.onlineTabletServers().size() == 0) - return 500; - return 0; - } - - private static synchronized ExecutorService getThreadPool(Master master) { - if (threadPool == null) { - int threadPoolSize = master.getSystemConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE); - ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import"); - pool.allowCoreThreadTimeOut(true); - threadPool = new TraceExecutorService(pool); - } - return threadPool; - } - - @Override - public Repo<Master> call(final long tid, final Master master) throws Exception { - ExecutorService executor = getThreadPool(master); - final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration(); - VolumeManager fs = master.getFileSystem(); - List<FileStatus> files = new ArrayList<FileStatus>(); - for (FileStatus entry : fs.listStatus(new Path(bulk))) { - files.add(entry); - } - log.debug("tid " + tid + " importing " + files.size() + " files"); - - Path writable = new Path(this.errorDir, ".iswritable"); - if (!fs.createNewFile(writable)) { - // Maybe this is a re-try... clear the flag and try again - fs.delete(writable); - if (!fs.createNewFile(writable)) - throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, - "Unable to write to " + this.errorDir); - } - fs.delete(writable); - - final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>()); - for (FileStatus f : files) - filesToLoad.add(f.getPath().toString()); - - final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES)); - for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) { - List<Future<List<String>>> results = new ArrayList<Future<List<String>>>(); - - if (master.onlineTabletServers().size() == 0) - log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")"); - - while (master.onlineTabletServers().size() == 0) { - UtilWaitThread.sleep(500); - } - - // Use the threadpool to assign files one-at-a-time to the server - final List<String> loaded = Collections.synchronizedList(new ArrayList<String>()); - final Random random = new Random(); - for (final String file : filesToLoad) { - results.add(executor.submit(new Callable<List<String>>() { - @Override - public List<String> call() { - List<String> failures = new ArrayList<String>(); - Client client = null; - String server = null; - try { - // get a connection to a random tablet server, do not prefer cached connections because - // this is running on the master and there are lots of connections to tablet servers - // serving the metadata tablets - long timeInMillis = master.getConfiguration().getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT); - TServerInstance servers[] = master.onlineTabletServers().toArray(new TServerInstance[0]); - server = servers[random.nextInt(servers.length)].getLocation().toString(); - client = ThriftUtil.getTServerClient(server, master.getConfiguration().getConfiguration(), timeInMillis); - List<String> attempt = Collections.singletonList(file); - log.debug("Asking " + server + " to bulk import " + file); - List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), SystemCredentials.get().toThrift(master.getInstance()), tid, tableId, attempt, - errorDir, setTime); - if (fail.isEmpty()) { - loaded.add(file); - } else { - failures.addAll(fail); - } - } catch (Exception ex) { - log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex); - } finally { - if (client != null) { - ThriftUtil.returnClient(client); - } - } - return failures; - } - })); - } - Set<String> failures = new HashSet<String>(); - for (Future<List<String>> f : results) - failures.addAll(f.get()); - filesToLoad.removeAll(loaded); - if (filesToLoad.size() > 0) { - log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed"); - UtilWaitThread.sleep(100); - } - } - - FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true); - BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8)); - try { - for (String f : filesToLoad) { - out.write(f); - out.write("\n"); - } - } finally { - out.close(); - } - - // return the next step, which will perform cleanup - return new CompleteBulkImport(tableId, source, bulk, errorDir); - } - - static String sampleList(Collection<?> potentiallyLongList, int max) { - StringBuffer result = new StringBuffer(); - result.append("["); - int i = 0; - for (Object obj : potentiallyLongList) { - result.append(obj); - if (i >= max) { - result.append("..."); - break; - } else { - result.append(", "); - } - i++; - } - if (i < max) - result.delete(result.length() - 2, result.length()); - result.append("]"); - return result.toString(); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/78e2c65e/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java ---------------------------------------------------------------------- diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java index 4a56c6f,0000000..a80e1ff mode 100644,000000..100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/LoadFiles.java @@@ -1,209 -1,0 +1,213 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.master.tableOps; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.io.BufferedWriter; +import java.io.OutputStreamWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; ++import java.util.Random; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; + - import org.apache.accumulo.core.client.impl.ServerClient; +import org.apache.accumulo.core.client.impl.thrift.ClientService; - import org.apache.accumulo.core.client.impl.thrift.ClientService.Client; +import org.apache.accumulo.core.client.impl.thrift.TableOperation; +import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType; +import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; ++import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.trace.Tracer; - import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.core.util.SimpleThreadPool; +import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.fate.Repo; +import org.apache.accumulo.master.Master; +import org.apache.accumulo.server.fs.VolumeManager; ++import org.apache.accumulo.server.master.state.TServerInstance; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.htrace.wrappers.TraceExecutorService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + ++import com.google.common.net.HostAndPort; ++ +class LoadFiles extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private static ExecutorService threadPool = null; + private static final Logger log = LoggerFactory.getLogger(LoadFiles.class); + + private String tableId; + private String source; + private String bulk; + private String errorDir; + private boolean setTime; + + public LoadFiles(String tableId, String source, String bulk, String errorDir, boolean setTime) { + this.tableId = tableId; + this.source = source; + this.bulk = bulk; + this.errorDir = errorDir; + this.setTime = setTime; + } + + @Override + public long isReady(long tid, Master master) throws Exception { + if (master.onlineTabletServers().size() == 0) + return 500; + return 0; + } + + private static synchronized ExecutorService getThreadPool(Master master) { + if (threadPool == null) { + int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE); + ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import"); + pool.allowCoreThreadTimeOut(true); + threadPool = new TraceExecutorService(pool); + } + return threadPool; + } + + @Override + public Repo<Master> call(final long tid, final Master master) throws Exception { + ExecutorService executor = getThreadPool(master); + final AccumuloConfiguration conf = master.getConfiguration(); + VolumeManager fs = master.getFileSystem(); + List<FileStatus> files = new ArrayList<FileStatus>(); + for (FileStatus entry : fs.listStatus(new Path(bulk))) { + files.add(entry); + } + log.debug("tid " + tid + " importing " + files.size() + " files"); + + Path writable = new Path(this.errorDir, ".iswritable"); + if (!fs.createNewFile(writable)) { + // Maybe this is a re-try... clear the flag and try again + fs.delete(writable); + if (!fs.createNewFile(writable)) + throw new ThriftTableOperationException(tableId, null, TableOperation.BULK_IMPORT, TableOperationExceptionType.BULK_BAD_ERROR_DIRECTORY, + "Unable to write to " + this.errorDir); + } + fs.delete(writable); + + final Set<String> filesToLoad = Collections.synchronizedSet(new HashSet<String>()); + for (FileStatus f : files) + filesToLoad.add(f.getPath().toString()); + + final int RETRIES = Math.max(1, conf.getCount(Property.MASTER_BULK_RETRIES)); + for (int attempt = 0; attempt < RETRIES && filesToLoad.size() > 0; attempt++) { + List<Future<List<String>>> results = new ArrayList<Future<List<String>>>(); + + if (master.onlineTabletServers().size() == 0) + log.warn("There are no tablet server to process bulk import, waiting (tid = " + tid + ")"); + + while (master.onlineTabletServers().size() == 0) { + UtilWaitThread.sleep(500); + } + + // Use the threadpool to assign files one-at-a-time to the server + final List<String> loaded = Collections.synchronizedList(new ArrayList<String>()); ++ final Random random = new Random(); + for (final String file : filesToLoad) { + results.add(executor.submit(new Callable<List<String>>() { + @Override + public List<String> call() { + List<String> failures = new ArrayList<String>(); + ClientService.Client client = null; - String server = null; ++ HostAndPort server = null; + try { + // get a connection to a random tablet server, do not prefer cached connections because + // this is running on the master and there are lots of connections to tablet servers + // serving the metadata tablets + long timeInMillis = master.getConfiguration().getTimeInMillis(Property.MASTER_BULK_TIMEOUT); - Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis); - client = pair.getSecond(); - server = pair.getFirst(); ++ // Pair<String,Client> pair = ServerClient.getConnection(master, false, timeInMillis); ++ TServerInstance[] servers = master.onlineTabletServers().toArray(new TServerInstance[0]); ++ server = servers[random.nextInt(servers.length)].getLocation(); ++ client = ThriftUtil.getTServerClient(server, master, timeInMillis); + List<String> attempt = Collections.singletonList(file); - log.debug("Asking " + pair.getFirst() + " to bulk import " + file); ++ log.debug("Asking " + server + " to bulk import " + file); + List<String> fail = client.bulkImportFiles(Tracer.traceInfo(), master.rpcCreds(), tid, tableId, attempt, errorDir, setTime); + if (fail.isEmpty()) { + loaded.add(file); + } else { + failures.addAll(fail); + } + } catch (Exception ex) { + log.error("rpc failed server:" + server + ", tid:" + tid + " " + ex); + } finally { - ServerClient.close(client); ++ ThriftUtil.returnClient(client); + } + return failures; + } + })); + } + Set<String> failures = new HashSet<String>(); + for (Future<List<String>> f : results) + failures.addAll(f.get()); + filesToLoad.removeAll(loaded); + if (filesToLoad.size() > 0) { + log.debug("tid " + tid + " attempt " + (attempt + 1) + " " + sampleList(filesToLoad, 10) + " failed"); + UtilWaitThread.sleep(100); + } + } + + FSDataOutputStream failFile = fs.create(new Path(errorDir, BulkImport.FAILURES_TXT), true); + BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile, UTF_8)); + try { + for (String f : filesToLoad) { + out.write(f); + out.write("\n"); + } + } finally { + out.close(); + } + + // return the next step, which will perform cleanup + return new CompleteBulkImport(tableId, source, bulk, errorDir); + } + + static String sampleList(Collection<?> potentiallyLongList, int max) { + StringBuffer result = new StringBuffer(); + result.append("["); + int i = 0; + for (Object obj : potentiallyLongList) { + result.append(obj); + if (i >= max) { + result.append("..."); + break; + } else { + result.append(", "); + } + i++; + } + if (i < max) + result.delete(result.length() - 2, result.length()); + result.append("]"); + return result.toString(); + } + - } ++}