flink git commit: [docs] fix readme typos; use the same scala style in the examples

2016-02-29 Thread vasia
Repository: flink
Updated Branches:
  refs/heads/master a922473c0 -> e8e88afdc


[docs] fix readme typos; use the same scala style in the examples

This closes #1743


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e8e88afd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e8e88afd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e8e88afd

Branch: refs/heads/master
Commit: e8e88afdc4807277a1d8df8663b33e60b5688d0d
Parents: a922473
Author: vasia 
Authored: Mon Feb 29 22:49:35 2016 +0100
Committer: vasia 
Committed: Tue Mar 1 08:15:12 2016 +0100

--
 README.md | 14 +++---
 1 file changed, 7 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e8e88afd/README.md
--
diff --git a/README.md b/README.md
index 41ea37d..1597a58 100644
--- a/README.md
+++ b/README.md
@@ -19,17 +19,17 @@ Learn more about Flink at 
[http://flink.apache.org/](http://flink.apache.org/)
 
 * Fault-tolerance with *exactly-once* processing guarantees
 
-* Natural back-pressure in streaming programs.
+* Natural back-pressure in streaming programs
 
 * Libraries for Graph processing (batch), Machine Learning (batch), and 
Complex Event Processing (streaming)
 
-* Built-in support for iterative programs (BSP) and in the DataSet (batch) API.
+* Built-in support for iterative programs (BSP) in the DataSet (batch) API
 
-* Custom memory management to for efficient and robust switching between 
in-memory and out-of-core data processing algorithms.
+* Custom memory management for efficient and robust switching between 
in-memory and out-of-core data processing algorithms
 
-* Compatibility layers for Apache Hadoop MapReduce and Apache Storm.
+* Compatibility layers for Apache Hadoop MapReduce and Apache Storm
 
-* Integration with YARN, HDFS, HBase, and other components of the Apache 
Hadoop ecosystem.
+* Integration with YARN, HDFS, HBase, and other components of the Apache 
Hadoop ecosystem
 
 
 ### Streaming Example
@@ -53,8 +53,8 @@ case class WordWithCount(word: String, count: Long)
 
 val text = env.readTextFile(path)
 
-val counts = text.flatMap { _.split("\\W+") }
-  .map { WordWithCount(_, 1) }
+val counts = text.flatMap { w => w.split("\\s") }
+  .map { w => WordWithCount(w, 1) }
   .groupBy("word")
   .sum("count")
 



[2/2] flink git commit: [FLINK-3537] Fix code gen for disjunctions.

2016-02-29 Thread vasia
[FLINK-3537] Fix code gen for disjunctions.

This closes #1733


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c57e2415
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c57e2415
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c57e2415

Branch: refs/heads/tableOnCalcite
Commit: c57e2415fdeac80f8c851f7ddb4fabe7a1781836
Parents: e097cf7
Author: Fabian Hueske 
Authored: Mon Feb 29 15:00:06 2016 +0100
Committer: vasia 
Committed: Mon Feb 29 23:15:56 2016 +0100

--
 .../api/table/codegen/calls/ScalarOperators.scala  |  2 +-
 .../flink/api/java/table/test/FilterITCase.java| 16 
 .../flink/api/scala/table/test/FilterITCase.scala  | 12 
 .../flink/api/scala/table/test/JoinITCase.scala| 17 +
 4 files changed, 46 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c57e2415/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
index 8580b25..f71b643 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarOperators.scala
@@ -303,7 +303,7 @@ object ScalarOperators {
   s"""
 |${left.code}
 |${right.code}
-|boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+|boolean $resultTerm = ${left.resultTerm} || ${right.resultTerm};
 |""".stripMargin
 }
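
The one-character fix above matters because the table runtime assembles Java source from strings, so a pasted `&&` silently turned every OR into an AND. A minimal self-contained sketch of the pattern (the `GeneratedExpr` and `generateOr` names are illustrative, not the actual ScalarOperators API):

    object CodeGenDemo extends App {
      // Each operand carries the Java code computing it plus the variable
      // holding its boolean result, mirroring Flink's generated expressions.
      case class GeneratedExpr(code: String, resultTerm: String)

      def generateOr(resultTerm: String,
                     left: GeneratedExpr,
                     right: GeneratedExpr): String =
        s"""
           |${left.code}
           |${right.code}
           |boolean $resultTerm = ${left.resultTerm} || ${right.resultTerm};
           |""".stripMargin // before the fix, this line emitted && for disjunctions

      val a = GeneratedExpr("boolean t0 = x < 2;", "t0")
      val b = GeneratedExpr("boolean t1 = x > 20;", "t1")
      println(generateOr("t2", a, b))
    }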
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c57e2415/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
--
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
index c69d1a7..c783524 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java
@@ -126,6 +126,22 @@ public class FilterITCase extends TableProgramsTestBase {
}
 
@Test
+   public void testDisjunctivePreds() throws Exception {
+   ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   TableEnvironment tableEnv = getJavaTableEnvironment();
+
+   DataSet> input = 
CollectionDataSets.get3TupleDataSet(env);
+
+   Table table = tableEnv.fromDataSet(input, "a, b, c");
+   Table result = table.filter("a < 2 || a > 20");
+
+   DataSet ds = tableEnv.toDataSet(result, Row.class);
+   List results = ds.collect();
+   String expected = "1,1,Hi\n" + "21,6,Comment#15\n";
+   compareResultAsText(results, expected);
+   }
+
+   @Test
public void testIntegerBiggerThan128() throws Exception {
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
TableEnvironment tableEnv = getJavaTableEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/c57e2415/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
--
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
index 2dfdb2c..0febd4d 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala
@@ -126,6 +126,18 @@ class FilterITCase(
   }
 
   @Test
+  def testDisjunctivePreds(): Unit = {
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+
+val filterDs = ds.filter( 'a < 2 || 'a > 20)
+val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
+val results = filterDs.collect()
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testFilterMerge(): Unit = {
 // verify FilterMergeRule.
 

http://git-wip-us.apache.org/repos

[1/2] flink git commit: [FLINK-3504] Fix join translation. Equality predicates may only reference fields.

2016-02-29 Thread vasia
Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite e097cf7db -> 031d6f745


[FLINK-3504] Fix join translation. Equality predicates may only reference 
fields.

Catch Calcite planner exception and rethrow with additional error message

This closes #1734


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/031d6f74
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/031d6f74
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/031d6f74

Branch: refs/heads/tableOnCalcite
Commit: 031d6f745fe63e1c089055b825f7816b91b8433b
Parents: c57e241
Author: Fabian Hueske 
Authored: Mon Feb 29 14:46:08 2016 +0100
Committer: vasia 
Committed: Mon Feb 29 23:15:56 2016 +0100

--
 .../api/java/table/JavaBatchTranslator.scala| 25 -
 .../plan/rules/logical/FlinkJoinRule.scala  | 54 +++-
 .../flink/api/scala/table/test/JoinITCase.scala | 20 
 3 files changed, 96 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/031d6f74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
index 7e8ee77..f238df3 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.table
 
+import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.{RelTraitSet, RelOptUtil}
 import org.apache.calcite.rel.{RelCollations, RelNode}
 import org.apache.calcite.sql2rel.RelDecorrelator
@@ -75,7 +76,17 @@ class JavaBatchTranslator(config: TableConfig) extends 
PlanTranslator {
 // optimize the logical Flink plan
 val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES)
 val flinkOutputProps = RelTraitSet.createEmpty()
-val optPlan = optProgram.run(planner, decorPlan, flinkOutputProps)
+
+val optPlan = try {
+  optProgram.run(planner, decorPlan, flinkOutputProps)
+}
+catch {
+  case e: CannotPlanException =>
+throw new PlanGenException(
+  s"Cannot generate a valid execution plan for the given query: \n\n" +
+  s"${RelOptUtil.toString(lPlan)}\n" +
+  "Please consider filing a bug report.", e)
+}
 
 println("---")
 println("Optimized Plan:")
@@ -87,7 +98,17 @@ class JavaBatchTranslator(config: TableConfig) extends 
PlanTranslator {
 val dataSetOutputProps = RelTraitSet.createEmpty()
   .plus(DataSetConvention.INSTANCE)
   .plus(RelCollations.of()).simplify()
-val dataSetPlan = dataSetProgram.run(planner, optPlan, dataSetOutputProps)
+
+val dataSetPlan = try {
+  dataSetProgram.run(planner, optPlan, dataSetOutputProps)
+}
+catch {
+  case e: CannotPlanException =>
+throw new PlanGenException(
+  s"Cannot generate a valid execution plan for the given query: \n\n" +
+s"${RelOptUtil.toString(lPlan)}\n" +
+"Please consider filing a bug report.", e)
+}
 
 println("-")
 println("DataSet Plan:")

http://git-wip-us.apache.org/repos/asf/flink/blob/031d6f74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
index 3826c9a..82f3eaa 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/logical/FlinkJoinRule.scala
@@ -18,12 +18,16 @@
 
 package org.apache.flink.api.table.plan.rules.logical
 
-import org.apache.calcite.plan.{Convention, RelOptRule, RelTraitSet}
+import org.apache.calcite.plan.{RelOptRuleCall, Convention, RelOptRule, 
RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.convert.ConverterRule
 import org.apache.calcite.rel.logical.LogicalJoin
+import org.apache.calcite.rex.{RexInputRef, RexCall}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.api.table.plan.nodes.logical.{FlinkJoin, 
FlinkConvention}
 
+import scala.collec

buildbot success on flink-docs-release-0.10

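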
2016-02-29 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.10 
while building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.10/builds/134

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: orcus_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' 
triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





buildbot success on flink-docs-master

2016-02-29 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-master while 
building . Full details are available at:
https://ci.apache.org/builders/flink-docs-master/builds/251

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: lares_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered 
this build
Build Source Stamp: [branch master] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





[2/2] flink git commit: [FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator

2016-02-29 Thread aljoscha
[FLINK-3548] [api-breaking] Remove unnecessary generic parameter from 
SingleOutputStreamOperator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a922473c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a922473c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a922473c

Branch: refs/heads/master
Commit: a922473c0835a757e7108c79ad52f103ace86030
Parents: 51fc298
Author: Aljoscha Krettek 
Authored: Mon Feb 29 20:20:07 2016 +0100
Committer: Aljoscha Krettek 
Committed: Mon Feb 29 21:37:19 2016 +0100

--
 .../apache/flink/storm/api/FlinkTopology.java   |  14 +--
 .../api/datastream/AllWindowedStream.java   |  54 +-
 .../api/datastream/ConnectedStreams.java|  20 ++--
 .../streaming/api/datastream/DataStream.java|  24 ++---
 .../api/datastream/DataStreamSource.java|   4 +-
 .../api/datastream/IterativeStream.java |  17 +--
 .../streaming/api/datastream/KeyedStream.java   |  38 +++
 .../datastream/SingleOutputStreamOperator.java  |  77 +++---
 .../api/datastream/StreamProjection.java| 104 +--
 .../api/datastream/WindowedStream.java  |  54 +-
 .../environment/StreamExecutionEnvironment.java |  12 +--
 .../flink/streaming/api/DataStreamTest.java |   4 +-
 .../apache/flink/streaming/api/IterateTest.java |   9 +-
 .../flink/streaming/api/TypeFillTest.java   |   2 +-
 .../api/graph/StreamGraphGeneratorTest.java |  10 +-
 .../flink/streaming/api/scala/DataStream.scala  |  18 ++--
 .../StreamingScalaAPICompletenessTest.scala |   2 +-
 17 files changed, 211 insertions(+), 252 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a922473c/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
--
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
index 811cfb4..6706a91 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
@@ -200,7 +200,7 @@ public class FlinkTopology {
SplitStream> splitSource 
= multiSource
.split(new 
StormStreamSelector());
for (String streamId : sourceStreams.keySet()) {
-   SingleOutputStreamOperator 
outStream = splitSource.select(streamId)
+   SingleOutputStreamOperator 
outStream = splitSource.select(streamId)
.map(new 
SplitStreamMapper());

outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
outputStreams.put(streamId, outStream);
@@ -299,7 +299,7 @@ public class FlinkTopology {
inputStreams.put(streamId, 
processInput(boltId, userBolt, streamId, grouping, producer));
}
 
-   final SingleOutputStreamOperator 
outputStream = createOutput(boltId,
+   final SingleOutputStreamOperator 
outputStream = createOutput(boltId,
userBolt, inputStreams);
 
if (common.is_set_parallelism_hint()) {
@@ -359,7 +359,7 @@ public class FlinkTopology {
}
 
@SuppressWarnings({ "unchecked", "rawtypes" })
-   private SingleOutputStreamOperator createOutput(String boltId, 
IRichBolt bolt,
+   private SingleOutputStreamOperator createOutput(String boltId, 
IRichBolt bolt,
Map> inputStreams) {
assert (boltId != null);
assert (bolt != null);
@@ -403,7 +403,7 @@ public class FlinkTopology {
final HashMap boltOutputs = 
this.outputStreams.get(boltId);
final FlinkOutputFieldsDeclarer declarer = 
this.declarers.get(boltId);
 
-   final SingleOutputStreamOperator outputStream;
+   final SingleOutputStreamOperator outputStream;
 
if (boltOutputs.size() < 2) { // single output stream or sink
String outputStreamId;
@@ -415,7 +415,7 @@ public class FlinkTopology {
 
final TypeInformation outType = 
declarer.getOutputType(outputStreamId);
 
-   final SingleOutputStreamOperator outStream;
+   final SingleOutputStreamOperator outStream;
 
 

[1/2] flink git commit: [FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator

2016-02-29 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master 51fc29812 -> a922473c0


http://git-wip-us.apache.org/repos/asf/flink/blob/a922473c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4c1c265..fb7ec9f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -735,7 +735,7 @@ public abstract class StreamExecutionEnvironment {
 
SourceFunction function;
try {
-   function = new 
FromElementsFunction(typeInfo.createSerializer(getConfig()), data);
+   function = new 
FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
}
catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
@@ -789,7 +789,7 @@ public abstract class StreamExecutionEnvironment {
public  DataStreamSource fromCollection(Iterator data, 
TypeInformation typeInfo) {
Preconditions.checkNotNull(data, "The iterator must not be 
null");
 
-   SourceFunction function = new 
FromIteratorFunction(data);
+   SourceFunction function = new FromIteratorFunction<>(data);
return addSource(function, "Collection Source", typeInfo);
}
 
@@ -838,7 +838,7 @@ public abstract class StreamExecutionEnvironment {
// private helper for passing different names
private  DataStreamSource 
fromParallelCollection(SplittableIterator iterator, TypeInformation
typeInfo, String operatorName) {
-   return addSource(new 
FromSplittableIteratorFunction(iterator), operatorName).returns(typeInfo);
+   return addSource(new 
FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo);
}
 
/**
@@ -1033,8 +1033,8 @@ public abstract class StreamExecutionEnvironment {
// private helper for passing different names
private  DataStreamSource createInput(InputFormat 
inputFormat,
TypeInformation typeInfo, String sourceName) {
-   FileSourceFunction function = new 
FileSourceFunction(inputFormat, typeInfo);
-   return addSource(function, sourceName).returns(typeInfo);
+   FileSourceFunction function = new 
FileSourceFunction<>(inputFormat, typeInfo);
+   return addSource(function, sourceName, typeInfo);
}
 
/**
@@ -1136,7 +1136,7 @@ public abstract class StreamExecutionEnvironment {
sourceOperator = new StreamSource<>(function);
}
 
-   return new DataStreamSource(this, typeInfo, 
sourceOperator, isParallel, sourceName);
+   return new DataStreamSource<>(this, typeInfo, sourceOperator, 
isParallel, sourceName);
}
 
/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a922473c/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 7a4d6f8..cf48160 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -435,7 +435,7 @@ public class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
DataStreamSource> src = env.fromElements(new 
Tuple2<>(0L, 0L));
env.setParallelism(10);
 
-   SingleOutputStreamOperator map = src.map(new 
MapFunction, Long>() {
+   SingleOutputStreamOperator map = src.map(new 
MapFunction, Long>() {
@Override
public Long map(Tuple2 value) throws 
Exception {
return null;
@@ -759,7 +759,7 @@ public class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
 
@SuppressWarnings("rawtypes,unchecked")
private static Integer createDownStreamId(ConnectedStreams dataStream) {
-   SingleOutputStreamOperator coMap = dataStream.map(new 
CoMapFunction, Tuple2, Object>() {
+   SingleOutputStreamOperator coMap = dataStream.map(new 
CoMapFunction, Tuple2, Object>() {
private static final long serialVersi

[1/2] flink git commit: [FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator

2016-02-29 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/release-1.0 74c62b0b8 -> 30486905b


http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 4c1c265..fb7ec9f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -735,7 +735,7 @@ public abstract class StreamExecutionEnvironment {
 
SourceFunction function;
try {
-   function = new 
FromElementsFunction(typeInfo.createSerializer(getConfig()), data);
+   function = new 
FromElementsFunction<>(typeInfo.createSerializer(getConfig()), data);
}
catch (IOException e) {
throw new RuntimeException(e.getMessage(), e);
@@ -789,7 +789,7 @@ public abstract class StreamExecutionEnvironment {
public  DataStreamSource fromCollection(Iterator data, 
TypeInformation typeInfo) {
Preconditions.checkNotNull(data, "The iterator must not be 
null");
 
-   SourceFunction function = new 
FromIteratorFunction(data);
+   SourceFunction function = new FromIteratorFunction<>(data);
return addSource(function, "Collection Source", typeInfo);
}
 
@@ -838,7 +838,7 @@ public abstract class StreamExecutionEnvironment {
// private helper for passing different names
private  DataStreamSource 
fromParallelCollection(SplittableIterator iterator, TypeInformation
typeInfo, String operatorName) {
-   return addSource(new 
FromSplittableIteratorFunction(iterator), operatorName).returns(typeInfo);
+   return addSource(new 
FromSplittableIteratorFunction<>(iterator), operatorName, typeInfo);
}
 
/**
@@ -1033,8 +1033,8 @@ public abstract class StreamExecutionEnvironment {
// private helper for passing different names
private  DataStreamSource createInput(InputFormat 
inputFormat,
TypeInformation typeInfo, String sourceName) {
-   FileSourceFunction function = new 
FileSourceFunction(inputFormat, typeInfo);
-   return addSource(function, sourceName).returns(typeInfo);
+   FileSourceFunction function = new 
FileSourceFunction<>(inputFormat, typeInfo);
+   return addSource(function, sourceName, typeInfo);
}
 
/**
@@ -1136,7 +1136,7 @@ public abstract class StreamExecutionEnvironment {
sourceOperator = new StreamSource<>(function);
}
 
-   return new DataStreamSource(this, typeInfo, 
sourceOperator, isParallel, sourceName);
+   return new DataStreamSource<>(this, typeInfo, sourceOperator, 
isParallel, sourceName);
}
 
/**

http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 7a4d6f8..cf48160 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -435,7 +435,7 @@ public class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
DataStreamSource> src = env.fromElements(new 
Tuple2<>(0L, 0L));
env.setParallelism(10);
 
-   SingleOutputStreamOperator map = src.map(new 
MapFunction, Long>() {
+   SingleOutputStreamOperator map = src.map(new 
MapFunction, Long>() {
@Override
public Long map(Tuple2 value) throws 
Exception {
return null;
@@ -759,7 +759,7 @@ public class DataStreamTest extends 
StreamingMultipleProgramsTestBase {
 
@SuppressWarnings("rawtypes,unchecked")
private static Integer createDownStreamId(ConnectedStreams dataStream) {
-   SingleOutputStreamOperator coMap = dataStream.map(new 
CoMapFunction, Tuple2, Object>() {
+   SingleOutputStreamOperator coMap = dataStream.map(new 
CoMapFunction, Tuple2, Object>() {
private static final long serial

[2/2] flink git commit: [FLINK-3548] [api-breaking] Remove unnecessary generic parameter from SingleOutputStreamOperator

2016-02-29 Thread aljoscha
[FLINK-3548] [api-breaking] Remove unnecessary generic parameter from 
SingleOutputStreamOperator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30486905
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30486905
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30486905

Branch: refs/heads/release-1.0
Commit: 30486905b81e9150b5efdc81defa98513c7032dc
Parents: 74c62b0
Author: Aljoscha Krettek 
Authored: Mon Feb 29 20:20:07 2016 +0100
Committer: Aljoscha Krettek 
Committed: Mon Feb 29 21:37:46 2016 +0100

--
 .../apache/flink/storm/api/FlinkTopology.java   |  14 +--
 .../api/datastream/AllWindowedStream.java   |  54 +-
 .../api/datastream/ConnectedStreams.java|  20 ++--
 .../streaming/api/datastream/DataStream.java|  24 ++---
 .../api/datastream/DataStreamSource.java|   4 +-
 .../api/datastream/IterativeStream.java |  17 +--
 .../streaming/api/datastream/KeyedStream.java   |  38 +++
 .../datastream/SingleOutputStreamOperator.java  |  77 +++---
 .../api/datastream/StreamProjection.java| 104 +--
 .../api/datastream/WindowedStream.java  |  54 +-
 .../environment/StreamExecutionEnvironment.java |  12 +--
 .../flink/streaming/api/DataStreamTest.java |   4 +-
 .../apache/flink/streaming/api/IterateTest.java |   9 +-
 .../flink/streaming/api/TypeFillTest.java   |   2 +-
 .../api/graph/StreamGraphGeneratorTest.java |  10 +-
 .../flink/streaming/api/scala/DataStream.scala  |  18 ++--
 .../StreamingScalaAPICompletenessTest.scala |   2 +-
 17 files changed, 211 insertions(+), 252 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/30486905/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
--
diff --git 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
index 811cfb4..6706a91 100644
--- 
a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
+++ 
b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
@@ -200,7 +200,7 @@ public class FlinkTopology {
SplitStream> splitSource 
= multiSource
.split(new 
StormStreamSelector());
for (String streamId : sourceStreams.keySet()) {
-   SingleOutputStreamOperator 
outStream = splitSource.select(streamId)
+   SingleOutputStreamOperator 
outStream = splitSource.select(streamId)
.map(new 
SplitStreamMapper());

outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
outputStreams.put(streamId, outStream);
@@ -299,7 +299,7 @@ public class FlinkTopology {
inputStreams.put(streamId, 
processInput(boltId, userBolt, streamId, grouping, producer));
}
 
-   final SingleOutputStreamOperator 
outputStream = createOutput(boltId,
+   final SingleOutputStreamOperator 
outputStream = createOutput(boltId,
userBolt, inputStreams);
 
if (common.is_set_parallelism_hint()) {
@@ -359,7 +359,7 @@ public class FlinkTopology {
}
 
@SuppressWarnings({ "unchecked", "rawtypes" })
-   private SingleOutputStreamOperator createOutput(String boltId, 
IRichBolt bolt,
+   private SingleOutputStreamOperator createOutput(String boltId, 
IRichBolt bolt,
Map> inputStreams) {
assert (boltId != null);
assert (bolt != null);
@@ -403,7 +403,7 @@ public class FlinkTopology {
final HashMap boltOutputs = 
this.outputStreams.get(boltId);
final FlinkOutputFieldsDeclarer declarer = 
this.declarers.get(boltId);
 
-   final SingleOutputStreamOperator outputStream;
+   final SingleOutputStreamOperator outputStream;
 
if (boltOutputs.size() < 2) { // single output stream or sink
String outputStreamId;
@@ -415,7 +415,7 @@ public class FlinkTopology {
 
final TypeInformation outType = 
declarer.getOutputType(outputStreamId);
 
-   final SingleOutputStreamOperator outStream;
+   final SingleOutputStreamOperator outStream;
 

flink git commit: [FLINK-3540] Shade org.htrace in flink-shaded-hadoop to get rid of its Guava dependency

2016-02-29 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/release-1.0 ae5b4573b -> 74c62b0b8


[FLINK-3540] Shade org.htrace in flink-shaded-hadoop to get rid of its Guava 
dependency

This closes #1737


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74c62b0b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74c62b0b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74c62b0b

Branch: refs/heads/release-1.0
Commit: 74c62b0b871d271f0dca18b88be64126a53774f6
Parents: ae5b457
Author: Robert Metzger 
Authored: Mon Feb 29 16:13:02 2016 +0100
Committer: Robert Metzger 
Committed: Mon Feb 29 21:11:24 2016 +0100

--
 flink-shaded-hadoop/pom.xml | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/74c62b0b/flink-shaded-hadoop/pom.xml
--
diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml
index 1920737..1e8bdf4 100644
--- a/flink-shaded-hadoop/pom.xml
+++ b/flink-shaded-hadoop/pom.xml
@@ -117,6 +117,7 @@ under the License.

io.netty:netty:*

org.apache.curator:*

org.apache.hadoop:*
+   
org.htrace:htrace-core
 


net.java.dev.jets3t:jets3t
@@ -152,6 +153,10 @@ under the License.

org.apache.commons.httpclient

org.apache.flink.hadoop.shaded.org.apache.commons.httpclient

+   
+   
org.htrace
+   
org.apache.flink.hadoop.shaded.org.htrace
+   






flink git commit: [FLINK-3540] Shade org.htrace in flink-shaded-hadoop to get rid of its Guava dependency

2016-02-29 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/master 9922d10a0 -> 51fc29812


[FLINK-3540] Shade org.htrace in flink-shaded-hadoop to get rid of its Guava 
dependency

This closes #1737


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51fc2981
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51fc2981
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51fc2981

Branch: refs/heads/master
Commit: 51fc298128405d4ce0e047185ac1c6e5b1753546
Parents: 9922d10
Author: Robert Metzger 
Authored: Mon Feb 29 16:13:02 2016 +0100
Committer: Robert Metzger 
Committed: Mon Feb 29 21:10:28 2016 +0100

--
 flink-shaded-hadoop/pom.xml | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/51fc2981/flink-shaded-hadoop/pom.xml
--
diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml
index af49b92..6f4d441 100644
--- a/flink-shaded-hadoop/pom.xml
+++ b/flink-shaded-hadoop/pom.xml
@@ -117,6 +117,7 @@ under the License.

io.netty:netty:*

org.apache.curator:*

org.apache.hadoop:*
+   
org.htrace:htrace-core
 


net.java.dev.jets3t:jets3t
@@ -152,6 +153,10 @@ under the License.

org.apache.commons.httpclient

org.apache.flink.hadoop.shaded.org.apache.commons.httpclient

+   
+   
org.htrace
+   
org.apache.flink.hadoop.shaded.org.htrace
+   






[2/2] flink git commit: [FLINK-3536] Make clearer distinction between event time and processing time

2016-02-29 Thread aljoscha
[FLINK-3536] Make clearer distinction between event time and processing time

This brings it more in line with *ProcessingTimeWindows and makes it
clear what type of window assigner it is.

The old names, i.e. SlidingTimeWindows and TumblingTimeWindows, are still
available but deprecated.
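
In the Scala API the rename reads as follows; a compile-level sketch assuming the 1.0-era streaming API (no timestamps or watermarks are assigned here, so the event-time windows would not actually fire; the point is the assigner names):

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.{
      TumblingEventTimeWindows, TumblingTimeWindows}
    import org.apache.flink.streaming.api.windowing.time.Time

    object WindowRenameDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val words = env.fromElements(("a", 1), ("b", 2))

        // Old assigner name: still compiles after this commit, but deprecated.
        words.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))).sum(1).print()

        // New assigner name: explicit that windows are assigned on event time.
        words.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(1).print()

        env.execute("window assigner rename")
      }
    }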


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9922d10a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9922d10a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9922d10a

Branch: refs/heads/master
Commit: 9922d10a0f3e291bb7e6f75ccb70baecb5c2bcd9
Parents: 0ac2b1a
Author: Aljoscha Krettek 
Authored: Mon Feb 29 14:56:29 2016 +0100
Committer: Aljoscha Krettek 
Committed: Mon Feb 29 20:29:11 2016 +0100

--
 docs/apis/streaming/index.md|  16 +--
 docs/apis/streaming/time.md |   2 +-
 docs/apis/streaming/windows.md  |  31 +++--
 .../streaming/examples/join/WindowJoin.java |   4 +-
 .../scala/examples/join/WindowJoin.scala|   4 +-
 .../api/datastream/CoGroupedStreams.java|   2 +-
 .../streaming/api/datastream/DataStream.java|  12 +-
 .../streaming/api/datastream/JoinedStreams.java |   2 +-
 .../streaming/api/datastream/KeyedStream.java   |  12 +-
 .../assigners/SlidingEventTimeWindows.java  | 112 +++
 .../assigners/SlidingProcessingTimeWindows.java |   4 +-
 .../windowing/assigners/SlidingTimeWindows.java |  73 ++--
 .../assigners/TumblingEventTimeWindows.java |  98 
 .../TumblingProcessingTimeWindows.java  |   4 +-
 .../assigners/TumblingTimeWindows.java  |  60 ++
 .../windowing/AllWindowTranslationTest.java |  36 +++---
 .../operators/windowing/CoGroupJoinITCase.java  |   8 +-
 .../windowing/NonKeyedWindowOperatorTest.java   |  10 +-
 .../windowing/TimeWindowTranslationTest.java|   8 +-
 .../operators/windowing/WindowFoldITCase.java   |   6 +-
 .../operators/windowing/WindowOperatorTest.java |  12 +-
 .../windowing/WindowTranslationTest.java|  30 ++---
 .../streaming/timestamp/TimestampITCase.java|   6 +-
 .../streaming/api/scala/CoGroupedStreams.scala  |   2 +-
 .../flink/streaming/api/scala/DataStream.scala  |   4 +-
 .../streaming/api/scala/JoinedStreams.scala |   2 +-
 .../flink/streaming/api/scala/KeyedStream.scala |   4 +-
 .../api/scala/AllWindowTranslationTest.scala|  26 ++---
 .../streaming/api/scala/CoGroupJoinITCase.scala |   8 +-
 .../streaming/api/scala/WindowFoldITCase.scala  |   6 +-
 .../api/scala/WindowTranslationTest.scala   |  22 ++--
 31 files changed, 364 insertions(+), 262 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/docs/apis/streaming/index.md
--
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
index b8a3541..3741a46 100644
--- a/docs/apis/streaming/index.md
+++ b/docs/apis/streaming/index.md
@@ -293,7 +293,7 @@ keyedStream.maxBy("key");
 key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
 See windows for a complete description 
of windows.
 {% highlight java %}
-dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 
seconds of data
+dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // 
Last 5 seconds of data
 {% endhighlight %}
 
   
@@ -307,7 +307,7 @@ 
dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 s
   WARNING: This is in many cases a 
non-parallel transformation. All records will be
gathered in one task for the windowAll operator.
   {% highlight java %}
-dataStream.windowAll(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 
seconds of data
+dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 
seconds of data
   {% endhighlight %}
   
 
@@ -410,7 +410,7 @@ dataStream.union(otherStream1, otherStream2, ...);
 {% highlight java %}
 dataStream.join(otherStream)
 .where(0).equalTo(1)
-.window(TumblingTimeWindows.of(Time.seconds(3)))
+.window(TumblingEventTimeWindows.of(Time.seconds(3)))
 .apply (new JoinFunction () {...});
 {% endhighlight %}
   
@@ -422,7 +422,7 @@ dataStream.join(otherStream)
 {% highlight java %}
 dataStream.coGroup(otherStream)
 .where(0).equalTo(1)
-.window(TumblingTimeWindows.of(Time.seconds(3)))
+.window(TumblingEventTimeWindows.of(Time.seconds(3)))
 .apply (new CoGroupFunction () {...});
 {% endhighlight %}
   
@@ -669,7 +669,7 @@ keyedStream.maxBy("key")
 key according to some characteristic (e.g., the data that arrived 
within the last 5 sec

[1/2] flink git commit: [FLINK-3536] Make clearer distinction between event time and processing time

2016-02-29 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master 0ac2b1a7b -> 9922d10a0


http://git-wip-us.apache.org/repos/asf/flink/blob/9922d10a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
--
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index a676757..f4101cb 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.function.{WindowFunction, 
AllWindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import 
org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,
 TumblingTimeWindows, SlidingTimeWindows}
+import 
org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,
 TumblingEventTimeWindows, SlidingEventTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, 
TimeEvictor}
 import org.apache.flink.streaming.api.windowing.time.Time
 import 
org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, 
CountTrigger}
@@ -59,7 +59,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 val reducer = new DummyReducer
 
 val window1 = source
-  .windowAll(SlidingTimeWindows.of(
+  .windowAll(SlidingEventTimeWindows.of(
 Time.of(1, TimeUnit.SECONDS),
 Time.of(100, TimeUnit.MILLISECONDS)))
   .reduce(reducer)
@@ -73,7 +73,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
 val window2 = source
   .keyBy(0)
-  .windowAll(SlidingTimeWindows.of(
+  .windowAll(SlidingEventTimeWindows.of(
 Time.of(1, TimeUnit.SECONDS),
 Time.of(100, TimeUnit.MILLISECONDS)))
   .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
@@ -100,7 +100,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 val reducer = new DummyReducer
 
 val window1 = source
-  .windowAll(SlidingTimeWindows.of(
+  .windowAll(SlidingEventTimeWindows.of(
 Time.of(1, TimeUnit.SECONDS),
 Time.of(100, TimeUnit.MILLISECONDS)))
   .trigger(CountTrigger.of(100))
@@ -114,13 +114,13 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
 val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _, 
_]]
 assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
 assertTrue(
   
winOperator1.getWindowBufferFactory.isInstanceOf[ReducingWindowBuffer.Factory[_]])
 
 
 val window2 = source
-  .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+  .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
   .trigger(CountTrigger.of(100))
   .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
   def apply(
@@ -137,7 +137,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
 val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _, 
_]]
 assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
 
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
   }
 
@@ -170,7 +170,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
 
 val window2 = source
-  .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+  .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
   .trigger(CountTrigger.of(100))
   .evictor(CountEvictor.of(1000))
   .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
@@ -189,7 +189,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 val winOperator2 = 
operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]]
 assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
 assertTrue(winOperator2.getEvictor.isInstanceOf

[1/2] flink git commit: [FLINK-3536] Make clearer distinction between event time and processing time

2016-02-29 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/release-1.0 9b9f84e80 -> ae5b4573b


http://git-wip-us.apache.org/repos/asf/flink/blob/ae5b4573/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
--
diff --git 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index a676757..f4101cb 100644
--- 
a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ 
b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.function.{WindowFunction, 
AllWindowFunction}
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import 
org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,
 TumblingTimeWindows, SlidingTimeWindows}
+import 
org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows,
 TumblingEventTimeWindows, SlidingEventTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, 
TimeEvictor}
 import org.apache.flink.streaming.api.windowing.time.Time
 import 
org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, 
CountTrigger}
@@ -59,7 +59,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 val reducer = new DummyReducer
 
 val window1 = source
-  .windowAll(SlidingTimeWindows.of(
+  .windowAll(SlidingEventTimeWindows.of(
 Time.of(1, TimeUnit.SECONDS),
 Time.of(100, TimeUnit.MILLISECONDS)))
   .reduce(reducer)
@@ -73,7 +73,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
 val window2 = source
   .keyBy(0)
-  .windowAll(SlidingTimeWindows.of(
+  .windowAll(SlidingEventTimeWindows.of(
 Time.of(1, TimeUnit.SECONDS),
 Time.of(100, TimeUnit.MILLISECONDS)))
   .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
@@ -100,7 +100,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 val reducer = new DummyReducer
 
 val window1 = source
-  .windowAll(SlidingTimeWindows.of(
+  .windowAll(SlidingEventTimeWindows.of(
 Time.of(1, TimeUnit.SECONDS),
 Time.of(100, TimeUnit.MILLISECONDS)))
   .trigger(CountTrigger.of(100))
@@ -114,13 +114,13 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
 val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _, 
_]]
 assertTrue(winOperator1.getTrigger.isInstanceOf[CountTrigger[_]])
-assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
+
assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingEventTimeWindows])
 assertTrue(
   
winOperator1.getWindowBufferFactory.isInstanceOf[ReducingWindowBuffer.Factory[_]])
 
 
 val window2 = source
-  .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+  .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
   .trigger(CountTrigger.of(100))
   .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
   def apply(
@@ -137,7 +137,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _, _]])
 val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _, 
_]]
 assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
-
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
+
assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingEventTimeWindows])
 
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[ListWindowBuffer.Factory[_]])
   }
 
@@ -170,7 +170,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 
 
 val window2 = source
-  .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+  .windowAll(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
   .trigger(CountTrigger.of(100))
   .evictor(CountEvictor.of(1000))
   .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() 
{
@@ -189,7 +189,7 @@ class AllWindowTranslationTest extends 
StreamingMultipleProgramsTestBase {
 val winOperator2 = 
operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _, _]]
 assertTrue(winOperator2.getTrigger.isInstanceOf[CountTrigger[_]])
 assertTrue(winOperator2.getEvictor.isInsta

[2/2] flink git commit: [FLINK-3536] Make clearer distinction between event time and processing time

2016-02-29 Thread aljoscha
[FLINK-3536] Make clearer distinction between event time and processing time

This brings it more in line with *ProcessingTimeWindows and makes it
clear what type of window assigner it is.

The old names, i.e. SlidingTimeWindows and TumblingTimeWindows, are still
available but deprecated.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae5b4573
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae5b4573
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae5b4573

Branch: refs/heads/release-1.0
Commit: ae5b4573b8e316042077ab85690d5d773c031865
Parents: 9b9f84e
Author: Aljoscha Krettek 
Authored: Mon Feb 29 14:56:29 2016 +0100
Committer: Aljoscha Krettek 
Committed: Mon Feb 29 20:39:51 2016 +0100

--
 docs/apis/streaming/index.md|  16 +--
 docs/apis/streaming/time.md |   2 +-
 docs/apis/streaming/windows.md  |  31 +++--
 .../streaming/examples/join/WindowJoin.java |   4 +-
 .../scala/examples/join/WindowJoin.scala|   4 +-
 .../api/datastream/CoGroupedStreams.java|   2 +-
 .../streaming/api/datastream/DataStream.java|  12 +-
 .../streaming/api/datastream/JoinedStreams.java |   2 +-
 .../streaming/api/datastream/KeyedStream.java   |  12 +-
 .../assigners/SlidingEventTimeWindows.java  | 112 +++
 .../assigners/SlidingProcessingTimeWindows.java |   4 +-
 .../windowing/assigners/SlidingTimeWindows.java |  73 ++--
 .../assigners/TumblingEventTimeWindows.java |  98 
 .../TumblingProcessingTimeWindows.java  |   4 +-
 .../assigners/TumblingTimeWindows.java  |  60 ++
 .../windowing/AllWindowTranslationTest.java |  36 +++---
 .../operators/windowing/CoGroupJoinITCase.java  |   8 +-
 .../windowing/NonKeyedWindowOperatorTest.java   |  10 +-
 .../windowing/TimeWindowTranslationTest.java|   8 +-
 .../operators/windowing/WindowFoldITCase.java   |   6 +-
 .../operators/windowing/WindowOperatorTest.java |  12 +-
 .../windowing/WindowTranslationTest.java|  30 ++---
 .../streaming/timestamp/TimestampITCase.java|   6 +-
 .../streaming/api/scala/CoGroupedStreams.scala  |   2 +-
 .../flink/streaming/api/scala/DataStream.scala  |   4 +-
 .../streaming/api/scala/JoinedStreams.scala |   2 +-
 .../flink/streaming/api/scala/KeyedStream.scala |   4 +-
 .../api/scala/AllWindowTranslationTest.scala|  26 ++---
 .../streaming/api/scala/CoGroupJoinITCase.scala |   8 +-
 .../streaming/api/scala/WindowFoldITCase.scala  |   6 +-
 .../api/scala/WindowTranslationTest.scala   |  22 ++--
 31 files changed, 364 insertions(+), 262 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ae5b4573/docs/apis/streaming/index.md
--
diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md
index b8a3541..3741a46 100644
--- a/docs/apis/streaming/index.md
+++ b/docs/apis/streaming/index.md
@@ -293,7 +293,7 @@ keyedStream.maxBy("key");
 key according to some characteristic (e.g., the data that arrived 
within the last 5 seconds).
 See windows for a complete description 
of windows.
 {% highlight java %}
-dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 
seconds of data
+dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // 
Last 5 seconds of data
 {% endhighlight %}
 
   
@@ -307,7 +307,7 @@ 
dataStream.keyBy(0).window(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 s
   WARNING: This is in many cases a 
non-parallel transformation. All records will be
gathered in one task for the windowAll operator.
   {% highlight java %}
-dataStream.windowAll(TumblingTimeWindows.of(Time.seconds(5))); // Last 5 
seconds of data
+dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 
seconds of data
   {% endhighlight %}
   
 
@@ -410,7 +410,7 @@ dataStream.union(otherStream1, otherStream2, ...);
 {% highlight java %}
 dataStream.join(otherStream)
 .where(0).equalTo(1)
-.window(TumblingTimeWindows.of(Time.seconds(3)))
+.window(TumblingEventTimeWindows.of(Time.seconds(3)))
 .apply (new JoinFunction () {...});
 {% endhighlight %}
   
@@ -422,7 +422,7 @@ dataStream.join(otherStream)
 {% highlight java %}
 dataStream.coGroup(otherStream)
 .where(0).equalTo(1)
-.window(TumblingTimeWindows.of(Time.seconds(3)))
+.window(TumblingEventTimeWindows.of(Time.seconds(3)))
 .apply (new CoGroupFunction () {...});
 {% endhighlight %}
   
@@ -669,7 +669,7 @@ keyedStream.maxBy("key")
 key according to some characteristic (e.g., the data that arrived 
within the last 

flink git commit: [FLINK-3538] [api-breaking] Streamline Scala DataStream.join/coGroup

2016-02-29 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/release-1.0 3adff87dd -> 9b9f84e80


[FLINK-3538] [api-breaking] Streamline Scala DataStream.join/coGroup

This enforces that the user always has to specify keys for both inputs
before .window() can be called.
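
Concretely, `window(...)` is now only reachable once both key selectors are given; a compile-level sketch of the enforced call order (types and data are illustrative, echoing the WindowJoin example in the diff below):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    object JoinOrderDemo {
      case class Grade(name: String, grade: Int)
      case class Salary(name: String, salary: Int)
      case class Person(name: String, grade: Int, salary: Int)

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val grades = env.fromElements(Grade("alice", 1), Grade("bob", 2))
        val salaries = env.fromElements(Salary("alice", 100), Salary("bob", 200))

        // where(...) and equalTo(...) must both be called before window(...)
        // becomes available; .where(...).window(...) no longer type-checks.
        val joined = grades.join(salaries)
          .where(_.name)
          .equalTo(_.name)
          .window(SlidingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
          .apply { (g, s) => Person(g.name, g.grade, s.salary) }

        joined.print()
        env.execute("join builder order")
      }
    }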


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b9f84e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b9f84e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b9f84e8

Branch: refs/heads/release-1.0
Commit: 9b9f84e800b19b585e147ab6add4eb946356caa0
Parents: 3adff87
Author: Aljoscha Krettek 
Authored: Mon Feb 29 17:02:38 2016 +0100
Committer: Aljoscha Krettek 
Committed: Mon Feb 29 20:27:56 2016 +0100

--
 .../scala/examples/join/WindowJoin.scala|   4 +-
 .../streaming/api/scala/CoGroupedStreams.scala  | 326 +++--
 .../flink/streaming/api/scala/DataStream.scala  |   8 +-
 .../streaming/api/scala/JoinedStreams.scala | 357 +++
 .../StreamingScalaAPICompletenessTest.scala |   6 +-
 5 files changed, 261 insertions(+), 440 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9b9f84e8/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
--
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 81f12dc..50a2216 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.scala.examples.join
 
-import java.util.concurrent.TimeUnit
-
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
@@ -58,7 +56,7 @@ object WindowJoin {
 val joined = grades.join(salaries)
 .where(_.name)
 .equalTo(_.name)
-.window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, 
TimeUnit.SECONDS)))
+.window(SlidingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
 .apply { (g, s) => Person(g.name, g.grade, s.salary) }
 
 if (params.has("output")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9b9f84e8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
--
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index f4ab2ee..4cce9e2 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -56,252 +56,164 @@ import scala.collection.JavaConverters._
  * } }}}
  */
 @Public
-object CoGroupedStreams {
+class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) 
{
 
   /**
-   * A co-group operation that does not yet have its [[KeySelector]]s defined.
-   *
-   * @tparam T1 Type of the elements from the first input
-   * @tparam T2 Type of the elements from the second input
+   * Specifies a [[KeySelector]] for elements from the first input.
*/
-  class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
-
-/**
- * Specifies a [[KeySelector]] for elements from the first input.
- */
-def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, 
KEY] = {
-  val cleanFun = clean(keySelector)
-  val keyType = implicitly[TypeInformation[KEY]]
-  val javaSelector = new KeySelector[T1, KEY] with 
ResultTypeQueryable[KEY] {
-def getKey(in: T1) = cleanFun(in)
-override def getProducedType: TypeInformation[KEY] = keyType
-  }
-  new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
-}
-
-/**
- * Specifies a [[KeySelector]] for elements from the second input.
- */
-def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, 
KEY] = {
-  val cleanFun = clean(keySelector)
-  val keyType = implicitly[TypeInformation[KEY]]
-  val javaSelector = new KeySelector[T2, KEY] with 
ResultTypeQueryable[KEY] {
-def getKey(in: T2) = cleanFun(in)
-override def getProducedType: TypeIn

flink git commit: [FLINK-3538] [api-breaking] Streamline Scala DataStream.join/coGroup

2016-02-29 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master 9580b8fe5 -> 0ac2b1a7b


[FLINK-3538] [api-breaking] Streamline Scala DataStream.join/coGroup

This enforces that the user always has to specify keys for both inputs
before .window() can be called.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ac2b1a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ac2b1a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ac2b1a7

Branch: refs/heads/master
Commit: 0ac2b1a7b4b44d0e5722532958e5bda00615dbb4
Parents: 9580b8f
Author: Aljoscha Krettek 
Authored: Mon Feb 29 17:02:38 2016 +0100
Committer: Aljoscha Krettek 
Committed: Mon Feb 29 20:26:14 2016 +0100

--
 .../scala/examples/join/WindowJoin.scala|   4 +-
 .../streaming/api/scala/CoGroupedStreams.scala  | 326 +++--
 .../flink/streaming/api/scala/DataStream.scala  |   8 +-
 .../streaming/api/scala/JoinedStreams.scala | 357 +++
 .../StreamingScalaAPICompletenessTest.scala |   6 +-
 5 files changed, 261 insertions(+), 440 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/0ac2b1a7/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
--
diff --git 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 81f12dc..50a2216 100644
--- 
a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ 
b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.streaming.scala.examples.join
 
-import java.util.concurrent.TimeUnit
-
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
@@ -58,7 +56,7 @@ object WindowJoin {
 val joined = grades.join(salaries)
 .where(_.name)
 .equalTo(_.name)
-.window(SlidingTimeWindows.of(Time.of(2, TimeUnit.SECONDS), Time.of(1, TimeUnit.SECONDS)))
+.window(SlidingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
 .apply { (g, s) => Person(g.name, g.grade, s.salary) }
 
 if (params.has("output")) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0ac2b1a7/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
--
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index f4ab2ee..4cce9e2 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -56,252 +56,164 @@ import scala.collection.JavaConverters._
  * } }}}
  */
 @Public
-object CoGroupedStreams {
+class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
 
   /**
-   * A co-group operation that does not yet have its [[KeySelector]]s defined.
-   *
-   * @tparam T1 Type of the elements from the first input
-   * @tparam T2 Type of the elements from the second input
+   * Specifies a [[KeySelector]] for elements from the first input.
*/
-  class Unspecified[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
-
-/**
- * Specifies a [[KeySelector]] for elements from the first input.
- */
-def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = {
-  val cleanFun = clean(keySelector)
-  val keyType = implicitly[TypeInformation[KEY]]
-  val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
-def getKey(in: T1) = cleanFun(in)
-override def getProducedType: TypeInformation[KEY] = keyType
-  }
-  new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
-}
-
-/**
- * Specifies a [[KeySelector]] for elements from the second input.
- */
-def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = {
-  val cleanFun = clean(keySelector)
-  val keyType = implicitly[TypeInformation[KEY]]
-  val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
-def getKey(in: T2) = cleanFun(in)
-override def getProducedType: TypeInformation[

flink git commit: [FLINK-3535] [runtime-web] Decrease log verbosity of StackTraceSampleCoordinator

2016-02-29 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.0 70ce072a4 -> 3adff87dd


[FLINK-3535] [runtime-web] Decrease log verbosity of StackTraceSampleCoordinator

This closes #1732.
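
For context, the diff below guards the new DEBUG message behind isDebugEnabled() and demotes two WARN/exception paths to DEBUG. A standalone, hedged sketch of the pattern in Scala with SLF4J (illustrative names, not Flink's classes):

```scala
import org.slf4j.{Logger, LoggerFactory}

object GuardedLogging {
  private val LOG: Logger = LoggerFactory.getLogger(getClass)

  def onTrigger(tasks: Array[String]): Unit = {
    // The guard keeps the (possibly expensive) message construction off
    // the hot path unless DEBUG logging is actually enabled.
    if (LOG.isDebugEnabled) {
      LOG.debug("Triggering stack trace sample for tasks: " + tasks.mkString(", "))
    }
  }

  def onFailure(failure: Throwable): Unit = {
    // Demoted from WARN to DEBUG: a failed sample is expected noise,
    // e.g. when a task was reset between trigger and collection.
    LOG.debug("Failed to gather stack trace sample.", failure)
  }
}
```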


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3adff87d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3adff87d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3adff87d

Branch: refs/heads/release-1.0
Commit: 3adff87ddec7cb6c2aa29e4e23360e550ea8c118
Parents: 70ce072
Author: Ufuk Celebi 
Authored: Mon Feb 29 14:14:35 2016 +0100
Committer: Ufuk Celebi 
Committed: Mon Feb 29 20:10:18 2016 +0100

--
 .../runtime/webmonitor/BackPressureStatsTracker.java  |  9 +++--
 .../webmonitor/StackTraceSampleCoordinator.java   |  8 ++--
 .../webmonitor/StackTraceSampleCoordinatorTest.java   | 14 +-
 .../runtime/messages/StackTraceSampleMessages.scala   |  6 +-
 4 files changed, 23 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3adff87d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
--
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
index b9b8a47..db88ffd 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -33,6 +33,7 @@ import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -170,6 +171,10 @@ public class BackPressureStatsTracker {
if (executionContext != null) {
pendingStats.add(vertex);
 
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
+   }
+
Future sample = coordinator.triggerStackTraceSample(

vertex.getTaskVertices(),
numSamples,
@@ -246,7 +251,7 @@ public class BackPressureStatsTracker {
OperatorBackPressureStats stats = createStatsFromSample(success);
operatorStatsCache.put(vertex, stats);
} else {
-   LOG.warn("Failed to gather stack trace sample.", failure);
+   LOG.debug("Failed to gather stack trace sample.", failure);
}
} catch (Throwable t) {
LOG.error("Error during stats completion.", t);
@@ -278,7 +283,7 @@ public class BackPressureStatsTracker {
if (sampledTasks.contains(taskId)) {
subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
} else {
-   throw new RuntimeException("Outdated sample. A task, which is part of the " +
+   LOG.debug("Outdated sample. A task, which is part of the " +
"sample has been reset.");
}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/3adff87d/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
--
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
index e7b292f..bbfb530 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
@@ -179,7 +179,9 @@ public class StackTraceSampleCoordinator {

pending.getSampleId());
 
 

flink git commit: [FLINK-3535] [runtime-web] Decrease log verbosity of StackTraceSampleCoordinator

2016-02-29 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 734ba01dd -> 9580b8fe5


[FLINK-3535] [runtime-web] Decrease log verbosity of StackTraceSampleCoordinator

This closes #1732.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9580b8fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9580b8fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9580b8fe

Branch: refs/heads/master
Commit: 9580b8fe5a5ec8b3b23ffa7e09123b1e160e2016
Parents: 734ba01
Author: Ufuk Celebi 
Authored: Mon Feb 29 14:14:35 2016 +0100
Committer: Ufuk Celebi 
Committed: Mon Feb 29 20:09:26 2016 +0100

--
 .../runtime/webmonitor/BackPressureStatsTracker.java  |  9 +++--
 .../webmonitor/StackTraceSampleCoordinator.java   |  8 ++--
 .../webmonitor/StackTraceSampleCoordinatorTest.java   | 14 +-
 .../runtime/messages/StackTraceSampleMessages.scala   |  6 +-
 4 files changed, 23 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/9580b8fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
--
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
index b9b8a47..db88ffd 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -33,6 +33,7 @@ import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -170,6 +171,10 @@ public class BackPressureStatsTracker {
if (executionContext != null) {
pendingStats.add(vertex);
 
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
+   }
+
Future sample = coordinator.triggerStackTraceSample(

vertex.getTaskVertices(),
numSamples,
@@ -246,7 +251,7 @@ public class BackPressureStatsTracker {
OperatorBackPressureStats stats = createStatsFromSample(success);
operatorStatsCache.put(vertex, stats);
} else {
-   LOG.warn("Failed to gather stack trace sample.", failure);
+   LOG.debug("Failed to gather stack trace sample.", failure);
}
} catch (Throwable t) {
LOG.error("Error during stats completion.", t);
@@ -278,7 +283,7 @@ public class BackPressureStatsTracker {
if (sampledTasks.contains(taskId)) {
subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
} else {
-   throw new RuntimeException("Outdated sample. A task, which is part of the " +
+   LOG.debug("Outdated sample. A task, which is part of the " +
"sample has been reset.");
}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/9580b8fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
--
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
index e7b292f..bbfb530 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
@@ -179,7 +179,9 @@ public class StackTraceSampleCoordinator {

pending.getSampleId());
 
   

flink git commit: [FLINK-3534] [runtime] Prevent canceling Execution from failing

2016-02-29 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 5f0af06fe -> 734ba01dd


[FLINK-3534] [runtime] Prevent canceling Execution from failing

This closes #1735.
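
The fix below handles a failure that races with cancellation: instead of forcing CANCELING into FAILED, the cancellation is completed. A hedged sketch of that guard as a compare-and-swap retry loop (illustrative names, not Flink's actual classes):

```scala
import java.util.concurrent.atomic.AtomicReference

object ExecState extends Enumeration {
  val RUNNING, CANCELING, CANCELED, FAILED = Value
}

class ExecutionSketch {
  import ExecState._

  private val state = new AtomicReference(RUNNING)

  // Returns true iff this call performed the transition to FAILED.
  @annotation.tailrec
  final def fail(t: Throwable): Boolean = {
    val current = state.get()
    if (current == FAILED || current == CANCELED) {
      false // already terminal, nothing to do
    } else if (current == CANCELING) {
      // The fix: a failure during cancellation completes the cancellation
      // instead of attempting an illegal CANCELING -> FAILED transition.
      state.compareAndSet(CANCELING, CANCELED)
      false
    } else if (state.compareAndSet(current, FAILED)) {
      true
    } else {
      fail(t) // state changed concurrently, retry
    }
  }
}
```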


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/734ba01d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/734ba01d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/734ba01d

Branch: refs/heads/master
Commit: 734ba01dda51ab93d10e6c36cf5a0c4c65b28008
Parents: 5f0af06
Author: Ufuk Celebi 
Authored: Mon Feb 29 15:37:45 2016 +0100
Committer: Ufuk Celebi 
Committed: Mon Feb 29 20:06:08 2016 +0100

--
 .../flink/runtime/executiongraph/Execution.java |   5 +
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +-
 .../ExecutionGraphRestartTest.java  | 105 ++-
 .../ExecutionVertexCancelTest.java  |  11 +-
 4 files changed, 114 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/734ba01d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index bc75664..6d5832b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -797,6 +797,11 @@ public class Execution implements Serializable {
return false;
}
 
+   if (current == CANCELING) {
+   cancelingComplete();
+   return false;
+   }
+
if (transitionState(current, FAILED, t)) {
// success (in a manner of speaking)
this.failureCause = t;

http://git-wip-us.apache.org/repos/asf/flink/blob/734ba01d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0d6de98..ed50bea 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -809,7 +809,7 @@ public class ExecutionGraph implements Serializable {
public void fail(Throwable t) {
while (true) {
JobStatus current = state;
-   if (current == JobStatus.FAILED || current == JobStatus.FAILING) {
+   if (current == JobStatus.FAILING || current.isTerminalState()) {
return;
}
else if (transitionState(current, JobStatus.FAILING, t)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/734ba01d/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 925b574..b1f11fb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -310,7 +311,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
while (deadline.hasTimeLeft() && !success) {
success = true;
for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
-   if (vertex.getExecutionState() != ExecutionState.FAILED) {
+   ExecutionState state = vertex.getExecutionState();
+   if (state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
 

flink git commit: [FLINK-3534] [runtime] Prevent canceling Execution from failing

2016-02-29 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.0 702627116 -> 70ce072a4


[FLINK-3534] [runtime] Prevent canceling Execution from failing

This closes #1735.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70ce072a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70ce072a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70ce072a

Branch: refs/heads/release-1.0
Commit: 70ce072a484b0e4372f80f47440fdca702bb5042
Parents: 7026271
Author: Ufuk Celebi 
Authored: Mon Feb 29 15:37:45 2016 +0100
Committer: Ufuk Celebi 
Committed: Mon Feb 29 20:06:50 2016 +0100

--
 .../flink/runtime/executiongraph/Execution.java |   5 +
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +-
 .../ExecutionGraphRestartTest.java  | 105 ++-
 .../ExecutionVertexCancelTest.java  |  11 +-
 4 files changed, 114 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/70ce072a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index bc75664..6d5832b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -797,6 +797,11 @@ public class Execution implements Serializable {
return false;
}
 
+   if (current == CANCELING) {
+   cancelingComplete();
+   return false;
+   }
+
if (transitionState(current, FAILED, t)) {
// success (in a manner of speaking)
this.failureCause = t;

http://git-wip-us.apache.org/repos/asf/flink/blob/70ce072a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0d6de98..ed50bea 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -809,7 +809,7 @@ public class ExecutionGraph implements Serializable {
public void fail(Throwable t) {
while (true) {
JobStatus current = state;
-   if (current == JobStatus.FAILED || current == JobStatus.FAILING) {
+   if (current == JobStatus.FAILING || current.isTerminalState()) {
return;
}
else if (transitionState(current, JobStatus.FAILING, t)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/70ce072a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 925b574..b1f11fb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -310,7 +311,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
while (deadline.hasTimeLeft() && !success) {
success = true;
for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
-   if (vertex.getExecutionState() != ExecutionState.FAILED) {
+   ExecutionState state = vertex.getExecutionState();
+   if (state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
   

flink git commit: [docs] Update readme with current feature list and streaming example

2016-02-29 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.0 5b5136e95 -> 702627116


[docs] Update readme with current feature list and streaming example


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70262711
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70262711
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70262711

Branch: refs/heads/release-1.0
Commit: 70262711641358dea28485c0c6926b1bea57bb95
Parents: 5b5136e
Author: Stephan Ewen 
Authored: Mon Feb 29 16:24:47 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 29 16:32:12 2016 +0100

--
 README.md | 67 +-
 1 file changed, 52 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/70262711/README.md
--
diff --git a/README.md b/README.md
index 3cf08c7..41ea37d 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,55 @@
 # Apache Flink
 
-Apache Flink is an open source platform for scalable batch and stream data processing. Flink supports batch and streaming analytics,
-in one system. Analytical programs can be written in concise and elegant APIs in Java and Scala.
+Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities.
 
+Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/)
+
+
+### Features
+
+* A streaming-first runtime that supports both batch processing and data streaming programs
+
+* Elegant and fluent APIs in Java and Scala
+
+* A runtime that supports very high throughput and low event latency at the same time
+
+* Support for *event time* and *out-of-order* processing in the DataStream API, based on the *Dataflow Model*
+
+* Flexible windowing (time, count, sessions, custom triggers) across different time semantics (event time, processing time)
+
+* Fault-tolerance with *exactly-once* processing guarantees
+
+* Natural back-pressure in streaming programs.
+
+* Libraries for Graph processing (batch), Machine Learning (batch), and Complex Event Processing (streaming)
+
+* Built-in support for iterative programs (BSP) and in the DataSet (batch) API.
+
+* Custom memory management to for efficient and robust switching between in-memory and out-of-core data processing algorithms.
+
+* Compatibility layers for Apache Hadoop MapReduce and Apache Storm.
+
+* Integration with YARN, HDFS, HBase, and other components of the Apache Hadoop ecosystem.
+
+
+### Streaming Example
+```scala
+case class WordWithCount(word: String, count: Long)
+
+val text = env.socketTextStream(host, port, '\n')
+
+val windowCounts = text.flatMap { w => w.split("\\s") }
+  .map { w => WordWithCount(w, 1) }
+  .keyBy("word")
+  .timeWindow(Time.seconds(5))
+  .sum("count")
+
+windowCounts.print()
+```
+
+### Batch Example
 ```scala
-case class WordWithCount(word: String, count: Int)
+case class WordWithCount(word: String, count: Long)
 
 val text = env.readTextFile(path)
 
@@ -16,16 +61,6 @@ val counts = text.flatMap { _.split("\\W+") }
 counts.writeAsCsv(outputPath)
 ```
 
-These are some of the unique features of Flink:
-
-* Hybrid batch/streaming runtime that supports batch processing and data streaming programs.
-* Custom memory management to guarantee efficient, adaptive, and highly robust switching between in-memory and out-of-core data processing algorithms.
-* Flexible and expressive windowing semantics for data stream programs.
-* Built-in program optimizer that chooses the proper runtime operations for each program.
-* Custom type analysis and serialization stack for high performance.
-
-
-Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/)
 
 
 ## Building Apache Flink from Source
@@ -34,21 +69,23 @@ Prerequisites for building Flink:
 
 * Unix-like environment (We use Linux, Mac OS X, Cygwin)
 * git
-* Maven (at least version 3.0.4)
+* Maven (we recommend version 3.0.4)
 * Java 7 or 8
 
 ```
 git clone https://github.com/apache/flink.git
 cd flink
-mvn clean package -DskipTests # this will take up to 5 minutes
+mvn clean package -DskipTests # this will take up to 10 minutes
 ```
 
 Flink is now installed in `build-target`
 
+*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.0.3 creates the libraries properly.*
 
 ## Developing Flink
 
The Flink committers use IntelliJ IDEA and Eclipse IDE to develop the Flink codebase.
+We recommend IntelliJ IDEA for developing projects that involve Scala code.
 
 Minimal requirements for an IDE are:
 * Support for Java and Scala (also mixed projects)
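
For readers trying the streaming example added above: it elides the environment and imports. A self-contained sketch, with host, port, and job name as placeholders:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {
  case class WordWithCount(word: String, count: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Placeholder source: one line of text per record from a local socket.
    val text = env.socketTextStream("localhost", 9999, '\n')

    val windowCounts = text.flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5))
      .sum("count")

    windowCounts.print()
    env.execute("Socket Window WordCount")
  }
}
```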



flink git commit: [docs] Update readme with current feature list and streaming example

2016-02-29 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master 405d22236 -> 5f0af06fe


[docs] Update readme with current feature list and streaming example


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f0af06f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f0af06f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f0af06f

Branch: refs/heads/master
Commit: 5f0af06fef3046273f26d0015fe1c9b6df381751
Parents: 405d222
Author: Stephan Ewen 
Authored: Mon Feb 29 16:24:47 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 29 16:28:38 2016 +0100

--
 README.md | 67 +-
 1 file changed, 52 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5f0af06f/README.md
--
diff --git a/README.md b/README.md
index 3cf08c7..41ea37d 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,55 @@
 # Apache Flink
 
-Apache Flink is an open source platform for scalable batch and stream data processing. Flink supports batch and streaming analytics,
-in one system. Analytical programs can be written in concise and elegant APIs in Java and Scala.
+Apache Flink is an open source stream processing framework with powerful stream- and batch-processing capabilities.
 
+Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/)
+
+
+### Features
+
+* A streaming-first runtime that supports both batch processing and data streaming programs
+
+* Elegant and fluent APIs in Java and Scala
+
+* A runtime that supports very high throughput and low event latency at the same time
+
+* Support for *event time* and *out-of-order* processing in the DataStream API, based on the *Dataflow Model*
+
+* Flexible windowing (time, count, sessions, custom triggers) across different time semantics (event time, processing time)
+
+* Fault-tolerance with *exactly-once* processing guarantees
+
+* Natural back-pressure in streaming programs.
+
+* Libraries for Graph processing (batch), Machine Learning (batch), and Complex Event Processing (streaming)
+
+* Built-in support for iterative programs (BSP) and in the DataSet (batch) API.
+
+* Custom memory management to for efficient and robust switching between in-memory and out-of-core data processing algorithms.
+
+* Compatibility layers for Apache Hadoop MapReduce and Apache Storm.
+
+* Integration with YARN, HDFS, HBase, and other components of the Apache Hadoop ecosystem.
+
+
+### Streaming Example
+```scala
+case class WordWithCount(word: String, count: Long)
+
+val text = env.socketTextStream(host, port, '\n')
+
+val windowCounts = text.flatMap { w => w.split("\\s") }
+  .map { w => WordWithCount(w, 1) }
+  .keyBy("word")
+  .timeWindow(Time.seconds(5))
+  .sum("count")
+
+windowCounts.print()
+```
+
+### Batch Example
 ```scala
-case class WordWithCount(word: String, count: Int)
+case class WordWithCount(word: String, count: Long)
 
 val text = env.readTextFile(path)
 
@@ -16,16 +61,6 @@ val counts = text.flatMap { _.split("\\W+") }
 counts.writeAsCsv(outputPath)
 ```
 
-These are some of the unique features of Flink:
-
-* Hybrid batch/streaming runtime that supports batch processing and data streaming programs.
-* Custom memory management to guarantee efficient, adaptive, and highly robust switching between in-memory and out-of-core data processing algorithms.
-* Flexible and expressive windowing semantics for data stream programs.
-* Built-in program optimizer that chooses the proper runtime operations for each program.
-* Custom type analysis and serialization stack for high performance.
-
-
-Learn more about Flink at [http://flink.apache.org/](http://flink.apache.org/)
 
 
 ## Building Apache Flink from Source
@@ -34,21 +69,23 @@ Prerequisites for building Flink:
 
 * Unix-like environment (We use Linux, Mac OS X, Cygwin)
 * git
-* Maven (at least version 3.0.4)
+* Maven (we recommend version 3.0.4)
 * Java 7 or 8
 
 ```
 git clone https://github.com/apache/flink.git
 cd flink
-mvn clean package -DskipTests # this will take up to 5 minutes
+mvn clean package -DskipTests # this will take up to 10 minutes
 ```
 
 Flink is now installed in `build-target`
 
+*NOTE: Maven 3.3.x can build Flink, but will not properly shade away certain dependencies. Maven 3.0.3 creates the libraries properly.*
 
 ## Developing Flink
 
The Flink committers use IntelliJ IDEA and Eclipse IDE to develop the Flink codebase.
+We recommend IntelliJ IDEA for developing projects that involve Scala code.
 
 Minimal requirements for an IDE are:
 * Support for Java and Scala (also mixed projects)



flink git commit: [hotfix] Properly copy stream record in ReducingWindowBuffer and FoldingWindowBuffer

2016-02-29 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master f881e7079 -> 405d22236


[hotfix] Properly copy stream record in ReducingWindowBuffer and FoldingWindowBuffer
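
The distinction at play below: building a new StreamRecord from getValue()/getTimestamp() stamps a timestamp onto records that may never have carried one, whereas copy() reproduces the record with its timestamp state intact and replace() swaps only the value. A hedged sketch using just the StreamRecord calls that appear in the diff:

```scala
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

// Seed a buffer from the first element by copying the record, preserving
// its timestamp state; fold subsequent values in with a value-only replace.
def seed[T](element: StreamRecord[T]): StreamRecord[T] =
  element.copy(element.getValue)

def accumulate[T](data: StreamRecord[T], reduced: T): StreamRecord[T] = {
  data.replace(reduced) // the timestamp of `data` is left untouched
  data
}
```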


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/405d2223
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/405d2223
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/405d2223

Branch: refs/heads/master
Commit: 405d2223697344e41aa11cc66cadf6b9afcacd89
Parents: f881e70
Author: Stephan Ewen 
Authored: Fri Feb 26 21:23:56 2016 +0100
Committer: Stephan Ewen 
Committed: Mon Feb 29 13:38:37 2016 +0100

--
 .../runtime/operators/windowing/buffers/FoldingWindowBuffer.java   | 2 +-
 .../runtime/operators/windowing/buffers/ReducingWindowBuffer.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/405d2223/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
index fa44f9d..f6c2319 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java
@@ -63,7 +63,7 @@ public class FoldingWindowBuffer implements WindowBuffer {
 
@Override
public void storeElement(StreamRecord element) throws Exception {
-   data.replace(foldFunction.fold(data.getValue(), element.getValue()), element.getTimestamp());
+   data.replace(foldFunction.fold(data.getValue(), element.getValue()));
}
 
@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/405d2223/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
index 1f2b639..d3bf4b4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java
@@ -57,7 +57,7 @@ public class ReducingWindowBuffer implements WindowBuffer {
@Override
public void storeElement(StreamRecord element) throws Exception {
if (data == null) {
-   data = new StreamRecord<>(element.getValue(), element.getTimestamp());
+   data = element.copy(element.getValue());
} else {
data.replace(reduceFunction.reduce(data.getValue(), element.getValue()));
}



flink git commit: [FLINK-3532] [gelly] Fix artifact ID of flink-gelly-examples module

2016-02-29 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/release-1.0 a94aa9c6a -> 5b5136e95


[FLINK-3532] [gelly] Fix artifact ID of flink-gelly-examples module

The current flink-gelly-examples artifact id wrongly used an underscore to separate
examples from flink-gelly. This commit replaces the underscore with a hyphen.

This closes #1731.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5b5136e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5b5136e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5b5136e9

Branch: refs/heads/release-1.0
Commit: 5b5136e9585a553485bc13abdfe3b8a6bd1805fc
Parents: a94aa9c
Author: Till Rohrmann 
Authored: Mon Feb 29 12:01:10 2016 +0100
Committer: Till Rohrmann 
Committed: Mon Feb 29 12:18:07 2016 +0100

--
 flink-libraries/flink-gelly-examples/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5b5136e9/flink-libraries/flink-gelly-examples/pom.xml
--
diff --git a/flink-libraries/flink-gelly-examples/pom.xml 
b/flink-libraries/flink-gelly-examples/pom.xml
index 2b84cc1..96492ae 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -27,7 +27,7 @@
..

 
-   flink-gelly_examples_2.10
+   flink-gelly-examples_2.10
flink-gelly-examples
jar
 



flink git commit: [FLINK-3532] [gelly] Fix artifact ID of flink-gelly-examples module

2016-02-29 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 512c84ff6 -> f881e7079


[FLINK-3532] [gelly] Fix artifact ID of flink-gelly-examples module

The current flink-gelly-examples artifact id wrongly used an underscore to separate
examples from flink-gelly. This commit replaces the underscore with a hyphen.

This closes #1731.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f881e707
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f881e707
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f881e707

Branch: refs/heads/master
Commit: f881e707924ed1c5c3bac50bde7660af9d9eea74
Parents: 512c84f
Author: Till Rohrmann 
Authored: Mon Feb 29 12:01:10 2016 +0100
Committer: Till Rohrmann 
Committed: Mon Feb 29 12:16:17 2016 +0100

--
 flink-libraries/flink-gelly-examples/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f881e707/flink-libraries/flink-gelly-examples/pom.xml
--
diff --git a/flink-libraries/flink-gelly-examples/pom.xml 
b/flink-libraries/flink-gelly-examples/pom.xml
index 06340fb..ab7eacc 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -27,7 +27,7 @@
..

 
-   flink-gelly_examples_2.10
+   flink-gelly-examples_2.10
flink-gelly-examples
jar
 



flink git commit: [FLINK-3502] Add test case. Bug was resolved by a previous commit.

2016-02-29 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/tableOnCalcite 4a63e2914 -> e097cf7db


[FLINK-3502] Add test case. Bug was resolved by a previous commit.
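
For context: the test below exercises Calcite's AggregateProjectPullUpConstantsRule, which pulls constant fields (here the literal 4 aliased as 'four) out of the grouping key, so the aggregation is effectively keyed on 'b alone and the constant is re-attached in the projection afterwards.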


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e097cf7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e097cf7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e097cf7d

Branch: refs/heads/tableOnCalcite
Commit: e097cf7dbbeec6ecbd77e77842c1c869ae40de1b
Parents: 4a63e29
Author: Fabian Hueske 
Authored: Fri Feb 26 15:53:19 2016 +0100
Committer: Fabian Hueske 
Committed: Fri Feb 26 15:53:19 2016 +0100

--
 .../table/test/GroupedAggregationsITCase.scala | 17 +
 1 file changed, 17 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e097cf7d/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
--
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
index a88abcb..062967d 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/GroupedAggregationsITCase.scala
@@ -137,4 +137,21 @@ class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgram
 
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testGroupedByExpression2(): Unit = {
+
+// verify AggregateProjectPullUpConstantsRule
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c)
+  .select('b, 4 as 'four, 'a)
+  .groupBy('b, 'four)
+  .select('four, 'a.sum)
+
+val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n"
+val results = t.toDataSet[Row].collect()
+
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }



flink git commit: Bump version to 1.1-SNAPSHOT

2016-02-29 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/master e840bbf71 -> 512c84ff6


Bump version to 1.1-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/512c84ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/512c84ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/512c84ff

Branch: refs/heads/master
Commit: 512c84ff6ba82a095c51a60ac09ce69e2fc8bbc6
Parents: e840bbf
Author: Robert Metzger 
Authored: Fri Feb 26 14:49:53 2016 +0100
Committer: Robert Metzger 
Committed: Mon Feb 29 10:32:15 2016 +0100

--
 docs/_config.yml   | 6 +++---
 flink-annotations/pom.xml  | 2 +-
 flink-batch-connectors/flink-avro/pom.xml  | 2 +-
 flink-batch-connectors/flink-hadoop-compatibility/pom.xml  | 2 +-
 flink-batch-connectors/flink-hbase/pom.xml | 2 +-
 flink-batch-connectors/flink-hcatalog/pom.xml  | 2 +-
 flink-batch-connectors/flink-jdbc/pom.xml  | 2 +-
 flink-batch-connectors/pom.xml | 2 +-
 flink-clients/pom.xml  | 2 +-
 flink-contrib/flink-connector-wikiedits/pom.xml| 2 +-
 flink-contrib/flink-operator-stats/pom.xml | 2 +-
 flink-contrib/flink-statebackend-rocksdb/pom.xml   | 2 +-
 flink-contrib/flink-storm-examples/pom.xml | 2 +-
 flink-contrib/flink-storm/pom.xml  | 2 +-
 flink-contrib/flink-streaming-contrib/pom.xml  | 2 +-
 flink-contrib/flink-tweet-inputformat/pom.xml  | 2 +-
 flink-contrib/pom.xml  | 2 +-
 flink-core/pom.xml | 2 +-
 flink-dist/pom.xml | 2 +-
 flink-examples/flink-examples-batch/pom.xml| 2 +-
 flink-examples/flink-examples-streaming/pom.xml| 2 +-
 flink-examples/pom.xml | 2 +-
 flink-fs-tests/pom.xml | 2 +-
 flink-java/pom.xml | 2 +-
 flink-java8/pom.xml| 2 +-
 flink-libraries/flink-cep/pom.xml  | 2 +-
 flink-libraries/flink-gelly-examples/pom.xml   | 2 +-
 flink-libraries/flink-gelly-scala/pom.xml  | 2 +-
 flink-libraries/flink-gelly/pom.xml| 2 +-
 flink-libraries/flink-ml/pom.xml   | 2 +-
 flink-libraries/flink-python/pom.xml   | 2 +-
 flink-libraries/flink-table/pom.xml| 2 +-
 flink-libraries/pom.xml| 2 +-
 flink-optimizer/pom.xml| 2 +-
 flink-quickstart/flink-quickstart-java/pom.xml | 2 +-
 .../src/main/resources/archetype-resources/pom.xml | 2 +-
 flink-quickstart/flink-quickstart-scala/pom.xml| 2 +-
 .../src/main/resources/archetype-resources/pom.xml | 2 +-
 flink-quickstart/pom.xml   | 2 +-
 flink-runtime-web/pom.xml  | 2 +-
 flink-runtime/pom.xml  | 2 +-
 flink-scala-shell/pom.xml  | 2 +-
 flink-scala/pom.xml| 2 +-
 flink-shaded-curator/flink-shaded-curator-recipes/pom.xml  | 2 +-
 flink-shaded-curator/flink-shaded-curator-test/pom.xml | 2 +-
 flink-shaded-curator/pom.xml   | 2 +-
 flink-shaded-hadoop/flink-shaded-hadoop1/pom.xml   | 2 +-
 flink-shaded-hadoop/flink-shaded-hadoop2/pom.xml   | 2 +-
 flink-shaded-hadoop/flink-shaded-include-yarn-tests/pom.xml| 2 +-
 flink-shaded-hadoop/pom.xml| 2 +-
 .../flink-connector-elasticsearch/pom.xml  | 2 +-
 flink-streaming-connectors/flink-connector-filesystem/pom.xml  | 2 +-
 flink-streaming-connectors/flink-connector-flume/pom.xml   | 2 +-
 flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml   | 2 +-
 flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml   | 2 +-
 flink-streaming-connectors/flink-connector-kafka-base/pom.xml  | 2 +-
 flink-streaming-connectors/flink-connector-nifi/pom.xml| 2 +-
 flink-streaming-connectors/flink-connector-rabbitmq/pom.xml| 2 +-
 flink-streaming-connectors/flink-connector-twitter/pom.xml | 2 +-
 flink-streaming-connectors/pom.xml | 2 +-
 flink-streaming-java/pom.xml   | 

flink git commit: [FLINK-3517] [dist] Only count active PIDs in start script

2016-02-29 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 49069823b -> e840bbf71


[FLINK-3517] [dist] Only count active PIDs in start script

This closes #1716.
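
Background: the PID file can accumulate stale entries from daemons that exited without cleanup. `kill -0` delivers no signal; it only reports whether the process exists, so the rewritten loop below counts live instances only, and the message is accordingly demoted from WARNING to INFO.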


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e840bbf7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e840bbf7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e840bbf7

Branch: refs/heads/master
Commit: e840bbf7192aa3667e7ba128adc84cfd8318ddea
Parents: 4906982
Author: Ufuk Celebi 
Authored: Fri Feb 26 00:11:48 2016 +0100
Committer: Ufuk Celebi 
Committed: Mon Feb 29 10:31:44 2016 +0100

--
 flink-dist/src/main/flink-bin/bin/flink-daemon.sh | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e840bbf7/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
--
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh 
b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
index f6eab70..2388ba7 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -85,8 +85,16 @@ case $STARTSTOP in
 
 # Print a warning if daemons are already running on host
 if [ -f $pid ]; then
-count=$(wc -l $pid | awk '{print $1}')
-echo "[WARNING] $count instance(s) of $DAEMON are already running on $HOSTNAME."
+  active=()
+  while IFS='' read -r p || [[ -n "$p" ]]; do
+kill -0 $p >/dev/null 2>&1
+if [ $? -eq 0 ]; then
+  active+=($p)
+fi
+  done < "${pid}"
+
+  count="${#active[@]}"
+  echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
 fi
 
 echo "Starting $DAEMON daemon on host $HOSTNAME."



flink git commit: [FLINK-3517] [dist] Only count active PIDs in start script

2016-02-29 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.0 945f25b97 -> a94aa9c6a


[FLINK-3517] [dist] Only count active PIDs in start script


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a94aa9c6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a94aa9c6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a94aa9c6

Branch: refs/heads/release-1.0
Commit: a94aa9c6a7d47d8074120b77fe051f98931c11c6
Parents: 945f25b
Author: Ufuk Celebi 
Authored: Fri Feb 26 00:11:48 2016 +0100
Committer: Ufuk Celebi 
Committed: Mon Feb 29 10:30:34 2016 +0100

--
 flink-dist/src/main/flink-bin/bin/flink-daemon.sh | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a94aa9c6/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
--
diff --git a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh 
b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
index f6eab70..2388ba7 100644
--- a/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
+++ b/flink-dist/src/main/flink-bin/bin/flink-daemon.sh
@@ -85,8 +85,16 @@ case $STARTSTOP in
 
 # Print a warning if daemons are already running on host
 if [ -f $pid ]; then
-count=$(wc -l $pid | awk '{print $1}')
-echo "[WARNING] $count instance(s) of $DAEMON are already running on $HOSTNAME."
+  active=()
+  while IFS='' read -r p || [[ -n "$p" ]]; do
+kill -0 $p >/dev/null 2>&1
+if [ $? -eq 0 ]; then
+  active+=($p)
+fi
+  done < "${pid}"
+
+  count="${#active[@]}"
+  echo "[INFO] $count instance(s) of $DAEMON are already running on $HOSTNAME."
 fi
 
 echo "Starting $DAEMON daemon on host $HOSTNAME."



flink git commit: [FLINK-3461] [runtime] Remove redundant check in ZooKeeperLeaderElectionService

2016-02-29 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.0 eb164211e -> 945f25b97


[FLINK-3461] [runtime] Remove redundant check in ZooKeeperLeaderElectionService

This closes #1700.
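
For context: Curator's LeaderLatch.hasLeadership() already returns false unless the latch is in STARTED state, so the removed wrapper duplicated a check the latch performs internally.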


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/945f25b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/945f25b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/945f25b9

Branch: refs/heads/release-1.0
Commit: 945f25b974f3773be31a108330d6cc5f7ec36235
Parents: eb16421
Author: sahitya-pavurala 
Authored: Tue Feb 23 21:08:23 2016 -0500
Committer: Ufuk Celebi 
Committed: Mon Feb 29 10:18:28 2016 +0100

--
 .../runtime/leaderelection/ZooKeeperLeaderElectionService.java | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/945f25b9/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 811037c..5c10293 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -148,11 +148,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 
@Override
public boolean hasLeadership() {
-   if(leaderLatch.getState().equals(LeaderLatch.State.STARTED)) {
-   return leaderLatch.hasLeadership();
-   } else {
-   return false;
-   }
+   return leaderLatch.hasLeadership();
}
 
@Override



flink git commit: [FLINK-3461] [runtime] Fix space indentation in ZooKeeperLeaderElectionService

2016-02-29 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master febcb7f11 -> 49069823b


[FLINK-3461] [runtime] Fix space indentation in ZooKeeperLeaderElectionService


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49069823
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49069823
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49069823

Branch: refs/heads/master
Commit: 49069823bacd31ac8139664707874af9cec2720d
Parents: febcb7f
Author: Ufuk Celebi 
Authored: Mon Feb 29 10:16:38 2016 +0100
Committer: Ufuk Celebi 
Committed: Mon Feb 29 10:16:38 2016 +0100

--
 .../runtime/leaderelection/ZooKeeperLeaderElectionService.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/49069823/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 8d39fe2..5c10293 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -148,7 +148,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 
@Override
public boolean hasLeadership() {
-return leaderLatch.hasLeadership();
+   return leaderLatch.hasLeadership();
}
 
@Override



flink git commit: [FLINK-3461] [runtime] Remove redundant check in ZooKeeperLeaderElectionService

2016-02-29 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 9924c3afe -> febcb7f11


[FLINK-3461] [runtime] Remove redundant check in ZooKeeperLeaderElectionService

This closes #1700.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/febcb7f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/febcb7f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/febcb7f1

Branch: refs/heads/master
Commit: febcb7f114439cf0b99f2aa968861387d3f46deb
Parents: 9924c3a
Author: sahitya-pavurala 
Authored: Tue Feb 23 21:08:23 2016 -0500
Committer: Ufuk Celebi 
Committed: Mon Feb 29 10:08:10 2016 +0100

--
 .../runtime/leaderelection/ZooKeeperLeaderElectionService.java | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/febcb7f1/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
index 811037c..8d39fe2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.java
@@ -148,11 +148,7 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le
 
@Override
public boolean hasLeadership() {
-   if(leaderLatch.getState().equals(LeaderLatch.State.STARTED)) {
-   return leaderLatch.hasLeadership();
-   } else {
-   return false;
-   }
+return leaderLatch.hasLeadership();
}
 
@Override