[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/917


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-23 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-124143378
  
Tests pass on Travis. Will merge it.




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-23 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-124038143
  
Will rebase and then merge this PR.




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-123755971
  
Addressed the following comments: Corrected order of visibility and 
abstract modifiers. Removed the lazy log field from `FlinkActor`. Now all 
implementing subclasses have to implement it. Made `RequiresLeaderSessionID` a 
Java interface.

All other comments have been resolved by discussion.




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-123751721
  
Do you mean `JobManager.getJobManagerGateway`? This is only a temporary 
solution to obtain an `ActorGateway` for the JobManager for which you have to 
know the current leader session ID. This will be changed once HA with ZooKeeper 
is introduced.




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-123757309
  
Looks good. +1 to merge this...




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-123720593
  
The mechanism looks good, all in all.

Some comments:
  - I think the code would be more understandable if the 
`decorateMessage()` method were called something like `attachSession()`. 
Is the decoration used 

  - We have decided to gradually transition the runtime to Java, as this 
mixture of languages is making it very clumsy in many parts. All other changes 
followed the pattern of adding new classes only in Java. Are there principled 
reasons not to do this here as well? Especially by adding classes that are at 
the core of this new mechanism (like `RequiresLeaderSessionID`) in Scala, we 
effectively cement this language blend.

  - In a prior refactoring, we changed JobManager, TaskManager, etc. so that 
they do not use mixins any more. A big part of that decision was clean logs 
and Java interoperability. This patch reverts this effort. Is there any 
principled reason for that?




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-123723565
  
The `leaderSessionID` in the JobManager is constant. Can it not change over 
time? An example would be when a JobManager loses the ZooKeeper connection 
(network partition) and loses its leader status, but later comes back and 
regains leader status.




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/917#discussion_r35211408
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.UUID;
+
+/**
+ * Base class for Flink's actors implemented with Java. Actors inheriting from this class
+ * automatically log received messages when the debug log level is activated. Furthermore,
+ * they filter out {@link LeaderSessionMessage} with the wrong leader session ID. If a message
+ * of type {@link RequiresLeaderSessionID} without being wrapped in a LeaderSessionMessage is
+ * detected, then an Exception is thrown.
+ *
+ * In order to implement the actor behavior, an implementing subclass has to override the method
+ * handleMessage, which defines how messages are processed. Furthermore, the subclass has to provide
+ * a leader session ID option which is returned by getLeaderSessionID.
+ */
+abstract public class FlinkUntypedActor extends UntypedActor {
+	protected static Logger LOG = LoggerFactory.getLogger(FlinkUntypedActor.class);
+
+	/**
+	 * This method is called by Akka if a new message has arrived for the actor. It logs the
+	 * processing time of the incoming message if the logging level is set to debug. After logging
+	 * the handleLeaderSessionID method is called.
+	 *
+	 * Important: This method cannot be overridden. The actor-specific message handling logic is
+	 * implemented by the method handleMessage.
+	 *
+	 * @param message Incoming message
+	 * @throws Exception
+	 */
+	@Override
+	public final void onReceive(Object message) throws Exception {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("Received message {} at {} from {}.", message, getSelf().path(), getSender());
+
+			long start = System.nanoTime();
+
+			handleLeaderSessionID(message);
+
+			long duration = (System.nanoTime() - start)/ 1000000;
+
+			LOG.debug("Handled message {} in {} ms from {}.", message, duration, getSender());
+		} else {
+			handleLeaderSessionID(message);
+		}
+	}
+
+	/**
+	 * This method filters out {@link LeaderSessionMessage} whose leader session ID is not equal
+	 * to the actor's leader session ID. If a message of type {@link RequiresLeaderSessionID}
+	 * arrives, then an Exception is thrown, because these messages have to be wrapped in a
+	 * {@link LeaderSessionMessage}.
+	 *
+	 * @param message Incoming message
+	 * @throws Exception
+	 */
+	private void handleLeaderSessionID(Object message) throws Exception {
+		if(message instanceof LeaderSessionMessage) {
+			LeaderSessionMessage msg = (LeaderSessionMessage) message;
+
+			if(msg.leaderSessionID().isDefined() && getLeaderSessionID().isDefined()) {
+				if(getLeaderSessionID().equals(msg.leaderSessionID())) {
+					// finally call method to handle message
+					handleMessage(msg.message());
+				} else {
+					handleDiscardedMessage(msg);
+				}
+			} else {
+				handleDiscardedMessage(msg);
+			}
+		} else if (message instanceof 

[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/917#discussion_r35212080
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala ---
@@ -19,34 +19,30 @@
 package org.apache.flink.runtime
 
 import _root_.akka.actor.Actor
-
-/**
- * Mixin to add debug message logging
- */
-trait ActorLogMessages {
-  that: Actor with ActorSynchronousLogging =>
-
-  override def receive: Receive = new Actor.Receive {
-    private val _receiveWithLogMessages = receiveWithLogMessages
-
-    override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
-
-    override def apply(x: Any): Unit = {
-      if (!log.isDebugEnabled) {
-        _receiveWithLogMessages(x)
-      }
-      else {
-        log.debug(s"Received message $x at ${that.self.path} from ${that.sender()}.")
-
-        val start = System.nanoTime()
-
-        _receiveWithLogMessages(x)
-
-        val duration = (System.nanoTime() - start) / 1000000
-        log.debug(s"Handled message $x in $duration ms from ${that.sender()}.")
-      }
-    }
+import grizzled.slf4j.Logger
+
+/** Base trait for Flink's actors.
+  *
+  * The message handling logic is defined in the handleMessage method. This allows to mixin
+  * stackable traits which change the message receiving behaviour.
+  */
+trait FlinkActor extends Actor {
+  lazy val log = Logger(getClass)
--- End diff --

Is this log used anywhere?

Lazy variables usually have an extra cost on every access. They need 
strictly volatile accesses. That voids the paradigm of being able to 
inexpensively check whether the log is set to DEBUG, for example.
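
To illustrate the cost mentioned here: a lazily initialised field is commonly implemented with a guard flag that is checked on every access. The Java sketch below shows that pattern (double-checked locking). It is an approximation for illustration only, not the exact code the Scala compiler emits, and the class names `EagerHolder` and `LazyHolder` are made up.

```java
// Eagerly initialised field: each access is a plain read of a final field.
final class EagerHolder {
    private final String value = "logger";

    String value() {
        return value;
    }
}

// Lazily initialised field: each access first reads a volatile guard flag.
final class LazyHolder {
    private volatile boolean initialized = false;
    private String value;
    int initCount = 0; // only for demonstration: counts how often initialisation ran

    String value() {
        if (!initialized) {                 // volatile read on every access
            synchronized (this) {
                if (!initialized) {         // re-check under the lock
                    value = "logger";
                    initCount++;
                    initialized = true;     // volatile write publishes `value`
                }
            }
        }
        return value;
    }
}
```

After the first call, `LazyHolder.value()` no longer takes the lock, but it still pays for the volatile read that `EagerHolder.value()` avoids.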




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/917#discussion_r35214927
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java ---
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.akka;
+
+import akka.actor.UntypedActor;
+import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.util.UUID;
+
+/**
+ * Base class for Flink's actors implemented with Java. Actors inheriting from this class
+ * automatically log received messages when the debug log level is activated. Furthermore,
+ * they filter out {@link LeaderSessionMessage} with the wrong leader session ID. If a message
+ * of type {@link RequiresLeaderSessionID} without being wrapped in a LeaderSessionMessage is
+ * detected, then an Exception is thrown.
+ *
+ * In order to implement the actor behavior, an implementing subclass has to override the method
+ * handleMessage, which defines how messages are processed. Furthermore, the subclass has to provide
+ * a leader session ID option which is returned by getLeaderSessionID.
+ */
+abstract public class FlinkUntypedActor extends UntypedActor {
+	protected static Logger LOG = LoggerFactory.getLogger(FlinkUntypedActor.class);
+
+	/**
+	 * This method is called by Akka if a new message has arrived for the actor. It logs the
+	 * processing time of the incoming message if the logging level is set to debug. After logging
+	 * the handleLeaderSessionID method is called.
+	 *
+	 * Important: This method cannot be overridden. The actor-specific message handling logic is
+	 * implemented by the method handleMessage.
+	 *
+	 * @param message Incoming message
+	 * @throws Exception
+	 */
+	@Override
+	public final void onReceive(Object message) throws Exception {
+		if(LOG.isDebugEnabled()) {
+			LOG.debug("Received message {} at {} from {}.", message, getSelf().path(), getSender());
+
+			long start = System.nanoTime();
+
+			handleLeaderSessionID(message);
+
+			long duration = (System.nanoTime() - start)/ 1000000;
+
+			LOG.debug("Handled message {} in {} ms from {}.", message, duration, getSender());
+		} else {
+			handleLeaderSessionID(message);
+		}
+	}
+
+	/**
+	 * This method filters out {@link LeaderSessionMessage} whose leader session ID is not equal
+	 * to the actor's leader session ID. If a message of type {@link RequiresLeaderSessionID}
+	 * arrives, then an Exception is thrown, because these messages have to be wrapped in a
+	 * {@link LeaderSessionMessage}.
+	 *
+	 * @param message Incoming message
+	 * @throws Exception
+	 */
+	private void handleLeaderSessionID(Object message) throws Exception {
+		if(message instanceof LeaderSessionMessage) {
+			LeaderSessionMessage msg = (LeaderSessionMessage) message;
+
+			if(msg.leaderSessionID().isDefined() && getLeaderSessionID().isDefined()) {
+				if(getLeaderSessionID().equals(msg.leaderSessionID())) {
+					// finally call method to handle message
+					handleMessage(msg.message());
+				} else {
+					handleDiscardedMessage(msg);
+				}
+			} else {
+				handleDiscardedMessage(msg);
+			}
+		} else if (message instanceof 

[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/917#discussion_r35216181
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala ---
@@ -19,34 +19,30 @@
 package org.apache.flink.runtime
 
 import _root_.akka.actor.Actor
-
-/**
- * Mixin to add debug message logging
- */
-trait ActorLogMessages {
-  that: Actor with ActorSynchronousLogging =>
-
-  override def receive: Receive = new Actor.Receive {
-    private val _receiveWithLogMessages = receiveWithLogMessages
-
-    override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
-
-    override def apply(x: Any): Unit = {
-      if (!log.isDebugEnabled) {
-        _receiveWithLogMessages(x)
-      }
-      else {
-        log.debug(s"Received message $x at ${that.self.path} from ${that.sender()}.")
-
-        val start = System.nanoTime()
-
-        _receiveWithLogMessages(x)
-
-        val duration = (System.nanoTime() - start) / 1000000
-        log.debug(s"Handled message $x in $duration ms from ${that.sender()}.")
-      }
-    }
+import grizzled.slf4j.Logger
+
+/** Base trait for Flink's actors.
+  *
+  * The message handling logic is defined in the handleMessage method. This allows to mixin
+  * stackable traits which change the message receiving behaviour.
+  */
+trait FlinkActor extends Actor {
+  lazy val log = Logger(getClass)
--- End diff --

I think abstract variables just give us function calls instead. Can it be a 
regular variable?
When is the `getClass()` method here actually called?




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-123736731
  
Is the `JobManagerGateway` a temporary solution?
If I understood it correctly, in the full-fledged integration with 
ZooKeeper, an actor that wants to send a message to the JobManager will look up 
both the leader Akka URL and the leader session ID from ZooKeeper.




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/917#discussion_r35215346
  
--- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/FlinkActor.scala ---
@@ -19,34 +19,30 @@
 package org.apache.flink.runtime
 
 import _root_.akka.actor.Actor
-
-/**
- * Mixin to add debug message logging
- */
-trait ActorLogMessages {
-  that: Actor with ActorSynchronousLogging =>
-
-  override def receive: Receive = new Actor.Receive {
-    private val _receiveWithLogMessages = receiveWithLogMessages
-
-    override def isDefinedAt(x: Any): Boolean = _receiveWithLogMessages.isDefinedAt(x)
-
-    override def apply(x: Any): Unit = {
-      if (!log.isDebugEnabled) {
-        _receiveWithLogMessages(x)
-      }
-      else {
-        log.debug(s"Received message $x at ${that.self.path} from ${that.sender()}.")
-
-        val start = System.nanoTime()
-
-        _receiveWithLogMessages(x)
-
-        val duration = (System.nanoTime() - start) / 1000000
-        log.debug(s"Handled message $x in $duration ms from ${that.sender()}.")
-      }
-    }
+import grizzled.slf4j.Logger
+
+/** Base trait for Flink's actors.
+  *
+  * The message handling logic is defined in the handleMessage method. This allows to mixin
+  * stackable traits which change the message receiving behaviour.
+  */
+trait FlinkActor extends Actor {
+  lazy val log = Logger(getClass)
--- End diff --

Yes, it is used by the subclasses of `FlinkActor`. By initializing the 
logger lazily, we ensure that the logger gets the right class name from the 
implementing subclass.

But we can also declare it abstract so that the implementing subclass has 
to define it.
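
The two options discussed here can be sketched in Java terms. The sketch uses `java.util.logging` so that it runs without dependencies (the PR itself uses SLF4J and grizzled), and all class names are hypothetical.

```java
import java.util.logging.Logger;

// Option 1: the base class derives the logger name from the runtime class.
abstract class BaseWithRuntimeLogger {
    // getClass() returns the concrete subclass, so the logger carries its name.
    protected final Logger log = Logger.getLogger(getClass().getName());
}

// Option 2: the base class only declares the logger; each subclass defines it.
abstract class BaseWithAbstractLogger {
    protected abstract Logger log();
}

final class JobManagerLike extends BaseWithRuntimeLogger {}

final class TaskManagerLike extends BaseWithAbstractLogger {
    private static final Logger LOG = Logger.getLogger(TaskManagerLike.class.getName());

    @Override
    protected Logger log() {
        return LOG;
    }
}
```

In both variants the log output is attributed to the concrete actor class. The second variant makes every subclass spell out its logger explicitly.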




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-22 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/917#issuecomment-123737269
  
* I chose to call the method `decorateMessage` because it does not necessarily 
only attach leader session IDs. In the future we might use this method for 
something other than just wrapping the messages in a `LeaderSessionMessage`. 
Moreover, calling it `attachSession` would contradict the idea of transparency 
here. But we can change it as long as we don't have to do any other decorations.

* The trait `RequiresLeaderSessionID` is only a marker trait which does not 
contain any code. Thus, it is effectively equivalent to a Java interface, and 
in fact Java classes can implement it. The only reason I chose a trait instead 
of an interface is that all messages which are sent by the actors are 
implemented as Scala classes. I can change it to be an interface; however, I 
don't think that it cements the language blend.

* I agree that it's not necessary to use mixins here. The reason to 
implement the logging and leader session message decoration as a mixin was just 
the beauty of the implementation. But I forgot about the clumsy names in the 
log output. I will include the mixins in the class `FlinkActor`.

* It is true that the leader session ID can change in the case of HA with 
ZooKeeper. However, the ZooKeeper HA is not part of this PR. 




[GitHub] flink pull request: [FLINK-2332] [runtime] Adds leader session IDs...

2015-07-16 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/917

[FLINK-2332] [runtime] Adds leader session IDs and registration session IDs

## Registration Session IDs

Introduces registration session IDs for all registration messages. Upon 
receiving a registration message, this ID is checked and if not correct, the 
message is discarded. That way, it is possible to distinguish old registration 
messages which are delayed from valid ones. 

In the current implementation, a static registration session ID is assigned 
when the `TaskManager` actor is created. However, with support for high 
availability, where the leader can change while trying to register at the old 
leader, it becomes important to distinguish old from new registration messages.
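
A minimal sketch of the check described above, assuming the ID is a `UUID` fixed at creation time. The class names `RegistrationReply` and `RegistrationState` are hypothetical and do not appear in the PR.

```java
import java.util.UUID;

// Hypothetical registration message carrying the session ID it belongs to.
final class RegistrationReply {
    final UUID registrationSessionID;

    RegistrationReply(UUID registrationSessionID) {
        this.registrationSessionID = registrationSessionID;
    }
}

// Hypothetical holder of the static registration session ID.
final class RegistrationState {
    // Assigned once, when the owning component is created.
    private final UUID registrationSessionID = UUID.randomUUID();
    private boolean registered = false;

    UUID currentSessionID() {
        return registrationSessionID;
    }

    boolean isRegistered() {
        return registered;
    }

    // Returns true if the message was accepted, false if it was discarded.
    boolean onReply(RegistrationReply reply) {
        if (!registrationSessionID.equals(reply.registrationSessionID)) {
            return false; // delayed or foreign message: discard it
        }
        registered = true;
        return true;
    }
}
```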

## Leader Session IDs

In order to support high availability, we not only have to distinguish the 
old from the new registration messages, but also control messages which are 
sent to and from the `JobManager` and the `TaskManager`. In order to filter out 
possibly old messages, this PR introduces a leader session ID which denotes the 
currently valid messages. However, unlike the registration session ID, leader 
session IDs are assigned to messages transparently. 

Messages which extend the `RequiresLeaderSessionID` interface will be 
wrapped in a `LeaderSessionMessage` which also contains the currently known 
leader session ID. At the receiving end, each `LeaderSessionMessage` is 
unpacked and the received leader session ID is compared to the currently stored 
leader session ID. If both IDs are the same, the wrapped message is processed. 
If not, the wrapped message is discarded.
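
The wrap-and-filter round trip can be sketched without Akka as follows. This is an illustration under simplifying assumptions: the ID is a plain `UUID` rather than an option, the types below are reduced stand-ins for the PR's classes, and `CancelJobLike` and `SessionFilter` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Stand-in for the marker interface.
interface RequiresLeaderSessionID {}

// Stand-in for the wrapper message.
final class LeaderSessionMessage {
    final UUID leaderSessionID;
    final Object message;

    LeaderSessionMessage(UUID leaderSessionID, Object message) {
        this.leaderSessionID = leaderSessionID;
        this.message = message;
    }
}

// Hypothetical message that must carry a leader session ID.
final class CancelJobLike implements RequiresLeaderSessionID {}

// Hypothetical endpoint that decorates outgoing and filters incoming messages.
final class SessionFilter {
    private final UUID leaderSessionID;
    final List<Object> handled = new ArrayList<>();
    final List<Object> discarded = new ArrayList<>();

    SessionFilter(UUID leaderSessionID) {
        this.leaderSessionID = leaderSessionID;
    }

    // Sending side: wrap only messages that carry the marker interface.
    Object decorateMessage(Object message) {
        if (message instanceof RequiresLeaderSessionID) {
            return new LeaderSessionMessage(leaderSessionID, message);
        }
        return message;
    }

    // Receiving side: unwrap, compare IDs, and discard on mismatch.
    void receive(Object message) {
        if (message instanceof LeaderSessionMessage) {
            LeaderSessionMessage msg = (LeaderSessionMessage) message;
            if (leaderSessionID.equals(msg.leaderSessionID)) {
                handled.add(msg.message);
            } else {
                discarded.add(msg);
            }
        } else {
            handled.add(message);
        }
    }
}
```

A message decorated by a sender that knows the current ID ends up in `handled`, while one decorated with an outdated ID ends up in `discarded`.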

In order to support this behaviour, the PR introduces a new `FlinkActor` 
for Scala actors and a `FlinkUntypedActor` for Java actors. Both actors provide 
a `decorateMessage` method which allows subtypes of the 
`FlinkActor`/`FlinkUntypedActor` to decorate outgoing messages. Therefore, all 
implementing classes are supposed to call `decorateMessage` before sending a 
message to another actor.

The `FlinkUntypedActor` already comes with support for message logging and 
leader session message filtering. Furthermore, its `decorateMessage` method 
implementation checks whether each message is a subtype of 
`RequiresLeaderSessionID` and, if so, wraps the message in a 
`LeaderSessionMessage`. The receive method of this class then takes care of 
unwrapping the messages accordingly.

In order to have the same behavior with Scala actors, one has to extend the 
`FlinkActor` and mix in the `LeaderSessionMessages` and `LoggingMessages` 
mixins. They effectively do the same as the `FlinkUntypedActor`, but offer 
better extensibility of the Scala actors in the future.

If a `RequiresLeaderSessionID` message is received by a `FlinkActor` without 
being wrapped in a `LeaderSessionMessage`, an exception is thrown, 
which effectively terminates the execution of the actor. The reason for this is 
that an unwrapped message might leave the system in an inconsistent state if 
it's a message from an old leader. Furthermore, since not every 
`RequiresLeaderSessionID` message requires a response, it is not possible to 
notify the sender of the wrong message about the missing leader session ID.
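
The fail-fast rule can be sketched as a small check. The exception type `IllegalStateException` is an assumption (the description only says that an exception is thrown), the ID comparison is left out for brevity, and `StopJobLike` and `UnwrappedCheck` are hypothetical names.

```java
// Stand-in for the marker interface.
interface RequiresLeaderSessionID {}

// Stand-in for the wrapper message, reduced to its payload.
final class LeaderSessionMessage {
    final Object message;

    LeaderSessionMessage(Object message) {
        this.message = message;
    }
}

// Hypothetical message that must carry a leader session ID.
final class StopJobLike implements RequiresLeaderSessionID {}

final class UnwrappedCheck {
    // Returns the payload to handle, or throws if a marker message arrives unwrapped.
    static Object unwrapOrFail(Object message) {
        if (message instanceof LeaderSessionMessage) {
            return ((LeaderSessionMessage) message).message;
        } else if (message instanceof RequiresLeaderSessionID) {
            // Assumed exception type; the sender cannot be notified in general.
            throw new IllegalStateException("Received a message of type "
                + message.getClass().getName() + " without a leader session ID.");
        }
        return message; // messages without the marker pass through unchanged
    }
}
```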

## ActorGateway refactoring

In order to guarantee the same wrapping behaviour when one sends 
messages outside of an actor, the former `InstanceGateway` has been refactored 
to `ActorGateway` and all `ActorRef` interactions have been replaced by 
`ActorGateway` instances. Only the web server still uses `ActorRefs`, because 
it is about to be refactored anyway (see #677). However, the PR #677 should be 
updated accordingly. 

`AkkaActorGateway` implements the `ActorGateway` and makes sure that all 
`RequiresLeaderSessionID` messages are wrapped correctly in a 
`LeaderSessionMessage`. In order to make this happen, the `AkkaActorGateway` is 
given the current leader session ID upon creation. For any actor interaction 
from outside of an actor, this class should be used. Using this abstraction 
will allow us to easily extend the decoration behaviour of messages in the 
future, too. 
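
The idea behind the gateway can be sketched as follows. The interface here is heavily reduced and hypothetical (`SimpleGateway`, `WrappingGateway`, and `RequestJobStatusLike` are not names from the PR), and a `Consumer<Object>` stands in for the underlying `ActorRef`.

```java
import java.util.UUID;
import java.util.function.Consumer;

// Stand-ins for the marker interface and the wrapper message.
interface RequiresLeaderSessionID {}

final class LeaderSessionMessage {
    final UUID leaderSessionID;
    final Object message;

    LeaderSessionMessage(UUID leaderSessionID, Object message) {
        this.leaderSessionID = leaderSessionID;
        this.message = message;
    }
}

// Hypothetical message that must carry a leader session ID.
final class RequestJobStatusLike implements RequiresLeaderSessionID {}

// Hypothetical, reduced gateway interface.
interface SimpleGateway {
    void tell(Object message);
}

// Gateway that receives the leader session ID at creation and wraps on every send.
final class WrappingGateway implements SimpleGateway {
    private final Consumer<Object> transport; // stands in for an ActorRef
    private final UUID leaderSessionID;

    WrappingGateway(Consumer<Object> transport, UUID leaderSessionID) {
        this.transport = transport;
        this.leaderSessionID = leaderSessionID;
    }

    @Override
    public void tell(Object message) {
        if (message instanceof RequiresLeaderSessionID) {
            transport.accept(new LeaderSessionMessage(leaderSessionID, message));
        } else {
            transport.accept(message);
        }
    }
}
```

Callers outside of an actor only see `SimpleGateway`, so they cannot forget to wrap a message.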

## Style formatting

The PR also contains some Scala style harmonisation of old Scala code.

## TL;DR

In order to support leader session IDs all Flink actors should extend 
`FlinkUntypedActor` or `FlinkActor` with `LeaderSessionMessages` and 
`LogMessages` mixins. Whenever a message is sent from within an actor, the 
message should be decorated by calling `decorateMessage`. When messages are 
sent from outside of an actor, the `AkkaActorGateway` should be used instead of 
directly using `ActorRef`. That way, we guarantee the proper wrapping of the 
messages.

You can merge this pull request into a Git repository by running:

$ git