http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index 755e322..edea93f 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@ -161,6 +161,7 @@ class TabletGroupWatcher extends Daemon { List<Assignment> assigned = new ArrayList<Assignment>(); List<TabletLocationState> assignedToDeadServers = new ArrayList<TabletLocationState>(); Map<KeyExtent,TServerInstance> unassigned = new HashMap<KeyExtent,TServerInstance>(); + Map<TServerInstance, List<String>> logsForDeadServers = new TreeMap<>(); MasterState masterState = master.getMasterState(); int[] counts = new int[TabletState.values().length]; @@ -173,6 +174,7 @@ class TabletGroupWatcher extends Daemon { if (tls == null) { continue; } + Master.log.debug(store.name() + " location State: " + tls); // ignore entries for tables that do not exist in zookeeper if (TableManager.getInstance().getTableState(tls.extent.getTableId().toString()) == null) continue; @@ -182,7 +184,7 @@ class TabletGroupWatcher extends Daemon { // Don't overwhelm the tablet servers with work if (unassigned.size() + unloaded > Master.MAX_TSERVER_WORK_CHUNK * currentTServers.size()) { - flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned); + flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned); assignments.clear(); assigned.clear(); assignedToDeadServers.clear(); @@ -237,7 +239,7 @@ class TabletGroupWatcher extends Daemon { assignedToDeadServers.add(tls); if (server.equals(this.master.migrations.get(tls.extent))) 
this.master.migrations.remove(tls.extent); - // log.info("Current servers " + currentTServers.keySet()); + MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers); break; case UNASSIGNED: // maybe it's a finishing migration @@ -266,7 +268,7 @@ class TabletGroupWatcher extends Daemon { break; case ASSIGNED_TO_DEAD_SERVER: assignedToDeadServers.add(tls); - // log.info("Current servers " + currentTServers.keySet()); + MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers); break; case HOSTED: TServerConnection conn = this.master.tserverSet.getConnection(server); @@ -285,7 +287,8 @@ class TabletGroupWatcher extends Daemon { counts[state.ordinal()]++; } - flushChanges(destinations, assignments, assigned, assignedToDeadServers, unassigned); + flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned); + store.markLogsAsUnused(master, logsForDeadServers); // provide stats after flushing changes to avoid race conditions w/ delete table stats.end(masterState); @@ -723,12 +726,19 @@ class TabletGroupWatcher extends Daemon { } } - private void flushChanges(SortedMap<TServerInstance,TabletServerStatus> currentTServers, List<Assignment> assignments, List<Assignment> assigned, - List<TabletLocationState> assignedToDeadServers, Map<KeyExtent,TServerInstance> unassigned) throws DistributedStoreException, TException { + private void flushChanges( + SortedMap<TServerInstance,TabletServerStatus> currentTServers, + List<Assignment> assignments, + List<Assignment> assigned, + List<TabletLocationState> assignedToDeadServers, + Map<TServerInstance, List<String>> logsForDeadServers, + Map<KeyExtent,TServerInstance> unassigned) + throws DistributedStoreException, TException { if (!assignedToDeadServers.isEmpty()) { int maxServersToShow = min(assignedToDeadServers.size(), 100); 
Master.log.debug(assignedToDeadServers.size() + " assigned to dead servers: " + assignedToDeadServers.subList(0, maxServersToShow) + "..."); - store.unassign(assignedToDeadServers); + Master.log.debug("logs for dead servers: " + logsForDeadServers); + store.unassign(assignedToDeadServers, logsForDeadServers); this.master.nextEvent.event("Marked %d tablets as unassigned because they don't have current servers", assignedToDeadServers.size()); }
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java index b8e0b40..bc4c64f 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java +++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java @@ -107,6 +107,7 @@ public class WorkMaker { // Don't create the record if we have nothing to do. // TODO put this into a filter on serverside if (!shouldCreateWork(status)) { + log.info("Not creating work: " + status.toString()); continue; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java index 8cdaf9f..895717a 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java +++ b/server/master/src/main/java/org/apache/accumulo/master/state/MergeStats.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.server.cli.ClientOpts; @@ -186,7 +187,7 @@ public class MergeStats { Text tableId = extent.getTableId(); Text first = KeyExtent.getMetadataEntry(tableId, start); Range 
range = new Range(first, false, null, true); - scanner.setRange(range); + scanner.setRange(range.clip(MetadataSchema.TabletsSection.getRange())); KeyExtent prevExtent = null; log.debug("Scanning range " + range); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java index b0240f1..b39dcb8 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java +++ b/server/master/src/test/java/org/apache/accumulo/master/TestMergeState.java @@ -186,7 +186,7 @@ public class TestMergeState { // take it offline m = tablet.getPrevRowUpdateMutation(); Collection<Collection<String>> walogs = Collections.emptyList(); - metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false))); + metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, state.someTServer, null, walogs, false)), null); // now we can split stats = scan(state, metaDataStateStore); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java ---------------------------------------------------------------------- diff --git a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java index abceae4..db16bcb 100644 --- a/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java +++ b/server/master/src/test/java/org/apache/accumulo/master/state/RootTabletStateStoreTest.java @@ -181,7 +181,7 @@ public class RootTabletStateStoreTest { } 
catch (BadLocationStateException e) { fail("Unexpected error " + e); } - tstore.unassign(Collections.singletonList(assigned)); + tstore.unassign(Collections.singletonList(assigned), null); count = 0; for (TabletLocationState location : tstore) { assertEquals(location.extent, root); @@ -209,7 +209,7 @@ public class RootTabletStateStoreTest { fail("Unexpected error " + e); } try { - tstore.unassign(Collections.singletonList(broken)); + tstore.unassign(Collections.singletonList(broken), null); Assert.fail("should not get here"); } catch (IllegalArgumentException ex) {} } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java index 7d09fe3..b05032e 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/server/GarbageCollectionLogger.java @@ -34,7 +34,8 @@ public class GarbageCollectionLogger { private long gcTimeIncreasedCount = 0; private static long lastMemoryCheckTime = 0; - public GarbageCollectionLogger() {} + public GarbageCollectionLogger() { + } public synchronized void logGCInfo(AccumuloConfiguration conf) { final long now = System.currentTimeMillis(); @@ -96,7 +97,7 @@ public class GarbageCollectionLogger { final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) { final long diff = now - lastMemoryCheckTime; - if (diff > keepAliveTimeout) { + if (diff > keepAliveTimeout + 1000) { log.warn(String.format("GC pause checker not called in a timely fashion. 
Expected every %.1f seconds but was %.1f seconds since last check", keepAliveTimeout / 1000., diff / 1000.)); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletLevel.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletLevel.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletLevel.java new file mode 100644 index 0000000..1e82393 --- /dev/null +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletLevel.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.tserver; + +import org.apache.accumulo.core.data.KeyExtent; + +public enum TabletLevel { + ROOT, + META, + NORMAL; + + public static TabletLevel getLevel(KeyExtent extent) { + if (!extent.isMeta()) + return NORMAL; + if (extent.isRootTablet()) + return ROOT; + return META; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 662ee31..b12fccc 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -19,7 +19,6 @@ package org.apache.accumulo.tserver; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.accumulo.server.problems.ProblemType.TABLET_LOAD; -import java.io.FileNotFoundException; import java.io.IOException; import java.lang.management.ManagementFactory; import java.net.UnknownHostException; @@ -29,6 +28,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -44,6 +44,7 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.BlockingDeque; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; @@ -146,7 +147,6 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.server.Accumulo; import 
org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.GarbageCollectionLogger; -import org.apache.accumulo.server.ServerConstants; import org.apache.accumulo.server.ServerOpts; import org.apache.accumulo.server.client.ClientServiceHandler; import org.apache.accumulo.server.client.HdfsZooInstance; @@ -1438,6 +1438,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } } + @Override public void loadTablet(TInfo tinfo, TCredentials credentials, String lock, final TKeyExtent textent) { @@ -1498,6 +1499,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { final AssignmentHandler ah = new AssignmentHandler(extent); // final Runnable ah = new LoggingRunnable(log, ); // Root tablet assignment must take place immediately + if (extent.isRootTablet()) { new Daemon("Root Tablet Assignment") { @Override @@ -1690,66 +1692,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } @Override - public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { - String myname = getClientAddressString(); - myname = myname.replace(':', '+'); - Set<String> loggers = new HashSet<String>(); - logger.getLogFiles(loggers); - Set<String> loggerUUIDs = new HashSet<String>(); - for (String logger : loggers) - loggerUUIDs.add(new Path(logger).getName()); - - nextFile: for (String filename : filenames) { - String uuid = new Path(filename).getName(); - // skip any log we're currently using - if (loggerUUIDs.contains(uuid)) - continue nextFile; - - List<Tablet> onlineTabletsCopy = new ArrayList<Tablet>(); - synchronized (onlineTablets) { - onlineTabletsCopy.addAll(onlineTablets.values()); - } - for (Tablet tablet : onlineTabletsCopy) { - for (String current : tablet.getCurrentLogFiles()) { - if (current.contains(uuid)) { - log.info("Attempted to delete " + filename + " from tablet " + tablet.getExtent()); - continue nextFile; - } - } - } - - 
try { - Path source = new Path(filename); - if (TabletServer.this.getConfiguration().getBoolean(Property.TSERV_ARCHIVE_WALOGS)) { - Path walogArchive = fs.matchingFileSystem(source, ServerConstants.getWalogArchives()); - fs.mkdirs(walogArchive); - Path dest = new Path(walogArchive, source.getName()); - log.info("Archiving walog " + source + " to " + dest); - if (!fs.rename(source, dest)) - log.error("rename is unsuccessful"); - } else { - log.info("Deleting walog " + filename); - Path sourcePath = new Path(filename); - if (!(!TabletServer.this.getConfiguration().getBoolean(Property.GC_TRASH_IGNORE) && fs.moveToTrash(sourcePath)) - && !fs.deleteRecursively(sourcePath)) - log.warn("Failed to delete walog " + source); - for (String recovery : ServerConstants.getRecoveryDirs()) { - Path recoveryPath = new Path(recovery, source.getName()); - try { - if (fs.moveToTrash(recoveryPath) || fs.deleteRecursively(recoveryPath)) - log.info("Deleted any recovery log " + filename); - } catch (FileNotFoundException ex) { - // ignore - } - } - } - } catch (IOException e) { - log.warn("Error attempting to delete write-ahead log " + filename + ": " + e); - } - } - } - - @Override public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { try { checkPermission(credentials, null, "getActiveCompactions"); @@ -1770,14 +1712,13 @@ public class TabletServer extends AccumuloServerContext implements Runnable { @Override public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException { - Set<String> logs = new HashSet<String>(); - logger.getLogFiles(logs); - return new ArrayList<String>(logs); + String log = logger.getLogFile(); + return Collections.singletonList(log); } } private class SplitRunner implements Runnable { - private Tablet tablet; + private final Tablet tablet; public SplitRunner(Tablet tablet) { this.tablet = tablet; @@ -2031,7 +1972,7 @@ public class TabletServer extends 
AccumuloServerContext implements Runnable { log.error("Unexpected error ", e); } log.debug("Unassigning " + tls); - TabletStateStore.unassign(TabletServer.this, tls); + TabletStateStore.unassign(TabletServer.this, tls, null); } catch (DistributedStoreException ex) { log.warn("Unable to update storage", ex); } catch (KeeperException e) { @@ -2238,29 +2179,6 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } } - public void addLoggersToMetadata(List<DfsLogger> logs, KeyExtent extent, int id) { - if (!this.onlineTablets.containsKey(extent)) { - log.info("Not adding " + logs.size() + " logs for extent " + extent + " as alias " + id + " tablet is offline"); - // minor compaction due to recovery... don't make updates... if it finishes, there will be no WALs, - // if it doesn't, we'll need to do the same recovery with the old files. - return; - } - - log.info("Adding " + logs.size() + " logs for extent " + extent + " as alias " + id); - long now = RelativeTime.currentTimeMillis(); - List<String> logSet = new ArrayList<String>(); - for (DfsLogger log : logs) - logSet.add(log.getFileName()); - LogEntry entry = new LogEntry(); - entry.extent = extent; - entry.tabletId = id; - entry.timestamp = now; - entry.server = logs.get(0).getLogger(); - entry.filename = logs.get(0).getFileName(); - entry.logSet = logSet; - MetadataTableUtil.addLogEntry(this, entry, getLock()); - } - private HostAndPort startServer(AccumuloConfiguration conf, String address, Property portHint, TProcessor processor, String threadName) throws UnknownHostException { Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? 
Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); @@ -2968,6 +2886,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable { public void minorCompactionFinished(CommitSession tablet, String newDatafile, int walogSeq) throws IOException { totalMinorCompactions.incrementAndGet(); logger.minorCompactionFinished(tablet, newDatafile, walogSeq); + removeUnusedWALs(); } public void minorCompactionStarted(CommitSession tablet, int lastUpdateSequence, String newMapfileLocation) throws IOException { @@ -2986,14 +2905,11 @@ public class TabletServer extends AccumuloServerContext implements Runnable { }); for (LogEntry entry : sorted) { Path recovery = null; - for (String log : entry.logSet) { - Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, log)); - finished = SortedLogState.getFinishedMarkerPath(finished); - TabletServer.log.info("Looking for " + finished); - if (fs.exists(finished)) { - recovery = finished.getParent(); - break; - } + Path finished = RecoveryPath.getRecoveryPath(fs, fs.getFullPath(FileType.WAL, entry.filename)); + finished = SortedLogState.getFinishedMarkerPath(finished); + TabletServer.log.info("Looking for " + finished); + if (fs.exists(finished)) { + recovery = finished.getParent(); } if (recovery == null) throw new IOException("Unable to find recovery files for extent " + extent + " logEntry: " + entry); @@ -3035,7 +2951,9 @@ public class TabletServer extends AccumuloServerContext implements Runnable { } public Collection<Tablet> getOnlineTablets() { - return Collections.unmodifiableCollection(onlineTablets.values()); + synchronized (onlineTablets) { + return new ArrayList<Tablet>(onlineTablets.values()); + } } public VolumeManager getFileSystem() { @@ -3061,4 +2979,54 @@ public class TabletServer extends AccumuloServerContext implements Runnable { public SecurityOperation getSecurityOperation() { return security; } + + // avoid unnecessary redundant markings to meta + 
ConcurrentHashMap<DfsLogger, EnumSet<TabletLevel>> metadataTableLogs = new ConcurrentHashMap<>(); + + // remove any meta entries after a rolled log is no longer referenced + Set<DfsLogger> closedLogs = new HashSet<>(); + + private void removeUnusedWALs() { + Set<DfsLogger> candidates; + synchronized (closedLogs) { + candidates = new HashSet<>(closedLogs); + } + for (Tablet tablet : getOnlineTablets()) { + candidates.removeAll(tablet.getCurrentLogFiles()); + } + try { + Set<String> filenames = new HashSet<>(); + for (DfsLogger candidate : candidates) { + filenames.add(candidate.getFileName()); + } + MetadataTableUtil.markLogUnused(this, this.getLock(), this.getTabletSession(), filenames); + synchronized (closedLogs) { + closedLogs.removeAll(candidates); + } + } catch (AccumuloException ex) { + log.info(ex.toString(), ex); + } + } + + public void addLoggersToMetadata(DfsLogger copy, KeyExtent extent) { + TabletLevel level = TabletLevel.getLevel(extent); + synchronized (level) { + EnumSet<TabletLevel> set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level)); + if (set == null || !set.contains(level) || level == TabletLevel.ROOT) { + log.info("Writing log marker for level " + level + " " + copy.getFileName()); + MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getFileName(), extent); + if (set != null) { + set.add(level); + } + } + } + } + + public void walogClosed(DfsLogger currentLog) { + metadataTableLogs.remove(currentLog); + synchronized (closedLogs) { + closedLogs.add(currentLog); + } + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java index 5acf5eb..f8bcfbc 100644 --- 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java @@ -74,7 +74,7 @@ import com.google.common.base.Optional; * Wrap a connection to a logger. * */ -public class DfsLogger { +public class DfsLogger implements Comparable<DfsLogger> { public static final String LOG_FILE_HEADER_V2 = "--- Log File Header (v2) ---"; public static final String LOG_FILE_HEADER_V3 = "--- Log File Header (v3) ---"; @@ -621,4 +621,9 @@ public class DfsLogger { return Joiner.on(":").join(parts[parts.length - 2].split("[+]")); } + @Override + public int compareTo(DfsLogger o) { + return getFileName().compareTo(o.getFileName()); + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java index 405ec70..efdbbf4 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java @@ -179,7 +179,7 @@ public class SortedLogRecovery { // find the maximum tablet id... because a tablet may leave a tserver and then come back, in which case it would have a different tablet id // for the maximum tablet id, find the minimum sequence #... 
may be ok to find the max seq, but just want to make the code behave like it used to while (reader.next(key, value)) { - // LogReader.printEntry(entry); + // log.debug("Event " + key.event + " tablet " + key.tablet); if (key.event != DEFINE_TABLET) break; if (key.tablet.equals(extent) || key.tablet.equals(alternative)) { @@ -208,7 +208,7 @@ public class SortedLogRecovery { if (lastStartToFinish.compactionStatus == Status.INITIAL) lastStartToFinish.compactionStatus = Status.COMPLETE; if (key.seq <= lastStartToFinish.lastStart) - throw new RuntimeException("Sequence numbers are not increasing for start/stop events."); + throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart); lastStartToFinish.update(fileno, key.seq); // Tablet server finished the minor compaction, but didn't remove the entry from the METADATA table. @@ -217,7 +217,7 @@ public class SortedLogRecovery { lastStartToFinish.update(-1); } else if (key.event == COMPACTION_FINISH) { if (key.seq <= lastStartToFinish.lastStart) - throw new RuntimeException("Sequence numbers are not increasing for start/stop events."); + throw new RuntimeException("Sequence numbers are not increasing for start/stop events: " + key.seq + " vs " + lastStartToFinish.lastStart); if (lastStartToFinish.compactionStatus == Status.INITIAL) lastStartToFinish.compactionStatus = Status.LOOKING_FOR_FINISH; else if (lastStartToFinish.lastFinish > lastStartToFinish.lastStart) @@ -248,8 +248,6 @@ public class SortedLogRecovery { break; if (key.tid != tid) break; - // log.info("Replaying " + key); - // log.info(value); if (key.event == MUTATION) { mr.receive(value.mutations.get(0)); } else if (key.event == MANY_MUTATIONS) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java ---------------------------------------------------------------------- diff --git 
a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 5c3fc2d..6455726 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -29,6 +28,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -37,7 +37,6 @@ import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationConfigurationUtil; -import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.conf.TableConfiguration; @@ -49,6 +48,7 @@ import org.apache.accumulo.tserver.TabletMutations; import org.apache.accumulo.tserver.TabletServer; import org.apache.accumulo.tserver.log.DfsLogger.LoggerOperation; import org.apache.accumulo.tserver.tablet.CommitSession; +import org.apache.accumulo.tserver.tablet.Tablet; import org.apache.hadoop.fs.Path; import org.apache.log4j.Logger; @@ -71,20 +71,22 @@ public class TabletServerLogger { private final TabletServer tserver; - // The current log set: always updated to a new set with every change of loggers - private final List<DfsLogger> 
loggers = new ArrayList<DfsLogger>(); + // The current logger + private DfsLogger currentLog = null; + private DfsLogger nextLog = null; + private Thread nextLogThread = null; - // The current generation of logSet. - // Because multiple threads can be using a log set at one time, a log + // The current generation of logs. + // Because multiple threads can be using a log at one time, a log // failure is likely to affect multiple threads, who will all attempt to - // create a new logSet. This will cause many unnecessary updates to the + // create a new log. This will cause many unnecessary updates to the // metadata table. // We'll use this generational counter to determine if another thread has - // already fetched a new logSet. - private AtomicInteger logSetId = new AtomicInteger(); + // already fetched a new log. + private final AtomicInteger logId = new AtomicInteger(); // Use a ReadWriteLock to allow multiple threads to use the log set, but obtain a write lock to change them - private final ReentrantReadWriteLock logSetLock = new ReentrantReadWriteLock(); + private final ReentrantReadWriteLock logIdLock = new ReentrantReadWriteLock(); private final AtomicInteger seqGen = new AtomicInteger(); @@ -145,61 +147,66 @@ public class TabletServerLogger { this.flushCounter = flushCounter; } - private int initializeLoggers(final List<DfsLogger> copy) throws IOException { - final int[] result = {-1}; - testLockAndRun(logSetLock, new TestCallWithWriteLock() { + private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException { + final AtomicReference<DfsLogger> result = new AtomicReference<DfsLogger>(); + testLockAndRun(logIdLock, new TestCallWithWriteLock() { @Override boolean test() { - copy.clear(); - copy.addAll(loggers); - if (!loggers.isEmpty()) - result[0] = logSetId.get(); - return loggers.isEmpty(); + result.set(currentLog); + if (currentLog != null) + logIdOut.set(logId.get()); + return currentLog == null; } @Override void withWriteLock() 
throws IOException { try { - createLoggers(); - copy.clear(); - copy.addAll(loggers); - if (copy.size() > 0) - result[0] = logSetId.get(); + createLogger(); + result.set(currentLog); + if (currentLog != null) + logIdOut.set(logId.get()); else - result[0] = -1; + logIdOut.set(-1); } catch (IOException e) { log.error("Unable to create loggers", e); } } }); - return result[0]; + return result.get(); } - public void getLogFiles(Set<String> loggersOut) { - logSetLock.readLock().lock(); + public String getLogFile() { + logIdLock.readLock().lock(); try { - for (DfsLogger logger : loggers) { - loggersOut.add(logger.getFileName()); - } + return currentLog.getFileName(); } finally { - logSetLock.readLock().unlock(); + logIdLock.readLock().unlock(); } } - synchronized private void createLoggers() throws IOException { - if (!logSetLock.isWriteLockedByCurrentThread()) { + synchronized private void createLogger() throws IOException { + if (!logIdLock.isWriteLockedByCurrentThread()) { throw new IllegalStateException("createLoggers should be called with write lock held!"); } - if (loggers.size() != 0) { - throw new IllegalStateException("createLoggers should not be called when loggers.size() is " + loggers.size()); + if (currentLog != null) { + throw new IllegalStateException("createLoggers should not be called when current log is set"); } try { - DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter); - alog.open(tserver.getClientAddressString()); - loggers.add(alog); - logSetId.incrementAndGet(); + if (nextLog != null) { + log.info("Using next log " + nextLog.getFileName()); + currentLog = nextLog; + nextLog = null; + } else { + DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter); + alog.open(tserver.getClientAddressString()); + currentLog = alog; + } + if (nextLog == null) { + createNextLog(); + } + logId.incrementAndGet(); return; } catch (Exception t) { walErrors.put(System.currentTimeMillis(), ""); @@ -210,30 
+217,58 @@ public class TabletServerLogger { } } + private void createNextLog() { + if (nextLogThread == null) { + nextLogThread = new Thread() { + @Override + public void run() { + try { + log.info("Creating next WAL"); + this.setName("Creating next WAL"); + DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter); + alog.open(tserver.getClientAddressString()); + for (Tablet tablet : tserver.getOnlineTablets()) { + // TODO + tserver.addLoggersToMetadata(alog, tablet.getExtent()); + } + nextLog = alog; + + log.info("Created next WAL " + alog.getFileName()); + } catch (Exception t) { + log.error(t, t); + } finally { + nextLogThread = null; + } + } + }; + nextLogThread.start(); + } + } + public void resetLoggers() throws IOException { - logSetLock.writeLock().lock(); + logIdLock.writeLock().lock(); try { close(); } finally { - logSetLock.writeLock().unlock(); + logIdLock.writeLock().unlock(); } } synchronized private void close() throws IOException { - if (!logSetLock.isWriteLockedByCurrentThread()) { + if (!logIdLock.isWriteLockedByCurrentThread()) { throw new IllegalStateException("close should be called with write lock held!"); } try { - for (DfsLogger logger : loggers) { - try { - logger.close(); - } catch (DfsLogger.LogClosedException ex) { - // ignore - } catch (Throwable ex) { - log.error("Unable to cleanly close log " + logger.getFileName() + ": " + ex, ex); - } + try { + currentLog.close(); + } catch (DfsLogger.LogClosedException ex) { + // ignore + } catch (Throwable ex) { + log.error("Unable to cleanly close log " + currentLog.getFileName() + ": " + ex, ex); + } finally { + this.tserver.walogClosed(currentLog); } - loggers.clear(); + currentLog = null; logSizeEstimate.set(0); } catch (Throwable t) { throw new IOException(t); @@ -251,7 +286,7 @@ public class TabletServerLogger { private int write(final Collection<CommitSession> sessions, boolean mincFinish, Writer writer) throws IOException { // Work very hard not to lock this 
during calls to the outside world - int currentLogSet = logSetId.get(); + int currentLogId = logId.get(); int seq = -1; int attempt = 1; @@ -259,20 +294,22 @@ public class TabletServerLogger { while (!success) { try { // get a reference to the loggers that no other thread can touch - ArrayList<DfsLogger> copy = new ArrayList<DfsLogger>(); - currentLogSet = initializeLoggers(copy); + DfsLogger copy = null; + AtomicInteger currentId = new AtomicInteger(-1); + copy = initializeLoggers(currentId); + currentLogId = currentId.get(); // add the logger to the log set for the memory in the tablet, // update the metadata table if we've never used this tablet - if (currentLogSet == logSetId.get()) { + if (currentLogId == logId.get()) { for (CommitSession commitSession : sessions) { if (commitSession.beginUpdatingLogsUsed(copy, mincFinish)) { try { // Scribble out a tablet definition and then write to the metadata table defineTablet(commitSession); - if (currentLogSet == logSetId.get()) - tserver.addLoggersToMetadata(copy, commitSession.getExtent(), commitSession.getLogId()); + if (currentLogId == logId.get()) + tserver.addLoggersToMetadata(copy, commitSession.getExtent()); } finally { commitSession.finishUpdatingLogsUsed(); } @@ -280,39 +317,29 @@ public class TabletServerLogger { // Need to release KeyExtent extent = commitSession.getExtent(); if (ReplicationConfigurationUtil.isEnabled(extent, tserver.getTableConfiguration(extent))) { - Set<String> logs = new HashSet<String>(); - for (DfsLogger logger : copy) { - logs.add(logger.getFileName()); - } - Status status = StatusUtil.fileCreated(System.currentTimeMillis()); - log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + logs); + Status status = Status.newBuilder().setInfiniteEnd(true).setCreatedTime(System.currentTimeMillis()).build(); + log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + copy.getFileName()); // Got some new WALs, note this in the metadata table - 
ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), logs, status); + ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), copy.getFileName(), status); } } } } // Make sure that the logs haven't changed out from underneath our copy - if (currentLogSet == logSetId.get()) { + if (currentLogId == logId.get()) { // write the mutation to the logs seq = seqGen.incrementAndGet(); if (seq < 0) throw new RuntimeException("Logger sequence generator wrapped! Onos!!!11!eleven"); - ArrayList<LoggerOperation> queuedOperations = new ArrayList<LoggerOperation>(copy.size()); - for (DfsLogger wal : copy) { - LoggerOperation lop = writer.write(wal, seq); - if (lop != null) - queuedOperations.add(lop); - } - - for (LoggerOperation lop : queuedOperations) { + LoggerOperation lop = writer.write(copy, seq); + if (lop != null) { lop.await(); } // double-check: did the log set change? - success = (currentLogSet == logSetId.get()); + success = (currentLogId == logId.get()); } } catch (DfsLogger.LogClosedException ex) { log.debug("Logs closed while writing, retrying " + attempt); @@ -327,13 +354,13 @@ public class TabletServerLogger { // Some sort of write failure occurred. Grab the write lock and reset the logs. // But since multiple threads will attempt it, only attempt the reset when // the logs haven't changed. - final int finalCurrent = currentLogSet; + final int finalCurrent = currentLogId; if (!success) { - testLockAndRun(logSetLock, new TestCallWithWriteLock() { + testLockAndRun(logIdLock, new TestCallWithWriteLock() { @Override boolean test() { - return finalCurrent == logSetId.get(); + return finalCurrent == logId.get(); } @Override @@ -346,7 +373,7 @@ public class TabletServerLogger { } // if the log gets too big, reset it .. 
grab the write lock first logSizeEstimate.addAndGet(4 * 3); // event, tid, seq overhead - testLockAndRun(logSetLock, new TestCallWithWriteLock() { + testLockAndRun(logIdLock, new TestCallWithWriteLock() { @Override boolean test() { return logSizeEstimate.get() > maxSize; http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java index 17290c0..70b1922 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CommitSession.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.tserver.tablet; -import java.util.ArrayList; import java.util.List; import org.apache.accumulo.core.data.KeyExtent; @@ -85,7 +84,7 @@ public class CommitSession { return committer; } - public boolean beginUpdatingLogsUsed(ArrayList<DfsLogger> copy, boolean mincFinish) { + public boolean beginUpdatingLogsUsed(DfsLogger copy, boolean mincFinish) { return committer.beginUpdatingLogsUsed(memTable, copy, mincFinish); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java index 8ba8128..b05f0c6 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java @@ -424,7 +424,9 @@ class 
DatafileManager { if (log.isDebugEnabled()) { log.debug("Recording that data has been ingested into " + tablet.getExtent() + " using " + logFileOnly); } - ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFileOnly, StatusUtil.openWithUnknownLength()); + for (String logFile : logFileOnly) { + ReplicationTableUtil.updateFiles(tablet.getTabletServer(), tablet.getExtent(), logFile, StatusUtil.openWithUnknownLength()); + } } } finally { tablet.finishClearingUnusedLogs(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index f2d5375..fa1ae86 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -37,6 +37,7 @@ import java.util.PriorityQueue; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -198,7 +199,7 @@ public class Tablet implements TabletCommitter { } // stores info about user initiated major compaction that is waiting on a minor compaction to finish - private CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo(); + private final CompactionWaitInfo compactionWaitInfo = new CompactionWaitInfo(); static enum CompactionState { WAITING_TO_START, IN_PROGRESS @@ -628,8 +629,8 @@ public class Tablet implements TabletCommitter { // the WAL isn't closed (WRT replication Status) and thus we're safe to update its progress. 
Status status = StatusUtil.openWithUnknownLength(); for (LogEntry logEntry : logEntries) { - log.debug("Writing updated status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status)); - ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.logSet, status); + log.debug("Writing updated status to metadata table for " + logEntry.filename + " " + ProtobufUtil.toString(status)); + ReplicationTableUtil.updateFiles(tabletServer, extent, logEntry.filename, status); } } @@ -641,11 +642,9 @@ public class Tablet implements TabletCommitter { } } // make some closed references that represent the recovered logs - currentLogs = new HashSet<DfsLogger>(); + currentLogs = new ConcurrentSkipListSet<DfsLogger>(); for (LogEntry logEntry : logEntries) { - for (String log : logEntry.logSet) { - currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), log, logEntry.getColumnQualifier().toString())); - } + currentLogs.add(new DfsLogger(tabletServer.getServerConfig(), logEntry.filename, logEntry.getColumnQualifier().toString())); } log.info("Write-Ahead Log recovery complete for " + this.extent + " (" + count[0] + " mutations applied, " + getTabletMemory().getNumEntries() @@ -937,6 +936,8 @@ public class Tablet implements TabletCommitter { long count = 0; try { + String oldName = Thread.currentThread().getName(); + Thread.currentThread().setName("Minor compacting " + this.extent); Span span = Trace.start("write"); CompactionStats stats; try { @@ -957,6 +958,7 @@ public class Tablet implements TabletCommitter { commitSession, flushId); } finally { span.stop(); + Thread.currentThread().setName(oldName); } return new DataFileValue(stats.getFileSize(), stats.getEntriesWritten()); } catch (Exception e) { @@ -991,7 +993,7 @@ public class Tablet implements TabletCommitter { private synchronized MinorCompactionTask prepareForMinC(long flushId, MinorCompactionReason mincReason) { CommitSession oldCommitSession = getTabletMemory().prepareForMinC(); otherLogs = 
currentLogs; - currentLogs = new HashSet<DfsLogger>(); + currentLogs = new ConcurrentSkipListSet<DfsLogger>(); FileRef mergeFile = null; if (mincReason != MinorCompactionReason.RECOVERY) { @@ -2373,14 +2375,10 @@ public class Tablet implements TabletCommitter { } } - private Set<DfsLogger> currentLogs = new HashSet<DfsLogger>(); + private ConcurrentSkipListSet<DfsLogger> currentLogs = new ConcurrentSkipListSet<DfsLogger>(); - public synchronized Set<String> getCurrentLogFiles() { - Set<String> result = new HashSet<String>(); - for (DfsLogger log : currentLogs) { - result.add(log.getFileName()); - } - return result; + public Set<DfsLogger> getCurrentLogFiles() { + return new HashSet<DfsLogger>(currentLogs); } Set<String> beginClearingUnusedLogs() { @@ -2439,12 +2437,12 @@ public class Tablet implements TabletCommitter { // this lock is basically used to synchronize writing of log info to metadata private final ReentrantLock logLock = new ReentrantLock(); - public synchronized int getLogCount() { + public int getLogCount() { return currentLogs.size(); } @Override - public boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> more, boolean mincFinish) { + public boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger more, boolean mincFinish) { boolean releaseLock = true; @@ -2481,28 +2479,26 @@ public class Tablet implements TabletCommitter { int numAdded = 0; int numContained = 0; - for (DfsLogger logger : more) { - if (addToOther) { - if (otherLogs.add(logger)) - numAdded++; + if (addToOther) { + if (otherLogs.add(more)) + numAdded++; - if (currentLogs.contains(logger)) - numContained++; - } else { - if (currentLogs.add(logger)) - numAdded++; + if (currentLogs.contains(more)) + numContained++; + } else { + if (currentLogs.add(more)) + numAdded++; - if (otherLogs.contains(logger)) - numContained++; - } + if (otherLogs.contains(more)) + numContained++; } - if (numAdded > 0 && numAdded != more.size()) { + if (numAdded > 0 && numAdded != 1) { 
// expect to add all or none throw new IllegalArgumentException("Added subset of logs " + extent + " " + more + " " + currentLogs); } - if (numContained > 0 && numContained != more.size()) { + if (numContained > 0 && numContained != 1) { // expect to contain all or none throw new IllegalArgumentException("Other logs contained subset of logs " + extent + " " + more + " " + otherLogs); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java index b56d0af..39bde5c 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletCommitter.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.tserver.tablet; -import java.util.Collection; import java.util.List; import org.apache.accumulo.core.client.Durability; @@ -35,7 +34,7 @@ public interface TabletCommitter { void commit(CommitSession commitSession, List<Mutation> mutations); - boolean beginUpdatingLogsUsed(InMemoryMap memTable, Collection<DfsLogger> copy, boolean mincFinish); + boolean beginUpdatingLogsUsed(InMemoryMap memTable, DfsLogger copy, boolean mincFinish); void finishUpdatingLogsUsed(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index b429607..b8a60c1 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -202,9 +202,6 @@ public class NullTserver { } @Override - public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {} - - @Override public List<ActiveCompaction> getActiveCompactions(TInfo tinfo, TCredentials credentials) throws ThriftSecurityException, TException { return new ArrayList<ActiveCompaction>(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java index 45799c4..de8ebc8 100644 --- a/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java +++ b/test/src/test/java/org/apache/accumulo/proxy/ProxyDurabilityIT.java @@ -58,6 +58,11 @@ import com.google.common.net.HostAndPort; public class ProxyDurabilityIT extends ConfigurableMacIT { @Override + protected int defaultTimeoutSeconds() { + return 60; + } + + @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName()); cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java index 9cae889..93ca138 100644 --- a/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java +++ 
b/test/src/test/java/org/apache/accumulo/test/BadDeleteMarkersCreatedIT.java @@ -53,7 +53,7 @@ public class BadDeleteMarkersCreatedIT extends AccumuloClusterIT { @Override public int defaultTimeoutSeconds() { - return 60; + return 120; } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/BalanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java index f793925..8703f18 100644 --- a/test/src/test/java/org/apache/accumulo/test/BalanceIT.java +++ b/test/src/test/java/org/apache/accumulo/test/BalanceIT.java @@ -20,25 +20,33 @@ import java.util.SortedSet; import java.util.TreeSet; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.accumulo.harness.AccumuloClusterIT; import org.apache.hadoop.io.Text; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -public class BalanceIT extends ConfigurableMacIT { +public class BalanceIT extends AccumuloClusterIT { + private static final Logger log = LoggerFactory.getLogger(BalanceIT.class); - @Test(timeout = 60 * 1000) + @Override + public int defaultTimeoutSeconds() { + return 60; + } + + @Test public void testBalance() throws Exception { String tableName = getUniqueNames(1)[0]; Connector c = getConnector(); - System.out.println("Creating table"); + log.info("Creating table"); c.tableOperations().create(tableName); SortedSet<Text> splits = new TreeSet<Text>(); for (int i = 0; i < 10; i++) { splits.add(new Text("" + i)); } - System.out.println("Adding splits"); + log.info("Adding splits"); c.tableOperations().addSplits(tableName, splits); - System.out.println("Waiting for balance"); + log.info("Waiting for balance"); c.instanceOperations().waitForBalance(); } } 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java index bdfbd13..c113a08 100644 --- a/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java +++ b/test/src/test/java/org/apache/accumulo/test/CleanWalIT.java @@ -125,6 +125,7 @@ public class CleanWalIT extends AccumuloClusterIT { private int countLogs(String tableName, Connector conn) throws TableNotFoundException { Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); scanner.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); + scanner.setRange(MetadataSchema.TabletsSection.getRange()); int count = 0; for (Entry<Key,Value> entry : scanner) { log.debug("Saw " + entry.getKey() + "=" + entry.getValue()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java index b68870d..7bd1842 100644 --- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterIT.java @@ -1251,6 +1251,7 @@ public class ConditionalWriterIT extends AccumuloClusterIT { conn.tableOperations().create(tableName); DistributedTrace.enable("localhost", "testTrace", mac.getClientConfig()); + UtilWaitThread.sleep(1000); Span root = Trace.on("traceTest"); ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java index 7f2f6f9..685d71a 100644 --- a/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java +++ b/test/src/test/java/org/apache/accumulo/test/MissingWalHeaderCompletesRecoveryIT.java @@ -17,7 +17,6 @@ package org.apache.accumulo.test; import java.io.File; -import java.util.Collections; import java.util.UUID; import org.apache.accumulo.core.client.BatchWriter; @@ -25,6 +24,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.KeyExtent; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; @@ -125,11 +125,7 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT { String tableId = conn.tableOperations().tableIdMap().get(tableName); Assert.assertNotNull("Table ID was null", tableId); - LogEntry logEntry = new LogEntry(); - logEntry.server = "127.0.0.1:12345"; - logEntry.filename = emptyWalog.toURI().toString(); - logEntry.tabletId = 10; - logEntry.logSet = Collections.singleton(logEntry.filename); + LogEntry logEntry = new LogEntry(new KeyExtent(new Text(tableId), null, null), 0, "127.0.0.1:12345", emptyWalog.toURI().toString()); log.info("Taking {} offline", tableName); conn.tableOperations().offline(tableName, true); @@ -184,11 +180,7 @@ public class MissingWalHeaderCompletesRecoveryIT extends ConfigurableMacIT { String 
tableId = conn.tableOperations().tableIdMap().get(tableName); Assert.assertNotNull("Table ID was null", tableId); - LogEntry logEntry = new LogEntry(); - logEntry.server = "127.0.0.1:12345"; - logEntry.filename = partialHeaderWalog.toURI().toString(); - logEntry.tabletId = 10; - logEntry.logSet = Collections.singleton(logEntry.filename); + LogEntry logEntry = new LogEntry(null, 0, "127.0.0.1:12345", partialHeaderWalog.toURI().toString()); log.info("Taking {} offline", tableName); conn.tableOperations().offline(tableName, true); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java b/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java deleted file mode 100644 index 10b8810..0000000 --- a/test/src/test/java/org/apache/accumulo/test/NoMutationRecoveryIT.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.accumulo.test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.Map.Entry; - -import org.apache.accumulo.cluster.ClusterControl; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.metadata.MetadataTable; -import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.metadata.schema.MetadataSchema; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import org.apache.accumulo.harness.AccumuloClusterIT; -import org.apache.accumulo.minicluster.ServerType; -import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; -import org.apache.accumulo.test.functional.FunctionalTestUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.junit.Test; - -// Verify that a recovery of a log without any mutations removes the log reference -public class NoMutationRecoveryIT extends AccumuloClusterIT { - - @Override - public int defaultTimeoutSeconds() { - return 10 * 60; - } - - @Override - public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { - cfg.useMiniDFS(true); - cfg.setNumTservers(1); - } - - public boolean equals(Entry<Key,Value> a, Entry<Key,Value> b) { - // comparison, without timestamp - Key akey = a.getKey(); - Key bkey = b.getKey(); - return akey.compareTo(bkey, PartialKey.ROW_COLFAM_COLQUAL_COLVIS) == 0 && a.getValue().equals(b.getValue()); - } - 
- @Test - public void test() throws Exception { - Connector conn = getConnector(); - final String table = getUniqueNames(1)[0]; - conn.tableOperations().create(table); - String tableId = conn.tableOperations().tableIdMap().get(table); - update(conn, table, new Text("row"), new Text("cf"), new Text("cq"), new Value("value".getBytes())); - Entry<Key,Value> logRef = getLogRef(conn, MetadataTable.NAME); - conn.tableOperations().flush(table, null, null, true); - assertEquals("should not have any refs", 0, FunctionalTestUtils.count(getLogRefs(conn, MetadataTable.NAME, Range.prefix(tableId)))); - conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE); - update(conn, MetadataTable.NAME, logRef); - assertTrue(equals(logRef, getLogRef(conn, MetadataTable.NAME))); - conn.tableOperations().flush(MetadataTable.NAME, null, null, true); - conn.tableOperations().flush(RootTable.NAME, null, null, true); - - ClusterControl control = cluster.getClusterControl(); - control.stopAllServers(ServerType.TABLET_SERVER); - control.startAllServers(ServerType.TABLET_SERVER); - - Scanner s = conn.createScanner(table, Authorizations.EMPTY); - int count = 0; - for (Entry<Key,Value> e : s) { - assertEquals(e.getKey().getRow().toString(), "row"); - assertEquals(e.getKey().getColumnFamily().toString(), "cf"); - assertEquals(e.getKey().getColumnQualifier().toString(), "cq"); - assertEquals(e.getValue().toString(), "value"); - count++; - } - assertEquals(1, count); - for (Entry<Key,Value> ref : getLogRefs(conn, MetadataTable.NAME)) { - assertFalse(equals(ref, logRef)); - } - } - - private void update(Connector conn, String name, Entry<Key,Value> logRef) throws Exception { - Key k = logRef.getKey(); - update(conn, name, k.getRow(), k.getColumnFamily(), k.getColumnQualifier(), logRef.getValue()); - } - - private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table) throws Exception { - return getLogRefs(conn, table, new Range()); - } - - 
private Iterable<Entry<Key,Value>> getLogRefs(Connector conn, String table, Range r) throws Exception { - Scanner s = conn.createScanner(table, Authorizations.EMPTY); - s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); - s.setRange(r); - return s; - } - - private Entry<Key,Value> getLogRef(Connector conn, String table) throws Exception { - return getLogRefs(conn, table).iterator().next(); - } - - private void update(Connector conn, String table, Text row, Text cf, Text cq, Value value) throws Exception { - BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig()); - Mutation m = new Mutation(row); - m.put(cf, cq, value); - bw.addMutation(m); - bw.close(); - } - -} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java index 72f098f..21d1115 100644 --- a/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java +++ b/test/src/test/java/org/apache/accumulo/test/ShellServerIT.java @@ -299,7 +299,7 @@ public class ShellServerIT extends SharedMiniClusterIT { ts.exec("config -t " + table2 + " -np", true, "345M", true); ts.exec("getsplits -t " + table2, true, "row5", true); ts.exec("constraint --list -t " + table2, true, "VisibilityConstraint=2", true); - ts.exec("onlinetable " + table, true); + ts.exec("online " + table, true); ts.exec("deletetable -f " + table, true); ts.exec("deletetable -f " + table2, true); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java 
b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java index 6c81369..d83f038 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ExamplesIT.java @@ -201,10 +201,10 @@ public class ExamplesIT extends AccumuloClusterIT { Entry<Integer,String> entry = getClusterControl().execWithStdout( Ingest.class, new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--dirTable", dirTable, "--indexTable", indexTable, "--dataTable", dataTable, - "--vis", visibility, "--chunkSize", Integer.toString(10000), getUsableDir()}); + "--vis", visibility, "--chunkSize", Integer.toString(10000), System.getProperty("user.dir") + "/src/test"}); assertEquals("Got non-zero return code. Stdout=" + entry.getValue(), 0, entry.getKey().intValue()); entry = getClusterControl().execWithStdout(QueryUtil.class, - new String[] {"-i", instance, "-z", keepers, "-p", passwd, "-u", user, "-t", indexTable, "--auths", auths, "--search", "--path", "accumulo-site.xml"}); + new String[] {"-i", instance, "-z", keepers, "-p", passwd, "-u", user, "-t", indexTable, "--auths", auths, "--search", "--path", "log4j.properties"}); if (ClusterType.MINI == getClusterType()) { MiniAccumuloClusterImpl impl = (MiniAccumuloClusterImpl) cluster; for (LogWriter writer : impl.getLogWriters()) { @@ -214,7 +214,7 @@ public class ExamplesIT extends AccumuloClusterIT { log.info("result " + entry.getValue()); assertEquals(0, entry.getKey().intValue()); - assertTrue(entry.getValue().contains("accumulo-site.xml")); + assertTrue(entry.getValue().contains("log4j.properties")); } @Test http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java 
b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java index bd0555b..4c86172 100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/WatchTheWatchCountIT.java @@ -51,8 +51,8 @@ public class WatchTheWatchCountIT extends ConfigurableMacIT { int n = socket.getInputStream().read(buffer); String response = new String(buffer, 0, n); long total = Long.parseLong(response.split(":")[1].trim()); - assertTrue("Total watches was not greater than 500, but was " + total, total > 500); - assertTrue("Total watches was not less than 600, but was " + total, total < 600); + assertTrue("Total watches was not greater than 600, but was " + total, total > 600); + assertTrue("Total watches was not less than 675, but was " + total, total < 675); } finally { socket.close(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java b/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java new file mode 100644 index 0000000..fcd1fd7 --- /dev/null +++ b/test/src/test/java/org/apache/accumulo/test/performance/RollWALPerformanceIT.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.performance; + +import static org.junit.Assert.assertTrue; + +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl; +import org.apache.accumulo.minicluster.impl.ProcessReference; +import org.apache.accumulo.test.continuous.ContinuousIngest; +import org.apache.accumulo.test.functional.ConfigurableMacIT; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class RollWALPerformanceIT extends ConfigurableMacIT { + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.TSERV_WAL_REPLICATION, "1"); + cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "10M"); + cfg.setProperty(Property.TABLE_MINC_LOGS_MAX, "100"); + cfg.setProperty(Property.GC_FILE_ARCHIVE, "false"); + cfg.setProperty(Property.GC_CYCLE_START, "1s"); + cfg.setProperty(Property.GC_CYCLE_DELAY, "1s"); + cfg.useMiniDFS(true); + } + + private long ingest() throws Exception { + final Connector c = getConnector(); + final String tableName = getUniqueNames(1)[0]; + + log.info("Creating the table"); + c.tableOperations().create(tableName); + + 
log.info("Splitting the table"); + final long SPLIT_COUNT = 100; + final long distance = Long.MAX_VALUE / SPLIT_COUNT; + final SortedSet<Text> splits = new TreeSet<Text>(); + for (int i = 1; i < SPLIT_COUNT; i++) { + splits.add(new Text(String.format("%016x", i * distance))); + } + c.tableOperations().addSplits(tableName, splits); + + log.info("Waiting for balance"); + c.instanceOperations().waitForBalance(); + + final Instance inst = c.getInstance(); + + log.info("Starting ingest"); + final long start = System.currentTimeMillis(); + final String args[] = { + "-i", inst.getInstanceName(), + "-z", inst.getZooKeepers(), + "-u", "root", + "-p", ROOT_PASSWORD, + "--batchThreads", "2", + "--table", tableName, + "--num", Long.toString(1000*1000), // 1M 100 byte entries + }; + + ContinuousIngest.main(args); + final long result = System.currentTimeMillis() - start; + log.debug(String.format("Finished in %,d ms", result)); + log.debug("Dropping table"); + c.tableOperations().delete(tableName); + return result; + } + + private long getAverage() throws Exception { + final int REPEAT = 3; + long totalTime = 0; + for (int i = 0; i < REPEAT; i++) { + totalTime += ingest(); + } + return totalTime / REPEAT; + } + + private void testWalPerformanceOnce() throws Exception { + // get time with a small WAL, which will cause many WAL roll-overs + long avg1 = getAverage(); + // use a bigger WAL max size to eliminate WAL roll-overs + Connector c = getConnector(); + c.instanceOperations().setProperty(Property.TSERV_WALOG_MAX_SIZE.getKey(), "1G"); + c.tableOperations().flush(MetadataTable.NAME, null, null, true); + c.tableOperations().flush(RootTable.NAME, null, null, true); + for (ProcessReference tserver : getCluster().getProcesses().get(ServerType.TABLET_SERVER)) { + getCluster().killProcess(ServerType.TABLET_SERVER, tserver); + } + getCluster().start(); + long avg2 = getAverage(); + log.info(String.format("Average run time with small WAL %,d with large WAL %,d", avg1, avg2)); + 
assertTrue(avg1 > avg2); + double percent = (100. * avg1) / avg2; + log.info(String.format("Percent of large log: %.2f%%", percent)); + assertTrue(percent < 125.); + } + + @Test(timeout= 20 * 60 * 1000) + public void testWalPerformance() throws Exception { + testWalPerformanceOnce(); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java index 1ef47e5..9af5445 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java @@ -38,6 +38,7 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.master.thrift.MasterClientService; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.protobuf.ProtobufUtil; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.replication.proto.Replication.Status; @@ -78,6 +79,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI @Override public void configure(MiniAccumuloConfigImpl cfg, Configuration coreSite) { cfg.setNumTservers(1); + cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s"); cfg.setProperty(Property.GC_CYCLE_DELAY, GC_PERIOD_SECONDS + "s"); // Wait longer to try to let the replication table come online before a cycle runs cfg.setProperty(Property.GC_CYCLE_START, "10s"); @@ -102,18 
+104,14 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI Assert.assertNotNull("Could not determine table ID for " + tableName, tableId); Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - Range r = MetadataSchema.TabletsSection.getRange(tableId); - s.setRange(r); - s.fetchColumnFamily(MetadataSchema.TabletsSection.LogColumnFamily.NAME); + s.setRange(CurrentLogsSection.getRange()); + s.fetchColumnFamily(CurrentLogsSection.COLF); Set<String> wals = new HashSet<String>(); for (Entry<Key,Value> entry : s) { log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue()); // hostname:port/uri://path/to/wal - String cq = entry.getKey().getColumnQualifier().toString(); - int index = cq.indexOf('/'); - // Normalize the path - String path = new Path(cq.substring(index + 1)).toString(); + String path = new Path(entry.getKey().getColumnQualifier().toString()).toString(); log.debug("Extracted file: " + path); wals.add(path); } @@ -228,11 +226,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); - log.info("Checking to see that log entries are removed from tablet section after MinC"); - // After compaction, the log column should be gone from the tablet - Set<String> walsAfterMinc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); - Set<String> filesForTable = getFilesForTable(table); Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); log.info("Files for table before MajC: {}", filesForTable); @@ -258,14 +251,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI fileExists = fs.exists(fileToBeDeleted); } - // At this point in time, we *know* that the GarbageCollector has run which means that the Status - // for our WAL should not be altered. 
- - log.info("Re-checking that WALs are still not referenced for our table"); - - Set<String> walsAfterMajc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); - Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); @@ -326,11 +311,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI Assert.assertEquals("Expected Status for file to not be closed", false, status.getClosed()); - log.info("Checking to see that log entries are removed from tablet section after MinC"); - // After compaction, the log column should be gone from the tablet - Set<String> walsAfterMinc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs for tablet", 0, walsAfterMinc.size()); - Set<String> filesForTable = getFilesForTable(table); Assert.assertEquals("Expected to only find one rfile for table", 1, filesForTable.size()); log.info("Files for table before MajC: {}", filesForTable); @@ -359,11 +339,6 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI // At this point in time, we *know* that the GarbageCollector has run which means that the Status // for our WAL should not be altered. 
- log.info("Re-checking that WALs are still not referenced for our table"); - - Set<String> walsAfterMajc = getWalsForTable(table); - Assert.assertEquals("Expected to find no WALs in tablets section: " + walsAfterMajc, 0, walsAfterMajc.size()); - Map<String,Status> fileToStatusAfterMinc = getMetadataStatusForTable(table); Assert.assertEquals("Expected to still find only one replication status message: " + fileToStatusAfterMinc, 1, fileToStatusAfterMinc.size()); http://git-wip-us.apache.org/repos/asf/accumulo/blob/b2539fb1/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java index 125286f..14812c4 100644 --- a/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java +++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiInstanceReplicationIT.java @@ -146,7 +146,7 @@ public class MultiInstanceReplicationIT extends ConfigurableMacIT { } } - @Test + @Test(timeout = 10 * 60 * 1000) public void dataWasReplicatedToThePeer() throws Exception { MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"), ROOT_PASSWORD);