Repository: flink Updated Branches: refs/heads/master 63720634e -> 4046819b3
http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala index 1bd1bfb..35a94cd 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala @@ -165,7 +165,7 @@ object StreamJoinOperator { } } - return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)) + op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)) .addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)), returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2) } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala index 222eb6d..61d7db1 100644 --- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala +++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala @@ -27,6 +27,8 @@ import org.apache.flink.streaming.api.datastream.{ WindowedDataStream => JavaWSt import org.apache.flink.streaming.api.datastream.{ SplitDataStream => SplitJavaStream } import org.apache.flink.streaming.api.datastream.{ ConnectedDataStream => JavaConStream } +import language.implicitConversions + package object scala { // We have this here so that we always have generated TypeInformationS when // using the Scala API http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java index 7318894..9396b66 100644 --- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java +++ b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java @@ -18,12 +18,9 @@ package org.apache.flink.tachyon; - import org.apache.commons.io.IOUtils; import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.java.io.AvroOutputFormat; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.examples.java.wordcount.WordCount; http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java index 1a1098d..35d78eb 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java @@ -20,7 +20,6 @@ package org.apache.flink.test.util; import com.google.common.base.Charsets; import com.google.common.io.Files; -import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import scala.concurrent.duration.FiniteDuration; @@ -28,7 +27,9 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; + +import org.apache.flink.runtime.akka.AkkaUtils; + public abstract class AbstractTestBase extends TestBaseUtils { @@ -49,8 +50,7 @@ public abstract class AbstractTestBase extends TestBaseUtils { this.config = config; this.tempFiles = new ArrayList<File>(); - timeout = new FiniteDuration(config.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS); + timeout = AkkaUtils.getTimeout(config); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 8b6ac6c..804b005 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -63,7 +63,7 @@ public class TestBaseUtils { protected static final int DEFAULT_NUM_TASK_MANAGERS = 1; - protected static final int DEFAULT_AKKA_ASK_TIMEOUT = 1000; + protected static final long DEFAULT_AKKA_ASK_TIMEOUT = 1000; protected static final String DEFAULT_AKKA_STARTUP_TIMEOUT = "60 s"; @@ -88,7 +88,7 @@ public class TestBaseUtils { config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots); config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, numTaskManagers); - config.setInteger(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT); + config.setString(ConfigConstants.AKKA_ASK_TIMEOUT, DEFAULT_AKKA_ASK_TIMEOUT + "s"); config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT); return new ForkableFlinkMiniCluster(config); } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala index 18d2177..88ce698 100644 --- a/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala +++ b/flink-test-utils/src/main/scala/org/apache/flink/test/util/ForkableFlinkMiniCluster.scala @@ -76,7 +76,7 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst } val (connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConnectionConfig) = - TaskManager.parseConfiguration(HOSTNAME, config, localExecution) + TaskManager.parseConfiguration(HOSTNAME, config, singleActorSystem, localExecution) system.actorOf(Props(new TaskManager(connectionInfo, jobManagerAkkaURL, taskManagerConfig, networkConnectionConfig) with TestingTaskManager), TaskManager.TASK_MANAGER_NAME + index) http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java index 8b6aacb..3138bb6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/cancelling/CancellingTestBase.java @@ -115,14 +115,14 @@ public abstract class CancellingTestBase { boolean jobSuccessfullyCancelled = false; Future<Object> result = Patterns.ask(client, new JobClientMessages.SubmitJobAndWait - (jobGraph, false), new Timeout(AkkaUtils.DEFAULT_TIMEOUT())); + (jobGraph, false), new Timeout(AkkaUtils.getDefaultTimeout())); actorSystem.scheduler().scheduleOnce(new FiniteDuration(msecsTillCanceling, TimeUnit.MILLISECONDS), client, new JobManagerMessages.CancelJob(jobGraph.getJobID()), actorSystem.dispatcher(), ActorRef.noSender()); try { - Await.result(result, AkkaUtils.DEFAULT_TIMEOUT()); + Await.result(result, AkkaUtils.getDefaultTimeout()); } catch (JobExecutionException exception) { if (!exception.isJobCanceledByUser()) { throw new IllegalStateException("Job Failed."); http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala index 41c96b0..f0d6b9f 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala @@ -48,58 +48,58 @@ class ClosureCleanerITCase(mode: ExecutionMode) extends MultipleProgramsTestBase } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(result, resultPath) } @Test - def testObject: Unit = { + def testObject(): Unit = { TestObject.run(resultPath) result = "30" } @Test - def testClass: Unit = { + def testClass(): Unit = { val obj = new TestClass obj.run(resultPath) result = "30" } @Test - def testClassWithoutDefaulConstructor: Unit = { + def testClassWithoutDefaulConstructor(): Unit = { val obj = new TestClassWithoutDefaultConstructor(5) obj.run(resultPath) result = "30" } @Test - def testClassWithoutFieldAccess: Unit = { + def testClassWithoutFieldAccess(): Unit = { val obj = new TestClassWithoutFieldAccess obj.run(resultPath) result = "30" // 6 + 7 + 8 + 9 } @Test - def testObjectWithNesting: Unit = { + def testObjectWithNesting(): Unit = { TestObjectWithNesting.run(resultPath) result = "27" } @Test - def testClassWithNesting: Unit = { + def testClassWithNesting(): Unit = { val obj = new TestClassWithNesting(1) obj.run(resultPath) result = "27" } @Test - def testObjectWithBogusReturns: Unit = { + def testObjectWithBogusReturns(): Unit = { TestObjectWithBogusReturns.run(resultPath) result = "1" } @Test - def testObjectWithNestedReturns: Unit = { + def testObjectWithNestedReturns(): Unit = { TestObjectWithNestedReturns.run(resultPath) result = "1" } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala index ae3512a..8839093 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/AggregateITCase.scala @@ -52,7 +52,7 @@ class AggregateITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @Test - def testFullAggregate: Unit = { + def testFullAggregate(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get3TupleDataSet(env) @@ -73,7 +73,7 @@ class AggregateITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @Test - def testGroupedAggregate: Unit = { + def testGroupedAggregate(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get3TupleDataSet(env) @@ -93,7 +93,7 @@ class AggregateITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @Test - def testNestedAggregate: Unit = { + def testNestedAggregate(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get3TupleDataSet(env) http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala index 93ade52..e90cb1b 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CoGroupITCase.scala @@ -49,12 +49,12 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expectedResult, resultPath) } @Test - def testCoGroupOnTuplesWithKeyFieldSelector: Unit = { + def testCoGroupOnTuplesWithKeyFieldSelector(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get5TupleDataSet(env) val ds2 = CollectionDataSets.get5TupleDataSet(env) @@ -78,7 +78,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCoGroupOnTwoCustomInputsWithKeyExtractors: Unit = { + def testCoGroupOnTwoCustomInputsWithKeyExtractors(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.getCustomTypeDataSet(env) val ds2 = CollectionDataSets.getCustomTypeDataSet(env) @@ -103,7 +103,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCorrectnessIfCoGroupReturnsLeftInputObjects: Unit = { + def testCorrectnessIfCoGroupReturnsLeftInputObjects(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get3TupleDataSet(env) val ds2 = CollectionDataSets.get3TupleDataSet(env) @@ -122,7 +122,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCorrectnessIfCoGroupReturnsRightInputObjects: Unit = { + def testCorrectnessIfCoGroupReturnsRightInputObjects(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get5TupleDataSet(env) val ds2 = CollectionDataSets.get5TupleDataSet(env) @@ -141,7 +141,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCoGroupWithBroadcastVariable: Unit = { + def testCoGroupWithBroadcastVariable(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val intDs = CollectionDataSets.getIntDataSet(env) val ds = CollectionDataSets.get5TupleDataSet(env) @@ -182,7 +182,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCoGroupOnTupleWithKeyFieldSelectorAndCustomTypeWithKeyExtractor: Unit = { + def testCoGroupOnTupleWithKeyFieldSelectorAndCustomTypeWithKeyExtractor(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get5TupleDataSet(env) val ds2 = CollectionDataSets.getCustomTypeDataSet(env) @@ -208,7 +208,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCoGroupOnCustomTypeWithKeyExtractorAndTupleInputKeyFieldSelector: Unit = { + def testCoGroupOnCustomTypeWithKeyExtractorAndTupleInputKeyFieldSelector(): Unit = { /* * CoGroup on a tuple input with key field selector and a custom type input with * key extractor @@ -239,7 +239,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCoGroupWithMultipleKeyFields: Unit = { + def testCoGroupWithMultipleKeyFields(): Unit = { /* * CoGroup with multiple key fields */ @@ -263,7 +263,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCoGroupWithMultipleKeyExtractors: Unit = { + def testCoGroupWithMultipleKeyExtractors(): Unit = { /* * CoGroup with multiple key extractors */ @@ -290,7 +290,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCoGroupOnTwoCustomTypesUsingExpressionKeys: Unit = { + def testCoGroupOnTwoCustomTypesUsingExpressionKeys(): Unit = { /* * CoGroup on two custom type inputs using expression keys */ @@ -317,7 +317,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCoGroupOnTwoCustomTypesUsingExpressionKeysAndFieldSelector: Unit = { + def testCoGroupOnTwoCustomTypesUsingExpressionKeysAndFieldSelector(): Unit = { /* * CoGroup on two custom type inputs using expression keys */ @@ -339,7 +339,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCoGroupFieldSelectorAndKeySelector: Unit = { + def testCoGroupFieldSelectorAndKeySelector(): Unit = { /* * CoGroup field-selector (expression keys) + key selector function * The key selector is unnecessary complicated (Tuple1) ;) @@ -362,7 +362,7 @@ class CoGroupITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCoGroupKeySelectorAndFieldSelector: Unit = { + def testCoGroupKeySelectorAndFieldSelector(): Unit = { /* * CoGroup field-selector (expression keys) + key selector function * The key selector is simple here http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala index 4732b58..17b8252 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/CrossITCase.scala @@ -48,12 +48,12 @@ class CrossITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testCorrectnessOfCrossOnTwoTupleInputs: Unit = { + def testCorrectnessOfCrossOnTwoTupleInputs(): Unit = { /* * check correctness of cross on two tuple inputs */ @@ -71,7 +71,7 @@ class CrossITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testCorrectnessOfCrossIfUDFReturnsLeftInput: Unit = { + def testCorrectnessOfCrossIfUDFReturnsLeftInput(): Unit = { /* * check correctness of cross if UDF returns left input object */ @@ -87,7 +87,7 @@ class CrossITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testCorrectnessOfCrossIfUDFReturnsRightInput: Unit = { + def testCorrectnessOfCrossIfUDFReturnsRightInput(): Unit = { /* * check correctness of cross if UDF returns right input object */ @@ -104,7 +104,7 @@ class CrossITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testCorrectnessOfCrossWithBroadcastSet: Unit = { + def testCorrectnessOfCrossWithBroadcastSet(): Unit = { /* * check correctness of cross with broadcast set */ @@ -138,7 +138,7 @@ class CrossITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testCorrectnessOfCrossWithHuge: Unit = { + def testCorrectnessOfCrossWithHuge(): Unit = { /* * check correctness of crossWithHuge (only correctness of result -> should be the same * as with normal cross) @@ -156,7 +156,7 @@ class CrossITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testCorrectnessOfCrossWithTiny: Unit = { + def testCorrectnessOfCrossWithTiny(): Unit = { /* * check correctness of crossWithTiny (only correctness of result -> should be the same * as with normal cross) @@ -176,7 +176,7 @@ class CrossITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testCorrectnessOfDefaultCross: Unit = { + def testCorrectnessOfDefaultCross(): Unit = { /* * check correctness of default cross */ @@ -194,7 +194,7 @@ class CrossITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testCorrectnessOfCrossOnTwoCutomTypeInputs: Unit = { + def testCorrectnessOfCrossOnTwoCutomTypeInputs(): Unit = { /* * check correctness of cross on two custom type inputs */ @@ -212,7 +212,7 @@ class CrossITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testCorrectnessOfcrossTupleInputAndCustomTypeInput: Unit = { + def testCorrectnessOfcrossTupleInputAndCustomTypeInput(): Unit = { /* * check correctness of cross a tuple input and a custom type input */ http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala index 3711347..75cf676 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/DistinctITCase.scala @@ -45,12 +45,12 @@ class DistinctITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector: Unit = { + def testCorrectnessOfDistinctOnTuplesWithKeyFieldSelector(): Unit = { /* * Check correctness of distinct on tuples with key field selector */ @@ -68,7 +68,7 @@ class DistinctITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorNotAllFieldsSelected: Unit = { + def testCorrectnessOfDistinctOnTuplesWithKeyFieldSelectorNotAllFieldsSelected(): Unit = { /* * check correctness of distinct on tuples with key field selector with not all fields * selected @@ -84,7 +84,7 @@ class DistinctITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCorrectnessOfDistinctOnTuplesWithKeyExtractor: Unit ={ + def testCorrectnessOfDistinctOnTuplesWithKeyExtractor(): Unit ={ /* * check correctness of distinct on tuples with key extractor */ @@ -99,7 +99,7 @@ class DistinctITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCorrectnessOfDistinctOnCustomTypeWithTypeExtractor: Unit = { + def testCorrectnessOfDistinctOnCustomTypeWithTypeExtractor(): Unit = { /* * check correctness of distinct on custom type with type extractor */ @@ -114,7 +114,7 @@ class DistinctITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCorrectnessOfDistinctOnTuples: Unit = { + def testCorrectnessOfDistinctOnTuples(): Unit = { /* * check correctness of distinct on tuples */ @@ -129,7 +129,7 @@ class DistinctITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor: Unit = { + def testCorrectnessOfDistinctOnCustomTypeWithTupleReturningTypeExtractor(): Unit = { /* * check correctness of distinct on custom type with tuple-returning type extractor */ @@ -145,7 +145,7 @@ class DistinctITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCorrectnessOfDistinctOnTuplesWithFieldExpressions: Unit = { + def testCorrectnessOfDistinctOnTuplesWithFieldExpressions(): Unit = { /* * check correctness of distinct on tuples with field expressions */ @@ -160,7 +160,7 @@ class DistinctITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testCorrectnessOfDistinctOnPojos: Unit = { + def testCorrectnessOfDistinctOnPojos(): Unit = { /* * check correctness of distinct on Pojos */ http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala index 770759b..58ce51b 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ExamplesITCase.scala @@ -76,12 +76,12 @@ class ExamplesITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testNestesdTuplesWithIntOffset: Unit = { + def testNestesdTuplesWithIntOffset(): Unit = { /* * Test nested tuples with int offset */ @@ -95,7 +95,7 @@ class ExamplesITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testNestedTuplesWithExpressionKeys: Unit = { + def testNestedTuplesWithExpressionKeys(): Unit = { /* * Test nested tuples with expression keys */ @@ -111,7 +111,7 @@ class ExamplesITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testNestedPojos: Unit = { + def testNestedPojos(): Unit = { /* * Test nested pojos */ @@ -132,7 +132,7 @@ class ExamplesITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testPojoWithNestedCaseClass: Unit = { + def testPojoWithNestedCaseClass(): Unit = { /* * Test pojo with nested case class */ http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala index 13954e8..d0a4ee4 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FilterITCase.scala @@ -47,12 +47,12 @@ class FilterITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testAllRejectingFilter: Unit = { + def testAllRejectingFilter(): Unit = { /* * Test all-rejecting filter. */ @@ -65,7 +65,7 @@ class FilterITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testAllPassingFilter: Unit = { + def testAllPassingFilter(): Unit = { /* * Test all-passing filter. */ @@ -83,7 +83,7 @@ class FilterITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testFilterOnStringTupleField: Unit = { + def testFilterOnStringTupleField(): Unit = { /* * Test filter on String tuple field. */ @@ -96,7 +96,7 @@ class FilterITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testFilterOnIntegerTupleField: Unit = { + def testFilterOnIntegerTupleField(): Unit = { /* * Test filter on Integer tuple field. */ @@ -111,7 +111,7 @@ class FilterITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testFilterBasicType: Unit = { + def testFilterBasicType(): Unit = { /* * Test filter on basic type */ @@ -124,7 +124,7 @@ class FilterITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testFilterOnCustomType: Unit = { + def testFilterOnCustomType(): Unit = { /* * Test filter on custom type */ @@ -137,7 +137,7 @@ class FilterITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testRichFilterOnStringTupleField: Unit = { + def testRichFilterOnStringTupleField(): Unit = { /* * Test filter on String tuple field. */ http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala index de07cb5..3409b48 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FirstNITCase.scala @@ -44,12 +44,12 @@ class FirstNITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testFirstNOnUngroupedDataSet: Unit = { + def testFirstNOnUngroupedDataSet(): Unit = { /* * First-n on ungrouped data set */ @@ -62,7 +62,7 @@ class FirstNITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testFirstNOnGroupedDataSet: Unit = { + def testFirstNOnGroupedDataSet(): Unit = { /* * First-n on grouped data set */ @@ -75,7 +75,7 @@ class FirstNITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testFirstNOnGroupedAndSortedDataSet: Unit = { + def testFirstNOnGroupedAndSortedDataSet(): Unit = { /* * First-n on grouped and sorted data set */ http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala index d09ce1f..71ad28f 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/FlatMapITCase.scala @@ -49,12 +49,12 @@ class FlatMapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testNonPassingFlatMap: Unit = { + def testNonPassingFlatMap(): Unit = { /* * Test non-passing flatmap */ @@ -67,7 +67,7 @@ class FlatMapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testDataDuplicatingFlatMap: Unit = { + def testDataDuplicatingFlatMap(): Unit = { /* * Test data duplicating flatmap */ @@ -83,7 +83,7 @@ class FlatMapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testFlatMapWithVaryingNumberOfEmittedTuples: Unit = { + def testFlatMapWithVaryingNumberOfEmittedTuples(): Unit = { /* * Test flatmap with varying number of emitted tuples */ @@ -105,7 +105,7 @@ class FlatMapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testTypeConversionFlatMapperCustomToTuple: Unit = { + def testTypeConversionFlatMapperCustomToTuple(): Unit = { /* * Test type conversion flatmapper (Custom -> Tuple) */ @@ -123,7 +123,7 @@ class FlatMapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testTypeConversionFlatMapperTupleToBasic: Unit = { + def testTypeConversionFlatMapperTupleToBasic(): Unit = { /* * Test type conversion flatmapper (Tuple -> Basic) */ @@ -140,7 +140,7 @@ class FlatMapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt: Unit = { + def testFlatMapperIfUDFReturnsInputObjectMultipleTimesWhileChangingIt(): Unit = { /* * Test flatmapper if UDF returns input object * multiple times and changes it in between @@ -168,7 +168,7 @@ class FlatMapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) } @Test - def testFlatMapWithBroadcastSet: Unit = { + def testFlatMapWithBroadcastSet(): Unit = { /* * Test flatmap with broadcast set */ http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala index ce3bba3..3f62133 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/GroupReduceITCase.scala @@ -56,14 +56,14 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @After - def after: Unit = { + def after(): Unit = { if(expected != null) { compareResultsByLinesInMemory(expected, resultPath) } } @Test - def testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector: Unit = { + def testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelector(): Unit = { /* * check correctness of groupReduce on tuples with key field selector */ @@ -79,7 +79,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfGroupReduceOnTuplesWithMultipleKeyFieldSelector: Unit = { + def testCorrectnessOfGroupReduceOnTuplesWithMultipleKeyFieldSelector(): Unit = { /* * check correctness of groupReduce on tuples with multiple key field selector */ @@ -101,7 +101,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelectorAndGroupSorting: Unit = { + def testCorrectnessOfGroupReduceOnTuplesWithKeyFieldSelectorAndGroupSorting(): Unit = { /* * check correctness of groupReduce on tuples with key field selector and group sorting */ @@ -123,7 +123,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfGroupReduceOnTuplesWithKeyExtractor: Unit = { + def testCorrectnessOfGroupReduceOnTuplesWithKeyExtractor(): Unit = { /* * check correctness of groupReduce on tuples with key extractor */ @@ -139,7 +139,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfGroupReduceOnCustomTypeWithTypeExtractor: Unit = { + def testCorrectnessOfGroupReduceOnCustomTypeWithTypeExtractor(): Unit = { /* * check correctness of groupReduce on custom type with type extractor */ @@ -168,7 +168,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfAllGroupReduceForTuples: Unit = { + def testCorrectnessOfAllGroupReduceForTuples(): Unit = { /* * check correctness of all-groupreduce for tuples */ @@ -190,7 +190,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfAllGroupReduceForCustomTypes: Unit = { + def testCorrectnessOfAllGroupReduceForCustomTypes(): Unit = { /* * check correctness of all-groupreduce for custom types */ @@ -211,7 +211,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfGroupReduceWithBroadcastSet: Unit = { + def testCorrectnessOfGroupReduceWithBroadcastSet(): Unit = { /* * check correctness of groupReduce with broadcast set */ @@ -245,7 +245,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfGroupReduceIfUDFReturnsInputObjectMultipleTimesWhileChangingIt: Unit = { + def testCorrectnessOfGroupReduceIfUDFReturnsInputObjectMultipleTimesWhileChangingIt(): Unit = { /* * check correctness of groupReduce if UDF returns input objects multiple times and * changes it in between @@ -273,7 +273,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine: Unit = { + def testCorrectnessOfGroupReduceOnCustomTypeWithKeyExtractorAndCombine(): Unit = { /* * check correctness of groupReduce on custom type with key extractor and combine */ @@ -293,7 +293,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfGroupReduceOnTuplesWithCombine: Unit = { + def testCorrectnessOfGroupReduceOnTuplesWithCombine(): Unit = { /* * check correctness of groupReduce on tuples with combine */ @@ -314,7 +314,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfAllGroupReduceForTuplesWithCombine: Unit = { + def testCorrectnessOfAllGroupReduceForTuplesWithCombine(): Unit = { /* * check correctness of all-groupreduce for tuples with combine */ @@ -336,7 +336,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfGroupReduceWithDescendingGroupSort: Unit = { + def testCorrectnessOfGroupReduceWithDescendingGroupSort(): Unit = { /* * check correctness of groupReduce with descending group sort */ @@ -357,7 +357,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testCorrectnessOfGroupReduceOnTuplesWithTupleReturningKeySelector: Unit = { + def testCorrectnessOfGroupReduceOnTuplesWithTupleReturningKeySelector(): Unit = { /* * check correctness of groupReduce on tuples with tuple-returning key selector */ @@ -380,7 +380,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testInputOfCombinerIsSortedForCombinableGroupReduceWithGroupSorting: Unit = { + def testInputOfCombinerIsSortedForCombinableGroupReduceWithGroupSorting(): Unit = { /* * check that input of combiner is also sorted for combinable groupReduce with group * sorting @@ -400,7 +400,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testDeepNestingAndNullValueInPojo: Unit = { + def testDeepNestingAndNullValueInPojo(): Unit = { /* * Deep nesting test * + null value in pojo @@ -425,7 +425,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testPojoContainigAWritableAndTuples: Unit = { + def testPojoContainigAWritableAndTuples(): Unit = { /* * Test Pojo containing a Writable and Tuples */ @@ -450,7 +450,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testTupleContainingPojosAndRegularFields: Unit ={ + def testTupleContainingPojosAndRegularFields(): Unit ={ /* * Test Tuple containing pojos and regular fields */ @@ -472,7 +472,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testStringBasedDefinitionOnGroupSort: Unit = { + def testStringBasedDefinitionOnGroupSort(): Unit = { /* * Test string-based definition on group sort, based on test: * check correctness of groupReduce with descending group sort @@ -495,7 +495,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testIntBasedDefinitionOnGroupSortForFullNestedTuple: Unit = { + def testIntBasedDefinitionOnGroupSortForFullNestedTuple(): Unit = { /* * Test int-based definition on group sort, for (full) nested Tuple */ @@ -510,7 +510,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testIntBasedDefinitionOnGroupSortForPartialNestedTuple: Unit = { + def testIntBasedDefinitionOnGroupSortForPartialNestedTuple(): Unit = { /* * Test int-based definition on group sort, for (partial) nested Tuple ASC */ @@ -527,7 +527,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testStringBasedDefinitionOnGroupSortForPartialNestedTuple: Unit = { + def testStringBasedDefinitionOnGroupSortForPartialNestedTuple(): Unit = { /* * Test string-based definition on group sort, for (partial) nested Tuple DESC */ @@ -544,7 +544,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testStringBasedDefinitionOnGroupSortForTwoGroupingKeys: Unit = { + def testStringBasedDefinitionOnGroupSortForTwoGroupingKeys(): Unit = { /* * Test string-based definition on group sort, for two grouping keys */ @@ -561,7 +561,7 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo } @Test - def testStringBasedDefinitionOnGroupSortForTwoGroupingKeysWithPojos: Unit = { + def testStringBasedDefinitionOnGroupSortForTwoGroupingKeysWithPojos(): Unit = { /* * Test string-based definition on group sort, for two grouping keys with Pojos */ @@ -725,18 +725,19 @@ class GroupReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mo env.setDegreeOfParallelism(1) val ds = CollectionDataSets.getPojoWithMultiplePojos(env) val reduceDs = ds.groupBy("p2.a2") - .reduceGroup( - new GroupReduceFunction[CollectionDataSets.PojoWithMultiplePojos, String] { - def reduce( - values: Iterable[CollectionDataSets.PojoWithMultiplePojos], - out: Collector[String]) { - val concat: StringBuilder = new StringBuilder - for (value <- values.asScala) { - concat.append(value.p2.a2) - } - out.collect(concat.toString()) + .reduceGroup { + new GroupReduceFunction[CollectionDataSets.PojoWithMultiplePojos, String] { + def reduce( + values: Iterable[CollectionDataSets.PojoWithMultiplePojos], + out: Collector[String]) { + val concat: StringBuilder = new StringBuilder + for (value <- values.asScala) { + concat.append(value.p2.a2) } - }) + out.collect(concat.toString()) + } + } + } reduceDs.writeAsText(resultPath, WriteMode.OVERWRITE) env.execute() expected = "b\nccc\nee\n" http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala index 8ae8674..cf13a42 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/JoinITCase.scala @@ -47,12 +47,12 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testUDFJoinOnTuplesWithKeyFieldPositions: Unit = { + def testUDFJoinOnTuplesWithKeyFieldPositions(): Unit = { /* * UDF Join on tuples with key field positions */ @@ -66,7 +66,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testUDFJoinOnTuplesWithMultipleKeyFieldPositions: Unit = { + def testUDFJoinOnTuplesWithMultipleKeyFieldPositions(): Unit = { /* * UDF Join on tuples with multiple key field positions */ @@ -81,7 +81,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testDefaultJoinOnTuples: Unit = { + def testDefaultJoinOnTuples(): Unit = { /* * Default Join on tuples */ @@ -96,7 +96,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testJoinWithHuge: Unit = { + def testJoinWithHuge(): Unit = { /* * Join with Huge */ @@ -110,7 +110,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testJoinWithTiny: Unit = { + def testJoinWithTiny(): Unit = { /* * Join with Tiny */ @@ -124,7 +124,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testJoinThatReturnsTheLeftInputObject: Unit = { + def testJoinThatReturnsTheLeftInputObject(): Unit = { /* * Join that returns the left input object */ @@ -138,7 +138,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testJoinThatReturnsTheRightInputObject: Unit = { + def testJoinThatReturnsTheRightInputObject(): Unit = { /* * Join that returns the right input object */ @@ -152,7 +152,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testJoinWithBroadcastSet: Unit ={ + def testJoinWithBroadcastSet(): Unit ={ /* * Join with broadcast set */ @@ -186,7 +186,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testJoinOnCustomTypeInputWithKeyExtractorAndTupleInputWithKeyFieldSelector: Unit = { + def testJoinOnCustomTypeInputWithKeyExtractorAndTupleInputWithKeyFieldSelector(): Unit = { /* * Join on a tuple input with key field selector and a custom type input with key extractor */ @@ -200,7 +200,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testJoinOnTupleInputWithKeyFieldSelectorAndCustomTypeInputWithKeyExtractor: Unit = { + def testJoinOnTupleInputWithKeyFieldSelectorAndCustomTypeInputWithKeyExtractor(): Unit = { /* * Join on a tuple input with key field selector and a custom type input with key extractor */ @@ -214,7 +214,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testDefaultJoinOnTwoCustomTypeInputsWithKeyExtractors: Unit = { + def testDefaultJoinOnTwoCustomTypeInputsWithKeyExtractors(): Unit = { /* * (Default) Join on two custom type inputs with key extractors */ @@ -229,7 +229,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testUDFJoinOnTuplesWithTupleReturningKeySelectors: Unit = { + def testUDFJoinOnTuplesWithTupleReturningKeySelectors(): Unit = { /* * UDF Join on tuples with tuple-returning key selectors */ @@ -246,7 +246,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testNestedPojoAgainstTupleAsString: Unit = { + def testNestedPojoAgainstTupleAsString(): Unit = { /* * Join nested pojo against tuple (selected using a string) */ @@ -262,7 +262,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testJoinNestedPojoAgainstTupleAsInteger: Unit = { + def testJoinNestedPojoAgainstTupleAsInteger(): Unit = { /* * Join nested pojo against tuple (selected as an integer) */ @@ -278,7 +278,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testSelectingMultipleFieldsUsingExpressionLanguage: Unit = { + def testSelectingMultipleFieldsUsingExpressionLanguage(): Unit = { /* * selecting multiple fields using expression language */ @@ -297,7 +297,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testNestedIntoTuple: Unit = { + def testNestedIntoTuple(): Unit = { /* * nested into tuple */ @@ -315,7 +315,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testNestedIntoTupleIntoPojo: Unit = { + def testNestedIntoTupleIntoPojo(): Unit = { /* * nested into tuple into pojo */ @@ -336,7 +336,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testNonPojoFullTuple: Unit = { + def testNonPojoFullTuple(): Unit = { /* * Non-POJO test to verify that full-tuple keys are working. */ @@ -353,7 +353,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testNonPojoNestedTuple: Unit = { + def testNonPojoNestedTuple(): Unit = { /* * Non-POJO test to verify "nested" tuple-element selection. */ @@ -369,7 +369,7 @@ class JoinITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testFullPojoWithFullTuple: Unit = { + def testFullPojoWithFullTuple(): Unit = { /* * full pojo with full tuple */ http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala index 78426ef..4c74ca0 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/MapITCase.scala @@ -48,12 +48,12 @@ class MapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testIdentityMapperWithBasicType: Unit = { + def testIdentityMapperWithBasicType(): Unit = { /* * Test identity map with basic type */ @@ -67,7 +67,7 @@ class MapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testIdentityMapperWithTuple: Unit = { + def testIdentityMapperWithTuple(): Unit = { /* * Test identity map with a tuple */ @@ -85,7 +85,7 @@ class MapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testTypeConversionMapperCustomToTuple: Unit = { + def testTypeConversionMapperCustomToTuple(): Unit = { /* * Test type conversion mapper (Custom -> Tuple) */ @@ -103,7 +103,7 @@ class MapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testTypeConversionMapperTupleToBasic: Unit = { + def testTypeConversionMapperTupleToBasic(): Unit = { /* * Test type conversion mapper (Tuple -> Basic) */ @@ -120,7 +120,7 @@ class MapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testMapperOnTupleIncrementFieldReorderSecondAndThirdFields: Unit = { + def testMapperOnTupleIncrementFieldReorderSecondAndThirdFields(): Unit = { /* * Test mapper on tuple - Increment Integer field, reorder second and third fields */ @@ -138,7 +138,7 @@ class MapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testMapperOnCustomLowercaseString: Unit = { + def testMapperOnCustomLowercaseString(): Unit = { /* * Test mapper on Custom - lowercase myString */ @@ -156,7 +156,7 @@ class MapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testMapperIfUDFReturnsInputObjectIncrementFirstFieldOfTuple: Unit = { + def testMapperIfUDFReturnsInputObjectIncrementFirstFieldOfTuple(): Unit = { /* * Test mapper if UDF returns input object - increment first field of a tuple */ @@ -176,7 +176,7 @@ class MapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testMapWithBroadcastSet: Unit = { + def testMapWithBroadcastSet(): Unit = { /* * Test map with broadcast set */ @@ -205,7 +205,7 @@ class MapITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testPassingConfigurationObject: Unit = { + def testPassingConfigurationObject(): Unit = { /* * Test passing configuration object. */ http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala index 35c0e93..f21fe11 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/PartitionITCase.scala @@ -45,12 +45,12 @@ class PartitionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testHashPartitionByTupleField: Unit = { + def testHashPartitionByTupleField(): Unit = { /* * Test hash partition by tuple field */ @@ -66,7 +66,7 @@ class PartitionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @Test - def testHashPartitionByKeySelector: Unit = { + def testHashPartitionByKeySelector(): Unit = { /* * Test hash partition by key selector */ @@ -80,7 +80,7 @@ class PartitionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @Test - def testForcedRebalancing: Unit = { + def testForcedRebalancing(): Unit = { /* * Test forced rebalancing */ @@ -111,7 +111,7 @@ class PartitionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @Test - def testMapPartitionAfterRepartitionHasCorrectDOP: Unit = { + def testMapPartitionAfterRepartitionHasCorrectDOP(): Unit = { // Verify that mapPartition operation after repartition picks up correct // DOP val env = ExecutionEnvironment.getExecutionEnvironment @@ -129,7 +129,7 @@ class PartitionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @Test - def testMapAfterRepartitionHasCorrectDOP: Unit = { + def testMapAfterRepartitionHasCorrectDOP(): Unit = { // Verify that map operation after repartition picks up correct // DOP val env = ExecutionEnvironment.getExecutionEnvironment @@ -157,7 +157,7 @@ class PartitionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @Test - def testFilterAfterRepartitionHasCorrectDOP: Unit = { + def testFilterAfterRepartitionHasCorrectDOP(): Unit = { // Verify that filter operation after repartition picks up correct // DOP val env = ExecutionEnvironment.getExecutionEnvironment @@ -186,7 +186,7 @@ class PartitionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @Test - def testPartitionNestedPojo: Unit = { + def testPartitionNestedPojo(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment env.setDegreeOfParallelism(3) val ds = CollectionDataSets.getDuplicatePojoDataSet(env) http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala index 5f63dc4..c2cd20a 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/ReduceITCase.scala @@ -48,12 +48,12 @@ class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testReduceOnTuplesWithKeyFieldSelector: Unit = { + def testReduceOnTuplesWithKeyFieldSelector(): Unit = { /* * Reduce on tuples with key field selector */ @@ -67,7 +67,7 @@ class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testReduceOnTuplesWithMultipleKeyFieldSelectors: Unit = { + def testReduceOnTuplesWithMultipleKeyFieldSelectors(): Unit = { /* * Reduce on tuples with multiple key field selectors */ @@ -83,7 +83,7 @@ class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testReduceOnTuplesWithKeyExtractor: Unit = { + def testReduceOnTuplesWithKeyExtractor(): Unit = { /* * Reduce on tuples with key extractor */ @@ -97,7 +97,7 @@ class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testReduceOnCustomTypeWithKeyExtractor: Unit = { + def testReduceOnCustomTypeWithKeyExtractor(): Unit = { /* * Reduce on custom type with key extractor */ @@ -116,7 +116,7 @@ class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testAllReduceForTuple: Unit = { + def testAllReduceForTuple(): Unit = { /* * All-reduce for tuple */ @@ -130,7 +130,7 @@ class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testAllReduceForCustomTypes: Unit = { + def testAllReduceForCustomTypes(): Unit = { /* * All-reduce for custom types */ @@ -149,7 +149,7 @@ class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testReduceWithBroadcastSet: Unit = { + def testReduceWithBroadcastSet(): Unit = { /* * Reduce with broadcast set */ @@ -177,7 +177,7 @@ class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testReduceWithUDFThatReturnsTheSecondInputObject: Unit = { + def testReduceWithUDFThatReturnsTheSecondInputObject(): Unit = { /* * Reduce with UDF that returns the second input object (check mutable object handling) */ @@ -201,7 +201,7 @@ class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testReduceWithATupleReturningKeySelector: Unit = { + def testReduceWithATupleReturningKeySelector(): Unit = { /* * Reduce with a Tuple-returning KeySelector */ @@ -217,7 +217,7 @@ class ReduceITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testReduceOnGroupedDSByExpressionKey: Unit = { + def testReduceOnGroupedDSByExpressionKey(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get5TupleDataSet(env) val reduceDs = ds.groupBy("_5", "_1") http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala index 5e456e0..6071569 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SumMinMaxITCase.scala @@ -46,12 +46,12 @@ class SumMinMaxITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testFullAggregate: Unit = { + def testFullAggregate(): Unit = { // Full aggregate val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get3TupleDataSet(env) @@ -71,7 +71,7 @@ class SumMinMaxITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @Test - def testGroupedAggregate: Unit = { + def testGroupedAggregate(): Unit = { // Grouped aggregate val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get3TupleDataSet(env) @@ -91,7 +91,7 @@ class SumMinMaxITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode } @Test - def testNestedAggregate: Unit = { + def testNestedAggregate(): Unit = { // Nested aggregate val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get3TupleDataSet(env) http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala index 3eed128..255cce9 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/operators/UnionITCase.scala @@ -50,12 +50,12 @@ class UnionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @After - def after: Unit = { + def after(): Unit = { compareResultsByLinesInMemory(expected, resultPath) } @Test - def testUnionOf2IdenticalDS: Unit = { + def testUnionOf2IdenticalDS(): Unit = { /* * Union of 2 Same Data Sets */ @@ -68,7 +68,7 @@ class UnionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testUnionOf5IdenticalDSWithMultipleUnions: Unit = { + def testUnionOf5IdenticalDSWithMultipleUnions(): Unit = { /* * Union of 5 same Data Sets, with multiple unions */ @@ -86,7 +86,7 @@ class UnionITCase(mode: ExecutionMode) extends MultipleProgramsTestBase(mode) { } @Test - def testUnionWithEmptyDS: Unit = { + def testUnionWithEmptyDS(): Unit = { /* * Test on union with empty dataset */ http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala index 4dcef0a..d3d52b0 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/tuple/base/PairComparatorTestBase.scala @@ -33,7 +33,7 @@ abstract class PairComparatorTestBase[T, R] { @Test def testEqualityWithReference(): Unit = { try { - val comparator = getComparator(true) + val comparator = getComparator(ascending = true) val (dataT, dataR) = getSortedData for (i <- 0 until dataT.length) { @@ -51,8 +51,8 @@ abstract class PairComparatorTestBase[T, R] { @Test def testInequalityWithReference(): Unit = { - testGreatSmallAscDescWithReference(true) - testGreatSmallAscDescWithReference(false) + testGreatSmallAscDescWithReference(ascending = true) + testGreatSmallAscDescWithReference(ascending = false) } protected def testGreatSmallAscDescWithReference(ascending: Boolean) { http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index b66841a..a557f02 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -21,7 +21,6 @@ package org.apache.flink.yarn; import org.apache.flink.client.CliFrontend; import org.apache.flink.client.FlinkYarnSessionCli; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java index 273439e..3f2e72e 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnCluster.java @@ -25,6 +25,7 @@ import static akka.pattern.Patterns.ask; import akka.actor.Props; import akka.util.Timeout; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.AkkaUtils$; import org.apache.flink.runtime.net.NetUtils; import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster; @@ -43,6 +44,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.None$; import scala.Some; +import scala.Tuple2; import scala.concurrent.Await; import scala.concurrent.Awaitable; import scala.concurrent.Future; @@ -95,7 +97,8 @@ public class FlinkYarnCluster extends AbstractFlinkYarnCluster { // start actor system LOG.info("Start actor system."); InetAddress ownHostname = NetUtils.resolveAddress(jobManagerAddress); // find name of own public interface, able to connect to the JM - actorSystem = YarnUtils.createActorSystem(ownHostname.getCanonicalHostName(), 0, GlobalConfiguration.getConfiguration()); // set port automatically. + actorSystem = AkkaUtils.createActorSystem(GlobalConfiguration.getConfiguration(), + new Some(new Tuple2<String, Integer>(ownHostname.getCanonicalHostName(), 0))); // start application client LOG.info("Start application client."); http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java index 3f1cc23..aea9727 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java @@ -35,7 +35,6 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import scala.Tuple2; - public class YarnTaskManagerRunner { private static final Logger LOG = LoggerFactory.getLogger(YarnTaskManagerRunner.class); @@ -60,9 +59,7 @@ public class YarnTaskManagerRunner { @Override public Object run() { try { - Tuple2<ActorSystem, ActorRef> tuple = YarnUtils - .startActorSystemAndTaskManager(newArgs); - + Tuple2<ActorSystem, ActorRef> tuple = YarnUtils.startActorSystemAndTaskManager(newArgs); tuple._1().awaitTermination(); } catch (Exception e) { LOG.error("Error while running the TaskManager", e); http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala index 73349b2..390835c 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationClient.scala @@ -30,9 +30,9 @@ import org.apache.flink.yarn.Messages._ import scala.collection.mutable import scala.concurrent.duration._ -class ApplicationClient +import scala.language.postfixOps - extends Actor with ActorLogMessages with ActorLogging { +class ApplicationClient extends Actor with ActorLogMessages with ActorLogging { import context._ val INITIAL_POLLING_DELAY = 0 seconds @@ -50,8 +50,7 @@ class ApplicationClient override def preStart(): Unit = { super.preStart() - timeout = new FiniteDuration(GlobalConfiguration.getInteger(ConfigConstants.AKKA_ASK_TIMEOUT, - ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT), TimeUnit.SECONDS) + timeout = AkkaUtils.getTimeout(GlobalConfiguration.getConfiguration()) } override def postStop(): Unit = { @@ -65,12 +64,12 @@ class ApplicationClient override def receiveWithLogMessages: Receive = { // ----------------------------- Registration -> Status updates -> shutdown ---------------- - case LocalRegisterClient(address: String) => { + case LocalRegisterClient(address: String) => val jmAkkaUrl = JobManager.getRemoteAkkaURL(address) yarnJobManager = Some(AkkaUtils.getReference(jmAkkaUrl)(system, timeout)) yarnJobManager match { - case Some(jm) => { + case Some(jm) => // the message came from the FlinkYarnCluster. We send the message to the JobManager. // it is important not to forward the message because the JobManager is storing the // sender as the Application Client (this class). @@ -80,19 +79,18 @@ class ApplicationClient // request the number of task managers and slots from the job manager pollingTimer = Some(context.system.scheduler.schedule(INITIAL_POLLING_DELAY, WAIT_FOR_YARN_INTERVAL, yarnJobManager.get, PollYarnClusterStatus)) - } case None => throw new RuntimeException("Registration at JobManager/ApplicationMaster " + "failed. Job Manager RPC connection has not properly been initialized") } - } - case msg: StopYarnSession => { + + case msg: StopYarnSession => log.info("Stop yarn session.") stopMessageReceiver = Some(sender()) yarnJobManager foreach { _ forward msg } - } - case JobManagerStopped => { + + case JobManagerStopped => log.info("Remote JobManager has been stopped successfully. " + "Stopping local application client") stopMessageReceiver foreach { @@ -100,27 +98,27 @@ class ApplicationClient } // stop ourselves context.system.shutdown() - } + // handle the responses from the PollYarnClusterStatus messages to the yarn job mgr - case status: FlinkYarnClusterStatus => { + case status: FlinkYarnClusterStatus => latestClusterStatus = Some(status) - } + // locally get cluster status - case LocalGetYarnClusterStatus => { + case LocalGetYarnClusterStatus => sender() ! latestClusterStatus - } + // ----------------- handle messages from the cluster ------------------- // receive remote messages - case msg: YarnMessage => { + case msg: YarnMessage => messagesQueue.enqueue(msg) - } + // locally forward messages - case LocalGetYarnMessage => { + case LocalGetYarnMessage => sender() ! (if( messagesQueue.size == 0) None else messagesQueue.dequeue) - } + case _ => } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala index 529686f..31a4b09 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMaster.scala @@ -24,6 +24,7 @@ import java.security.PrivilegedAction import akka.actor._ import org.apache.flink.client.CliFrontend import org.apache.flink.configuration.ConfigConstants +import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.jobmanager.{WithWebServer, JobManager} import org.apache.flink.yarn.Messages.StartYarnSession import org.apache.hadoop.security.UserGroupInformation @@ -45,7 +46,7 @@ object ApplicationMaster { def main(args: Array[String]): Unit ={ val yarnClientUsername = System.getenv(FlinkYarnClient.ENV_CLIENT_USERNAME) LOG.info(s"YARN daemon runs as ${UserGroupInformation.getCurrentUser.getShortUserName}" + - s" setting user to execute Flink ApplicationMaster/JobManager to ${yarnClientUsername}") + s"' setting user to execute Flink ApplicationMaster/JobManager to $yarnClientUsername'") val ugi = UserGroupInformation.createRemoteUser(yarnClientUsername) @@ -171,8 +172,8 @@ object ApplicationMaster { LOG.info("Start job manager for yarn") val args = Array[String]("--configDir", currDir) - LOG.info(s"Config path: ${currDir}.") - val (_, _, configuration, _) = JobManager.parseArgs(args) + LOG.info(s"Config path: $currDir.") + val (configuration, _, _) = JobManager.parseArgs(args) // add dynamic properties to JobManager configuration. val dynamicProperties = CliFrontend.getDynamicProperties(dynamicPropertiesEncodedString) @@ -183,7 +184,7 @@ object ApplicationMaster { configuration.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, jobManagerWebPort) // set port to 0 to let Akka automatically determine the port. - implicit val jobManagerSystem = YarnUtils.createActorSystem(hostname, port = 0, configuration) + implicit val jobManagerSystem = AkkaUtils.createActorSystem(configuration, Some((hostname, 0))) LOG.info("Start job manager actor.") (jobManagerSystem, JobManager.startActor(Props(new JobManager(configuration) with http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala index 7e1ce6f..c8dbbf1 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala @@ -20,7 +20,7 @@ package org.apache.flink.yarn import java.io.{IOException, File} import java.nio.ByteBuffer -import java.util.{ Collections} +import java.util.Collections import akka.actor.ActorRef import org.apache.flink.configuration.ConfigConstants @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.util.Records import scala.concurrent.duration._ +import scala.language.postfixOps trait YarnJobManager extends ActorLogMessages { @@ -100,7 +101,7 @@ trait YarnJobManager extends ActorLogMessages { sender() ! new FlinkYarnClusterStatus(instanceManager.getNumberOfRegisteredTaskManagers, instanceManager.getTotalNumberOfSlots) - case StartYarnSession(conf, actorSystemPort: Int) => { + case StartYarnSession(conf, actorSystemPort: Int) => log.info("Start yarn session.") val memoryPerTaskManager = env.get(FlinkYarnClient.ENV_TM_MEMORY).toInt val heapLimit = Utils.calculateHeapSize(memoryPerTaskManager) @@ -109,7 +110,7 @@ trait YarnJobManager extends ActorLogMessages { require(applicationMasterHost != null, s"Application master (${Environment.NM_HOST} not set.") numTaskManager = env.get(FlinkYarnClient.ENV_TM_COUNT).toInt - log.info(s"Requesting ${numTaskManager} task managers.") + log.info(s"Requesting $numTaskManager task managers.") val remoteFlinkJarPath = env.get(FlinkYarnClient.FLINK_JAR_PATH) val fs = FileSystem.get(conf) @@ -194,34 +195,31 @@ trait YarnJobManager extends ActorLogMessages { yarnClientUsername, conf, taskManagerLocalResources)) context.system.scheduler.scheduleOnce(ALLOCATION_DELAY, self, PollContainerCompletion) - } - case PollContainerCompletion => { + case PollContainerCompletion => rmClientOption match { - case Some(rmClient) => { + case Some(rmClient) => val response = rmClient.allocate(completedContainers.toFloat / numTaskManager) for (container <- response.getAllocatedContainers.asScala) { log.info(s"Got new container for TM ${container.getId} on host ${ - container.getNodeId.getHost}") + container.getNodeId.getHost + }") allocatedContainers += 1 log.info(s"Launching container #$allocatedContainers.") nmClientOption match { - case Some(nmClient) => { + case Some(nmClient) => containerLaunchContext match { case Some(ctx) => nmClient.startContainer(container, ctx) - case None => { + case None => log.error("The ContainerLaunchContext was not set.") self ! StopYarnSession(FinalApplicationStatus.FAILED) - } } - } - case None => { + case None => log.error("The NMClient was not set.") self ! StopYarnSession(FinalApplicationStatus.FAILED) - } } } @@ -244,13 +242,10 @@ trait YarnJobManager extends ActorLogMessages { } else { self ! StopYarnSession(FinalApplicationStatus.FAILED) } - } - case None => { + case None => log.error("The AMRMClient was not set.") self ! StopYarnSession(FinalApplicationStatus.FAILED) - } } - } } def createContainerLaunchContext(heapLimit: Int, hasLogback: Boolean, hasLog4j: Boolean, http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala index b596946..e5012ef 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnTaskManager.scala @@ -30,9 +30,8 @@ trait YarnTaskManager extends ActorLogMessages { } def receiveYarnMessages: Receive = { - case StopYarnSession(status) => { + case StopYarnSession(status) => log.info(s"Stopping YARN TaskManager with final application status $status") context.system.shutdown() - } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala index 185190d..775fcd0 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala @@ -19,7 +19,7 @@ package org.apache.flink.yarn import akka.actor.{Props, ActorRef, ActorSystem} -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{Config, ConfigFactory} import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.taskmanager.TaskManager @@ -29,54 +29,14 @@ object YarnUtils { val LOG = LoggerFactory.getLogger(this.getClass) - def createActorSystem(hostname: String, port: Int, configuration: Configuration): ActorSystem = { - val akkaConfig = ConfigFactory.parseString(AkkaUtils.getConfigString(hostname, port, - configuration) + getConfigString) - - AkkaUtils.createActorSystem(akkaConfig) - } - - def createActorSystem(): ActorSystem = { - val akkaConfig = ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString + - getConfigString) - - AkkaUtils.createActorSystem(akkaConfig) - } - - def getConfigString: String = { - """ - |akka{ - | loglevel = "DEBUG" - | stdout-loglevel = "DEBUG" - | log-dead-letters-during-shutdown = off - | log-dead-letters = off - | - | actor { - | provider = "akka.remote.RemoteActorRefProvider" - | } - | - | remote{ - | log-remote-lifecycle-events = off - | - | netty{ - | tcp{ - | transport-class = "akka.remote.transport.netty.NettyTransport" - | tcp-nodelay = on - | maximum-frame-size = 1MB - | execution-pool-size = 4 - | } - | } - | } - |}""".stripMargin - } - def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, ActorRef) = { val (hostname, port, config) = TaskManager.parseArgs(args) - val actorSystem = createActorSystem(hostname, port, config) + val actorSystem = AkkaUtils.createActorSystem(config, Some((hostname, port))) val (connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) = - TaskManager.parseConfiguration(hostname, config, false) + TaskManager.parseConfiguration(hostname, config, localAkkaCommunication = false, + localTaskManagerCommunication = false) (actorSystem, TaskManager.startActor(Props(new TaskManager(connectionInfo, jobManagerURL, taskManagerConfig, networkConnectionConfiguration) with YarnTaskManager))(actorSystem)) http://git-wip-us.apache.org/repos/asf/flink/blob/4046819b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java index c8d639a..1d01b03 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTests.java @@ -17,7 +17,6 @@ */ package org.apache.flink.yarn; -import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.Test;