Author: kturner Date: Tue Jul 3 21:13:15 2012 New Revision: 1356949 URL: http://svn.apache.org/viewvc?rev=1356949&view=rev Log: ACCUMULO-409 Make tservers copy failed bulk import files instead of the master. (merged from 1.4)
Added: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java - copied unchanged from r1356900, accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/BulkFailedCopyProcessor.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java - copied, changed from r1356900, accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java Modified: accumulo/trunk/ (props changed) accumulo/trunk/core/ (props changed) accumulo/trunk/core/src/main/java/org/apache/accumulo/core/Constants.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowTest.java accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java accumulo/trunk/server/ (props changed) accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java accumulo/trunk/src/ (props changed) accumulo/trunk/test/system/test4/bulk_import_test.sh Propchange: accumulo/trunk/ ------------------------------------------------------------------------------ Merged /accumulo/trunk/src:r1341000,1342373,1351691,1356400 Merged /accumulo/branches/1.4/src:r1339309-1342420,1343897-1343898,1343943-1349971,1349973-1351424,1351426-1354669,1354673-1356900 Merged /accumulo/branches/1.3:r1354669 Merged /accumulo/branches/1.3/src:r1354669 Merged /accumulo/branches/1.4:r1343943-1349971,1349973-1351424,1351426-1354668,1354673-1356923 Propchange: 
accumulo/trunk/core/ ------------------------------------------------------------------------------ Merged /accumulo/trunk/src/core:r1341000,1342373,1351691,1356400 Merged /accumulo/branches/1.3/src/core:r1354669 Merged /accumulo/branches/1.4/core:r1343943-1349971,1349973-1351424,1351426-1354668,1354673-1356923 Merged /accumulo/branches/1.4/src/core:r1339309-1342420,1343897-1343898,1343943-1349971,1349973-1351424,1351426-1354669,1354673-1356900 Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/Constants.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/Constants.java?rev=1356949&r1=1356948&r2=1356949&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/Constants.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/Constants.java Tue Jul 3 21:13:15 2012 @@ -74,6 +74,8 @@ public class Constants { public static final String ZNEXT_FILE = "/next_file"; + public static final String ZBULK_FAILED_COPYQ = "/bulk_failed_copyq"; + public static final String ZHDFS_RESERVATIONS = "/hdfs_reservations"; public static final String ZRECOVERY = "/recovery"; Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1356949&r1=1356948&r2=1356949&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Tue Jul 3 21:13:15 2012 @@ -151,7 +151,9 @@ public enum Property { TSERV_RECOVERY_MAX_CONCURRENT("tserver.recovery.concurrent.max", "2", PropertyType.COUNT, "The maximum number of threads to use to sort logs during recovery"), 
TSERV_SORT_BUFFER_SIZE("tserver.sort.buffer.size", "200M", PropertyType.MEMORY, "The amount of memory to use when sorting logs during recovery."), TSERV_ARCHIVE_WALOGS("tserver.archive.walogs", "false", PropertyType.BOOLEAN, "Keep copies of the WALOGs for debugging purposes"), - + TSERV_WORKQ_THREADS("tserver.workq.threads", "2", PropertyType.COUNT, + "The number of threads for the distributed workq. These threads are used for copying failed bulk files."), + // properties that are specific to logger server behavior LOGGER_PREFIX("logger.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the write-ahead logger servers"), LOGGER_DIR("logger.dir.walog", "walogs", PropertyType.PATH, Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowTest.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowTest.java?rev=1356949&r1=1356948&r2=1356949&view=diff ============================================================================== --- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowTest.java (original) +++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/iterators/FirstEntryInRowTest.java Tue Jul 3 21:13:15 2012 @@ -124,3 +124,4 @@ public class FirstEntryInRowTest { assertFalse(fei.hasTop()); } } + Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java?rev=1356949&r1=1356948&r2=1356949&view=diff ============================================================================== --- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java (original) +++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/IZooReaderWriter.java Tue Jul 3 21:13:15 2012 @@ -48,6 +48,8 @@ public 
interface IZooReaderWriter extend public abstract String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException; + public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException; + public abstract void recursiveCopyPersistent(String source, String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException; public abstract void delete(String path, int version) throws InterruptedException, KeeperException; Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java?rev=1356949&r1=1356948&r2=1356949&view=diff ============================================================================== --- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java (original) +++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooReaderWriter.java Tue Jul 3 21:13:15 2012 @@ -93,6 +93,11 @@ public class ZooReaderWriter extends Zoo } @Override + public String putEphemeralData(String zPath, byte[] data) throws KeeperException, InterruptedException { + return ZooUtil.putEphemeralData(getZooKeeper(), zPath, data); + } + + @Override public String putEphemeralSequential(String zPath, byte[] data) throws KeeperException, InterruptedException { return ZooUtil.putEphemeralSequential(getZooKeeper(), zPath, data); } Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java?rev=1356949&r1=1356948&r2=1356949&view=diff ============================================================================== --- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java (original) +++ 
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java Tue Jul 3 21:13:15 2012 @@ -211,6 +211,10 @@ public class ZooUtil { return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.PERSISTENT_SEQUENTIAL); } + public static String putEphemeralData(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException { + return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL); + } + public static String putEphemeralSequential(ZooKeeper zk, String zPath, byte[] data) throws KeeperException, InterruptedException { return zk.create(zPath, data, ZooUtil.PUBLIC, CreateMode.EPHEMERAL_SEQUENTIAL); } Propchange: accumulo/trunk/server/ ------------------------------------------------------------------------------ Merged /accumulo/branches/1.3/src/server:r1354669 Merged /accumulo/branches/1.4/server:r1343943-1349971,1349973-1351424,1351426-1354668,1354673-1356923 Merged /accumulo/trunk/src/server:r1341000,1342373,1351691,1356400 Merged /accumulo/branches/1.4/src/server:r1339309-1342420,1343897-1343898,1343943-1349971,1349973-1351424,1351426-1354669,1354673-1356900 Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java?rev=1356949&r1=1356948&r2=1356949&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/client/BulkImporter.java Tue Jul 3 21:13:15 2012 @@ -64,8 +64,6 @@ import org.apache.accumulo.core.util.Uti import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Writer; import org.apache.hadoop.io.Text; import 
org.apache.log4j.Logger; import org.apache.thrift.TServiceClient; @@ -299,48 +297,16 @@ public class BulkImporter { if (completeFailures.size() == 0) return Collections.emptySet(); - log.error("The following map files failed completely, saving this info to : " + new Path(failureDir, "failures.seq")); + log.debug("The following map files failed "); for (Entry<Path,List<KeyExtent>> entry : es) { List<KeyExtent> extents = entry.getValue(); for (KeyExtent keyExtent : extents) - log.error("\t" + entry.getKey() + " -> " + keyExtent); + log.debug("\t" + entry.getKey() + " -> " + keyExtent); } - - try { - - Writer outSeq = SequenceFile.createWriter(fs, conf, new Path(failureDir, "failures.seq"), Text.class, KeyExtent.class); - - for (Entry<Path,List<KeyExtent>> entry : es) { - List<KeyExtent> extents = entry.getValue(); - - for (KeyExtent keyExtent : extents) - outSeq.append(new Text(entry.getKey().toString()), keyExtent); - } - - outSeq.close(); - } catch (IOException ioe) { - log.error("Failed to create " + new Path(failureDir, "failures.seq") + " : " + ioe.getMessage()); - } - - // we should make copying multi-threaded - Set<Path> failedCopies = new HashSet<Path>(); - - for (Entry<Path,List<KeyExtent>> entry : es) { - Path dest = new Path(failureDir, entry.getKey().getName()); - - log.debug("Copying " + entry.getKey() + " to " + dest); - - try { - org.apache.hadoop.fs.FileUtil.copy(fs, entry.getKey(), fs, dest, false, conf); - } catch (IOException ioe) { - log.error("Failed to copy " + entry.getKey() + " : " + ioe.getMessage()); - failedCopies.add(entry.getKey()); - } - } - - return failedCopies; + + return Collections.emptySet(); } private class AssignmentInfo { Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java?rev=1356949&r1=1356948&r2=1356949&view=diff 
============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/tableOps/BulkImport.java Tue Jul 3 21:13:15 2012 @@ -16,13 +16,19 @@ */ package org.apache.accumulo.server.master.tableOps; +import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -33,6 +39,8 @@ import org.apache.accumulo.cloudtrace.in import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.impl.ServerClient; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.thrift.ClientService; @@ -42,10 +50,12 @@ import org.apache.accumulo.core.client.i import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.security.thrift.AuthInfo; -import org.apache.accumulo.core.util.CachedConfiguration; import 
org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.util.UtilWaitThread; @@ -58,14 +68,16 @@ import org.apache.accumulo.server.master import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.security.SecurityConstants; import org.apache.accumulo.server.tabletserver.UniqueNameAllocator; -import org.apache.accumulo.server.trace.TraceFileSystem; import org.apache.accumulo.server.util.MetadataTable; +import org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; import org.apache.thrift.TException; @@ -133,9 +145,8 @@ public class BulkImport extends MasterRe Utils.getReadLock(tableId, tid).lock(); // check that the error directory exists and is empty - FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(), - ServerConfiguration.getSiteConfiguration())); - ; + FileSystem fs = master.getFileSystem(); + Path errorPath = new Path(errorDir); FileStatus errorStatus = fs.getFileStatus(errorPath); if (errorStatus == null) @@ -273,24 +284,6 @@ class CleanUpBulkImport extends MasterRe } @Override - public long isReady(long tid, Master master) throws Exception { - Set<TServerInstance> finished = new HashSet<TServerInstance>(); - Set<TServerInstance> running = master.onlineTabletServers(); - for (TServerInstance server : running) { - try { - TServerConnection client = master.getConnection(server); - if (client != null && !client.isActive(tid)) - finished.add(server); - } 
catch (TException ex) { - log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex); - } - } - if (finished.containsAll(running)) - return 0; - return 1000; - } - - @Override public Repo<Master> call(long tid, Master master) throws Exception { log.debug("removing the bulk processing flag file in " + bulk); Path bulkDir = new Path(bulk); @@ -327,8 +320,124 @@ class CompleteBulkImport extends MasterR @Override public Repo<Master> call(long tid, Master master) throws Exception { ZooArbitrator.stop(Constants.BULK_ARBITRATOR_TYPE, tid); + return new CopyFailed(tableId, source, bulk, error); + } +} + +class CopyFailed extends MasterRepo { + + private static final long serialVersionUID = 1L; + + private String tableId; + private String source; + private String bulk; + private String error; + + public CopyFailed(String tableId, String source, String bulk, String error) { + this.tableId = tableId; + this.source = source; + this.bulk = bulk; + this.error = error; + } + + @Override + public long isReady(long tid, Master master) throws Exception { + Set<TServerInstance> finished = new HashSet<TServerInstance>(); + Set<TServerInstance> running = master.onlineTabletServers(); + for (TServerInstance server : running) { + try { + TServerConnection client = master.getConnection(server); + if (client != null && !client.isActive(tid)) + finished.add(server); + } catch (TException ex) { + log.info("Ignoring error trying to check on tid " + tid + " from server " + server + ": " + ex); + } + } + if (finished.containsAll(running)) + return 0; + return 500; + } + + @Override + public Repo<Master> call(long tid, Master environment) throws Exception { + //This needs to execute after the arbiter is stopped + + FileSystem fs = environment.getFileSystem(); + + if (!fs.exists(new Path(error, "failures.txt"))) + return new CleanUpBulkImport(tableId, source, bulk, error); + + HashMap<String,String> failures = new HashMap<String,String>(); + 
HashMap<String,String> loadedFailures = new HashMap<String,String>(); + + FSDataInputStream failFile = fs.open(new Path(error, "failures.txt")); + BufferedReader in = new BufferedReader(new InputStreamReader(failFile)); + try { + String line = null; + while ((line = in.readLine()) != null) { + Path path = new Path(line); + if (!fs.exists(new Path(error, path.getName()))) + failures.put("/" + path.getParent().getName() + "/" + path.getName(), line); + } + } finally { + failFile.close(); + } + + /* + * I thought I could move files that have no file references in the table. However its possible a clone references a file. Therefore only move files that + * have no loaded markers. + */ + + // determine which failed files were loaded + AuthInfo creds = SecurityConstants.getSystemCredentials(); + Connector conn = HdfsZooInstance.getInstance().getConnector(creds.user, creds.password); + Scanner mscanner = new IsolatedScanner(conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS)); + mscanner.setRange(new KeyExtent(new Text(tableId), null, null).toMetadataRange()); + mscanner.fetchColumnFamily(Constants.METADATA_BULKFILE_COLUMN_FAMILY); + + for (Entry<Key,Value> entry : mscanner) { + if (Long.parseLong(entry.getValue().toString()) == tid) { + String loadedFile = entry.getKey().getColumnQualifierData().toString(); + String absPath = failures.remove(loadedFile); + if (absPath != null) { + loadedFailures.put(loadedFile, absPath); + } + } + } + + // move failed files that were not loaded + for (String failure : failures.values()) { + Path orig = new Path(failure); + Path dest = new Path(error, orig.getName()); + fs.rename(orig, dest); + log.debug("tid " + tid + " renamed " + orig + " to " + dest + ": failed"); + } + + if (loadedFailures.size() > 0) { + DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + + Constants.ZBULK_FAILED_COPYQ); + + HashSet<String> workIds = new 
HashSet<String>(); + + for (String failure : loadedFailures.values()) { + Path orig = new Path(failure); + Path dest = new Path(error, orig.getName()); + + if (fs.exists(dest)) + continue; + + bifCopyQueue.addWork(orig.getName(), (failure + "," + dest).getBytes()); + workIds.add(orig.getName()); + log.debug("tid " + tid + " added to copyq: " + orig + " to " + dest + ": failed"); + } + + bifCopyQueue.waitUntilDone(workIds); + } + + fs.delete(new Path(error, "failures.txt"), true); return new CleanUpBulkImport(tableId, source, bulk, error); } + } class LoadFiles extends MasterRepo { @@ -375,8 +484,7 @@ class LoadFiles extends MasterRepo { public Repo<Master> call(final long tid, final Master master) throws Exception { initializeThreadPool(master); final SiteConfiguration conf = ServerConfiguration.getSiteConfiguration(); - FileSystem fs = TraceFileSystem.wrap(org.apache.accumulo.core.file.FileUtil.getFileSystem(CachedConfiguration.getInstance(), - ServerConfiguration.getSiteConfiguration())); + FileSystem fs = master.getFileSystem(); List<FileStatus> files = new ArrayList<FileStatus>(); for (FileStatus entry : fs.listStatus(new Path(bulk))) { files.add(entry); @@ -448,23 +556,18 @@ class LoadFiles extends MasterRepo { UtilWaitThread.sleep(100); } } - // Copy/Create failed file markers - for (String f : filesToLoad) { - Path orig = new Path(f); - Path dest = new Path(errorDir, orig.getName()); - try { - FileUtil.copy(fs, orig, fs, dest, false, true, CachedConfiguration.getInstance()); - log.debug("tid " + tid + " copied " + orig + " to " + dest + ": failed"); - } catch (IOException ex) { - try { - fs.create(dest).close(); - log.debug("tid " + tid + " marked " + dest + " failed"); - } catch (IOException e) { - log.error("Unable to create failure flag file " + dest, e); - } + + FSDataOutputStream failFile = fs.create(new Path(errorDir, "failures.txt"), true); + BufferedWriter out = new BufferedWriter(new OutputStreamWriter(failFile)); + try { + for (String f : 
filesToLoad) { + out.write(f); + out.write("\n"); } + } finally { + out.close(); } - + // return the next step, which will perform cleanup return new CompleteBulkImport(tableId, source, bulk, errorDir); } Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1356949&r1=1356948&r2=1356949&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Tue Jul 3 21:13:15 2012 @@ -51,8 +51,10 @@ import java.util.concurrent.ArrayBlockin import java.util.concurrent.BlockingDeque; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -124,6 +126,7 @@ import org.apache.accumulo.core.util.Cac import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.LoggingRunnable; +import org.apache.accumulo.core.util.NamingThreadFactory; import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.Stat; @@ -186,6 +189,7 @@ import org.apache.accumulo.server.util.T import org.apache.accumulo.server.util.TServerUtils.ServerPort; import org.apache.accumulo.server.util.time.RelativeTime; import org.apache.accumulo.server.util.time.SimpleTimer; +import 
org.apache.accumulo.server.zookeeper.DistributedWorkQueue; import org.apache.accumulo.server.zookeeper.TransactionWatcher; import org.apache.accumulo.server.zookeeper.ZooCache; import org.apache.accumulo.server.zookeeper.ZooLock; @@ -2569,6 +2573,8 @@ public class TabletServer extends Abstra private TServer server; + private DistributedWorkQueue bulkFailedCopyQ; + private static final String METRICS_PREFIX = "tserver"; private static ObjectName OBJECT_NAME = null; @@ -2715,6 +2721,16 @@ public class TabletServer extends Abstra throw new RuntimeException(ex); } + ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), + new NamingThreadFactory("distributed work queue")); + + bulkFailedCopyQ = new DistributedWorkQueue(Constants.ZROOT + "/" + HdfsZooInstance.getInstance().getInstanceID() + Constants.ZBULK_FAILED_COPYQ); + try { + bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool); + } catch (Exception e1) { + throw new RuntimeException("Failed to start distributed work queue for copying ", e1); + } + try { OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName()); // Do this because interface not in same package. 
Copied: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java (from r1356900, accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java) URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java?p2=accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java&p1=accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java&r1=1356900&r2=1356949&rev=1356949&view=diff ============================================================================== --- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java (original) +++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java Tue Jul 3 21:13:15 2012 @@ -24,8 +24,8 @@ import java.util.TimerTask; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.accumulo.core.zookeeper.ZooUtil.NodeExistsPolicy; -import org.apache.accumulo.core.zookeeper.ZooUtil.NodeMissingPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; +import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.server.util.time.SimpleTimer; import org.apache.log4j.Logger; import org.apache.zookeeper.KeeperException; Propchange: accumulo/trunk/src/ ------------------------------------------------------------------------------ Merged /accumulo/trunk:r1341000,1344302,1344358,1356400 Merged /accumulo/branches/1.4/src:r1343943-1349971,1349973-1351424,1351426-1354668,1354673-1356923 Merged /accumulo/branches/1.3/src:r1354669 Merged /accumulo/branches/1.4/src/src:r1339309-1342420,1343897-1343898,1343943-1349971,1349973-1351424,1351426-1354669,1354673-1356900 Modified: 
accumulo/trunk/test/system/test4/bulk_import_test.sh URL: http://svn.apache.org/viewvc/accumulo/trunk/test/system/test4/bulk_import_test.sh?rev=1356949&r1=1356948&r2=1356949&view=diff ============================================================================== --- accumulo/trunk/test/system/test4/bulk_import_test.sh (original) +++ accumulo/trunk/test/system/test4/bulk_import_test.sh Tue Jul 3 21:13:15 2012 @@ -19,11 +19,11 @@ hadoop dfs -rmr /testmf echo "creating first set of map files" -../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf01 -timestamp 1 -size 50 -random 56 1000000 0 1 & -../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf02 -timestamp 1 -size 50 -random 56 1000000 1000000 1 & -../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf03 -timestamp 1 -size 50 -random 56 1000000 2000000 1 & -../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf04 -timestamp 1 -size 50 -random 56 1000000 3000000 1 & -../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf05 -timestamp 1 -size 50 -random 56 1000000 4000000 1 & +../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf01 -timestamp 1 -size 50 -random 56 1000000 0 1 & +../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf02 -timestamp 1 -size 50 -random 56 1000000 1000000 1 & +../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf03 -timestamp 1 -size 50 -random 56 1000000 2000000 1 & +../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf04 -timestamp 1 -size 50 -random 56 1000000 3000000 1 & +../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf05 -timestamp 1 -size 50 -random 56 1000000 4000000 1 & wait @@ -46,11 +46,11 @@ hadoop dfs -rmr /testmf echo "creating second set of map files" -../../../bin/accumulo 
org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf01 -timestamp 2 -size 50 -random 57 1000000 0 1 & -../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf02 -timestamp 2 -size 50 -random 57 1000000 1000000 1 & -../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf03 -timestamp 2 -size 50 -random 57 1000000 2000000 1 & -../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf04 -timestamp 2 -size 50 -random 57 1000000 3000000 1 & -../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -mapFile /testmf/mf05 -timestamp 2 -size 50 -random 57 1000000 4000000 1 & +../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf01 -timestamp 2 -size 50 -random 57 1000000 0 1 & +../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf02 -timestamp 2 -size 50 -random 57 1000000 1000000 1 & +../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf03 -timestamp 2 -size 50 -random 57 1000000 2000000 1 & +../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf04 -timestamp 2 -size 50 -random 57 1000000 3000000 1 & +../../../bin/accumulo org.apache.accumulo.server.test.TestIngest -rfile /testmf/mf05 -timestamp 2 -size 50 -random 57 1000000 4000000 1 & wait