pkuwm commented on a change in pull request #1066:
URL: https://github.com/apache/helix/pull/1066#discussion_r459881518
##########
File path: helix-core/src/main/java/org/apache/helix/HelixProperty.java
##########
@@ -189,7 +200,7 @@ public final String getId() {
* Get the backing ZNRecord
* @return ZNRecord object associated with this property
*/
- public final ZNRecord getRecord() {
+ public ZNRecord getRecord() {
Review comment:
An earlier version of this change needed to override `getRecord()` in
Message, but that is no longer the case. I will restore the `final` modifier.
##########
File path: helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
##########
@@ -1360,8 +1390,20 @@ private boolean updateControllerState(NotificationContext changeContext, PauseSi
String uid = UUID.randomUUID().toString().substring(0, 8);
ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.Resume,
String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name()));
+
+ HelixManager manager = changeContext.getManager();
+ Optional<String> leaderSession = manager.getSessionIdIfLead();
Review comment:
`manager.getSessionIdIfLead()` checks whether the cluster manager is the
leader and returns the session ID of its connection to the cluster data
store if and only if it is. It combines `isLeader()` and `getSessionId()` in
one method, so the session ID returned is the same one that the leadership
check was made against.
Without it, we might write this:
```
if (isLeader()) {
  sessionId = manager.getSessionId();
}
```
The problem is a race condition: the session ID may change between
`isLeader()` and `getSessionId()`. So we need `manager.getSessionIdIfLead()`
to check leadership and get the session ID in a single call.
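To illustrate the idea, here is a minimal sketch. It is not the actual Helix
implementation: `SketchManager`, `renewSession`, and the lock are hypothetical
names made up for this example, and the real leadership check may work
differently. It only shows why doing the check and the read in one guarded
step returns a consistent session ID.

```java
import java.util.Optional;

class LeaderSessionSketch {
  // Hypothetical stand-in for a cluster manager; not the real HelixManager.
  static final class SketchManager {
    private final Object lock = new Object();
    private String sessionId;             // session of the current connection
    private final String leaderSessionId; // session that owns leadership

    SketchManager(String sessionId, String leaderSessionId) {
      this.sessionId = sessionId;
      this.leaderSessionId = leaderSessionId;
    }

    // Simulates a reconnect that replaces the session.
    void renewSession(String newSessionId) {
      synchronized (lock) {
        sessionId = newSessionId;
      }
    }

    // Checks leadership and reads the session ID under one lock, so the
    // returned ID is exactly the one the leadership check was made against.
    Optional<String> getSessionIdIfLead() {
      synchronized (lock) {
        return sessionId.equals(leaderSessionId)
            ? Optional.of(sessionId)
            : Optional.empty();
      }
    }
  }

  public static void main(String[] args) {
    SketchManager manager = new SketchManager("s1", "s1");
    System.out.println(manager.getSessionIdIfLead()); // Optional[s1]
    manager.renewSession("s2");                       // session changes
    System.out.println(manager.getSessionIdIfLead()); // Optional.empty
  }
}
```

With the two-call version, a `renewSession` between the calls could hand back
"s2" even though leadership was verified for "s1".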
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/MessageDispatchStage.java
##########
@@ -78,7 +78,17 @@ protected void processEvent(ClusterEvent event, MessageOutput messageOutput) thr
batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
manager.getProperties());
+ String expectedSession = event.getAttribute(AttributeName.EVENT_SESSION.name());
+ // An early check for expected leader session. If the sessions don't match, it means the
+ // controller lost leadership, then messages should not be sent and pipeline should stop.
+ // This avoids potential double masters for a single partition.
+ if (expectedSession != null && !expectedSession.equals(manager.getSessionId())) {
+ throw new StageException(
+ "Controller: " + manager.getInstanceName() + " lost leadership! Expected session: "
+ + expectedSession + ", actual: " + manager.getSessionId());
+ }
Review comment:
We discussed this question earlier and decided not to repeat the check in
every stage, since there are too many stages. Performance is not the concern
here; correctness is what matters most. We only add the check in this stage,
where it is needed to ensure correctness.
##########
File path: helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
##########
@@ -1360,8 +1390,20 @@ private boolean updateControllerState(NotificationContext changeContext, PauseSi
String uid = UUID.randomUUID().toString().substring(0, 8);
ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.Resume,
String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name()));
+
+ HelixManager manager = changeContext.getManager();
Review comment:
As mentioned above, these cases are different. `forceRebalance()` and
pushEventToQueue() create new events, so that is where we check leadership
and attach the session to the event.
Later checks, such as the one in HandleEvent(), only compare the event's
session ID with the controller's current session. If the controller session
has changed, we stop the event/pipeline.
Since there are 3 places that create new events, I don't think we can combine
the checks into one place (I've thought about it and tried).
##########
File path: helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
##########
@@ -312,6 +312,17 @@ private void forceRebalance(HelixManager manager, ClusterEventType eventType) {
changeContext.setType(NotificationContext.Type.CALLBACK);
String uid = UUID.randomUUID().toString().substring(0, 8);
ClusterEvent event = new ClusterEvent(_clusterName, eventType, uid);
+
+ Optional<String> leaderSession = manager.getSessionIdIfLead();
+ // If session is not present, this cluster manager is not leader for the cluster.
+ if (!leaderSession.isPresent()) {
+ logger.warn("Cluster manager {} is not leader for {}. Event {} is discarded.",
+ manager.getInstanceName(), manager.getClusterName(), event);
+ return;
+ }
+
+ // Pipeline should be run and Zk writes should be completed by the event session.
+ event.addAttribute(AttributeName.EVENT_SESSION.name(), leaderSession.get());
Review comment:
The purpose of this check is to add an event to the queue if and only if the
controller is the leader. We attach the leader's session ID to the event as
its event session ID. If the controller is not the leader, we discard the
event and don't push it to the queue.
In HandleEvent(), we use the event session as a filter to decide whether to
start the pipeline for the event, because the controller's session may change
while the event is waiting in the queue. If it has changed, there is no need
to start the pipeline for the stale event.
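The flow can be sketched roughly as follows. This is a simplified
illustration, not the controller code: `Event`, `enqueue`, and `handleNext`
are hypothetical names for this example, and the real `ClusterEvent` and
queue handling are richer than this.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

class EventSessionSketch {
  // Hypothetical event type carrying string attributes.
  static final class Event {
    final String name;
    final Map<String, String> attributes = new HashMap<>();

    Event(String name) {
      this.name = name;
    }
  }

  static final String EVENT_SESSION = "EVENT_SESSION";
  private final Queue<Event> queue = new ArrayDeque<>();

  // Enqueue only when a leader session is present, and stamp it on the event.
  boolean enqueue(Event event, Optional<String> leaderSession) {
    if (!leaderSession.isPresent()) {
      return false; // not leader: the event is discarded
    }
    event.attributes.put(EVENT_SESSION, leaderSession.get());
    return queue.add(event);
  }

  // Take the next event and report whether its pipeline should run:
  // only if the session stamped at enqueue time is still the current one.
  boolean handleNext(String currentSession) {
    Event event = queue.poll();
    return event != null
        && currentSession.equals(event.attributes.get(EVENT_SESSION));
  }

  public static void main(String[] args) {
    EventSessionSketch controller = new EventSessionSketch();
    System.out.println(controller.enqueue(new Event("e1"), Optional.empty()));  // false
    System.out.println(controller.enqueue(new Event("e2"), Optional.of("s1"))); // true
    // The session changed to "s2" while e2 was queued, so e2 is stale.
    System.out.println(controller.handleNext("s2")); // false
  }
}
```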
##########
File path: helix-core/src/main/java/org/apache/helix/model/Message.java
##########
@@ -953,4 +970,35 @@ public boolean isValid() {
}
return true;
}
+
+ // A class represents session aware ZNRecord for message. The message should be written to zk
+ // by the expected session.
+ // TODO: remove this class once public session-aware ZNRecord is available
+ private static class SessionAwareZNRecord extends ZNRecord implements SessionAwareZkWriteData {
Review comment:
This is the most challenging remaining part of the PR. We considered and
tried multiple options, and this one is the cleanest so far.
Considerations:
1. Not adding new session-aware methods/interfaces ---> attach the expected
session to the data object.
2. Making it generic in zkclient ---> new interface
`SessionAwareZkWriteData`.
3. A less hacky way than checking for ZNRecord in zkclient ---->
`SessionAwareZNRecord` extends ZNRecord and implements
`SessionAwareZkWriteData`. We don't have a public `SessionAwareZNRecord`
implementation, so this private class is needed.
I've added a TODO: if we implement a public `SessionAwareZNRecord` in the
future, we should remove this private class.
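A rough sketch of how the three points fit together. All names here are
stand-ins for illustration: `Record` plays the role of ZNRecord,
`SessionAwareData` the role of `SessionAwareZkWriteData`, and the
`getExpectedSessionId` method name and the `write` helper are assumptions,
not the actual zkclient API.

```java
class SessionAwareWriteSketch {
  // Stand-in for the new interface; the method name is an assumption.
  interface SessionAwareData {
    String getExpectedSessionId();
  }

  // Hypothetical plain record standing in for ZNRecord.
  static class Record {
    final String id;

    Record(String id) {
      this.id = id;
    }
  }

  // The expected session travels with the data object itself.
  static final class SessionAwareRecord extends Record implements SessionAwareData {
    private final String expectedSessionId;

    SessionAwareRecord(String id, String expectedSessionId) {
      super(id);
      this.expectedSessionId = expectedSessionId;
    }

    @Override
    public String getExpectedSessionId() {
      return expectedSessionId;
    }
  }

  // Hypothetical client-side check: it tests for the interface rather than
  // for a concrete record type, so it works for any session-aware data.
  static boolean write(Object data, String actualSessionId) {
    if (data instanceof SessionAwareData) {
      String expected = ((SessionAwareData) data).getExpectedSessionId();
      if (!actualSessionId.equals(expected)) {
        return false; // session mismatch: the data is not written
      }
    }
    return true; // a real client would perform the write here
  }

  public static void main(String[] args) {
    System.out.println(write(new Record("m0"), "s2"));                   // true
    System.out.println(write(new SessionAwareRecord("m1", "s1"), "s1")); // true
    System.out.println(write(new SessionAwareRecord("m2", "s1"), "s2")); // false
  }
}
```

Plain records are unaffected, which is why no new write methods are needed on
the client.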
##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/SessionAwareZkWriteData.java
##########
@@ -0,0 +1,34 @@
+package org.apache.helix.zookeeper.zkclient;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * An interface representing data being written to ZK is session aware:
+ * data is supposed to be written by expected ZK session. If ZkClient's actual session
+ * doesn't match expected session, data is not written to ZK.
+ */
+public interface SessionAwareZkWriteData {
Review comment:
Yep, I also thought about `SessionAwareZkDataRecord`. But since the data is
session-aware only for writes, not reads, I named it
`SessionAwareZkWriteData` to indicate that reads do not treat the data as
session-aware. We could think of a better name to make that clear.
@jiajunwang What's your choice?