This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push: new d12ff6c Update Last Location. Solution for #1169 (#1453) d12ff6c is described below commit d12ff6c700e003202fd2e9842499324561246b15 Author: Jeffrey Manno <jeffreymann...@gmail.com> AuthorDate: Mon Apr 6 10:36:44 2020 -0400 Update Last Location. Solution for #1169 (#1453) * Add suspending tablets to ample and in stateStore Function * remove functions not used and finalize changes for refactors * remove collection<assignments> from setFuture * Deleted more unused code. Various requested changes Co-authored-by: Jeffrey Manno <jeffreyma...@gmail.com> Co-authored-by: Mike Miller <mmil...@apache.org> --- .../accumulo/core/metadata/schema/Ample.java | 4 + .../master/state/DistributedStoreException.java | 33 ----- .../master/state/LoggingTabletStateStore.java | 21 ++-- .../server/master/state/MetaDataStateStore.java | 133 +++++++-------------- .../server/master/state/SuspendingTServer.java | 11 -- .../server/master/state/TServerInstance.java | 34 ------ .../server/master/state/TabletStateStore.java | 23 ++-- .../server/master/state/ZooTabletStateStore.java | 26 ++-- .../server/metadata/TabletMutatorBase.java | 17 +++ .../accumulo/server/util/MasterMetadataUtil.java | 22 +--- .../master/state/RootTabletStateStoreTest.java | 13 +- .../apache/accumulo/master/TabletGroupWatcher.java | 7 +- .../org/apache/accumulo/tserver/TabletServer.java | 5 +- .../accumulo/tserver/tablet/DatafileManager.java | 4 +- .../org/apache/accumulo/tserver/tablet/Tablet.java | 7 +- .../test/MasterRepairsDualAssignmentIT.java | 24 ++-- .../accumulo/test/functional/SplitRecoveryIT.java | 12 +- .../apache/accumulo/test/master/MergeStateIT.java | 6 +- .../accumulo/test/performance/NullTserver.java | 9 +- 19 files changed, 138 insertions(+), 273 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java index a7039d4..bd3eb09 100644 --- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java +++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java @@ -222,6 +222,10 @@ public interface Ample { public TabletMutator putChopped(); + public TabletMutator putSuspension(TServer tserver, long suspensionTime); + + public TabletMutator deleteSuspension(); + /** * This method persist (or queues for persisting) previous put and deletes against this object. * Unless this method is called, previous calls will never be persisted. The purpose of this diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java deleted file mode 100644 index a099bbf..0000000 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/DistributedStoreException.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.accumulo.server.master.state; - -public class DistributedStoreException extends Exception { - - private static final long serialVersionUID = 1L; - - public DistributedStoreException(String why) { - super(why); - } - - public DistributedStoreException(Exception cause) { - super(cause); - } - -} diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/LoggingTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/LoggingTabletStateStore.java index 8b5fe1a..4249fc2 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/LoggingTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/LoggingTabletStateStore.java @@ -48,21 +48,21 @@ class LoggingTabletStateStore implements TabletStateStore { } @Override - public void setFutureLocations(Collection<Assignment> assignments) - throws DistributedStoreException { - wrapped.setFutureLocations(assignments); - assignments.forEach(assignment -> TabletLogger.assigned(assignment.tablet, assignment.server)); + public void setFutureLocation(Assignment assignment) { + wrapped.setFutureLocation(assignment); + TabletLogger.assigned(assignment.tablet, assignment.server); + } @Override - public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException { - wrapped.setLocations(assignments); - assignments.forEach(assignment -> TabletLogger.loaded(assignment.tablet, assignment.server)); + public void setLocation(Assignment assignment, TServerInstance prevLastLoc) { + wrapped.setLocation(assignment, prevLastLoc); + TabletLogger.loaded(assignment.tablet, assignment.server); } @Override public void unassign(Collection<TabletLocationState> tablets, - Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException { + Map<TServerInstance,List<Path>> logsForDeadServers) { wrapped.unassign(tablets, logsForDeadServers); if (logsForDeadServers == null) @@ -75,8 +75,7 @@ class LoggingTabletStateStore implements TabletStateStore { @Override public void suspend(Collection<TabletLocationState> tablets, - Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) - throws DistributedStoreException { + Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) { wrapped.suspend(tablets, logsForDeadServers, suspensionTimestamp); if (logsForDeadServers == null) @@ -89,7 +88,7 @@ class LoggingTabletStateStore implements TabletStateStore { } @Override - public void unsuspend(Collection<TabletLocationState> tablets) throws DistributedStoreException { + public void unsuspend(Collection<TabletLocationState> tablets) { wrapped.unsuspend(tablets); for (TabletLocationState tls : tablets) { TabletLogger.unsuspended(tls.extent); diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java index 6adef3d..ce2fc7f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java @@ -21,17 +21,17 @@ package org.apache.accumulo.server.master.state; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.clientImpl.ClientContext; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.metadata.schema.Ample; +import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; import org.apache.accumulo.core.metadata.schema.MetadataSchema; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class MetaDataStateStore implements TabletStateStore { @@ -42,10 +42,13 @@ class MetaDataStateStore implements TabletStateStore { protected final ClientContext context; protected final CurrentState state; private final String targetTableName; + private final Ample ample; + private static final Logger log = LoggerFactory.getLogger(MetaDataStateStore.class); protected MetaDataStateStore(ClientContext context, CurrentState state, String targetTableName) { this.context = context; this.state = state; + this.ample = context.getAmple(); this.targetTableName = targetTableName; } @@ -60,135 +63,87 @@ class MetaDataStateStore implements TabletStateStore { } @Override - public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException { - BatchWriter writer = createBatchWriter(); - try { - for (Assignment assignment : assignments) { - Mutation m = new Mutation(assignment.tablet.getMetadataEntry()); - assignment.server.putLocation(m); - assignment.server.clearFutureLocation(m); - SuspendingTServer.clearSuspension(m); - writer.addMutation(m); - } - } catch (Exception ex) { - throw new DistributedStoreException(ex); - } finally { - try { - writer.close(); - } catch (MutationsRejectedException e) { - throw new DistributedStoreException(e); - } - } - } + public void setLocation(Assignment assignment, TServerInstance prevLastLoc) { - BatchWriter createBatchWriter() { - try { - return context.createBatchWriter(targetTableName, - new BatchWriterConfig().setMaxMemory(MAX_MEMORY) - .setMaxLatency(LATENCY, TimeUnit.MILLISECONDS).setMaxWriteThreads(THREADS)); - } catch (Exception e) { - throw new RuntimeException(e); + TabletMutator tabletMutator = ample.mutateTablet(assignment.tablet); + tabletMutator.putLocation(assignment.server, LocationType.CURRENT); + tabletMutator.putLocation(assignment.server, LocationType.LAST); + tabletMutator.deleteLocation(assignment.server, LocationType.FUTURE); + + // remove the old location + if (prevLastLoc != null && !prevLastLoc.equals(assignment.server)) { + tabletMutator.deleteLocation(prevLastLoc, LocationType.LAST); } + + tabletMutator.mutate(); + } @Override - public void setFutureLocations(Collection<Assignment> assignments) - throws DistributedStoreException { - BatchWriter writer = createBatchWriter(); - try { - for (Assignment assignment : assignments) { - Mutation m = new Mutation(assignment.tablet.getMetadataEntry()); - SuspendingTServer.clearSuspension(m); - assignment.server.putFutureLocation(m); - writer.addMutation(m); - } - } catch (Exception ex) { - throw new DistributedStoreException(ex); - } finally { - try { - writer.close(); - } catch (MutationsRejectedException e) { - throw new DistributedStoreException(e); - } - } + public void setFutureLocation(Assignment assignment) { + + TabletMutator tabletMutator = ample.mutateTablet(assignment.tablet); + tabletMutator.deleteSuspension(); + tabletMutator.putLocation(assignment.server, LocationType.FUTURE); + tabletMutator.mutate(); + } @Override public void unassign(Collection<TabletLocationState> tablets, - Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException { + Map<TServerInstance,List<Path>> logsForDeadServers) { unassign(tablets, logsForDeadServers, -1); } @Override public void suspend(Collection<TabletLocationState> tablets, - Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) - throws DistributedStoreException { + Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) { unassign(tablets, logsForDeadServers, suspensionTimestamp); } private void unassign(Collection<TabletLocationState> tablets, - Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) - throws DistributedStoreException { - BatchWriter writer = createBatchWriter(); - try { + Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) { + try (var tabletsMutator = ample.mutateTablets()) { for (TabletLocationState tls : tablets) { - Mutation m = new Mutation(tls.extent.getMetadataEntry()); + TabletMutator tabletMutator = tabletsMutator.mutateTablet(tls.extent); if (tls.current != null) { - tls.current.clearLocation(m); + tabletMutator.deleteLocation(tls.current, LocationType.CURRENT); if (logsForDeadServers != null) { List<Path> logs = logsForDeadServers.get(tls.current); if (logs != null) { for (Path log : logs) { LogEntry entry = new LogEntry(tls.extent, 0, tls.current.hostPort(), log.toString()); - m.put(entry.getColumnFamily(), entry.getColumnQualifier(), entry.getValue()); + tabletMutator.putWal(entry); } } } if (suspensionTimestamp >= 0) { - SuspendingTServer suspender = - new SuspendingTServer(tls.current.getLocation(), suspensionTimestamp); - suspender.setSuspension(m); + tabletMutator.putSuspension(tls.current, suspensionTimestamp); } } if (tls.suspend != null && suspensionTimestamp < 0) { - SuspendingTServer.clearSuspension(m); + tabletMutator.deleteSuspension(); } if (tls.future != null) { - tls.future.clearFutureLocation(m); + tabletMutator.deleteLocation(tls.future, LocationType.FUTURE); } - writer.addMutation(m); - } - } catch (Exception ex) { - throw new DistributedStoreException(ex); - } finally { - try { - writer.close(); - } catch (MutationsRejectedException e) { - throw new DistributedStoreException(e); + tabletMutator.mutate(); } } } @Override - public void unsuspend(Collection<TabletLocationState> tablets) throws DistributedStoreException { - BatchWriter writer = createBatchWriter(); - try { + public void unsuspend(Collection<TabletLocationState> tablets) { + + try (var tabletsMutator = ample.mutateTablets()) { for (TabletLocationState tls : tablets) { if (tls.suspend != null) { continue; } - Mutation m = new Mutation(tls.extent.getMetadataEntry()); - SuspendingTServer.clearSuspension(m); - writer.addMutation(m); - } - } catch (Exception ex) { - throw new DistributedStoreException(ex); - } finally { - try { - writer.close(); - } catch (MutationsRejectedException e) { - throw new DistributedStoreException(e); + TabletMutator tabletMutator = tabletsMutator.mutateTablet(tls.extent); + tabletMutator.deleteSuspension(); + tabletMutator.mutate(); } } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java index c6bb343..52de28f 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/SuspendingTServer.java @@ -18,11 +18,8 @@ */ package org.apache.accumulo.server.master.state; -import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN; - import java.util.Objects; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.util.HostAndPort; @@ -57,14 +54,6 @@ public class SuspendingTServer { return server.equals(rhs.server) && suspensionTime == rhs.suspensionTime; } - public void setSuspension(Mutation m) { - m.put(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier(), toValue()); - } - - public static void clearSuspension(Mutation m) { - m.putDelete(SUSPEND_COLUMN.getColumnFamily(), SUSPEND_COLUMN.getColumnQualifier()); - } - @Override public int hashCode() { return Objects.hash(server, suspensionTime); diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java index a539643..d5e40c5 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java @@ -25,10 +25,8 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.metadata.schema.Ample; -import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.util.AddressUtil; import org.apache.accumulo.core.util.HostAndPort; @@ -81,30 +79,6 @@ public class TServerInstance implements Ample.TServer, Comparable<TServerInstanc this(location.getHostAndPort(), location.getSession()); } - public void putLocation(Mutation m) { - m.put(TabletsSection.CurrentLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue()); - } - - public void putFutureLocation(Mutation m) { - m.put(TabletsSection.FutureLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue()); - } - - public void putLastLocation(Mutation m) { - m.put(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier(), asMutationValue()); - } - - public void clearLastLocation(Mutation m) { - m.putDelete(TabletsSection.LastLocationColumnFamily.NAME, asColumnQualifier()); - } - - public void clearFutureLocation(Mutation m) { - m.putDelete(TabletsSection.FutureLocationColumnFamily.NAME, asColumnQualifier()); - } - - public void clearLocation(Mutation m) { - m.putDelete(TabletsSection.CurrentLocationColumnFamily.NAME, asColumnQualifier()); - } - @Override public int compareTo(TServerInstance other) { if (this == other) @@ -138,14 +112,6 @@ public class TServerInstance implements Ample.TServer, Comparable<TServerInstanc return getLocation().toString(); } - private Text asColumnQualifier() { - return new Text(this.getSession()); - } - - private Value asMutationValue() { - return new Value(getLocation().toString()); - } - @Override public HostAndPort getLocation() { return location; diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java index e74c9ec..5182677 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java @@ -48,12 +48,12 @@ public interface TabletStateStore extends Iterable<TabletLocationState> { /** * Store the assigned locations in the data store. */ - void setFutureLocations(Collection<Assignment> assignments) throws DistributedStoreException; + void setFutureLocation(Assignment assignment); /** * Tablet servers will update the data store with the location when they bring the tablet online */ - void setLocations(Collection<Assignment> assignments) throws DistributedStoreException; + void setLocation(Assignment assignment, TServerInstance prevLastLoc); /** * Mark the tablets as having no known or future location. @@ -64,38 +64,35 @@ public interface TabletStateStore extends Iterable<TabletLocationState> { * a cache of logs in use by servers when they died */ void unassign(Collection<TabletLocationState> tablets, - Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException; + Map<TServerInstance,List<Path>> logsForDeadServers); /** * Mark tablets as having no known or future location, but desiring to be returned to their * previous tserver. */ void suspend(Collection<TabletLocationState> tablets, - Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) - throws DistributedStoreException; + Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp); /** * Remove a suspension marker for a collection of tablets, moving them to being simply unassigned. */ - void unsuspend(Collection<TabletLocationState> tablets) throws DistributedStoreException; + void unsuspend(Collection<TabletLocationState> tablets); public static void unassign(ServerContext context, TabletLocationState tls, - Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException { + Map<TServerInstance,List<Path>> logsForDeadServers) { getStoreForTablet(tls.extent, context).unassign(Collections.singletonList(tls), logsForDeadServers); } public static void suspend(ServerContext context, TabletLocationState tls, - Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) - throws DistributedStoreException { + Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) { getStoreForTablet(tls.extent, context).suspend(Collections.singletonList(tls), logsForDeadServers, suspensionTimestamp); } - public static void setLocation(ServerContext context, Assignment assignment) - throws DistributedStoreException { - getStoreForTablet(assignment.tablet, context) - .setLocations(Collections.singletonList(assignment)); + public static void setLocation(ServerContext context, Assignment assignment, + TServerInstance prevLastLoc) { + getStoreForTablet(assignment.tablet, context).setLocation(assignment, prevLastLoc); } static TabletStateStore getStoreForTablet(KeyExtent extent, ServerContext context) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java index e34911a..bd5cdf0 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java @@ -103,13 +103,7 @@ class ZooTabletStateStore implements TabletStateStore { } @Override - public void setFutureLocations(Collection<Assignment> assignments) - throws DistributedStoreException { - if (assignments.size() != 1) - throw new IllegalArgumentException("There is only one root tablet"); - Assignment assignment = assignments.iterator().next(); - if (assignment.tablet.compareTo(RootTable.EXTENT) != 0) - throw new IllegalArgumentException("You can only store the root tablet location"); + public void setFutureLocation(Assignment assignment) { TabletMutator tabletMutator = ample.mutateTablet(assignment.tablet); tabletMutator.putLocation(assignment.server, LocationType.FUTURE); @@ -117,23 +111,22 @@ class ZooTabletStateStore implements TabletStateStore { } @Override - public void setLocations(Collection<Assignment> assignments) throws DistributedStoreException { - if (assignments.size() != 1) - throw new IllegalArgumentException("There is only one root tablet"); - Assignment assignment = assignments.iterator().next(); - if (assignment.tablet.compareTo(RootTable.EXTENT) != 0) - throw new IllegalArgumentException("You can only store the root tablet location"); - + public void setLocation(Assignment assignment, TServerInstance prevLastLoc) { TabletMutator tabletMutator = ample.mutateTablet(assignment.tablet); tabletMutator.putLocation(assignment.server, LocationType.CURRENT); + tabletMutator.putLocation(assignment.server, LocationType.LAST); tabletMutator.deleteLocation(assignment.server, LocationType.FUTURE); + if (prevLastLoc != null && !prevLastLoc.equals(assignment.server)) { + tabletMutator.deleteLocation(prevLastLoc, LocationType.LAST); + } + tabletMutator.mutate(); } @Override public void unassign(Collection<TabletLocationState> tablets, - Map<TServerInstance,List<Path>> logsForDeadServers) throws DistributedStoreException { + Map<TServerInstance,List<Path>> logsForDeadServers) { if (tablets.size() != 1) throw new IllegalArgumentException("There is only one root tablet"); TabletLocationState tls = tablets.iterator().next(); @@ -162,8 +155,7 @@ class ZooTabletStateStore implements TabletStateStore { @Override public void suspend(Collection<TabletLocationState> tablets, - Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) - throws DistributedStoreException { + Map<TServerInstance,List<Path>> logsForDeadServers, long suspensionTimestamp) { // No support for suspending root tablet. unassign(tablets, logsForDeadServers); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java index 6dd9a75..bce340d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java +++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorBase.java @@ -198,6 +198,23 @@ public abstract class TabletMutatorBase implements Ample.TabletMutator { return this; } + @Override + public Ample.TabletMutator putSuspension(Ample.TServer tServer, long suspensionTime) { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + mutation.put(TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily(), + TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier(), + new Value(tServer + "|" + suspensionTime)); + return this; + } + + @Override + public Ample.TabletMutator deleteSuspension() { + Preconditions.checkState(updatesEnabled, "Cannot make updates after calling mutate."); + mutation.putDelete(TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN.getColumnFamily(), + TabletsSection.SuspendLocationColumn.SUSPEND_COLUMN.getColumnQualifier()); + return this; + } + protected Mutation getMutation() { updatesEnabled = false; return mutation; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java index 366e941..f60aa36 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java @@ -181,8 +181,7 @@ public class MasterMetadataUtil { public static void replaceDatafiles(ServerContext context, KeyExtent extent, Set<StoredTabletFile> datafilesToDelete, Set<StoredTabletFile> scanFiles, TabletFile path, - Long compactionId, DataFileValue size, String address, TServerInstance lastLocation, - ZooLock zooLock) { + Long compactionId, DataFileValue size, ZooLock zooLock) { context.getAmple().putGcCandidates(extent.getTableId(), datafilesToDelete); @@ -197,13 +196,6 @@ public class MasterMetadataUtil { if (compactionId != null) tablet.putCompactionId(compactionId); - TServerInstance self = getTServerInstance(address, zooLock); - tablet.putLocation(self, LocationType.LAST); - - // remove the old location - if (lastLocation != null && !lastLocation.equals(self)) - tablet.deleteLocation(lastLocation, LocationType.LAST); - tablet.putZooLock(zooLock); tablet.mutate(); @@ -218,8 +210,8 @@ public class MasterMetadataUtil { */ public static StoredTabletFile updateTabletDataFile(ServerContext context, KeyExtent extent, TabletFile path, StoredTabletFile mergeFile, DataFileValue dfv, MetadataTime time, - Set<StoredTabletFile> filesInUseByScans, String address, ZooLock zooLock, - Set<String> unusedWalLogs, TServerInstance lastLocation, long flushId) { + Set<StoredTabletFile> filesInUseByScans, ZooLock zooLock, + Set<String> unusedWalLogs, long flushId) { TabletMutator tablet = context.getAmple().mutateTablet(extent); StoredTabletFile newFile = null; @@ -228,14 +220,6 @@ public class MasterMetadataUtil { tablet.putFile(path, dfv); tablet.putTime(time); newFile = path.insert(); - - TServerInstance self = getTServerInstance(address, zooLock); - tablet.putLocation(self, LocationType.LAST); - - // remove the old location - if (lastLocation != null && !lastLocation.equals(self)) { - tablet.deleteLocation(lastLocation, LocationType.LAST); - } } tablet.putFlushId(flushId); diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/state/RootTabletStateStoreTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/state/RootTabletStateStoreTest.java index 88959f0..26a8876 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/master/state/RootTabletStateStoreTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/master/state/RootTabletStateStoreTest.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import java.util.Collections; -import java.util.List; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; @@ -75,14 +74,14 @@ public class RootTabletStateStoreTest { } @Test - public void testRootTabletStateStore() throws DistributedStoreException { + public void testRootTabletStateStore() { ZooTabletStateStore tstore = new ZooTabletStateStore(new TestAmple()); KeyExtent root = RootTable.EXTENT; String sessionId = "this is my unique session data"; TServerInstance server = new TServerInstance(HostAndPort.fromParts("127.0.0.1", 10000), sessionId); - List<Assignment> assignments = Collections.singletonList(new Assignment(root, server)); - tstore.setFutureLocations(assignments); + Assignment assignment = new Assignment(root, server); + tstore.setFutureLocation(assignment); int count = 0; for (TabletLocationState location : tstore) { assertEquals(location.extent, root); @@ -91,7 +90,7 @@ public class RootTabletStateStoreTest { count++; } assertEquals(count, 1); - tstore.setLocations(assignments); + tstore.setLocation(assignment, server); count = 0; for (TabletLocationState location : tstore) { assertEquals(location.extent, root); @@ -118,12 +117,12 @@ public class RootTabletStateStoreTest { KeyExtent notRoot = new KeyExtent(TableId.of("0"), null, null); try { - tstore.setLocations(Collections.singletonList(new Assignment(notRoot, server))); + tstore.setLocation(new Assignment(notRoot, server), assigned.last); fail("should not get here"); } catch (IllegalArgumentException ex) {} try { - tstore.setFutureLocations(Collections.singletonList(new Assignment(notRoot, server))); + tstore.setFutureLocation(new Assignment(notRoot, server)); fail("should not get here"); } catch (IllegalArgumentException ex) {} diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java index 5f29286..49115bf 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java +++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java @@ -80,7 +80,6 @@ import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection; import org.apache.accumulo.server.master.state.Assignment; import org.apache.accumulo.server.master.state.ClosableIterator; -import org.apache.accumulo.server.master.state.DistributedStoreException; import org.apache.accumulo.server.master.state.MergeInfo; import org.apache.accumulo.server.master.state.MergeState; import org.apache.accumulo.server.master.state.TServerInstance; @@ -819,7 +818,7 @@ abstract class TabletGroupWatcher extends Daemon { List<TabletLocationState> assignedToDeadServers, Map<TServerInstance,List<Path>> logsForDeadServers, List<TabletLocationState> suspendedToGoneServers, Map<KeyExtent,TServerInstance> unassigned) - throws DistributedStoreException, TException, WalMarkerException { + throws TException, WalMarkerException { boolean tabletsSuspendable = canSuspendTablets(); if (!assignedToDeadServers.isEmpty()) { int maxServersToShow = min(assignedToDeadServers.size(), 100); @@ -872,7 +871,9 @@ abstract class TabletGroupWatcher extends Daemon { if (assignments.size() > 0) { Master.log.info(String.format("Assigning %d tablets", assignments.size())); - store.setFutureLocations(assignments); + + for (Assignment assignment : assignments) + store.setFutureLocation(assignment); } assignments.addAll(assigned); for (Assignment a : assignments) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index d1ef5ef..17a0519 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -192,7 +192,6 @@ import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.log.WalStateManager.WalMarkerException; import org.apache.accumulo.server.master.recovery.RecoveryPath; import org.apache.accumulo.server.master.state.Assignment; -import org.apache.accumulo.server.master.state.DistributedStoreException; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.master.state.TabletLocationState; import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException; @@ -2364,8 +2363,6 @@ public class TabletServer extends AbstractServer { TabletStateStore.suspend(getContext(), tls, null, requestTimeSkew + MILLISECONDS.convert(System.nanoTime(), NANOSECONDS)); } - } catch (DistributedStoreException ex) { - log.warn("Unable to update storage", ex); } catch (KeeperException e) { log.warn("Unable determine our zookeeper session information", e); } catch (InterruptedException e) { @@ -2508,7 +2505,7 @@ public class TabletServer extends AbstractServer { throw new RuntimeException("Minor compaction after recovery fails for " + extent); } Assignment assignment = new Assignment(extent, getTabletSession()); - TabletStateStore.setLocation(getContext(), assignment); + TabletStateStore.setLocation(getContext(), assignment, data.getLastLocation()); synchronized (openingTablets) { synchronized (onlineTablets) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java index 776b99a..4c0e9ed 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/DatafileManager.java @@ -548,9 +548,7 @@ class DatafileManager { if (filesInUseByScans.size() > 0) log.debug("Adding scan refs to metadata {} {}", extent, filesInUseByScans); MasterMetadataUtil.replaceDatafiles(tablet.getContext(), extent, oldDatafiles, - filesInUseByScans, newFile, compactionId, dfv, - tablet.getTabletServer().getClientAddressString(), lastLocation, - tablet.getTabletServer().getLock()); + filesInUseByScans, newFile, compactionId, dfv, tablet.getTabletServer().getLock()); removeFilesAfterScan(filesInUseByScans); if (log.isTraceEnabled()) { diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java index 50dd5d5..9d237ac 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java @@ -2684,10 +2684,9 @@ public class Tablet { persistedTime = maxCommittedTime; } - return MasterMetadataUtil.updateTabletDataFile(getTabletServer().getContext(), extent, - newDatafile, absMergeFile, dfv, tabletTime.getMetadataTime(persistedTime), - filesInUseByScans, tabletServer.getClientAddressString(), tabletServer.getLock(), - unusedWalLogs, lastLocation, flushId); + return MasterMetadataUtil.updateTabletDataFile(getTabletServer().getContext(), extent, newDatafile, + absMergeFile, dfv, tabletTime.getMetadataTime(persistedTime), filesInUseByScans, + tabletServer.getLock(), unusedWalLogs, flushId); } } diff --git a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java index b0537a1..26b7a97 100644 --- a/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java +++ b/test/src/main/java/org/apache/accumulo/test/MasterRepairsDualAssignmentIT.java @@ -28,20 +28,20 @@ import java.util.TreeSet; import org.apache.accumulo.core.client.Accumulo; import org.apache.accumulo.core.client.AccumuloClient; -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.conf.Property; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; +import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.replication.ReplicationTable; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.fate.util.UtilWaitThread; import org.apache.accumulo.minicluster.ServerType; import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.master.state.ClosableIterator; import org.apache.accumulo.server.master.state.TServerInstance; import org.apache.accumulo.server.master.state.TabletLocationState; @@ -74,6 +74,7 @@ public class MasterRepairsDualAssignmentIT extends ConfigurableMacBase { // make some tablets, spread 'em around try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) { ClientContext context = (ClientContext) c; + ServerContext serverContext = cluster.getServerContext(); String table = this.getUniqueNames(1)[0]; c.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE); @@ -134,19 +135,16 @@ public class MasterRepairsDualAssignmentIT extends ConfigurableMacBase { } assertNotEquals(null, moved); // throw a mutation in as if we were the dying tablet - BatchWriter bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); - Mutation assignment = new Mutation(moved.extent.getMetadataEntry()); - moved.current.putLocation(assignment); - bw.addMutation(assignment); - bw.close(); + TabletMutator tabletMutator = serverContext.getAmple().mutateTablet(moved.extent); + tabletMutator.putLocation(moved.current, LocationType.CURRENT); + tabletMutator.mutate(); // wait for the master to fix the problem waitForCleanStore(store); // now jam up the metadata table - bw = c.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); - assignment = new Mutation(new KeyExtent(MetadataTable.ID, null, null).getMetadataEntry()); - moved.current.putLocation(assignment); - bw.addMutation(assignment); - bw.close(); + tabletMutator = + serverContext.getAmple().mutateTablet(new KeyExtent(MetadataTable.ID, null, null)); + tabletMutator.putLocation(moved.current, LocationType.CURRENT); + tabletMutator.mutate(); waitForCleanStore(TabletStateStore.getStoreForLevel(DataLevel.METADATA, context)); } } diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java index c0760fa..6337ef9 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java @@ -37,10 +37,8 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.clientImpl.ScannerImpl; -import org.apache.accumulo.core.clientImpl.Writer; import org.apache.accumulo.core.conf.SiteConfiguration; import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.dataImpl.KeyExtent; @@ -48,11 +46,13 @@ import org.apache.accumulo.core.file.rfile.RFile; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.TabletFile; +import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator; import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.metadata.schema.MetadataTime; import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.fate.zookeeper.ZooLock; @@ -201,11 +201,11 @@ public class SplitRecoveryIT extends ConfigurableMacBase { MetadataTableUtil.splitTablet(high, extent.getPrevEndRow(), splitRatio, context, zl); TServerInstance instance = new TServerInstance(location, zl.getSessionId()); - Writer writer = MetadataTableUtil.getMetadataTable(context); Assignment assignment = new Assignment(high, instance); - Mutation m = new Mutation(assignment.tablet.getMetadataEntry()); - assignment.server.putFutureLocation(m); - writer.update(m); + + TabletMutator tabletMutator = context.getAmple().mutateTablet(extent); + tabletMutator.putLocation(assignment.server, LocationType.FUTURE); + tabletMutator.mutate(); if (steps >= 1) { Map<Long,List<TabletFile>> bulkFiles = getBulkFilesLoaded(context, extent); diff --git a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java index e2b846f..3a081ab 100644 --- a/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/master/MergeStateIT.java @@ -41,6 +41,7 @@ import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.schema.Ample.DataLevel; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; import org.apache.accumulo.core.util.HostAndPort; @@ -180,9 +181,10 @@ public class MergeStateIT extends ConfigurableMacBase { KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o")); m = tablet.getPrevRowUpdateMutation(); TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN.put(m, new Value("0.5")); + TabletMetadata tabletMetadata = context.getAmple().readTablet(tablet); update(accumuloClient, m); - metaDataStateStore - .setLocations(Collections.singletonList(new Assignment(tablet, state.someTServer))); + TServerInstance tServerInstance = new TServerInstance(tabletMetadata.getLast()); + metaDataStateStore.setLocation(new Assignment(tablet, state.someTServer), tServerInstance); // onos... there's a new tablet online stats = scan(state, metaDataStateStore); diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index 9ce7525..b2ec934 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@ -321,11 +321,12 @@ public class NullTserver { TabletLocationState next = s.next(); assignments.add(new Assignment(next.extent, instance)); } - } - // point them to this server - TabletStateStore store = TabletStateStore.getStoreForLevel(DataLevel.USER, context); - store.setLocations(assignments); + // point them to this server + TabletStateStore store = TabletStateStore.getStoreForLevel(DataLevel.USER, context); + for (Assignment assignment : assignments) + store.setLocation(assignment, instance); + } while (true) { sleepUninterruptibly(10, TimeUnit.SECONDS); }