YARN-2136. Changed RMStateStore to ignore store operations when fenced. Contributed by Varun Saxena.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52bcefca Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52bcefca Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52bcefca Branch: refs/heads/HDFS-EC Commit: 52bcefca8bb13d3757009f1f08203e7dca3b1e16 Parents: 26319ba Author: Jian He <jia...@apache.org> Authored: Tue Dec 2 10:53:55 2014 -0800 Committer: Jian He <jia...@apache.org> Committed: Tue Dec 2 10:54:48 2014 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/recovery/RMStateStore.java | 62 +++++++++++-- .../recovery/RMStateStoreEventType.java | 3 +- .../recovery/ZKRMStateStore.java | 8 ++ .../recovery/TestZKRMStateStore.java | 94 ++++++++++++++++++++ 5 files changed, 161 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d95679e..3744a1ecb 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -109,6 +109,9 @@ Release 2.7.0 - UNRELEASED YARN-2907. SchedulerNode#toString should print all resource detail instead of only memory. (Rohith via junping_du) + YARN-2136. Changed RMStateStore to ignore store opearations when fenced. 
+ (Varun Saxena via jianhe) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java index 35a54c3..00e1dfc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -26,6 +27,7 @@ import java.util.TreeMap; import javax.crypto.SecretKey; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -88,7 +90,8 @@ public abstract class RMStateStore extends AbstractService { public static final Log LOG = LogFactory.getLog(RMStateStore.class); private enum RMStateStoreState { - DEFAULT + ACTIVE, + FENCED }; private static final StateMachineFactory<RMStateStore, @@ -99,17 +102,27 @@ public abstract class RMStateStore extends AbstractService { RMStateStoreState, RMStateStoreEventType, RMStateStoreEvent>( - 
RMStateStoreState.DEFAULT) - .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + RMStateStoreState.ACTIVE) + .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, RMStateStoreEventType.STORE_APP, new StoreAppTransition()) - .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition()) - .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition()) - .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, + .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition()) - .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, - RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition()); + .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE, + RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition()) + .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED, + RMStateStoreEventType.FENCED) + .addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED, + EnumSet.of( + RMStateStoreEventType.STORE_APP, + RMStateStoreEventType.UPDATE_APP, + RMStateStoreEventType.REMOVE_APP, + RMStateStoreEventType.STORE_APP_ATTEMPT, + RMStateStoreEventType.UPDATE_APP_ATTEMPT, + RMStateStoreEventType.FENCED)); private final StateMachine<RMStateStoreState, RMStateStoreEventType, @@ -432,6 +445,11 @@ public abstract class RMStateStore extends AbstractService { dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState)); } + public synchronized void updateFencedState() { + this.stateMachine.doTransition(RMStateStoreEventType.FENCED, + new RMStateStoreEvent(RMStateStoreEventType.FENCED)); + } + /** * Blocking API 
* Derived classes must implement this method to store the state of an @@ -494,6 +512,10 @@ public abstract class RMStateStore extends AbstractService { public synchronized void storeRMDelegationTokenAndSequenceNumber( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, int latestSequenceNumber) { + if(isFencedState()) { + LOG.info("State store is in Fenced state. Can't store RM Delegation Token."); + return; + } try { storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate, latestSequenceNumber); @@ -516,6 +538,10 @@ public abstract class RMStateStore extends AbstractService { */ public synchronized void removeRMDelegationToken( RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) { + if(isFencedState()) { + LOG.info("State store is in Fenced state. Can't remove RM Delegation Token."); + return; + } try { removeRMDelegationTokenState(rmDTIdentifier); } catch (Exception e) { @@ -537,6 +563,10 @@ public abstract class RMStateStore extends AbstractService { public synchronized void updateRMDelegationTokenAndSequenceNumber( RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate, int latestSequenceNumber) { + if(isFencedState()) { + LOG.info("State store is in Fenced state. Can't update RM Delegation Token."); + return; + } try { updateRMDelegationTokenAndSequenceNumberInternal(rmDTIdentifier, renewDate, latestSequenceNumber); @@ -558,6 +588,11 @@ public abstract class RMStateStore extends AbstractService { * RMDTSecretManager call this to store the state of a master key */ public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) { + if(isFencedState()) { + LOG.info("State store is in Fenced state. 
Can't store RM Delegation " + + "Token Master key."); + return; + } try { storeRMDTMasterKeyState(delegationKey); } catch (Exception e) { @@ -577,6 +612,11 @@ public abstract class RMStateStore extends AbstractService { * RMDTSecretManager call this to remove the state of a master key */ public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) { + if(isFencedState()) { + LOG.info("State store is in Fenced state. Can't remove RM Delegation " + + "Token Master key."); + return; + } try { removeRMDTMasterKeyState(delegationKey); } catch (Exception e) { @@ -647,6 +687,11 @@ public abstract class RMStateStore extends AbstractService { } return credentials; } + + @VisibleForTesting + synchronized boolean isFencedState() { + return (RMStateStoreState.FENCED == this.stateMachine.getCurrentState()); + } // Dispatcher related code protected void handleStoreEvent(RMStateStoreEvent event) { @@ -665,6 +710,7 @@ public abstract class RMStateStore extends AbstractService { */ protected void notifyStoreOperationFailed(Exception failureCause) { if (failureCause instanceof StoreFencedException) { + updateFencedState(); Thread standByTransitionThread = new Thread(new StandByTransitionThread()); standByTransitionThread.setName("StandByTransitionThread Handler"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java index 903f4e7..9301bf9 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java @@ -23,5 +23,6 @@ public enum RMStateStoreEventType { STORE_APP, UPDATE_APP, UPDATE_APP_ATTEMPT, - REMOVE_APP + REMOVE_APP, + FENCED } http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java index a19ed30..a718fb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java @@ -1017,6 +1017,9 @@ public class ZKRMStateStore extends RMStateStore { public void run() { try { while (true) { + if(isFencedState()) { + break; + } doMultiWithRetries(emptyOpList); Thread.sleep(zkSessionTimeout); } @@ -1134,6 +1137,11 @@ public class ZKRMStateStore extends RMStateStore { public synchronized void storeOrUpdateAMRMTokenSecretManagerState( AMRMTokenSecretManagerState amrmTokenSecretManagerState, 
boolean isUpdate) { + if(isFencedState()) { + LOG.info("State store is in Fenced state. Can't store/update " + + "AMRMToken Secret Manager state."); + return; + } AMRMTokenSecretManagerState data = AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState); byte[] stateData = data.getProto().toByteArray(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index 3c7170a..e936677 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -20,22 +20,40 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.util.List; +import javax.crypto.SecretKey; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import 
org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Test; @@ -191,4 +209,80 @@ public class TestZKRMStateStore extends RMStateStoreTestBase { HAServiceProtocol.HAServiceState.ACTIVE, rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); } + + @Test + public void testFencedState() throws Exception { + TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester(); + RMStateStore store = zkTester.getRMStateStore(); + + // Move state 
to FENCED from ACTIVE + store.updateFencedState(); + assertEquals("RMStateStore should have been in fenced state", + true, store.isFencedState()); + + long submitTime = System.currentTimeMillis(); + long startTime = submitTime + 1000; + + // Add a new app + RMApp mockApp = mock(RMApp.class); + ApplicationSubmissionContext context = + new ApplicationSubmissionContextPBImpl(); + when(mockApp.getSubmitTime()).thenReturn(submitTime); + when(mockApp.getStartTime()).thenReturn(startTime); + when(mockApp.getApplicationSubmissionContext()).thenReturn(context); + when(mockApp.getUser()).thenReturn("test"); + store.storeNewApplication(mockApp); + assertEquals("RMStateStore should have been in fenced state", + true, store.isFencedState()); + + // Add a new attempt + ClientToAMTokenSecretManagerInRM clientToAMTokenMgr = + new ClientToAMTokenSecretManagerInRM(); + ApplicationAttemptId attemptId = ConverterUtils + .toApplicationAttemptId("appattempt_1234567894321_0001_000001"); + SecretKey clientTokenMasterKey = + clientToAMTokenMgr.createMasterKey(attemptId); + RMAppAttemptMetrics mockRmAppAttemptMetrics = + mock(RMAppAttemptMetrics.class); + Container container = new ContainerPBImpl(); + container.setId(ConverterUtils.toContainerId("container_1234567891234_0001_01_000001")); + RMAppAttempt mockAttempt = mock(RMAppAttempt.class); + when(mockAttempt.getAppAttemptId()).thenReturn(attemptId); + when(mockAttempt.getMasterContainer()).thenReturn(container); + when(mockAttempt.getClientTokenMasterKey()) + .thenReturn(clientTokenMasterKey); + when(mockAttempt.getRMAppAttemptMetrics()) + .thenReturn(mockRmAppAttemptMetrics); + when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage()) + .thenReturn(new AggregateAppResourceUsage(0,0)); + store.storeNewApplicationAttempt(mockAttempt); + assertEquals("RMStateStore should have been in fenced state", + true, store.isFencedState()); + + long finishTime = submitTime + 1000; + // Update attempt + ApplicationAttemptStateData newAttemptState = 
+ ApplicationAttemptStateData.newInstance(attemptId, container, + store.getCredentialsFromAppAttempt(mockAttempt), + startTime, RMAppAttemptState.FINISHED, "testUrl", + "test", FinalApplicationStatus.SUCCEEDED, 100, + finishTime, 0, 0); + store.updateApplicationAttemptState(newAttemptState); + assertEquals("RMStateStore should have been in fenced state", + true, store.isFencedState()); + + // Update app + ApplicationStateData appState = ApplicationStateData.newInstance(submitTime, + startTime, context, "test"); + store.updateApplicationState(appState); + assertEquals("RMStateStore should have been in fenced state", + true, store.isFencedState()); + + // Remove app + store.removeApplication(mockApp); + assertEquals("RMStateStore should have been in fenced state", + true, store.isFencedState()); + + store.close(); + } }