ACCUMULO-3423 optimize WAL metadata table updates
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3fdd29f5 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3fdd29f5 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3fdd29f5 Branch: refs/heads/master Commit: 3fdd29f5222f9d1d32ca28b5ecf1d740a8d20f87 Parents: ea25e98 Author: Eric C. Newton <eric.new...@gmail.com> Authored: Fri Apr 24 18:15:05 2015 -0400 Committer: Eric C. Newton <eric.new...@gmail.com> Committed: Fri Apr 24 18:18:56 2015 -0400 ---------------------------------------------------------------------- .../client/impl/ReplicationOperationsImpl.java | 4 +- .../org/apache/accumulo/core/conf/Property.java | 4 +- .../accumulo/core/metadata/RootTable.java | 1 + .../core/metadata/schema/MetadataSchema.java | 48 ++ .../core/tabletserver/log/LogEntry.java | 78 ++- .../core/metadata/MetadataTableSchemaTest.java | 47 ++ .../org/apache/accumulo/server/TabletLevel.java | 34 ++ .../apache/accumulo/server/fs/VolumeUtil.java | 22 +- .../apache/accumulo/server/init/Initialize.java | 1 + .../server/master/state/MetaDataStateStore.java | 47 +- .../master/state/MetaDataTableScanner.java | 6 +- .../master/state/TabletLocationState.java | 7 + .../server/master/state/TabletStateStore.java | 16 +- .../master/state/ZooTabletStateStore.java | 35 +- .../accumulo/server/replication/StatusUtil.java | 13 + .../accumulo/server/util/ListVolumesUsed.java | 18 +- .../server/util/MasterMetadataUtil.java | 18 +- .../accumulo/server/util/MetadataTableUtil.java | 239 +++++--- .../server/util/ReplicationTableUtil.java | 13 +- .../server/util/ReplicationTableUtilTest.java | 2 +- .../gc/GarbageCollectWriteAheadLogs.java | 499 +++++++--------- .../accumulo/gc/SimpleGarbageCollector.java | 1 - .../CloseWriteAheadLogReferences.java | 23 +- .../gc/GarbageCollectWriteAheadLogsTest.java | 567 ------------------- .../CloseWriteAheadLogReferencesTest.java | 151 +---- .../java/org/apache/accumulo/master/Master.java | 3 + .../master/MasterClientServiceHandler.java | 3 +- .../accumulo/master/TabletGroupWatcher.java | 37 +- .../accumulo/master/replication/WorkMaker.java | 1 + .../accumulo/master/state/MergeStats.java | 3 +- .../master/ReplicationOperationsImplTest.java | 9 +- .../apache/accumulo/master/TestMergeState.java | 2 +- .../master/state/RootTabletStateStoreTest.java | 4 +- .../src/main/findbugs/exclude-filter.xml | 2 +- .../server/GarbageCollectionLogger.java | 3 +- .../apache/accumulo/tserver/TabletServer.java | 182 +++--- .../apache/accumulo/tserver/log/DfsLogger.java | 14 +- .../accumulo/tserver/log/SortedLogRecovery.java | 8 +- .../tserver/log/TabletServerLogger.java | 187 +++--- .../accumulo/tserver/tablet/CommitSession.java | 3 +- .../tserver/tablet/DatafileManager.java | 4 +- .../apache/accumulo/tserver/tablet/Tablet.java | 59 +- .../tserver/tablet/TabletCommitter.java | 3 +- .../accumulo/tserver/log/LogEntryTest.java | 56 ++ .../test/performance/thrift/NullTserver.java | 6 +- .../accumulo/proxy/ProxyDurabilityIT.java | 9 +- .../test/BadDeleteMarkersCreatedIT.java | 2 +- .../org/apache/accumulo/test/BalanceIT.java | 20 +- .../org/apache/accumulo/test/CleanWalIT.java | 1 + .../accumulo/test/ConditionalWriterIT.java | 1 + .../accumulo/test/GarbageCollectWALIT.java | 81 +++ .../MissingWalHeaderCompletesRecoveryIT.java | 14 +- .../accumulo/test/NoMutationRecoveryIT.java | 178 ------ .../org/apache/accumulo/test/ShellServerIT.java | 2 +- .../org/apache/accumulo/test/UnusedWALIT.java | 144 +++++ .../java/org/apache/accumulo/test/VolumeIT.java | 17 + .../accumulo/test/functional/ReadWriteIT.java | 8 + .../accumulo/test/functional/WALSunnyDayIT.java | 250 ++++++++ .../test/functional/WatchTheWatchCountIT.java | 2 +- .../test/performance/RollWALPerformanceIT.java | 126 +++++ ...bageCollectorCommunicatesWithTServersIT.java | 35 +- .../replication/MultiInstanceReplicationIT.java | 2 +- .../test/replication/ReplicationIT.java | 370 ++++-------- 63 files changed, 1857 insertions(+), 1888 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java index 6a5c74a..925877d 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java @@ -153,9 +153,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations { try { for (Entry<Key,Value> entry : metaBs) { LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue()); - for (String log : logEntry.logSet) { - wals.add(new Path(log).toString()); - } + wals.add(new Path(logEntry.filename).toString()); } } finally { metaBs.close(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/conf/Property.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 429abad..a5bef0a 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -345,8 +345,8 @@ public enum Property { + "no longer in use are removed from the filesystem."), GC_PORT("gc.port.client", "50091", PropertyType.PORT, "The listening port for the garbage collector's monitor service"), GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads used to delete files"), - GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured"), - GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting"), + GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured."), + GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting."), GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION, "Percent of gc cycles to trace"), // properties that are specific to the monitor server behavior http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java index 292ba3b..97d73d1 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/RootTable.java @@ -41,6 +41,7 @@ public class RootTable { public static final String ZROOT_TABLET_FUTURE_LOCATION = ZROOT_TABLET + "/future_location"; public static final String ZROOT_TABLET_LAST_LOCATION = ZROOT_TABLET + "/lastlocation"; public static final String ZROOT_TABLET_WALOGS = ZROOT_TABLET + "/walogs"; + public static final String ZROOT_TABLET_CURRENT_LOGS = ZROOT_TABLET + "/current_logs"; public static final String ZROOT_TABLET_PATH = ZROOT_TABLET + "/dir"; public static final KeyExtent EXTENT = new KeyExtent(new Text(ID), null, null); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java index 6baae17..c787d6d 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java @@ -16,11 +16,14 @@ */ package org.apache.accumulo.core.metadata.schema; +import static java.nio.charset.StandardCharsets.UTF_8; + import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.data.ArrayByteSequence; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.PartialKey; import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.schema.Section; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.hadoop.io.Text; @@ -278,4 +281,49 @@ public class MetadataSchema { buff.set(buff.getBytes(), section.getRowPrefix().length(), buff.getLength() - section.getRowPrefix().length()); } } + + /** + * Holds references to the WALs in use in a live Tablet Server. + * <p> + * <code>~wal+tserver:port[sessionId] log:hdfs://localhost:8020/accumulo/wal/tserver+port/WAL [] -></code> + */ + public static class CurrentLogsSection { + private static final Section section = new Section(RESERVED_PREFIX + "wal+", true, RESERVED_PREFIX + "wal,", false); + private static byte LEFT_BRACKET = (byte)'['; + public static final Text COLF = new Text("log"); + public static final Value UNUSED = new Value("unused".getBytes(UTF_8)); + + public static Range getRange() { + return section.getRange(); + } + + public static String getRowPrefix() { + return section.getRowPrefix(); + } + + public static void getTabletServer(Key k, Text hostPort, Text session) { + Preconditions.checkNotNull(k); + Preconditions.checkNotNull(hostPort); + Preconditions.checkNotNull(session); + + Text row = new Text(); + k.getRow(row); + if (!row.toString().startsWith(section.getRowPrefix())) { + throw new IllegalArgumentException("Bad key " + k.toString()); + } + for (int sessionStart = section.getRowPrefix().length(); sessionStart < row.getLength() - 1; sessionStart++) { + if (row.charAt(sessionStart) == LEFT_BRACKET) { + hostPort.set(row.getBytes(), section.getRowPrefix().length(), sessionStart - section.getRowPrefix().length()); + session.set(row.getBytes(), sessionStart + 1, row.getLength() - sessionStart - 2); + return; + } + } + throw new IllegalArgumentException("Bad key " + k.toString()); + } + + public static void getPath(Key k, Text path) { + k.getColumnQualifier(path); + } + + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java index 7fe61d1..ab70bb0 100644 --- a/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java +++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/log/LogEntry.java @@ -16,10 +16,10 @@ */ package org.apache.accumulo.core.tabletserver.log; +import static java.nio.charset.StandardCharsets.UTF_8; + import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -29,30 +29,29 @@ import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; -import com.google.common.base.Joiner; - public class LogEntry { - public KeyExtent extent; - public long timestamp; - public String server; - public String filename; - public int tabletId; - public Collection<String> logSet; - - public LogEntry() {} + public final KeyExtent extent; + public final long timestamp; + public final String server; + public final String filename; public LogEntry(LogEntry le) { this.extent = le.extent; this.timestamp = le.timestamp; this.server = le.server; this.filename = le.filename; - this.tabletId = le.tabletId; - this.logSet = new ArrayList<String>(le.logSet); + } + + public LogEntry(KeyExtent extent, long timestamp, String server, String filename) { + this.extent = extent; + this.timestamp = timestamp; + this.server = server; + this.filename = filename; } @Override public String toString() { - return extent.toString() + " " + filename + " (" + tabletId + ")"; + return extent.toString() + " " + filename; } public String getName() { @@ -65,43 +64,35 @@ public class LogEntry { out.writeLong(timestamp); out.writeUTF(server); out.writeUTF(filename); - out.write(tabletId); - out.write(logSet.size()); - for (String s : logSet) { - out.writeUTF(s); - } return Arrays.copyOf(out.getData(), out.getLength()); } - public void fromBytes(byte bytes[]) throws IOException { + static public LogEntry fromBytes(byte bytes[]) throws IOException { DataInputBuffer inp = new DataInputBuffer(); inp.reset(bytes, bytes.length); - extent = new KeyExtent(); + KeyExtent extent = new KeyExtent(); extent.readFields(inp); - timestamp = inp.readLong(); - server = inp.readUTF(); - filename = inp.readUTF(); - tabletId = inp.read(); - int count = inp.read(); - ArrayList<String> logSet = new ArrayList<String>(count); - for (int i = 0; i < count; i++) - logSet.add(inp.readUTF()); - this.logSet = logSet; + long timestamp = inp.readLong(); + String server = inp.readUTF(); + String filename = inp.readUTF(); + return new LogEntry(extent, timestamp, server, filename); } static private final Text EMPTY_TEXT = new Text(); public static LogEntry fromKeyValue(Key key, Value value) { - LogEntry result = new LogEntry(); - result.extent = new KeyExtent(key.getRow(), EMPTY_TEXT); + String qualifier = key.getColumnQualifier().toString(); + if (qualifier.indexOf('/') < 1) { + throw new IllegalArgumentException("Bad key for log entry: " + key); + } + KeyExtent extent = new KeyExtent(key.getRow(), EMPTY_TEXT); String[] parts = key.getColumnQualifier().toString().split("/", 2); - result.server = parts[0]; - result.filename = parts[1]; - parts = value.toString().split("\\|"); - result.tabletId = Integer.parseInt(parts[1]); - result.logSet = Arrays.asList(parts[0].split(";")); - result.timestamp = key.getTimestamp(); - return result; + String server = parts[0]; + // handle old-style log entries that specify log sets + parts = value.toString().split("\\|")[0].split(";"); + String filename = parts[parts.length - 1]; + long timestamp = key.getTimestamp(); + return new LogEntry(extent, timestamp, server, filename); } public Text getRow() { @@ -112,11 +103,16 @@ public class LogEntry { return MetadataSchema.TabletsSection.LogColumnFamily.NAME; } + public String getUniqueID() { + String parts[] = filename.split("/"); + return parts[parts.length - 1]; + } + public Text getColumnQualifier() { return new Text(server + "/" + filename); } public Value getValue() { - return new Value((Joiner.on(";").join(logSet) + "|" + tabletId).getBytes()); + return new Value(filename.getBytes(UTF_8)); } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java new file mode 100644 index 0000000..cfe59f2 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.accumulo.core.metadata; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; +import org.apache.hadoop.io.Text; +import org.junit.Test; + +public class MetadataTableSchemaTest { + + @Test + public void testGetTabletServer() throws Exception { + Key key = new Key("~wal+host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3"); + Text hostPort = new Text(); + Text session = new Text(); + CurrentLogsSection.getTabletServer(key, hostPort, session); + assertEquals("host:43861", hostPort.toString()); + assertEquals("14a7df0e6420003", session.toString()); + try { + Key bogus = new Key("~wal/host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3"); + CurrentLogsSection.getTabletServer(bogus, hostPort, session); + fail("bad argument not thrown"); + } catch (IllegalArgumentException ex) { + + } + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java new file mode 100644 index 0000000..91e5ee9 --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/TabletLevel.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.server; + +import org.apache.accumulo.core.data.KeyExtent; + +public enum TabletLevel { + ROOT, + META, + NORMAL; + + public static TabletLevel getLevel(KeyExtent extent) { + if (!extent.isMeta()) + return NORMAL; + if (extent.isRootTablet()) + return ROOT; + return META; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java index c3595cd..4722e60 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java @@ -128,15 +128,12 @@ public class VolumeUtil { switchedPath = le.filename; ArrayList<String> switchedLogs = new ArrayList<String>(); - for (String log : le.logSet) { - String switchedLog = switchVolume(le.filename, FileType.WAL, replacements); - if (switchedLog != null) { - switchedLogs.add(switchedLog); - numSwitched++; - } else { - switchedLogs.add(log); - } - + String switchedLog = switchVolume(le.filename, FileType.WAL, replacements); + if (switchedLog != null) { + switchedLogs.add(switchedLog); + numSwitched++; + } else { + switchedLogs.add(le.filename); } if (numSwitched == 0) { @@ -144,9 +141,7 @@ public class VolumeUtil { return null; } - LogEntry newLogEntry = new LogEntry(le); - newLogEntry.filename = switchedPath; - newLogEntry.logSet = switchedLogs; + LogEntry newLogEntry = new LogEntry(le.extent, le.timestamp, le.server, switchedPath); log.trace("Switched " + le + " to " + newLogEntry); @@ -244,7 +239,7 @@ public class VolumeUtil { log.debug("Tablet directory switched, need to record old log files " + logsToRemove + " " + ProtobufUtil.toString(status)); // Before deleting these logs, we need to mark them for replication for (LogEntry logEntry : logsToRemove) { - ReplicationTableUtil.updateFiles(context, extent, logEntry.logSet, status); + ReplicationTableUtil.updateFiles(context, extent, logEntry.filename, status); } } } @@ -253,7 +248,6 @@ public class VolumeUtil { // method this should return the exact strings that are in the metadata table return ret; - } private static String decommisionedTabletDir(AccumuloServerContext context, ZooLock zooLock, VolumeManager vm, KeyExtent extent, String metaDir) http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index c6f1dd8..9afb93f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -533,6 +533,7 @@ public class Initialize implements KeywordExecutable { zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_WALOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_CURRENT_LOGS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_PATH, rootTabletDir.getBytes(UTF_8), NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java index 7ee6f0c..c154bd0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java @@ -17,6 +17,9 @@ package org.apache.accumulo.server.master.state; import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.BatchWriter; @@ -27,9 +30,14 @@ import org.apache.accumulo.core.client.impl.ClientContext; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; public class MetaDataStateStore extends TabletStateStore { + private static final Logger log = Logger.getLogger(MetaDataStateStore.class); private static final int THREADS = 4; private static final int LATENCY = 1000; @@ -59,7 +67,7 @@ public class MetaDataStateStore extends TabletStateStore { @Override public ClosableIterator<TabletLocationState> iterator() { - return new MetaDataTableScanner(context, MetadataSchema.TabletsSection.getRange(), state); + return new MetaDataTableScanner(context, MetadataSchema.TabletsSection.getRange(), state, targetTableName); } @Override @@ -116,7 +124,7 @@ public class MetaDataStateStore extends TabletStateStore { } @Override - public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException { + public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException { BatchWriter writer = createBatchWriter(); try { @@ -124,6 +132,15 @@ public class MetaDataStateStore extends TabletStateStore { Mutation m = new Mutation(tls.extent.getMetadataEntry()); if (tls.current != null) { tls.current.clearLocation(m); + if (logsForDeadServers != null) { + List<Path> logs = logsForDeadServers.get(tls.current); + if (logs != null) { + for (Path log : logs) { + LogEntry entry = new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString()); + m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue()); + } + } + } } if (tls.future != null) { tls.future.clearFutureLocation(m); @@ -145,4 +162,30 @@ public class MetaDataStateStore extends TabletStateStore { public String name() { return "Normal Tablets"; } + + @Override + public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) throws DistributedStoreException { + BatchWriter writer = createBatchWriter(); + try { + for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) { + if (entry.getValue().isEmpty()) { + continue; + } + Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + entry.getKey().toString()); + for (Path log : entry.getValue()) { + m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED); + } + writer.addMutation(m); + } + } catch (Exception ex) { + log.error("Error marking logs as unused: " + logs); + throw new DistributedStoreException(ex); + } finally { + try { + writer.close(); + } catch (MutationsRejectedException e) { + throw new DistributedStoreException(e); + } + } + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java index d64c108..bec2dc4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java @@ -141,6 +141,7 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat boolean chopped = false; for (Entry<Key,Value> entry : decodedRow.entrySet()) { + Key key = entry.getKey(); Text row = key.getRow(); Text cf = key.getColumnFamily(); @@ -173,8 +174,9 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat } } if (extent == null) { - log.warn("No prev-row for key extent: " + decodedRow); - return null; + String msg = "No prev-row for key extent " + decodedRow; + log.error(msg); + throw new BadLocationStateException(msg, k.getRow()); } return new TabletLocationState(extent, future, current, last, walogs, chopped); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java index fb30440..8116ecf 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletLocationState.java @@ -68,6 +68,13 @@ public class TabletLocationState { final public Collection<Collection<String>> walogs; final public boolean chopped; + public TServerInstance futureOrCurrent() { + if (current != null) { + return current; + } + return future; + } + @Override public String toString() { return extent + "@(" + future + "," + current + "," + last + ")" + (chopped ? " chopped" : ""); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java index 5413e31..acc10d8 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java @@ -18,8 +18,11 @@ package org.apache.accumulo.server.master.state; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Map; import org.apache.accumulo.server.AccumuloServerContext; +import org.apache.hadoop.fs.Path; /** * Interface for storing information about tablet assignments. There are three implementations: @@ -56,10 +59,12 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState> * * @param tablets * the tablets' current information + * @param logsForDeadServers + * a cache of logs in use by servers when they died */ - abstract public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException; + abstract public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException; - public static void unassign(AccumuloServerContext context, TabletLocationState tls) throws DistributedStoreException { + public static void unassign(AccumuloServerContext context, TabletLocationState tls, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException { TabletStateStore store; if (tls.extent.isRootTablet()) { store = new ZooTabletStateStore(); @@ -68,7 +73,7 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState> } else { store = new MetaDataStateStore(context); } - store.unassign(Collections.singletonList(tls)); + store.unassign(Collections.singletonList(tls), logsForDeadServers); } public static void setLocation(AccumuloServerContext context, Assignment assignment) throws DistributedStoreException { @@ -83,4 +88,9 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState> store.setLocations(Collections.singletonList(assignment)); } + /** + * When a server fails, its logs must be marked as unused after the log markers are moved to the tablets. + */ + abstract public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance, List<Path>> logs) throws DistributedStoreException; + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java index ab99396..bce20fd 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java @@ -21,12 +21,17 @@ import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.tabletserver.log.LogEntry; +import org.apache.accumulo.server.AccumuloServerContext; import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,10 +90,9 @@ public class ZooTabletStateStore extends TabletStateStore { for (String entry : store.getChildren(RootTable.ZROOT_TABLET_WALOGS)) { byte[] logInfo = store.get(RootTable.ZROOT_TABLET_WALOGS + "/" + entry); if (logInfo != null) { - LogEntry logEntry = new LogEntry(); - logEntry.fromBytes(logInfo); - logs.add(logEntry.logSet); - log.debug("root tablet logSet " + logEntry.logSet); + LogEntry logEntry = LogEntry.fromBytes(logInfo); + logs.add(Collections.singleton(logEntry.filename)); + log.debug("root tablet log " + logEntry.filename); } } TabletLocationState result = new TabletLocationState(RootTable.EXTENT, futureSession, currentSession, lastSession, logs, false); @@ -161,12 +165,28 @@ public class ZooTabletStateStore extends TabletStateStore { } @Override - public void unassign(Collection<TabletLocationState> tablets) throws DistributedStoreException { + public void unassign(Collection<TabletLocationState> tablets, Map<TServerInstance, List<Path>> logsForDeadServers) throws DistributedStoreException { if (tablets.size() != 1) throw new IllegalArgumentException("There is only one root tablet"); TabletLocationState tls = tablets.iterator().next(); if (tls.extent.compareTo(RootTable.EXTENT) != 0) throw new IllegalArgumentException("You can only store the root tablet location"); + if (logsForDeadServers != null) { + List<Path> logs = logsForDeadServers.get(tls.futureOrCurrent()); + if (logs != null) { + for (Path entry : logs) { + LogEntry logEntry = new LogEntry(RootTable.EXTENT, System.currentTimeMillis(), tls.futureOrCurrent().getLocation().toString(), entry.toString()); + byte[] value; + try { + value = logEntry.toBytes(); + } catch (IOException ex) { + throw new DistributedStoreException(ex); + } + store.put(RootTable.ZROOT_TABLET_WALOGS + "/" + logEntry.getUniqueID(), value); + store.remove(RootTable.ZROOT_TABLET_CURRENT_LOGS + "/" + MetadataSchema.CurrentLogsSection.getRowPrefix() + tls.current.toString() + logEntry.getUniqueID()); + } + } + } store.remove(RootTable.ZROOT_TABLET_LOCATION); store.remove(RootTable.ZROOT_TABLET_FUTURE_LOCATION); log.debug("unassign root tablet location"); @@ -177,4 +197,9 @@ public class ZooTabletStateStore extends TabletStateStore { return "Root Table"; } + @Override + public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) { + // the root table is not replicated, so unassigning the root tablet has removed the current log marker + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java index 898e3d4..d72eea2 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusUtil.java @@ -153,6 +153,19 @@ public class StatusUtil { /** * @return A {@link Status} for an open file of unspecified length, all of which needs replicating. */ + public static Status openWithUnknownLength(long timeCreated) { + Builder builder = Status.newBuilder(); + builder.setBegin(0); + builder.setEnd(0); + builder.setInfiniteEnd(true); + builder.setClosed(false); + builder.setCreatedTime(timeCreated); + return builder.build(); + } + + /** + * @return A {@link Status} for an open file of unspecified length, all of which needs replicating. + */ public static Status openWithUnknownLength() { return INF_END_REPLICATION_STATUS; } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java index e90d1dd..9e3fc7d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java @@ -35,6 +35,7 @@ import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.fs.VolumeManager.FileType; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; /** * @@ -61,9 +62,6 @@ public class ListVolumesUsed { private static void getLogURIs(TreeSet<String> volumes, LogEntry logEntry) { volumes.add(getLogURI(logEntry.filename)); - for (String logSet : logEntry.logSet) { - volumes.add(getLogURI(logSet)); - } } private static void listZookeeper() throws Exception { @@ -123,6 +121,20 @@ public class ListVolumesUsed { for (String volume : volumes) System.out.println("\tVolume : " + volume); + + volumes.clear(); + scanner.clearColumns(); + scanner.setRange(MetadataSchema.CurrentLogsSection.getRange()); + Text path = new Text(); + for (Entry<Key,Value> entry : scanner) { + MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path); + volumes.add(getLogURI(path.toString())); + } + + System.out.println("Listing volumes referenced in " + name + " current logs section"); + + for (String volume : volumes) + System.out.println("\tVolume : " + volume); } public static void listVolumes(ClientContext context) throws Exception { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java index 14eba68..4a5650e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java @@ -248,35 +248,27 @@ public class MasterMetadataUtil { if (unusedWalLogs != null) { updateRootTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation, flushId); } - return; } - Mutation m = getUpdateForTabletDataFile(extent, path, mergeFile, dfv, time, filesInUseByScans, address, zooLock, unusedWalLogs, lastLocation, flushId); - MetadataTableUtil.update(context, zooLock, m, extent); - } /** * Update the data file for the root tablet */ - protected static void updateRootTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time, + private static void updateRootTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time, Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) { IZooReaderWriter zk = ZooReaderWriter.getInstance(); - // unusedWalLogs will contain the location/name of each log in a log set - // the log set is stored under one of the log names, but not both - // find the entry under one of the names and delete it. String root = MetadataTableUtil.getZookeeperLogLocation(); - boolean foundEntry = false; for (String entry : unusedWalLogs) { String[] parts = entry.split("/"); String zpath = root + "/" + parts[parts.length - 1]; while (true) { try { if (zk.exists(zpath)) { + log.debug("Removing WAL reference for root table " + zpath); zk.recursiveDelete(zpath, NodeMissingPolicy.SKIP); - foundEntry = true; } break; } catch (KeeperException e) { @@ -287,16 +279,15 @@ public class MasterMetadataUtil { UtilWaitThread.sleep(1000); } } - if (unusedWalLogs.size() > 0 && !foundEntry) - log.warn("WALog entry for root tablet did not exist " + unusedWalLogs); } + /** * Create an update that updates a tablet * * @return A Mutation to update a tablet from the given information */ - protected static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time, + private static Mutation getUpdateForTabletDataFile(KeyExtent extent, FileRef path, FileRef mergeFile, DataFileValue dfv, String time, Set<FileRef> filesInUseByScans, String address, ZooLock zooLock, Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) { Mutation m = new Mutation(extent.getMetadataEntry()); @@ -324,6 +315,7 @@ public class MasterMetadataUtil { TabletsSection.ServerColumnFamily.FLUSH_COLUMN.put(m, new Value(Long.toString(flushId).getBytes(UTF_8))); + return m; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index 5e74aac..4470c55 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -23,8 +23,6 @@ import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -61,6 +59,7 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily; @@ -82,10 +81,12 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.server.AccumuloServerContext; import org.apache.accumulo.server.ServerConstants; +import org.apache.accumulo.server.TabletLevel; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.fs.FileRef; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; +import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.tablets.TabletTime; import org.apache.accumulo.server.zookeeper.ZooLock; import org.apache.accumulo.server.zookeeper.ZooReaderWriter; @@ -121,7 +122,7 @@ public class MetadataTableUtil { return metadataTable; } - private synchronized static Writer getRootTable(ClientContext context) { + public synchronized static Writer getRootTable(ClientContext context) { Credentials credentials = context.getCredentials(); Writer rootTable = root_tables.get(credentials); if (rootTable == null) { @@ -223,7 +224,7 @@ public class MetadataTableUtil { // add before removing in case of process death for (LogEntry logEntry : logsToAdd) - addLogEntry(context, logEntry, zooLock); + addRootLogEntry(context, zooLock, logEntry); removeUnusedWALEntries(context, extent, logsToRemove, zooLock); } else { @@ -248,6 +249,35 @@ public class MetadataTableUtil { } } + private static interface ZooOperation { + void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException; + } + + private static void retryZooKeeperUpdate(ClientContext context, ZooLock zooLock, ZooOperation op) { + while (true) { + try { + IZooReaderWriter zoo = ZooReaderWriter.getInstance(); + if (zoo.isLockHeld(zooLock.getLockID())) { + op.run(zoo); + } + break; + } catch (Exception e) { + log.error("Unexpected exception {}", e.getMessage(), e); + } + UtilWaitThread.sleep(1000); + } + } + + private static void addRootLogEntry(AccumuloServerContext context, ZooLock zooLock, final LogEntry entry) { + retryZooKeeperUpdate(context, zooLock, new ZooOperation() { + @Override + public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { + String root = getZookeeperLogLocation(); + rw.putPersistentData(root + "/" + entry.getUniqueID(), entry.toBytes(), NodeExistsPolicy.OVERWRITE); + } + }); + } + public static SortedMap<FileRef,DataFileValue> getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException { TreeMap<FileRef,DataFileValue> sizes = new TreeMap<FileRef,DataFileValue>(); @@ -447,34 +477,6 @@ public class MetadataTableUtil { return ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_WALOGS; } - public static void addLogEntry(ClientContext context, LogEntry entry, ZooLock zooLock) { - if (entry.extent.isRootTablet()) { - String root = getZookeeperLogLocation(); - while (true) { - try { - IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - if (zoo.isLockHeld(zooLock.getLockID())) { - String[] parts = entry.filename.split("/"); - String uniqueId = parts[parts.length - 1]; - zoo.putPersistentData(root + "/" + uniqueId, entry.toBytes(), NodeExistsPolicy.OVERWRITE); - } - break; - } catch (KeeperException e) { - log.error("{}", e.getMessage(), e); - } catch (InterruptedException e) { - log.error("{}", e.getMessage(), e); - } catch (IOException e) { - log.error("{}", e.getMessage(), e); - } - UtilWaitThread.sleep(1000); - } - } else { - Mutation m = new Mutation(entry.getRow()); - m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue()); - update(context, zooLock, m, entry.extent); - } - } - public static void setRootTabletDir(String dir) throws IOException { IZooReaderWriter zoo = ZooReaderWriter.getInstance(); String zpath = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_PATH; @@ -565,22 +567,11 @@ public class MetadataTableUtil { } } - Collections.sort(result, new Comparator<LogEntry>() { - @Override - public int compare(LogEntry o1, LogEntry o2) { - long diff = o1.timestamp - o2.timestamp; - if (diff < 0) - return -1; - if (diff > 0) - return 1; - return 0; - } - }); log.info("Returning logs " + result + " for extent " + extent); return result; } - static void getRootLogEntries(ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException { + static void getRootLogEntries(final ArrayList<LogEntry> result) throws KeeperException, InterruptedException, IOException { IZooReaderWriter zoo = ZooReaderWriter.getInstance(); String root = getZookeeperLogLocation(); // there's a little race between getting the children and fetching @@ -588,11 +579,10 @@ public class MetadataTableUtil { while (true) { result.clear(); for (String child : zoo.getChildren(root)) { - LogEntry e = new LogEntry(); try { - e.fromBytes(zoo.getData(root + "/" + child, null)); + LogEntry e = LogEntry.fromBytes(zoo.getData(root + "/" + child, null)); // upgrade from !0;!0<< -> +r<< - e.extent = RootTable.EXTENT; + e = new LogEntry(RootTable.EXTENT, 0, e.server, e.filename); result.add(e); } catch (KeeperException.NoNodeException ex) { continue; @@ -662,28 +652,23 @@ public class MetadataTableUtil { return new LogEntryIterator(context); } - public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, List<LogEntry> logEntries, ZooLock zooLock) { + public static void removeUnusedWALEntries(AccumuloServerContext context, KeyExtent extent, final List<LogEntry> entries, ZooLock zooLock) { if (extent.isRootTablet()) { - for (LogEntry entry : logEntries) { - String root = getZookeeperLogLocation(); - while (true) { - try { - IZooReaderWriter zoo = ZooReaderWriter.getInstance(); - if (zoo.isLockHeld(zooLock.getLockID())) { - String parts[] = entry.filename.split("/"); - zoo.recursiveDelete(root + "/" + parts[parts.length - 1], NodeMissingPolicy.SKIP); - } - break; - } catch (Exception e) { - log.error("{}", e.getMessage(), e); + retryZooKeeperUpdate(context, zooLock, new ZooOperation() { + @Override + public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { + String root = getZookeeperLogLocation(); + for (LogEntry entry : entries) { + String path = root + "/" + entry.getUniqueID(); + log.debug("Removing " + path + " from zookeeper"); + rw.recursiveDelete(path, NodeMissingPolicy.SKIP); } - UtilWaitThread.sleep(1000); } - } + }); } else { Mutation m = new Mutation(extent.getMetadataEntry()); - for (LogEntry entry : logEntries) { - m.putDelete(LogColumnFamily.NAME, new Text(entry.getName())); + for (LogEntry entry : entries) { + m.putDelete(entry.getColumnFamily(), entry.getColumnQualifier()); } update(context, zooLock, m, extent); } @@ -1068,4 +1053,130 @@ public class MetadataTableUtil { return tabletEntries; } + public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename, TabletLevel level) { + log.debug("Adding log entry " + filename); + if (level == TabletLevel.ROOT) { + retryZooKeeperUpdate(context, zooLock, new ZooOperation() { + @Override + public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { + String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS; + String uniqueId = filename.getName(); + StringBuilder path = new StringBuilder(root); + path.append("/"); + path.append(CurrentLogsSection.getRowPrefix()); + path.append(tabletSession.toString()); + path.append(uniqueId); + rw.putPersistentData(path.toString(), filename.toString().getBytes(UTF_8), NodeExistsPolicy.OVERWRITE); + } + }); + } else { + Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString()); + m.put(CurrentLogsSection.COLF, new Text(filename.toString()), new Value(EMPTY_BYTES)); + String tableName = MetadataTable.NAME; + if (level == TabletLevel.META) { + tableName = RootTable.NAME; + } + BatchWriter bw = null; + try { + bw = context.getConnector().createBatchWriter(tableName, null); + bw.addMutation(m); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + if (bw != null) { + try { + bw.close(); + } catch (Exception e2) { + throw new RuntimeException(e2); + } + } + } + } + } + + private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename) { + retryZooKeeperUpdate(context, zooLock, new ZooOperation() { + @Override + public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { + String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS; + String uniqueId = filename.getName(); + String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId; + log.debug("Removing entry " + path + " from zookeeper"); + rw.recursiveDelete(path, NodeMissingPolicy.SKIP); + } + }); + } + + public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException { + // There could be a marker at the meta and/or root level, mark them both as unused + try { + BatchWriter root = null; + BatchWriter meta = null; + try { + root = context.getConnector().createBatchWriter(RootTable.NAME, null); + meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null); + for (Path fname : all) { + Text tname = new Text(fname.toString()); + Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString()); + m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname); + root.addMutation(m); + log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname); + m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString()); + m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED); + meta.addMutation(m); + removeCurrentRootLogMarker(context, lock, tabletSession, fname); + } + } finally { + if (root != null) { + root.close(); + } + if (meta != null) { + meta.close(); + } + } + } catch (Exception ex) { + throw new AccumuloException(ex); + } + } + + public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server, Map<TServerInstance,List<Path>> logsForDeadServers) + throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + // already cached + if (logsForDeadServers.containsKey(server)) { + return; + } + if (extent.isRootTablet()) { + final List<Path> logs = new ArrayList<>(); + retryZooKeeperUpdate(context, lock, new ZooOperation() { + @Override + public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException { + String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS; + logs.clear(); + for (String child : rw.getChildren(root)) { + logs.add(new Path(new String(rw.getData(root + "/" + child, null), UTF_8))); + } + } + }); + logsForDeadServers.put(server, logs); + } else { + // use the correct meta table + String table = MetadataTable.NAME; + if (extent.isMeta()) { + table = RootTable.NAME; + } + // fetch the current logs in use, and put them in the cache + Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY); + scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString())); + List<Path> logs = new ArrayList<>(); + Text path = new Text(); + for (Entry<Key,Value> entry : scanner) { + MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path); + if (!entry.getValue().equals(MetadataSchema.CurrentLogsSection.UNUSED)) { + logs.add(new Path(path.toString())); + } + } + logsForDeadServers.put(server, logs); + } + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java index 8e755a3..c6d5ce4 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/ReplicationTableUtil.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.server.util; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -176,20 +175,14 @@ public class ReplicationTableUtil { /** * Write replication ingest entries for each provided file with the given {@link Status}. */ - public static void updateFiles(ClientContext context, KeyExtent extent, Collection<String> files, Status stat) { + public static void updateFiles(ClientContext context, KeyExtent extent, String file, Status stat) { if (log.isDebugEnabled()) { - log.debug("Updating replication status for " + extent + " with " + files + " using " + ProtobufUtil.toString(stat)); + log.debug("Updating replication status for " + extent + " with " + file + " using " + ProtobufUtil.toString(stat)); } // TODO could use batch writer, would need to handle failure and retry like update does - ACCUMULO-1294 - if (files.isEmpty()) { - return; - } Value v = ProtobufUtil.toValue(stat); - for (String file : files) { - // TODO Can preclude this addition if the extent is for a table we don't need to replicate - update(context, createUpdateMutation(new Path(file), v, extent), extent); - } + update(context, createUpdateMutation(new Path(file), v, extent), extent); } static Mutation createUpdateMutation(Path file, Value v, KeyExtent extent) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3fdd29f5/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java index 3983bde..04a83d3 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java @@ -91,7 +91,7 @@ public class ReplicationTableUtilTest { String myFile = "file:////home/user/accumulo/wal/server+port/" + uuid; long createdTime = System.currentTimeMillis(); - ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), Collections.singleton(myFile), StatusUtil.fileCreated(createdTime)); + ReplicationTableUtil.updateFiles(context, new KeyExtent(new Text("1"), null, null), myFile, StatusUtil.fileCreated(createdTime)); verify(writer);