flink git commit: [docs] fix readme typos; use the same scala style in the examples
Repository: flink Updated Branches: refs/heads/master a922473c0 -> e8e88afdc [docs] fix readme typos; use the same scala style in the examples This closes #1743 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8e88afd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8e88afd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8e88afd Branch: refs/heads/master Commit: e8e88afdc4807277a1d8df8663b33e60b5688d0d Parents: a922473 Author: vasia Authored: Mon Feb 29 22:49:35 2016 +0100 Committer: vasia Committed: Tue Mar 1 08:15:12 2016 +0100 -- README.md | 14 +++--- 1 file changed, 7 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e8e88afd/README.md -- diff --git a/README.md b/README.md index 41ea37d..1597a58 100644 --- a/README.md +++ b/README.md @@ -19,17 +19,17 @@ Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/) * Fault-tolerance with *exactly-once* processing guarantees -* Natural back-pressure in streaming programs. +* Natural back-pressure in streaming programs * Libraries for Graph processing (batch), Machine Learning (batch), and Complex Event Processing (streaming) -* Built-in support for iterative programs (BSP) and in the DataSet (batch) API. +* Built-in support for iterative programs (BSP) in the DataSet (batch) API -* Custom memory management to for efficient and robust switching between in-memory and out-of-core data processing algorithms. +* Custom memory management for efficient and robust switching between in-memory and out-of-core data processing algorithms -* Compatibility layers for Apache Hadoop MapReduce and Apache Storm. +* Compatibility layers for Apache Hadoop MapReduce and Apache Storm -* Integration with YARN, HDFS, HBase, and other components of the Apache Hadoop ecosystem. +* Integration with YARN, HDFS, HBase, and other components of the Apache Hadoop ecosystem ### Streaming Example @@ -53,8 +53,8 @@ case class WordWithCount(word: String, count: Long) val text = env.readTextFile(path) -val counts = text.flatMap { _.split("\\W+") } - .map { WordWithCount(_, 1) } +val counts = text.flatMap { w => w.split("\\s") } + .map { w => WordWithCount(w, 1) } .groupBy("word") .sum("count")
[2/2] flink git commit: [FLINK-3537] Fix code gen for disjunctions.
[FLINK-3537] Fix code gen for disjunctions. This closes #1733 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c57e2415 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c57e2415 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c57e2415 Branch: refs/heads/tableOnCalcite Commit: c57e2415fdeac80f8c851f7ddb4fabe7a1781836 Parents: e097cf7 Author: Fabian Hueske Authored: Mon Feb 29 15:00:06 2016 +0100 Committer: vasia Committed: Mon Feb 29 23:15:56 2016 +0100 -- .../api/table/codegen/calls/ScalarOperators.scala | 2 +- .../flink/api/java/table/test/FilterITCase.java| 16 .../flink/api/scala/table/test/FilterITCase.scala | 12 .../flink/api/scala/table/test/JoinITCase.scala| 17 + 4 files changed, 46 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c57e2415/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala index 8580b25..f71b643 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala @@ -303,7 +303,7 @@ object ScalarOperators { s""" |${left.code} |${right.code} -|boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm}; +|boolean $resultTerm = ${left.resultTerm} || ${right.resultTerm}; |""".stripMargin } http://git-wip-us.apache.org/repos/asf/flink/blob/c57e2415/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java -- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java index c69d1a7..c783524 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java @@ -126,6 +126,22 @@ public class FilterITCase extends TableProgramsTestBase { } @Test + public void testDisjunctivePreds() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); + + DataSet> input = CollectionDataSets.get3TupleDataSet(env); + + Table table = tableEnv.fromDataSet(input, "a, b, c"); + Table result = table.filter("a < 2 || a > 20"); + + DataSet ds = tableEnv.toDataSet(result, Row.class); + List results = ds.collect(); + String expected = "1,1,Hi\n" + "21,6,Comment#15\n"; + compareResultAsText(results, expected); + } + + @Test public void testIntegerBiggerThan128() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = getJavaTableEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/c57e2415/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala index 2dfdb2c..0febd4d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala @@ -126,6 +126,18 @@ class FilterITCase( } @Test + def testDisjunctivePreds(): Unit = { + +val env = ExecutionEnvironment.getExecutionEnvironment +val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + +val filterDs = ds.filter( 'a < 2 || 'a > 20) +val expected = "1,1,Hi\n" + "21,6,Comment#15\n" +val results = filterDs.collect() +TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test def testFilterMerge(): Unit = { // verify FilterMergeRule. http://git-wip-us.apache.org/repos
[1/2] flink git commit: [FLINK-3504] Fix join translation. Equality predicates may only reference fields.
Repository: flink Updated Branches: refs/heads/tableOnCalcite e097cf7db -> 031d6f745 [FLINK-3504] Fix join translation. Equality predicates may only reference fields. Catch Calcite planner exception and rethrow with additional error message This closes #1734 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/031d6f74 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/031d6f74 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/031d6f74 Branch: refs/heads/tableOnCalcite Commit: 031d6f745fe63e1c089055b825f7816b91b8433b Parents: c57e241 Author: Fabian Hueske Authored: Mon Feb 29 14:46:08 2016 +0100 Committer: vasia Committed: Mon Feb 29 23:15:56 2016 +0100 -- .../api/java/table/JavaBatchTranslator.scala| 25 - .../plan/rules/logical/FlinkJoinRule.scala | 54 +++- .../flink/api/scala/table/test/JoinITCase.scala | 20 3 files changed, 96 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/031d6f74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala index 7e8ee77..f238df3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.java.table +import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.{RelTraitSet, RelOptUtil} import org.apache.calcite.rel.{RelCollations, RelNode} import org.apache.calcite.sql2rel.RelDecorrelator @@ -75,7 +76,17 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator { // optimize the logical Flink plan val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES) val flinkOutputProps = RelTraitSet.createEmpty() -val optPlan = optProgram.run(planner, decorPlan, flinkOutputProps) + +val optPlan = try { + optProgram.run(planner, decorPlan, flinkOutputProps) +} +catch { + case e: CannotPlanException => +throw new PlanGenException( + s"Cannot generate a valid execution plan for the given query: \n\n" + + s"${RelOptUtil.toString(lPlan)}\n" + + "Please consider filing a bug report.", e) +} println("---") println("Optimized Plan:") @@ -87,7 +98,17 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator { val dataSetOutputProps = RelTraitSet.createEmpty() .plus(DataSetConvention.INSTANCE) .plus(RelCollations.of()).simplify() -val dataSetPlan = dataSetProgram.run(planner, optPlan, dataSetOutputProps) + +val dataSetPlan = try { + dataSetProgram.run(planner, optPlan, dataSetOutputProps) +} +catch { + case e: CannotPlanException => +throw new PlanGenException( + s"Cannot generate a valid execution plan for the given query: \n\n" + +s"${RelOptUtil.toString(lPlan)}\n" + +"Please consider filing a bug report.", e) +} println("-") println("DataSet Plan:") http://git-wip-us.apache.org/repos/asf/flink/blob/031d6f74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala index 3826c9a..82f3eaa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala @@ -18,12 +18,16 @@ package org.apache.flink.api.table.plan.rules.logical -import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet} +import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.logical.LogicalJoin +import org.apache.calcite.rex.{RexInputRef, RexCall} +import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, FlinkConvention} +import scala.collec
buildbot success in on flink-docs-release-0.10
The Buildbot has detected a restored build on builder flink-docs-release-0.10 while building . Full details are available at: https://ci.apache.org/builders/flink-docs-release-0.10/builds/134 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: orcus_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' triggered this build Build Source Stamp: [branch release-0.10] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
buildbot success in on flink-docs-master
The Buildbot has detected a restored build on builder flink-docs-master while building . Full details are available at: https://ci.apache.org/builders/flink-docs-master/builds/251 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: lares_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered this build Build Source Stamp: [branch master] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
[2/2] flink git commit: [FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator
[FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a922473c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a922473c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a922473c Branch: refs/heads/master Commit: a922473c0835a757e7108c79ad52f103ace86030 Parents: 51fc298 Author: Aljoscha Krettek Authored: Mon Feb 29 20:20:07 2016 +0100 Committer: Aljoscha Krettek Committed: Mon Feb 29 21:37:19 2016 +0100 -- .../apache/flink/storm/api/FlinkTopology.java | 14 +-- .../api/datastream/AllWindowedStream.java | 54 +- .../api/datastream/ConnectedStreams.java| 20 ++-- .../streaming/api/datastream/DataStream.java| 24 ++--- .../api/datastream/DataStreamSource.java| 4 +- .../api/datastream/IterativeStream.java | 17 +-- .../streaming/api/datastream/KeyedStream.java | 38 +++ .../datastream/SingleOutputStreamOperator.java | 77 +++--- .../api/datastream/StreamProjection.java| 104 +-- .../api/datastream/WindowedStream.java | 54 +- .../environment/StreamExecutionEnvironment.java | 12 +-- .../flink/streaming/api/DataStreamTest.java | 4 +- .../apache/flink/streaming/api/IterateTest.java | 9 +- .../flink/streaming/api/TypeFillTest.java | 2 +- .../api/graph/StreamGraphGeneratorTest.java | 10 +- .../flink/streaming/api/scala/DataStream.scala | 18 ++-- .../StreamingScalaAPICompletenessTest.scala | 2 +- 17 files changed, 211 insertions(+), 252 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a922473c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java -- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java index 811cfb4..6706a91 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java @@ -200,7 +200,7 @@ public class FlinkTopology { SplitStream> splitSource = multiSource .split(new StormStreamSelector()); for (String streamId : sourceStreams.keySet()) { - SingleOutputStreamOperator outStream = splitSource.select(streamId) + SingleOutputStreamOperator outStream = splitSource.select(streamId) .map(new SplitStreamMapper()); outStream.getTransformation().setOutputType(declarer.getOutputType(streamId)); outputStreams.put(streamId, outStream); @@ -299,7 +299,7 @@ public class FlinkTopology { inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer)); } - final SingleOutputStreamOperator outputStream = createOutput(boltId, + final SingleOutputStreamOperator outputStream = createOutput(boltId, userBolt, inputStreams); if (common.is_set_parallelism_hint()) { @@ -359,7 +359,7 @@ public class FlinkTopology { } @SuppressWarnings({ "unchecked", "rawtypes" }) - private SingleOutputStreamOperator createOutput(String boltId, IRichBolt bolt, + private SingleOutputStreamOperator createOutput(String boltId, IRichBolt bolt, Map> inputStreams) { assert (boltId != null); assert (bolt != null); @@ -403,7 +403,7 @@ public class FlinkTopology { final HashMap boltOutputs = this.outputStreams.get(boltId); final FlinkOutputFieldsDeclarer declarer = this.declarers.get(boltId); - final SingleOutputStreamOperator outputStream; + final SingleOutputStreamOperator outputStream; if (boltOutputs.size() < 2) { // single output stream or sink String outputStreamId; @@ -415,7 +415,7 @@ public class FlinkTopology { final TypeInformation outType = declarer.getOutputType(outputStreamId); - final SingleOutputStreamOperator outStream; + final SingleOutputStreamOperator outStream;
[1/2] flink git commit: [FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator
Repository: flink Updated Branches: refs/heads/master 51fc29812 -> a922473c0 http://git-wip-us.apache.org/repos/asf/flink/blob/a922473c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 4c1c265..fb7ec9f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -735,7 +735,7 @@ public abstract class StreamExecutionEnvironment { SourceFunction function; try { - function = new FromElementsFunction(typeInfo.createSerializer(getConfig()), data); + function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data); } catch (IOException e) { throw new RuntimeException(e.getMessage(), e); @@ -789,7 +789,7 @@ public abstract class StreamExecutionEnvironment { public DataStreamSource fromCollection(Iterator data, TypeInformation typeInfo) { Preconditions.checkNotNull(data, "The iterator must not be null"); - SourceFunction function = new FromIteratorFunction(data); + SourceFunction function = new FromIteratorFunction<>(data); return addSource(function, "Collection Source", typeInfo); } @@ -838,7 +838,7 @@ public abstract class StreamExecutionEnvironment { // private helper for passing different names private DataStreamSource fromParallelCollection(SplittableIterator iterator, TypeInformation typeInfo, String operatorName) { - return addSource(new FromSplittableIteratorFunction(iterator), operatorName).returns(typeInfo); + return addSource(new FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo); } /** @@ -1033,8 +1033,8 @@ public abstract class StreamExecutionEnvironment { // private helper for passing different names private DataStreamSource createInput(InputFormat inputFormat, TypeInformation typeInfo, String sourceName) { - FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo); - return addSource(function, sourceName).returns(typeInfo); + FileSourceFunction function = new FileSourceFunction<>(inputFormat, typeInfo); + return addSource(function, sourceName, typeInfo); } /** @@ -1136,7 +1136,7 @@ public abstract class StreamExecutionEnvironment { sourceOperator = new StreamSource<>(function); } - return new DataStreamSource(this, typeInfo, sourceOperator, isParallel, sourceName); + return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/a922473c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java -- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 7a4d6f8..cf48160 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -435,7 +435,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { DataStreamSource> src = env.fromElements(new Tuple2<>(0L, 0L)); env.setParallelism(10); - SingleOutputStreamOperator map = src.map(new MapFunction, Long>() { + SingleOutputStreamOperator map = src.map(new MapFunction, Long>() { @Override public Long map(Tuple2 value) throws Exception { return null; @@ -759,7 +759,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { @SuppressWarnings("rawtypes,unchecked") private static Integer createDownStreamId(ConnectedStreams dataStream) { - SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction, Tuple2, Object>() { + SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction, Tuple2, Object>() { private static final long serialVersi
[1/2] flink git commit: [FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator
Repository: flink Updated Branches: refs/heads/release-1.0 74c62b0b8 -> 30486905b http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 4c1c265..fb7ec9f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -735,7 +735,7 @@ public abstract class StreamExecutionEnvironment { SourceFunction function; try { - function = new FromElementsFunction(typeInfo.createSerializer(getConfig()), data); + function = new FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data); } catch (IOException e) { throw new RuntimeException(e.getMessage(), e); @@ -789,7 +789,7 @@ public abstract class StreamExecutionEnvironment { public DataStreamSource fromCollection(Iterator data, TypeInformation typeInfo) { Preconditions.checkNotNull(data, "The iterator must not be null"); - SourceFunction function = new FromIteratorFunction(data); + SourceFunction function = new FromIteratorFunction<>(data); return addSource(function, "Collection Source", typeInfo); } @@ -838,7 +838,7 @@ public abstract class StreamExecutionEnvironment { // private helper for passing different names private DataStreamSource fromParallelCollection(SplittableIterator iterator, TypeInformation typeInfo, String operatorName) { - return addSource(new FromSplittableIteratorFunction(iterator), operatorName).returns(typeInfo); + return addSource(new FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo); } /** @@ -1033,8 +1033,8 @@ public abstract class StreamExecutionEnvironment { // private helper for passing different names private DataStreamSource createInput(InputFormat inputFormat, TypeInformation typeInfo, String sourceName) { - FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo); - return addSource(function, sourceName).returns(typeInfo); + FileSourceFunction function = new FileSourceFunction<>(inputFormat, typeInfo); + return addSource(function, sourceName, typeInfo); } /** @@ -1136,7 +1136,7 @@ public abstract class StreamExecutionEnvironment { sourceOperator = new StreamSource<>(function); } - return new DataStreamSource(this, typeInfo, sourceOperator, isParallel, sourceName); + return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java -- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 7a4d6f8..cf48160 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -435,7 +435,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { DataStreamSource> src = env.fromElements(new Tuple2<>(0L, 0L)); env.setParallelism(10); - SingleOutputStreamOperator map = src.map(new MapFunction, Long>() { + SingleOutputStreamOperator map = src.map(new MapFunction, Long>() { @Override public Long map(Tuple2 value) throws Exception { return null; @@ -759,7 +759,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { @SuppressWarnings("rawtypes,unchecked") private static Integer createDownStreamId(ConnectedStreams dataStream) { - SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction, Tuple2, Object>() { + SingleOutputStreamOperator coMap = dataStream.map(new CoMapFunction, Tuple2, Object>() { private static final long serial
[2/2] flink git commit: [FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator
[FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30486905 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30486905 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30486905 Branch: refs/heads/release-1.0 Commit: 30486905b81e9150b5efdc81defa98513c7032dc Parents: 74c62b0 Author: Aljoscha Krettek Authored: Mon Feb 29 20:20:07 2016 +0100 Committer: Aljoscha Krettek Committed: Mon Feb 29 21:37:46 2016 +0100 -- .../apache/flink/storm/api/FlinkTopology.java | 14 +-- .../api/datastream/AllWindowedStream.java | 54 +- .../api/datastream/ConnectedStreams.java| 20 ++-- .../streaming/api/datastream/DataStream.java| 24 ++--- .../api/datastream/DataStreamSource.java| 4 +- .../api/datastream/IterativeStream.java | 17 +-- .../streaming/api/datastream/KeyedStream.java | 38 +++ .../datastream/SingleOutputStreamOperator.java | 77 +++--- .../api/datastream/StreamProjection.java| 104 +-- .../api/datastream/WindowedStream.java | 54 +- .../environment/StreamExecutionEnvironment.java | 12 +-- .../flink/streaming/api/DataStreamTest.java | 4 +- .../apache/flink/streaming/api/IterateTest.java | 9 +- .../flink/streaming/api/TypeFillTest.java | 2 +- .../api/graph/StreamGraphGeneratorTest.java | 10 +- .../flink/streaming/api/scala/DataStream.scala | 18 ++-- .../StreamingScalaAPICompletenessTest.scala | 2 +- 17 files changed, 211 insertions(+), 252 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java -- diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java index 811cfb4..6706a91 100644 --- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java +++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java @@ -200,7 +200,7 @@ public class FlinkTopology { SplitStream> splitSource = multiSource .split(new StormStreamSelector()); for (String streamId : sourceStreams.keySet()) { - SingleOutputStreamOperator outStream = splitSource.select(streamId) + SingleOutputStreamOperator outStream = splitSource.select(streamId) .map(new SplitStreamMapper()); outStream.getTransformation().setOutputType(declarer.getOutputType(streamId)); outputStreams.put(streamId, outStream); @@ -299,7 +299,7 @@ public class FlinkTopology { inputStreams.put(streamId, processInput(boltId, userBolt, streamId, grouping, producer)); } - final SingleOutputStreamOperator outputStream = createOutput(boltId, + final SingleOutputStreamOperator outputStream = createOutput(boltId, userBolt, inputStreams); if (common.is_set_parallelism_hint()) { @@ -359,7 +359,7 @@ public class FlinkTopology { } @SuppressWarnings({ "unchecked", "rawtypes" }) - private SingleOutputStreamOperator createOutput(String boltId, IRichBolt bolt, + private SingleOutputStreamOperator createOutput(String boltId, IRichBolt bolt, Map> inputStreams) { assert (boltId != null); assert (bolt != null); @@ -403,7 +403,7 @@ public class FlinkTopology { final HashMap boltOutputs = this.outputStreams.get(boltId); final FlinkOutputFieldsDeclarer declarer = this.declarers.get(boltId); - final SingleOutputStreamOperator outputStream; + final SingleOutputStreamOperator outputStream; if (boltOutputs.size() < 2) { // single output stream or sink String outputStreamId; @@ -415,7 +415,7 @@ public class FlinkTopology { final TypeInformation outType = declarer.getOutputType(outputStreamId); - final SingleOutputStreamOperator outStream; + final SingleOutputStreamOperator outStream;
flink git commit: [FLINK-3540] Shade org.htrace in flink-shaded-hadoop to get rid of its Guava dependency
Repository: flink Updated Branches: refs/heads/release-1.0 ae5b4573b -> 74c62b0b8 [FLINK-3540] Shade org.htrace in flink-shaded-hadoop to get rid of its Guava dependency This closes #1737 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74c62b0b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74c62b0b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74c62b0b Branch: refs/heads/release-1.0 Commit: 74c62b0b871d271f0dca18b88be64126a53774f6 Parents: ae5b457 Author: Robert Metzger Authored: Mon Feb 29 16:13:02 2016 +0100 Committer: Robert Metzger Committed: Mon Feb 29 21:11:24 2016 +0100 -- flink-shaded-hadoop/pom.xml | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/74c62b0b/flink-shaded-hadoop/pom.xml -- diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml index 1920737..1e8bdf4 100644 --- a/flink-shaded-hadoop/pom.xml +++ b/flink-shaded-hadoop/pom.xml @@ -117,6 +117,7 @@ under the License. io.netty:netty:* org.apache.curator:* org.apache.hadoop:* + org.htrace:htrace-core net.java.dev.jets3t:jets3t @@ -152,6 +153,10 @@ under the License. org.apache.commons.httpclient org.apache.flink.hadoop.shaded.org.apache.commons.httpclient + + org.htrace + org.apache.flink.hadoop.shaded.org.htrace +
flink git commit: [FLINK-3540] Shade org.htrace in flink-shaded-hadoop to get rid of its Guava dependency
Repository: flink Updated Branches: refs/heads/master 9922d10a0 -> 51fc29812 [FLINK-3540] Shade org.htrace in flink-shaded-hadoop to get rid of its Guava dependency This closes #1737 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51fc2981 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51fc2981 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51fc2981 Branch: refs/heads/master Commit: 51fc298128405d4ce0e047185ac1c6e5b1753546 Parents: 9922d10 Author: Robert Metzger Authored: Mon Feb 29 16:13:02 2016 +0100 Committer: Robert Metzger Committed: Mon Feb 29 21:10:28 2016 +0100 -- flink-shaded-hadoop/pom.xml | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/51fc2981/flink-shaded-hadoop/pom.xml -- diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml index af49b92..6f4d441 100644 --- a/flink-shaded-hadoop/pom.xml +++ b/flink-shaded-hadoop/pom.xml @@ -117,6 +117,7 @@ under the License. io.netty:netty:* org.apache.curator:* org.apache.hadoop:* + org.htrace:htrace-core net.java.dev.jets3t:jets3t @@ -152,6 +153,10 @@ under the License. org.apache.commons.httpclient org.apache.flink.hadoop.shaded.org.apache.commons.httpclient + + org.htrace + org.apache.flink.hadoop.shaded.org.htrace +
[2/2] flink git commit: [FLINK-3536] Make clearer distinction between event time and processing time
[FLINK-3536] Make clearer distinction between event time and processing time This brings it more in line with *ProcessingTimeWindows and makes it clear what type of window assigner it is. The old name, i.e. SlidingTimeWindows and TumblingTimeWindows is still available but deprecated. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9922d10a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9922d10a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9922d10a Branch: refs/heads/master Commit: 9922d10a0f3e291bb7e6f75ccb70baecb5c2bcd9 Parents: 0ac2b1a Author: Aljoscha Krettek Authored: Mon Feb 29 14:56:29 2016 +0100 Committer: Aljoscha Krettek Committed: Mon Feb 29 20:29:11 2016 +0100 -- docs/apis/streaming/index.md| 16 +-- docs/apis/streaming/time.md | 2 +- docs/apis/streaming/windows.md | 31 +++-- .../streaming/examples/join/WindowJoin.java | 4 +- .../scala/examples/join/WindowJoin.scala| 4 +- .../api/datastream/CoGroupedStreams.java| 2 +- .../streaming/api/datastream/DataStream.java| 12 +- .../streaming/api/datastream/JoinedStreams.java | 2 +- .../streaming/api/datastream/KeyedStream.java | 12 +- .../assigners/SlidingEventTimeWindows.java | 112 +++ .../assigners/SlidingProcessingTimeWindows.java | 4 +- .../windowing/assigners/SlidingTimeWindows.java | 73 ++-- .../assigners/TumblingEventTimeWindows.java | 98 .../TumblingProcessingTimeWindows.java | 4 +- .../assigners/TumblingTimeWindows.java | 60 ++ .../windowing/AllWindowTranslationTest.java | 36 +++--- .../operators/windowing/CoGroupJoinITCase.java | 8 +- .../windowing/NonKeyedWindowOperatorTest.java | 10 +- .../windowing/TimeWindowTranslationTest.java| 8 +- .../operators/windowing/WindowFoldITCase.java | 6 +- .../operators/windowing/WindowOperatorTest.java | 12 +- .../windowing/WindowTranslationTest.java| 30 ++--- .../streaming/timestamp/TimestampITCase.java| 6 +- .../streaming/api/scala/CoGroupedStreams.scala | 2 +- .../flink/streaming/api/scala/DataStream.scala | 4 +- .../streaming/api/scala/JoinedStreams.scala | 2 +- .../flink/streaming/api/scala/KeyedStream.scala | 4 +- .../api/scala/AllWindowTranslationTest.scala| 26 ++--- .../streaming/api/scala/CoGroupJoinITCase.scala | 8 +- .../streaming/api/scala/WindowFoldITCase.scala | 6 +- .../api/scala/WindowTranslationTest.scala | 22 ++-- 31 files changed, 364 insertions(+), 262 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/docs/apis/streaming/index.md -- diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md index b8a3541..3741a46 100644 --- a/docs/apis/streaming/index.md +++ b/docs/apis/streaming/index.md @@ -293,7 +293,7 @@ keyedStream.maxBy("key"); key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. {% highlight java %} -dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data +dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data {% endhighlight %} @@ -307,7 +307,7 @@ dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 s WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator. {% highlight java %} -dataStream.windowAll(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data +dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data {% endhighlight %} @@ -410,7 +410,7 @@ dataStream.union(otherStream1, otherStream2, ...); {% highlight java %} dataStream.join(otherStream) .where(0).equalTo(1) -.window(TumblingTimeWindows.of(Time.seconds(3))) +.window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new JoinFunction () {...}); {% endhighlight %} @@ -422,7 +422,7 @@ dataStream.join(otherStream) {% highlight java %} dataStream.coGroup(otherStream) .where(0).equalTo(1) -.window(TumblingTimeWindows.of(Time.seconds(3))) +.window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new CoGroupFunction () {...}); {% endhighlight %} @@ -669,7 +669,7 @@ keyedStream.maxBy("key") key according to some characteristic (e.g., the data that arrived within the last 5 sec
[1/2] flink git commit: [FLINK-3536] Make clearer distinction between event time and processing time
Repository: flink Updated Branches: refs/heads/master 0ac2b1a7b -> 9922d10a0 http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala -- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index a676757..f4101cb 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -27,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction} import org.apache.flink.streaming.api.transformations.OneInputTransformation -import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows} +import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingEventTimeWindows, SlidingEventTimeWindows} import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger} @@ -59,7 +59,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val reducer = new DummyReducer val window1 = source - .windowAll(SlidingTimeWindows.of( + .windowAll(SlidingEventTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .reduce(reducer) @@ -73,7 +73,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source .keyBy(0) - .windowAll(SlidingTimeWindows.of( + .windowAll(SlidingEventTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { @@ -100,7 +100,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val reducer = new DummyReducer val window1 = source - .windowAll(SlidingTimeWindows.of( + .windowAll(SlidingEventTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) @@ -114,13 +114,13 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]]) val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]] assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) -assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) + assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) assertTrue( winOperator1.getWindowBufferFactory.isInstanceOf[ReducingWindowBuffer.Factory[_]]) val window2 = source - .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { def apply( @@ -137,7 +137,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]]) val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]] assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) + assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]]) } @@ -170,7 +170,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source - .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(CountEvictor.of(1000)) .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { @@ -189,7 +189,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]] assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getEvictor.isInstanceOf
[1/2] flink git commit: [FLINK-3536] Make clearer distinction between event time and processing time
Repository: flink Updated Branches: refs/heads/release-1.0 9b9f84e80 -> ae5b4573b http://git-wip-us.apache.org/repos/asf/flink/blob/ae5b4573/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala -- diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala index a676757..f4101cb 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala @@ -27,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.function.{WindowFunction, AllWindowFunction} import org.apache.flink.streaming.api.transformations.OneInputTransformation -import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingTimeWindows, SlidingTimeWindows} +import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingEventTimeWindows, SlidingEventTimeWindows} import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger} @@ -59,7 +59,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val reducer = new DummyReducer val window1 = source - .windowAll(SlidingTimeWindows.of( + .windowAll(SlidingEventTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .reduce(reducer) @@ -73,7 +73,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source .keyBy(0) - .windowAll(SlidingTimeWindows.of( + .windowAll(SlidingEventTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { @@ -100,7 +100,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val reducer = new DummyReducer val window1 = source - .windowAll(SlidingTimeWindows.of( + .windowAll(SlidingEventTimeWindows.of( Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS))) .trigger(CountTrigger.of(100)) @@ -114,13 +114,13 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]]) val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]] assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]]) -assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows]) + assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows]) assertTrue( winOperator1.getWindowBufferFactory.isInstanceOf[ReducingWindowBuffer.Factory[_]]) val window2 = source - .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { def apply( @@ -137,7 +137,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]]) val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _, _]] assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) - assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows]) + assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows]) assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]]) } @@ -170,7 +170,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val window2 = source - .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) + .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS))) .trigger(CountTrigger.of(100)) .evictor(CountEvictor.of(1000)) .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() { @@ -189,7 +189,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase { val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]] assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]]) assertTrue(winOperator2.getEvictor.isInsta
[2/2] flink git commit: [FLINK-3536] Make clearer distinction between event time and processing time
[FLINK-3536] Make clearer distinction between event time and processing time This brings it more in line with *ProcessingTimeWindows and makes it clear what type of window assigner it is. The old name, i.e. SlidingTimeWindows and TumblingTimeWindows is still available but deprecated. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae5b4573 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae5b4573 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae5b4573 Branch: refs/heads/release-1.0 Commit: ae5b4573b8e316042077ab85690d5d773c031865 Parents: 9b9f84e Author: Aljoscha Krettek Authored: Mon Feb 29 14:56:29 2016 +0100 Committer: Aljoscha Krettek Committed: Mon Feb 29 20:39:51 2016 +0100 -- docs/apis/streaming/index.md| 16 +-- docs/apis/streaming/time.md | 2 +- docs/apis/streaming/windows.md | 31 +++-- .../streaming/examples/join/WindowJoin.java | 4 +- .../scala/examples/join/WindowJoin.scala| 4 +- .../api/datastream/CoGroupedStreams.java| 2 +- .../streaming/api/datastream/DataStream.java| 12 +- .../streaming/api/datastream/JoinedStreams.java | 2 +- .../streaming/api/datastream/KeyedStream.java | 12 +- .../assigners/SlidingEventTimeWindows.java | 112 +++ .../assigners/SlidingProcessingTimeWindows.java | 4 +- .../windowing/assigners/SlidingTimeWindows.java | 73 ++-- .../assigners/TumblingEventTimeWindows.java | 98 .../TumblingProcessingTimeWindows.java | 4 +- .../assigners/TumblingTimeWindows.java | 60 ++ .../windowing/AllWindowTranslationTest.java | 36 +++--- .../operators/windowing/CoGroupJoinITCase.java | 8 +- .../windowing/NonKeyedWindowOperatorTest.java | 10 +- .../windowing/TimeWindowTranslationTest.java| 8 +- .../operators/windowing/WindowFoldITCase.java | 6 +- .../operators/windowing/WindowOperatorTest.java | 12 +- .../windowing/WindowTranslationTest.java| 30 ++--- .../streaming/timestamp/TimestampITCase.java| 6 +- .../streaming/api/scala/CoGroupedStreams.scala | 2 +- .../flink/streaming/api/scala/DataStream.scala | 4 +- .../streaming/api/scala/JoinedStreams.scala | 2 +- .../flink/streaming/api/scala/KeyedStream.scala | 4 +- .../api/scala/AllWindowTranslationTest.scala| 26 ++--- .../streaming/api/scala/CoGroupJoinITCase.scala | 8 +- .../streaming/api/scala/WindowFoldITCase.scala | 6 +- .../api/scala/WindowTranslationTest.scala | 22 ++-- 31 files changed, 364 insertions(+), 262 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ae5b4573/docs/apis/streaming/index.md -- diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md index b8a3541..3741a46 100644 --- a/docs/apis/streaming/index.md +++ b/docs/apis/streaming/index.md @@ -293,7 +293,7 @@ keyedStream.maxBy("key"); key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows. {% highlight java %} -dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data +dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data {% endhighlight %} @@ -307,7 +307,7 @@ dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 s WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator. {% highlight java %} -dataStream.windowAll(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data +dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data {% endhighlight %} @@ -410,7 +410,7 @@ dataStream.union(otherStream1, otherStream2, ...); {% highlight java %} dataStream.join(otherStream) .where(0).equalTo(1) -.window(TumblingTimeWindows.of(Time.seconds(3))) +.window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new JoinFunction () {...}); {% endhighlight %} @@ -422,7 +422,7 @@ dataStream.join(otherStream) {% highlight java %} dataStream.coGroup(otherStream) .where(0).equalTo(1) -.window(TumblingTimeWindows.of(Time.seconds(3))) +.window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new CoGroupFunction () {...}); {% endhighlight %} @@ -669,7 +669,7 @@ keyedStream.maxBy("key") key according to some characteristic (e.g., the data that arrived within the last
flink git commit: [FLINK-3538] [api-breaking] Streamline Scala DataStream.join/coGroup
Repository: flink Updated Branches: refs/heads/release-1.0 3adff87dd -> 9b9f84e80 [FLINK-3538] [api-breaking] Streamline Scala DataStream.join/coGroup This enforces that the user always has to specify keys for both inputs before .window() can be called. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b9f84e8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b9f84e8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b9f84e8 Branch: refs/heads/release-1.0 Commit: 9b9f84e800b19b585e147ab6add4eb946356caa0 Parents: 3adff87 Author: Aljoscha Krettek Authored: Mon Feb 29 17:02:38 2016 +0100 Committer: Aljoscha Krettek Committed: Mon Feb 29 20:27:56 2016 +0100 -- .../scala/examples/join/WindowJoin.scala| 4 +- .../streaming/api/scala/CoGroupedStreams.scala | 326 +++-- .../flink/streaming/api/scala/DataStream.scala | 8 +- .../streaming/api/scala/JoinedStreams.scala | 357 +++ .../StreamingScalaAPICompletenessTest.scala | 6 +- 5 files changed, 261 insertions(+), 440 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9b9f84e8/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala -- diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala index 81f12dc..50a2216 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala @@ -18,8 +18,6 @@ package org.apache.flink.streaming.scala.examples.join -import java.util.concurrent.TimeUnit - import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ @@ -58,7 +56,7 @@ object WindowJoin { val joined = grades.join(salaries) .where(_.name) .equalTo(_.name) -.window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))) +.window(SlidingTimeWindows.of(Time.seconds(2), Time.seconds(1))) .apply { (g, s) => Person(g.name, g.grade, s.salary) } if (params.has("output")) { http://git-wip-us.apache.org/repos/asf/flink/blob/9b9f84e8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala -- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala index f4ab2ee..4cce9e2 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala @@ -56,252 +56,164 @@ import scala.collection.JavaConverters._ * } }}} */ @Public -object CoGroupedStreams { +class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) { /** - * A co-group operation that does not yet have its [[KeySelector]]s defined. - * - * @tparam T1 Type of the elements from the first input - * @tparam T2 Type of the elements from the second input + * Specifies a [[KeySelector]] for elements from the first input. */ - class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) { - -/** - * Specifies a [[KeySelector]] for elements from the first input. - */ -def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = { - val cleanFun = clean(keySelector) - val keyType = implicitly[TypeInformation[KEY]] - val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] { -def getKey(in: T1) = cleanFun(in) -override def getProducedType: TypeInformation[KEY] = keyType - } - new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType) -} - -/** - * Specifies a [[KeySelector]] for elements from the second input. - */ -def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = { - val cleanFun = clean(keySelector) - val keyType = implicitly[TypeInformation[KEY]] - val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] { -def getKey(in: T2) = cleanFun(in) -override def getProducedType: TypeIn
flink git commit: [FLINK-3538] [api-breaking] Streamline Scala DataStream.join/coGroup
Repository: flink Updated Branches: refs/heads/master 9580b8fe5 -> 0ac2b1a7b [FLINK-3538] [api-breaking] Streamline Scala DataStream.join/coGroup This enforces that the user always has to specify keys for both inputs before .window() can be called. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ac2b1a7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ac2b1a7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ac2b1a7 Branch: refs/heads/master Commit: 0ac2b1a7b4b44d0e5722532958e5bda00615dbb4 Parents: 9580b8f Author: Aljoscha Krettek Authored: Mon Feb 29 17:02:38 2016 +0100 Committer: Aljoscha Krettek Committed: Mon Feb 29 20:26:14 2016 +0100 -- .../scala/examples/join/WindowJoin.scala| 4 +- .../streaming/api/scala/CoGroupedStreams.scala | 326 +++-- .../flink/streaming/api/scala/DataStream.scala | 8 +- .../streaming/api/scala/JoinedStreams.scala | 357 +++ .../StreamingScalaAPICompletenessTest.scala | 6 +- 5 files changed, 261 insertions(+), 440 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/0ac2b1a7/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala -- diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala index 81f12dc..50a2216 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala @@ -18,8 +18,6 @@ package org.apache.flink.streaming.scala.examples.join -import java.util.concurrent.TimeUnit - import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala._ @@ -58,7 +56,7 @@ object WindowJoin { val joined = grades.join(salaries) .where(_.name) .equalTo(_.name) -.window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS))) +.window(SlidingTimeWindows.of(Time.seconds(2), Time.seconds(1))) .apply { (g, s) => Person(g.name, g.grade, s.salary) } if (params.has("output")) { http://git-wip-us.apache.org/repos/asf/flink/blob/0ac2b1a7/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala -- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala index f4ab2ee..4cce9e2 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala @@ -56,252 +56,164 @@ import scala.collection.JavaConverters._ * } }}} */ @Public -object CoGroupedStreams { +class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) { /** - * A co-group operation that does not yet have its [[KeySelector]]s defined. - * - * @tparam T1 Type of the elements from the first input - * @tparam T2 Type of the elements from the second input + * Specifies a [[KeySelector]] for elements from the first input. */ - class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) { - -/** - * Specifies a [[KeySelector]] for elements from the first input. - */ -def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = { - val cleanFun = clean(keySelector) - val keyType = implicitly[TypeInformation[KEY]] - val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] { -def getKey(in: T1) = cleanFun(in) -override def getProducedType: TypeInformation[KEY] = keyType - } - new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType) -} - -/** - * Specifies a [[KeySelector]] for elements from the second input. - */ -def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = { - val cleanFun = clean(keySelector) - val keyType = implicitly[TypeInformation[KEY]] - val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] { -def getKey(in: T2) = cleanFun(in) -override def getProducedType: TypeInformation[
flink git commit: [FLINK-3535] [runtime-web] Decrease log verbosity of StackTraceSampleCoordinator
Repository: flink Updated Branches: refs/heads/release-1.0 70ce072a4 -> 3adff87dd [FLINK-3535] [runtime-web] Decrease log verbosity of StackTraceSampleCoordinator This closes #1732. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3adff87d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3adff87d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3adff87d Branch: refs/heads/release-1.0 Commit: 3adff87ddec7cb6c2aa29e4e23360e550ea8c118 Parents: 70ce072 Author: Ufuk Celebi Authored: Mon Feb 29 14:14:35 2016 +0100 Committer: Ufuk Celebi Committed: Mon Feb 29 20:10:18 2016 +0100 -- .../runtime/webmonitor/BackPressureStatsTracker.java | 9 +++-- .../webmonitor/StackTraceSampleCoordinator.java | 8 ++-- .../webmonitor/StackTraceSampleCoordinatorTest.java | 14 +- .../runtime/messages/StackTraceSampleMessages.scala | 6 +- 4 files changed, 23 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3adff87d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java index b9b8a47..db88ffd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java @@ -33,6 +33,7 @@ import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -170,6 +171,10 @@ public class BackPressureStatsTracker { if (executionContext != null) { pendingStats.add(vertex); + if (LOG.isDebugEnabled()) { + LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices())); + } + Future sample = coordinator.triggerStackTraceSample( vertex.getTaskVertices(), numSamples, @@ -246,7 +251,7 @@ public class BackPressureStatsTracker { OperatorBackPressureStats stats = createStatsFromSample(success); operatorStatsCache.put(vertex, stats); } else { - LOG.warn("Failed to gather stack trace sample.", failure); + LOG.debug("Failed to gather stack trace sample.", failure); } } catch (Throwable t) { LOG.error("Error during stats completion.", t); @@ -278,7 +283,7 @@ public class BackPressureStatsTracker { if (sampledTasks.contains(taskId)) { subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex()); } else { - throw new RuntimeException("Outdated sample. A task, which is part of the " + + LOG.debug("Outdated sample. A task, which is part of the " + "sample has been reset."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3adff87d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java index e7b292f..bbfb530 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java @@ -179,7 +179,9 @@ public class StackTraceSampleCoordinator { pending.getSampleId());
flink git commit: [FLINK-3535] [runtime-web] Decrease log verbosity of StackTraceSampleCoordinator
Repository: flink Updated Branches: refs/heads/master 734ba01dd -> 9580b8fe5 [FLINK-3535] [runtime-web] Decrease log verbosity of StackTraceSampleCoordinator This closes #1732. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9580b8fe Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9580b8fe Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9580b8fe Branch: refs/heads/master Commit: 9580b8fe5a5ec8b3b23ffa7e09123b1e160e2016 Parents: 734ba01 Author: Ufuk Celebi Authored: Mon Feb 29 14:14:35 2016 +0100 Committer: Ufuk Celebi Committed: Mon Feb 29 20:09:26 2016 +0100 -- .../runtime/webmonitor/BackPressureStatsTracker.java | 9 +++-- .../webmonitor/StackTraceSampleCoordinator.java | 8 ++-- .../webmonitor/StackTraceSampleCoordinatorTest.java | 14 +- .../runtime/messages/StackTraceSampleMessages.scala | 6 +- 4 files changed, 23 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/9580b8fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java index b9b8a47..db88ffd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java @@ -33,6 +33,7 @@ import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -170,6 +171,10 @@ public class BackPressureStatsTracker { if (executionContext != null) { pendingStats.add(vertex); + if (LOG.isDebugEnabled()) { + LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices())); + } + Future sample = coordinator.triggerStackTraceSample( vertex.getTaskVertices(), numSamples, @@ -246,7 +251,7 @@ public class BackPressureStatsTracker { OperatorBackPressureStats stats = createStatsFromSample(success); operatorStatsCache.put(vertex, stats); } else { - LOG.warn("Failed to gather stack trace sample.", failure); + LOG.debug("Failed to gather stack trace sample.", failure); } } catch (Throwable t) { LOG.error("Error during stats completion.", t); @@ -278,7 +283,7 @@ public class BackPressureStatsTracker { if (sampledTasks.contains(taskId)) { subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex()); } else { - throw new RuntimeException("Outdated sample. A task, which is part of the " + + LOG.debug("Outdated sample. A task, which is part of the " + "sample has been reset."); } } http://git-wip-us.apache.org/repos/asf/flink/blob/9580b8fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java -- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java index e7b292f..bbfb530 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java @@ -179,7 +179,9 @@ public class StackTraceSampleCoordinator { pending.getSampleId());
flink git commit: [FLINK-3534] [runtime] Prevent canceling Execution from failing
Repository: flink Updated Branches: refs/heads/master 5f0af06fe -> 734ba01dd [FLINK-3534] [runtime] Prevent canceling Execution from failing This closes #1735. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/734ba01d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/734ba01d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/734ba01d Branch: refs/heads/master Commit: 734ba01dda51ab93d10e6c36cf5a0c4c65b28008 Parents: 5f0af06 Author: Ufuk Celebi Authored: Mon Feb 29 15:37:45 2016 +0100 Committer: Ufuk Celebi Committed: Mon Feb 29 20:06:08 2016 +0100 -- .../flink/runtime/executiongraph/Execution.java | 5 + .../runtime/executiongraph/ExecutionGraph.java | 2 +- .../ExecutionGraphRestartTest.java | 105 ++- .../ExecutionVertexCancelTest.java | 11 +- 4 files changed, 114 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/734ba01d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index bc75664..6d5832b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -797,6 +797,11 @@ public class Execution implements Serializable { return false; } + if (current == CANCELING) { + cancelingComplete(); + return false; + } + if (transitionState(current, FAILED, t)) { // success (in a manner of speaking) this.failureCause = t; http://git-wip-us.apache.org/repos/asf/flink/blob/734ba01d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 0d6de98..ed50bea 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -809,7 +809,7 @@ public class ExecutionGraph implements Serializable { public void fail(Throwable t) { while (true) { JobStatus current = state; - if (current == JobStatus.FAILED || current == JobStatus.FAILING) { + if (current == JobStatus.FAILING || current.isTerminalState()) { return; } else if (transitionState(current, JobStatus.FAILING, t)) { http://git-wip-us.apache.org/repos/asf/flink/blob/734ba01d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 925b574..b1f11fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; @@ -310,7 +311,8 @@ public class ExecutionGraphRestartTest extends TestLogger { while (deadline.hasTimeLeft() && !success) { success = true; for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) { - if (vertex.getExecutionState() != ExecutionState.FAILED) { + ExecutionState state = vertex.getExecutionState(); + if (state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
flink git commit: [FLINK-3534] [runtime] Prevent canceling Execution from failing
Repository: flink Updated Branches: refs/heads/release-1.0 702627116 -> 70ce072a4 [FLINK-3534] [runtime] Prevent canceling Execution from failing This closes #1735. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70ce072a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70ce072a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70ce072a Branch: refs/heads/release-1.0 Commit: 70ce072a484b0e4372f80f47440fdca702bb5042 Parents: 7026271 Author: Ufuk Celebi Authored: Mon Feb 29 15:37:45 2016 +0100 Committer: Ufuk Celebi Committed: Mon Feb 29 20:06:50 2016 +0100 -- .../flink/runtime/executiongraph/Execution.java | 5 + .../runtime/executiongraph/ExecutionGraph.java | 2 +- .../ExecutionGraphRestartTest.java | 105 ++- .../ExecutionVertexCancelTest.java | 11 +- 4 files changed, 114 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/70ce072a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index bc75664..6d5832b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -797,6 +797,11 @@ public class Execution implements Serializable { return false; } + if (current == CANCELING) { + cancelingComplete(); + return false; + } + if (transitionState(current, FAILED, t)) { // success (in a manner of speaking) this.failureCause = t; http://git-wip-us.apache.org/repos/asf/flink/blob/70ce072a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 0d6de98..ed50bea 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -809,7 +809,7 @@ public class ExecutionGraph implements Serializable { public void fail(Throwable t) { while (true) { JobStatus current = state; - if (current == JobStatus.FAILED || current == JobStatus.FAILING) { + if (current == JobStatus.FAILING || current.isTerminalState()) { return; } else if (transitionState(current, JobStatus.FAILING, t)) { http://git-wip-us.apache.org/repos/asf/flink/blob/70ce072a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 925b574..b1f11fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.execution.ExecutionState; @@ -310,7 +311,8 @@ public class ExecutionGraphRestartTest extends TestLogger { while (deadline.hasTimeLeft() && !success) { success = true; for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) { - if (vertex.getExecutionState() != ExecutionState.FAILED) { + ExecutionState state = vertex.getExecutionState(); + if (state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
flink git commit: [docs] Update readme with current feature list and streaming example
Repository: flink Updated Branches: refs/heads/release-1.0 5b5136e95 -> 702627116 [docs] Update readme with current feature list and streaming example Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70262711 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70262711 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70262711 Branch: refs/heads/release-1.0 Commit: 70262711641358dea28485c0c6926b1bea57bb95 Parents: 5b5136e Author: Stephan Ewen Authored: Mon Feb 29 16:24:47 2016 +0100 Committer: Stephan Ewen Committed: Mon Feb 29 16:32:12 2016 +0100 -- README.md | 67 +- 1 file changed, 52 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/70262711/README.md -- diff --git a/README.md b/README.md index 3cf08c7..41ea37d 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,55 @@ # Apache Flink -Apache Flink is an open source platform for scalable batch and stream data processing. Flink supports batch and streaming analytics, -in one system. Analytical programs can be written in concise and elegant APIs in Java and Scala. +Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities. +Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/) + + +### Features + +* A streaming-first runtime that supports both batch processing and data streaming programs + +* Elegant and fluent APIs in Java and Scala + +* A runtime that supports very high throughput and low event latency at the same time + +* Support for *event time* and *out-of-order* processing in the DataStream API, based on the *Dataflow Model* + +* Flexible windowing (time, count, sessions, custom triggers) accross different time semantics (event time, processing time) + +* Fault-tolerance with *exactly-once* processing guarantees + +* Natural back-pressure in streaming programs. + +* Libraries for Graph processing (batch), Machine Learning (batch), and Complex Event Processing (streaming) + +* Built-in support for iterative programs (BSP) and in the DataSet (batch) API. + +* Custom memory management to for efficient and robust switching between in-memory and out-of-core data processing algorithms. + +* Compatibility layers for Apache Hadoop MapReduce and Apache Storm. + +* Integration with YARN, HDFS, HBase, and other components of the Apache Hadoop ecosystem. + + +### Streaming Example +```scala +case class WordWithCount(word: String, count: Long) + +val text = env.socketTextStream(host, port, '\n') + +val windowCounts = text.flatMap { w => w.split("\\s") } + .map { w => WordWithCount(w, 1) } + .keyBy("word") + .timeWindow(Time.seconds(5)) + .sum("count") + +windowCounts.print() +``` + +### Batch Example ```scala -case class WordWithCount(word: String, count: Int) +case class WordWithCount(word: String, count: Long) val text = env.readTextFile(path) @@ -16,16 +61,6 @@ val counts = text.flatMap { _.split("\\W+") } counts.writeAsCsv(outputPath) ``` -These are some of the unique features of Flink: - -* Hybrid batch/streaming runtime that supports batch processing and data streaming programs. -* Custom memory management to guarantee efficient, adaptive, and highly robust switching between in-memory and out-of-core data processing algorithms. -* Flexible and expressive windowing semantics for data stream programs. -* Built-in program optimizer that chooses the proper runtime operations for each program. -* Custom type analysis and serialization stack for high performance. - - -Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/) ## Building Apache Flink from Source @@ -34,21 +69,23 @@ Prerequisites for building Flink: * Unix-like environment (We use Linux, Mac OS X, Cygwin) * git -* Maven (at least version 3.0.4) +* Maven (we recommend version 3.0.4) * Java 7 or 8 ``` git clone https://github.com/apache/flink.git cd flink -mvn clean package -DskipTests # this will take up to 5 minutes +mvn clean package -DskipTests # this will take up to 10 minutes ``` Flink is now installed in `build-target` +*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.0.3 creates the libraries properly.* ## Developing Flink The Flink committers use IntelliJ IDEA and Eclipse IDE to develop the Flink codebase. +We recommend IntelliJ IDEA for developing projects that involve Scala code. Minimal requirements for an IDE are: * Support for Java and Scala (also mixed projects)
flink git commit: [docs] Update readme with current feature list and streaming example
Repository: flink Updated Branches: refs/heads/master 405d22236 -> 5f0af06fe [docs] Update readme with current feature list and streaming example Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f0af06f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f0af06f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f0af06f Branch: refs/heads/master Commit: 5f0af06fef3046273f26d0015fe1c9b6df381751 Parents: 405d222 Author: Stephan Ewen Authored: Mon Feb 29 16:24:47 2016 +0100 Committer: Stephan Ewen Committed: Mon Feb 29 16:28:38 2016 +0100 -- README.md | 67 +- 1 file changed, 52 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5f0af06f/README.md -- diff --git a/README.md b/README.md index 3cf08c7..41ea37d 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,55 @@ # Apache Flink -Apache Flink is an open source platform for scalable batch and stream data processing. Flink supports batch and streaming analytics, -in one system. Analytical programs can be written in concise and elegant APIs in Java and Scala. +Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities. +Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/) + + +### Features + +* A streaming-first runtime that supports both batch processing and data streaming programs + +* Elegant and fluent APIs in Java and Scala + +* A runtime that supports very high throughput and low event latency at the same time + +* Support for *event time* and *out-of-order* processing in the DataStream API, based on the *Dataflow Model* + +* Flexible windowing (time, count, sessions, custom triggers) accross different time semantics (event time, processing time) + +* Fault-tolerance with *exactly-once* processing guarantees + +* Natural back-pressure in streaming programs. + +* Libraries for Graph processing (batch), Machine Learning (batch), and Complex Event Processing (streaming) + +* Built-in support for iterative programs (BSP) and in the DataSet (batch) API. + +* Custom memory management to for efficient and robust switching between in-memory and out-of-core data processing algorithms. + +* Compatibility layers for Apache Hadoop MapReduce and Apache Storm. + +* Integration with YARN, HDFS, HBase, and other components of the Apache Hadoop ecosystem. + + +### Streaming Example +```scala +case class WordWithCount(word: String, count: Long) + +val text = env.socketTextStream(host, port, '\n') + +val windowCounts = text.flatMap { w => w.split("\\s") } + .map { w => WordWithCount(w, 1) } + .keyBy("word") + .timeWindow(Time.seconds(5)) + .sum("count") + +windowCounts.print() +``` + +### Batch Example ```scala -case class WordWithCount(word: String, count: Int) +case class WordWithCount(word: String, count: Long) val text = env.readTextFile(path) @@ -16,16 +61,6 @@ val counts = text.flatMap { _.split("\\W+") } counts.writeAsCsv(outputPath) ``` -These are some of the unique features of Flink: - -* Hybrid batch/streaming runtime that supports batch processing and data streaming programs. -* Custom memory management to guarantee efficient, adaptive, and highly robust switching between in-memory and out-of-core data processing algorithms. -* Flexible and expressive windowing semantics for data stream programs. -* Built-in program optimizer that chooses the proper runtime operations for each program. -* Custom type analysis and serialization stack for high performance. - - -Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/) ## Building Apache Flink from Source @@ -34,21 +69,23 @@ Prerequisites for building Flink: * Unix-like environment (We use Linux, Mac OS X, Cygwin) * git -* Maven (at least version 3.0.4) +* Maven (we recommend version 3.0.4) * Java 7 or 8 ``` git clone https://github.com/apache/flink.git cd flink -mvn clean package -DskipTests # this will take up to 5 minutes +mvn clean package -DskipTests # this will take up to 10 minutes ``` Flink is now installed in `build-target` +*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.0.3 creates the libraries properly.* ## Developing Flink The Flink committers use IntelliJ IDEA and Eclipse IDE to develop the Flink codebase. +We recommend IntelliJ IDEA for developing projects that involve Scala code. Minimal requirements for an IDE are: * Support for Java and Scala (also mixed projects)
flink git commit: [hotfix] Properly copy stream record in ReducingWindowBuffer and FoldingWindowBuffer
Repository: flink Updated Branches: refs/heads/master f881e7079 -> 405d22236 [hotfix] Properly copy stream record in ReducingWindowBuffer and FoldingWindowBuffer Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/405d2223 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/405d2223 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/405d2223 Branch: refs/heads/master Commit: 405d2223697344e41aa11cc66cadf6b9afcacd89 Parents: f881e70 Author: Stephan Ewen Authored: Fri Feb 26 21:23:56 2016 +0100 Committer: Stephan Ewen Committed: Mon Feb 29 13:38:37 2016 +0100 -- .../runtime/operators/windowing/buffers/FoldingWindowBuffer.java | 2 +- .../runtime/operators/windowing/buffers/ReducingWindowBuffer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/405d2223/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java index fa44f9d..f6c2319 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java @@ -63,7 +63,7 @@ public class FoldingWindowBuffer implements WindowBuffer { @Override public void storeElement(StreamRecord element) throws Exception { - data.replace(foldFunction.fold(data.getValue(), element.getValue()), element.getTimestamp()); + data.replace(foldFunction.fold(data.getValue(), element.getValue())); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/405d2223/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java index 1f2b639..d3bf4b4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java @@ -57,7 +57,7 @@ public class ReducingWindowBuffer implements WindowBuffer { @Override public void storeElement(StreamRecord element) throws Exception { if (data == null) { - data = new StreamRecord<>(element.getValue(), element.getTimestamp()); + data = element.copy(element.getValue()); } else { data.replace(reduceFunction.reduce(data.getValue(), element.getValue())); }
flink git commit: [FLINK-3532] [gelly] Fix artifact ID of flink-gelly-examples module
Repository: flink Updated Branches: refs/heads/release-1.0 a94aa9c6a -> 5b5136e95 [FLINK-3532] [gelly] Fix artifact ID of flink-gelly-examples module The current flink-gelly-examples artifact id wrongly used an underscore to separate examples from flink-gelly. This commit replaces the underscore with an hyphen. This closes #1731. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b5136e9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b5136e9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b5136e9 Branch: refs/heads/release-1.0 Commit: 5b5136e9585a553485bc13abdfe3b8a6bd1805fc Parents: a94aa9c Author: Till Rohrmann Authored: Mon Feb 29 12:01:10 2016 +0100 Committer: Till Rohrmann Committed: Mon Feb 29 12:18:07 2016 +0100 -- flink-libraries/flink-gelly-examples/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5b5136e9/flink-libraries/flink-gelly-examples/pom.xml -- diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml index 2b84cc1..96492ae 100644 --- a/flink-libraries/flink-gelly-examples/pom.xml +++ b/flink-libraries/flink-gelly-examples/pom.xml @@ -27,7 +27,7 @@ .. - flink-gelly_examples_2.10 + flink-gelly-examples_2.10 flink-gelly-examples jar
flink git commit: [FLINK-3532] [gelly] Fix artifact ID of flink-gelly-examples module
Repository: flink Updated Branches: refs/heads/master 512c84ff6 -> f881e7079 [FLINK-3532] [gelly] Fix artifact ID of flink-gelly-examples module The current flink-gelly-examples artifact id wrongly used an underscore to separate examples from flink-gelly. This commit replaces the underscore with an hyphen. This closes #1731. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f881e707 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f881e707 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f881e707 Branch: refs/heads/master Commit: f881e707924ed1c5c3bac50bde7660af9d9eea74 Parents: 512c84f Author: Till Rohrmann Authored: Mon Feb 29 12:01:10 2016 +0100 Committer: Till Rohrmann Committed: Mon Feb 29 12:16:17 2016 +0100 -- flink-libraries/flink-gelly-examples/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f881e707/flink-libraries/flink-gelly-examples/pom.xml -- diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml index 06340fb..ab7eacc 100644 --- a/flink-libraries/flink-gelly-examples/pom.xml +++ b/flink-libraries/flink-gelly-examples/pom.xml @@ -27,7 +27,7 @@ .. - flink-gelly_examples_2.10 + flink-gelly-examples_2.10 flink-gelly-examples jar
flink git commit: [FLINK-3502] Add test case. Bug was resolved by a previous commit.
Repository: flink Updated Branches: refs/heads/tableOnCalcite 4a63e2914 -> e097cf7db [FLINK-3502] Add test case. Bug was resolved by a previous commit. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e097cf7d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e097cf7d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e097cf7d Branch: refs/heads/tableOnCalcite Commit: e097cf7dbbeec6ecbd77e77842c1c869ae40de1b Parents: 4a63e29 Author: Fabian Hueske Authored: Fri Feb 26 15:53:19 2016 +0100 Committer: Fabian Hueske Committed: Fri Feb 26 15:53:19 2016 +0100 -- .../table/test/GroupedAggregationsITCase.scala | 17 + 1 file changed, 17 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e097cf7d/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala -- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala index a88abcb..062967d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala @@ -137,4 +137,21 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram TestBaseUtils.compareResultAsText(results.asJava, expected) } + + @Test + def testGroupedByExpression2(): Unit = { + +// verify AggregateProjectPullUpConstantsRule + +val env = ExecutionEnvironment.getExecutionEnvironment +val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + .select('b, 4 as 'four, 'a) + .groupBy('b, 'four) + .select('four, 'a.sum) + +val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n" +val results = t.toDataSet[Row].collect() + +TestBaseUtils.compareResultAsText(results.asJava, expected) + } }
flink git commit: Bump version to 1.1-SNAPSHOT
Repository: flink Updated Branches: refs/heads/master e840bbf71 -> 512c84ff6 Bump version to 1.1-SNAPSHOT Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/512c84ff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/512c84ff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/512c84ff Branch: refs/heads/master Commit: 512c84ff6ba82a095c51a60ac09ce69e2fc8bbc6 Parents: e840bbf Author: Robert Metzger Authored: Fri Feb 26 14:49:53 2016 +0100 Committer: Robert Metzger Committed: Mon Feb 29 10:32:15 2016 +0100 -- docs/_config.yml | 6 +++--- flink-annotations/pom.xml | 2 +- flink-batch-connectors/flink-avro/pom.xml | 2 +- flink-batch-connectors/flink-hadoop-compatibility/pom.xml | 2 +- flink-batch-connectors/flink-hbase/pom.xml | 2 +- flink-batch-connectors/flink-hcatalog/pom.xml | 2 +- flink-batch-connectors/flink-jdbc/pom.xml | 2 +- flink-batch-connectors/pom.xml | 2 +- flink-clients/pom.xml | 2 +- flink-contrib/flink-connector-wikiedits/pom.xml| 2 +- flink-contrib/flink-operator-stats/pom.xml | 2 +- flink-contrib/flink-statebackend-rocksdb/pom.xml | 2 +- flink-contrib/flink-storm-examples/pom.xml | 2 +- flink-contrib/flink-storm/pom.xml | 2 +- flink-contrib/flink-streaming-contrib/pom.xml | 2 +- flink-contrib/flink-tweet-inputformat/pom.xml | 2 +- flink-contrib/pom.xml | 2 +- flink-core/pom.xml | 2 +- flink-dist/pom.xml | 2 +- flink-examples/flink-examples-batch/pom.xml| 2 +- flink-examples/flink-examples-streaming/pom.xml| 2 +- flink-examples/pom.xml | 2 +- flink-fs-tests/pom.xml | 2 +- flink-java/pom.xml | 2 +- flink-java8/pom.xml| 2 +- flink-libraries/flink-cep/pom.xml | 2 +- flink-libraries/flink-gelly-examples/pom.xml | 2 +- flink-libraries/flink-gelly-scala/pom.xml | 2 +- flink-libraries/flink-gelly/pom.xml| 2 +- flink-libraries/flink-ml/pom.xml | 2 +- flink-libraries/flink-python/pom.xml | 2 +- flink-libraries/flink-table/pom.xml| 2 +- flink-libraries/pom.xml| 2 +- flink-optimizer/pom.xml| 2 +- flink-quickstart/flink-quickstart-java/pom.xml | 2 +- .../src/main/resources/archetype-resources/pom.xml | 2 +- flink-quickstart/flink-quickstart-scala/pom.xml| 2 +- .../src/main/resources/archetype-resources/pom.xml | 2 +- flink-quickstart/pom.xml | 2 +- flink-runtime-web/pom.xml | 2 +- flink-runtime/pom.xml | 2 +- flink-scala-shell/pom.xml | 2 +- flink-scala/pom.xml| 2 +- flink-shaded-curator/flink-shaded-curator-recipes/pom.xml | 2 +- flink-shaded-curator/flink-shaded-curator-test/pom.xml | 2 +- flink-shaded-curator/pom.xml | 2 +- flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml | 2 +- flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml | 2 +- flink-shaded-hadoop/flink-shaded-include-yarn-tests/pom.xml| 2 +- flink-shaded-hadoop/pom.xml| 2 +- .../flink-connector-elasticsearch/pom.xml | 2 +- flink-streaming-connectors/flink-connector-filesystem/pom.xml | 2 +- flink-streaming-connectors/flink-connector-flume/pom.xml | 2 +- flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml | 2 +- flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml | 2 +- flink-streaming-connectors/flink-connector-kafka-base/pom.xml | 2 +- flink-streaming-connectors/flink-connector-nifi/pom.xml| 2 +- flink-streaming-connectors/flink-connector-rabbitmq/pom.xml| 2 +- flink-streaming-connectors/flink-connector-twitter/pom.xml | 2 +- flink-streaming-connectors/pom.xml | 2 +- flink-streaming-java/pom.xml |
flink git commit: [FLINK-3517] [dist] Only count active PIDs in start script
Repository: flink Updated Branches: refs/heads/master 49069823b -> e840bbf71 [FLINK-3517] [dist] Only count active PIDs in start script This closes #1716. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e840bbf7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e840bbf7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e840bbf7 Branch: refs/heads/master Commit: e840bbf7192aa3667e7ba128adc84cfd8318ddea Parents: 4906982 Author: Ufuk Celebi Authored: Fri Feb 26 00:11:48 2016 +0100 Committer: Ufuk Celebi Committed: Mon Feb 29 10:31:44 2016 +0100 -- flink-dist/src/main/flink-bin/bin/flink-daemon.sh | 12 ++-- 1 file changed, 10 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e840bbf7/flink-dist/src/main/flink-bin/bin/flink-daemon.sh -- diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh index f6eab70..2388ba7 100644 --- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh @@ -85,8 +85,16 @@ case $STARTSTOP in # Print a warning if daemons are already running on host if [ -f $pid ]; then -count=$(wc -l $pid | awk '{print $1}') -echo "[WARNING] $count instance(s) of $DAEMON are already running on $HOSTNAME." + active=() + while IFS='' read -r p || [[ -n "$p" ]]; do +kill -0 $p >/dev/null 2>&1 +if [ $? -eq 0 ]; then + active+=($p) +fi + done < "${pid}" + + count="${#active[@]}" + echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." fi echo "Starting $DAEMON daemon on host $HOSTNAME."
flink git commit: [FLINK-3517] [dist] Only count active PIDs in start script
Repository: flink Updated Branches: refs/heads/release-1.0 945f25b97 -> a94aa9c6a [FLINK-3517] [dist] Only count active PIDs in start script Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a94aa9c6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a94aa9c6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a94aa9c6 Branch: refs/heads/release-1.0 Commit: a94aa9c6a7d47d8074120b77fe051f98931c11c6 Parents: 945f25b Author: Ufuk Celebi Authored: Fri Feb 26 00:11:48 2016 +0100 Committer: Ufuk Celebi Committed: Mon Feb 29 10:30:34 2016 +0100 -- flink-dist/src/main/flink-bin/bin/flink-daemon.sh | 12 ++-- 1 file changed, 10 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a94aa9c6/flink-dist/src/main/flink-bin/bin/flink-daemon.sh -- diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh index f6eab70..2388ba7 100644 --- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh @@ -85,8 +85,16 @@ case $STARTSTOP in # Print a warning if daemons are already running on host if [ -f $pid ]; then -count=$(wc -l $pid | awk '{print $1}') -echo "[WARNING] $count instance(s) of $DAEMON are already running on $HOSTNAME." + active=() + while IFS='' read -r p || [[ -n "$p" ]]; do +kill -0 $p >/dev/null 2>&1 +if [ $? -eq 0 ]; then + active+=($p) +fi + done < "${pid}" + + count="${#active[@]}" + echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME." fi echo "Starting $DAEMON daemon on host $HOSTNAME."
flink git commit: [FLINK-3461] [runtime] Remove redundant check in ZooKeeperLeaderElectionService
Repository: flink Updated Branches: refs/heads/release-1.0 eb164211e -> 945f25b97 [FLINK-3461] [runtime] Remove redundant check in ZooKeeperLeaderElectionService This closes #1700. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/945f25b9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/945f25b9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/945f25b9 Branch: refs/heads/release-1.0 Commit: 945f25b974f3773be31a108330d6cc5f7ec36235 Parents: eb16421 Author: sahitya-pavurala Authored: Tue Feb 23 21:08:23 2016 -0500 Committer: Ufuk Celebi Committed: Mon Feb 29 10:18:28 2016 +0100 -- .../runtime/leaderelection/ZooKeeperLeaderElectionService.java | 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/945f25b9/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java index 811037c..5c10293 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java @@ -148,11 +148,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le @Override public boolean hasLeadership() { - if(leaderLatch.getState().equals(LeaderLatch.State.STARTED)) { - return leaderLatch.hasLeadership(); - } else { - return false; - } + return leaderLatch.hasLeadership(); } @Override
flink git commit: [FLINK-3461] [runtime] Fix space indentation in ZooKeeperLeaderElectionService
Repository: flink Updated Branches: refs/heads/master febcb7f11 -> 49069823b [FLINK-3461] [runtime] Fix space indentation in ZooKeeperLeaderElectionService Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49069823 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49069823 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49069823 Branch: refs/heads/master Commit: 49069823bacd31ac8139664707874af9cec2720d Parents: febcb7f Author: Ufuk Celebi Authored: Mon Feb 29 10:16:38 2016 +0100 Committer: Ufuk Celebi Committed: Mon Feb 29 10:16:38 2016 +0100 -- .../runtime/leaderelection/ZooKeeperLeaderElectionService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/49069823/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java index 8d39fe2..5c10293 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java @@ -148,7 +148,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le @Override public boolean hasLeadership() { -return leaderLatch.hasLeadership(); + return leaderLatch.hasLeadership(); } @Override
flink git commit: [FLINK-3461] [runtime] Remove redundant check in ZooKeeperLeaderElectionService
Repository: flink Updated Branches: refs/heads/master 9924c3afe -> febcb7f11 [FLINK-3461] [runtime] Remove redundant check in ZooKeeperLeaderElectionService This closes #1700. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/febcb7f1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/febcb7f1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/febcb7f1 Branch: refs/heads/master Commit: febcb7f114439cf0b99f2aa968861387d3f46deb Parents: 9924c3a Author: sahitya-pavurala Authored: Tue Feb 23 21:08:23 2016 -0500 Committer: Ufuk Celebi Committed: Mon Feb 29 10:08:10 2016 +0100 -- .../runtime/leaderelection/ZooKeeperLeaderElectionService.java | 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/febcb7f1/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java index 811037c..8d39fe2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java @@ -148,11 +148,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le @Override public boolean hasLeadership() { - if(leaderLatch.getState().equals(LeaderLatch.State.STARTED)) { - return leaderLatch.hasLeadership(); - } else { - return false; - } +return leaderLatch.hasLeadership(); } @Override