Repository: flink
Updated Branches:
  refs/heads/master 63720634e -> 4046819b3


http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index 1bd1bfb..35a94cd 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -165,7 +165,7 @@ object StreamJoinOperator {
         }
       }
 
-      return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
+      op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
         .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
           returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
index 222eb6d..61d7db1 100644
--- 
a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
+++ 
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala
@@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.datastream.{ 
WindowedDataStream => JavaWSt
 import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream }
 import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream }
 
+import language.implicitConversions
+
 package object scala {
   // We have this here so that we always have generated TypeInformationS when
   // using the Scala API

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
 
b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
index 7318894..9396b66 100644
--- 
a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ 
b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -18,12 +18,9 @@
 
 package org.apache.flink.tachyon;
 
-
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.examples.java.wordcount.WordCount;

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
index 1a1098d..35d78eb 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java
@@ -20,7 +20,6 @@ package org.apache.flink.test.util;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -28,7 +27,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.runtime.akka.AkkaUtils;
+
 
 public abstract class AbstractTestBase extends TestBaseUtils {
 
@@ -49,8 +50,7 @@ public abstract class AbstractTestBase extends TestBaseUtils {
                this.config = config;
                this.tempFiles = new ArrayList<File>();
 
-               timeout = new FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
-                               ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS);
+               timeout = AkkaUtils.getTimeout(config);
        }
 
        // --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 8b6ac6c..804b005 100644
--- 
a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -63,7 +63,7 @@ public class TestBaseUtils {
 
        protected static final int DEFAULT_NUM_TASK_MANAGERS = 1;
 
-       protected static final int DEFAULT_AKKA_ASK_TIMEOUT = 1000;
+       protected static final long DEFAULT_AKKA_ASK_TIMEOUT = 1000;
 
        protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s";
 
@@ -88,7 +88,7 @@ public class TestBaseUtils {
                config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots);
                config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers);
-               config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT);
+               config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s");
                config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
                return new ForkableFlinkMiniCluster(config);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
index 18d2177..88ce698 100644
--- 
a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
+++ 
b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala
@@ -76,7 +76,7 @@ class ForkableFlinkMiniCluster(userConfiguration: 
Configuration, singleActorSyst
     }
 
     val (connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConnectionConfig) =
