ACCUMULO-3147 Create replication table at initialize, upgrade

Migrate all replication code to a built-in, system-managed replication table instead of the user-space one. Ensure all replication-related tests pass. Also fix a small bug in Mock which threw an exception when using deleteRows with null start and end rows.
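The client-facing effect of this change: the replication table is now a system table (ID "+rep", name "accumulo.replication") that always exists but may be offline, so the accessors in ReplicationTable throw ReplicationTableOfflineException rather than TableNotFoundException. A minimal sketch of caller code against the new API follows; it is illustrative only, not part of this commit, and the printRecords method name is assumed:

    import java.util.Map.Entry;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.accumulo.core.replication.ReplicationTable;
    import org.apache.accumulo.core.replication.ReplicationTableOfflineException;

    public class ReplicationTableUsage {
      // Hypothetical caller: dump all replication records. The table is created
      // OFFLINE at init, so callers check and flip its state instead of polling
      // for the table's existence as they did before this change.
      static void printRecords(Connector conn) throws Exception {
        if (!ReplicationTable.isOnline(conn)) {
          ReplicationTable.setOnline(conn); // online(NAME, true) waits for the table
        }
        try {
          Scanner s = ReplicationTable.getScanner(conn);
          for (Entry<Key,Value> e : s) {
            System.out.println(e.getKey() + " -> " + e.getValue());
          }
        } catch (ReplicationTableOfflineException e) {
          // TableNotFoundException can no longer happen; offline is the only
          // failure mode the accessors surface.
          System.err.println(ReplicationTable.NAME + " is offline: " + e.getMessage());
        }
      }
    }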
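The Mock fix mentioned above lands in MockTableOperations.deleteRows: when both row bounds are null it now clears the table outright instead of dereferencing lastKey() on a possibly empty backing map. A hedged usage sketch (the instance and table names are arbitrary examples):

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.mock.MockInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;

    public class MockDeleteRowsExample {
      public static void main(String[] args) throws Exception {
        Connector conn = new MockInstance("test").getConnector("root", new PasswordToken(""));
        conn.tableOperations().create("t");
        // Previously this threw when start and end were both null (the mock
        // looked up lastKey() on an empty map); it now simply empties the table.
        conn.tableOperations().deleteRows("t", null, null);
      }
    }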
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e840549a
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e840549a
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e840549a

Branch: refs/heads/master
Commit: e840549a9c6376c8c5dbe9663e14b4987c10dc5e
Parents: f1a47b3
Author: Christopher Tubbs <ctubb...@apache.org>
Authored: Sat Nov 1 18:53:14 2014 -0400
Committer: Christopher Tubbs <ctubb...@apache.org>
Committed: Thu Nov 6 18:39:55 2014 -0500

----------------------------------------------------------------------
 .../client/impl/ReplicationOperationsImpl.java  |   5 +-
 .../accumulo/core/client/mock/MockAccumulo.java |   2 +
 .../core/client/mock/MockTableOperations.java   |   8 +-
 .../replication/PrintReplicationRecords.java    |   2 +-
 .../core/replication/ReplicationTable.java      |  51 ++++-
 .../ReplicationTableOfflineException.java       |  32 +++
 .../ReplicationOperationsImplTest.java          |  22 +--
 .../apache/accumulo/server/ServerConstants.java |   8 +-
 .../server/constraints/MetadataConstraints.java | 119 ++++++-----
 .../apache/accumulo/server/init/Initialize.java | 174 ++++++++++------
 .../server/replication/ReplicationUtil.java     | 149 +-------------
 .../server/security/SecurityOperation.java      |   3 +-
 .../accumulo/server/util/MetadataTableUtil.java |  24 ++-
 .../replication/ReplicationTableTest.java       | 133 -------------
 .../gc/GarbageCollectWriteAheadLogs.java        |  40 ++--
 .../accumulo/gc/SimpleGarbageCollector.java     |   3 +-
 .../CloseWriteAheadLogReferences.java           |  14 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    |   9 +-
 .../CloseWriteAheadLogReferencesTest.java       |  14 +-
 .../accumulo/master/FateServiceHandler.java     |   3 +-
 .../java/org/apache/accumulo/master/Master.java |  21 +-
 .../master/metrics/ReplicationMetrics.java      |   5 +-
 .../DistributedWorkQueueWorkAssigner.java       |   8 +-
 .../master/replication/FinishedWorkUpdater.java |  10 +-
 .../RemoveCompleteReplicationRecords.java       |   8 +-
 .../master/replication/StatusMaker.java         |  16 +-
 .../accumulo/master/replication/WorkMaker.java  |  10 +-
 .../accumulo/master/util/TableValidators.java   |  26 ++-
 .../replication/FinishedWorkUpdaterTest.java    |   7 -
 .../RemoveCompleteReplicationRecordsTest.java   |   8 +-
 .../replication/SequentialWorkAssignerTest.java |  15 +-
 .../replication/UnorderedWorkAssignerTest.java  |  10 +-
 .../master/replication/WorkMakerTest.java       |  15 +-
 .../monitor/servlets/ReplicationServlet.java    |   5 -
 .../replication/ReplicationProcessor.java       |   6 +-
 .../apache/accumulo/test/LargeSplitRowIT.java   |   5 +-
 .../test/MasterRepairsDualAssignmentIT.java     |   8 +-
 .../accumulo/test/MetaConstraintRetryIT.java    |   7 +-
 .../org/apache/accumulo/test/NamespacesIT.java  |   6 +-
 .../org/apache/accumulo/test/ShellServerIT.java |  17 +-
 .../apache/accumulo/test/WaitForBalanceIT.java  |  14 +-
 .../replication/MultiInstanceReplicationIT.java |  19 +-
 .../test/replication/ReplicationIT.java         | 198 +++++++++++++------
 .../test/replication/StatusCombinerMacTest.java |  19 +-
 .../UnorderedWorkAssignerReplicationIT.java     |  11 +-
 45 files changed, 612 insertions(+), 677 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java index a8f3f21..cd57c2f 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java @@ -61,7 +61,7 @@ import org.slf4j.LoggerFactory; import com.google.protobuf.InvalidProtocolBufferException; /** - * + * */ public class ReplicationOperationsImpl implements ReplicationOperations { private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImpl.class); @@ -216,9 +216,6 @@ public class ReplicationOperationsImpl implements ReplicationOperations { protected Text getTableId(Connector conn, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { TableOperations tops = conn.tableOperations(); - while (!tops.exists(ReplicationTable.NAME)) { - UtilWaitThread.sleep(200); - } if (!conn.tableOperations().exists(tableName)) { throw new TableNotFoundException(null, tableName, null); http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java index 32dbb28..66035f7 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockAccumulo.java @@ -31,6 +31,7 @@ import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.NamespacePermission; import org.apache.accumulo.core.security.SystemPermission; @@ -58,6 +59,7 @@ public class MockAccumulo { namespaces.put(Namespaces.ACCUMULO_NAMESPACE, new MockNamespace()); createTable("root", RootTable.NAME, true, TimeType.LOGICAL); createTable("root", MetadataTable.NAME, true, TimeType.LOGICAL); + createTable("root", ReplicationTable.NAME, true, TimeType.LOGICAL); } public FileSystem getFileSystem() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java index 08750fe..59afc8b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java @@ -38,8 +38,8 @@ import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.DiskUsage; import org.apache.accumulo.core.client.admin.FindMax; -import org.apache.accumulo.core.client.impl.TableOperationsHelper; import org.apache.accumulo.core.client.admin.TimeType; +import org.apache.accumulo.core.client.impl.TableOperationsHelper; import org.apache.accumulo.core.client.impl.Tables; import 
org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Key; @@ -376,6 +376,10 @@ class MockTableOperations extends TableOperationsHelper { throw new TableNotFoundException(tableName, tableName, ""); MockTable t = acu.tables.get(tableName); Text startText = start != null ? new Text(start) : new Text(); + if (startText.getLength() == 0 && end == null) { + t.table.clear(); + return; + } Text endText = end != null ? new Text(end) : new Text(t.table.lastKey().getRow().getBytes()); startText.append(ZERO, 0, 1); endText.append(ZERO, 0, 1); @@ -442,7 +446,7 @@ class MockTableOperations extends TableOperationsHelper { try { AccumuloVFSClassLoader.loadClass(className, Class.forName(asTypeName)); } catch (ClassNotFoundException e) { - log.warn("Could not load class '"+className+"' with type name '"+asTypeName+"' in testClassLoad().", e); + log.warn("Could not load class '" + className + "' with type name '" + asTypeName + "' in testClassLoad().", e); return false; } return true; http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java index ee606b5..35b6df6 100644 --- a/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java +++ b/core/src/main/java/org/apache/accumulo/core/replication/PrintReplicationRecords.java @@ -37,7 +37,7 @@ import org.slf4j.LoggerFactory; import com.google.protobuf.InvalidProtocolBufferException; /** - * + * */ public class PrintReplicationRecords implements Runnable { private static final Logger log = LoggerFactory.getLogger(PrintReplicationRecords.class); http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java index 2736762..b6d343f 100644 --- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java +++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTable.java @@ -20,12 +20,18 @@ import java.util.Collections; import java.util.Map; import java.util.Set; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.impl.Namespaces; +import org.apache.accumulo.core.client.impl.Tables; +import org.apache.accumulo.core.master.state.tables.TableState; import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; import org.apache.accumulo.core.security.Authorizations; @@ -35,7 +41,8 @@ import com.google.common.collect.ImmutableMap; public class 
ReplicationTable { - public static final String NAME = "replication"; + public static final String ID = "+rep"; + public static final String NAME = Namespaces.ACCUMULO_NAMESPACE + ".replication"; public static final String COMBINER_NAME = "statuscombiner"; @@ -46,16 +53,46 @@ public class ReplicationTable { public static final Map<String,Set<Text>> LOCALITY_GROUPS = ImmutableMap.of(STATUS_LG_NAME, STATUS_LG_COLFAMS, WORK_LG_NAME, WORK_LG_COLFAMS); public static final String STATUS_FORMATTER_CLASS_NAME = StatusFormatter.class.getName(); - public static Scanner getScanner(Connector conn) throws TableNotFoundException { - return conn.createScanner(NAME, Authorizations.EMPTY); + public static Scanner getScanner(Connector conn) throws ReplicationTableOfflineException { + try { + return conn.createScanner(NAME, Authorizations.EMPTY); + } catch (TableNotFoundException e) { + throw new AssertionError(NAME + " should exist, but doesn't."); + } catch (TableOfflineException e) { + throw new ReplicationTableOfflineException(e); + } } - public static BatchWriter getBatchWriter(Connector conn) throws TableNotFoundException { - return conn.createBatchWriter(NAME, new BatchWriterConfig()); + public static BatchWriter getBatchWriter(Connector conn) throws ReplicationTableOfflineException { + try { + return conn.createBatchWriter(NAME, new BatchWriterConfig()); + } catch (TableNotFoundException e) { + throw new AssertionError(NAME + " should exist, but doesn't."); + } catch (TableOfflineException e) { + throw new ReplicationTableOfflineException(e); + } } - public static BatchScanner getBatchScanner(Connector conn, int queryThreads) throws TableNotFoundException { - return conn.createBatchScanner(NAME, Authorizations.EMPTY, queryThreads); + public static BatchScanner getBatchScanner(Connector conn, int queryThreads) throws ReplicationTableOfflineException { + try { + return conn.createBatchScanner(NAME, Authorizations.EMPTY, queryThreads); + } catch (TableNotFoundException e) { + throw new AssertionError(NAME + " should exist, but doesn't."); + } catch (TableOfflineException e) { + throw new ReplicationTableOfflineException(e); + } + } + + public static boolean isOnline(Connector conn) { + return TableState.ONLINE == Tables.getTableState(conn.getInstance(), ID); + } + + public static void setOnline(Connector conn) throws AccumuloSecurityException, AccumuloException { + try { + conn.tableOperations().online(NAME, true); + } catch (TableNotFoundException e) { + throw new AssertionError(NAME + " should exist, but doesn't."); + } } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTableOfflineException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTableOfflineException.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTableOfflineException.java new file mode 100644 index 0000000..3cc56e0 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationTableOfflineException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.replication; + +import org.apache.accumulo.core.client.TableOfflineException; + +/** + * + */ +public class ReplicationTableOfflineException extends Exception { + + private static final long serialVersionUID = 1L; + + public ReplicationTableOfflineException(TableOfflineException e) { + super(e); + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java index 54a9f8c..b404d3d 100644 --- a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java +++ b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationOperationsImplTest.java @@ -49,7 +49,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * */ public class ReplicationOperationsImplTest { private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImplTest.class); @@ -67,14 +67,13 @@ public class ReplicationOperationsImplTest { @Test public void waitsUntilEntriesAreReplicated() throws Exception { Connector conn = inst.getConnector("root", new PasswordToken("")); - conn.tableOperations().create(ReplicationTable.NAME); conn.tableOperations().create("foo"); Text tableId = new Text(conn.tableOperations().tableIdMap().get("foo")); String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); - BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); + BatchWriter bw = ReplicationTable.getBatchWriter(conn); Mutation m = new Mutation(file1); StatusSection.add(m, tableId, ProtobufUtil.toValue(stat)); @@ -136,7 +135,7 @@ public class ReplicationOperationsImplTest { Assert.assertFalse(done.get()); // Remove the replication entries too - bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); + bw = ReplicationTable.getBatchWriter(conn); m = new Mutation(file1); m.putDelete(StatusSection.NAME, tableId); bw.addMutation(m); @@ -163,7 +162,6 @@ public class ReplicationOperationsImplTest { @Test public void unrelatedReplicationRecordsDontBlockDrain() throws Exception { Connector conn = inst.getConnector("root", new PasswordToken("")); - conn.tableOperations().create(ReplicationTable.NAME); conn.tableOperations().create("foo"); conn.tableOperations().create("bar"); @@ -173,7 +171,7 @@ public class ReplicationOperationsImplTest { String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(), file2 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); - BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new 
BatchWriterConfig()); + BatchWriter bw = ReplicationTable.getBatchWriter(conn); Mutation m = new Mutation(file1); StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); @@ -227,7 +225,7 @@ public class ReplicationOperationsImplTest { Assert.assertFalse(done.get()); // Remove the replication entries too - bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); + bw = ReplicationTable.getBatchWriter(conn); m = new Mutation(file1); m.putDelete(StatusSection.NAME, tableId1); bw.addMutation(m); @@ -247,7 +245,6 @@ public class ReplicationOperationsImplTest { @Test public void inprogressReplicationRecordsBlockExecution() throws Exception { Connector conn = inst.getConnector("root", new PasswordToken("")); - conn.tableOperations().create(ReplicationTable.NAME); conn.tableOperations().create("foo"); Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo")); @@ -255,7 +252,7 @@ public class ReplicationOperationsImplTest { String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); - BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); + BatchWriter bw = ReplicationTable.getBatchWriter(conn); Mutation m = new Mutation(file1); StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); @@ -313,7 +310,7 @@ public class ReplicationOperationsImplTest { Assert.assertFalse(done.get()); // Remove the replication entries too - bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); + bw = ReplicationTable.getBatchWriter(conn); m = new Mutation(file1); m.put(StatusSection.NAME, tableId1, ProtobufUtil.toValue(newStatus)); bw.addMutation(m); @@ -333,7 +330,6 @@ public class ReplicationOperationsImplTest { @Test public void laterCreatedLogsDontBlockExecution() throws Exception { Connector conn = inst.getConnector("root", new PasswordToken("")); - conn.tableOperations().create(ReplicationTable.NAME); conn.tableOperations().create("foo"); Text tableId1 = new Text(conn.tableOperations().tableIdMap().get("foo")); @@ -341,7 +337,7 @@ public class ReplicationOperationsImplTest { String file1 = "/accumulo/wals/tserver+port/" + UUID.randomUUID(); Status stat = Status.newBuilder().setBegin(0).setEnd(10000).setInfiniteEnd(false).setClosed(false).build(); - BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); + BatchWriter bw = ReplicationTable.getBatchWriter(conn); Mutation m = new Mutation(file1); StatusSection.add(m, tableId1, ProtobufUtil.toValue(stat)); bw.addMutation(m); @@ -395,7 +391,7 @@ public class ReplicationOperationsImplTest { System.out.println(e.getKey()); } - bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig()); + bw = ReplicationTable.getBatchWriter(conn); m = new Mutation(file1); m.putDelete(StatusSection.NAME, tableId1); bw.addMutation(m); http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java index 6c331e0..9cc9bcb 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java +++ b/server/base/src/main/java/org/apache/accumulo/server/ServerConstants.java @@ -47,9 +47,13 @@ public class ServerConstants 
{ public static final Integer WIRE_VERSION = 3; /** - * version (7) reflects the change in the representation of trace information in TraceRepo + * version (7) also reflects the addition of a replication table */ - public static final int DATA_VERSION = 7; + public static final int MOVE_TO_REPLICATION_TABLE = 7; + /** + * this is the current data version + */ + public static final int DATA_VERSION = MOVE_TO_REPLICATION_TABLE; /** * version (6) reflects the addition of a separate root table (ACCUMULO-1481) in version 1.6.0 */ http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java index e4f61f9..487829e 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java +++ b/server/base/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java @@ -49,48 +49,47 @@ import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; public class MetadataConstraints implements Constraint { - + private ZooCache zooCache = null; private String zooRoot = null; - + private static final Logger log = Logger.getLogger(MetadataConstraints.class); - + private static boolean[] validTableNameChars = new boolean[256]; - + { for (int i = 0; i < 256; i++) { - validTableNameChars[i] = ((i >= 'a' && i <= 'z') || (i >= '0' && i <= '9')) || i == '!'; + validTableNameChars[i] = ((i >= 'a' && i <= 'z') || (i >= '0' && i <= '9')) || i == '!' || i == '+'; } } - - private static final HashSet<ColumnFQ> validColumnQuals = new HashSet<ColumnFQ>(Arrays.asList( - TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN, - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN, - TabletsSection.ServerColumnFamily.LOCK_COLUMN, TabletsSection.ServerColumnFamily.FLUSH_COLUMN, TabletsSection.ServerColumnFamily.COMPACT_COLUMN)); - - private static final HashSet<Text> validColumnFams = new HashSet<Text>(Arrays.asList(TabletsSection.BulkFileColumnFamily.NAME, - LogColumnFamily.NAME, ScanFileColumnFamily.NAME, DataFileColumnFamily.NAME, - TabletsSection.CurrentLocationColumnFamily.NAME, TabletsSection.LastLocationColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME, - ChoppedColumnFamily.NAME, ClonedColumnFamily.NAME)); - + + private static final HashSet<ColumnFQ> validColumnQuals = new HashSet<ColumnFQ>(Arrays.asList(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, + TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN, TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN, + TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN, TabletsSection.ServerColumnFamily.LOCK_COLUMN, + TabletsSection.ServerColumnFamily.FLUSH_COLUMN, TabletsSection.ServerColumnFamily.COMPACT_COLUMN)); + + private static final HashSet<Text> validColumnFams = new HashSet<Text>(Arrays.asList(TabletsSection.BulkFileColumnFamily.NAME, LogColumnFamily.NAME, + ScanFileColumnFamily.NAME, DataFileColumnFamily.NAME, TabletsSection.CurrentLocationColumnFamily.NAME, TabletsSection.LastLocationColumnFamily.NAME, + 
TabletsSection.FutureLocationColumnFamily.NAME, ChoppedColumnFamily.NAME, ClonedColumnFamily.NAME)); + private static boolean isValidColumn(ColumnUpdate cu) { - + if (validColumnFams.contains(new Text(cu.getColumnFamily()))) return true; - + if (validColumnQuals.contains(new ColumnFQ(cu))) return true; - + return false; } - + static private ArrayList<Short> addViolation(ArrayList<Short> lst, int violation) { if (lst == null) lst = new ArrayList<Short>(); lst.add((short) violation); return lst; } - + static private ArrayList<Short> addIfNotPresent(ArrayList<Short> lst, int intViolation) { if (lst == null) return addViolation(lst, intViolation); @@ -99,38 +98,38 @@ public class MetadataConstraints implements Constraint { return addViolation(lst, intViolation); return lst; } - + @Override public List<Short> check(Environment env, Mutation mutation) { - + ArrayList<Short> violations = null; - + Collection<ColumnUpdate> colUpdates = mutation.getUpdates(); - + // check the row, it should contains at least one ; or end with < boolean containsSemiC = false; - + byte[] row = mutation.getRow(); - + // always allow rows that fall within reserved areas if (row.length > 0 && row[0] == '~') return null; if (row.length > 2 && row[0] == '!' && row[1] == '!' && row[2] == '~') return null; - + for (byte b : row) { if (b == ';') { containsSemiC = true; } - + if (b == ';' || b == '<') break; - + if (!validTableNameChars[0xff & b]) { violations = addIfNotPresent(violations, 4); } } - + if (!containsSemiC) { // see if last row char is < if (row.length == 0 || row[row.length - 1] != '<') { @@ -141,38 +140,38 @@ public class MetadataConstraints implements Constraint { violations = addIfNotPresent(violations, 4); } } - + if (row.length > 0 && row[0] == '!') { if (row.length < 3 || row[1] != '0' || (row[2] != '<' && row[2] != ';')) { violations = addIfNotPresent(violations, 4); } } - + // ensure row is not less than Constants.METADATA_TABLE_ID if (new Text(row).compareTo(new Text(MetadataTable.ID)) < 0) { violations = addViolation(violations, 5); } - + boolean checkedBulk = false; - + for (ColumnUpdate columnUpdate : colUpdates) { Text columnFamily = new Text(columnUpdate.getColumnFamily()); - + if (columnUpdate.isDeleted()) { if (!isValidColumn(columnUpdate)) { violations = addViolation(violations, 2); } continue; } - + if (columnUpdate.getValue().length == 0 && !columnFamily.equals(ScanFileColumnFamily.NAME)) { violations = addViolation(violations, 6); } - + if (columnFamily.equals(DataFileColumnFamily.NAME)) { try { DataFileValue dfv = new DataFileValue(columnUpdate.getValue()); - + if (dfv.getSize() < 0 || dfv.getNumEntries() < 0) { violations = addViolation(violations, 1); } @@ -182,7 +181,7 @@ public class MetadataConstraints implements Constraint { violations = addViolation(violations, 1); } } else if (columnFamily.equals(ScanFileColumnFamily.NAME)) { - + } else if (columnFamily.equals(TabletsSection.BulkFileColumnFamily.NAME)) { if (!columnUpdate.isDeleted() && !checkedBulk) { // splits, which also write the time reference, are allowed to write this reference even when @@ -193,13 +192,13 @@ public class MetadataConstraints implements Constraint { // but it writes everything. We allow it to re-write the bulk information if it is setting the location. // See ACCUMULO-1230. 
boolean isLocationMutation = false; - + HashSet<Text> dataFiles = new HashSet<Text>(); HashSet<Text> loadedFiles = new HashSet<Text>(); String tidString = new String(columnUpdate.getValue(), UTF_8); int otherTidCount = 0; - + for (ColumnUpdate update : mutation.getUpdates()) { if (new ColumnFQ(update).equals(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN)) { isSplitMutation = true; @@ -209,16 +208,16 @@ public class MetadataConstraints implements Constraint { dataFiles.add(new Text(update.getColumnQualifier())); } else if (new Text(update.getColumnFamily()).equals(TabletsSection.BulkFileColumnFamily.NAME)) { loadedFiles.add(new Text(update.getColumnQualifier())); - + if (!new String(update.getValue(), UTF_8).equals(tidString)) { otherTidCount++; } } } - + if (!isSplitMutation && !isLocationMutation) { long tid = Long.parseLong(tidString); - + try { if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || !getArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) { violations = addViolation(violations, 8); @@ -227,7 +226,7 @@ public class MetadataConstraints implements Constraint { violations = addViolation(violations, 8); } } - + checkedBulk = true; } } else { @@ -236,11 +235,11 @@ public class MetadataConstraints implements Constraint { } else if (new ColumnFQ(columnUpdate).equals(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN) && columnUpdate.getValue().length > 0 && (violations == null || !violations.contains((short) 4))) { KeyExtent ke = new KeyExtent(new Text(mutation.getRow()), (Text) null); - + Text per = KeyExtent.decodePrevEndRow(new Value(columnUpdate.getValue())); - + boolean prevEndRowLessThanEndRow = per == null || ke.getEndRow() == null || per.compareTo(ke.getEndRow()) < 0; - + if (!prevEndRowLessThanEndRow) { violations = addViolation(violations, 3); } @@ -248,28 +247,28 @@ public class MetadataConstraints implements Constraint { if (zooCache == null) { zooCache = new ZooCache(); } - + if (zooRoot == null) { zooRoot = ZooUtil.getRoot(HdfsZooInstance.getInstance()); } - + boolean lockHeld = false; String lockId = new String(columnUpdate.getValue(), UTF_8); - + try { lockHeld = ZooLock.isLockHeld(zooCache, new ZooUtil.LockID(zooRoot, lockId)); } catch (Exception e) { log.debug("Failed to verify lock was held " + lockId + " " + e.getMessage()); } - + if (!lockHeld) { violations = addViolation(violations, 7); } } - + } } - + if (violations != null) { log.debug("violating metadata mutation : " + new String(mutation.getRow(), UTF_8)); for (ColumnUpdate update : mutation.getUpdates()) { @@ -277,14 +276,14 @@ public class MetadataConstraints implements Constraint { + (update.isDeleted() ? 
"[delete]" : new String(update.getValue(), UTF_8))); } } - + return violations; } - + protected Arbitrator getArbitrator() { return new ZooArbitrator(); } - + @Override public String getViolationDescription(short violationCode) { switch (violationCode) { @@ -307,11 +306,11 @@ public class MetadataConstraints implements Constraint { } return null; } - + @Override protected void finalize() { if (zooCache != null) zooCache.clear(); } - + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java index 45e883d..24ff637 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java +++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java @@ -25,10 +25,13 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Locale; import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; import java.util.UUID; import jline.console.ConsoleReader; @@ -57,10 +60,20 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.FutureLocationColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily; +import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily; import org.apache.accumulo.core.replication.ReplicationConstants; +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; +import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; +import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.security.SecurityUtil; import org.apache.accumulo.core.util.CachedConfiguration; import org.apache.accumulo.core.util.ColumnFQ; +import org.apache.accumulo.core.util.LocalityGroupUtil; import org.apache.accumulo.core.volume.VolumeConfiguration; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; @@ -91,6 +104,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import com.beust.jcommander.Parameter; +import com.google.common.base.Joiner; /** * This class is used to setup the directory structure and the root tablet to get an instance started @@ -99,7 +113,7 @@ import com.beust.jcommander.Parameter; public class Initialize { private static final Logger log = Logger.getLogger(Initialize.class); private static final String DEFAULT_ROOT_USER = "root"; - public static final String TABLE_TABLETS_TABLET_DIR = "/table_info"; + private static final String 
TABLE_TABLETS_TABLET_DIR = "/table_info"; private static ConsoleReader reader = null; private static IZooReaderWriter zoo = ZooReaderWriter.getInstance(); @@ -130,6 +144,8 @@ public class Initialize { } private static HashMap<String,String> initialMetadataConf = new HashMap<String,String>(); + private static HashMap<String,String> initialMetadataCombinerConf = new HashMap<String,String>(); + private static HashMap<String,String> initialReplicationTableConf = new HashMap<String,String>(); static { initialMetadataConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(), "32K"); initialMetadataConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5"); @@ -146,13 +162,44 @@ public class Initialize { initialMetadataConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.bulkLoadFilter", "20," + MetadataBulkLoadFilter.class.getName()); initialMetadataConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false"); initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "tablet", - String.format("%s,%s", TabletsSection.TabletColumnFamily.NAME, TabletsSection.CurrentLocationColumnFamily.NAME)); - initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "server", String.format("%s,%s,%s,%s", TabletsSection.DataFileColumnFamily.NAME, - TabletsSection.LogColumnFamily.NAME, TabletsSection.ServerColumnFamily.NAME, TabletsSection.FutureLocationColumnFamily.NAME)); + String.format("%s,%s", TabletColumnFamily.NAME, CurrentLocationColumnFamily.NAME)); + initialMetadataConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() + "server", + String.format("%s,%s,%s,%s", DataFileColumnFamily.NAME, LogColumnFamily.NAME, ServerColumnFamily.NAME, FutureLocationColumnFamily.NAME)); initialMetadataConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), "tablet,server"); initialMetadataConf.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), ""); initialMetadataConf.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), "true"); initialMetadataConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true"); + + // ACCUMULO-3077 Set the combiner on accumulo.metadata during init to reduce the likelihood of a race + // condition where a tserver compacts away Status updates because it didn't see the Combiner configured + IteratorSetting setting = new IteratorSetting(9, ReplicationTableUtil.COMBINER_NAME, StatusCombiner.class); + Combiner.setColumns(setting, Collections.singletonList(new Column(ReplicationSection.COLF))); + for (IteratorScope scope : IteratorScope.values()) { + String root = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name().toLowerCase(), setting.getName()); + for (Entry<String,String> prop : setting.getOptions().entrySet()) { + initialMetadataCombinerConf.put(root + ".opt." + prop.getKey(), prop.getValue()); + } + initialMetadataCombinerConf.put(root, setting.getPriority() + "," + setting.getIteratorClass()); + } + + // add combiners to replication table + setting = new IteratorSetting(30, ReplicationTable.COMBINER_NAME, StatusCombiner.class); + setting.setPriority(30); + Combiner.setColumns(setting, Arrays.asList(new Column(StatusSection.NAME), new Column(WorkSection.NAME))); + for (IteratorScope scope : EnumSet.allOf(IteratorScope.class)) { + String root = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name().toLowerCase(), setting.getName()); + for (Entry<String,String> prop : setting.getOptions().entrySet()) { + initialReplicationTableConf.put(root + ".opt." 
+ prop.getKey(), prop.getValue()); + } + initialReplicationTableConf.put(root, setting.getPriority() + "," + setting.getIteratorClass()); + } + // add locality groups to replication table + for (Entry<String,Set<Text>> g : ReplicationTable.LOCALITY_GROUPS.entrySet()) { + initialReplicationTableConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX + g.getKey(), LocalityGroupUtil.encodeColumnFamilies(g.getValue())); + } + initialReplicationTableConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(), Joiner.on(",").join(ReplicationTable.LOCALITY_GROUPS.keySet())); + // add formatter to replication table + initialReplicationTableConf.put(Property.TABLE_FORMATTER_CLASS.getKey(), ReplicationTable.STATUS_FORMATTER_CLASS_NAME); } static boolean checkInit(Configuration conf, VolumeManager fs, SiteConfiguration sconf) throws IOException { @@ -305,60 +352,79 @@ public class Initialize { private static void initFileSystem(Opts opts, VolumeManager fs, UUID uuid, String rootTabletDir) throws IOException { initDirs(fs, uuid, VolumeConfiguration.getVolumeUris(SiteConfiguration.getInstance()), false); - // initialize initial metadata config in zookeeper - initMetadataConfig(); + // initialize initial system tables config in zookeeper + initSystemTablesConfig(); String tableMetadataTabletDir = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + MetadataTable.ID + TABLE_TABLETS_TABLET_DIR; + String replicationTableDefaultTabletDir = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + ReplicationTable.ID + + Constants.DEFAULT_TABLET_LOCATION; + String defaultMetadataTabletDir = fs.choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + MetadataTable.ID + Constants.DEFAULT_TABLET_LOCATION; // create table and default tablets directories - createDirectories(fs, rootTabletDir, tableMetadataTabletDir, defaultMetadataTabletDir); + createDirectories(fs, rootTabletDir, tableMetadataTabletDir, defaultMetadataTabletDir, replicationTableDefaultTabletDir); + + String ext = FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration()); - // populate the root tablet with info about the metadata tablets - String fileName = rootTabletDir + Path.SEPARATOR + "00000_00000." + FileOperations.getNewFileExtension(AccumuloConfiguration.getDefaultConfiguration()); - createMetadataFile(fs, fileName, MetadataTable.ID, tableMetadataTabletDir, defaultMetadataTabletDir); + // populate the metadata tables tablet with info about the replication table's one initial tablet + String metadataFileName = tableMetadataTabletDir + Path.SEPARATOR + "0_1." + ext; + Tablet replicationTablet = new Tablet(ReplicationTable.ID, replicationTableDefaultTabletDir, null, null); + createMetadataFile(fs, metadataFileName, replicationTablet); + + // populate the root tablet with info about the metadata table's two initial tablets + String rootTabletFileName = rootTabletDir + Path.SEPARATOR + "00000_00000." + ext; + Text splitPoint = TabletsSection.getRange().getEndKey().getRow(); + Tablet tablesTablet = new Tablet(MetadataTable.ID, tableMetadataTabletDir, null, splitPoint, metadataFileName); + Tablet defaultTablet = new Tablet(MetadataTable.ID, defaultMetadataTabletDir, splitPoint, null); + createMetadataFile(fs, rootTabletFileName, tablesTablet, defaultTablet); } - /** - * Create an rfile in the default tablet's directory for a new table. 
This method is used to create the initial root tablet contents, with information about - * the metadata table's tablets - * - * @param volmanager - * The VolumeManager - * @param fileName - * The location to create the file - * @param tableId - * TableID that is being "created" - * @param tableTabletDir - * The table_info directory for the new table - * @param defaultTabletDir - * The default_tablet directory for the new table - */ - private static void createMetadataFile(VolumeManager volmanager, String fileName, String tableId, String tableTabletDir, String defaultTabletDir) - throws IOException { + private static class Tablet { + String tableId, dir; + Text prevEndRow, endRow; + String[] files; + + Tablet(String tableId, String dir, Text prevEndRow, Text endRow, String... files) { + this.tableId = tableId; + this.dir = dir; + this.prevEndRow = prevEndRow; + this.endRow = endRow; + this.files = files; + } + } + + private static void createMetadataFile(VolumeManager volmanager, String fileName, Tablet... tablets) throws IOException { + // sort file contents in memory, then play back to the file + TreeMap<Key,Value> sorted = new TreeMap<>(); + for (Tablet tablet : tablets) { + createEntriesForTablet(sorted, tablet); + } FileSystem fs = volmanager.getVolumeByPath(new Path(fileName)).getFileSystem(); FileSKVWriter tabletWriter = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), AccumuloConfiguration.getDefaultConfiguration()); tabletWriter.startDefaultLocalityGroup(); - Text splitPoint = TabletsSection.getRange().getEndKey().getRow(); - createEntriesForTablet(tabletWriter, tableId, tableTabletDir, null, splitPoint); - createEntriesForTablet(tabletWriter, tableId, defaultTabletDir, splitPoint, null); + for (Entry<Key,Value> entry : sorted.entrySet()) { + tabletWriter.append(entry.getKey(), entry.getValue()); + } tabletWriter.close(); } - private static void createEntriesForTablet(FileSKVWriter writer, String tableId, String tabletDir, Text tabletPrevEndRow, Text tabletEndRow) - throws IOException { - Text extent = new Text(KeyExtent.getMetadataEntry(new Text(tableId), tabletEndRow)); - addEntry(writer, extent, DIRECTORY_COLUMN, new Value(tabletDir.getBytes(UTF_8))); - addEntry(writer, extent, TIME_COLUMN, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes(UTF_8))); - addEntry(writer, extent, PREV_ROW_COLUMN, KeyExtent.encodePrevEndRow(tabletPrevEndRow)); + private static void createEntriesForTablet(TreeMap<Key,Value> map, Tablet tablet) { + Value EMPTY_SIZE = new Value("0,0".getBytes(UTF_8)); + Text extent = new Text(KeyExtent.getMetadataEntry(new Text(tablet.tableId), tablet.endRow)); + addEntry(map, extent, DIRECTORY_COLUMN, new Value(tablet.dir.getBytes(UTF_8))); + addEntry(map, extent, TIME_COLUMN, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes(UTF_8))); + addEntry(map, extent, PREV_ROW_COLUMN, KeyExtent.encodePrevEndRow(tablet.prevEndRow)); + for (String file : tablet.files) { + addEntry(map, extent, new ColumnFQ(DataFileColumnFamily.NAME, new Text(file)), EMPTY_SIZE); + } } - private static void addEntry(FileSKVWriter writer, Text row, ColumnFQ col, Value value) throws IOException { - writer.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier(), 0), value); + private static void addEntry(TreeMap<Key,Value> map, Text row, ColumnFQ col, Value value) { + map.put(new Key(row, col.getColumnFamily(), col.getColumnQualifier(), 0), value); } private static void createDirectories(VolumeManager fs, String... 
dirs) throws IOException { @@ -404,6 +470,8 @@ public class Initialize { TableManager.prepareNewNamespaceState(uuid, Namespaces.ACCUMULO_NAMESPACE_ID, Namespaces.ACCUMULO_NAMESPACE, NodeExistsPolicy.FAIL); TableManager.prepareNewTableState(uuid, RootTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, RootTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL); TableManager.prepareNewTableState(uuid, MetadataTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, MetadataTable.NAME, TableState.ONLINE, NodeExistsPolicy.FAIL); + TableManager.prepareNewTableState(uuid, ReplicationTable.ID, Namespaces.ACCUMULO_NAMESPACE_ID, ReplicationTable.NAME, TableState.OFFLINE, + NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); @@ -412,8 +480,7 @@ public class Initialize { zoo.putPersistentData(zkInstanceRoot + Constants.ZTRACERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); - zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(UTF_8), - NodeExistsPolicy.FAIL); + zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_GOAL_STATE, MasterGoalState.NORMAL.toString().getBytes(UTF_8), NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZGC, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZGC_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); zoo.putPersistentData(zkInstanceRoot + Constants.ZCONFIG, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL); @@ -484,7 +551,7 @@ public class Initialize { opts.rootpass); } - public static void initMetadataConfig() throws IOException { + public static void initSystemTablesConfig() throws IOException { try { Configuration conf = CachedConfiguration.getInstance(); int max = conf.getInt("dfs.replication.max", 512); @@ -500,21 +567,16 @@ public class Initialize { if (!TablePropUtil.setTableProperty(MetadataTable.ID, entry.getKey(), entry.getValue())) throw new IOException("Cannot create per-table property " + entry.getKey()); } - } catch (Exception e) { - log.fatal("error talking to zookeeper", e); - throw new IOException(e); - } - // ACCUMULO-3077 Set the combiner on accumulo.metadata during init to reduce the likelihood of a race - // condition where a tserver compacts away Status updates because it didn't see the Combiner configured - IteratorSetting setting = new IteratorSetting(9, ReplicationTableUtil.COMBINER_NAME, StatusCombiner.class); - Combiner.setColumns(setting, Collections.singletonList(new Column(ReplicationSection.COLF))); - try { - for (IteratorScope scope : IteratorScope.values()) { - String root = String.format("%s%s.%s", Property.TABLE_ITERATOR_PREFIX, scope.name().toLowerCase(), setting.getName()); - for (Entry<String,String> prop : setting.getOptions().entrySet()) { - TablePropUtil.setTableProperty(MetadataTable.ID, root + ".opt." 
+ prop.getKey(), prop.getValue()); - } - TablePropUtil.setTableProperty(MetadataTable.ID, root, setting.getPriority() + "," + setting.getIteratorClass()); + // Only add combiner config to accumulo.metadata table (ACCUMULO-3077) + for (Entry<String,String> entry : initialMetadataCombinerConf.entrySet()) { + if (!TablePropUtil.setTableProperty(MetadataTable.ID, entry.getKey(), entry.getValue())) + throw new IOException("Cannot create per-table property " + entry.getKey()); + } + + // add configuration to the replication table + for (Entry<String,String> entry : initialReplicationTableConf.entrySet()) { + if (!TablePropUtil.setTableProperty(ReplicationTable.ID, entry.getKey(), entry.getValue())) + throw new IOException("Cannot create per-table property " + entry.getKey()); } } catch (Exception e) { log.fatal("Error talking to ZooKeeper", e); http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java index e05a08c..50b15f5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/replication/ReplicationUtil.java @@ -18,9 +18,7 @@ package org.apache.accumulo.server.replication; import static java.nio.charset.StandardCharsets.UTF_8; -import java.util.Arrays; import java.util.Collections; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -32,10 +30,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.IteratorSetting.Column; import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.replication.ReplicaSystem; @@ -44,20 +39,16 @@ import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.Combiner; -import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; import org.apache.accumulo.core.master.thrift.MasterMonitorInfo; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; -import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection; import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection; import org.apache.accumulo.core.replication.ReplicationTable; +import org.apache.accumulo.core.replication.ReplicationTableOfflineException; import org.apache.accumulo.core.replication.ReplicationTarget; import org.apache.accumulo.core.replication.StatusUtil; import org.apache.accumulo.core.replication.proto.Replication.Status; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.security.TablePermission; -import 
org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.server.zookeeper.ZooCache; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -243,9 +234,9 @@ public class ReplicationUtil { if (null != path) { Scanner s; try { - s = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY); - } catch (TableNotFoundException e) { - log.debug("Replication table no long exists", e); + s = ReplicationTable.getScanner(conn); + } catch (ReplicationTableOfflineException e) { + log.debug("Replication table no longer online", e); return status; } @@ -294,136 +285,4 @@ public class ReplicationUtil { return newMap; } - public static synchronized void createReplicationTable(Connector conn) { - TableOperations tops = conn.tableOperations(); - if (tops.exists(ReplicationTable.NAME)) { - if (configureReplicationTable(conn)) { - return; - } - } - - for (int i = 0; i < 5; i++) { - try { - if (!tops.exists(ReplicationTable.NAME)) { - tops.create(ReplicationTable.NAME, false); - } - break; - } catch (AccumuloException | AccumuloSecurityException e) { - log.error("Failed to create replication table", e); - } catch (TableExistsException e) { - // Shouldn't happen unless someone else made the table - } - log.error("Retrying table creation in 1 second..."); - UtilWaitThread.sleep(1000); - } - - for (int i = 0; i < 5; i++) { - if (configureReplicationTable(conn)) { - return; - } - - log.error("Failed to configure the replication table, retying..."); - UtilWaitThread.sleep(1000); - } - - throw new RuntimeException("Could not configure replication table"); - } - - /** - * Attempts to configure the replication table, will return false if it fails - * - * @param conn - * Connector for the instance - * @return True if the replication table is properly configured - */ - protected static synchronized boolean configureReplicationTable(Connector conn) { - try { - conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.READ); - } catch (AccumuloException | AccumuloSecurityException e) { - log.warn("Could not grant root user read access to replication table", e); - // Should this be fatal? 
It's only for convenience, all r/w is done by !SYSTEM - } - - TableOperations tops = conn.tableOperations(); - Map<String,EnumSet<IteratorScope>> iterators = null; - try { - iterators = tops.listIterators(ReplicationTable.NAME); - } catch (AccumuloSecurityException | AccumuloException | TableNotFoundException e) { - log.error("Could not fetch iterators for " + ReplicationTable.NAME, e); - return false; - } - - if (!iterators.containsKey(ReplicationTable.COMBINER_NAME)) { - // Set our combiner and combine all columns - IteratorSetting setting = new IteratorSetting(30, ReplicationTable.COMBINER_NAME, StatusCombiner.class); - Combiner.setColumns(setting, Arrays.asList(new Column(StatusSection.NAME), new Column(WorkSection.NAME))); - try { - tops.attachIterator(ReplicationTable.NAME, setting); - } catch (AccumuloSecurityException | AccumuloException | TableNotFoundException e) { - log.error("Could not set StatusCombiner on replication table", e); - return false; - } - } - - Map<String,Set<Text>> localityGroups; - try { - localityGroups = tops.getLocalityGroups(ReplicationTable.NAME); - } catch (TableNotFoundException | AccumuloException e) { - log.error("Could not fetch locality groups", e); - return false; - } - - Set<Text> statusColfams = localityGroups.get(ReplicationTable.STATUS_LG_NAME), workColfams = localityGroups.get(ReplicationTable.WORK_LG_NAME); - if (null == statusColfams || null == workColfams) { - try { - tops.setLocalityGroups(ReplicationTable.NAME, ReplicationTable.LOCALITY_GROUPS); - } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { - log.error("Could not set locality groups on replication table", e); - return false; - } - } - - // Make sure the StatusFormatter is set on the metadata table - Iterable<Entry<String,String>> properties; - try { - properties = tops.getProperties(ReplicationTable.NAME); - } catch (AccumuloException | TableNotFoundException e) { - log.error("Could not fetch table properties on replication table", e); - return false; - } - - boolean formatterConfigured = false; - for (Entry<String,String> property : properties) { - if (Property.TABLE_FORMATTER_CLASS.getKey().equals(property.getKey())) { - if (!ReplicationTable.STATUS_FORMATTER_CLASS_NAME.equals(property.getValue())) { - log.info("Changing formatter for {} table from {} to {}", ReplicationTable.NAME, property.getValue(), ReplicationTable.STATUS_FORMATTER_CLASS_NAME); - try { - tops.setProperty(ReplicationTable.NAME, Property.TABLE_FORMATTER_CLASS.getKey(), ReplicationTable.STATUS_FORMATTER_CLASS_NAME); - } catch (AccumuloException | AccumuloSecurityException e) { - log.error("Could not set formatter on replication table", e); - return false; - } - } - - formatterConfigured = true; - - // Don't need to keep iterating over the properties after we found the one we were looking for - break; - } - } - - if (!formatterConfigured) { - try { - tops.setProperty(ReplicationTable.NAME, Property.TABLE_FORMATTER_CLASS.getKey(), ReplicationTable.STATUS_FORMATTER_CLASS_NAME); - } catch (AccumuloException | AccumuloSecurityException e) { - log.error("Could not set formatter on replication table", e); - return false; - } - } - - log.debug("Successfully configured replication table"); - - return true; - } - } http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java ---------------------------------------------------------------------- diff --git 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index fa90b3e..71dcdcf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.data.thrift.TRange;
 import org.apache.accumulo.core.master.thrift.FateOperation;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.security.NamespacePermission;
@@ -296,7 +297,7 @@ public class SecurityOperation {
   protected boolean _hasTablePermission(String user, String table, TablePermission permission, boolean useCached) throws ThriftSecurityException {
     targetUserExists(user);
 
-    if ((table.equals(MetadataTable.ID) || table.equals(RootTable.ID)) && permission.equals(TablePermission.READ))
+    if ((table.equals(MetadataTable.ID) || table.equals(RootTable.ID) || table.equals(ReplicationTable.ID)) && permission.equals(TablePermission.READ))
      return true;

     try {
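The SecurityOperation hunk is one line but load-bearing: READ on the replication table becomes an implicit grant, exactly as it already was for the root and metadata tables. Isolated as a sketch (the constants are the ones referenced in the hunk; the helper method is illustrative):

import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.security.TablePermission;

class SystemTableReadSketch {
  // System tables are implicitly readable by any user; every other
  // permission check falls through to the real permission store.
  static boolean impliedRead(String tableId, TablePermission perm) {
    boolean systemTable = tableId.equals(MetadataTable.ID)
        || tableId.equals(RootTable.ID)
        || tableId.equals(ReplicationTable.ID);
    return systemTable && TablePermission.READ.equals(perm);
  }
}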
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 8f84169..903142b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -17,6 +17,9 @@
 package org.apache.accumulo.server.util;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.TIME_COLUMN;
+import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -64,6 +67,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Da
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
+import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
@@ -82,6 +86,7 @@ import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.FileStatus;
@@ -132,10 +137,10 @@ public class MetadataTableUtil {
 
   public static void update(Credentials credentials, ZooLock zooLock, Mutation m, KeyExtent extent) {
     Writer t = extent.isMeta() ? getRootTable(credentials) : getMetadataTable(credentials);
-    update(t, credentials, zooLock, m);
+    update(t, zooLock, m);
   }
 
-  public static void update(Writer t, Credentials credentials, ZooLock zooLock, Mutation m) {
+  public static void update(Writer t, ZooLock zooLock, Mutation m) {
     if (zooLock != null)
       putLockID(zooLock, m);
     while (true) {
@@ -971,6 +976,20 @@ public class MetadataTableUtil {
   }
 
   /**
+   * During an upgrade from 1.6 to 1.7, we need to add the replication table
+   */
+  public static void createReplicationTable(Instance instance, SystemCredentials systemCredentials) throws IOException {
+    String dir = VolumeManagerImpl.get().choose(ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + ReplicationTable.ID
+        + Constants.DEFAULT_TABLET_LOCATION;
+
+    Mutation m = new Mutation(new Text(KeyExtent.getMetadataEntry(new Text(ReplicationTable.ID), null)));
+    m.put(DIRECTORY_COLUMN.getColumnFamily(), DIRECTORY_COLUMN.getColumnQualifier(), 0, new Value(dir.getBytes(UTF_8)));
+    m.put(TIME_COLUMN.getColumnFamily(), TIME_COLUMN.getColumnQualifier(), 0, new Value((TabletTime.LOGICAL_TIME_ID + "0").getBytes(UTF_8)));
+    m.put(PREV_ROW_COLUMN.getColumnFamily(), PREV_ROW_COLUMN.getColumnQualifier(), 0, KeyExtent.encodePrevEndRow(null));
+    update(getMetadataTable(systemCredentials), null, m);
+  }
+
+  /**
    * During an upgrade we need to move deletion requests for files under the !METADATA table to the root tablet.
    */
   public static void moveMetaDeleteMarkers(Instance instance, Credentials creds) {
@@ -988,7 +1007,6 @@
         break;
       }
     }
-
   }
 
   public static void moveMetaDeleteMarkersFrom14(Instance instance, Credentials creds) {
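Because the replication table has a fixed table ID, the 1.6-to-1.7 upgrade can bypass TableOperations entirely and write the table's single default tablet straight into the metadata table: srv:dir points at the chosen volume's default_tablet directory, srv:time starts logical time at "L0", and ~tab:~pr records the null prev-end-row. A sketch of a call site (hypothetical; where the upgrade actually invokes this is outside this excerpt, and HdfsZooInstance/SystemCredentials.get() are assumptions here, not taken from the diff):

import java.io.IOException;

import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.security.SystemCredentials;
import org.apache.accumulo.server.util.MetadataTableUtil;

class UpgradeSketch {
  // Write the replication table's one tablet entry using system credentials;
  // the row is "<ReplicationTable.ID><" since the default tablet has a null
  // end row.
  static void addReplicationTablet() throws IOException {
    MetadataTableUtil.createReplicationTable(HdfsZooInstance.getInstance(), SystemCredentials.get());
  }
}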
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
deleted file mode 100644
index 44ac3fe..0000000
--- a/server/base/src/test/java/org/apache/accumulo/server/replication/ReplicationTableTest.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.replication;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.IteratorSetting.Column;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.iterators.Combiner;
-import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * 
- */
-public class ReplicationTableTest {
-
-  @Rule
-  public TestName testName = new TestName();
-
-  private MockInstance instance;
-  private Connector conn;
-
-  @Before
-  public void setupMockAccumulo() throws Exception {
-    instance = new MockInstance(testName.getMethodName());
-    conn = instance.getConnector("root", new PasswordToken(""));
-  }
-
-  @Test
-  public void replicationTableCreated() {
-    TableOperations tops = conn.tableOperations();
-    Assert.assertFalse(tops.exists(ReplicationTable.NAME));
-    ReplicationUtil.createReplicationTable(conn);
-    Assert.assertTrue(tops.exists(ReplicationTable.NAME));
-  }
-
-  @Test
-  public void replicationTableDoubleCreate() {
-    TableOperations tops = conn.tableOperations();
-    Assert.assertFalse(tops.exists(ReplicationTable.NAME));
-
-    // Create the table
-    ReplicationUtil.createReplicationTable(conn);
-
-    // Make sure it exists and save off the id
-    Assert.assertTrue(tops.exists(ReplicationTable.NAME));
-    String tableId = tops.tableIdMap().get(ReplicationTable.NAME);
-
-    // Try to make it again, should return quickly
-    ReplicationUtil.createReplicationTable(conn);
-    Assert.assertTrue(tops.exists(ReplicationTable.NAME));
-
-    // Verify we have the same table as previously
-    Assert.assertEquals(tableId, tops.tableIdMap().get(ReplicationTable.NAME));
-  }
-
-  @Test
-  public void configureOnExistingTable() throws Exception {
-    TableOperations tops = conn.tableOperations();
-
-    // Create the table by hand
-    tops.create(ReplicationTable.NAME);
-    Map<String,EnumSet<IteratorScope>> iterators = tops.listIterators(ReplicationTable.NAME);
-
-    Assert.assertFalse(iterators.containsKey(ReplicationTable.COMBINER_NAME));
-
-    ReplicationUtil.configureReplicationTable(conn);
-
-    // After configure the iterator should be set
-    iterators = tops.listIterators(ReplicationTable.NAME);
-    Assert.assertTrue(iterators.containsKey(ReplicationTable.COMBINER_NAME));
-
-    // Needs to be set below versioning
-    IteratorSetting expected = new IteratorSetting(30, ReplicationTable.COMBINER_NAME, StatusCombiner.class);
-    Combiner.setColumns(expected, Arrays.asList(new Column(StatusSection.NAME), new Column(WorkSection.NAME)));
-    IteratorSetting setting = tops.getIteratorSetting(ReplicationTable.NAME, ReplicationTable.COMBINER_NAME, IteratorScope.scan);
-
-    Assert.assertEquals(expected, setting);
-
-    // Check for locality groups too
-    Map<String,Set<Text>> expectedLocalityGroups = ImmutableMap.of(ReplicationTable.STATUS_LG_NAME, ReplicationTable.STATUS_LG_COLFAMS,
-        ReplicationTable.WORK_LG_NAME, ReplicationTable.WORK_LG_COLFAMS);
-    Assert.assertEquals(expectedLocalityGroups, tops.getLocalityGroups(ReplicationTable.NAME));
-  }
-
-  @Test
-  public void disablesVersioning() throws Exception {
-    TableOperations tops = conn.tableOperations();
-
-    if (tops.exists(ReplicationTable.NAME)) {
-      tops.delete(ReplicationTable.NAME);
-    }
-
-    ReplicationUtil.createReplicationTable(conn);
-
-    Set<String> iters = tops.listIterators(ReplicationTable.NAME).keySet();
-
-    Assert.assertEquals(Collections.singleton(ReplicationTable.COMBINER_NAME), iters);
-  }
-}
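These tests only exercised the deleted create/configure path, so they are removed wholesale rather than ported. Under the new model the assertions that still mean something reduce to checks against the pre-provisioned system table, roughly as follows (a sketch, assuming an initialized instance where the combiner was attached at initialize time):

import java.util.Set;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.junit.Assert;

class BuiltInReplicationTableSketch {
  // The table always exists; what varies is whether it is online and whether
  // initialize attached the StatusCombiner.
  static void assertProvisioned(Connector conn) throws Exception {
    Assert.assertTrue(conn.tableOperations().exists(ReplicationTable.NAME));
    Set<String> iters = conn.tableOperations().listIterators(ReplicationTable.NAME).keySet();
    Assert.assertTrue(iters.contains(ReplicationTable.COMBINER_NAME));
  }
}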
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 9928d3c..4c0a3a9 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
@@ -86,7 +87,7 @@ public class GarbageCollectWriteAheadLogs {
 
   /**
    * Creates a new GC WAL object.
-   * 
+   *
    * @param instance
    *          instance to use
    * @param fs
@@ -102,7 +103,7 @@ public class GarbageCollectWriteAheadLogs {
 
   /**
    * Gets the instance used by this object.
-   * 
+   *
    * @return instance
   */
  Instance getInstance() {
@@ -111,7 +112,7 @@
 
   /**
    * Gets the volume manager used by this object.
-   * 
+   *
    * @return volume manager
   */
  VolumeManager getVolumeManager() {
@@ -120,7 +121,7 @@
 
   /**
    * Checks if the volume manager should move files to the trash rather than delete them.
-   * 
+   *
    * @return true if trash is used
   */
  boolean isUsingTrash() {
@@ -277,7 +278,7 @@
 
   /**
    * Converts a list of paths to their corresponding strings.
-   * 
+   *
    * @param paths
    *          list of paths
    * @return string forms of paths
@@ -292,7 +293,7 @@
 
   /**
    * Reverses the given mapping of file paths to servers. The returned map provides a list of file paths for each server. Any path whose name is not in the
    * mapping of file names to paths is skipped.
-   * 
+   *
    * @param fileToServerMap
    *          map of file paths to servers
    * @param nameToFileMap
@@ -379,7 +380,7 @@
 
   /**
    * Determine if the given WAL is needed for replication
-   * 
+   *
    * @param wal
    *          The full path (URI)
    * @return True if the WAL is still needed by replication (not a candidate for deletion)
@@ -447,7 +448,7 @@
       replScanner.setRange(Range.exact(wal));
 
       return Iterables.concat(metaScanner, replScanner);
-    } catch (TableNotFoundException e) {
+    } catch (ReplicationTableOfflineException e) {
       // do nothing
     }
 
@@ -458,15 +459,15 @@
     return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
   }
 
-  // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
-  @SuppressWarnings("deprecation")
   /**
-   * Scans write-ahead log directories for logs. The maps passed in are
-   * populated with scan information.
+   * Scans write-ahead log directories for logs. The maps passed in are populated with scan information.
    * 
-   * @param walDirs write-ahead log directories
-   * @param fileToServerMap map of file paths to servers
-   * @param nameToFileMap map of file names to paths
+   * @param walDirs
+   *          write-ahead log directories
+   * @param fileToServerMap
+   *          map of file paths to servers
+   * @param nameToFileMap
+   *          map of file names to paths
    * @return number of servers located (including those with no logs present)
    */
   int scanServers(String[] walDirs, Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
@@ -484,7 +485,10 @@
         continue;
       for (FileStatus status : listing) {
         String server = status.getPath().getName();
-        if (status.isDir()) {
+        // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
+        @SuppressWarnings("deprecation")
+        boolean isDirectory = status.isDir();
+        if (isDirectory) {
           servers.add(server);
           for (FileStatus file : fs.listStatus(new Path(walRoot, server))) {
             if (isUUID(file.getPath().getName())) {
@@ -513,7 +517,7 @@
 
   /**
    * Looks for write-ahead logs in recovery directories.
-   * 
+   *
    * @param recoveryDirs
    *          recovery directories
    * @return map of log file names to paths
@@ -540,7 +544,7 @@
 
   /**
    * Checks if a string is a valid UUID.
-   * 
+   *
    * @param name
    *          string to check
   * @return true if string is a UUID
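A cleanup rides along in scanServers(): the Hadoop-1 deprecation suppression moves from the method declaration down to the one call that needs it, so any new deprecated call elsewhere in the method still warns. The pattern in isolation (a sketch; FileStatus.isDir() is the deprecated Hadoop-1 call, with isDirectory() as its Hadoop-2 replacement):

import org.apache.hadoop.fs.FileStatus;

class NarrowSuppressionSketch {
  // Suppressing on a local variable keeps the rest of the method honest:
  // only this one deprecated call is exempted from the warning.
  static boolean isDir(FileStatus status) {
    @SuppressWarnings("deprecation")
    boolean isDirectory = status.isDir();
    return isDirectory;
  }
}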
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index bbb2010..01fd2c8 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -63,6 +63,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Da
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ScanFileColumnFamily;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.Credentials;
@@ -537,7 +538,7 @@
           }
         });
 
-    } catch (TableNotFoundException e) {
+    } catch (ReplicationTableOfflineException e) {
      // No elements that we need to preclude
      return Iterators.emptyIterator();
    }
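The same guard idiom repeats through the GC code: because the table always exists, "missing" collapses into "offline", and callers degrade to an empty result instead of probing tableOperations().exists(...). Sketched as a helper (illustrative, not code from this commit; Iterators is the Guava class already used in the hunk above):

import java.util.Iterator;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;

import com.google.common.collect.Iterators;

class OfflineFallbackSketch {
  // If the system replication table is offline there is nothing to preclude
  // from garbage collection, so hand back an empty iterator.
  static Iterator<Entry<Key,Value>> replicationEntries(Connector conn) {
    try {
      return ReplicationTable.getScanner(conn).iterator();
    } catch (ReplicationTableOfflineException e) {
      return Iterators.emptyIterator();
    }
  }
}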
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index d13edf7..1c94253 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -91,8 +91,8 @@ public class CloseWriteAheadLogReferences implements Runnable {
       throw new RuntimeException(e);
     }
 
-    if (!conn.tableOperations().exists(ReplicationTable.NAME)) {
-      log.debug("Replication table doesn't exist, not attempting to clean up wals");
+    if (!ReplicationTable.isOnline(conn)) {
+      log.debug("Replication table isn't online, not attempting to clean up wals");
       return;
     }
 
@@ -124,7 +124,7 @@
 
   /**
    * Construct the set of referenced WALs from the metadata table
-   * 
+   *
    * @param conn
    *          Connector
    * @return The Set of WALs that are referenced in the metadata table
@@ -145,7 +145,7 @@
     BatchScanner bs = null;
     try {
       // TODO Configurable number of threads
-      bs = conn.createBatchScanner(MetadataTable.NAME, new Authorizations(), 4);
+      bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
       bs.setRanges(Collections.singleton(TabletsSection.getRange()));
       bs.fetchColumnFamily(LogColumnFamily.NAME);
 
@@ -176,7 +176,7 @@
 
   /**
    * Given the set of WALs which have references in the metadata table, close any status messages with reference that WAL.
-   * 
+   *
    * @param conn
    *          Connector
    * @param referencedWals
@@ -188,7 +188,7 @@
     long recordsClosed = 0;
     try {
       bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
-      bs = conn.createBatchScanner(MetadataTable.NAME, new Authorizations(), 4);
+      bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
       bs.setRanges(Collections.singleton(Range.prefix(ReplicationSection.getRowPrefix())));
       bs.fetchColumnFamily(ReplicationSection.COLF);
 
@@ -239,7 +239,7 @@
 
   /**
    * Write a closed {@link Status} mutation for the given {@link Key} using the provided {@link BatchWriter}
-   * 
+   *
    * @param bw
    *          BatchWriter
    * @param k

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e840549a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index cd69efb..71e5f7d 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -55,7 +55,6 @@ import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.security.Credentials;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -384,11 +383,9 @@
 
     GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false);
 
-    ReplicationUtil.createReplicationTable(conn);
-
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
-    BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     Mutation m = new Mutation("/wals/" + file1);
     StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
     bw.addMutation(m);
@@ -428,8 +425,6 @@
 
     GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(inst, volMgr, false);
 
-    ReplicationUtil.createReplicationTable(conn);
-
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
     // Write some records to the metadata table, we haven't yet written status records to the replication table
@@ -488,8 +483,6 @@
   public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
     Instance inst = new MockInstance(testName.getMethodName());
     Connector conn = inst.getConnector("root", new PasswordToken(""));
-    ReplicationUtil.createReplicationTable(conn);
-
     long walCreateTime = System.currentTimeMillis();
     String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());