http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index afd3454..aeb73b4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -19,7 +19,6 @@ package org.apache.accumulo.tserver; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD; -import java.io.FileNotFoundException; import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.UnknownHostException; @@ -30,6 +29,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -45,6 +45,7 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.BlockingDeque; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; @@ -147,8 +148,8 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.Accumulo; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.GarbageCollectionLogger; -import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.ServerOpts; +import org.apache.accumulo.server.TabletLevel; import org.apache.accumulo.server.client.ClientServiceHandler; import 
org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfigurationFactory; @@ -1440,6 +1441,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } } + @Override public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) { @@ -1500,6 +1502,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { final AssignmentHandler ah = new AssignmentHandler(extent); // final Runnable ah = new LoggingRunnable(log, ); // Root tablet assignment must take place immediately + if (extent.isRootTablet()) { new Daemon("Root Tablet Assignment") { @Override @@ -1692,66 +1695,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } @Override - public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { - String myname = getClientAddressString(); - myname = myname.replace(':', '+'); - Set<String> loggers = new HashSet<String>(); - logger.getLogFiles(loggers); - Set<String> loggerUUIDs = new HashSet<String>(); - for (String logger : loggers) - loggerUUIDs.add(new Path(logger).getName()); - - nextFile: for (String filename : filenames) { - String uuid = new Path(filename).getName(); - // skip any log we're currently using - if (loggerUUIDs.contains(uuid)) - continue nextFile; - - List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>(); - synchronized (onlineTablets) { - onlineTabletsCopy.addAll(onlineTablets.values()); - } - for (Tablet tablet : onlineTabletsCopy) { - for (String current : tablet.getCurrentLogFiles()) { - if (current.contains(uuid)) { - log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent()); - continue nextFile; - } - } - } - - try { - Path source = new Path(filename); - if (TabletServer.this.getConfiguration().getBoolean(Property.TSERV_ARCHIVE_WALOGS)) { - Path walogArchive = fs.matchingFileSystem(source, 
ServerConstants.getWalogArchives()); - fs.mkdirs(walogArchive); - Path dest = new Path(walogArchive, source.getName()); - log.info("Archiving walog " + source + " to " + dest); - if (!fs.rename(source, dest)) - log.error("rename is unsuccessful"); - } else { - log.info("Deleting walog " + filename); - Path sourcePath = new Path(filename); - if (!(!TabletServer.this.getConfiguration().getBoolean(Property.GC_TRASH_IGNORE) && fs.moveToTrash(sourcePath)) - && !fs.deleteRecursively(sourcePath)) - log.warn("Failed to delete walog " + source); - for (String recovery : ServerConstants.getRecoveryDirs()) { - Path recoveryPath = new Path(recovery, source.getName()); - try { - if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath)) - log.info("Deleted any recovery log " + filename); - } catch (FileNotFoundException ex) { - // ignore - } - } - } - } catch (IOException e) { - log.warn("Error attempting to delete write-ahead log " + filename + ": " + e); - } - } - } - - @Override public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { try { checkPermission(credentials, null, "getActiveCompactions"); @@ -1772,14 +1715,20 @@ public class TabletServer extends AccumuloServerContext implements Runnable { @Override public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException { - Set<String> logs = new HashSet<String>(); - logger.getLogFiles(logs); - return new ArrayList<String>(logs); + String log = logger.getLogFile(); + return Collections.singletonList(log); + } + + @Override + public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { + log.warn("Garbage collector is attempting to remove logs through the tablet server"); + log.warn("This is probably because your file Garbage Collector is an older version than your tablet servers.\n" + + "Restart your file Garbage Collector."); } } private class SplitRunner 
implements Runnable { - private Tablet tablet; + private final Tablet tablet; public SplitRunner(Tablet tablet) { this.tablet = tablet; @@ -2033,7 +1982,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { log.error("Unexpected error ", e); } log.debug("Unassigning " + tls); - TabletStateStore.unassign(TabletServer.this, tls); + TabletStateStore.unassign(TabletServer.this, tls, null); } catch (DistributedStoreException ex) { log.warn("Unable to update storage", ex); } catch (KeeperException e) { @@ -2243,29 +2192,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } } - public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) { - if (!this.onlineTablets.containsKey(extent)) { - log.info("Not adding " + logs.size() + " logs for extent " + extent + " as alias " + id + " tablet is offline"); - // minor compaction due to recovery... don't make updates... if it finishes, there will be no WALs, - // if it doesn't, we'll need to do the same recovery with the old files. - return; - } - - log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id); - long now = RelativeTime.currentTimeMillis(); - List<String> logSet = new ArrayList<String>(); - for (DfsLogger log : logs) - logSet.add(log.getFileName()); - LogEntry entry = new LogEntry(); - entry.extent = extent; - entry.tabletId = id; - entry.timestamp = now; - entry.server = logs.get(0).getLogger(); - entry.filename = logs.get(0).getFileName(); - entry.logSet = logSet; - MetadataTableUtil.addLogEntry(this, entry, getLock()); - } - private HostAndPort startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName) throws UnknownHostException { Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? 
Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); @@ -2984,6 +2910,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException { totalMinorCompactions.incrementAndGet(); logger.minorCompactionFinished(tablet, newDatafile, walogSeq); + markUnusedWALs(); } public void minorCompactionStarted(CommitSession tablet, int lastUpdateSequence, String newMapfileLocation) throws IOException { @@ -3002,14 +2929,11 @@ public class TabletServer extends AccumuloServerContext implements Runnable { }); for (LogEntry entry : sorted) { Path recovery = null; - for (String log : entry.logSet) { - Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log)); - finished = SortedLogState.getFinishedMarkerPath(finished); - TabletServer.log.info("Looking for " + finished); - if (fs.exists(finished)) { - recovery = finished.getParent(); - break; - } + Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, entry.filename)); + finished = SortedLogState.getFinishedMarkerPath(finished); + TabletServer.log.info("Looking for " + finished); + if (fs.exists(finished)) { + recovery = finished.getParent(); } if (recovery == null) throw new IOException("Unable to find recovery files for extent " + extent + " logEntry: " + entry); @@ -3046,7 +2970,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } public Collection<Tablet> getOnlineTablets() { - return Collections.unmodifiableCollection(onlineTablets.values()); + synchronized (onlineTablets) { + return new ArrayList<Tablet>(onlineTablets.values()); + } } public VolumeManager getFileSystem() { @@ -3072,4 +2998,62 @@ public class TabletServer extends AccumuloServerContext implements Runnable { public SecurityOperation getSecurityOperation() { return security; } + + // avoid unnecessary redundant markings to meta + final 
ConcurrentHashMap<DfsLogger, EnumSet<TabletLevel>> metadataTableLogs = new ConcurrentHashMap<>(); + final Object levelLocks[] = new Object[TabletLevel.values().length]; + { + for (int i = 0; i < levelLocks.length; i++) { + levelLocks[i] = new Object(); + } + } + + + // remove any meta entries after a rolled log is no longer referenced + Set<DfsLogger> closedLogs = new HashSet<>(); + + private void markUnusedWALs() { + Set<DfsLogger> candidates; + synchronized (closedLogs) { + candidates = new HashSet<>(closedLogs); + } + for (Tablet tablet : getOnlineTablets()) { + candidates.removeAll(tablet.getCurrentLogFiles()); + } + try { + Set<Path> filenames = new HashSet<>(); + for (DfsLogger candidate : candidates) { + filenames.add(candidate.getPath()); + } + MetadataTableUtil.markLogUnused(this, this.getLock(), this.getTabletSession(), filenames); + synchronized (closedLogs) { + closedLogs.removeAll(candidates); + } + } catch (AccumuloException ex) { + log.info(ex.toString(), ex); + } + } + + public void addLoggersToMetadata(DfsLogger copy, TabletLevel level) { + // serialize the updates to the metadata per level: avoids updating the level more than once + // updating one level, may cause updates to other levels, so we need to release the lock on metadataTableLogs + synchronized (levelLocks[level.ordinal()]) { + EnumSet<TabletLevel> set = null; + set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level)); + if (set == null || !set.contains(level) || level == TabletLevel.ROOT) { + log.info("Writing log marker for level " + level + " " + copy.getFileName()); + MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getPath(), level); + } + set = metadataTableLogs.get(copy); + set.add(level); + } + } + + public void walogClosed(DfsLogger currentLog) { + metadataTableLogs.remove(currentLog); + synchronized (closedLogs) { + closedLogs.add(currentLog); + } + } + }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 8512690..cd7ce08 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -72,7 +72,7 @@ import com.google.common.base.Optional; * Wrap a connection to a logger. * */ -public class DfsLogger { +public class DfsLogger implements Comparable<DfsLogger> { public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---"; public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---"; @@ -371,6 +371,7 @@ public class DfsLogger { public synchronized void open(String address) throws IOException { String filename = UUID.randomUUID().toString(); + log.debug("Address is " + address); String logger = Joiner.on("+").join(address.split(":")); log.debug("DfsLogger.open() begin"); @@ -463,7 +464,11 @@ public class DfsLogger { } public String getFileName() { - return logPath.toString(); + return logPath; + } + + public Path getPath() { + return new Path(logPath); } public void close() throws IOException { @@ -609,4 +614,9 @@ public class DfsLogger { return Joiner.on(":").join(parts[parts.length - 2].split("[+]")); } + @Override + public int compareTo(DfsLogger o) { + return getFileName().compareTo(o.getFileName()); + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java 
b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java index 37882cd..ab3dea2 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java @@ -180,7 +180,7 @@ public class SortedLogRecovery { // find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id // for the maximum tablet id, find the minimum sequence #... may be ok to find the max seq, but just want to make the code behave like it used to while (reader.next(key, value)) { - // LogReader.printEntry(entry); + // log.debug("Event " + key.event + " tablet " + key.tablet); if (key.event != DEFINE_TABLET) break; if (key.tablet.equals(extent) || key.tablet.equals(alternative)) { @@ -209,7 +209,7 @@ public class SortedLogRecovery { if (lastStartToFinish.compactionStatus == Status.INITIAL) lastStartToFinish.compactionStatus = Status.COMPLETE; if (key.seq <= lastStartToFinish.lastStart) - throw new RuntimeException("Sequence numbers are not increasing for start/stop events."); + throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart); lastStartToFinish.update(fileno, key.seq); // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table. 
@@ -218,7 +218,7 @@ public class SortedLogRecovery { lastStartToFinish.update(-1); } else if (key.event == COMPACTION_FINISH) { if (key.seq <= lastStartToFinish.lastStart) - throw new RuntimeException("Sequence numbers are not increasing for start/stop events."); + throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart); if (lastStartToFinish.compactionStatus == Status.INITIAL) lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH; else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart) @@ -249,8 +249,6 @@ public class SortedLogRecovery { break; if (key.tid != tid) break; - // log.info("Replaying " + key); - // log.info(value); if (key.event == MUTATION) { mr.receive(value.mutations.get(0)); } else if (key.event == MANY_MUTATIONS) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 1d385d9..bc77ffb 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -21,14 +21,16 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import 
java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -37,7 +39,9 @@ import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.impl.KeyExtent; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationConfigurationUtil; +import org.apache.accumulo.core.util.SimpleThreadPool; import org.apache.accumulo.core.util.UtilWaitThread; +import org.apache.accumulo.server.TabletLevel; import org.apache.accumulo.server.conf.TableConfiguration; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.replication.StatusUtil; @@ -72,20 +76,22 @@ public class TabletServerLogger { private final TabletServer tserver; - // The current log set: always updated to a new set with every change of loggers - private final List<DfsLogger> loggers = new ArrayList<DfsLogger>(); + // The current logger + private DfsLogger currentLog = null; + private final SynchronousQueue<Object> nextLog = new SynchronousQueue<>(); + private ThreadPoolExecutor nextLogMaker; - // The current generation of logSet. - // Because multiple threads can be using a log set at one time, a log + // The current generation of logs. + // Because multiple threads can be using a log at one time, a log // failure is likely to affect multiple threads, who will all attempt to - // create a new logSet. This will cause many unnecessary updates to the + // create a new log. This will cause many unnecessary updates to the // metadata table. // We'll use this generational counter to determine if another thread has - // already fetched a new logSet. - private AtomicInteger logSetId = new AtomicInteger(); + // already fetched a new log. 
+ private final AtomicInteger logId = new AtomicInteger(); // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them - private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock logIdLock = new ReentrantReadWriteLock(); private final AtomicInteger seqGen = new AtomicInteger(); @@ -146,62 +152,66 @@ public class TabletServerLogger { this.flushCounter = flushCounter; } - private int initializeLoggers(final List<DfsLogger> copy) throws IOException { - final int[] result = {-1}; - testLockAndRun(logSetLock, new TestCallWithWriteLock() { + private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException { + final AtomicReference<DfsLogger> result = new AtomicReference<DfsLogger>(); + testLockAndRun(logIdLock, new TestCallWithWriteLock() { @Override boolean test() { - copy.clear(); - copy.addAll(loggers); - if (!loggers.isEmpty()) - result[0] = logSetId.get(); - return loggers.isEmpty(); + result.set(currentLog); + if (currentLog != null) + logIdOut.set(logId.get()); + return currentLog == null; } @Override void withWriteLock() throws IOException { try { - createLoggers(); - copy.clear(); - copy.addAll(loggers); - if (copy.size() > 0) - result[0] = logSetId.get(); + createLogger(); + result.set(currentLog); + if (currentLog != null) + logIdOut.set(logId.get()); else - result[0] = -1; + logIdOut.set(-1); } catch (IOException e) { log.error("Unable to create loggers", e); } } }); - return result[0]; + return result.get(); } - public void getLogFiles(Set<String> loggersOut) { - logSetLock.readLock().lock(); + public String getLogFile() { + logIdLock.readLock().lock(); try { - for (DfsLogger logger : loggers) { - loggersOut.add(logger.getFileName()); - } + return currentLog.getFileName(); } finally { - logSetLock.readLock().unlock(); + logIdLock.readLock().unlock(); } } - synchronized private void createLoggers() throws IOException { - if 
(!logSetLock.isWriteLockedByCurrentThread()) { + synchronized private void createLogger() throws IOException { + if (!logIdLock.isWriteLockedByCurrentThread()) { throw new IllegalStateException("createLoggers should be called with write lock held!"); } - if (loggers.size() != 0) { - throw new IllegalStateException("createLoggers should not be called when loggers.size() is " + loggers.size()); + if (currentLog != null) { + throw new IllegalStateException("createLoggers should not be called when current log is set"); } try { - DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter); - alog.open(tserver.getClientAddressString()); - loggers.add(alog); - logSetId.incrementAndGet(); - return; + startLogMaker(); + Object next = nextLog.take(); + if (next instanceof Exception) { + throw (Exception)next; + } + if (next instanceof DfsLogger) { + currentLog = (DfsLogger)next; + logId.incrementAndGet(); + log.info("Using next log " + currentLog.getFileName()); + return; + } else { + throw new RuntimeException("Error: unexpected type seen: " + next); + } } catch (Exception t) { walErrors.put(System.currentTimeMillis(), ""); if (walErrors.size() >= HALT_AFTER_ERROR_COUNT) { @@ -211,22 +221,63 @@ public class TabletServerLogger { } } + private synchronized void startLogMaker() { + if (nextLogMaker != null) { + return; + } + nextLogMaker = new SimpleThreadPool(1, "WALog creator"); + nextLogMaker.submit(new Runnable() { + @Override + public void run() { + while (!nextLogMaker.isShutdown()) { + try { + log.debug("Creating next WAL"); + DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter); + alog.open(tserver.getClientAddressString()); + log.debug("Created next WAL " + alog.getFileName()); + while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) { + log.info("Our WAL was not used for 12 hours: " + alog.getFileName()); + } + } catch (Exception t) { + log.error("{}", t.getMessage(), t); + try { + nextLog.offer(t, 12, 
TimeUnit.HOURS); + } catch (InterruptedException ex) { + // ignore + } + } + } + } + }); + } + + public void resetLoggers() throws IOException { + logIdLock.writeLock().lock(); + try { + close(); + } finally { + logIdLock.writeLock().unlock(); + } + } + synchronized private void close() throws IOException { - if (!logSetLock.isWriteLockedByCurrentThread()) { + if (!logIdLock.isWriteLockedByCurrentThread()) { throw new IllegalStateException("close should be called with write lock held!"); } try { - for (DfsLogger logger : loggers) { + if (null != currentLog) { try { - logger.close(); + currentLog.close(); } catch (DfsLogger.LogClosedException ex) { // ignore } catch (Throwable ex) { - log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex, ex); + log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex); + } finally { + this.tserver.walogClosed(currentLog); } + currentLog = null; + logSizeEstimate.set(0); } - loggers.clear(); - logSizeEstimate.set(0); } catch (Throwable t) { throw new IOException(t); } @@ -243,7 +294,7 @@ public class TabletServerLogger { private int write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException { // Work very hard not to lock this during calls to the outside world - int currentLogSet = logSetId.get(); + int currentLogId = logId.get(); int seq = -1; int attempt = 1; @@ -251,20 +302,22 @@ public class TabletServerLogger { while (!success) { try { // get a reference to the loggers that no other thread can touch - ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>(); - currentLogSet = initializeLoggers(copy); + DfsLogger copy = null; + AtomicInteger currentId = new AtomicInteger(-1); + copy = initializeLoggers(currentId); + currentLogId = currentId.get(); // add the logger to the log set for the memory in the tablet, // update the metadata table if we've never used this tablet - if (currentLogSet == logSetId.get()) { + if (currentLogId == 
logId.get()) { for (CommitSession commitSession : sessions) { if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) { try { // Scribble out a tablet definition and then write to the metadata table defineTablet(commitSession); - if (currentLogSet == logSetId.get()) - tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId()); + if (currentLogId == logId.get()) + tserver.addLoggersToMetadata(copy, TabletLevel.getLevel(commitSession.getExtent())); } finally { commitSession.finishUpdatingLogsUsed(); } @@ -272,39 +325,29 @@ public class TabletServerLogger { // Need to release KeyExtent extent = commitSession.getExtent(); if (ReplicationConfigurationUtil.isEnabled(extent, tserver.getTableConfiguration(extent))) { - Set<String> logs = new HashSet<String>(); - for (DfsLogger logger : copy) { - logs.add(logger.getFileName()); - } - Status status = StatusUtil.fileCreated(System.currentTimeMillis()); - log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + logs); + Status status = StatusUtil.openWithUnknownLength(System.currentTimeMillis()); + log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + copy.getFileName()); // Got some new WALs, note this in the metadata table - ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), logs, status); + ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), copy.getFileName(), status); } } } } // Make sure that the logs haven't changed out from underneath our copy - if (currentLogSet == logSetId.get()) { + if (currentLogId == logId.get()) { // write the mutation to the logs seq = seqGen.incrementAndGet(); if (seq < 0) throw new RuntimeException("Logger sequence generator wrapped! 
Onos!!!11!eleven"); - ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size()); - for (DfsLogger wal : copy) { - LoggerOperation lop = writer.write(wal, seq); - if (lop != null) - queuedOperations.add(lop); - } - - for (LoggerOperation lop : queuedOperations) { + LoggerOperation lop = writer.write(copy, seq); + if (lop != null) { lop.await(); } // double-check: did the log set change? - success = (currentLogSet == logSetId.get()); + success = (currentLogId == logId.get()); } } catch (DfsLogger.LogClosedException ex) { log.debug("Logs closed while writing, retrying " + attempt); @@ -319,13 +362,13 @@ public class TabletServerLogger { // Some sort of write failure occurred. Grab the write lock and reset the logs. // But since multiple threads will attempt it, only attempt the reset when // the logs haven't changed. - final int finalCurrent = currentLogSet; + final int finalCurrent = currentLogId; if (!success) { - testLockAndRun(logSetLock, new TestCallWithWriteLock() { + testLockAndRun(logIdLock, new TestCallWithWriteLock() { @Override boolean test() { - return finalCurrent == logSetId.get(); + return finalCurrent == logId.get(); } @Override @@ -338,7 +381,7 @@ public class TabletServerLogger { } // if the log gets too big, reset it .. 
grab the write lock first logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead - testLockAndRun(logSetLock, new TestCallWithWriteLock() { + testLockAndRun(logIdLock, new TestCallWithWriteLock() { @Override boolean test() { return logSizeEstimate.get() > maxSize; http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java index d908f1d..dee705c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.tserver.tablet; -import java.util.ArrayList; import java.util.List; import org.apache.accumulo.core.data.Mutation; @@ -86,7 +85,7 @@ public class CommitSession { return committer; } - public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) { + public boolean beginUpdatingLogsUsed(DfsLogger copy, boolean mincFinish) { return committer.beginUpdatingLogsUsed(memTable, copy, mincFinish); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java index db1b418..ab15ccc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java @@ -424,7 +424,9 @@ class DatafileManager 
{ if (log.isDebugEnabled()) { log.debug("Recording that data has been ingested into " + tablet.getExtent() + " using " + logFileOnly); } - ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength()); + for (String logFile : logFileOnly) { + ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFile, StatusUtil.openWithUnknownLength()); + } } } finally { tablet.finishClearingUnusedLogs(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 1f4625b..17864be 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -37,6 +37,7 @@ import java.util.PriorityQueue; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -200,7 +201,7 @@ public class Tablet implements TabletCommitter { } // stores info about user initiated major compaction that is waiting on a minor compaction to finish - private CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo(); + private final CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo(); static enum CompactionState { WAITING_TO_START, IN_PROGRESS @@ -627,8 +628,8 @@ public class Tablet implements TabletCommitter { // the WAL isn't closed (WRT replication Status) and thus we're safe to update its progress. 
Status status = StatusUtil.openWithUnknownLength(); for (LogEntry logEntry : logEntries) { - log.debug("Writing updated status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status)); - ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.logSet, status); + log.debug("Writing updated status to metadata table for " + logEntry.filename + " " + ProtobufUtil.toString(status)); + ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.filename, status); } } @@ -640,11 +641,9 @@ public class Tablet implements TabletCommitter { } } // make some closed references that represent the recovered logs - currentLogs = new HashSet<DfsLogger>(); + currentLogs = new ConcurrentSkipListSet<DfsLogger>(); for (LogEntry logEntry : logEntries) { - for (String log : logEntry.logSet) { - currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log, logEntry.getColumnQualifier().toString())); - } + currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.filename, logEntry.getColumnQualifier().toString())); } log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + getTabletMemory().getNumEntries() @@ -935,7 +934,9 @@ public class Tablet implements TabletCommitter { long count = 0; + String oldName = Thread.currentThread().getName(); try { + Thread.currentThread().setName("Minor compacting " + this.extent); Span span = Trace.start("write"); CompactionStats stats; try { @@ -966,6 +967,7 @@ public class Tablet implements TabletCommitter { failed = true; throw new RuntimeException(e); } finally { + Thread.currentThread().setName(oldName); try { getTabletMemory().finalizeMinC(); } catch (Throwable t) { @@ -990,7 +992,7 @@ public class Tablet implements TabletCommitter { private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) { CommitSession oldCommitSession = getTabletMemory().prepareForMinC(); otherLogs = currentLogs; - currentLogs = 
new HashSet<DfsLogger>(); + currentLogs = new ConcurrentSkipListSet<DfsLogger>(); FileRef mergeFile = null; if (mincReason != MinorCompactionReason.RECOVERY) { @@ -2374,14 +2376,11 @@ public class Tablet implements TabletCommitter { } } - private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>(); + private ConcurrentSkipListSet<DfsLogger> currentLogs = new ConcurrentSkipListSet<DfsLogger>(); - public synchronized Set<String> getCurrentLogFiles() { - Set<String> result = new HashSet<String>(); - for (DfsLogger log : currentLogs) { - result.add(log.getFileName()); - } - return result; + // currentLogs may be updated while a tablet is otherwise locked + public Set<DfsLogger> getCurrentLogFiles() { + return new HashSet<DfsLogger>(currentLogs); } Set<String> beginClearingUnusedLogs() { @@ -2440,13 +2439,13 @@ public class Tablet implements TabletCommitter { // this lock is basically used to synchronize writing of log info to metadata private final ReentrantLock logLock = new ReentrantLock(); - public synchronized int getLogCount() { + public int getLogCount() { return currentLogs.size(); } // don't release the lock if this method returns true for success; instead, the caller should clean up by calling finishUpdatingLogsUsed() @Override - public boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> more, boolean mincFinish) { + public boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger more, boolean mincFinish) { boolean releaseLock = true; @@ -2483,28 +2482,26 @@ public class Tablet implements TabletCommitter { int numAdded = 0; int numContained = 0; - for (DfsLogger logger : more) { - if (addToOther) { - if (otherLogs.add(logger)) - numAdded++; + if (addToOther) { + if (otherLogs.add(more)) + numAdded++; - if (currentLogs.contains(logger)) - numContained++; - } else { - if (currentLogs.add(logger)) - numAdded++; + if (currentLogs.contains(more)) + numContained++; + } else { + if (currentLogs.add(more)) + numAdded++; - if 
(otherLogs.contains(logger)) - numContained++; - } + if (otherLogs.contains(more)) + numContained++; } - if (numAdded > 0 && numAdded != more.size()) { + if (numAdded > 0 && numAdded != 1) { // expect to add all or none throw new IllegalArgumentException("Added subset of logs " + extent + " " + more + " " + currentLogs); } - if (numContained > 0 && numContained != more.size()) { + if (numContained > 0 && numContained != 1) { // expect to contain all or none throw new IllegalArgumentException("Other logs contained subset of logs " + extent + " " + more + " " + otherLogs); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java index c7e3a66..934ce20 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.tserver.tablet; -import java.util.Collection; import java.util.List; import org.apache.accumulo.core.client.Durability; @@ -38,7 +37,7 @@ public interface TabletCommitter { /** * If this method returns true, the caller must call {@link #finishUpdatingLogsUsed()} to clean up */ - boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> copy, boolean mincFinish); + boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger copy, boolean mincFinish); void finishUpdatingLogsUsed(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java ---------------------------------------------------------------------- diff --git 
a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java new file mode 100644 index 0000000..44058d3 --- /dev/null +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEntryTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver.log; + +import static org.junit.Assert.assertEquals; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyExtent; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class LogEntryTest { + + @Test + public void test() throws Exception { + KeyExtent extent = new KeyExtent(new Text("1"), null, new Text("")); + long ts = 12345678L; + String server = "localhost:1234"; + String filename = "default/foo"; + LogEntry entry = new LogEntry(extent, ts, server, filename); + assertEquals(extent, entry.extent); + assertEquals(server, entry.server); + assertEquals(filename, entry.filename); + assertEquals(ts, entry.timestamp); + assertEquals("1<; default/foo", entry.toString()); + assertEquals(new Text("log"), entry.getColumnFamily()); + assertEquals(new Text("localhost:1234/default/foo"), entry.getColumnQualifier()); + LogEntry copy = LogEntry.fromBytes(entry.toBytes()); + assertEquals(entry.toString(), copy.toString()); + Key key = new Key(new Text("1<"), new Text("log"), new Text("localhost:1234/default/foo")); + key.setTimestamp(ts); + LogEntry copy2 = LogEntry.fromKeyValue(key, entry.getValue()); + assertEquals(entry.toString(), copy2.toString()); + assertEquals(entry.timestamp, copy2.timestamp); + assertEquals("foo", entry.getUniqueID()); + assertEquals("localhost:1234/default/foo", entry.getName()); + assertEquals(new Value("default/foo".getBytes()), entry.getValue()); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index 
d0de29f..1186c68 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -202,9 +202,6 @@ public class NullTserver { } @Override - public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {} - - @Override public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { return new ArrayList<ActiveCompaction>(); } @@ -231,6 +228,9 @@ public class NullTserver { public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException { return null; } + + @Override + public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { } } static class Opts extends Help { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java index 404a8fd..81e25cc 100644 --- a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java +++ b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java @@ -60,6 +60,11 @@ import com.google.common.net.HostAndPort; public class ProxyDurabilityIT extends ConfigurableMacIT { @Override + protected int defaultTimeoutSeconds() { + return 60; + } + + @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "10s"); @@ -111,7 +116,7 @@ public class ProxyDurabilityIT extends ConfigurableMacIT { assertEquals(0, count(tableName)); ConditionalWriterOptions cfg = new ConditionalWriterOptions(); - 
cfg.setDurability(Durability.LOG); + cfg.setDurability(Durability.SYNC); String cwriter = client.createConditionalWriter(login, tableName, cfg); ConditionalUpdates updates = new ConditionalUpdates(); updates.addToConditions(new Condition(new Column(bytes("cf"), bytes("cq"), bytes("")))); @@ -120,7 +125,7 @@ public class ProxyDurabilityIT extends ConfigurableMacIT { assertEquals(ConditionalStatus.ACCEPTED, status.get(bytes("row"))); assertEquals(1, count(tableName)); restartTServer(); - assertEquals(0, count(tableName)); + assertEquals(1, count(tableName)); proxyServer.stop(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java index 25337b2..0dcdf42 100644 --- a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java +++ b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java @@ -54,7 +54,7 @@ public class BadDeleteMarkersCreatedIT extends AccumuloClusterIT { @Override public int defaultTimeoutSeconds() { - return 60; + return 120; } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/BalanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java index f793925..8703f18 100644 --- a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java +++ b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java @@ -20,25 +20,33 @@ import java.util.SortedSet; import java.util.TreeSet; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.test.functional.ConfigurableMacIT; 
+import org.apache.accumulo.harness.AccumuloClusterIT; import org.apache.hadoop.io.Text; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class BalanceIT extends ConfigurableMacIT { +public class BalanceIT extends AccumuloClusterIT { + private static final Logger log = LoggerFactory.getLogger(BalanceIT.class); - @Test(timeout = 60 * 1000) + @Override + public int defaultTimeoutSeconds() { + return 60; + } + + @Test public void testBalance() throws Exception { String tableName = getUniqueNames(1)[0]; Connector c = getConnector(); - System.out.println("Creating table"); + log.info("Creating table"); c.tableOperations().create(tableName); SortedSet<Text> splits = new TreeSet<Text>(); for (int i = 0; i < 10; i++) { splits.add(new Text("" + i)); } - System.out.println("Adding splits"); + log.info("Adding splits"); c.tableOperations().addSplits(tableName, splits); - System.out.println("Waiting for balance"); + log.info("Waiting for balance"); c.instanceOperations().waitForBalance(); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java index f553be8..fcad293 100644 --- a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java +++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java @@ -128,6 +128,7 @@ public class CleanWalIT extends AccumuloClusterIT { private int countLogs(String tableName, Connector conn) throws TableNotFoundException { Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); + scanner.setRange(MetadataSchema.TabletsSection.getRange()); int count = 0; for (Entry<Key,Value> entry : scanner) { log.debug("Saw " + entry.getKey() + 
"=" + entry.getValue()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java index b7637a6..65be396 100644 --- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java @@ -1294,6 +1294,7 @@ public class ConditionalWriterIT extends AccumuloClusterIT { conn.tableOperations().create(tableName); DistributedTrace.enable("localhost", "testTrace", mac.getClientConfig()); + UtilWaitThread.sleep(1000); Span root = Trace.on("traceTest"); ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java new file mode 100644 index 0000000..0324e4a --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertEquals; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.fate.util.UtilWaitThread; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.fs.RemoteIterator; +import org.junit.Test; + +import com.google.common.collect.Iterators; + +public class GarbageCollectWALIT extends ConfigurableMacIT { + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); // "5s" is a duration: ZooKeeper session timeout, not the ZooKeeper host list + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1s"); + cfg.setNumTservers(1); + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + } + + @Test(timeout = 2 * 60 * 1000) + public void test() throws Exception { + // not yet, please + String tableName = getUniqueNames(1)[0]; + cluster.getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); + 
Connector c = getConnector(); + c.tableOperations().create(tableName); + // count the number of WALs in the filesystem + assertEquals(2, countWALsInFS(cluster)); + cluster.getClusterControl().stop(ServerType.TABLET_SERVER); + cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR); + cluster.getClusterControl().start(ServerType.TABLET_SERVER); + Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator()); + // let GC run + UtilWaitThread.sleep(3 * 5 * 1000); + assertEquals(2, countWALsInFS(cluster)); + } + + private int countWALsInFS(MiniAccumuloClusterImpl cluster) throws Exception { + FileSystem fs = cluster.getFileSystem(); + RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(cluster.getConfig().getAccumuloDir() + "/wal"), true); + int result = 0; + while (iterator.hasNext()) { + LocatedFileStatus next = iterator.next(); + if (!next.isDirectory()) { + result++; + } + } + return result; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java index b78a311..27f1f69 100644 --- a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java +++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java @@ -19,7 +19,6 @@ package org.apache.accumulo.test; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.File; -import java.util.Collections; import java.util.UUID; import org.apache.accumulo.core.client.BatchWriter; @@ -27,6 +26,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import 
org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; @@ -127,11 +127,7 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT { String tableId = conn.tableOperations().tableIdMap().get(tableName); Assert.assertNotNull("Table ID was null", tableId); - LogEntry logEntry = new LogEntry(); - logEntry.server = "127.0.0.1:12345"; - logEntry.filename = emptyWalog.toURI().toString(); - logEntry.tabletId = 10; - logEntry.logSet = Collections.singleton(logEntry.filename); + LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId), null, null), 0, "127.0.0.1:12345", emptyWalog.toURI().toString()); log.info("Taking {} offline", tableName); conn.tableOperations().offline(tableName, true); @@ -186,11 +182,7 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT { String tableId = conn.tableOperations().tableIdMap().get(tableName); Assert.assertNotNull("Table ID was null", tableId); - LogEntry logEntry = new LogEntry(); - logEntry.server = "127.0.0.1:12345"; - logEntry.filename = partialHeaderWalog.toURI().toString(); - logEntry.tabletId = 10; - logEntry.logSet = Collections.singleton(logEntry.filename); + LogEntry logEntry = new LogEntry(null, 0, "127.0.0.1:12345", partialHeaderWalog.toURI().toString()); log.info("Taking {} offline", tableName); conn.tableOperations().offline(tableName, true); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java deleted file mode 100644 index 
6a9975c..0000000 --- a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.Map.Entry; - -import org.apache.accumulo.cluster.ClusterControl; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.harness.AccumuloClusterIT; -import 
org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.functional.FunctionalTestUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.RawLocalFileSystem; -import org.apache.hadoop.io.Text; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -// Verify that a recovery of a log without any mutations removes the log reference -public class NoMutationRecoveryIT extends AccumuloClusterIT { - private static final Logger log = LoggerFactory.getLogger(NoMutationRecoveryIT.class); - - @Override - public int defaultTimeoutSeconds() { - return 10 * 60; - } - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.setNumTservers(1); - hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); - } - - @Before - public void takeTraceTableOffline() throws Exception { - Connector conn = getConnector(); - if (conn.tableOperations().exists("trace")) { - conn.tableOperations().offline("trace", true); - } - } - - @After - public void takeTraceTableOnline() throws Exception { - Connector conn = getConnector(); - if (conn.tableOperations().exists("trace")) { - conn.tableOperations().online("trace", true); - } - } - - public boolean equals(Entry<Key,Value> a, Entry<Key,Value> b) { - // comparison, without timestamp - Key akey = a.getKey(); - Key bkey = b.getKey(); - log.info("Comparing {} to {}", akey.toStringNoTruncate(), bkey.toStringNoTruncate()); - return akey.compareTo(bkey, PartialKey.ROW_COLFAM_COLQUAL_COLVIS) == 0 && a.getValue().equals(b.getValue()); - } - - @Test - public void test() throws Exception { - Connector conn = getConnector(); - final String table = getUniqueNames(1)[0]; - conn.tableOperations().create(table); - String tableId = conn.tableOperations().tableIdMap().get(table); - - log.info("Created {} 
with id {}", table, tableId); - - // Add a record to the table - update(conn, table, new Text("row"), new Text("cf"), new Text("cq"), new Value("value".getBytes())); - - // Get the WAL reference used by the table we just added the update to - Entry<Key,Value> logRef = getLogRef(conn, MetadataTable.NAME); - - log.info("Log reference in metadata table {} {}", logRef.getKey().toStringNoTruncate(), logRef.getValue()); - - // Flush the record to disk - conn.tableOperations().flush(table, null, null, true); - - Range range = Range.prefix(tableId); - log.info("Fetching WAL references over " + table); - assertEquals("should not have any refs", 0, FunctionalTestUtils.count(getLogRefs(conn, MetadataTable.NAME, range))); - - // Grant permission to the admin user to write to the Metadata table - conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE); - - // Add the wal record back to the metadata table - update(conn, MetadataTable.NAME, logRef); - - // Assert that we can get the bogus update back out again - assertTrue(equals(logRef, getLogRef(conn, MetadataTable.NAME))); - - conn.tableOperations().flush(MetadataTable.NAME, null, null, true); - conn.tableOperations().flush(RootTable.NAME, null, null, true); - - ClusterControl control = cluster.getClusterControl(); - control.stopAllServers(ServerType.TABLET_SERVER); - control.startAllServers(ServerType.TABLET_SERVER); - - // Verify that we can read the original record we wrote - Scanner s = conn.createScanner(table, Authorizations.EMPTY); - int count = 0; - for (Entry<Key,Value> e : s) { - assertEquals(e.getKey().getRow().toString(), "row"); - assertEquals(e.getKey().getColumnFamily().toString(), "cf"); - assertEquals(e.getKey().getColumnQualifier().toString(), "cq"); - assertEquals(e.getValue().toString(), "value"); - count++; - } - assertEquals(1, count); - - // Verify that the bogus log reference we wrote it gone - for (Entry<Key,Value> ref : getLogRefs(conn, 
MetadataTable.NAME)) { - assertFalse("Unexpected found reference to bogus log entry: " + ref.getKey().toStringNoTruncate() + " " + ref.getValue(), equals(ref, logRef)); - } - } - - private void update(Connector conn, String name, Entry<Key,Value> logRef) throws Exception { - Key k = logRef.getKey(); - update(conn, name, k.getRow(), k.getColumnFamily(), k.getColumnQualifier(), logRef.getValue()); - } - - private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table) throws Exception { - return getLogRefs(conn, table, new Range()); - } - - private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table, Range r) throws Exception { - Scanner s = conn.createScanner(table, Authorizations.EMPTY); - s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); - s.setRange(r); - return s; - } - - private Entry<Key,Value> getLogRef(Connector conn, String table) throws Exception { - return getLogRefs(conn, table).iterator().next(); - } - - private void update(Connector conn, String table, Text row, Text cf, Text cq, Value value) throws Exception { - BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); - Mutation m = new Mutation(row); - m.put(cf, cq, value); - bw.addMutation(m); - bw.close(); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java index 8b37169..5c5b95d 100644 --- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java @@ -349,7 +349,7 @@ public class ShellServerIT extends SharedMiniClusterIT { ts.exec("config -t " + table2 + " -np", true, "345M", true); ts.exec("getsplits -t " + table2, true, "row5", true); ts.exec("constraint --list -t " + 
table2, true, "VisibilityConstraint=2", true); - ts.exec("onlinetable " + table, true); + ts.exec("online " + table, true); ts.exec("deletetable -f " + table, true); ts.exec("deletetable -f " + table2, true); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java new file mode 100644 index 0000000..03d783c --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test; + +import static org.junit.Assert.assertEquals; + +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.RawLocalFileSystem; +import org.junit.Test; + +import com.google.common.collect.Iterators; + +// When reviewing the changes for ACCUMULO-3423, kturner suggested: +// "tablets will now have log references that contain no data, +// so it may be marked with 3 WALs, the first with data, the 2nd without, a 3rd with data. +// It would be useful to have an IT that will test this situation." 
+public class UnusedWALIT extends ConfigurableMacIT { + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + final long logSize = 1024 * 1024 * 10; + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, Long.toString(logSize)); + cfg.setNumTservers(1); + // use raw local file system so walogs sync and flush will work + hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); + hadoopCoreSite.set("fs.namenode.fs-limits.min-block-size", Long.toString(logSize)); + } + + @Test(timeout = 2 * 60 * 1000) + public void test() throws Exception { + // don't want this bad boy cleaning up walog entries + getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); + + // make two tables + String[] tableNames = getUniqueNames(2); + String bigTable = tableNames[0]; + String lilTable = tableNames[1]; + Connector c = getConnector(); + c.tableOperations().create(bigTable); + c.tableOperations().create(lilTable); + + // put some data in a log that should be replayed for both tables + writeSomeData(c, bigTable, 0, 10, 0, 10); + scanSomeData(c, bigTable, 0, 10, 0, 10); + writeSomeData(c, lilTable, 0, 1, 0, 1); + scanSomeData(c, lilTable, 0, 1, 0, 1); + assertEquals(1, getWALCount(c)); + + // roll the logs by pushing data into bigTable + writeSomeData(c, bigTable, 0, 3000, 0, 1000); + assertEquals(2, getWALCount(c)); + + // put some data in the latest log + writeSomeData(c, lilTable, 1, 10, 0, 10); + scanSomeData(c, lilTable, 1, 10, 0, 10); + + // bounce the tserver + getCluster().getClusterControl().stop(ServerType.TABLET_SERVER); + getCluster().getClusterControl().start(ServerType.TABLET_SERVER); + + // wait for the metadata table to be online + Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator()); + + // check our two sets of data in different logs + scanSomeData(c, lilTable, 0, 1, 0, 1); + scanSomeData(c, lilTable, 1, 10, 0, 10); + 
} + + private void scanSomeData(Connector c, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception { + Scanner s = c.createScanner(table, Authorizations.EMPTY); + s.setRange(new Range(Integer.toHexString(startRow), Integer.toHexString(startRow + rowCount))); + int row = startRow; + int col = startCol; + for (Entry<Key,Value> entry : s) { + assertEquals(row, Integer.parseInt(entry.getKey().getRow().toString(), 16)); + assertEquals(col++, Integer.parseInt(entry.getKey().getColumnQualifier().toString(), 16)); + if (col == startCol + colCount) { + col = startCol; + row++; + if (row == startRow + rowCount) { + break; + } + } + } + assertEquals(row, startRow + rowCount); + } + + private int getWALCount(Connector c) throws Exception { + Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + s.setRange(CurrentLogsSection.getRange()); + try { + return Iterators.size(s.iterator()); + } finally { + s.close(); + } + } + + private void writeSomeData(Connector conn, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception { + BatchWriterConfig config = new BatchWriterConfig(); + config.setMaxMemory(10 * 1024 * 1024); + BatchWriter bw = conn.createBatchWriter(table, config); + for (int r = startRow; r < startRow + rowCount; r++) { + Mutation m = new Mutation(Integer.toHexString(r)); + for (int c = startCol; c < startCol + colCount; c++) { + m.put("", Integer.toHexString(c), ""); + } + bw.addMutation(m); + } + bw.close(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/VolumeIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java index d9b9429..e2a0e03 100644 --- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java +++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java 
@@ -103,6 +103,7 @@ public class VolumeIT extends ConfigurableMacIT { cfg.setProperty(Property.INSTANCE_DFS_DIR, v1Uri.getPath()); cfg.setProperty(Property.INSTANCE_DFS_URI, v1Uri.getScheme() + v1Uri.getHost()); cfg.setProperty(Property.INSTANCE_VOLUMES, v1.toString() + "," + v2.toString()); + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); // use raw local file system so walogs sync and flush will work hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); @@ -425,6 +426,21 @@ public class VolumeIT extends ConfigurableMacIT { Assert.fail("Unexpected volume " + path); } + Text path = new Text(); + for (String table : new String[]{RootTable.NAME, MetadataTable.NAME}) { + Scanner meta = conn.createScanner(table, Authorizations.EMPTY); + meta.setRange(MetadataSchema.CurrentLogsSection.getRange()); + outer: for (Entry<Key,Value> entry : meta) { + MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path); + for (int i = 0; i < paths.length; i++) { + if (path.toString().startsWith(paths[i].toString())) { + continue outer; + } + } + Assert.fail("Unexpected volume " + path); + } + } + // if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes - // 1)/num_volumes)^num_tablets. 
For 100 tablets and 3 volumes the probability that only 2 volumes would be chosen is 2.46e-18 @@ -435,6 +451,7 @@ public class VolumeIT extends ConfigurableMacIT { } Assert.assertEquals(200, sum); + } @Test http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java index 099743d..1f3e600 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ReadWriteIT.java @@ -57,6 +57,7 @@ import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.client.security.tokens.KerberosToken; import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Range; @@ -72,9 +73,11 @@ import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.fate.zookeeper.ZooReader; import org.apache.accumulo.harness.AccumuloClusterIT; import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; import org.apache.accumulo.test.TestIngest; import org.apache.accumulo.test.TestMultiTableIngest; import org.apache.accumulo.test.VerifyIngest; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -84,6 +87,11 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; public class ReadWriteIT extends AccumuloClusterIT { + @Override + public void 
configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); + } + private static final Logger log = LoggerFactory.getLogger(ReadWriteIT.class); static final int ROWS = 200000;