-      TaskManager.parseConfiguration(HOSTNAME, config, localExecution)
+      TaskManager.parseConfiguration(HOSTNAME, config, singleActorSystem, localExecution)
 
     system.actorOf(Props(new TaskManager(connectionInfo, jobManagerAkkaURL, taskManagerConfig,
       networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index)

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
index 8b6aacb..3138bb6 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java
@@ -115,14 +115,14 @@ public abstract class CancellingTestBase {
                        boolean jobSuccessfullyCancelled = false;
 
                        Future<Object> result = Patterns.ask(client, new JobClientMessages.SubmitJobAndWait
-                                       (jobGraph, false), new Timeout(AkkaUtils.DEFAULT_TIMEOUT()));
+                                       (jobGraph, false), new Timeout(AkkaUtils.getDefaultTimeout()));
 
                        actorSystem.scheduler().scheduleOnce(new FiniteDuration(msecsTillCanceling,
                                                        TimeUnit.MILLISECONDS), client, new JobManagerMessages.CancelJob(jobGraph.getJobID()),
                                        actorSystem.dispatcher(), ActorRef.noSender());
 
                        try {
-                               Await.result(result, AkkaUtils.DEFAULT_TIMEOUT());
+                               Await.result(result, AkkaUtils.getDefaultTimeout());
                        } catch (JobExecutionException exception) {
                                if (!exception.isJobCanceledByUser()) {
                                        throw new IllegalStateException("Job Failed.");

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index 41c96b0..f0d6b9f 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -48,58 +48,58 @@ class ClosureCleanerITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(result, resultPath)
   }
 
   @Test
-  def testObject: Unit = {
+  def testObject(): Unit = {
     TestObject.run(resultPath)
     result = "30"
   }
 
   @Test
-  def testClass: Unit = {
+  def testClass(): Unit = {
     val obj = new TestClass
     obj.run(resultPath)
     result = "30"
   }
 
   @Test
-  def testClassWithoutDefaulConstructor: Unit = {
+  def testClassWithoutDefaulConstructor(): Unit = {
     val obj = new TestClassWithoutDefaultConstructor(5)
     obj.run(resultPath)
     result = "30"
   }
 
   @Test
-  def testClassWithoutFieldAccess: Unit = {
+  def testClassWithoutFieldAccess(): Unit = {
     val obj = new TestClassWithoutFieldAccess
     obj.run(resultPath)
     result = "30" // 6 + 7 + 8 + 9
   }
 
   @Test
-  def testObjectWithNesting: Unit = {
+  def testObjectWithNesting(): Unit = {
     TestObjectWithNesting.run(resultPath)
     result = "27"
   }
 
   @Test
-  def testClassWithNesting: Unit = {
+  def testClassWithNesting(): Unit = {
     val obj = new TestClassWithNesting(1)
     obj.run(resultPath)
     result = "27"
   }
 
   @Test
-  def testObjectWithBogusReturns: Unit = {
+  def testObjectWithBogusReturns(): Unit = {
     TestObjectWithBogusReturns.run(resultPath)
     result = "1"
   }
 
   @Test
-  def testObjectWithNestedReturns: Unit = {
+  def testObjectWithNestedReturns(): Unit = {
     TestObjectWithNestedReturns.run(resultPath)
     result = "1"
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
index ae3512a..8839093 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala
@@ -52,7 +52,7 @@ class AggregateITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @Test
-  def testFullAggregate: Unit = {
+  def testFullAggregate(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val ds = CollectionDataSets.get3TupleDataSet(env)
@@ -73,7 +73,7 @@ class AggregateITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @Test
-  def testGroupedAggregate: Unit = {
+  def testGroupedAggregate(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env)
 
@@ -93,7 +93,7 @@ class AggregateITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @Test
-  def testNestedAggregate: Unit = {
+  def testNestedAggregate(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
index 93ade52..e90cb1b 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala
@@ -49,12 +49,12 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expectedResult, resultPath)
   }
 
   @Test
-  def testCoGroupOnTuplesWithKeyFieldSelector: Unit = {
+  def testCoGroupOnTuplesWithKeyFieldSelector(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get5TupleDataSet(env)
     val ds2 = CollectionDataSets.get5TupleDataSet(env)
@@ -78,7 +78,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCoGroupOnTwoCustomInputsWithKeyExtractors: Unit = {
+  def testCoGroupOnTwoCustomInputsWithKeyExtractors(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.getCustomTypeDataSet(env)
     val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
@@ -103,7 +103,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCorrectnessIfCoGroupReturnsLeftInputObjects: Unit = {
+  def testCorrectnessIfCoGroupReturnsLeftInputObjects(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env)
     val ds2 = CollectionDataSets.get3TupleDataSet(env)
@@ -122,7 +122,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCorrectnessIfCoGroupReturnsRightInputObjects: Unit = {
+  def testCorrectnessIfCoGroupReturnsRightInputObjects(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get5TupleDataSet(env)
     val ds2 = CollectionDataSets.get5TupleDataSet(env)
@@ -141,7 +141,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCoGroupWithBroadcastVariable: Unit = {
+  def testCoGroupWithBroadcastVariable(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val intDs = CollectionDataSets.getIntDataSet(env)
     val ds = CollectionDataSets.get5TupleDataSet(env)
@@ -182,7 +182,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCoGroupOnTupleWithKeyFieldSelectorAndCustomTypeWithKeyExtractor: Unit = {
+  def testCoGroupOnTupleWithKeyFieldSelectorAndCustomTypeWithKeyExtractor(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get5TupleDataSet(env)
     val ds2 = CollectionDataSets.getCustomTypeDataSet(env)
@@ -208,7 +208,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCoGroupOnCustomTypeWithKeyExtractorAndTupleInputKeyFieldSelector: Unit = {
+  def testCoGroupOnCustomTypeWithKeyExtractorAndTupleInputKeyFieldSelector(): Unit = {
     /*
          * CoGroup on a tuple input with key field selector and a custom type input with
          * key extractor
@@ -239,7 +239,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCoGroupWithMultipleKeyFields: Unit = {
+  def testCoGroupWithMultipleKeyFields(): Unit = {
     /*
         * CoGroup with multiple key fields
         */
@@ -263,7 +263,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCoGroupWithMultipleKeyExtractors: Unit = {
+  def testCoGroupWithMultipleKeyExtractors(): Unit = {
     /*
         * CoGroup with multiple key extractors
         */
@@ -290,7 +290,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCoGroupOnTwoCustomTypesUsingExpressionKeys: Unit = {
+  def testCoGroupOnTwoCustomTypesUsingExpressionKeys(): Unit = {
     /*
      * CoGroup on two custom type inputs using expression keys
      */
@@ -317,7 +317,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCoGroupOnTwoCustomTypesUsingExpressionKeysAndFieldSelector: Unit = {
+  def testCoGroupOnTwoCustomTypesUsingExpressionKeysAndFieldSelector(): Unit = {
     /*
      * CoGroup on two custom type inputs using expression keys
      */
@@ -339,7 +339,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCoGroupFieldSelectorAndKeySelector: Unit = {
+  def testCoGroupFieldSelectorAndKeySelector(): Unit = {
     /*
      * CoGroup field-selector (expression keys) + key selector function
      * The key selector is unnecessary complicated (Tuple1) ;)
@@ -362,7 +362,7 @@ class CoGroupITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCoGroupKeySelectorAndFieldSelector: Unit = {
+  def testCoGroupKeySelectorAndFieldSelector(): Unit = {
     /*
          * CoGroup field-selector (expression keys) + key selector function
          * The key selector is simple here

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
index 4732b58..17b8252 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala
@@ -48,12 +48,12 @@ class CrossITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testCorrectnessOfCrossOnTwoTupleInputs: Unit = {
+  def testCorrectnessOfCrossOnTwoTupleInputs(): Unit = {
     /*
      * check correctness of cross on two tuple inputs
      */
@@ -71,7 +71,7 @@ class CrossITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testCorrectnessOfCrossIfUDFReturnsLeftInput: Unit = {
+  def testCorrectnessOfCrossIfUDFReturnsLeftInput(): Unit = {
     /*
      * check correctness of cross if UDF returns left input object
      */
@@ -87,7 +87,7 @@ class CrossITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testCorrectnessOfCrossIfUDFReturnsRightInput: Unit = {
+  def testCorrectnessOfCrossIfUDFReturnsRightInput(): Unit = {
     /*
      * check correctness of cross if UDF returns right input object
      */
@@ -104,7 +104,7 @@ class CrossITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testCorrectnessOfCrossWithBroadcastSet: Unit = {
+  def testCorrectnessOfCrossWithBroadcastSet(): Unit = {
     /*
      * check correctness of cross with broadcast set
      */
@@ -138,7 +138,7 @@ class CrossITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testCorrectnessOfCrossWithHuge: Unit = {
+  def testCorrectnessOfCrossWithHuge(): Unit = {
     /*
      * check correctness of crossWithHuge (only correctness of result -> should be the same
      * as with normal cross)
@@ -156,7 +156,7 @@ class CrossITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testCorrectnessOfCrossWithTiny: Unit = {
+  def testCorrectnessOfCrossWithTiny(): Unit = {
     /*
      * check correctness of crossWithTiny (only correctness of result -> should be the same
      * as with normal cross)
@@ -176,7 +176,7 @@ class CrossITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testCorrectnessOfDefaultCross: Unit = {
+  def testCorrectnessOfDefaultCross(): Unit = {
     /*
      * check correctness of default cross
      */
@@ -194,7 +194,7 @@ class CrossITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testCorrectnessOfCrossOnTwoCutomTypeInputs: Unit = {
+  def testCorrectnessOfCrossOnTwoCutomTypeInputs(): Unit = {
     /*
      * check correctness of cross on two custom type inputs
      */
@@ -212,7 +212,7 @@ class CrossITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testCorrectnessOfcrossTupleInputAndCustomTypeInput: Unit = {
+  def testCorrectnessOfcrossTupleInputAndCustomTypeInput(): Unit = {
     /*
      * check correctness of cross a tuple input and a custom type input
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
index 3711347..75cf676 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala
@@ -45,12 +45,12 @@ class DistinctITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector: Unit = {
+  def testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector(): Unit = {
     /*
      * Check correctness of distinct on tuples with key field selector
      */
@@ -68,7 +68,7 @@ class DistinctITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorNotAllFieldsSelected: Unit = {
+  def testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorNotAllFieldsSelected(): Unit = {
     /*
      * check correctness of distinct on tuples with key field selector with not all fields
      * selected
@@ -84,7 +84,7 @@ class DistinctITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCorrectnessOfDistinctOnTuplesWithKeyExtractor: Unit ={
+  def testCorrectnessOfDistinctOnTuplesWithKeyExtractor(): Unit ={
     /*
      * check correctness of distinct on tuples with key extractor
      */
@@ -99,7 +99,7 @@ class DistinctITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCorrectnessOfDistinctOnCustomTypeWithTypeExtractor: Unit = {
+  def testCorrectnessOfDistinctOnCustomTypeWithTypeExtractor(): Unit = {
     /*
      * check correctness of distinct on custom type with type extractor
      */
@@ -114,7 +114,7 @@ class DistinctITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCorrectnessOfDistinctOnTuples: Unit = {
+  def testCorrectnessOfDistinctOnTuples(): Unit = {
     /*
      * check correctness of distinct on tuples
      */
@@ -129,7 +129,7 @@ class DistinctITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor: Unit = {
+  def testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor(): Unit = {
     /*
      * check correctness of distinct on custom type with tuple-returning type extractor
      */
@@ -145,7 +145,7 @@ class DistinctITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCorrectnessOfDistinctOnTuplesWithFieldExpressions: Unit = {
+  def testCorrectnessOfDistinctOnTuplesWithFieldExpressions(): Unit = {
     /*
      * check correctness of distinct on tuples with field expressions
      */
@@ -160,7 +160,7 @@ class DistinctITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testCorrectnessOfDistinctOnPojos: Unit = {
+  def testCorrectnessOfDistinctOnPojos(): Unit = {
     /*
      * check correctness of distinct on Pojos
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
index 770759b..58ce51b 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala
@@ -76,12 +76,12 @@ class ExamplesITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testNestesdTuplesWithIntOffset: Unit = {
+  def testNestesdTuplesWithIntOffset(): Unit = {
     /*
      * Test nested tuples with int offset
      */
@@ -95,7 +95,7 @@ class ExamplesITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testNestedTuplesWithExpressionKeys: Unit = {
+  def testNestedTuplesWithExpressionKeys(): Unit = {
     /*
      * Test nested tuples with expression keys
      */
@@ -111,7 +111,7 @@ class ExamplesITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testNestedPojos: Unit = {
+  def testNestedPojos(): Unit = {
     /*
      * Test nested pojos
      */
@@ -132,7 +132,7 @@ class ExamplesITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testPojoWithNestedCaseClass: Unit = {
+  def testPojoWithNestedCaseClass(): Unit = {
     /*
      * Test pojo with nested case class
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
index 13954e8..d0a4ee4 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala
@@ -47,12 +47,12 @@ class FilterITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testAllRejectingFilter: Unit = {
+  def testAllRejectingFilter(): Unit = {
     /*
      * Test all-rejecting filter.
      */
@@ -65,7 +65,7 @@ class FilterITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testAllPassingFilter: Unit = {
+  def testAllPassingFilter(): Unit = {
     /*
      * Test all-passing filter.
      */
@@ -83,7 +83,7 @@ class FilterITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testFilterOnStringTupleField: Unit = {
+  def testFilterOnStringTupleField(): Unit = {
     /*
      * Test filter on String tuple field.
      */
@@ -96,7 +96,7 @@ class FilterITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testFilterOnIntegerTupleField: Unit = {
+  def testFilterOnIntegerTupleField(): Unit = {
     /*
      * Test filter on Integer tuple field.
      */
@@ -111,7 +111,7 @@ class FilterITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testFilterBasicType: Unit = {
+  def testFilterBasicType(): Unit = {
     /*
      * Test filter on basic type
      */
@@ -124,7 +124,7 @@ class FilterITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testFilterOnCustomType: Unit = {
+  def testFilterOnCustomType(): Unit = {
     /*
      * Test filter on custom type
      */
@@ -137,7 +137,7 @@ class FilterITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testRichFilterOnStringTupleField: Unit = {
+  def testRichFilterOnStringTupleField(): Unit = {
     /*
      * Test filter on String tuple field.
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
index de07cb5..3409b48 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala
@@ -44,12 +44,12 @@ class FirstNITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testFirstNOnUngroupedDataSet: Unit = {
+  def testFirstNOnUngroupedDataSet(): Unit = {
     /*
      * First-n on ungrouped data set
      */
@@ -62,7 +62,7 @@ class FirstNITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testFirstNOnGroupedDataSet: Unit = {
+  def testFirstNOnGroupedDataSet(): Unit = {
     /*
      * First-n on grouped data set
      */
@@ -75,7 +75,7 @@ class FirstNITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testFirstNOnGroupedAndSortedDataSet: Unit = {
+  def testFirstNOnGroupedAndSortedDataSet(): Unit = {
     /*
      * First-n on grouped and sorted data set
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
index d09ce1f..71ad28f 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala
@@ -49,12 +49,12 @@ class FlatMapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testNonPassingFlatMap: Unit = {
+  def testNonPassingFlatMap(): Unit = {
     /*
      * Test non-passing flatmap
      */
@@ -67,7 +67,7 @@ class FlatMapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testDataDuplicatingFlatMap: Unit = {
+  def testDataDuplicatingFlatMap(): Unit = {
     /*
      * Test data duplicating flatmap
      */
@@ -83,7 +83,7 @@ class FlatMapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testFlatMapWithVaryingNumberOfEmittedTuples: Unit = {
+  def testFlatMapWithVaryingNumberOfEmittedTuples(): Unit = {
     /*
      * Test flatmap with varying number of emitted tuples
      */
@@ -105,7 +105,7 @@ class FlatMapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testTypeConversionFlatMapperCustomToTuple: Unit = {
+  def testTypeConversionFlatMapperCustomToTuple(): Unit = {
     /*
      * Test type conversion flatmapper (Custom -> Tuple)
      */
@@ -123,7 +123,7 @@ class FlatMapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testTypeConversionFlatMapperTupleToBasic: Unit = {
+  def testTypeConversionFlatMapperTupleToBasic(): Unit = {
     /*
          * Test type conversion flatmapper (Tuple -> Basic)
          */
@@ -140,7 +140,7 @@ class FlatMapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt: Unit = {
+  def testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt(): Unit = {
     /*
      * Test flatmapper if UDF returns input object
      * multiple times and changes it in between
@@ -168,7 +168,7 @@ class FlatMapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode)
   }
 
   @Test
-  def testFlatMapWithBroadcastSet: Unit = {
+  def testFlatMapWithBroadcastSet(): Unit = {
     /*
      * Test flatmap with broadcast set
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
index ce3bba3..3f62133 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala
@@ -56,14 +56,14 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     if(expected != null) {
       compareResultsByLinesInMemory(expected, resultPath)
     }
   }
 
   @Test
-  def testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector: Unit = {
+  def testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector(): Unit = {
     /*
      * check correctness of groupReduce on tuples with key field selector
      */
@@ -79,7 +79,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfGroupReduceOnTuplesWithMultipleKeyFieldSelector: Unit = {
+  def testCorrectnessOfGroupReduceOnTuplesWithMultipleKeyFieldSelector(): Unit = {
     /*
      * check correctness of groupReduce on tuples with multiple key field selector
      */
@@ -101,7 +101,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelectorAndGroupSorting: Unit = {
+  def testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelectorAndGroupSorting(): Unit = {
     /*
      * check correctness of groupReduce on tuples with key field selector and group sorting
      */
@@ -123,7 +123,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfGroupReduceOnTuplesWithKeyExtractor: Unit = {
+  def testCorrectnessOfGroupReduceOnTuplesWithKeyExtractor(): Unit = {
     /*
      * check correctness of groupReduce on tuples with key extractor
      */
@@ -139,7 +139,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfGroupReduceOnCustomTypeWithTypeExtractor: Unit = {
+  def testCorrectnessOfGroupReduceOnCustomTypeWithTypeExtractor(): Unit = {
     /*
      * check correctness of groupReduce on custom type with type extractor
      */
@@ -168,7 +168,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfAllGroupReduceForTuples: Unit = {
+  def testCorrectnessOfAllGroupReduceForTuples(): Unit = {
     /*
      * check correctness of all-groupreduce for tuples
      */
@@ -190,7 +190,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfAllGroupReduceForCustomTypes: Unit = {
+  def testCorrectnessOfAllGroupReduceForCustomTypes(): Unit = {
     /*
      * check correctness of all-groupreduce for custom types
      */
@@ -211,7 +211,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfGroupReduceWithBroadcastSet: Unit = {
+  def testCorrectnessOfGroupReduceWithBroadcastSet(): Unit = {
     /*
      * check correctness of groupReduce with broadcast set
      */
@@ -245,7 +245,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfGroupReduceIfUDFReturnsInputObjectMultipleTimesWhileChangingIt: Unit = {
+  def testCorrectnessOfGroupReduceIfUDFReturnsInputObjectMultipleTimesWhileChangingIt(): Unit = {
     /*
      * check correctness of groupReduce if UDF returns input objects multiple times and
      * changes it in between
@@ -273,7 +273,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine: Unit = {
+  def testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine(): Unit = {
     /*
      * check correctness of groupReduce on custom type with key extractor and combine
      */
@@ -293,7 +293,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfGroupReduceOnTuplesWithCombine: Unit = {
+  def testCorrectnessOfGroupReduceOnTuplesWithCombine(): Unit = {
     /*
      * check correctness of groupReduce on tuples with combine
      */
@@ -314,7 +314,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfAllGroupReduceForTuplesWithCombine: Unit = {
+  def testCorrectnessOfAllGroupReduceForTuplesWithCombine(): Unit = {
     /*
      * check correctness of all-groupreduce for tuples with combine
      */
@@ -336,7 +336,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfGroupReduceWithDescendingGroupSort: Unit = {
+  def testCorrectnessOfGroupReduceWithDescendingGroupSort(): Unit = {
     /*
      * check correctness of groupReduce with descending group sort
      */
@@ -357,7 +357,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testCorrectnessOfGroupReduceOnTuplesWithTupleReturningKeySelector: Unit = {
+  def testCorrectnessOfGroupReduceOnTuplesWithTupleReturningKeySelector(): Unit = {
     /*
      * check correctness of groupReduce on tuples with tuple-returning key selector
      */
@@ -380,7 +380,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testInputOfCombinerIsSortedForCombinableGroupReduceWithGroupSorting: Unit = {
+  def testInputOfCombinerIsSortedForCombinableGroupReduceWithGroupSorting(): Unit = {
     /*
      * check that input of combiner is also sorted for combinable groupReduce with group
      * sorting
@@ -400,7 +400,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testDeepNestingAndNullValueInPojo: Unit = {
+  def testDeepNestingAndNullValueInPojo(): Unit = {
     /*
      * Deep nesting test
      * + null value in pojo
@@ -425,7 +425,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testPojoContainigAWritableAndTuples: Unit = {
+  def testPojoContainigAWritableAndTuples(): Unit = {
     /*
      * Test Pojo containing a Writable and Tuples
      */
@@ -450,7 +450,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testTupleContainingPojosAndRegularFields: Unit ={
+  def testTupleContainingPojosAndRegularFields(): Unit ={
     /*
      * Test Tuple containing pojos and regular fields
      */
@@ -472,7 +472,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testStringBasedDefinitionOnGroupSort: Unit = {
+  def testStringBasedDefinitionOnGroupSort(): Unit = {
     /*
      * Test string-based definition on group sort, based on test:
      * check correctness of groupReduce with descending group sort
@@ -495,7 +495,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testIntBasedDefinitionOnGroupSortForFullNestedTuple: Unit = {
+  def testIntBasedDefinitionOnGroupSortForFullNestedTuple(): Unit = {
     /*
      * Test int-based definition on group sort, for (full) nested Tuple
      */
@@ -510,7 +510,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testIntBasedDefinitionOnGroupSortForPartialNestedTuple: Unit = {
+  def testIntBasedDefinitionOnGroupSortForPartialNestedTuple(): Unit = {
     /*
      * Test int-based definition on group sort, for (partial) nested Tuple ASC
      */
@@ -527,7 +527,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testStringBasedDefinitionOnGroupSortForPartialNestedTuple: Unit = {
+  def testStringBasedDefinitionOnGroupSortForPartialNestedTuple(): Unit = {
     /*
      * Test string-based definition on group sort, for (partial) nested Tuple DESC
      */
@@ -544,7 +544,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testStringBasedDefinitionOnGroupSortForTwoGroupingKeys: Unit = {
+  def testStringBasedDefinitionOnGroupSortForTwoGroupingKeys(): Unit = {
     /*
      * Test string-based definition on group sort, for two grouping keys
      */
@@ -561,7 +561,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
   }
 
   @Test
-  def testStringBasedDefinitionOnGroupSortForTwoGroupingKeysWithPojos: Unit = {
+  def testStringBasedDefinitionOnGroupSortForTwoGroupingKeysWithPojos(): Unit = {
     /*
      * Test string-based definition on group sort, for two grouping keys with Pojos
      */
@@ -725,18 +725,19 @@ class GroupReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mo
     env.setDegreeOfParallelism(1)
     val ds =  CollectionDataSets.getPojoWithMultiplePojos(env)
     val reduceDs =  ds.groupBy("p2.a2")
-      .reduceGroup(
-        new GroupReduceFunction[CollectionDataSets.PojoWithMultiplePojos, String] {
-          def reduce(
-                      values: Iterable[CollectionDataSets.PojoWithMultiplePojos],
-                      out: Collector[String]) {
-            val concat: StringBuilder = new StringBuilder
-            for (value <- values.asScala) {
-              concat.append(value.p2.a2)
-            }
-            out.collect(concat.toString())
+      .reduceGroup {
+      new GroupReduceFunction[CollectionDataSets.PojoWithMultiplePojos, String] {
+        def reduce(
+                    values: Iterable[CollectionDataSets.PojoWithMultiplePojos],
+                    out: Collector[String]) {
+          val concat: StringBuilder = new StringBuilder
+          for (value <- values.asScala) {
+            concat.append(value.p2.a2)
           }
-        })
+          out.collect(concat.toString())
+        }
+      }
+    }
     reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE)
     env.execute()
     expected = "b\nccc\nee\n"

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
index 8ae8674..cf13a42 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala
@@ -47,12 +47,12 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testUDFJoinOnTuplesWithKeyFieldPositions: Unit = {
+  def testUDFJoinOnTuplesWithKeyFieldPositions(): Unit = {
     /*
      * UDF Join on tuples with key field positions
      */
@@ -66,7 +66,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testUDFJoinOnTuplesWithMultipleKeyFieldPositions: Unit = {
+  def testUDFJoinOnTuplesWithMultipleKeyFieldPositions(): Unit = {
     /*
      * UDF Join on tuples with multiple key field positions
      */
@@ -81,7 +81,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testDefaultJoinOnTuples: Unit = {
+  def testDefaultJoinOnTuples(): Unit = {
     /*
      * Default Join on tuples
      */
@@ -96,7 +96,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testJoinWithHuge: Unit = {
+  def testJoinWithHuge(): Unit = {
     /*
      * Join with Huge
      */
@@ -110,7 +110,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testJoinWithTiny: Unit = {
+  def testJoinWithTiny(): Unit = {
     /*
      * Join with Tiny
      */
@@ -124,7 +124,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testJoinThatReturnsTheLeftInputObject: Unit = {
+  def testJoinThatReturnsTheLeftInputObject(): Unit = {
     /*
      * Join that returns the left input object
      */
@@ -138,7 +138,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testJoinThatReturnsTheRightInputObject: Unit = {
+  def testJoinThatReturnsTheRightInputObject(): Unit = {
     /*
      * Join that returns the right input object
      */
@@ -152,7 +152,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testJoinWithBroadcastSet: Unit ={
+  def testJoinWithBroadcastSet(): Unit ={
     /*
      * Join with broadcast set
      */
@@ -186,7 +186,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testJoinOnCustomTypeInputWithKeyExtractorAndTupleInputWithKeyFieldSelector: Unit = {
+  def testJoinOnCustomTypeInputWithKeyExtractorAndTupleInputWithKeyFieldSelector(): Unit = {
     /*
      * Join on a tuple input with key field selector and a custom type input with key extractor
      */
@@ -200,7 +200,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testJoinOnTupleInputWithKeyFieldSelectorAndCustomTypeInputWithKeyExtractor: Unit = {
+  def testJoinOnTupleInputWithKeyFieldSelectorAndCustomTypeInputWithKeyExtractor(): Unit = {
     /*
      * Join on a tuple input with key field selector and a custom type input with key extractor
      */
@@ -214,7 +214,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testDefaultJoinOnTwoCustomTypeInputsWithKeyExtractors: Unit = {
+  def testDefaultJoinOnTwoCustomTypeInputsWithKeyExtractors(): Unit = {
     /*
      * (Default) Join on two custom type inputs with key extractors
      */
@@ -229,7 +229,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testUDFJoinOnTuplesWithTupleReturningKeySelectors: Unit = {
+  def testUDFJoinOnTuplesWithTupleReturningKeySelectors(): Unit = {
     /*
      * UDF Join on tuples with tuple-returning key selectors
      */
@@ -246,7 +246,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testNestedPojoAgainstTupleAsString: Unit = {
+  def testNestedPojoAgainstTupleAsString(): Unit = {
     /*
      * Join nested pojo against tuple (selected using a string)
      */
@@ -262,7 +262,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testJoinNestedPojoAgainstTupleAsInteger: Unit = {
+  def testJoinNestedPojoAgainstTupleAsInteger(): Unit = {
     /*
      * Join nested pojo against tuple (selected as an integer)
      */
@@ -278,7 +278,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testSelectingMultipleFieldsUsingExpressionLanguage: Unit = {
+  def testSelectingMultipleFieldsUsingExpressionLanguage(): Unit = {
     /*
      * selecting multiple fields using expression language
      */
@@ -297,7 +297,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testNestedIntoTuple: Unit = {
+  def testNestedIntoTuple(): Unit = {
     /*
      * nested into tuple
      */
@@ -315,7 +315,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testNestedIntoTupleIntoPojo: Unit = {
+  def testNestedIntoTupleIntoPojo(): Unit = {
     /*
      * nested into tuple into pojo
      */
@@ -336,7 +336,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testNonPojoFullTuple: Unit = {
+  def testNonPojoFullTuple(): Unit = {
     /*
     * Non-POJO test to verify that full-tuple keys are working.
     */
@@ -353,7 +353,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testNonPojoNestedTuple: Unit = {
+  def testNonPojoNestedTuple(): Unit = {
     /*
      * Non-POJO test to verify "nested" tuple-element selection.
      */
@@ -369,7 +369,7 @@ class JoinITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testFullPojoWithFullTuple: Unit = {
+  def testFullPojoWithFullTuple(): Unit = {
     /*
      * full pojo with full tuple
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
index 78426ef..4c74ca0 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala
@@ -48,12 +48,12 @@ class MapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testIdentityMapperWithBasicType: Unit = {
+  def testIdentityMapperWithBasicType(): Unit = {
     /*
      * Test identity map with basic type
      */
@@ -67,7 +67,7 @@ class MapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testIdentityMapperWithTuple: Unit = {
+  def testIdentityMapperWithTuple(): Unit = {
     /*
      * Test identity map with a tuple
      */
@@ -85,7 +85,7 @@ class MapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testTypeConversionMapperCustomToTuple: Unit = {
+  def testTypeConversionMapperCustomToTuple(): Unit = {
     /*
      * Test type conversion mapper (Custom -> Tuple)
      */
@@ -103,7 +103,7 @@ class MapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testTypeConversionMapperTupleToBasic: Unit = {
+  def testTypeConversionMapperTupleToBasic(): Unit = {
     /*
      * Test type conversion mapper (Tuple -> Basic)
      */
@@ -120,7 +120,7 @@ class MapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testMapperOnTupleIncrementFieldReorderSecondAndThirdFields: Unit = {
+  def testMapperOnTupleIncrementFieldReorderSecondAndThirdFields(): Unit = {
     /*
      * Test mapper on tuple - Increment Integer field, reorder second and third fields
      */
@@ -138,7 +138,7 @@ class MapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testMapperOnCustomLowercaseString: Unit = {
+  def testMapperOnCustomLowercaseString(): Unit = {
     /*
      * Test mapper on Custom - lowercase myString
      */
@@ -156,7 +156,7 @@ class MapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testMapperIfUDFReturnsInputObjectIncrementFirstFieldOfTuple: Unit = {
+  def testMapperIfUDFReturnsInputObjectIncrementFirstFieldOfTuple(): Unit = {
     /*
      * Test mapper if UDF returns input object - increment first field of a tuple
      */
@@ -176,7 +176,7 @@ class MapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testMapWithBroadcastSet: Unit = {
+  def testMapWithBroadcastSet(): Unit = {
     /*
      * Test map with broadcast set
      */
@@ -205,7 +205,7 @@ class MapITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testPassingConfigurationObject: Unit = {
+  def testPassingConfigurationObject(): Unit = {
     /*
      * Test passing configuration object.
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
index 35c0e93..f21fe11 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala
@@ -45,12 +45,12 @@ class PartitionITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testHashPartitionByTupleField: Unit = {
+  def testHashPartitionByTupleField(): Unit = {
     /*
      * Test hash partition by tuple field
      */
@@ -66,7 +66,7 @@ class PartitionITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @Test
-  def testHashPartitionByKeySelector: Unit = {
+  def testHashPartitionByKeySelector(): Unit = {
     /*
      * Test hash partition by key selector
      */
@@ -80,7 +80,7 @@ class PartitionITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @Test
-  def testForcedRebalancing: Unit = {
+  def testForcedRebalancing(): Unit = {
     /*
      * Test forced rebalancing
      */
@@ -111,7 +111,7 @@ class PartitionITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @Test
-  def testMapPartitionAfterRepartitionHasCorrectDOP: Unit = {
+  def testMapPartitionAfterRepartitionHasCorrectDOP(): Unit = {
     // Verify that mapPartition operation after repartition picks up correct
     // DOP
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -129,7 +129,7 @@ class PartitionITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @Test
-  def testMapAfterRepartitionHasCorrectDOP: Unit = {
+  def testMapAfterRepartitionHasCorrectDOP(): Unit = {
     // Verify that map operation after repartition picks up correct
     // DOP
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -157,7 +157,7 @@ class PartitionITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @Test
-  def testFilterAfterRepartitionHasCorrectDOP: Unit = {
+  def testFilterAfterRepartitionHasCorrectDOP(): Unit = {
     // Verify that filter operation after repartition picks up correct
     // DOP
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -186,7 +186,7 @@ class PartitionITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @Test
-  def testPartitionNestedPojo: Unit = {
+  def testPartitionNestedPojo(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     env.setDegreeOfParallelism(3)
     val ds = CollectionDataSets.getDuplicatePojoDataSet(env)

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
index 5f63dc4..c2cd20a 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala
@@ -48,12 +48,12 @@ class ReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testReduceOnTuplesWithKeyFieldSelector: Unit = {
+  def testReduceOnTuplesWithKeyFieldSelector(): Unit = {
     /*
      * Reduce on tuples with key field selector
      */
@@ -67,7 +67,7 @@ class ReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testReduceOnTuplesWithMultipleKeyFieldSelectors: Unit = {
+  def testReduceOnTuplesWithMultipleKeyFieldSelectors(): Unit = {
     /*
      * Reduce on tuples with multiple key field selectors
      */
@@ -83,7 +83,7 @@ class ReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testReduceOnTuplesWithKeyExtractor: Unit = {
+  def testReduceOnTuplesWithKeyExtractor(): Unit = {
     /*
      * Reduce on tuples with key extractor
      */
@@ -97,7 +97,7 @@ class ReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testReduceOnCustomTypeWithKeyExtractor: Unit = {
+  def testReduceOnCustomTypeWithKeyExtractor(): Unit = {
     /*
      * Reduce on custom type with key extractor
      */
@@ -116,7 +116,7 @@ class ReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testAllReduceForTuple: Unit = {
+  def testAllReduceForTuple(): Unit = {
     /*
      * All-reduce for tuple
      */
@@ -130,7 +130,7 @@ class ReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testAllReduceForCustomTypes: Unit = {
+  def testAllReduceForCustomTypes(): Unit = {
     /*
      * All-reduce for custom types
      */
@@ -149,7 +149,7 @@ class ReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testReduceWithBroadcastSet: Unit = {
+  def testReduceWithBroadcastSet(): Unit = {
     /*
      * Reduce with broadcast set
      */
@@ -177,7 +177,7 @@ class ReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testReduceWithUDFThatReturnsTheSecondInputObject: Unit = {
+  def testReduceWithUDFThatReturnsTheSecondInputObject(): Unit = {
     /*
      * Reduce with UDF that returns the second input object (check mutable object handling)
      */
@@ -201,7 +201,7 @@ class ReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testReduceWithATupleReturningKeySelector: Unit = {
+  def testReduceWithATupleReturningKeySelector(): Unit = {
     /*
      * Reduce with a Tuple-returning KeySelector
      */
@@ -217,7 +217,7 @@ class ReduceITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testReduceOnGroupedDSByExpressionKey: Unit = {
+  def testReduceOnGroupedDSByExpressionKey(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get5TupleDataSet(env)
     val reduceDs = ds.groupBy("_5", "_1")

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
index 5e456e0..6071569 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala
@@ -46,12 +46,12 @@ class SumMinMaxITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testFullAggregate: Unit = {
+  def testFullAggregate(): Unit = {
     // Full aggregate
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env)
@@ -71,7 +71,7 @@ class SumMinMaxITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @Test
-  def testGroupedAggregate: Unit = {
+  def testGroupedAggregate(): Unit = {
     // Grouped aggregate
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env)
@@ -91,7 +91,7 @@ class SumMinMaxITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode
   }
 
   @Test
-  def testNestedAggregate: Unit = {
+  def testNestedAggregate(): Unit = {
     // Nested aggregate
     val env = ExecutionEnvironment.getExecutionEnvironment
     val ds = CollectionDataSets.get3TupleDataSet(env)

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
index 3eed128..255cce9 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala
@@ -50,12 +50,12 @@ class UnionITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @After
-  def after: Unit = {
+  def after(): Unit = {
     compareResultsByLinesInMemory(expected, resultPath)
   }
 
   @Test
-  def testUnionOf2IdenticalDS: Unit = {
+  def testUnionOf2IdenticalDS(): Unit = {
     /*
      * Union of 2 Same Data Sets
      */
@@ -68,7 +68,7 @@ class UnionITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testUnionOf5IdenticalDSWithMultipleUnions: Unit = {
+  def testUnionOf5IdenticalDSWithMultipleUnions(): Unit = {
     /*
      * Union of 5 same Data Sets, with multiple unions
      */
@@ -86,7 +86,7 @@ class UnionITCase(mode: ExecutionMode) extends 
MultipleProgramsTestBase(mode) {
   }
 
   @Test
-  def testUnionWithEmptyDS: Unit = {
+  def testUnionWithEmptyDS(): Unit = {
     /*
      * Test on union with empty dataset
      */

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala
index 4dcef0a..d3d52b0 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala
@@ -33,7 +33,7 @@ abstract class PairComparatorTestBase[T, R] {
   @Test
   def testEqualityWithReference(): Unit = {
     try {
-      val comparator = getComparator(true)
+      val comparator = getComparator(ascending = true)
       
       val (dataT, dataR) = getSortedData
       for (i <- 0 until dataT.length) {
@@ -51,8 +51,8 @@ abstract class PairComparatorTestBase[T, R] {
 
   @Test
   def testInequalityWithReference(): Unit = {
-    testGreatSmallAscDescWithReference(true)
-    testGreatSmallAscDescWithReference(false)
+    testGreatSmallAscDescWithReference(ascending = true)
+    testGreatSmallAscDescWithReference(ascending = false)
   }
 
   protected def testGreatSmallAscDescWithReference(ascending: Boolean) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java 
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index b66841a..a557f02 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
 import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.FlinkYarnSessionCli;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
index 273439e..3f2e72e 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java
@@ -25,6 +25,7 @@ import static akka.pattern.Patterns.ask;
 import akka.actor.Props;
 import akka.util.Timeout;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.AkkaUtils$;
 import org.apache.flink.runtime.net.NetUtils;
 import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
@@ -43,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.None$;
 import scala.Some;
+import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Awaitable;
 import scala.concurrent.Future;
@@ -95,7 +97,8 @@ public class FlinkYarnCluster extends 
AbstractFlinkYarnCluster {
                // start actor system
                LOG.info("Start actor system.");
                InetAddress ownHostname = 
NetUtils.resolveAddress(jobManagerAddress); // find name of own public 
interface, able to connect to the JM
-               actorSystem = 
YarnUtils.createActorSystem(ownHostname.getCanonicalHostName(), 0, 
GlobalConfiguration.getConfiguration()); // set port automatically.
+               actorSystem = 
AkkaUtils.createActorSystem(GlobalConfiguration.getConfiguration(),
+                               new Some(new Tuple2<String, 
Integer>(ownHostname.getCanonicalHostName(), 0)));
 
                // start application client
                LOG.info("Start application client.");

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index 3f1cc23..aea9727 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import scala.Tuple2;
 
-
 public class YarnTaskManagerRunner {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(YarnTaskManagerRunner.class);
@@ -60,9 +59,7 @@ public class YarnTaskManagerRunner {
                        @Override
                        public Object run() {
                                try {
-                                       Tuple2<ActorSystem, ActorRef> tuple = 
YarnUtils
-                                                       
.startActorSystemAndTaskManager(newArgs);
-
+                                       Tuple2<ActorSystem, ActorRef> tuple = 
YarnUtils.startActorSystemAndTaskManager(newArgs);
                                        tuple._1().awaitTermination();
                                } catch (Exception e) {
                                        LOG.error("Error while running the 
TaskManager", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
index 73349b2..390835c 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala
@@ -30,9 +30,9 @@ import org.apache.flink.yarn.Messages._
 import scala.collection.mutable
 import scala.concurrent.duration._
 
-class ApplicationClient
+import scala.language.postfixOps
 
-  extends Actor with ActorLogMessages with ActorLogging {
+class ApplicationClient extends Actor with ActorLogMessages with ActorLogging {
   import context._
 
   val INITIAL_POLLING_DELAY = 0 seconds
@@ -50,8 +50,7 @@ class ApplicationClient
   override def preStart(): Unit = {
     super.preStart()
 
-    timeout = new 
FiniteDuration(GlobalConfiguration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT,
-      ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS)
+    timeout = AkkaUtils.getTimeout(GlobalConfiguration.getConfiguration())
   }
 
   override def postStop(): Unit = {
@@ -65,12 +64,12 @@ class ApplicationClient
 
   override def receiveWithLogMessages: Receive = {
     // ----------------------------- Registration -> Status updates -> 
shutdown ----------------
-    case LocalRegisterClient(address: String) => {
+    case LocalRegisterClient(address: String) =>
       val jmAkkaUrl = JobManager.getRemoteAkkaURL(address)
 
       yarnJobManager = Some(AkkaUtils.getReference(jmAkkaUrl)(system, timeout))
       yarnJobManager match {
-        case Some(jm) => {
+        case Some(jm) =>
           // the message came from the FlinkYarnCluster. We send the message 
to the JobManager.
           // it is important not to forward the message because the JobManager 
is storing the
           // sender as the Application Client (this class).
@@ -80,19 +79,18 @@ class ApplicationClient
           // request the number of task managers and slots from the job manager
           pollingTimer = 
Some(context.system.scheduler.schedule(INITIAL_POLLING_DELAY,
             WAIT_FOR_YARN_INTERVAL, yarnJobManager.get, PollYarnClusterStatus))
-        }
         case None => throw new RuntimeException("Registration at 
JobManager/ApplicationMaster " +
           "failed. Job Manager RPC connection has not properly been 
initialized")
       }
-    }
-    case msg: StopYarnSession => {
+
+    case msg: StopYarnSession =>
       log.info("Stop yarn session.")
       stopMessageReceiver = Some(sender())
       yarnJobManager foreach {
         _ forward msg
       }
-    }
-    case JobManagerStopped => {
+
+    case JobManagerStopped =>
       log.info("Remote JobManager has been stopped successfully. " +
         "Stopping local application client")
       stopMessageReceiver foreach {
@@ -100,27 +98,27 @@ class ApplicationClient
       }
       // stop ourselves
       context.system.shutdown()
-    }
+
 
     // handle the responses from the PollYarnClusterStatus messages to the 
yarn job mgr
-    case status: FlinkYarnClusterStatus => {
+    case status: FlinkYarnClusterStatus =>
       latestClusterStatus = Some(status)
-    }
+
 
     // locally get cluster status
-    case LocalGetYarnClusterStatus => {
+    case LocalGetYarnClusterStatus =>
       sender() ! latestClusterStatus
-    }
+
 
     // -----------------  handle messages from the cluster -------------------
     // receive remote messages
-    case msg: YarnMessage => {
+    case msg: YarnMessage =>
       messagesQueue.enqueue(msg)
-    }
+
     // locally forward messages
-    case LocalGetYarnMessage => {
+    case LocalGetYarnMessage =>
       sender() ! (if( messagesQueue.size == 0) None else messagesQueue.dequeue)
-    }
+
     case _ =>
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
index 529686f..31a4b09 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala
@@ -24,6 +24,7 @@ import java.security.PrivilegedAction
 import akka.actor._
 import org.apache.flink.client.CliFrontend
 import org.apache.flink.configuration.ConfigConstants
+import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager}
 import org.apache.flink.yarn.Messages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
@@ -45,7 +46,7 @@ object ApplicationMaster {
   def main(args: Array[String]): Unit ={
     val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME)
     LOG.info(s"YARN daemon runs as 
${UserGroupInformation.getCurrentUser.getShortUserName}" +
-      s" setting user to execute Flink ApplicationMaster/JobManager to 
${yarnClientUsername}")
+      s"' setting user to execute Flink ApplicationMaster/JobManager to 
$yarnClientUsername'")
 
     val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername)
 
@@ -171,8 +172,8 @@ object ApplicationMaster {
     LOG.info("Start job manager for yarn")
     val args = Array[String]("--configDir", currDir)
 
-    LOG.info(s"Config path: ${currDir}.")
-    val (_, _, configuration, _) = JobManager.parseArgs(args)
+    LOG.info(s"Config path: $currDir.")
+    val (configuration, _, _) = JobManager.parseArgs(args)
 
     // add dynamic properties to JobManager configuration.
     val dynamicProperties = 
CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString)
@@ -183,7 +184,7 @@ object ApplicationMaster {
     configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 
jobManagerWebPort)
 
     // set port to 0 to let Akka automatically determine the port.
-    implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port 
= 0, configuration)
+   implicit val jobManagerSystem = AkkaUtils.createActorSystem(configuration, 
Some((hostname, 0)))
 
     LOG.info("Start job manager actor.")
     (jobManagerSystem, JobManager.startActor(Props(new 
JobManager(configuration) with

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 7e1ce6f..c8dbbf1 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -20,7 +20,7 @@ package org.apache.flink.yarn
 
 import java.io.{IOException, File}
 import java.nio.ByteBuffer
-import java.util.{ Collections}
+import java.util.Collections
 
 import akka.actor.ActorRef
 import org.apache.flink.configuration.ConfigConstants
@@ -41,6 +41,7 @@ import 
org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
 import org.apache.hadoop.yarn.util.Records
 
 import scala.concurrent.duration._
+import scala.language.postfixOps
 
 
 trait YarnJobManager extends ActorLogMessages {
@@ -100,7 +101,7 @@ trait YarnJobManager extends ActorLogMessages {
       sender() ! new 
FlinkYarnClusterStatus(instanceManager.getNumberOfRegisteredTaskManagers,
         instanceManager.getTotalNumberOfSlots)
 
-    case StartYarnSession(conf, actorSystemPort: Int) => {
+    case StartYarnSession(conf, actorSystemPort: Int) =>
       log.info("Start yarn session.")
       val memoryPerTaskManager = env.get(FlinkYarnClient.ENV_TM_MEMORY).toInt
       val heapLimit = Utils.calculateHeapSize(memoryPerTaskManager)
@@ -109,7 +110,7 @@ trait YarnJobManager extends ActorLogMessages {
       require(applicationMasterHost != null, s"Application master 
(${Environment.NM_HOST} not set.")
 
       numTaskManager = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt
-      log.info(s"Requesting ${numTaskManager} task managers.")
+      log.info(s"Requesting $numTaskManager task managers.")
 
       val remoteFlinkJarPath = env.get(FlinkYarnClient.FLINK_JAR_PATH)
       val fs = FileSystem.get(conf)
@@ -194,34 +195,31 @@ trait YarnJobManager extends ActorLogMessages {
         yarnClientUsername, conf, taskManagerLocalResources))
 
       context.system.scheduler.scheduleOnce(ALLOCATION_DELAY, self, 
PollContainerCompletion)
-    }
 
-    case PollContainerCompletion => {
+    case PollContainerCompletion =>
       rmClientOption match {
-        case Some(rmClient) => {
+        case Some(rmClient) =>
           val response = rmClient.allocate(completedContainers.toFloat / 
numTaskManager)
 
           for (container <- response.getAllocatedContainers.asScala) {
             log.info(s"Got new container for TM ${container.getId} on host ${
-              container.getNodeId.getHost}")
+              container.getNodeId.getHost
+            }")
 
             allocatedContainers += 1
 
             log.info(s"Launching container #$allocatedContainers.")
             nmClientOption match {
-              case Some(nmClient) => {
+              case Some(nmClient) =>
                 containerLaunchContext match {
                   case Some(ctx) => nmClient.startContainer(container, ctx)
-                  case None => {
+                  case None =>
                     log.error("The ContainerLaunchContext was not set.")
                     self ! StopYarnSession(FinalApplicationStatus.FAILED)
-                  }
                 }
-              }
-              case None => {
+              case None =>
                 log.error("The NMClient was not set.")
                 self ! StopYarnSession(FinalApplicationStatus.FAILED)
-              }
             }
           }
 
@@ -244,13 +242,10 @@ trait YarnJobManager extends ActorLogMessages {
           } else {
             self ! StopYarnSession(FinalApplicationStatus.FAILED)
           }
-        }
-        case None => {
+        case None =>
           log.error("The AMRMClient was not set.")
           self ! StopYarnSession(FinalApplicationStatus.FAILED)
-        }
       }
-    }
   }
 
   def createContainerLaunchContext(heapLimit: Int, hasLogback: Boolean, 
hasLog4j: Boolean,

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
index b596946..e5012ef 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala
@@ -30,9 +30,8 @@ trait YarnTaskManager extends ActorLogMessages {
   }
 
   def receiveYarnMessages: Receive = {
-    case StopYarnSession(status) => {
+    case StopYarnSession(status) =>
       log.info(s"Stopping YARN TaskManager with final application status 
$status")
       context.system.shutdown()
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
index 185190d..775fcd0 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.yarn
 
 import akka.actor.{Props, ActorRef, ActorSystem}
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.{Config, ConfigFactory}
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.taskmanager.TaskManager
@@ -29,54 +29,14 @@ object YarnUtils {
 
   val LOG = LoggerFactory.getLogger(this.getClass)
 
-  def createActorSystem(hostname: String, port: Int, configuration: 
Configuration): ActorSystem = {
-    val akkaConfig = 
ConfigFactory.parseString(AkkaUtils.getConfigString(hostname, port,
-      configuration) + getConfigString)
-
-    AkkaUtils.createActorSystem(akkaConfig)
-  }
-
-  def createActorSystem(): ActorSystem = {
-    val akkaConfig = 
ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString +
-      getConfigString)
-
-    AkkaUtils.createActorSystem(akkaConfig)
-  }
-
-  def getConfigString: String = {
-    """
-    |akka{
-    |  loglevel = "DEBUG"
-    |  stdout-loglevel = "DEBUG"
-    |  log-dead-letters-during-shutdown = off
-    |  log-dead-letters = off
-    |
-    |  actor {
-    |    provider = "akka.remote.RemoteActorRefProvider"
-    |  }
-    |
-    |  remote{
-    |    log-remote-lifecycle-events = off
-    |
-    |    netty{
-    |      tcp{
-    |        transport-class = "akka.remote.transport.netty.NettyTransport"
-    |        tcp-nodelay = on
-    |        maximum-frame-size = 1MB
-    |        execution-pool-size = 4
-    |      }
-    |    }
-    |  }
-    |}""".stripMargin
-  }
-
   def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, 
ActorRef) = {
     val (hostname, port, config) = TaskManager.parseArgs(args)
 
-    val actorSystem = createActorSystem(hostname, port, config)
+    val actorSystem = AkkaUtils.createActorSystem(config, Some((hostname, 
port)))
 
     val (connectionInfo, jobManagerURL, taskManagerConfig, 
networkConnectionConfiguration) =
-      TaskManager.parseConfiguration(hostname, config, false)
+      TaskManager.parseConfiguration(hostname, config, localAkkaCommunication 
= false,
+        localTaskManagerCommunication = false)
 
     (actorSystem, TaskManager.startActor(Props(new TaskManager(connectionInfo, 
jobManagerURL,
       taskManagerConfig, networkConnectionConfiguration) with 
YarnTaskManager))(actorSystem))

http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java 
b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java
index c8d639a..1d01b03 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.yarn;
 
-import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.junit.Test;
 

Reply via email to