[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/917 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-124143378 Tests pass on Travis. Will merge it.
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-124038143 Will rebase and then merge this PR.
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-123755971 Addressed the following comments:
* Corrected order of visibility and abstract modifiers.
* Removed the lazy log field from `FlinkActor`. Now all implementing subclasses have to implement it.
* Made `RequiresLeaderSessionID` a Java interface.

All other comments have been resolved by discussion.
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-123751721 Do you mean `JobManager.getJobManagerGateway`? This is only a temporary solution to obtain an `ActorGateway` for the JobManager for which you have to know the current leader session ID. This will be changed once HA with ZooKeeper is introduced.
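Why the caller has to know the current leader session ID can be illustrated with a small sketch of a gateway that captures the ID at creation time. This is only an illustration under stated assumptions, not the `ActorGateway` from the PR: `SessionAwareGateway`, `NeedsLeaderSession` and `SessionEnvelope` are hypothetical names, and a `Consumer<Object>` stands in for sending to an `ActorRef`.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;

/** Hypothetical marker, playing the role of RequiresLeaderSessionID. */
interface NeedsLeaderSession {}

/** Hypothetical wrapper, playing the role of LeaderSessionMessage. */
final class SessionEnvelope {
    final Optional<UUID> leaderSessionID;
    final Object payload;

    SessionEnvelope(Optional<UUID> leaderSessionID, Object payload) {
        this.leaderSessionID = leaderSessionID;
        this.payload = payload;
    }
}

/** Hypothetical gateway: the session ID is fixed when the gateway is created. */
class SessionAwareGateway {
    private final Consumer<Object> target; // stand-in for the receiving actor
    private final Optional<UUID> leaderSessionID;

    SessionAwareGateway(Consumer<Object> target, Optional<UUID> leaderSessionID) {
        this.target = target;
        this.leaderSessionID = leaderSessionID;
    }

    /** Wraps marker messages in an envelope; all other messages pass through unchanged. */
    void tell(Object message) {
        if (message instanceof NeedsLeaderSession) {
            target.accept(new SessionEnvelope(leaderSessionID, message));
        } else {
            target.accept(message);
        }
    }
}
```

Because the ID is captured once, such a gateway would have to be re-created (or looked up again) whenever the leader changes, which is presumably why the current accessor is described as temporary.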
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-123757309 Looks good. +1 to merge this...
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-123720593 The mechanism looks good, all in all. Some comments:
- I think it would make the code more understandable if the `decorateMessage()` method were called something like `attachSession()`, or so. Is the decoration used
- We have decided to gradually transition the runtime to Java, as this mixture of languages is making it very clumsy in many parts. All other changes followed the pattern of adding new classes only in Java. Are there principled reasons not to do this here as well? Especially by adding classes that are at the core of this new mechanism (like `RequiresLeaderSessionID`) in Scala, we effectively cement this language blend.
- In a prior refactoring, we changed it such that JobManager, TaskManager, etc. do not use mixins any more. A big part of the decision was clean logs and Java interoperability. This patch reverts this effort. Is there any principled reason for that?
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-123723565 The `leaderSessionID` in the JobManager is constant. Can it not change over time? An example would be when a JobManager loses the ZooKeeper connection (network partition) and loses its leader status, but later comes back and regains leader status.
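The scenario raised here can be sketched as follows. This is a hypothetical illustration, not code from the PR: `LeaderSessionTracker` and its method names are invented for the example, and it assumes that a fresh random `UUID` is generated each time leadership is granted.

```java
import java.util.Optional;
import java.util.UUID;

/** Hypothetical holder for a leader session ID that changes with leadership. */
class LeaderSessionTracker {
    private Optional<UUID> current = Optional.empty();

    /** Called when leadership is (re)gained; every term gets a fresh ID. */
    UUID grantLeadership() {
        UUID id = UUID.randomUUID();
        current = Optional.of(id);
        return id;
    }

    /** Called when leadership is lost, e.g. after a network partition. */
    void revokeLeadership() {
        current = Optional.empty();
    }

    /** A message is accepted only if it carries the ID of the current term. */
    boolean accepts(UUID stampedID) {
        return current.isPresent() && current.get().equals(stampedID);
    }
}
```

With a non-constant ID of this kind, a message stamped during the first leadership term would be rejected both while leadership is lost and after it is regained, since the second term uses a different ID.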
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/917#discussion_r35211408
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.UUID;
+
+/**
+ * Base class for Flink's actors implemented with Java. Actors inheriting from this class
+ * automatically log received messages when the debug log level is activated. Furthermore,
+ * they filter out {@link LeaderSessionMessage} with the wrong leader session ID. If a message
+ * of type {@link RequiresLeaderSessionID} without being wrapped in a LeaderSessionMessage is
+ * detected, then an Exception is thrown.
+ *
+ * In order to implement the actor behavior, an implementing subclass has to override the method
+ * handleMessage, which defines how messages are processed. Furthermore, the subclass has to provide
+ * a leader session ID option which is returned by getLeaderSessionID.
+ */
+abstract public class FlinkUntypedActor extends UntypedActor {
+    protected static Logger LOG = LoggerFactory.getLogger(FlinkUntypedActor.class);
+
+    /**
+     * This method is called by Akka if a new message has arrived for the actor. It logs the
+     * processing time of the incoming message if the logging level is set to debug. After logging
+     * the handleLeaderSessionID method is called.
+     *
+     * Important: This method cannot be overridden. The actor specific message handling logic is
+     * implemented by the method handleMessage.
+     *
+     * @param message Incoming message
+     * @throws Exception
+     */
+    @Override
+    public final void onReceive(Object message) throws Exception {
+        if(LOG.isDebugEnabled()) {
+            LOG.debug("Received message {} at {} from {}.", message, getSelf().path(), getSender());
+
+            long start = System.nanoTime();
+
+            handleLeaderSessionID(message);
+
+            long duration = (System.nanoTime() - start)/ 1000000;
+
+            LOG.debug("Handled message {} in {} ms from {}.", message, duration, getSender());
+        } else {
+            handleLeaderSessionID(message);
+        }
+    }
+
+    /**
+     * This method filters out {@link LeaderSessionMessage} whose leader session ID is not equal
+     * to the actors leader session ID. If a message of type {@link RequiresLeaderSessionID}
+     * arrives, then an Exception is thrown, because these messages have to be wrapped in a
+     * {@link LeaderSessionMessage}.
+     *
+     * @param message Incoming message
+     * @throws Exception
+     */
+    private void handleLeaderSessionID(Object message) throws Exception {
+        if(message instanceof LeaderSessionMessage) {
+            LeaderSessionMessage msg = (LeaderSessionMessage) message;
+
+            if(msg.leaderSessionID().isDefined() && getLeaderSessionID().isDefined()) {
+                if(getLeaderSessionID().equals(msg.leaderSessionID())) {
+                    // finally call method to handle message
+                    handleMessage(msg.message());
+                } else {
+                    handleDiscardedMessage(msg);
+                }
+            } else {
+                handleDiscardedMessage(msg);
+            }
+        } else if (message instanceof
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/917#discussion_r35212080
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala ---
@@ -19,34 +19,30 @@ package org.apache.flink.runtime
 import _root_.akka.actor.Actor
-
-/**
- * Mixin to add debug message logging
- */
-trait ActorLogMessages {
-  that: Actor with ActorSynchronousLogging =>
-
-  override def receive: Receive = new Actor.Receive {
-    private val _receiveWithLogMessages = receiveWithLogMessages
-
-    override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
-
-    override def apply(x: Any): Unit = {
-      if (!log.isDebugEnabled) {
-        _receiveWithLogMessages(x)
-      }
-      else {
-        log.debug(s"Received message $x at ${that.self.path} from ${that.sender()}.")
-
-        val start = System.nanoTime()
-
-        _receiveWithLogMessages(x)
-
-        val duration = (System.nanoTime() - start) / 1000000
-        log.debug(s"Handled message $x in $duration ms from ${that.sender()}.")
-      }
-    }
+import grizzled.slf4j.Logger
+
+/** Base trait for Flink's actors.
+ *
+ * The message handling logic is defined in the handleMessage method. This allows to mixin
+ * stackable traits which change the message receiving behaviour.
+ */
+trait FlinkActor extends Actor {
+  lazy val log = Logger(getClass)
--- End diff --
Is this log used anywhere? Lazy variables usually have an extra cost on every access. They need strictly volatile accesses. That voids the paradigm of being able to inexpensively check whether the log is set to DEBUG, for example.
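The per-access cost mentioned here can be made concrete with a hand-written Java approximation of a thread-safe lazily initialized field. This is a sketch, not compiler output: it assumes that a Scala 2 `lazy val` is compiled to roughly this pattern (a volatile flag checked on every access plus a synchronized initializer), and `LazyField` and `initCount` are hypothetical names used only for the illustration.

```java
import java.util.function.Supplier;

/** Hand-written approximation of a thread-safe lazily initialized field. */
class LazyField<T> {
    private final Supplier<T> initializer;
    private volatile boolean initialized; // read on every call to get()
    private T value;
    int initCount; // for demonstration only: counts how often the initializer ran

    LazyField(Supplier<T> initializer) {
        this.initializer = initializer;
    }

    T get() {
        if (!initialized) {                 // volatile read, even after initialization
            synchronized (this) {
                if (!initialized) {         // second check under the lock
                    value = initializer.get();
                    initCount++;
                    initialized = true;     // volatile write publishes 'value'
                }
            }
        }
        return value;
    }
}
```

A plain `final` field, by contrast, is a single non-volatile read, which is what makes guards such as an `isDebugEnabled` check cheap.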
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/917#discussion_r35214927
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java ---
@@ -0,0 +1,140 @@
[... same FlinkUntypedActor.java hunk as quoted earlier in this thread; the archived message is truncated before the comment text ...]
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/917#discussion_r35216181
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala ---
@@ -19,34 +19,30 @@ package org.apache.flink.runtime
 import _root_.akka.actor.Actor
-
-/**
- * Mixin to add debug message logging
- */
-trait ActorLogMessages {
-  that: Actor with ActorSynchronousLogging =>
-
-  override def receive: Receive = new Actor.Receive {
-    private val _receiveWithLogMessages = receiveWithLogMessages
-
-    override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
-
-    override def apply(x: Any): Unit = {
-      if (!log.isDebugEnabled) {
-        _receiveWithLogMessages(x)
-      }
-      else {
-        log.debug(s"Received message $x at ${that.self.path} from ${that.sender()}.")
-
-        val start = System.nanoTime()
-
-        _receiveWithLogMessages(x)
-
-        val duration = (System.nanoTime() - start) / 1000000
-        log.debug(s"Handled message $x in $duration ms from ${that.sender()}.")
-      }
-    }
+import grizzled.slf4j.Logger
+
+/** Base trait for Flink's actors.
+ *
+ * The message handling logic is defined in the handleMessage method. This allows to mixin
+ * stackable traits which change the message receiving behaviour.
+ */
+trait FlinkActor extends Actor {
+  lazy val log = Logger(getClass)
--- End diff --
I think abstract variables just give us function calls instead. Can it be a regular variable? When is the `getClass()` method here actually called?
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-123736731 Is the `JobManagerGateway` a temporary solution? If I understood it correctly, in the full-fledged integration with ZooKeeper, an actor that wants to send a message to the JobManager will look up both the leader Akka URL and the leader session ID from ZooKeeper.
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/917#discussion_r35215346
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala ---
@@ -19,34 +19,30 @@ package org.apache.flink.runtime
 import _root_.akka.actor.Actor
-
-/**
- * Mixin to add debug message logging
- */
-trait ActorLogMessages {
-  that: Actor with ActorSynchronousLogging =>
-
-  override def receive: Receive = new Actor.Receive {
-    private val _receiveWithLogMessages = receiveWithLogMessages
-
-    override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
-
-    override def apply(x: Any): Unit = {
-      if (!log.isDebugEnabled) {
-        _receiveWithLogMessages(x)
-      }
-      else {
-        log.debug(s"Received message $x at ${that.self.path} from ${that.sender()}.")
-
-        val start = System.nanoTime()
-
-        _receiveWithLogMessages(x)
-
-        val duration = (System.nanoTime() - start) / 1000000
-        log.debug(s"Handled message $x in $duration ms from ${that.sender()}.")
-      }
-    }
+import grizzled.slf4j.Logger
+
+/** Base trait for Flink's actors.
+ *
+ * The message handling logic is defined in the handleMessage method. This allows to mixin
+ * stackable traits which change the message receiving behaviour.
+ */
+trait FlinkActor extends Actor {
+  lazy val log = Logger(getClass)
--- End diff --
Yes, it is used by the subclasses of `FlinkActor`. By initializing the logger lazily, we ensure that the logger gets the right class name from the implementing subclass. But we can also define it abstractly so that the implementing subclass has to define it.
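The two options discussed in this exchange can be compared in a small Java sketch. It relies on the JVM behaviour that `getClass()` returns the runtime class of the object, even when called from a base-class field initializer; all class names below are hypothetical, and a plain `String` stands in for the logger so that the example needs no logging library.

```java
/** Option 1 (hypothetical): an eager field in the base class. */
abstract class BaseActorSketch {
    // Evaluated while the subclass instance is being constructed;
    // getClass() already refers to the concrete subclass here.
    final String logName = getClass().getSimpleName();
}

/** Option 2 (hypothetical): the subclass has to supply the name (or logger) itself. */
abstract class AbstractLogActorSketch {
    abstract String logName();
}

class JobManagerSketch extends BaseActorSketch {}

class TaskManagerSketch extends AbstractLogActorSketch {
    @Override
    String logName() {
        return "TaskManagerSketch";
    }
}
```

Under this assumption, laziness is not what provides the subclass name; an eagerly initialized field sees it as well. The abstract variant trades the field read for a virtual method call.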
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/917#issuecomment-123737269
* I chose to call the method `decorateMessage` because it does not necessarily only attach leader session IDs. In the future we might use this method for something other than just wrapping the messages in a `LeaderSessionMessage`. Moreover, calling it `attachSession` would contradict the idea of transparency here. But we can change it as long as we don't have to do any other decorations.
* The trait `RequiresLeaderSessionID` is only a marker trait which does not contain any code. Thus, it is effectively equivalent to a Java interface and in fact Java classes can implement it. The only reason I chose a trait instead of an interface is that all messages which are sent by the actors are implemented as Scala classes. I can change it to be an interface; however, I don't think that it cements the language blend.
* I agree that it's not necessary to use mixins here. The reason to implement the logging and leader session message decoration as a mixin was just the beauty of the implementation. But I forgot about the clumsy names in the log output. I will include the mixins in the class `FlinkActor`.
* It is true that the leader session ID can change in the case of HA with ZooKeeper. However, the ZooKeeper HA is not part of this PR.
[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/917 [FLINK-2332] [runtime] Adds leader session IDs and registration session IDs

## Registration Session IDs

Introduces registration session IDs for all registration messages. Upon receiving a registration message, this ID is checked and, if it is not correct, the message is discarded. That way, it is possible to distinguish old registration messages which are delayed from valid ones. In the current implementation, a static registration session ID is assigned when the `TaskManager` actor is created. However, with support for high availability, where the leader can change while a `TaskManager` is still trying to register at the old leader, it becomes important to distinguish old from new registration messages.

## Leader Session IDs

In order to support high availability, we not only have to distinguish the old from the new registration messages, but also control messages which are sent to and from the `JobManager` and the `TaskManager`. In order to filter out possibly old messages, this PR introduces a leader session ID which denotes the currently valid messages. However, unlike the registration session ID, leader session IDs are assigned to messages transparently. Messages which extend the `RequiresLeaderSessionID` interface will be wrapped in a `LeaderSessionMessage` which also contains the currently known leader session ID. At the receiving end, the `LeaderSessionMessages` are unpacked and the received leader session ID is compared to the currently stored leader session ID. If both IDs are the same, the wrapped message is processed. If not, the wrapped message is discarded.

In order to support this behaviour, the PR introduces a new `FlinkActor` for Scala actors and a `FlinkUntypedActor` for Java actors. Both actors provide a `decorateMessage` method which allows subtypes of the `FlinkActor`/`FlinkUntypedActor` to decorate outgoing messages. Therefore, all implementing classes are supposed to call `decorateMessage` before sending a message to another actor.

The `FlinkUntypedActor` already comes with support for message logging and leader session message filtering. Furthermore, its `decorateMessage` method implementation checks for each message whether it is a subtype of `RequiresLeaderSessionID` and, if so, wraps the message in a `LeaderSessionMessage`. The receive method of this class will then take care to unwrap the messages accordingly.

In order to have the same behavior with Scala actors one has to extend the `FlinkActor` and mix in the `LeaderSessionMessages` and `LoggingMessages` mixins. They effectively do the same as the `FlinkUntypedActor`, but offer better extensibility of the Scala actors in the future.

If a `RequiresLeaderSessionID` message is received by a `FlinkActor` without being wrapped in a `LeaderSessionMessage`, an exception is thrown, which effectively terminates the execution of the actor. The reason for this is that an unwrapped message might leave the system in an inconsistent state if it's a message from an old leader. Furthermore, since not every `RequiresLeaderSessionID` message requires a response, it is not possible to notify the sender of the wrong message about the missing leader session ID.

## ActorGateway refactoring

In order to guarantee the same wrapping behaviour when one sends messages from outside of an actor, the former `InstanceGateway` has been refactored to `ActorGateway` and all `ActorRef` interactions have been replaced by `ActorGateway` instances. Only the web server still uses `ActorRefs`, because it is about to be refactored anyway (see #677). However, the PR #677 should be updated accordingly.

`AkkaActorGateway` implements the `ActorGateway` and makes sure that all `RequiresLeaderSessionID` messages are wrapped correctly in a `LeaderSessionMessage`. In order to make this happen, the `AkkaActorGateway` is given the current leader session ID upon creation. For any actor interaction from outside of an actor, this class should be used. Using this abstraction will allow us to easily extend the decoration behaviour of messages in the future, too.

## Style formatting

The PR also contains some Scala style harmonisation of old Scala code.

## TL;DR

In order to support leader session IDs all Flink actors should extend `FlinkUntypedActor` or `FlinkActor` with `LeaderSessionMessages` and `LogMessages` mixins. Whenever a message is sent from within an actor, the message should be decorated by calling `decorateMessage`. When messages are sent from outside of an actor, the `AkkaActorGateway` should be used instead of directly using `ActorRef`. That way, we guarantee the proper wrapping of the messages.

You can merge this pull request into a Git repository by running: $ git
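The wrap-and-filter flow described in this pull request can be sketched without Akka as follows. This is a minimal sketch under stated assumptions, not the code from the PR: `SessionFilteringReceiver` and its `handled`/`discarded` lists are hypothetical, `RequiresLeaderSessionID` and `LeaderSessionMessage` are re-declared locally as stand-ins for the types named above, and `java.util.Optional` replaces the `scala.Option` used in the actual diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

/** Local stand-in for the marker interface named in the PR. */
interface RequiresLeaderSessionID {}

/** Local stand-in for the wrapper that carries the sender's known session ID. */
final class LeaderSessionMessage {
    final Optional<UUID> leaderSessionID;
    final Object message;

    LeaderSessionMessage(Optional<UUID> leaderSessionID, Object message) {
        this.leaderSessionID = leaderSessionID;
        this.message = message;
    }
}

/** Hypothetical receiver that mimics the decorate/unwrap logic without Akka. */
class SessionFilteringReceiver {
    private final Optional<UUID> leaderSessionID;
    final List<Object> handled = new ArrayList<>();
    final List<Object> discarded = new ArrayList<>();

    SessionFilteringReceiver(Optional<UUID> leaderSessionID) {
        this.leaderSessionID = leaderSessionID;
    }

    /** Wraps only messages that carry the marker interface. */
    Object decorateMessage(Object message) {
        if (message instanceof RequiresLeaderSessionID) {
            return new LeaderSessionMessage(leaderSessionID, message);
        }
        return message;
    }

    /** Unwraps matching messages, discards stale ones, rejects unwrapped marker messages. */
    void receive(Object message) {
        if (message instanceof LeaderSessionMessage) {
            LeaderSessionMessage msg = (LeaderSessionMessage) message;
            boolean bothDefined = leaderSessionID.isPresent() && msg.leaderSessionID.isPresent();
            if (bothDefined && leaderSessionID.equals(msg.leaderSessionID)) {
                handled.add(msg.message);
            } else {
                discarded.add(msg.message);
            }
        } else if (message instanceof RequiresLeaderSessionID) {
            throw new IllegalStateException("Received unwrapped message " + message);
        } else {
            handled.add(message);
        }
    }
}
```

In this sketch a message decorated by a sender with a different session ID ends up in `discarded`, while an undecorated marker message raises an exception, mirroring the behaviour described for `FlinkActor` and `FlinkUntypedActor`.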