DomGarguilo commented on code in PR #4668:
URL: https://github.com/apache/accumulo/pull/4668#discussion_r1638589215
##########
server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java:
##########
@@ -876,61 +853,53 @@ private void cancelOfflineTableMigrations(KeyExtent
extent) {
}
}
- private void repairMetadata(Text row) {
- Manager.log.debug("Attempting repair on {}", row);
- // ACCUMULO-2261 if a dying tserver writes a location before its lock
information propagates, it
- // may cause duplicate assignment.
- // Attempt to find the dead server entry and remove it.
+ private void logIncorrectTabletLocations(TabletMetadata tabletMetadata) {
try {
Map<Key,Value> future = new HashMap<>();
Map<Key,Value> assigned = new HashMap<>();
- KeyExtent extent = KeyExtent.fromMetaRow(row);
- String table = AccumuloTable.METADATA.tableName();
- if (extent.isMeta()) {
- table = AccumuloTable.ROOT.tableName();
+ KeyExtent extent = tabletMetadata.getExtent();
+ var level = Ample.DataLevel.of(extent.tableId());
+
+ Stream<? extends Entry<Key,Value>> entries;
+ if (level == Ample.DataLevel.ROOT) {
+ RootTabletMetadata rtm = RootTabletMetadata.read(manager.getContext());
+ entries = rtm.getKeyValues();
+ } else {
+ Scanner scanner =
+ manager.getContext().createScanner(level.metaTable(),
Authorizations.EMPTY);
+ scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+ scanner.setRange(new Range(extent.toMetaRow()));
+ entries = scanner.stream();
Review Comment:
Could add an `onClose()` here to close the scanner, and then close the stream once
the maps have been built.
Alternatively, add a new method that returns the stream:
```java
/**
 * Returns the metadata key/values to inspect for {@code extent}.
 *
 * <p>For a ROOT-level extent this is every key/value from {@link RootTabletMetadata} (no
 * column filtering). For any other level, a scanner over that level's metadata table is
 * restricted to the extent's metadata row and to the current/future location column families.
 * Closing the returned stream closes that scanner.
 */
private Stream<? extends Entry<Key,Value>> getMetaEntries(KeyExtent extent)
    throws TableNotFoundException, InterruptedException, KeeperException {
  Ample.DataLevel level = Ample.DataLevel.of(extent.tableId());
  if (level != Ample.DataLevel.ROOT) {
    Scanner metaScanner =
        manager.getContext().createScanner(level.metaTable(), Authorizations.EMPTY);
    metaScanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
    metaScanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
    metaScanner.setRange(new Range(extent.toMetaRow()));
    // tie the scanner's lifetime to the stream so try-with-resources on the stream closes it
    return metaScanner.stream().onClose(metaScanner::close);
  }
  RootTabletMetadata rootMetadata = RootTabletMetadata.read(manager.getContext());
  return rootMetadata.getKeyValues();
}
```
Then the stream could be consumed in a try-with-resources block, so the scanner is
closed by the stream's `onClose` handler:
```java
try (Stream<? extends Entry<Key,Value>> entries =
getMetaEntries(extent)) {
entries.forEach(entry -> {
if
(entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) {
assigned.put(entry.getKey(), entry.getValue());
} else if
(entry.getKey().getColumnFamily().equals(FutureLocationColumnFamily.NAME)) {
future.put(entry.getKey(), entry.getValue());
}
});
}
```
I am not sure whether either of these options is an improvement in terms of
simplicity or readability, though, so it might not be worth it.
##########
test/src/main/java/org/apache/accumulo/test/TestDualAssignment.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.time.Duration;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.TabletAvailability;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.Test;
+
+public class TestDualAssignment extends ConfigurableMacBase {
+
+ @Override
+ protected Duration defaultTimeout() {
+ return Duration.ofMinutes(5);
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration
hadoopCoreSite) {
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.MANAGER_RECOVERY_DELAY, "5s");
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ @Test
+ public void test() throws Exception {
+ // make some tablets, spread 'em around
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProperties()).build()) {
+ ServerContext serverContext = cluster.getServerContext();
+ String table = this.getUniqueNames(1)[0];
+
+ SortedSet<Text> partitions = new TreeSet<>();
+ for (String part : "a b c d e f g h i j k l m n o p q r s t u v w x y
z".split(" ")) {
+ partitions.add(new Text(part));
+ }
+ NewTableConfiguration ntc = new
NewTableConfiguration().withSplits(partitions)
+ .withInitialTabletAvailability(TabletAvailability.HOSTED);
+ c.tableOperations().create(table, ntc);
+
+ var tableId = serverContext.getTableId(table);
+ var extent1 = new KeyExtent(tableId, new Text("m"), new Text("l"));
+
+ var loc1 = TabletMetadata.Location.current("192.168.1.1:9997", "56");
+ var loc2 = TabletMetadata.Location.future("192.168.1.2:9997", "67");
+
+ // set multiple locations for a tablet
+
serverContext.getAmple().mutateTablet(extent1).putLocation(loc1).putLocation(loc2).mutate();
+
+ // This operation will fail when there are two locations set on a tablet
+ assertThrows(AccumuloException.class, () ->
c.tableOperations().setTabletAvailability(table,
+ new Range(), TabletAvailability.HOSTED));
+
+ try (var scanner = c.createScanner(table)) {
+ // should not be able to scan the table when a tablet has multiple
locations
+ assertThrows(IllegalStateException.class, () ->
scanner.stream().count());
+ }
+
+ // fix the tablet metadata
+
serverContext.getAmple().mutateTablet(extent1).deleteLocation(loc1).deleteLocation(loc2)
+ .mutate();
+
+ try (var scanner = c.createScanner(table)) {
+ // should not be able to scan the table when a tablet has multiple
locations
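Review Comment:
```suggestion
```
(The empty suggestion removes this line. The comment was copied from the earlier scan
and appears inaccurate here, because the tablet's locations were just fixed.)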
##########
server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java:
##########
@@ -876,61 +853,53 @@ private void cancelOfflineTableMigrations(KeyExtent
extent) {
}
}
- private void repairMetadata(Text row) {
- Manager.log.debug("Attempting repair on {}", row);
- // ACCUMULO-2261 if a dying tserver writes a location before its lock
information propagates, it
- // may cause duplicate assignment.
- // Attempt to find the dead server entry and remove it.
+ private void logIncorrectTabletLocations(TabletMetadata tabletMetadata) {
try {
Map<Key,Value> future = new HashMap<>();
Map<Key,Value> assigned = new HashMap<>();
- KeyExtent extent = KeyExtent.fromMetaRow(row);
- String table = AccumuloTable.METADATA.tableName();
- if (extent.isMeta()) {
- table = AccumuloTable.ROOT.tableName();
+ KeyExtent extent = tabletMetadata.getExtent();
+ var level = Ample.DataLevel.of(extent.tableId());
+
+ Stream<? extends Entry<Key,Value>> entries;
+ if (level == Ample.DataLevel.ROOT) {
+ RootTabletMetadata rtm = RootTabletMetadata.read(manager.getContext());
+ entries = rtm.getKeyValues();
+ } else {
+ Scanner scanner =
+ manager.getContext().createScanner(level.metaTable(),
Authorizations.EMPTY);
+ scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
+ scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
+ scanner.setRange(new Range(extent.toMetaRow()));
+ entries = scanner.stream();
}
- Scanner scanner = manager.getContext().createScanner(table,
Authorizations.EMPTY);
- scanner.fetchColumnFamily(CurrentLocationColumnFamily.NAME);
- scanner.fetchColumnFamily(FutureLocationColumnFamily.NAME);
- scanner.setRange(new Range(row));
- for (Entry<Key,Value> entry : scanner) {
+
+ entries.forEach(entry -> {
if
(entry.getKey().getColumnFamily().equals(CurrentLocationColumnFamily.NAME)) {
assigned.put(entry.getKey(), entry.getValue());
} else if
(entry.getKey().getColumnFamily().equals(FutureLocationColumnFamily.NAME)) {
future.put(entry.getKey(), entry.getValue());
}
- }
- if (!future.isEmpty() && !assigned.isEmpty()) {
- Manager.log.warn("Found a tablet assigned and hosted, attempting to
repair");
- } else if (future.size() > 1 && assigned.isEmpty()) {
- Manager.log.warn("Found a tablet assigned to multiple servers,
attempting to repair");
- } else if (future.isEmpty() && assigned.size() > 1) {
- Manager.log.warn("Found a tablet hosted on multiple servers,
attempting to repair");
+ });
+
+ var count = future.size() + assigned.size();
+ if (count <= 1) {
+ Manager.log.trace("Tablet {} seems to have correct location based on
inspection",
+ tabletMetadata.getExtent());
} else {
- Manager.log.info("Attempted a repair, but nothing seems to be
obviously wrong. {} {}",
- assigned, future);
- return;
- }
- Iterator<Entry<Key,Value>> iter =
- Iterators.concat(future.entrySet().iterator(),
assigned.entrySet().iterator());
- while (iter.hasNext()) {
- Entry<Key,Value> entry = iter.next();
- TServerInstance alive =
manager.tserverSet.find(entry.getValue().toString());
- if (alive == null) {
- Manager.log.info("Removing entry {}", entry);
- BatchWriter bw = manager.getContext().createBatchWriter(table);
- Mutation m = new Mutation(entry.getKey().getRow());
- m.putDelete(entry.getKey().getColumnFamily(),
entry.getKey().getColumnQualifier());
- bw.addMutation(m);
- bw.close();
- return;
+ for (Map.Entry<Key,Value> entry : future.entrySet()) {
+ TServerInstance alive =
manager.tserverSet.find(entry.getValue().toString());
+ Manager.log.debug("Saw duplicate future location key:{} value:{}
alive:{} ",
+ entry.getKey(), entry.getValue(), alive != null);
+ }
+ for (Map.Entry<Key,Value> entry : assigned.entrySet()) {
+ TServerInstance alive =
manager.tserverSet.find(entry.getValue().toString());
+ Manager.log.debug("Saw duplicate current location key:{} value:{}
alive:{} ",
+ entry.getKey(), entry.getValue(), alive != null);
}
Review Comment:
```java
private void logDuplicateLocations(Map<Key,Value> locations, String type) {
for (Map.Entry<Key,Value> entry : locations.entrySet()) {
TServerInstance alive =
manager.tserverSet.find(entry.getValue().toString());
Manager.log.debug("Saw duplicate {} location key:{} value:{}
alive:{}", type, entry.getKey(),
entry.getValue(), alive != null);
}
}
```
These two loops could be refactored into a single method like the one above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]