buildbot success on flink-docs-release-0.9
The Buildbot has detected a restored build on builder flink-docs-release-0.9 while building . Full details are available at: https://ci.apache.org/builders/flink-docs-release-0.9/builds/294 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: lares_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.9' triggered this build Build Source Stamp: [branch release-0.9] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
buildbot success on flink-docs-release-0.10
The Buildbot has detected a restored build on builder flink-docs-release-0.10 while building . Full details are available at: https://ci.apache.org/builders/flink-docs-release-0.10/builds/179 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: lares_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' triggered this build Build Source Stamp: [branch release-0.10] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
[3/5] flink git commit: [FLINK-3375] [Kafka Connector] Add tests for per-kafka-partition watermarks.
[FLINK-3375] [Kafka Connector] Add tests for per-kafka-partition watermarks. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dcd27f4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dcd27f4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dcd27f4 Branch: refs/heads/master Commit: 2dcd27f403c2a7f10791bfe21c45e2a326aa46a1 Parents: e40e29d Author: Stephan EwenAuthored: Wed Apr 13 15:45:51 2016 +0200 Committer: Stephan Ewen Committed: Wed Apr 13 20:50:49 2016 +0200 -- .../kafka/internals/AbstractFetcher.java| 77 +++-- .../kafka/internals/ExceptionProxy.java | 60 +++- .../connectors/kafka/util/KafkaUtils.java | 33 +- .../AbstractFetcherTimestampsTest.java | 306 +++ .../kafka/testutils/MockRuntimeContext.java | 46 ++- 5 files changed, 478 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java index 594aa66..8183575 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java @@ -328,45 +328,66 @@ public abstract class AbstractFetcher { ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException { - @SuppressWarnings("unchecked") - KafkaTopicPartitionState[] partitions = - (KafkaTopicPartitionState[]) new 
KafkaTopicPartitionState[assignedPartitions.size()]; - - int pos = 0; - for (KafkaTopicPartition partition : assignedPartitions) { - // create the kafka version specific partition handle - KPH kafkaHandle = createKafkaPartitionHandle(partition); + switch (timestampWatermarkMode) { - // create the partition state - KafkaTopicPartitionState partitionState; - switch (timestampWatermarkMode) { - case NO_TIMESTAMPS_WATERMARKS: - partitionState = new KafkaTopicPartitionState<>(partition, kafkaHandle); - break; - case PERIODIC_WATERMARKS: { + case NO_TIMESTAMPS_WATERMARKS: { + @SuppressWarnings("unchecked") + KafkaTopicPartitionState[] partitions = + (KafkaTopicPartitionState[]) new KafkaTopicPartitionState[assignedPartitions.size()]; + + int pos = 0; + for (KafkaTopicPartition partition : assignedPartitions) { + // create the kafka version specific partition handle + KPH kafkaHandle = createKafkaPartitionHandle(partition); + partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle); + } + + return partitions; + } + + case PERIODIC_WATERMARKS: { + @SuppressWarnings("unchecked") + KafkaTopicPartitionStateWithPeriodicWatermarks [] partitions = + (KafkaTopicPartitionStateWithPeriodicWatermarks []) + new KafkaTopicPartitionStateWithPeriodicWatermarks[assignedPartitions.size()]; + + int pos = 0; + for (KafkaTopicPartition partition : assignedPartitions) { + KPH kafkaHandle = createKafkaPartitionHandle(partition); + AssignerWithPeriodicWatermarks assignerInstance =
[1/5] flink git commit: [hotfix] [kafka consumer] Increase Kafka test stability
Repository: flink Updated Branches: refs/heads/master b0a7a1b81 -> 1a34f2165 [hotfix] [kafka consumer] Increase Kafka test stability Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2728f924 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2728f924 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2728f924 Branch: refs/heads/master Commit: 2728f924cf64cd62929b8f0e394a1d4335af8156 Parents: b0a7a1b Author: Stephan EwenAuthored: Wed Apr 13 10:38:37 2016 +0200 Committer: Stephan Ewen Committed: Wed Apr 13 20:50:48 2016 +0200 -- .../connectors/kafka/KafkaConsumerTestBase.java| 13 - 1 file changed, 12 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2728f924/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 8ff67b4..a65a411 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -1278,13 +1278,24 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { new Tuple2Partitioner(parallelism))) .setParallelism(parallelism); - writeEnv.execute("Write sequence"); + try { + writeEnv.execute("Write sequence"); + } + catch (Exception e) { + LOG.error("Write attempt failed, trying again", e); + deleteTestTopic(topicName); + JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); + continue; + } + 
LOG.info("Finished writing sequence"); // Validate the Sequence // we need to validate the sequence, because kafka's producers are not exactly once LOG.info("Validating sequence"); + + JobManagerCommunicationUtils.waitUntilNoJobIsRunning(flink.getLeaderGateway(timeout)); final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); readEnv.getConfig().setRestartStrategy(RestartStrategies.noRestart());
[4/5] flink git commit: [FLINK-3743] upgrading breeze from 0.11.2 to 0.12
[FLINK-3743] upgrading breeze from 0.11.2 to 0.12 This closes #1876 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a34f216 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a34f216 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a34f216 Branch: refs/heads/master Commit: 1a34f2165b217a4579df410dd7ccb879d6a58083 Parents: 2dcd27f Author: Todd Lisonbee Authored: Mon Apr 11 23:20:43 2016 -0700 Committer: Stephan Ewen Committed: Wed Apr 13 20:50:49 2016 +0200 -- flink-libraries/flink-ml/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1a34f216/flink-libraries/flink-ml/pom.xml -- diff --git a/flink-libraries/flink-ml/pom.xml b/flink-libraries/flink-ml/pom.xml index 986cb0f..0a3531e 100644 --- a/flink-libraries/flink-ml/pom.xml +++ b/flink-libraries/flink-ml/pom.xml @@ -44,7 +44,7 @@ org.scalanlp breeze_${scala.binary.version} - 0.11.2 + 0.12
[5/5] flink git commit: [FLINK-3745] [runtime] Fix early stopping of stream sources
[FLINK-3745] [runtime] Fix early stopping of stream sources Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8570b6dc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8570b6dc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8570b6dc Branch: refs/heads/master Commit: 8570b6dc3ae3c69dc50e81a46835d40df8a03992 Parents: 2728f92 Author: Stephan EwenAuthored: Wed Apr 13 12:26:42 2016 +0200 Committer: Stephan Ewen Committed: Wed Apr 13 20:50:49 2016 +0200 -- .../tasks/StoppableSourceStreamTask.java| 15 +++- .../tasks/SourceStreamTaskStoppingTest.java | 94 .../runtime/tasks/SourceStreamTaskTest.java | 40 + .../streaming/timestamp/TimestampITCase.java| 18 ++-- .../test/classloading/jar/UserCodeType.java | 1 + 5 files changed, 123 insertions(+), 45 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java index 5173796..7ff39b7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java @@ -31,9 +31,20 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource; public class StoppableSourceStreamTask extends SourceStreamTask > implements StoppableTask { + private volatile boolean stopped; + @Override - public void stop() { - this.headOperator.stop(); + protected void run() throws Exception { + if (!stopped) { + super.run(); + } } + @Override + public void stop() { + stopped = true; + if (this.headOperator != null) { + this.headOperator.stop(); + } + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8570b6dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java -- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java new file mode 100644 index 000..ab9e59b --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskStoppingTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.functions.StoppableFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.operators.StoppableStreamSource; + +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * These tests verify that the RichFunction methods are called (in correct order). And that + * checkpointing/element emission don't occur concurrently. 
+ */ +public class SourceStreamTaskStoppingTest { + + + // test flag for testStop() + static boolean stopped = false; + + @Test + public void testStop() { + final StoppableSourceStreamTask
[2/5] flink git commit: [FLINK-3747] Consolidate TimestampAssigner Methods in Kafka Consumer
[FLINK-3747] Consolidate TimestampAssigner Methods in Kafka Consumer This closes #1877 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e40e29da Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e40e29da Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e40e29da Branch: refs/heads/master Commit: e40e29da9b68ec6da59f7b5372cb1483283c0530 Parents: 8570b6d Author: Aljoscha KrettekAuthored: Wed Apr 13 11:41:39 2016 +0200 Committer: Stephan Ewen Committed: Wed Apr 13 20:50:49 2016 +0200 -- .../connectors/kafka/FlinkKafkaConsumerBase.java| 4 ++-- .../connectors/kafka/FlinkKafkaConsumerBaseTest.java| 12 ++-- .../connectors/kafka/KafkaConsumerTestBase.java | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index 0ca8fd5..ed5c72f 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -148,7 +148,7 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti * @param assigner The timestamp assigner / watermark generator to use. * @return The consumer object, to allow function chaining. 
*/ - public FlinkKafkaConsumerBase setPunctuatedWatermarkEmitter(AssignerWithPunctuatedWatermarks assigner) { + public FlinkKafkaConsumerBase assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks assigner) { checkNotNull(assigner); if (this.periodicWatermarkAssigner != null) { @@ -182,7 +182,7 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti * @param assigner The timestamp assigner / watermark generator to use. * @return The consumer object, to allow function chaining. */ - public FlinkKafkaConsumerBase setPeriodicWatermarkEmitter(AssignerWithPeriodicWatermarks assigner) { + public FlinkKafkaConsumerBase assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks assigner) { checkNotNull(assigner); if (this.punctuatedWatermarkAssigner != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/e40e29da/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index f4ef995..9b517df 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -44,12 +44,12 @@ public class FlinkKafkaConsumerBaseTest { @Test public void testEitherWatermarkExtractor() { try { - new DummyFlinkKafkaConsumer<>().setPeriodicWatermarkEmitter(null); + new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks) null); fail(); } catch (NullPointerException ignored) {} try { - new 
DummyFlinkKafkaConsumer<>().setPunctuatedWatermarkEmitter(null); + new DummyFlinkKafkaConsumer<>().assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks) null); fail(); } catch (NullPointerException ignored) {} @@ -59,16 +59,16 @@ public class FlinkKafkaConsumerBaseTest { final AssignerWithPunctuatedWatermarks punctuatedAssigner =
[2/2] flink git commit: [FLINK-2909] [gelly] Graph Generators
[FLINK-2909] [gelly] Graph Generators Initial set of scale-free graph generators: - Complete graph - Cycle graph - Empty graph - Grid graph - Hypercube graph - Path graph - RMat graph - Singleton edge graph - Star graph This closes #1807 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b0a7a1b8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b0a7a1b8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b0a7a1b8 Branch: refs/heads/master Commit: b0a7a1b813b29dbfd5c959ee2929372c31befda8 Parents: 5350bc4 Author: Greg HoganAuthored: Tue Mar 15 16:59:02 2016 -0400 Committer: EC2 Default User Committed: Wed Apr 13 18:25:29 2016 + -- docs/apis/batch/libs/gelly.md | 644 +++ docs/page/css/flink.css | 21 + .../flink/util/LongValueSequenceIterator.java | 193 ++ .../util/LongValueSequenceIteratorTest.java | 92 +++ .../java/org/apache/flink/api/java/Utils.java | 2 +- .../apache/flink/graph/examples/Graph500.java | 121 .../graph/generator/AbstractGraphGenerator.java | 33 + .../flink/graph/generator/CompleteGraph.java| 112 .../flink/graph/generator/CycleGraph.java | 60 ++ .../flink/graph/generator/EmptyGraph.java | 79 +++ .../flink/graph/generator/GraphGenerator.java | 51 ++ .../graph/generator/GraphGeneratorUtils.java| 119 .../apache/flink/graph/generator/GridGraph.java | 165 + .../flink/graph/generator/HypercubeGraph.java | 65 ++ .../apache/flink/graph/generator/PathGraph.java | 60 ++ .../apache/flink/graph/generator/RMatGraph.java | 316 + .../graph/generator/SingletonEdgeGraph.java | 107 +++ .../apache/flink/graph/generator/StarGraph.java | 100 +++ .../random/AbstractGeneratorFactory.java| 72 +++ .../flink/graph/generator/random/BlockInfo.java | 82 +++ .../random/JDKRandomGeneratorFactory.java | 72 +++ .../random/MersenneTwisterFactory.java | 72 +++ .../graph/generator/random/RandomGenerable.java | 41 ++ .../random/RandomGenerableFactory.java | 57 ++ .../graph/generator/AbstractGraphTest.java | 33 
+ .../graph/generator/CompleteGraphTest.java | 84 +++ .../flink/graph/generator/CycleGraphTest.java | 83 +++ .../flink/graph/generator/EmptyGraphTest.java | 78 +++ .../flink/graph/generator/GridGraphTest.java| 93 +++ .../graph/generator/HypercubeGraphTest.java | 85 +++ .../flink/graph/generator/PathGraphTest.java| 83 +++ .../flink/graph/generator/RMatGraphTest.java| 70 ++ .../graph/generator/SingletonEdgeGraphTest.java | 84 +++ .../flink/graph/generator/StarGraphTest.java| 85 +++ .../apache/flink/graph/generator/TestUtils.java | 103 +++ 35 files changed, 3616 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/docs/apis/batch/libs/gelly.md -- diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md index d803be2..53d628d 100644 --- a/docs/apis/batch/libs/gelly.md +++ b/docs/apis/batch/libs/gelly.md @@ -2012,3 +2012,647 @@ vertex represents a group of vertices and each edge represents a group of edges vertex and edge in the output graph stores the common group value and the number of represented elements. {% top %} + +Graph Generators +--- + +Gelly provides a collection of scalable graph generators. Each generator is + +* parallelizable, in order to create large datasets +* scale-free, generating the same graph regardless of parallelism +* thrifty, using as few operators as possible + +Graph generators are configured using the builder pattern. The parallelism of generator +operators can be set explicitly by calling `setParallelism(parallelism)`. Lowering the +parallelism will reduce the allocation of memory and network buffers. + +Graph-specific configuration must be called first, then configuration common to all +generators, and lastly the call to `generate()`. The following example configures a +grid graph with two dimensions, configures the parallelism, and generates the graph. 
+ + + +{% highlight java %} +ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + +boolean wrapEndpoints = false; + +int parallelism = 4; + +Graph graph = new GridGraph(env) +.addDimension(2, wrapEndpoints) +.addDimension(4, wrapEndpoints) +.setParallelism(parallelism) +.generate(); +{% endhighlight %} + + + +{% highlight scala %} +import org.apache.flink.api.scala._ +import org.apache.flink.graph.generator.GridGraph + +val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment +
flink git commit: [FLINK-3589] Allow setting Operator parallelism to default value
Repository: flink Updated Branches: refs/heads/master 6c061684f -> 5350bc48a [FLINK-3589] Allow setting Operator parallelism to default value Adds the public field ExecutionConfig.PARALLELISM_DEFAULT as a flag value to indicate that the default parallelism should be used. Adds the public field ExecutionConfig.PARALLELISM_UNKNOWN as a flag value to indicate that the parallelism should remain unchanged. This closes #1778 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5350bc48 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5350bc48 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5350bc48 Branch: refs/heads/master Commit: 5350bc48af37a1d6296de5b290b870b7d68407fa Parents: 6c06168 Author: Greg HoganAuthored: Wed Mar 9 12:41:45 2016 -0500 Committer: Greg Hogan Committed: Wed Apr 13 11:59:13 2016 -0400 -- .../org/apache/flink/client/LocalExecutor.java | 5 +- .../apache/flink/client/cli/ProgramOptions.java | 3 +- .../client/program/ContextEnvironment.java | 3 +- .../flink/api/common/ExecutionConfig.java | 30 ++--- .../java/org/apache/flink/api/common/Plan.java | 17 +++--- .../flink/api/common/operators/Operator.java| 16 ++--- .../flink/api/common/ExecutionConfigTest.java | 33 +- .../flink/api/java/ExecutionEnvironment.java| 2 +- .../apache/flink/api/java/LocalEnvironment.java | 3 +- .../flink/api/java/operators/DataSink.java | 3 +- .../api/java/operators/DeltaIteration.java | 8 ++- .../flink/api/java/operators/Operator.java | 20 +++--- .../flink/api/java/operator/OperatorTest.java | 64 .../translation/ReduceTranslationTests.java | 5 +- .../flink/optimizer/dag/OptimizerNode.java | 17 +++--- .../flink/runtime/jobgraph/JobVertex.java | 3 +- .../flink/streaming/api/graph/StreamNode.java | 3 +- .../translation/ReduceTranslationTest.scala | 7 ++- 18 files changed, 185 insertions(+), 57 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java -- diff --git a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java index 25da5c7..65bf5d8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/LocalExecutor.java @@ -20,6 +20,7 @@ package org.apache.flink.client; import java.util.List; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; @@ -206,7 +207,7 @@ public class LocalExecutor extends PlanExecutor { */ @Override public String getOptimizerPlanAsJSON(Plan plan) throws Exception { - final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism(); + final int parallelism = plan.getDefaultParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 1 : plan.getDefaultParallelism(); Optimizer pc = new Optimizer(new DataStatistics(), this.configuration); pc.setDefaultParallelism(parallelism); @@ -271,7 +272,7 @@ public class LocalExecutor extends PlanExecutor { * @throws Exception Thrown, if the optimization process that creates the execution plan failed. */ public static String optimizerPlanAsJSON(Plan plan) throws Exception { - final int parallelism = plan.getDefaultParallelism() == -1 ? 1 : plan.getDefaultParallelism(); + final int parallelism = plan.getDefaultParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? 
1 : plan.getDefaultParallelism(); Optimizer pc = new Optimizer(new DataStatistics(), new Configuration()); pc.setDefaultParallelism(parallelism); http://git-wip-us.apache.org/repos/asf/flink/blob/5350bc48/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java -- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java index 73d49b5..368ec19 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java @@ -18,6 +18,7 @@ package org.apache.flink.client.cli; import org.apache.commons.cli.CommandLine; +import
flink git commit: [hotfix] [tests] Add cross transformation to OverwriteObjects manual test
Repository: flink Updated Branches: refs/heads/master e7d78e63a -> 6c061684f [hotfix] [tests] Add cross transformation to OverwriteObjects manual test Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c061684 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c061684 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c061684 Branch: refs/heads/master Commit: 6c061684f93ddcd375f37d3ef71f6dd66783df8b Parents: e7d78e6 Author: Greg HoganAuthored: Wed Apr 13 10:48:52 2016 -0400 Committer: Greg Hogan Committed: Wed Apr 13 10:51:06 2016 -0400 -- .../flink/test/manual/OverwriteObjects.java | 67 ++-- 1 file changed, 63 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6c061684/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java index 1a70c69..23d8e67 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/OverwriteObjects.java @@ -18,6 +18,7 @@ package org.apache.flink.test.manual; +import org.apache.flink.api.common.functions.CrossFunction; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.common.functions.ReduceFunction; @@ -73,6 +74,7 @@ public class OverwriteObjects { testReduce(env); testGroupedReduce(env); testJoin(env); + testCross(env); } } @@ -127,7 +129,7 @@ public class OverwriteObjects { Assert.assertEquals(disabledChecksum, enabledChecksum); } - public class OverwriteObjectsReduce implements ReduceFunction > { + private class OverwriteObjectsReduce implements ReduceFunction > { private Scrambler scrambler; public OverwriteObjectsReduce(boolean keyed) { @@ -252,7 
+254,7 @@ public class OverwriteObjects { } } - public class OverwriteObjectsJoin implements JoinFunction , Tuple2 , Tuple2 > { + private class OverwriteObjectsJoin implements JoinFunction , Tuple2 , Tuple2 > { private Scrambler scrambler = new Scrambler(true); @Override @@ -263,12 +265,69 @@ public class OverwriteObjects { // - private DataSet > getDataSet(ExecutionEnvironment env) { + public void testCross(ExecutionEnvironment env) throws Exception { + /* +* Test CrossDriver +*/ + + LOG.info("Testing cross"); + + DataSet > small = getDataSet(env, 100, 20); + DataSet > large = getDataSet(env, 1, 2000); + + // test NESTEDLOOP_BLOCKED_OUTER_FIRST and NESTEDLOOP_BLOCKED_OUTER_SECOND with object reuse enabled + + env.getConfig().enableObjectReuse(); + + ChecksumHashCode enabledChecksumWithHuge = DataSetUtils.checksumHashCode(small + .crossWithHuge(large) + .with(new OverwriteObjectsCross())); + + ChecksumHashCode enabledChecksumWithTiny = DataSetUtils.checksumHashCode(small + .crossWithTiny(large) + .with(new OverwriteObjectsCross())); + + Assert.assertEquals(enabledChecksumWithHuge, enabledChecksumWithTiny); + + // test NESTEDLOOP_BLOCKED_OUTER_FIRST and NESTEDLOOP_BLOCKED_OUTER_SECOND with object reuse disabled + + env.getConfig().disableObjectReuse(); + + ChecksumHashCode disabledChecksumWithHuge = DataSetUtils.checksumHashCode(small + .crossWithHuge(large) + .with(new OverwriteObjectsCross())); + + ChecksumHashCode disabledChecksumWithTiny = DataSetUtils.checksumHashCode(small + .crossWithTiny(large) + .with(new OverwriteObjectsCross())); + + Assert.assertEquals(disabledChecksumWithHuge, disabledChecksumWithTiny); + +
flink git commit: [hotfix] [storm] Add proper exception forwarding to FlinkTopology.copyObject
Repository: flink Updated Branches: refs/heads/master 8e036c38e -> e7d78e63a [hotfix] [storm] Add proper exception forwarding to FlinkTopology.copyObject Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7d78e63 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7d78e63 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7d78e63 Branch: refs/heads/master Commit: e7d78e63ae8e0fc49ad62a77dc4dd90afde85d49 Parents: 8e036c3 Author: Till Rohrmann Authored: Wed Apr 13 14:55:35 2016 +0200 Committer: Till Rohrmann Committed: Wed Apr 13 14:56:17 2016 +0200 -- .../src/main/java/org/apache/flink/storm/api/FlinkTopology.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e7d78e63/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java -- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java index 6706a91..2546f17 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java @@ -142,7 +142,7 @@ public class FlinkTopology { getClass().getClassLoader() ); } catch (IOException | ClassNotFoundException e) { - throw new RuntimeException("Failed to copy object."); + throw new RuntimeException("Failed to copy object.", e); } }
flink git commit: [FLINK-3735] Make DataSetUnionRule match only for union-all
Repository: flink Updated Branches: refs/heads/master db6528be0 -> 8e036c38e [FLINK-3735] Make DataSetUnionRule match only for union-all This closes #1874 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e036c38 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e036c38 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e036c38 Branch: refs/heads/master Commit: 8e036c38e3fb7e48aa91a09debab3be6a24deefe Parents: db6528b Author: vasiaAuthored: Tue Apr 12 15:11:54 2016 +0200 Committer: vasia Committed: Wed Apr 13 12:36:04 2016 +0200 -- .../plan/rules/dataSet/DataSetUnionRule.scala | 34 1 file changed, 21 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8e036c38/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala index 32400d0..cd1de1e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetUnionRule.scala @@ -18,7 +18,7 @@ package org.apache.flink.api.table.plan.rules.dataSet -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.logical.LogicalUnion @@ -32,21 +32,29 @@ class DataSetUnionRule "FlinkUnionRule") { -def convert(rel: RelNode): RelNode = { + /** + * Only translate UNION ALL + */ + override def matches(call: RelOptRuleCall): Boolean = { +val union: LogicalUnion 
= call.rel(0).asInstanceOf[LogicalUnion] +union.all + } + + def convert(rel: RelNode): RelNode = { - val union: LogicalUnion = rel.asInstanceOf[LogicalUnion] - val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) - val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE) - val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE) +val union: LogicalUnion = rel.asInstanceOf[LogicalUnion] +val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) +val convLeft: RelNode = RelOptRule.convert(union.getInput(0), DataSetConvention.INSTANCE) +val convRight: RelNode = RelOptRule.convert(union.getInput(1), DataSetConvention.INSTANCE) - new DataSetUnion( -rel.getCluster, -traitSet, -convLeft, -convRight, -rel.getRowType) -} +new DataSetUnion( + rel.getCluster, + traitSet, + convLeft, + convRight, + rel.getRowType) } +} object DataSetUnionRule { val INSTANCE: RelOptRule = new DataSetUnionRule
flink git commit: [FLINK-3716] [kafka consumer] Decreasing socket timeout so testFailOnNoBroker() will pass before JUnit timeout
Repository: flink Updated Branches: refs/heads/release-1.0 ff38202ba -> ea50ed348 [FLINK-3716] [kafka consumer] Decreasing socket timeout so testFailOnNoBroker() will pass before JUnit timeout This closes #1864 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea50ed34 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea50ed34 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea50ed34 Branch: refs/heads/release-1.0 Commit: ea50ed348ccd34a343ebc501f989593253179774 Parents: ff38202 Author: Todd Lisonbee Authored: Thu Apr 7 17:47:19 2016 -0700 Committer: Stephan Ewen Committed: Wed Apr 13 10:48:39 2016 +0200 -- .../flink/streaming/connectors/kafka/KafkaConsumerTestBase.java | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ea50ed34/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 9377cee..f31e4f1 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -147,6 +147,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { properties.setProperty("zookeeper.connect", "localhost:80"); properties.setProperty("group.id", "test"); properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast + properties.setProperty("socket.timeout.ms", "3000"); properties.setProperty("session.timeout.ms", "2000"); 
properties.setProperty("fetch.max.wait.ms", "2000"); properties.setProperty("heartbeat.interval.ms", "1000");
flink git commit: [FLINK-3730] Fix RocksDB Local Directory Initialization
Repository: flink Updated Branches: refs/heads/release-1.0 4f9c19808 -> ff38202ba [FLINK-3730] Fix RocksDB Local Directory Initialization Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff38202b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff38202b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff38202b Branch: refs/heads/release-1.0 Commit: ff38202bacfbe07e91bc1b46d44e5eb46e991e79 Parents: 4f9c198 Author: Aljoscha KrettekAuthored: Tue Apr 12 10:46:59 2016 +0200 Committer: Aljoscha Krettek Committed: Wed Apr 13 10:38:11 2016 +0200 -- .../contrib/streaming/state/RocksDBStateBackend.java | 9 ++--- .../streaming/state/RocksDBStateBackendConfigTest.java| 10 +- 2 files changed, 15 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ff38202b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index e3b4f4d..8f846da 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -24,6 +24,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.UUID; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.FoldingState; @@ -181,13 +182,15 @@ public class RocksDBStateBackend extends AbstractStateBackend { for (Path path : configuredDbBasePaths) { File f = new File(path.toUri().getPath()); - if (!f.exists() && !f.mkdirs()) { - String msg = "Local 
DB files directory '" + f.getAbsolutePath() + File testDir = new File(f, UUID.randomUUID().toString()); + if (!testDir.mkdirs()) { + String msg = "Local DB files directory '" + path + "' does not exist and cannot be created. "; LOG.error(msg); errorMessage += msg; + } else { + dirs.add(f); } - dirs.add(f); } if (dirs.isEmpty()) { http://git-wip-us.apache.org/repos/asf/flink/blob/ff38202b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 8e0993b..42ba275 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -20,8 +20,10 @@ package org.apache.flink.contrib.streaming.state; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -133,14 +135,17 @@ public class RocksDBStateBackendConfigTest { RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath()); - + +
flink git commit: [FLINK-3730] Fix RocksDB Local Directory Initialization
Repository: flink Updated Branches: refs/heads/master db85f3858 -> db6528be0 [FLINK-3730] Fix RocksDB Local Directory Initialization Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db6528be Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db6528be Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db6528be Branch: refs/heads/master Commit: db6528be00d624d6a389d5b303e565ca6b1c0f40 Parents: db85f38 Author: Aljoscha KrettekAuthored: Tue Apr 12 10:46:59 2016 +0200 Committer: Aljoscha Krettek Committed: Wed Apr 13 10:36:18 2016 +0200 -- .../contrib/streaming/state/RocksDBStateBackend.java | 9 ++--- .../streaming/state/RocksDBStateBackendConfigTest.java| 10 +- 2 files changed, 15 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/db6528be/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index e3b4f4d..8f846da 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -24,6 +24,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.UUID; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.FoldingState; @@ -181,13 +182,15 @@ public class RocksDBStateBackend extends AbstractStateBackend { for (Path path : configuredDbBasePaths) { File f = new File(path.toUri().getPath()); - if (!f.exists() && !f.mkdirs()) { - String msg = "Local DB files 
directory '" + f.getAbsolutePath() + File testDir = new File(f, UUID.randomUUID().toString()); + if (!testDir.mkdirs()) { + String msg = "Local DB files directory '" + path + "' does not exist and cannot be created. "; LOG.error(msg); errorMessage += msg; + } else { + dirs.add(f); } - dirs.add(f); } if (dirs.isEmpty()) { http://git-wip-us.apache.org/repos/asf/flink/blob/db6528be/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 8e0993b..42ba275 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -20,8 +20,10 @@ package org.apache.flink.contrib.streaming.state; import org.apache.commons.io.FileUtils; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.VoidSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.state.AbstractStateBackend; @@ -133,14 +135,17 @@ public class RocksDBStateBackendConfigTest { RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI); rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath()); - + + boolean
[06/14] flink git commit: [FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsumer09
[FLINK-3541] [Kafka Connector] Clean up workaround in FlinkKafkaConsumer09 This closes #1846 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/693d5ab0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/693d5ab0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/693d5ab0 Branch: refs/heads/master Commit: 693d5ab09efef8732b857437bf1089f841b5e864 Parents: d20eda1 Author: Tianji LiAuthored: Fri Apr 1 00:35:39 2016 -0400 Committer: Stephan Ewen Committed: Wed Apr 13 01:10:54 2016 +0200 -- .../connectors/kafka/FlinkKafkaConsumer09.java | 30 ++-- 1 file changed, 2 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/693d5ab0/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index d34cd2f..bc2904c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -171,43 +171,17 @@ public class FlinkKafkaConsumer09 extends FlinkKafkaConsumerBase { // read the partitions that belong to the listed topics final List partitions = new ArrayList<>(); - KafkaConsumer consumer = null; - try { - consumer = new KafkaConsumer<>(this.properties); + try (KafkaConsumer consumer = new KafkaConsumer<>(this.properties)) { for (final String topic: topics) { // get partitions for each topic - List partitionsForTopic = null; - for(int tri = 0; tri < 10; tri++) { - LOG.info("Trying to get 
partitions for topic {}", topic); - try { - partitionsForTopic = consumer.partitionsFor(topic); - if(partitionsForTopic != null && partitionsForTopic.size() > 0) { - break; // it worked - } - } catch (NullPointerException npe) { - // workaround for KAFKA-2880: Fetcher.getTopicMetadata NullPointerException when broker cannot be reached - // we ignore the NPE. - } - // create a new consumer - consumer.close(); - try { - Thread.sleep(1000); - } catch (InterruptedException ignored) {} - - consumer = new KafkaConsumer<>(properties); - } + List partitionsForTopic = consumer.partitionsFor(topic); // for non existing topics, the list might be null. if (partitionsForTopic != null) { partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); } } } - finally { - if(consumer != null) { - consumer.close(); - } - } if (partitions.isEmpty()) { throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
[05/14] flink git commit: [FLINK-3375] [kafka connector] Add per-Kafka-partition watermark generation to the FlinkKafkaConsumer
[FLINK-3375] [kafka connector] Add per-Kafka-partition watermark generation to the FlinkKafkaConsumer This closes #1839 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ac1549e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ac1549e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ac1549e Branch: refs/heads/master Commit: 0ac1549ec58c1737a79e5770a171a8b14bed56dc Parents: 885d543 Author: kl0uAuthored: Tue Mar 8 17:35:14 2016 +0100 Committer: Stephan Ewen Committed: Wed Apr 13 01:10:54 2016 +0200 -- .../connectors/kafka/FlinkKafkaConsumer08.java | 56 +-- .../connectors/kafka/internals/Fetcher.java | 12 +- .../kafka/internals/LegacyFetcher.java | 51 ++- .../kafka/internals/ZookeeperOffsetHandler.java | 4 +- .../connectors/kafka/Kafka08ITCase.java | 16 +- .../connectors/kafka/KafkaConsumerTest.java | 12 +- .../connectors/kafka/FlinkKafkaConsumer09.java | 13 +- .../connectors/kafka/Kafka09ITCase.java | 11 +- .../kafka/FlinkKafkaConsumerBase.java | 357 ++- .../kafka/internals/KafkaPartitionState.java| 65 .../connectors/kafka/KafkaConsumerTestBase.java | 234 +++- .../AssignerWithPeriodicWatermarks.java | 2 +- 12 files changed, 744 insertions(+), 89 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/0ac1549e/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index 865fe36..4748781 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ 
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.internals.Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaPartitionState; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher; import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler; @@ -108,11 +109,6 @@ public class FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase { private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer08.class); - /** Magic number to define an unset offset. Negative offsets are not used by Kafka (invalid), -* and we pick a number that is probably (hopefully) not used by Kafka as a magic number for anything else. 
*/ - public static final long OFFSET_NOT_SET = -915623761776L; - - /** Configuration key for the number of retries for getting the partition info */ public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry"; @@ -252,17 +248,19 @@ public class FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase { this.fetcher = null; // fetcher remains null return; } - // offset handling offsetHandler = new ZookeeperOffsetHandler(props); committedOffsets = new HashMap<>(); - Map subscribedPartitionsWithOffsets = new HashMap<>(subscribedPartitions.size()); - // initially load the map with "offset not set" + // initially load the map with "offset not set", last max read timestamp set to Long.MIN_VALUE + // and mark the partition as in-active, until we receive the first element + Map subscribedPartitionsWithOffsets = + new HashMap<>(subscribedPartitions.size()); for(KafkaTopicPartition ktp: subscribedPartitions) { - subscribedPartitionsWithOffsets.put(ktp, FlinkKafkaConsumer08.OFFSET_NOT_SET); + subscribedPartitionsWithOffsets.put(ktp, + new KafkaPartitionState(ktp.getPartition(),
[08/14] flink git commit: [FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java new file mode 100644 index 000..99c5d69 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPeriodicWatermarks.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.watermark.Watermark; + +/** + * A special version of the per-kafka-partition-state that additionally holds + * a periodic watermark generator (and timestamp extractor) per partition. + * + * @param The type of records handled by the watermark generator + * @param The type of the Kafka partition descriptor, which varies across Kafka versions. + */ +public final class KafkaTopicPartitionStateWithPeriodicWatermarksextends KafkaTopicPartitionState { + + /** The timestamp assigner and watermark generator for the partition */ + private final AssignerWithPeriodicWatermarks timestampsAndWatermarks; + + /** The last watermark timestamp generated by this partition */ + private long partitionWatermark; + + // + + public KafkaTopicPartitionStateWithPeriodicWatermarks( + KafkaTopicPartition partition, KPH kafkaPartitionHandle, + AssignerWithPeriodicWatermarks timestampsAndWatermarks) + { + super(partition, kafkaPartitionHandle); + + this.timestampsAndWatermarks = timestampsAndWatermarks; + this.partitionWatermark = Long.MIN_VALUE; + } + + // + + public long getTimestampForRecord (T record) { + return timestampsAndWatermarks.extractTimestamp(record, Long.MIN_VALUE); + } + + public long getCurrentWatermarkTimestamp() { + Watermark wm = timestampsAndWatermarks.getCurrentWatermark(); + if (wm != null) { + partitionWatermark = Math.max(partitionWatermark, wm.getTimestamp()); + } + return partitionWatermark; + } + + // + + @Override + public String toString() { + return "KafkaTopicPartitionStateWithPeriodicWatermarks: partition=" + getKafkaTopicPartition() + + ", offset=" + getOffset() + ", watermark=" + partitionWatermark; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionStateWithPunctuatedWatermarks.java new file mode 100644 index 000..b265990 --- /dev/null +++
[13/14] flink git commit: [FLINK-3644] [web monitor] Add new config option to set web monitor tmp dir
[FLINK-3644] [web monitor] Add new config option to set web monitor tmp dir This closes #1824 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b188637b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b188637b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b188637b Branch: refs/heads/master Commit: b188637b9016116728f0d16f94e213584b2abfd6 Parents: 6bb085e Author: xueyan.liAuthored: Tue Apr 5 14:10:48 2016 +0800 Committer: Stephan Ewen Committed: Wed Apr 13 01:10:55 2016 +0200 -- .../apache/flink/configuration/ConfigConstants.java | 5 + .../flink/runtime/webmonitor/HttpRequestHandler.java | 15 --- .../flink/runtime/webmonitor/WebRuntimeMonitor.java | 10 +- .../runtime/webmonitor/WebRuntimeMonitorITCase.java | 4 ++-- 4 files changed, 20 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b188637b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java -- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index ba2d880..53d9a37 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -435,6 +435,11 @@ public final class ConfigConstants { public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port"; /** +* The config parameter defining the flink web directory to be used by the webmonitor. 
+*/ + public static final String JOB_MANAGER_WEB_TMPDIR_KEY = "jobmanager.web.tmpdir"; + + /** * The config parameter defining the number of archived jobs for the jobmanager */ public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history"; http://git-wip-us.apache.org/repos/asf/flink/blob/b188637b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java index c9190c9..bbd29fa 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/HttpRequestHandler.java @@ -69,17 +69,18 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler /** A decoder factory that always stores POST chunks on disk */ private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true); - - private static final File TMP_DIR = new File(System.getProperty("java.io.tmpdir")); - - + + private final File tmpDir; + private HttpRequest currentRequest; private HttpPostRequestDecoder currentDecoder; - private String currentRequestPath; - + public HttpRequestHandler(File tmpDir) { + this.tmpDir = tmpDir; + } + @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { if (currentDecoder != null) { @@ -130,7 +131,7 @@ public class HttpRequestHandler extends SimpleChannelInboundHandler if (file.isCompleted()) { String name = file.getFilename(); - File target = new File(TMP_DIR, UUID.randomUUID() + "_" + name); + File target = new File(tmpDir, UUID.randomUUID() + "_" + name); file.renameTo(target); QueryStringEncoder encoder = new QueryStringEncoder(currentRequestPath); 
http://git-wip-us.apache.org/repos/asf/flink/blob/b188637b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
[03/14] flink git commit: [FLINK-3716] [kafka consumer] Decreasing socket timeout so testFailOnNoBroker() will pass before JUnit timeout
[FLINK-3716] [kafka consumer] Decreasing socket timeout so testFailOnNoBroker() will pass before JUnit timeout This closes #1864 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/af799883 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/af799883 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/af799883 Branch: refs/heads/master Commit: af799883371b7a78c48f227b25b7a0b099dabf93 Parents: 3c93103 Author: Todd Lisonbee Authored: Thu Apr 7 17:47:19 2016 -0700 Committer: Stephan Ewen Committed: Wed Apr 13 01:10:54 2016 +0200 -- .../flink/streaming/connectors/kafka/KafkaConsumerTestBase.java | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/af799883/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index aa5344b..dd468a4 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -152,6 +152,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { properties.setProperty("zookeeper.connect", "localhost:80"); properties.setProperty("group.id", "test"); properties.setProperty("request.timeout.ms", "3000"); // let the test fail fast + properties.setProperty("socket.timeout.ms", "3000"); properties.setProperty("session.timeout.ms", "2000"); properties.setProperty("fetch.max.wait.ms", "2000"); 
properties.setProperty("heartbeat.interval.ms", "1000");
[10/14] flink git commit: [FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition
http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java deleted file mode 100644 index a38c3bd..000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PartitionerWrapper.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka.internals; - -import kafka.producer.Partitioner; -import kafka.utils.VerifiableProperties; - -/** - * Hacky wrapper to send an object instance through a Properties - map. - * - * This works as follows: - * The recommended way of creating a KafkaSink is specifying a classname for the partitioner. 
- * - * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink. - * This is set in the key-value (java.util.Properties) map. - * In addition to that, we use the Properties.put(Object, Object) to store the instance of the (serializable). - * This is a hack because the put() method is called on the underlying Hashmap. - * - * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance. - * - * The serializable PartitionerWrapper is serialized into the Properties Hashmap and also deserialized from there. - */ -public class PartitionerWrapper implements Partitioner { - public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized"; - - private Partitioner wrapped; - public PartitionerWrapper(VerifiableProperties properties) { - wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME); - } - - @Override - public int partition(Object value, int numberOfPartitions) { - return wrapped.partition(value, numberOfPartitions); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java new file mode 100644 index 000..6aaeca9 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/PeriodicOffsetCommitter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka.internals; + +import java.util.HashMap; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static
[04/14] flink git commit: [FLINK-3700] [build] Add 'findbugs' (javax.annotation) annotations as a core dependency.
[FLINK-3700] [build] Add 'findbugs' (javax.annotation) annotations as a core dependency. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/272fd12b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/272fd12b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/272fd12b Branch: refs/heads/master Commit: 272fd12b41a6e85f0e1825ad4f4593a01f9062aa Parents: f315c57 Author: Stephan Ewen Authored: Tue Apr 5 12:37:33 2016 +0200 Committer: Stephan Ewen Committed: Wed Apr 13 01:10:54 2016 +0200 -- pom.xml | 13 + 1 file changed, 13 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/272fd12b/pom.xml -- diff --git a/pom.xml b/pom.xml index 82b1567..bdf32d1 100644 --- a/pom.xml +++ b/pom.xml @@ -113,6 +113,12 @@ under the License. 1.1-SNAPSHOT + + + com.google.code.findbugs + jsr305 + + org.apache.commons commons-lang3 @@ -191,6 +197,13 @@ under the License. --> + + + com.google.code.findbugs + jsr305 + 1.3.9 + + org.apache.avro
[01/14] flink git commit: [FLINK-3444] [APIs] Add fromElements method with based class type to avoid the exception.
Repository: flink Updated Branches: refs/heads/master f315c5700 -> db85f3858 [FLINK-3444] [APIs] Add fromElements method with based class type to avoid the exception. This closes #1857 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bb085ec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bb085ec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bb085ec Branch: refs/heads/master Commit: 6bb085ec6d70e196b7b61bec5f6dc3f924ca7906 Parents: 693d5ab Author: gallenvaraAuthored: Wed Apr 6 16:04:32 2016 +0800 Committer: Stephan Ewen Committed: Wed Apr 13 01:10:54 2016 +0200 -- .../flink/api/java/ExecutionEnvironment.java| 45 - .../flink/api/java/io/FromElementsTest.java | 51 .../environment/StreamExecutionEnvironment.java | 33 + .../api/StreamExecutionEnvironmentTest.java | 27 +++ 4 files changed, 155 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 1363e26..89c817d 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -777,7 +777,50 @@ public abstract class ExecutionEnvironment { throw new IllegalArgumentException("The number of elements must not be zero."); } - return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName()); + TypeInformation typeInfo; + try { + typeInfo = TypeExtractor.getForObject(data[0]); + } + catch (Exception e) { + throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName() + + "; please specify the TypeInformation manually via " + + 
"ExecutionEnvironment#fromElements(Collection, TypeInformation)"); + } + + return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName()); + } + + /** +* Creates a new data set that contains the given elements. The framework will determine the type according to the +* based type user supplied. The elements should be the same or be the subclass to the based type. +* The sequence of elements must not be empty. +* Note that this operation will result in a non-parallel data source, i.e. a data source with +* a parallelism of one. +* +* @param type The base class type for every element in the collection. +* @param data The elements to make up the data set. +* @return A DataSet representing the given list of elements. +*/ + @SafeVarargs + public final DataSource fromElements(Class type, X... data) { + if (data == null) { + throw new IllegalArgumentException("The data must not be null."); + } + if (data.length == 0) { + throw new IllegalArgumentException("The number of elements must not be zero."); + } + + TypeInformation typeInfo; + try { + typeInfo = TypeExtractor.getForClass(type); + } + catch (Exception e) { + throw new RuntimeException("Could not create TypeInformation for type " + type.getName() + + "; please specify the TypeInformation manually via " + + "ExecutionEnvironment#fromElements(Collection, TypeInformation)"); + } + + return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName()); } http://git-wip-us.apache.org/repos/asf/flink/blob/6bb085ec/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java -- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/FromElementsTest.java new file mode 100644 index 000..2f403aa --- /dev/null +++
[02/14] flink git commit: [hotfix] [kafka consumer] Increase Kafka test stability by validating written data before consuming
[hotfix] [kafka consumer] Increase Kafka test stability by validating written data before consuming Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20eda1b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20eda1b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20eda1b Branch: refs/heads/master Commit: d20eda1b5252d888189af29a2b493023b4621a88 Parents: af79988 Author: Stephan EwenAuthored: Tue Apr 12 20:14:50 2016 +0200 Committer: Stephan Ewen Committed: Wed Apr 13 01:10:54 2016 +0200 -- .../connectors/kafka/Kafka08ITCase.java | 72 ++ .../connectors/kafka/KafkaConsumerTestBase.java | 217 +++ .../connectors/kafka/KafkaTestBase.java | 4 - .../testutils/JobManagerCommunicationUtils.java | 24 +- .../kafka/testutils/Tuple2Partitioner.java | 7 +- .../org/apache/flink/test/util/TestUtils.java | 4 +- 6 files changed, 217 insertions(+), 111 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d20eda1b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java index 0aef3bd..530c032 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java @@ -18,9 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import 
org.apache.curator.framework.recipes.cache.TreeCacheListener; + import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.runtime.client.JobCancellationException; import org.apache.flink.streaming.api.datastream.DataStream; @@ -33,7 +31,6 @@ import org.junit.Assert; import org.junit.Test; import java.util.Properties; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; @@ -91,16 +88,11 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { @Test(timeout = 6) public void testInvalidOffset() throws Exception { - final String topic = "invalidOffsetTopic"; + final int parallelism = 1; - - // create topic - createTestTopic(topic, parallelism, 1); - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - + // write 20 messages into topic: - writeSequence(env, topic, 20, parallelism); + final String topic = writeSequence("invalidOffsetTopic", 20, parallelism, 1); // set invalid offset: CuratorFramework curatorClient = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient(); @@ -110,6 +102,10 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { // read from topic final int valuesCount = 20; final int startFrom = 0; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); + env.getConfig().disableSysoutLogging(); + readSequence(env, standardProps, parallelism, topic, valuesCount, startFrom); deleteTestTopic(topic); @@ -193,10 +189,10 @@ public class Kafka08ITCase extends KafkaConsumerTestBase { */ @Test(timeout = 6) public void testOffsetInZookeeper() throws Exception { - final String topicName = "testOffsetInZK"; final int parallelism = 3; - createTestTopic(topicName, parallelism, 1); + // write a sequence from 0 to 99 to each of the 3 partitions. 
+ final String topicName = writeSequence("testOffsetInZK", 100, parallelism, 1); StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); env1.getConfig().disableSysoutLogging(); @@ -210,16 +206,7 @@ public class
[14/14] flink git commit: [FLINK-3737] [tests] Adding comment about SOCKS proxy server for WikipediaEditsSourceTest
[FLINK-3737] [tests] Adding comment about SOCKS proxy server for WikipediaEditsSourceTest This closes #1872 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/342db48b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/342db48b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/342db48b Branch: refs/heads/master Commit: 342db48b03eceee20d9ec54d97f33054fa0627e8 Parents: b188637 Author: Todd LisonbeeAuthored: Mon Apr 11 00:41:34 2016 -0700 Committer: Stephan Ewen Committed: Wed Apr 13 01:10:55 2016 +0200 -- .../connectors/wikiedits/WikipediaEditsSourceTest.java | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/342db48b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java -- diff --git a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java index 73ab8bf..0c5d93a 100644 --- a/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java +++ b/flink-contrib/flink-connector-wikiedits/src/test/java/org/apache/flink/streaming/connectors/wikiedits/WikipediaEditsSourceTest.java @@ -29,6 +29,11 @@ import static org.junit.Assert.fail; public class WikipediaEditsSourceTest { + /** +* NOTE: if you are behind a firewall you may need to use a SOCKS Proxy for this test +* +* @see http://docs.oracle.com/javase/8/docs/technotes/guides/net/proxies.html;>Socks Proxy +*/ @Test(timeout = 120 * 1000) public void testWikipediaEditsSource() throws Exception {
[07/14] flink git commit: [FLINK-3700] [core] Add 'Preconditions' utility class.
[FLINK-3700] [core] Add 'Preconditions' utility class. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/885d543b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/885d543b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/885d543b Branch: refs/heads/master Commit: 885d543be8a8c0d1acdffa0003e394347d5376ef Parents: 272fd12 Author: Stephan EwenAuthored: Tue Apr 5 13:23:14 2016 +0200 Committer: Stephan Ewen Committed: Wed Apr 13 01:10:54 2016 +0200 -- .../org/apache/flink/util/Preconditions.java| 213 +++ 1 file changed, 213 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/885d543b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java -- diff --git a/flink-core/src/main/java/org/apache/flink/util/Preconditions.java b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java new file mode 100644 index 000..135038b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/Preconditions.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// +// This class is largely adapted from "com.google.common.base.Preconditions", +// which is part of the "Guava" library. 
+// +// Because of frequent issues with dependency conflicts, this class was +// added to the Flink code base to reduce dependency on Guava. +// + +package org.apache.flink.util; + +import org.apache.flink.annotation.Internal; + +import javax.annotation.Nullable; + +/** + * A collection of static utility methods to validate input. + * + * This class is modelled after Google Guava's Preconditions class, and partly takes code + * from that class. We add this code to the Flink code base in order to reduce external + * dependencies. + */ +@Internal +public final class Preconditions { + + // + // Null checks + // + + /** +* Ensures that the given object reference is not null. +* Upon violation, a {@code NullPointerException} with no message is thrown. +* +* @param reference The object reference +* @return The object reference itself (generically typed). +* +* @throws NullPointerException Thrown, if the passed reference was null. +*/ + public static T checkNotNull(T reference) { + if (reference == null) { + throw new NullPointerException(); + } + return reference; + } + + /** +* Ensures that the given object reference is not null. +* Upon violation, a {@code NullPointerException} with the given message is thrown. +* +* @param reference The object reference +* @param errorMessage The message for the {@code NullPointerException} that is thrown if the check fails. +* @return The object reference itself (generically typed). +* +* @throws NullPointerException Thrown, if the passed reference was null. +*/ + public static T checkNotNull(T reference, @Nullable String errorMessage) { + if (reference == null) { + throw new NullPointerException(String.valueOf(errorMessage)); + } + return reference; + } + + /** +* Ensures that the given object reference is not null. +* Upon violation, a {@code NullPointerException} with the given message is thrown. 
+* +* The error message is constructed from a template and an arguments array, after +* a similar fashion as {@link String#format(String, Object...)}, but supporting
[11/14] flink git commit: [FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition
[FLINK-3375] [kafka connector] Rework/simplify Kafka Connector and have a WatermarkExtractor object per partition Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c93103d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c93103d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c93103d Branch: refs/heads/master Commit: 3c93103d1476cb05ec0c018bfa6876e4ecad38e8 Parents: 0ac1549 Author: Stephan EwenAuthored: Wed Apr 6 23:21:17 2016 +0200 Committer: Stephan Ewen Committed: Wed Apr 13 01:10:54 2016 +0200 -- .../connectors/kafka/FlinkKafkaConsumer08.java | 358 ++-- .../connectors/kafka/internals/Fetcher.java | 72 -- .../kafka/internals/Kafka08Fetcher.java | 446 + .../kafka/internals/KillerWatchDog.java | 62 ++ .../kafka/internals/LegacyFetcher.java | 896 --- .../kafka/internals/OffsetHandler.java | 55 -- .../kafka/internals/PartitionInfoFetcher.java | 66 ++ .../kafka/internals/PartitionerWrapper.java | 49 - .../internals/PeriodicOffsetCommitter.java | 85 ++ .../kafka/internals/SimpleConsumerThread.java | 504 +++ .../kafka/internals/ZookeeperOffsetHandler.java | 58 +- .../connectors/kafka/Kafka08ITCase.java | 176 ++-- .../connectors/kafka/KafkaConsumer08Test.java | 90 ++ .../connectors/kafka/KafkaConsumerTest.java | 156 .../kafka/KafkaShortRetention08ITCase.java | 3 +- .../internals/ZookeeperOffsetHandlerTest.java | 56 -- .../src/test/resources/log4j-test.properties| 5 +- .../connectors/kafka/FlinkKafkaConsumer09.java | 398 ++-- .../kafka/internal/Kafka09Fetcher.java | 311 +++ .../connectors/kafka/Kafka09ITCase.java | 21 +- .../connectors/kafka/KafkaProducerTest.java | 8 +- .../kafka/KafkaShortRetention09ITCase.java | 1 + .../src/test/resources/log4j-test.properties| 5 +- .../kafka/FlinkKafkaConsumerBase.java | 668 ++ .../kafka/internals/AbstractFetcher.java| 439 + .../kafka/internals/ExceptionProxy.java | 73 ++ .../kafka/internals/KafkaPartitionState.java| 65 -- 
.../kafka/internals/KafkaTopicPartition.java| 36 +- .../internals/KafkaTopicPartitionState.java | 105 +++ ...picPartitionStateWithPeriodicWatermarks.java | 71 ++ ...cPartitionStateWithPunctuatedWatermarks.java | 84 ++ .../kafka/partitioner/KafkaPartitioner.java | 2 +- .../connectors/kafka/util/KafkaUtils.java | 13 +- .../kafka/FlinkKafkaConsumerBaseTest.java | 222 + .../KafkaConsumerPartitionAssignmentTest.java | 96 +- .../connectors/kafka/KafkaConsumerTestBase.java | 175 ++-- .../connectors/kafka/KafkaProducerTestBase.java | 8 +- .../kafka/KafkaShortRetentionTestBase.java | 45 +- .../internals/KafkaTopicPartitionTest.java | 57 ++ .../testutils/JobManagerCommunicationUtils.java | 49 +- .../kafka/testutils/MockRuntimeContext.java | 26 +- .../AssignerWithPeriodicWatermarks.java | 3 + .../AssignerWithPunctuatedWatermarks.java | 3 + 43 files changed, 3407 insertions(+), 2714 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3c93103d/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java -- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index 4748781..48cc461 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; +import kafka.api.OffsetRequest; import kafka.cluster.Broker; import kafka.common.ErrorMapping; import kafka.javaapi.PartitionMetadata; @@ -24,40 +25,32 @@ import kafka.javaapi.TopicMetadata; import 
kafka.javaapi.TopicMetadataRequest; import kafka.javaapi.consumer.SimpleConsumer; -import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; -import org.apache.flink.streaming.api.watermark.Watermark;
[12/14] flink git commit: [FLINK-3126] [core] Remove accumulator type from "value" in web frontend
[FLINK-3126] [core] Remove accumulator type from "value" in web frontend This closes #1868 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db85f385 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db85f385 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db85f385 Branch: refs/heads/master Commit: db85f385846a6541c0707ed1ba6fed78446423b5 Parents: 342db48 Author: Zack PierceAuthored: Mon Apr 11 10:17:34 2016 -0700 Committer: Stephan Ewen Committed: Wed Apr 13 01:10:55 2016 +0200 -- .../StringifiedAccumulatorResult.java | 14 +- .../StringifiedAccumulatorResultTest.java | 138 +++ 2 files changed, 149 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/db85f385/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java index a0d1eda..c4faad1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResult.java @@ -55,6 +55,9 @@ public class StringifiedAccumulatorResult implements java.io.Serializable{ // Utilities // + /** +* Flatten a map of accumulator names to Accumulator instances into an array of StringifiedAccumulatorResult values + */ public static StringifiedAccumulatorResult[] stringifyAccumulatorResults(Map accs) { if (accs == null || accs.isEmpty()) { return new StringifiedAccumulatorResult[0]; @@ -65,9 +68,14 @@ public class StringifiedAccumulatorResult implements java.io.Serializable{ int i = 0; for (Map.Entry entry : accs.entrySet()) { StringifiedAccumulatorResult result; - Accumulator value = entry.getValue(); - if (value != null) { - result 
= new StringifiedAccumulatorResult(entry.getKey(), value.getClass().getSimpleName(), value.toString()); + Accumulator accumulator = entry.getValue(); + if (accumulator != null) { + Object localValue = accumulator.getLocalValue(); + if (localValue != null) { + result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), localValue.toString()); + } else { + result = new StringifiedAccumulatorResult(entry.getKey(), accumulator.getClass().getSimpleName(), "null"); + } } else { result = new StringifiedAccumulatorResult(entry.getKey(), "null", "null"); } http://git-wip-us.apache.org/repos/asf/flink/blob/db85f385/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java new file mode 100644 index 000..e6d637b --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/accumulators/StringifiedAccumulatorResultTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR