jiajunwang commented on a change in pull request #1066:
URL: https://github.com/apache/helix/pull/1066#discussion_r483307174
##########
File path: helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
##########
@@ -199,7 +208,7 @@ public String getMetadataStoreConnectionString() {
@Override
public String getInstanceName() {
// TODO Auto-generated method stub
Review comment:
Remove it.
##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
##########
@@ -1967,18 +1937,19 @@ public void asyncSetData(final String path, Object datat, final int version,
new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
return;
}
- doAsyncSetData(path, data, version, startT, cb);
+ doAsyncSetData(path, data, version, startT, cb, parseExpectedSessionId(datat));
}
private void doAsyncSetData(final String path, byte[] data, final int version, final long startT,
Review comment:
How about the sync version of setting data? Maybe also multiOps, which
we may start using soon.
##########
File path: helix-core/src/main/java/org/apache/helix/controller/stages/MessageDispatchStage.java
##########
@@ -78,7 +79,17 @@ protected void processEvent(ClusterEvent event, MessageOutput messageOutput) thr
batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
manager.getProperties());
+ // An early check for expected leader session. If the sessions don't match, it means the
+ // controller's session changes, then messages should not be sent and pipeline should stop.
+ Optional<String> expectedSession = event.getAttribute(AttributeName.EVENT_SESSION.name());
+ if (!expectedSession.isPresent() || !expectedSession.get().equals(manager.getSessionId())) {
+ throw new StageException(String.format(
Review comment:
Can we skip the dispatch but continue the pipeline for the other stages?
I'm not sure, this is a question.
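To make the question concrete, here is a minimal sketch of the "skip but continue" behavior. All types (`EventSketch`, `StageSketch`, `DispatchStageSketch`, `PipelineSketch`) are hypothetical stand-ins and not the real Helix pipeline classes; whether the later stages are safe to run with a stale session is exactly the open question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Stand-in types for illustration only; none of these are real Helix classes.
final class EventSketch {
  final Optional<String> expectedSession;
  final List<String> log = new ArrayList<>();

  EventSketch(Optional<String> expectedSession) {
    this.expectedSession = expectedSession;
  }
}

interface StageSketch {
  void process(EventSketch event);
}

final class DispatchStageSketch implements StageSketch {
  private final String currentSession;

  DispatchStageSketch(String currentSession) {
    this.currentSession = currentSession;
  }

  @Override
  public void process(EventSketch event) {
    boolean sessionMatches = event.expectedSession.isPresent()
        && event.expectedSession.get().equals(currentSession);
    if (!sessionMatches) {
      // Skip sending instead of throwing, so the remaining stages still run.
      event.log.add("dispatch skipped");
      return;
    }
    event.log.add("messages sent");
  }
}

final class PipelineSketch {
  // Runs every stage in order and returns the event's log for inspection.
  static List<String> run(List<StageSketch> stages, EventSketch event) {
    for (StageSketch stage : stages) {
      stage.process(event);
    }
    return event.log;
  }
}
```

With the throwing version in the diff, a stage placed after dispatch would never run for a stale event; with this variant it would.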
##########
File path: helix-core/src/main/java/org/apache/helix/HelixProperty.java
##########
@@ -148,7 +154,12 @@ public String toString() {
}
}
- private Stat _stat;
+ protected Stat _stat;
+
+ public HelixProperty() {
+ _record = DEFAULT_ZNRECORD;
+ _stat = DEFAULT_STAT;
+ }
Review comment:
I know there are multiple options. But how about the one that I
mentioned above?
##########
File path: helix-core/src/main/java/org/apache/helix/model/Message.java
##########
@@ -937,6 +946,13 @@ private boolean isNullOrEmpty(String data) {
return data == null || data.length() == 0 || data.trim().length() == 0;
}
+ private void initStat() {
Review comment:
This looks ugly.
Let's do this:
  public Message(ZNRecord record, String id) {
    _record = new SessionAwareZNRecord(record, id);
    _stat = new Stat(_record.getVersion(), _record.getCreationTime(), _record.getModifiedTime(),
        _record.getEphemeralOwner());
  }
Then the other constructors can refer to this one.
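A rough sketch of the delegation I have in mind, using stand-in types (`RecordSketch`, `StatSketch`, `MessageSketch`) rather than the real ZNRecord, Stat, and Message classes. The field names and the extra constructors are assumptions for illustration only.

```java
// Stand-ins for illustration only; not the real ZNRecord / Stat / Message.
final class RecordSketch {
  final String id;
  final int version;
  final long creationTime;
  final long modifiedTime;
  final long ephemeralOwner;

  RecordSketch(String id, int version, long creationTime, long modifiedTime,
      long ephemeralOwner) {
    this.id = id;
    this.version = version;
    this.creationTime = creationTime;
    this.modifiedTime = modifiedTime;
    this.ephemeralOwner = ephemeralOwner;
  }

  // Copy with a new id, playing the role of "new SessionAwareZNRecord(record, id)".
  RecordSketch(RecordSketch other, String id) {
    this(id, other.version, other.creationTime, other.modifiedTime, other.ephemeralOwner);
  }
}

final class StatSketch {
  final int version;
  final long creationTime;
  final long modifiedTime;
  final long ephemeralOwner;

  StatSketch(int version, long creationTime, long modifiedTime, long ephemeralOwner) {
    this.version = version;
    this.creationTime = creationTime;
    this.modifiedTime = modifiedTime;
    this.ephemeralOwner = ephemeralOwner;
  }
}

class MessageSketch {
  final RecordSketch record;
  final StatSketch stat;

  // The only constructor that assigns fields; no separate initStat() helper is needed.
  MessageSketch(RecordSketch record, String id) {
    this.record = new RecordSketch(record, id);
    this.stat = new StatSketch(this.record.version, this.record.creationTime,
        this.record.modifiedTime, this.record.ephemeralOwner);
  }

  // Every other constructor delegates to the one above.
  MessageSketch(RecordSketch record) {
    this(record, record.id);
  }

  MessageSketch(MessageSketch message, String id) {
    this(message.record, id);
  }
}
```

Since every path goes through one constructor, the record and the stat cannot get out of sync.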
##########
File path: helix-core/src/main/java/org/apache/helix/model/Message.java
##########
@@ -206,10 +194,27 @@ public Message(ZNRecord record, String id) {
* @param id unique message identifier
*/
public Message(Message message, String id) {
Review comment:
This does not seem to be used. Should we just deprecate it?
##########
File path: helix-core/src/main/java/org/apache/helix/HelixManager.java
##########
@@ -419,6 +420,17 @@ void addExternalViewChangeListener(org.apache.helix.ExternalViewChangeListener l
*/
Long getSessionStartTime();
+ /**
+ * Checks whether the cluster manager is leader and returns the session ID associated to the
+ * connection of cluster data store, if and only if it is leader.
+ *
+ * @return {@code Optional<String>} session ID is present inside the {@code Optional} object
+ * if the cluster manager is leader. Otherwise, returns an empty {@code Optional} object.
+ */
+ default Optional<String> getSessionIdIfLead() {
Review comment:
Optional does not look good here.
It might be overkill, but how about we define a "private"
SessionAwareHelixManager interface that extends HelixManager and adds the
getSessionIdIfLead() method?
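Roughly what I mean, sketched with stand-in types (`ManagerSketch` for the public interface, `SessionAwareManagerSketch` for the internal one, `FixedManagerSketch` as a toy implementation). These names are hypothetical. The sketch keeps the Optional return type from the diff; switching to something else is a separate choice.

```java
import java.util.Optional;

// Stand-in for the public manager interface; illustration only.
interface ManagerSketch {
  boolean isLeader();

  String getSessionId();
}

// Hypothetical internal interface: adds the accessor without changing ManagerSketch.
interface SessionAwareManagerSketch extends ManagerSketch {
  // Note: this default is not atomic; a real implementation would need to read
  // leadership and session ID consistently.
  default Optional<String> getSessionIdIfLead() {
    return isLeader() ? Optional.of(getSessionId()) : Optional.empty();
  }
}

// Toy implementation with fixed values, used only to exercise the interface.
final class FixedManagerSketch implements SessionAwareManagerSketch {
  private final boolean leader;
  private final String sessionId;

  FixedManagerSketch(boolean leader, String sessionId) {
    this.leader = leader;
    this.sessionId = sessionId;
  }

  @Override
  public boolean isLeader() {
    return leader;
  }

  @Override
  public String getSessionId() {
    return sessionId;
  }
}
```

Only controller-side code would depend on the extended interface, so the public interface stays unchanged for existing users.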
##########
File path: helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
##########
@@ -703,9 +705,15 @@ private void handleEvent(ClusterEvent event, BaseControllerDataProvider dataProv
event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
_rebalancerRef.getRebalancer(manager));
- if (!manager.isLeader()) {
- logger.error("Cluster manager: " + manager.getInstanceName() + " is not leader for " + manager
- .getClusterName() + ". Pipeline will not be invoked");
+ // If manager session changes, no need to run pipeline for the stale event.
Review comment:
This is right, but the original logic was: if the manager is not the leader,
do not run the pipeline. Where did that logic go?
##########
File path: zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/SessionAwareZNRecord.java
##########
@@ -0,0 +1,68 @@
+package org.apache.helix.zookeeper.datamodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+/**
+ * A class represents a session aware ZNRecord: the ZNRecord should be written to zk by
+ * the expected zk session. When the ZNRecord is being written to zk, if the actual
+ * zk session id doesn't match the expected zk session id set in the {@code SessionAwareZNRecord},
+ * writing to zk will fail. It is supposed to be used within Helix only.
+ * <p>
+ * If this ZNRecord is not supposed to be written only by the expected zk session,
+ * {@link ZNRecord} is recommended to use.
+ */
+public class SessionAwareZNRecord extends ZNRecord {
+ @JsonIgnore
+ private String expectedSessionId;
+
+ public SessionAwareZNRecord(String id) {
Review comment:
In this case, I guess you don't need public
SessionAwareZNRecord(ZNRecord record) {}. Callers can use
SessionAwareZNRecord(record, record.getId()) instead without much overhead.