junkaixue commented on a change in pull request #1812:
URL: https://github.com/apache/helix/pull/1812#discussion_r665698609
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
##########
@@ -162,6 +166,20 @@ public void handleNewSession() throws Exception {
setupMsgHandler();
}
+ private boolean shouldCarryOver() {
+ if (_liveInstanceInfoProvider == null) {
+ return true;
+ }
+ ZNRecord additionalLiveInstanceInfo =
_liveInstanceInfoProvider.getAdditionalLiveInstanceInfo();
Review comment:
Why not add checks here?
if (_liveInstanceInfoProvider == null ||
_liveInstanceInfoProvider.getAdditionalLiveInstanceInfo() == null) {
return true;
}
Then we don't have to check whether additionalLiveInstanceInfo is null.
That simplifies the code.
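A minimal sketch of the combined check, not the actual ParticipantManager code. The class name, the `Supplier` standing in for `_liveInstanceInfoProvider`, and the `decision` predicate (a placeholder for whatever the method does with a non-null record) are all assumptions made for illustration.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-in; only the null handling is illustrated.
class CarryOverCheckSketch {
  // provider plays the role of _liveInstanceInfoProvider (assumption);
  // decision is a placeholder for the logic applied to a non-null record.
  static <T> boolean shouldCarryOver(Supplier<T> provider, Predicate<T> decision) {
    // Short-circuit: provider.get() is evaluated only when provider is non-null.
    if (provider == null || provider.get() == null) {
      return true;
    }
    // Second call to the getter; this assumes the getter is cheap and
    // returns a consistent value between the two calls.
    return decision.test(provider.get());
  }
}
```

One trade-off: if the record is still needed after the check, the combined condition means calling the getter a second time.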
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
##########
@@ -413,13 +433,16 @@ private void carryOverPreviousCurrentState() {
* remove previous current state parent nodes
*/
for (String session : sessions) {
- if (session.equals(_sessionId)) {
+ if (session.equals(sessionId)) {
continue;
}
- String path = _keyBuilder.currentStates(_instanceName, session).getPath();
- LOG.info("Removing current states from previous sessions. path: " + path);
- _zkclient.deleteRecursively(path);
+ PropertyKey currentStatesProperty = keyBuilder.currentStates(instanceName, session);
+ String path = currentStatesProperty.getPath();
+ LOG.info("Removing current states from previous sessions. path: {}", path);
+ if (!dataAccessor.removeProperty(currentStatesProperty)) {
Review comment:
NIT: I cannot remember the details. Does removeProperty delete recursively?
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -630,6 +634,20 @@ void unregisterMessageHandlerFactory(String type) {
+ ", pool: " + pool);
}
+ private void syncFactoryState() {
+ for (Map.Entry<String, MsgHandlerFactoryRegistryItem> entry :
_hdlrFtyRegistry.entrySet()) {
Review comment:
Sorry for the confusion. My bad. This factory registration still needs to
be synchronized, because the reset function that follows could be called
in parallel.
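A minimal sketch of the concern, assuming a single lock guards registration, sync, and reset. `FactoryRegistrySketch` and its members are hypothetical and do not mirror the actual HelixTaskExecutor fields.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry; every access to the map goes through the same lock,
// so a parallel reset() cannot interleave with an in-progress syncAll().
class FactoryRegistrySketch {
  private final Object lock = new Object();
  private final Map<String, Runnable> syncActions = new HashMap<>();

  void register(String type, Runnable onSync) {
    synchronized (lock) {
      syncActions.put(type, onSync);
    }
  }

  // Runs every registered sync action and returns how many were run.
  int syncAll() {
    synchronized (lock) {
      for (Runnable action : syncActions.values()) {
        action.run();
      }
      return syncActions.size();
    }
  }

  void reset() {
    synchronized (lock) {
      syncActions.clear();
    }
  }
}
```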
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
##########
@@ -1376,12 +1384,27 @@ public void handleNewSession(String sessionId) throws
Exception {
}
}
- void handleNewSessionAsParticipant(final String sessionId) throws Exception {
+ private void handleNewSessionInManagementMode(String sessionId) throws
Exception {
+ LOG.info("Skip reset because instance is in {} status",
LiveInstance.LiveInstanceStatus.PAUSED);
+ if (!InstanceType.PARTICIPANT.equals(_instanceType)) {
Review comment:
This logic is inconsistent: you add things for CONTROLLER_PARTICIPANT,
but only allow PARTICIPANT to handle new sessions.
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/MessageHandlerFactory.java
##########
@@ -37,4 +37,8 @@
String getMessageType();
void reset();
+
+ default void sync() {
+ throw new UnsupportedOperationException("Not implemented");
Review comment:
NIT: let's make the exception message more informative.
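One possible shape for a more informative message, as a sketch only. The interface name `MessageHandlerFactorySketch` and the exact wording are assumptions; the three method names come from the diff above.

```java
// Hypothetical copy of the interface, reduced to the methods shown in the diff.
interface MessageHandlerFactorySketch {
  String getMessageType();

  void reset();

  // The message names the implementing class and the message type, so a
  // caller can tell which factory lacks a sync() override.
  default void sync() {
    throw new UnsupportedOperationException(
        "sync() is not implemented by " + getClass().getName()
            + " (message type: " + getMessageType() + ")");
  }
}
```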
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -1305,6 +1330,43 @@ String getMessageTarget(String resourceName, String
partitionName) {
return String.format("%s_%s", resourceName, partitionName);
}
+ private void changeParticipantStatus(String instanceName,
+ LiveInstance.LiveInstanceStatus toStatus, HelixManager manager) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ String sessionId = manager.getSessionId();
+ String path = accessor.keyBuilder().liveInstance(instanceName).getPath();
+
+ if (LiveInstance.LiveInstanceStatus.PAUSED.equals(toStatus)) {
+ // Entering freeze mode, update live instance status
+ boolean success = accessor.getBaseDataAccessor().update(path, record -> {
+ record.setEnumField(LiveInstance.LiveInstanceProperty.STATUS.name(),
toStatus);
+ return record;
+ }, AccessOption.EPHEMERAL);
+ if (success) {
+ _freezeSessionId = sessionId;
+ _liveInstanceStatus = toStatus;
+ }
+ } else if (LiveInstance.LiveInstanceStatus.NORMAL.equals(toStatus)) {
+ // Exiting freeze mode
+ // session changed, should call state model syncState()
+ if (_freezeSessionId != null && !_freezeSessionId.equals(sessionId)) {
+ syncFactoryState();
+ PropertyKey.Builder keyBuilder = new
PropertyKey.Builder(manager.getClusterName());
+ ParticipantManager
Review comment:
As mentioned above, this carry-over should not change the state already
held for that state model.
##########
File path:
helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
##########
@@ -369,11 +389,11 @@ private void carryOverPreviousCurrentState() {
}
StateModelDefinition stateModel =
- _dataAccessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef));
+ dataAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef));
- BaseDataAccessor<ZNRecord> baseAccessor = _dataAccessor.getBaseDataAccessor();
+ BaseDataAccessor<ZNRecord> baseAccessor = dataAccessor.getBaseDataAccessor();
String curStatePath =
- _keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName())
+ keyBuilder.currentState(instanceName, sessionId, lastCurState.getResourceName())
.getPath();
String initState = stateModel.getInitialState();
Review comment:
This requires some options here:
1. If a reset happens, it carries over the initial state.
2. If this is the last exit from freeze mode, it needs to carry over the
original state without resetting.
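The two options could be made explicit with a mode parameter. This is a sketch under assumptions: `CarryOverMode`, `stateToCarry`, and the plain-string states are hypothetical and are not part of the Helix API.

```java
// Hypothetical helper that picks which state to write during carry-over.
class CarryOverStateSketch {
  enum CarryOverMode {
    RESET_TO_INITIAL, // option 1: a reset happened
    KEEP_ORIGINAL     // option 2: exiting freeze mode, no reset
  }

  static String stateToCarry(CarryOverMode mode, String initState, String lastState) {
    switch (mode) {
      case RESET_TO_INITIAL:
        return initState;
      case KEEP_ORIGINAL:
        return lastState;
      default:
        throw new IllegalArgumentException("Unsupported mode: " + mode);
    }
  }
}
```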
##########
File path:
helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
##########
@@ -1305,6 +1330,43 @@ String getMessageTarget(String resourceName, String
partitionName) {
return String.format("%s_%s", resourceName, partitionName);
}
+ private void changeParticipantStatus(String instanceName,
+ LiveInstance.LiveInstanceStatus toStatus, HelixManager manager) {
+ HelixDataAccessor accessor = manager.getHelixDataAccessor();
+ String sessionId = manager.getSessionId();
+ String path = accessor.keyBuilder().liveInstance(instanceName).getPath();
+
+ if (LiveInstance.LiveInstanceStatus.PAUSED.equals(toStatus)) {
Review comment:
NIT: a switch-case would be easier for readers to follow.
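A sketch of the switch form, using a hypothetical `Status` enum in place of LiveInstance.LiveInstanceStatus and a string result in place of the real side effects.

```java
// Hypothetical stand-in for the status dispatch in changeParticipantStatus.
class StatusSwitchSketch {
  enum Status { NORMAL, PAUSED }

  static String describeTransition(Status toStatus) {
    switch (toStatus) {
      case PAUSED:
        return "entering freeze mode";
      case NORMAL:
        return "exiting freeze mode";
      default:
        throw new IllegalArgumentException("Unsupported status: " + toStatus);
    }
  }
}
```

One difference from the `CONSTANT.equals(toStatus)` form: a switch on a null enum value throws NullPointerException, so a null `toStatus` would need to be handled before the switch.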
##########
File path:
helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
##########
@@ -146,18 +147,29 @@ private void sendNopMessage() {
@Override
public void reset() {
- logger.info("Resetting HelixStateMachineEngine");
+ loopStateModelFactories(stateModel -> {
+ stateModel.reset();
+ String initialState =
_stateModelParser.getInitialState(stateModel.getClass());
+ stateModel.updateState(initialState);
+ }, "reset");
Review comment:
NIT: Let's use an enum for "reset" and "syncState" instead of hard-coding
the strings.
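For example, something along these lines. The enum name `FactoryOperation` and its accessor are hypothetical; only the two labels come from the diff and this comment.

```java
// Hypothetical enum replacing the hard-coded operation names.
enum FactoryOperation {
  RESET("reset"),
  SYNC_STATE("syncState");

  private final String label;

  FactoryOperation(String label) {
    this.label = label;
  }

  // Label used in log messages, matching the strings currently hard-coded.
  String getLabel() {
    return label;
  }
}
```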
##########
File path:
helix-core/src/main/java/org/apache/helix/participant/statemachine/StateModel.java
##########
@@ -37,6 +37,13 @@ public String getCurrentState() {
return _currentState;
}
+ /**
+ * Called when cluster is recovering from freeze mode if session changed during freeze mode.
+ */
+ public void syncState() {
+ logger.warn("Default syncState method invoked.");
Review comment:
NIT: add one more sentence like "No operation has been done."
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.