Persist participant's offline timestamp in ParticipantHistory. This persists the timestamp at which a participant goes offline. 1) If a participant goes offline gracefully (by calling disconnect()), the participant will write a timestamp to its history record. 2) If a participant goes offline without calling disconnect() (e.g., GC, machine crash), the controller will try to set the timestamp in its pipeline, triggered by liveInstanceChanges.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/46705c59 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/46705c59 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/46705c59 Branch: refs/heads/helix-0.6.x Commit: 46705c598e054cec54c4d4cd6fb309b3d1a25475 Parents: 9e8ec45 Author: Lei Xia <l...@linkedin.com> Authored: Thu Sep 1 18:21:36 2016 -0700 Committer: Lei Xia <l...@linkedin.com> Committed: Wed Feb 8 09:50:48 2017 -0800 ---------------------------------------------------------------------- .../controller/stages/ClusterDataCache.java | 46 +++++++++++++++ .../helix/manager/zk/ParticipantManager.java | 23 +++++--- .../apache/helix/manager/zk/ZKHelixAdmin.java | 8 +-- .../apache/helix/manager/zk/ZKHelixManager.java | 17 ++++-- .../apache/helix/model/ParticipantHistory.java | 42 +++++++++++++- .../integration/TestNodeOfflineTimeStamp.java | 59 ++++++++++++++++++++ .../TestRebalancerPersistAssignments.java | 38 ++++++++++++- .../integration/ZkStandAloneCMTestBase.java | 11 +++- 8 files changed, 222 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index dbc12d4..2cd8ec0 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -19,6 +19,7 @@ package org.apache.helix.controller.stages; * under the License. 
*/ +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -39,6 +40,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; +import org.apache.helix.model.ParticipantHistory; import org.apache.helix.model.ResourceConfig; import org.apache.helix.model.StateModelDefinition; import org.apache.log4j.Logger; @@ -62,6 +64,7 @@ public class ClusterDataCache { Map<String, StateModelDefinition> _stateModelDefMap; Map<String, InstanceConfig> _instanceConfigMap; Map<String, InstanceConfig> _instanceConfigCacheMap; + Map<String, Long> _instanceOfflineTimeMap; Map<String, ResourceConfig> _resourceConfigMap; Map<String, ResourceConfig> _resourceConfigCacheMap; Map<String, ClusterConstraints> _constraintMap; @@ -74,6 +77,8 @@ public class ClusterDataCache { boolean _init = true; + boolean _updateInstanceOfflineTime = true; + private static final Logger LOG = Logger.getLogger(ClusterDataCache.class.getName()); /** @@ -102,6 +107,10 @@ public class ClusterDataCache { _stateModelDefMap = accessor.getChildValuesMap(keyBuilder.stateModelDefs()); _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints()); + if (_init || _updateInstanceOfflineTime) { + updateOfflineInstanceHistory(accessor); + } + if (LOG.isTraceEnabled()) { for (LiveInstance instance : _liveInstanceMap.values()) { LOG.trace("live instance: " + instance.getInstanceName() + " " + instance.getSessionId()); @@ -236,6 +245,42 @@ public class ClusterDataCache { } /** + * Return the last offline time for given instance. + * Return NULL if the instance is ONLINE currently, or the record is not persisted somehow. 
+ * + * @param instanceName + * @return + */ + public Long getInstanceOfflineTime(String instanceName) { + if (_instanceOfflineTimeMap != null) { + return _instanceOfflineTimeMap.get(instanceName); + } + return null; + } + + private void updateOfflineInstanceHistory(HelixDataAccessor accessor) { + List<String> offlineNodes = new ArrayList<String>(_instanceConfigMap.keySet()); + offlineNodes.removeAll(_liveInstanceMap.keySet()); + _instanceOfflineTimeMap = new HashMap<String, Long>(); + + for(String instance : offlineNodes) { + Builder keyBuilder = accessor.keyBuilder(); + PropertyKey propertyKey = keyBuilder.participantHistory(instance); + ParticipantHistory history = accessor.getProperty(propertyKey); + if (history == null) { + history = new ParticipantHistory(instance); + } + if (history.getLastOfflineTime() == ParticipantHistory.ONLINE) { + history.reportOffline(); + // persist history back to ZK. + accessor.setProperty(propertyKey, history); + } + _instanceOfflineTimeMap.put(instance, history.getLastOfflineTime()); + } + _updateInstanceOfflineTime = false; + } + + /** * Retrieves the idealstates for all resources * @return */ @@ -269,6 +314,7 @@ public class ClusterDataCache { liveInstanceMap.put(liveInstance.getId(), liveInstance); } _liveInstanceCacheMap = liveInstanceMap; + _updateInstanceOfflineTime = true; } /** http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java index e6176ef..bf7302b 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java @@ -246,7 +246,9 @@ public class ParticipantManager { } } - updateHistory(); + 
ParticipantHistory history = getHistory(); + history.reportOnline(_sessionId); + persistHistory(history); } /** @@ -339,20 +341,27 @@ public class ParticipantManager { _messagingService.onConnected(); } - /** - * Update participant session history. - */ - private void updateHistory() { + private ParticipantHistory getHistory() { PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName); ParticipantHistory history = _dataAccessor.getProperty(propertyKey); if (history == null) { history = new ParticipantHistory(_instanceName); } - history.updateHistory(_sessionId); + return history; + } + + private void persistHistory(ParticipantHistory history) { + PropertyKey propertyKey = _keyBuilder.participantHistory(_instanceName); _dataAccessor.setProperty(propertyKey, history); } - public void shutdown() { + public void reset() { + } + public void disconnect() { + ParticipantHistory history = getHistory(); + history.reportOffline(); + persistHistory(history); + reset(); } } http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 629f40a..b1ce406 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -1054,10 +1054,10 @@ public class ZKHelixAdmin implements HelixAdmin { String path = keyBuilder.constraint(constraintType.toString()).getPath(); baseAccessor.update(path, new DataUpdater<ZNRecord>() { - @Override - public ZNRecord update(ZNRecord currentData) { - ClusterConstraints constraints = - currentData == null ? 
new ClusterConstraints(constraintType) : new ClusterConstraints(currentData); + @Override public ZNRecord update(ZNRecord currentData) { + ClusterConstraints constraints = currentData == null ? + new ClusterConstraints(constraintType) : + new ClusterConstraints(currentData); constraints.addConstraintItem(constraintId, constraintItem); return constraints.getRecord(); http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index aa6dc7a..37634bd 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -572,11 +572,6 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { } } finally { - _zkclient.close(); - _zkclient = null; - _sessionStartTime = null; - LOG.info("Cluster manager: " + _instanceName + " disconnected"); - if (_controller != null) { try { _controller.shutdown(); @@ -587,6 +582,16 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { _leaderElectionHandler = null; } } + + if (_participantManager != null) { + _participantManager.disconnect(); + _participantManager = null; + } + + _zkclient.close(); + _zkclient = null; + _sessionStartTime = null; + LOG.info("Cluster manager: " + _instanceName + " disconnected"); } } @@ -898,7 +903,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { void handleNewSessionAsParticipant() throws Exception { if (_participantManager != null) { - _participantManager.shutdown(); + _participantManager.reset(); } _participantManager = new ParticipantManager(this, _zkclient, _sessionTimeout, _liveInstanceInfoProvider, 
http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java b/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java index 585fd59..0ee8a58 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java +++ b/helix-core/src/main/java/org/apache/helix/model/ParticipantHistory.java @@ -40,9 +40,12 @@ public class ParticipantHistory extends HelixProperty { TIME, DATE, SESSION, - HISTORY + HISTORY, + LAST_OFFLINE_TIME } + public static long ONLINE = -1; + public ParticipantHistory(String id) { super(id); } @@ -52,11 +55,44 @@ public class ParticipantHistory extends HelixProperty { } /** - * Update last offline timestamp in participant history. + * Called when a participant went offline or is about to go offline. + * This will update the offline timestamp in participant history. + */ + public void reportOffline() { + long time = System.currentTimeMillis(); + _record.setSimpleField(ConfigProperty.LAST_OFFLINE_TIME.name(), String.valueOf(time)); + } + + /** + * Called when a participant goes online, this will update all related session history. * * @return */ - public void updateHistory(String sessionId) { + public void reportOnline(String sessionId) { + updateSessionHistory(sessionId); + _record.setSimpleField(ConfigProperty.LAST_OFFLINE_TIME.name(), String.valueOf(ONLINE)); + } + + /** + * Get the time when this node goes offline last time (epoch time). + * If the node is currently online, return -1. + * If no offline time is record, return NULL. 
+ * + * @return + */ + public Long getLastOfflineTime() { + String time = _record.getSimpleField(ConfigProperty.LAST_OFFLINE_TIME.name()); + if (time == null) { + return ONLINE; + } + + return Long.valueOf(time); + } + + /** + * Add record to session online history list + */ + private void updateSessionHistory(String sessionId) { List<String> list = _record.getListField(ConfigProperty.HISTORY.name()); if (list == null) { list = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java b/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java new file mode 100644 index 0000000..cfff20b --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestNodeOfflineTimeStamp.java @@ -0,0 +1,59 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.ParticipantHistory; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestNodeOfflineTimeStamp extends ZkStandAloneCMTestBase { + final String className = getShortClassName(); + + @Test + public void testNodeShutdown() throws Exception { + for (MockParticipantManager participant : _participants) { + ParticipantHistory history = getInstanceHistory(participant.getInstanceName()); + Assert.assertNotNull(history); + Assert.assertEquals(history.getLastOfflineTime(), Long.valueOf(ParticipantHistory.ONLINE)); + } + + long shutdownTime = System.currentTimeMillis(); + _participants[0].syncStop(); + ParticipantHistory history = getInstanceHistory(_participants[0].getInstanceName()); + long recordTime = history.getLastOfflineTime(); + + Assert.assertTrue(Math.abs(shutdownTime - recordTime) <= 500L); + + _participants[0].reset(); + _participants[0].syncStart(); + + history = getInstanceHistory(_participants[0].getInstanceName()); + Assert.assertEquals(history.getLastOfflineTime(), Long.valueOf(ParticipantHistory.ONLINE)); + } + + private ParticipantHistory getInstanceHistory(String instance) { + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + PropertyKey propertyKey = accessor.keyBuilder().participantHistory(instance); + return accessor.getProperty(propertyKey); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java index 74c7a9f..fd0cc64 100644 --- 
a/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestRebalancerPersistAssignments.java @@ -29,12 +29,15 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier.BestPossibleExternalViewVerifier; import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier; +import org.apache.helix.tools.ClusterStateVerifier.HelixClusterVerifier; import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import java.util.Collections; import java.util.Date; import java.util.HashSet; import java.util.Map; @@ -81,7 +84,40 @@ public class TestRebalancerPersistAssignments extends ZkStandAloneCMTestBase { } @Test(dataProvider = "rebalanceModes") - public void testAutoRebalanceWithPersistAssignmentEnable(RebalanceMode rebalanceMode) + public void testDisablePersist(RebalanceMode rebalanceMode) + throws Exception { + String testDb = "TestDB2-" + rebalanceMode.name(); + + _setupTool.addResourceToCluster(CLUSTER_NAME, testDb, 5, + BuiltInStateModelDefinitions.LeaderStandby.name(), rebalanceMode.name()); + _setupTool.rebalanceStorageCluster(CLUSTER_NAME, testDb, 3); + + HelixClusterVerifier verifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR) + .setResources(new HashSet<String>(Collections.singleton(testDb))).build(); + Assert.assertTrue(verifier.verify()); + + // kill 1 node + _participants[0].syncStop(); + + Assert.assertTrue(verifier.verify()); + + IdealState idealState = + _setupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, testDb); + + Set<String> excludedInstances = new HashSet<String>(); + 
excludedInstances.add(_participants[0].getInstanceName()); + verifyAssignmentInIdealStateWithPersistDisabled(idealState, excludedInstances); + + // clean up + _setupTool.getClusterManagementTool().dropResource(CLUSTER_NAME, testDb); + _participants[0] = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _participants[0].getInstanceName()); + _participants[0].syncStart(); + } + + @Test(dataProvider = "rebalanceModes", dependsOnMethods = {"testDisablePersist"}) + public void testEnablePersist(RebalanceMode rebalanceMode) throws Exception { String testDb = "TestDB1-" + rebalanceMode.name(); enablePersistAssignment(true); http://git-wip-us.apache.org/repos/asf/helix/blob/46705c59/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java index 9f1cf1c..d0609ac 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/integration/ZkStandAloneCMTestBase.java @@ -21,6 +21,9 @@ package org.apache.helix.integration; import java.util.Date; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; import org.apache.helix.tools.ClusterSetup; @@ -46,7 +49,8 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase { protected static final String TEST_DB = "TestDB"; protected static final int _PARTITIONS = 20; - protected ClusterSetup _setupTool = null; + protected ClusterSetup _setupTool; + protected HelixManager _manager; protected final String CLASS_NAME = getShortClassName(); protected final String CLUSTER_NAME 
= CLUSTER_PREFIX + "_" + CLASS_NAME; @@ -97,6 +101,11 @@ public class ZkStandAloneCMTestBase extends ZkIntegrationTestBase { ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME)); Assert.assertTrue(result); + + // create cluster manager + _manager = HelixManagerFactory + .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR); + _manager.connect(); } @AfterClass