buildbot success on flink-docs-release-0.10

2017-01-20 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.10 
while building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.10/builds/459

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave2_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' 
triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot





[1/2] flink git commit: [FLINK-5459] [docs] Add troubleshooting guide for classloading issues

2017-01-20 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master 6dffe282b -> 3b6267555


[FLINK-5459] [docs] Add troubleshooting guide for classloading issues


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b626755
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b626755
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b626755

Branch: refs/heads/master
Commit: 3b626755588a461c8f195a48c4777fd533bf54ec
Parents: 24ff9eb
Author: Stephan Ewen 
Authored: Fri Jan 20 18:50:21 2017 +0100
Committer: Stephan Ewen 
Committed: Fri Jan 20 18:54:30 2017 +0100

--
 docs/monitoring/debugging_classloading.md | 101 ++---
 1 file changed, 92 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3b626755/docs/monitoring/debugging_classloading.md
--
diff --git a/docs/monitoring/debugging_classloading.md 
b/docs/monitoring/debugging_classloading.md
index e4e908e..40822c0 100644
--- a/docs/monitoring/debugging_classloading.md
+++ b/docs/monitoring/debugging_classloading.md
@@ -27,19 +27,102 @@ under the License.
 
 ## Overview of Classloading in Flink
 
-  - What is in the Application Classloader for different deployment techs
-  - What is in the user code classloader
+When running Flink applications, the JVM will load various classes over time.
+These classes can be divided into two domains:
 
-  - Access to the user code classloader for applications
+  - The **Flink Framework** domain: This includes all code in the `/lib` 
directory in the Flink directory.
+By default these are the classes of Apache Flink and its core dependencies.
 
-## Classpath Setups
+  - The **User Code** domain: These are all classes that are included in the 
JAR file submitted via the CLI or web interface.
+That includes the job's classes, and all libraries and connectors that are 
put into the uber JAR.
+
+
+The class loading behaves slightly differently for various Flink setups:
+
+**Standalone**
+
+When starting the Flink cluster, the JobManagers and TaskManagers are 
started with the Flink framework classes in the
+classpath. The classes from all jobs that are submitted against the cluster 
are loaded *dynamically*.
+
+**YARN**
+
+YARN classloading differs between single job deployments and sessions:
+
+  - When submitting a Flink job directly to YARN (via `bin/flink run -m 
yarn-cluster ...`), dedicated TaskManagers and
+JobManagers are started for that job. Those JVMs have both Flink framework 
classes and user code classes in their classpath.
+That means that there is *no dynamic classloading* involved in that case.
+
+  - When starting a YARN session, the JobManagers and TaskManagers are started 
with the Flink framework classes in the
+classpath. The classes from all jobs that are submitted against the 
session are loaded dynamically.
+
+**Mesos**
+
+Mesos setups following [this documentation](../setup/mesos.html) currently 
behave very much like a 
+YARN session: The TaskManager and JobManager processes are started with the 
Flink framework classes in classpath, job
+classes are loaded dynamically when the jobs are submitted.
+
+
+## Avoiding Dynamic Classloading
+
+All components (JobManager, TaskManager, Client, ApplicationMaster, ...) log 
their classpath setting on startup.
+They can be found as part of the environment information at the beginning of 
the log.
+
+When running a setup where the Flink JobManager and TaskManagers are exclusive 
to one particular job, one can put JAR files
+directly into the `/lib` folder to make sure they are part of the classpath 
and not loaded dynamically. 
+
+It usually works to put the job's JAR file into the `/lib` directory. The JAR 
will be part of both the classpath
+(the *AppClassLoader*) and the dynamic class loader 
(*FlinkUserCodeClassLoader*).
+Because the AppClassLoader is the parent of the FlinkUserCodeClassLoader (and 
Java loads parent-first), this should
+result in classes being loaded only once.
+
+For setups where the job's JAR file cannot be put into the `/lib` folder (for 
example because the setup is a session that is
+used by multiple jobs), it may still be possible to put common libraries into the 
`/lib` folder, and avoid dynamic class loading
+for those.
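
As a quick sanity check (an illustrative snippet, not part of the committed docs; the class name is a placeholder), one can verify which class loader resolved a class after its JAR was copied into `/lib`. Because of parent-first delegation, it should be the application class loader rather than the `FlinkUserCodeClassLoader`:

```java
// Illustrative helper one could call from inside a running job (placeholder class name).
public final class ClassLoaderCheck {

    public static void logLoaderOf(String className, ClassLoader userCodeClassLoader)
            throws ClassNotFoundException {
        // Resolve the class through the user code class loader.
        Class<?> clazz = Class.forName(className, false, userCodeClassLoader);
        // If the JAR sits in /lib, parent-first delegation means this prints the
        // application class loader, i.e. the class is no longer loaded dynamically.
        System.out.println(className + " loaded by " + clazz.getClassLoader());
    }
}
```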
+
+
+## Manual Classloading in the Job
+
+In some cases, a transformation function, source, or sink needs to manually 
load classes (dynamically via reflection).
+To do that, it needs the classloader that has access to the job's classes.
+
+In that case, the functions (or sources or sinks) can be made a `RichFunction` 
(for example `RichMapFunction` or `RichWindowFunction`)
+and access the user code class loader via 
`getRuntimeContext().getUserCodeClassLoader()`.
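
A minimal sketch of that pattern (illustrative only; the function and the reflected class name are placeholders, not part of the committed docs):

```java
import org.apache.flink.api.common.functions.RichMapFunction;

// Sketch of a rich function that loads a class from the submitted job JAR via reflection.
public class ReflectiveInstantiator extends RichMapFunction<String, Object> {

    @Override
    public Object map(String className) throws Exception {
        // The user code class loader has access to the classes shipped in the job's JAR.
        ClassLoader userLoader = getRuntimeContext().getUserCodeClassLoader();
        Class<?> clazz = Class.forName(className, true, userLoader);
        return clazz.newInstance();
    }
}
```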
+
+
+## X cannot be 

[2/2] flink git commit: [FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators

2017-01-20 Thread sewen
[FLINK-5585] [jobmanager] Fix NullPointerException in 
JobManager.updateAccumulators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24ff9eba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24ff9eba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24ff9eba

Branch: refs/heads/master
Commit: 24ff9ebac926dda13c029c79c44c40580a5a1a2f
Parents: 6dffe28
Author: Stephan Ewen 
Authored: Fri Jan 20 11:12:12 2017 +0100
Committer: Stephan Ewen 
Committed: Fri Jan 20 18:54:30 2017 +0100

--
 .../flink/runtime/jobmanager/JobManager.scala   | 23 ++--
 1 file changed, 12 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/24ff9eba/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
--
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4938cfc..d175c46 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1849,18 +1849,19 @@ class JobManager(
*
* @param accumulators list of accumulator snapshots
*/
-  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
-accumulators foreach {
-  case accumulatorEvent =>
-currentJobs.get(accumulatorEvent.getJobID) match {
-  case Some((jobGraph, jobInfo)) =>
-future {
-  jobGraph.updateAccumulators(accumulatorEvent)
-}(context.dispatcher)
-  case None =>
-  // ignore accumulator values for old job
+  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]): 
Unit = {
+accumulators.foreach( snapshot => {
+if (snapshot != null) {
+  currentJobs.get(snapshot.getJobID) match {
+case Some((jobGraph, jobInfo)) =>
+  future {
+jobGraph.updateAccumulators(snapshot)
+  }(context.dispatcher)
+case None =>
+  // ignore accumulator values for old job
+  }
 }
-}
+})
   }
 
   /**



flink git commit: [FLINK-5459] [docs] Add troubleshooting guide for classloading issues

2017-01-20 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.2 8c5edb2fc -> 289932cdb


[FLINK-5459] [docs] Add troubleshooting guide for classloading issues


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/289932cd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/289932cd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/289932cd

Branch: refs/heads/release-1.2
Commit: 289932cdbea695a529bbd618fe1764a41a06f3fb
Parents: 8c5edb2
Author: Stephan Ewen 
Authored: Fri Jan 20 18:50:21 2017 +0100
Committer: Stephan Ewen 
Committed: Fri Jan 20 19:00:07 2017 +0100

--
 docs/monitoring/debugging_classloading.md | 101 ++---
 1 file changed, 92 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/289932cd/docs/monitoring/debugging_classloading.md
--
diff --git a/docs/monitoring/debugging_classloading.md 
b/docs/monitoring/debugging_classloading.md
index e4e908e..40822c0 100644
--- a/docs/monitoring/debugging_classloading.md
+++ b/docs/monitoring/debugging_classloading.md
@@ -27,19 +27,102 @@ under the License.
 
 ## Overview of Classloading in Flink
 
-  - What is in the Application Classloader for different deployment techs
-  - What is in the user code classloader
+When running Flink applications, the JVM will load various classes over time.
+These classes can be divided into two domains:
 
-  - Access to the user code classloader for applications
+  - The **Flink Framework** domain: This includes all code in the `/lib` 
directory in the Flink directory.
+By default these are the classes of Apache Flink and its core dependencies.
 
-## Classpath Setups
+  - The **User Code** domain: These are all classes that are included in the 
JAR file submitted via the CLI or web interface.
+That includes the job's classes, and all libraries and connectors that are 
put into the uber JAR.
+
+
+The class loading behaves slightly differently for various Flink setups:
+
+**Standalone**
+
+When starting the Flink cluster, the JobManagers and TaskManagers are 
started with the Flink framework classes in the
+classpath. The classes from all jobs that are submitted against the cluster 
are loaded *dynamically*.
+
+**YARN**
+
+YARN classloading differs between single job deployments and sessions:
+
+  - When submitting a Flink job directly to YARN (via `bin/flink run -m 
yarn-cluster ...`), dedicated TaskManagers and
+JobManagers are started for that job. Those JVMs have both Flink framework 
classes and user code classes in their classpath.
+That means that there is *no dynamic classloading* involved in that case.
+
+  - When starting a YARN session, the JobManagers and TaskManagers are started 
with the Flink framework classes in the
+classpath. The classes from all jobs that are submitted against the 
session are loaded dynamically.
+
+**Mesos**
+
+Mesos setups following [this documentation](../setup/mesos.html) currently 
behave very much like a 
+YARN session: The TaskManager and JobManager processes are started with the 
Flink framework classes in classpath, job
+classes are loaded dynamically when the jobs are submitted.
+
+
+## Avoiding Dynamic Classloading
+
+All components (JobManager, TaskManager, Client, ApplicationMaster, ...) log 
their classpath setting on startup.
+They can be found as part of the environment information at the beginning of 
the log.
+
+When running a setup where the Flink JobManager and TaskManagers are exclusive 
to one particular job, one can put JAR files
+directly into the `/lib` folder to make sure they are part of the classpath 
and not loaded dynamically. 
+
+It usually works to put the job's JAR file into the `/lib` directory. The JAR 
will be part of both the classpath
+(the *AppClassLoader*) and the dynamic class loader 
(*FlinkUserCodeClassLoader*).
+Because the AppClassLoader is the parent of the FlinkUserCodeClassLoader (and 
Java loads parent-first), this should
+result in classes being loaded only once.
+
+For setups where the job's JAR file cannot be put into the `/lib` folder (for 
example because the setup is a session that is
+used by multiple jobs), it may still be possible to put common libraries into the 
`/lib` folder, and avoid dynamic class loading
+for those.
+
+
+## Manual Classloading in the Job
+
+In some cases, a transformation function, source, or sink needs to manually 
load classes (dynamically via reflection).
+To do that, it needs the classloader that has access to the job's classes.
+
+In that case, the functions (or sources or sinks) can be made a `RichFunction` 
(for example `RichMapFunction` or `RichWindowFunction`)
+and access the user code class loader via 
`getRuntimeContext().getUserCodeClassLoader()`.
+
+
+## X 

flink git commit: [FLINK-2662] [optimizer] Fix translation of broadcasted unions.

2017-01-20 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-1.1 f6f1c244c -> 6566b63aa


[FLINK-2662] [optimizer] Fix translation of broadcasted unions.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6566b63a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6566b63a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6566b63a

Branch: refs/heads/release-1.1
Commit: 6566b63aa50af946ecf0d028c4e284fc6dc2a55b
Parents: f6f1c24
Author: Fabian Hueske 
Authored: Fri Jan 6 00:00:30 2017 +0100
Committer: Fabian Hueske 
Committed: Fri Jan 20 17:35:44 2017 +0100

--
 .../flink/optimizer/dag/BinaryUnionNode.java| 35 ++
 .../operators/BinaryUnionOpDescriptor.java  |  4 ++
 .../flink/optimizer/UnionReplacementTest.java   | 72 ++--
 3 files changed, 106 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6566b63a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
--
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index d262cf6..cb496a2 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -104,6 +104,8 @@ public class BinaryUnionNode extends TwoInputNode {
throw new CompilerException("BinaryUnionNode has more 
than one successor.");
}
 
+   boolean childrenSkippedDueToReplicatedInput = false;
+
// check if we have a cached version
if (this.cachedPlans != null) {
return this.cachedPlans;
@@ -143,7 +145,30 @@ public class BinaryUnionNode extends TwoInputNode {

// create all candidates
for (PlanNode child1 : subPlans1) {
+
+   if (child1.getGlobalProperties().isFullyReplicated()) {
+   // fully replicated input is always locally 
forwarded if parallelism is not changed
+   if (dopChange1) {
+   // can not continue with this child
+   childrenSkippedDueToReplicatedInput = 
true;
+   continue;
+   } else {
+   
this.input1.setShipStrategy(ShipStrategyType.FORWARD);
+   }
+   }
+
for (PlanNode child2 : subPlans2) {
+
+   if 
(child2.getGlobalProperties().isFullyReplicated()) {
+   // fully replicated input is always 
locally forwarded if parallelism is not changed
+   if (dopChange2) {
+   // can not continue with this 
child
+   
childrenSkippedDueToReplicatedInput = true;
+   continue;
+   } else {
+   
this.input2.setShipStrategy(ShipStrategyType.FORWARD);
+   }
+   }

// check that the children go together. that is 
the case if they build upon the same
// candidate at the joined branch plan. 
@@ -249,6 +274,16 @@ public class BinaryUnionNode extends TwoInputNode {
}
}
 
+   if(outputPlans.isEmpty()) {
+   if(childrenSkippedDueToReplicatedInput) {
+   throw new CompilerException("No plan meeting 
the requirements could be created @ " + this
+   + ". Most likely reason: Invalid use of 
replicated input.");
+   } else {
+   throw new CompilerException("No plan meeting 
the requirements could be created @ " + this
+   + ". Most likely reason: Too 
restrictive plan hints.");
+   }
+   }
+
// cost and prune the plans
for (PlanNode node : outputPlans) {
estimator.costOperator(node);

http://git-wip-us.apache.org/repos/asf/flink/blob/6566b63a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
-

flink git commit: [FLINK-2662] [optimizer] Fix translation of broadcasted unions.

2017-01-20 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-1.2 03a1f25fa -> 8c5edb2fc


[FLINK-2662] [optimizer] Fix translation of broadcasted unions.

This closes #3083.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c5edb2f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c5edb2f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c5edb2f

Branch: refs/heads/release-1.2
Commit: 8c5edb2fc8e905b881401815a39941ce2aab3bbd
Parents: 03a1f25
Author: Fabian Hueske 
Authored: Fri Jan 6 00:00:30 2017 +0100
Committer: Fabian Hueske 
Committed: Fri Jan 20 16:59:42 2017 +0100

--
 .../flink/optimizer/dag/BinaryUnionNode.java| 35 ++
 .../operators/BinaryUnionOpDescriptor.java  |  4 ++
 .../flink/optimizer/UnionReplacementTest.java   | 72 ++--
 3 files changed, 106 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8c5edb2f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
--
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index d262cf6..cb496a2 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -104,6 +104,8 @@ public class BinaryUnionNode extends TwoInputNode {
throw new CompilerException("BinaryUnionNode has more 
than one successor.");
}
 
+   boolean childrenSkippedDueToReplicatedInput = false;
+
// check if we have a cached version
if (this.cachedPlans != null) {
return this.cachedPlans;
@@ -143,7 +145,30 @@ public class BinaryUnionNode extends TwoInputNode {

// create all candidates
for (PlanNode child1 : subPlans1) {
+
+   if (child1.getGlobalProperties().isFullyReplicated()) {
+   // fully replicated input is always locally 
forwarded if parallelism is not changed
+   if (dopChange1) {
+   // can not continue with this child
+   childrenSkippedDueToReplicatedInput = 
true;
+   continue;
+   } else {
+   
this.input1.setShipStrategy(ShipStrategyType.FORWARD);
+   }
+   }
+
for (PlanNode child2 : subPlans2) {
+
+   if 
(child2.getGlobalProperties().isFullyReplicated()) {
+   // fully replicated input is always 
locally forwarded if parallelism is not changed
+   if (dopChange2) {
+   // can not continue with this 
child
+   
childrenSkippedDueToReplicatedInput = true;
+   continue;
+   } else {
+   
this.input2.setShipStrategy(ShipStrategyType.FORWARD);
+   }
+   }

// check that the children go together. that is 
the case if they build upon the same
// candidate at the joined branch plan. 
@@ -249,6 +274,16 @@ public class BinaryUnionNode extends TwoInputNode {
}
}
 
+   if(outputPlans.isEmpty()) {
+   if(childrenSkippedDueToReplicatedInput) {
+   throw new CompilerException("No plan meeting 
the requirements could be created @ " + this
+   + ". Most likely reason: Invalid use of 
replicated input.");
+   } else {
+   throw new CompilerException("No plan meeting 
the requirements could be created @ " + this
+   + ". Most likely reason: Too 
restrictive plan hints.");
+   }
+   }
+
// cost and prune the plans
for (PlanNode node : outputPlans) {
estimator.costOperator(node);

http://git-wip-us.apache.org/repos/asf/flink/blob/8c5edb2f/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
-

flink git commit: [FLINK-2662] [optimizer] Fix translation of broadcasted unions.

2017-01-20 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master 8d6426320 -> 6dffe282b


[FLINK-2662] [optimizer] Fix translation of broadcasted unions.

This closes #3083.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6dffe282
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6dffe282
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6dffe282

Branch: refs/heads/master
Commit: 6dffe282bc5459b9c099df9a77cc2c3f26b28ae5
Parents: 8d64263
Author: Fabian Hueske 
Authored: Fri Jan 6 00:00:30 2017 +0100
Committer: Fabian Hueske 
Committed: Fri Jan 20 16:58:24 2017 +0100

--
 .../flink/optimizer/dag/BinaryUnionNode.java| 35 ++
 .../operators/BinaryUnionOpDescriptor.java  |  4 ++
 .../flink/optimizer/UnionReplacementTest.java   | 72 ++--
 3 files changed, 106 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6dffe282/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
--
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
index d262cf6..cb496a2 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java
@@ -104,6 +104,8 @@ public class BinaryUnionNode extends TwoInputNode {
throw new CompilerException("BinaryUnionNode has more 
than one successor.");
}
 
+   boolean childrenSkippedDueToReplicatedInput = false;
+
// check if we have a cached version
if (this.cachedPlans != null) {
return this.cachedPlans;
@@ -143,7 +145,30 @@ public class BinaryUnionNode extends TwoInputNode {

// create all candidates
for (PlanNode child1 : subPlans1) {
+
+   if (child1.getGlobalProperties().isFullyReplicated()) {
+   // fully replicated input is always locally 
forwarded if parallelism is not changed
+   if (dopChange1) {
+   // can not continue with this child
+   childrenSkippedDueToReplicatedInput = 
true;
+   continue;
+   } else {
+   
this.input1.setShipStrategy(ShipStrategyType.FORWARD);
+   }
+   }
+
for (PlanNode child2 : subPlans2) {
+
+   if 
(child2.getGlobalProperties().isFullyReplicated()) {
+   // fully replicated input is always 
locally forwarded if parallelism is not changed
+   if (dopChange2) {
+   // can not continue with this 
child
+   
childrenSkippedDueToReplicatedInput = true;
+   continue;
+   } else {
+   
this.input2.setShipStrategy(ShipStrategyType.FORWARD);
+   }
+   }

// check that the children go together. that is 
the case if they build upon the same
// candidate at the joined branch plan. 
@@ -249,6 +274,16 @@ public class BinaryUnionNode extends TwoInputNode {
}
}
 
+   if(outputPlans.isEmpty()) {
+   if(childrenSkippedDueToReplicatedInput) {
+   throw new CompilerException("No plan meeting 
the requirements could be created @ " + this
+   + ". Most likely reason: Invalid use of 
replicated input.");
+   } else {
+   throw new CompilerException("No plan meeting 
the requirements could be created @ " + this
+   + ". Most likely reason: Too 
restrictive plan hints.");
+   }
+   }
+
// cost and prune the plans
for (PlanNode node : outputPlans) {
estimator.costOperator(node);

http://git-wip-us.apache.org/repos/asf/flink/blob/6dffe282/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java
---

flink git commit: [FLINK-5528] [tests] Reduce query retry delay in QueryableStateITCase

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.2 694927634 -> 03a1f25fa


[FLINK-5528] [tests] Reduce query retry delay in QueryableStateITCase

Using 100ms instead of the 1s previously used does not impose too much
additional query load and reduces the test suite's duration from 16-20s to
13-15s on my machine with the current set of unit tests. Further reductions
in the retry delay do not yield more improvements so far.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03a1f25f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03a1f25f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03a1f25f

Branch: refs/heads/release-1.2
Commit: 03a1f25fa63bda30aa8c83751a2c75fb44dc98e8
Parents: 6949276
Author: Nico Kruber 
Authored: Tue Jan 17 15:01:32 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 16:53:46 2017 +0100

--
 .../flink/test/query/QueryableStateITCase.java| 18 +++---
 1 file changed, 7 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/03a1f25f/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 113f5c6..327a715 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -94,6 +94,7 @@ import static org.junit.Assert.fail;
 public class QueryableStateITCase extends TestLogger {
 
private final static FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(100, TimeUnit.SECONDS);
+   private final static FiniteDuration QUERY_RETRY_DELAY = new 
FiniteDuration(100, TimeUnit.MILLISECONDS);
 
private final static ActorSystem TEST_ACTOR_SYSTEM = 
AkkaUtils.createDefaultActorSystem();
 
@@ -200,8 +201,6 @@ public class QueryableStateITCase extends TestLogger {
 
final AtomicLongArray counts = new 
AtomicLongArray(numKeys);
 
-   final FiniteDuration retryDelay = new FiniteDuration(1, 
TimeUnit.SECONDS);
-
boolean allNonZero = false;
while (!allNonZero && deadline.hasTimeLeft()) {
allNonZero = true;
@@ -230,7 +229,7 @@ public class QueryableStateITCase extends TestLogger {
queryName,
key,
serializedKey,
-   retryDelay);
+   QUERY_RETRY_DELAY);
 
serializedResult.onSuccess(new 
OnSuccess() {
@Override
@@ -347,14 +346,13 @@ public class QueryableStateITCase extends TestLogger {
 
boolean success = false;
while (!success && deadline.hasTimeLeft()) {
-   final FiniteDuration retryDelay = new 
FiniteDuration(1, TimeUnit.SECONDS);
Future serializedResultFuture = 
getKvStateWithRetries(
client,
jobId,
queryName,
key,
serializedKey,
-   retryDelay);
+   QUERY_RETRY_DELAY);
 
byte[] serializedResult = 
Await.result(serializedResultFuture, deadline.timeLeft());
 
@@ -451,14 +449,13 @@ public class QueryableStateITCase extends TestLogger {
// Now start another task manager
cluster.addTaskManager();
 
-   final FiniteDuration retryDelay = new FiniteDuration(1, 
TimeUnit.SECONDS);
Future serializedResultFuture = 
getKvStateWithRetries(
client,
jobId,
queryName,
key,
serializedKey,
-   retryDelay);
+   QUERY_RETRY_DELAY);
 
byte[] serializedResult = 
Await.result(serializedResultFuture, deadline.timeLeft());
 
@@ -

flink git commit: [FLINK-5528] [tests] Reduce query retry delay in QueryableStateITCase

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master f266e8255 -> 8d6426320


[FLINK-5528] [tests] Reduce query retry delay in QueryableStateITCase

Using 100ms instead of the 1s previously used does not impose too much
additional query load and reduces the test suite's duration from 16-20s to
13-15s on my machine with the current set of unit tests. Further reductions
in the retry delay do not yield more improvements so far.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d642632
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d642632
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d642632

Branch: refs/heads/master
Commit: 8d64263203b4af7f3fbbe2a30ef67ddf67cc45a5
Parents: f266e82
Author: Nico Kruber 
Authored: Tue Jan 17 15:01:32 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 16:44:19 2017 +0100

--
 .../flink/test/query/QueryableStateITCase.java| 18 +++---
 1 file changed, 7 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8d642632/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index 113f5c6..327a715 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -94,6 +94,7 @@ import static org.junit.Assert.fail;
 public class QueryableStateITCase extends TestLogger {
 
private final static FiniteDuration TEST_TIMEOUT = new 
FiniteDuration(100, TimeUnit.SECONDS);
+   private final static FiniteDuration QUERY_RETRY_DELAY = new 
FiniteDuration(100, TimeUnit.MILLISECONDS);
 
private final static ActorSystem TEST_ACTOR_SYSTEM = 
AkkaUtils.createDefaultActorSystem();
 
@@ -200,8 +201,6 @@ public class QueryableStateITCase extends TestLogger {
 
final AtomicLongArray counts = new 
AtomicLongArray(numKeys);
 
-   final FiniteDuration retryDelay = new FiniteDuration(1, 
TimeUnit.SECONDS);
-
boolean allNonZero = false;
while (!allNonZero && deadline.hasTimeLeft()) {
allNonZero = true;
@@ -230,7 +229,7 @@ public class QueryableStateITCase extends TestLogger {
queryName,
key,
serializedKey,
-   retryDelay);
+   QUERY_RETRY_DELAY);
 
serializedResult.onSuccess(new 
OnSuccess() {
@Override
@@ -347,14 +346,13 @@ public class QueryableStateITCase extends TestLogger {
 
boolean success = false;
while (!success && deadline.hasTimeLeft()) {
-   final FiniteDuration retryDelay = new 
FiniteDuration(1, TimeUnit.SECONDS);
Future serializedResultFuture = 
getKvStateWithRetries(
client,
jobId,
queryName,
key,
serializedKey,
-   retryDelay);
+   QUERY_RETRY_DELAY);
 
byte[] serializedResult = 
Await.result(serializedResultFuture, deadline.timeLeft());
 
@@ -451,14 +449,13 @@ public class QueryableStateITCase extends TestLogger {
// Now start another task manager
cluster.addTaskManager();
 
-   final FiniteDuration retryDelay = new FiniteDuration(1, 
TimeUnit.SECONDS);
Future serializedResultFuture = 
getKvStateWithRetries(
client,
jobId,
queryName,
key,
serializedKey,
-   retryDelay);
+   QUERY_RETRY_DELAY);
 
byte[] serializedResult = 
Await.result(serializedResultFuture, deadline.timeLeft());
 
@@ -719,7 +716

flink git commit: [FLINK-5368] [kafka] Log msg if kafka topic doesn't have any partitions

2017-01-20 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/release-1.2 5cbaf796d -> 694927634


[FLINK-5368] [kafka] Log msg if kafka topic doesn't have any partitions

This closes #3036.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69492763
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69492763
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69492763

Branch: refs/heads/release-1.2
Commit: 69492763444a9cc5efb1e26e2864abce71787211
Parents: 5cbaf79
Author: HungUnicorn 
Authored: Mon Jan 9 16:48:24 2017 +0100
Committer: Tzu-Li (Gordon) Tai 
Committed: Fri Jan 20 16:51:54 2017 +0100

--
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java| 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/69492763/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 29bb8e4..2b816c4 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -208,6 +208,9 @@ public class FlinkKafkaConsumer09 extends 
FlinkKafkaConsumerBase {
if (partitionsForTopic != null) {

partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
}
+   else{
+   LOG.info("Unable to retrieve any 
partitions for the requested topic: {}", topic);
+   }
}
}
 



flink git commit: [FLINK-5580] [security] Fix path setting of shipped Kerberos keytabs in YARN mode

2017-01-20 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/release-1.2 3b5882afa -> 5cbaf796d


[FLINK-5580] [security] Fix path setting of shipped Kerberos keytabs in YARN 
mode

This closes #3177.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cbaf796
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cbaf796
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cbaf796

Branch: refs/heads/release-1.2
Commit: 5cbaf796d2e40db26ccdcfc458f5f1baf0230bb6
Parents: 3b5882a
Author: Tzu-Li (Gordon) Tai 
Authored: Fri Jan 20 01:41:05 2017 +0100
Committer: Tzu-Li (Gordon) Tai 
Committed: Fri Jan 20 16:50:55 2017 +0100

--
 .../flink/yarn/YarnApplicationMasterRunner.java|  4 +++-
 .../apache/flink/yarn/YarnTaskManagerRunner.java   | 17 +
 2 files changed, 12 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5cbaf796/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
--
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index e4027d4..ad9bc10 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -169,7 +169,9 @@ public class YarnApplicationMasterRunner {
LOG.debug("YARN dynamic properties: {}", 
dynamicProperties);
 
final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
-   if(keytabPath != null && remoteKeytabPrincipal != null) 
{
+
+   // set keytab principal and replace path with the local 
path of the shipped keytab file in NodeManager
+   if (keytabPath != null && remoteKeytabPrincipal != 
null) {

flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);

flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, 
remoteKeytabPrincipal);
}

http://git-wip-us.apache.org/repos/asf/flink/blob/5cbaf796/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
--
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 059f1aa..e41869a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -93,11 +93,11 @@ public class YarnTaskManagerRunner {
// tell akka to die in case of an error

configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
 
-   String keytabPath = null;
+   String localKeytabPath = null;
if(remoteKeytabPath != null) {
File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
-   keytabPath = f.getAbsolutePath();
-   LOG.info("keytabPath: {}", keytabPath);
+   localKeytabPath = f.getAbsolutePath();
+   LOG.info("localKeytabPath: {}", localKeytabPath);
}
 
UserGroupInformation currentUser = 
UserGroupInformation.getCurrentUser();
@@ -124,6 +124,12 @@ public class YarnTaskManagerRunner {

hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
 "true");
}
 
+   // set keytab principal and replace path with the local 
path of the shipped keytab file in NodeManager
+   if (localKeytabPath != null && remoteKeytabPrincipal != 
null) {
+   
configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath);
+   
configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, 
remoteKeytabPrincipal);
+   }
+
SecurityUtils.SecurityConfiguration sc;
if(hadoopConfiguration != null) {
sc = new 
SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
@@ -131,11 +137,6 @@ public class YarnTaskManagerRunner {
sc = new 
SecurityUtils.SecurityConfiguration(configuration);
}
 
-   if(keytabPath != null && remoteKeytabPrincipal != null) 
{
-  

flink git commit: [FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2017-01-20 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/release-1.2 be09143cd -> 3b5882afa


[FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis 
Streaming Connector

This closes #3078.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b5882af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b5882af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b5882af

Branch: refs/heads/release-1.2
Commit: 3b5882afa0c5e60cfb39c2b098fcae2b112a5990
Parents: be09143c
Author: Scott Kidder 
Authored: Fri Dec 16 08:46:54 2016 -0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Fri Jan 20 16:48:53 2017 +0100

--
 .../connectors/kinesis/proxy/KinesisProxy.java  | 58 ++
 .../kinesis/proxy/KinesisProxyTest.java | 63 
 2 files changed, 108 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3b5882af/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
--
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 9ffc8e6..0b0fccf 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -17,14 +17,15 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
-import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.LimitExceededException;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.Shard;
@@ -193,12 +194,16 @@ public class KinesisProxy implements 
KinesisProxyInterface {
while (attempt <= getRecordsMaxAttempts && getRecordsResult == 
null) {
try {
getRecordsResult = 
kinesisClient.getRecords(getRecordsRequest);
-   } catch (ProvisionedThroughputExceededException ex) {
-   long backoffMillis = fullJitterBackoff(
-   getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
-   LOG.warn("Got 
ProvisionedThroughputExceededException. Backing off for "
-   + backoffMillis + " millis.");
-   Thread.sleep(backoffMillis);
+   } catch (AmazonServiceException ex) {
+   if (isRecoverableException(ex)) {
+   long backoffMillis = fullJitterBackoff(
+   getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+   LOG.warn("Got recoverable 
AmazonServiceException. Backing off for "
+   + backoffMillis + " millis (" + 
ex.getErrorMessage() + ")");
+   Thread.sleep(backoffMillis);
+   } else {
+   throw ex;
+   }
}
}
 
@@ -237,12 +242,16 @@ public class KinesisProxy implements 
KinesisProxyInterface {
try {
getShardIteratorResult =

kinesisClient.getShardIterator(shard.getStreamName(), 
shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
-   } catch (ProvisionedThroughputExceededException ex) {
-   long backoffMillis = fullJitterBackoff(
-   getShardIteratorBaseBackoffMillis, 
getS
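
For context, the retry loops in the diff above compute their sleep time with `fullJitterBackoff(...)`. A minimal sketch of the full-jitter backoff idea (parameter names and the exact formula here are assumptions for illustration, not copied from `KinesisProxy`):

```java
import java.util.Random;

public final class BackoffSketch {
    private static final Random RND = new Random();

    // Exponentially growing cap, bounded by maxMillis; the actual delay is drawn
    // uniformly at random from [0, cap) so that concurrent retries spread out.
    public static long fullJitterBackoff(long baseMillis, long maxMillis, double expConst, int attempt) {
        long cap = (long) Math.min(maxMillis, baseMillis * Math.pow(expConst, attempt));
        return (long) (RND.nextDouble() * cap);
    }
}
```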

flink git commit: [FLINK-5386] [table] Refactor window clause.

2017-01-20 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-1.2 0a24611a8 -> be09143cd


[FLINK-5386] [table] Refactor window clause.

- move window() before groupBy()
- make window alias mandatory
- groupBy() must include window alias


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be09143c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be09143c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be09143c

Branch: refs/heads/release-1.2
Commit: be09143cd323d478811447b10807ae7f7d6a4b7b
Parents: 0a24611
Author: Jincheng Sun 
Authored: Thu Dec 15 10:31:33 2016 +0800
Committer: Fabian Hueske 
Committed: Fri Jan 20 16:40:21 2017 +0100

--
 docs/dev/table_api.md   |  71 ++---
 .../org/apache/flink/table/api/table.scala  | 100 ---
 .../org/apache/flink/table/api/windows.scala| 175 
 .../flink/table/plan/logical/groupWindows.scala |  69 +++--
 .../scala/batch/table/FieldProjectionTest.scala |  14 +-
 .../scala/stream/table/AggregationsITCase.scala |  15 +-
 .../scala/stream/table/GroupWindowTest.scala| 277 ++-
 7 files changed, 429 insertions(+), 292 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/be09143c/docs/dev/table_api.md
--
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 0d7331b..80b61f9 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1006,69 +1006,74 @@ Temporal intervals can be represented as number of 
months (`Types.INTERVAL_MONTH
 
 ### Windows
 
-The Table API is a declarative API to define queries on batch and streaming 
tables. Projection, selection, and union operations can be applied both on 
streaming and batch tables without additional semantics. Aggregations on 
(possibly) infinite streaming tables, however, can only be computed on finite 
groups of records. Group-window aggregates group rows into finite groups based 
on time or row-count intervals and evaluate aggregation functions once per 
group. For batch tables, group-windows are a convenient shortcut to group 
records by time intervals.
+The Table API is a declarative API to define queries on batch and streaming 
tables. Projection, selection, and union operations can be applied both on 
streaming and batch tables without additional semantics. Aggregations on 
(possibly) infinite streaming tables, however, can only be computed on finite 
groups of records. Window aggregates group rows into finite groups based on 
time or row-count intervals and evaluate aggregation functions once per group. 
 
-Group-windows are defined using the `window(w: GroupWindow)` clause. The 
following example shows how to define a group-window aggregation on a table.
+**Note:** Windows are currently only supported for streaming tables. Support 
for batch tables will be added in the next release.
+
+Windows are defined using the `window(w: Window)` clause and require an alias, 
which is specified using the `as` clause. In order to group a table by a 
window, the window alias must be referenced in the `groupBy(...)` clause like a 
regular grouping attribute. 
+The following example shows how to define a window aggregation on a table.
 
 
 
 {% highlight java %}
 Table table = input
-  .window(GroupWindow w)  // define window
-  .select("b.sum")// aggregate
+  .window([Window w].as("w"))  // define window with alias w
+  .groupBy("w")  // group the table by window w
+  .select("b.sum")  // aggregate
 {% endhighlight %}
 
 
 
 {% highlight scala %}
 val table = input
-  .window(w: GroupWindow) // define window
-  .select('b.sum) // aggregate
+  .window([w: Window] as 'w)  // define window with alias w
+  .groupBy('w)   // group the table by window w
+  .select('b.sum)  // aggregate
 {% endhighlight %}
 
 
 
-In streaming environments, group-window aggregates can only be computed in 
parallel, if they are *keyed*, i.e., there is an additional `groupBy` 
attribute. Group-window aggregates without additional `groupBy`, such as in the 
example above, can only be evaluated in a single, non-parallel task. The 
following example shows how to define a keyed group-window aggregation on a 
table. 
+In streaming environments, window aggregates can only be computed in parallel 
if they group on one or more attributes in addition to the window, i.e., the 
`groupBy(...)` clause references a window alias and at least one additional 
attribute. A `groupBy(...)` clause that only references a window alias (such as 
in the example above) can only be evaluated by a single, non-parallel task. 
+The following example shows how to define a window aggregation with additional 
grouping attributes.
 
 
 
 {% highlight java %}
 Table table = input
-  .groupBy("a")
-  .windo
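
The snippet above is cut off in this digest. Under the refactored clause described here, a keyed window aggregation would look roughly like the following (a sketch only: the `Tumble` window helper and the field names `a`, `b`, `rowtime` are assumptions for illustration):

```java
// Illustrative sketch of a keyed window aggregation with the refactored clause order.
Table table = input
  .window(Tumble.over("10.minutes").on("rowtime").as("w"))  // define window with alias w
  .groupBy("a, w")                                          // group by attribute a and window w
  .select("a, b.sum");                                      // aggregate per key and window
```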

[2/3] flink git commit: [FLINK-5580] [security] Fix path setting of shipped Kerberos keytabs in YARN mode

2017-01-20 Thread tzulitai
[FLINK-5580] [security] Fix path setting of shipped Kerberos keytabs in YARN 
mode

This closes #3177.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/640a149e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/640a149e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/640a149e

Branch: refs/heads/master
Commit: 640a149ea69c0fc2314c6d5b422500c6c9587f43
Parents: b380bd3
Author: Tzu-Li (Gordon) Tai 
Authored: Fri Jan 20 01:41:05 2017 +0100
Committer: Tzu-Li (Gordon) Tai 
Committed: Fri Jan 20 16:30:52 2017 +0100

--
 .../flink/yarn/YarnApplicationMasterRunner.java|  2 ++
 .../apache/flink/yarn/YarnTaskManagerRunner.java   | 17 +
 2 files changed, 11 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/640a149e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
--
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 71be589..2193174 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -169,6 +169,8 @@ public class YarnApplicationMasterRunner {
LOG.debug("YARN dynamic properties: {}", 
dynamicProperties);
 
final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
+
+   // set keytab principal and replace path with the local 
path of the shipped keytab file in NodeManager
if (keytabPath != null && remoteKeytabPrincipal != 
null) {

flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);

flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, 
remoteKeytabPrincipal);

http://git-wip-us.apache.org/repos/asf/flink/blob/640a149e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
--
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 4a780e0..849a8a6 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -93,11 +93,11 @@ public class YarnTaskManagerRunner {
// tell akka to die in case of an error

configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
 
-   String keytabPath = null;
+   String localKeytabPath = null;
if(remoteKeytabPath != null) {
File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
-   keytabPath = f.getAbsolutePath();
-   LOG.info("keytabPath: {}", keytabPath);
+   localKeytabPath = f.getAbsolutePath();
+   LOG.info("localKeytabPath: {}", localKeytabPath);
}
 
UserGroupInformation currentUser = 
UserGroupInformation.getCurrentUser();
@@ -124,6 +124,12 @@ public class YarnTaskManagerRunner {

hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
 "true");
}
 
+   // set keytab principal and replace path with the local 
path of the shipped keytab file in NodeManager
+   if (localKeytabPath != null && remoteKeytabPrincipal != 
null) {
+   
configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath);
+   
configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, 
remoteKeytabPrincipal);
+   }
+
SecurityUtils.SecurityConfiguration sc;
if (hadoopConfiguration != null) {
sc = new 
SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration);
@@ -131,11 +137,6 @@ public class YarnTaskManagerRunner {
sc = new 
SecurityUtils.SecurityConfiguration(configuration);
}
 
-   if (keytabPath != null && remoteKeytabPrincipal != 
null) {
-   
configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
-   
configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, 
remoteKeytabPr

[1/3] flink git commit: [FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector

2017-01-20 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/master 988729ec1 -> f266e8255


[FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis 
Streaming Connector

This closes #3078.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b380bd31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b380bd31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b380bd31

Branch: refs/heads/master
Commit: b380bd313e036f2bc66941367f014c12bf8baa6d
Parents: 988729e
Author: Scott Kidder 
Authored: Fri Dec 16 08:46:54 2016 -0800
Committer: Tzu-Li (Gordon) Tai 
Committed: Fri Jan 20 16:29:40 2017 +0100

--
 .../connectors/kinesis/proxy/KinesisProxy.java  | 58 ++
 .../kinesis/proxy/KinesisProxyTest.java | 63 
 2 files changed, 108 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b380bd31/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
--
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
index 9ffc8e6..0b0fccf 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
@@ -17,14 +17,15 @@
 
 package org.apache.flink.streaming.connectors.kinesis.proxy;
 
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
-import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.LimitExceededException;
+import 
com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.kinesis.model.StreamStatus;
 import com.amazonaws.services.kinesis.model.Shard;
@@ -193,12 +194,16 @@ public class KinesisProxy implements 
KinesisProxyInterface {
while (attempt <= getRecordsMaxAttempts && getRecordsResult == 
null) {
try {
getRecordsResult = 
kinesisClient.getRecords(getRecordsRequest);
-   } catch (ProvisionedThroughputExceededException ex) {
-   long backoffMillis = fullJitterBackoff(
-   getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
-   LOG.warn("Got 
ProvisionedThroughputExceededException. Backing off for "
-   + backoffMillis + " millis.");
-   Thread.sleep(backoffMillis);
+   } catch (AmazonServiceException ex) {
+   if (isRecoverableException(ex)) {
+   long backoffMillis = fullJitterBackoff(
+   getRecordsBaseBackoffMillis, 
getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++);
+   LOG.warn("Got recoverable 
AmazonServiceException. Backing off for "
+   + backoffMillis + " millis (" + 
ex.getErrorMessage() + ")");
+   Thread.sleep(backoffMillis);
+   } else {
+   throw ex;
+   }
}
}
 
@@ -237,12 +242,16 @@ public class KinesisProxy implements 
KinesisProxyInterface {
try {
getShardIteratorResult =

kinesisClient.getShardIterator(shard.getStreamName(), 
shard.getShard().getShardId(), shardIteratorType, startingSeqNum);
-   } catch (ProvisionedThroughputExceededException ex) {
-   long backoffMillis = fullJitterBackoff(
-   getShardIteratorBaseBackoffMillis, 
getShardIterato

[3/3] flink git commit: [FLINK-5368] [kafka] Log msg if kafka topic doesn't have any partitions

2017-01-20 Thread tzulitai
[FLINK-5368] [kafka] Log msg if kafka topic doesn't have any partitions

This closes #3036.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f266e825
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f266e825
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f266e825

Branch: refs/heads/master
Commit: f266e825557f4091094a866e6887f52ca54ff2d7
Parents: 640a149
Author: HungUnicorn 
Authored: Mon Jan 9 16:48:24 2017 +0100
Committer: Tzu-Li (Gordon) Tai 
Committed: Fri Jan 20 16:33:25 2017 +0100

--
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java| 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f266e825/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 29bb8e4..2b816c4 100644
--- 
a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -208,6 +208,9 @@ public class FlinkKafkaConsumer09 extends 
FlinkKafkaConsumerBase {
if (partitionsForTopic != null) {

partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic));
}
+   else{
+   LOG.info("Unable to retrieve any 
partitions for the requested topic: {}", topic);
+   }
}
}
 



flink git commit: [FLINK-5468] [savepoints] Improve error message for migrating semi async RocksDB snapshot

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 6c4644de1 -> 988729ec1


[FLINK-5468] [savepoints] Improve error message for migrating semi async 
RocksDB snapshot

This closes #3119.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/988729ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/988729ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/988729ec

Branch: refs/heads/master
Commit: 988729ec150713c584f9cefb0b5c7f7183de5181
Parents: 6c4644d
Author: Stefan Richter 
Authored: Fri Jan 13 15:19:37 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 16:10:09 2017 +0100

--
 .../streaming/state/RocksDBStateBackend.java| 13 +++
 .../savepoint/SavepointV0Serializer.java| 41 ++--
 2 files changed, 34 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/988729ec/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
index 509eb4c..fa1cc45 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
@@ -64,4 +64,17 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
stateHandle.close();
}
}
+
+   public static class FinalSemiAsyncSnapshot {
+
+   static {
+   throwExceptionOnLoadingThisClass();
+   }
+
+   private static void throwExceptionOnLoadingThisClass() {
+   throw new RuntimeException("Attempt to migrate RocksDB 
state created with semi async snapshot mode failed. "
+   + "Unfortunately, this is not 
supported. Please create a new savepoint for the job using fully "
+   + "async mode in Flink 1.1 and run 
migration again with the new savepoint.");
+   }
+   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/988729ec/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index 9e37dbb..2efe786 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -106,16 +106,7 @@ public class SavepointV0Serializer implements 
SavepointSerializer {
for (int j = 0; j < numSubTaskStates; j++) {
int subtaskIndex = dis.readInt();
 
-   int length = dis.readInt();
-
-   SerializedValue> serializedValue;
-   if (length == -1) {
-   serializedValue = new 
SerializedValue<>(null);
-   } else {
-   byte[] serializedData = new 
byte[length];
-   dis.readFully(serializedData, 0, 
length);
-   serializedValue = 
SerializedValue.fromBytes(serializedData);
-   }
+   SerializedValue> serializedValue 
= readSerializedValueStateHandle(dis);
 
long stateSize = dis.readLong();
long duration = dis.readLong();
@@ -133,16 +124,7 @@ public class SavepointV0Serializer implements 
SavepointSerializer {
for (int j = 0; j < numKvStates; j++) {
int keyGroupIndex = dis.readInt();
 
-   int length = dis.readInt();
-
-   SerializedValue> serializedValue;
-   if (length == -1) {
-   serializedValue = new 
SerializedValue<>(null);
-  
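
The FinalSemiAsyncSnapshot stub above relies on a small trick that is worth
spelling out: keeping a class under the old name lets deserialization of the
legacy savepoint resolve the class, and its static initializer then fails with
an explanatory message instead of an obscure format error. A generic sketch of
the technique follows; it is not Flink code, and the class name and message
are made up.

// Fail-fast stub: the static initializer runs the first time the class is
// referenced, e.g. while reading an old snapshot, and the JVM surfaces the
// failure as an ExceptionInInitializerError carrying the message below.
public class RemovedLegacyFormat {

    static {
        throwExceptionOnLoadingThisClass();
    }

    private static void throwExceptionOnLoadingThisClass() {
        throw new UnsupportedOperationException(
            "This snapshot format is no longer supported. "
                + "Please take a new savepoint with a supported configuration and restore from that.");
    }

    private RemovedLegacyFormat() {
        // never reached: loading the class already fails
    }
}

Routing the throw through a helper method keeps the compiler happy: a static
initializer that throws unconditionally at the top level would be rejected
because it cannot complete normally.
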

flink git commit: [FLINK-5549] [core] TypeExtractor fails with RuntimeException, but should use GenericTypeInfo

2017-01-20 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 6bf556e60 -> 2703d339a


[FLINK-5549] [core] TypeExtractor fails with RuntimeException, but should use 
GenericTypeInfo

This closes #3154.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2703d339
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2703d339
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2703d339

Branch: refs/heads/master
Commit: 2703d339abafdd9f39bc35faa1eed718501f71a7
Parents: 6bf556e
Author: twalthr 
Authored: Wed Jan 18 14:43:32 2017 +0100
Committer: twalthr 
Committed: Fri Jan 20 16:04:57 2017 +0100

--
 .../flink/api/java/typeutils/TypeExtractor.java | 21 ++--
 .../java/typeutils/PojoTypeExtractionTest.java  |  5 +++--
 2 files changed, 18 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2703d339/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 8bf2867..a2664f9 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -893,7 +893,7 @@ public class TypeExtractor {
// build the entire type hierarchy for the pojo
getTypeHierarchy(inputTypeHierarchy, inType, 
Object.class);
// determine a field containing the type variable
-   List fields = 
getAllDeclaredFields(typeToClass(inType));
+   List fields = 
getAllDeclaredFields(typeToClass(inType), false);
for (Field field : fields) {
Type fieldType = field.getGenericType();
if (fieldType instanceof TypeVariable && 
sameTypeVars(returnTypeVar, materializeTypeVariable(inputTypeHierarchy, 
(TypeVariable) fieldType))) {
@@ -1738,7 +1738,7 @@ public class TypeExtractor {
getTypeHierarchy(typeHierarchy, clazz, Object.class);
}

-   List fields = getAllDeclaredFields(clazz);
+   List fields = getAllDeclaredFields(clazz, false);
if (fields.size() == 0) {
LOG.info("No fields detected for " + clazz + ". Cannot 
be used as a PojoType. Will be handled as GenericType");
return new GenericTypeInfo(clazz);
@@ -1803,12 +1803,17 @@ public class TypeExtractor {
}
 
/**
-* recursively determine all declared fields
+* Recursively determine all declared fields
 * This is required because class.getFields() is not returning fields 
defined
 * in parent classes.
+*
+* @param clazz class to be analyzed
+* @param ignoreDuplicates if true, in case of duplicate field names 
only the lowest one
+*in a hierarchy will be returned; throws 
an exception otherwise
+* @return list of fields
 */
@PublicEvolving
-   public static List getAllDeclaredFields(Class clazz) {
+   public static List getAllDeclaredFields(Class clazz, boolean 
ignoreDuplicates) {
List result = new ArrayList();
while (clazz != null) {
Field[] fields = clazz.getDeclaredFields();
@@ -1817,8 +1822,12 @@ public class TypeExtractor {
continue; // we have no use for 
transient or static fields
}
if(hasFieldWithSameName(field.getName(), 
result)) {
-   throw new RuntimeException("The field 
"+field+" is already contained in the hierarchy of the "+clazz+"."
+   if (ignoreDuplicates) {
+   continue;
+   } else {
+   throw new 
InvalidTypesException("The field "+field+" is already contained in the 
hierarchy of the "+clazz+"."
+ "Please use unique 
field names through your classes hierarchy");
+   }
}
result.add(field);
}
@@ -1829,7 +1838,7 @@ public class TypeExtractor {
 
@PublicEvolving
public static Field getDeclaredField(Class clazz, String name) {
-   f
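
A sketch of what the new flag changes for callers, assuming the two-argument
signature shown in the diff, getAllDeclaredFields(Class<?>, boolean), and the
InvalidTypesException it now throws; the Parent/Child classes are made up to
force a shadowed field name.

import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import java.lang.reflect.Field;
import java.util.List;

public class DeclaredFieldsSketch {

    // hypothetical hierarchy in which 'count' is declared twice
    public static class Parent { public int count; }
    public static class Child extends Parent { public int count; }

    public static void main(String[] args) {
        // ignoreDuplicates = true: only the lowest declaration in the hierarchy
        // (Child.count) is kept; the shadowed Parent.count is skipped.
        List<Field> fields = TypeExtractor.getAllDeclaredFields(Child.class, true);
        System.out.println(fields);

        // ignoreDuplicates = false: duplicates are rejected, now with an
        // InvalidTypesException instead of a bare RuntimeException, so callers
        // such as the POJO analysis can fall back to GenericTypeInfo.
        try {
            TypeExtractor.getAllDeclaredFields(Child.class, false);
        } catch (InvalidTypesException e) {
            System.out.println("duplicate field rejected: " + e.getMessage());
        }
    }
}
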

flink git commit: [FLINK-5549] [core] TypeExtractor fails with RuntimeException, but should use GenericTypeInfo

2017-01-20 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.2 d4313731c -> 0a24611a8


[FLINK-5549] [core] TypeExtractor fails with RuntimeException, but should use 
GenericTypeInfo

This closes #3154.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a24611a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a24611a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a24611a

Branch: refs/heads/release-1.2
Commit: 0a24611a865ae6e321016672c951a8a632363bd4
Parents: d431373
Author: twalthr 
Authored: Wed Jan 18 14:43:32 2017 +0100
Committer: twalthr 
Committed: Fri Jan 20 16:11:19 2017 +0100

--
 .../flink/api/java/typeutils/TypeExtractor.java | 21 ++--
 .../java/typeutils/PojoTypeExtractionTest.java  |  5 +++--
 2 files changed, 18 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/0a24611a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 08f8c53..9b59a78 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -926,7 +926,7 @@ public class TypeExtractor {
// build the entire type hierarchy for the pojo
getTypeHierarchy(inputTypeHierarchy, inType, 
Object.class);
// determine a field containing the type variable
-   List fields = 
getAllDeclaredFields(typeToClass(inType));
+   List fields = 
getAllDeclaredFields(typeToClass(inType), false);
for (Field field : fields) {
Type fieldType = field.getGenericType();
if (fieldType instanceof TypeVariable && 
sameTypeVars(returnTypeVar, materializeTypeVariable(inputTypeHierarchy, 
(TypeVariable) fieldType))) {
@@ -1799,7 +1799,7 @@ public class TypeExtractor {
getTypeHierarchy(typeHierarchy, clazz, Object.class);
}

-   List fields = getAllDeclaredFields(clazz);
+   List fields = getAllDeclaredFields(clazz, false);
if (fields.size() == 0) {
LOG.info("No fields detected for " + clazz + ". Cannot 
be used as a PojoType. Will be handled as GenericType");
return new GenericTypeInfo(clazz);
@@ -1864,12 +1864,17 @@ public class TypeExtractor {
}
 
/**
-* recursively determine all declared fields
+* Recursively determine all declared fields
 * This is required because class.getFields() is not returning fields 
defined
 * in parent classes.
+*
+* @param clazz class to be analyzed
+* @param ignoreDuplicates if true, in case of duplicate field names 
only the lowest one
+*in a hierarchy will be returned; throws 
an exception otherwise
+* @return list of fields
 */
@PublicEvolving
-   public static List getAllDeclaredFields(Class clazz) {
+   public static List getAllDeclaredFields(Class clazz, boolean 
ignoreDuplicates) {
List result = new ArrayList();
while (clazz != null) {
Field[] fields = clazz.getDeclaredFields();
@@ -1878,8 +1883,12 @@ public class TypeExtractor {
continue; // we have no use for 
transient or static fields
}
if(hasFieldWithSameName(field.getName(), 
result)) {
-   throw new RuntimeException("The field 
"+field+" is already contained in the hierarchy of the "+clazz+"."
+   if (ignoreDuplicates) {
+   continue;
+   } else {
+   throw new 
InvalidTypesException("The field "+field+" is already contained in the 
hierarchy of the "+clazz+"."
+ "Please use unique 
field names through your classes hierarchy");
+   }
}
result.add(field);
}
@@ -1890,7 +1899,7 @@ public class TypeExtractor {
 
@PublicEvolving
public static Field getDeclaredField(Class clazz, String name) {
-  

flink git commit: [FLINK-5468] [savepoints] Improve error message for migrating semi async RocksDB snapshot

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.2 108865db2 -> d4313731c


[FLINK-5468] [savepoints] Improve error message for migrating semi async 
RocksDB snapshot

This closes #3119.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4313731
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4313731
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4313731

Branch: refs/heads/release-1.2
Commit: d4313731c215296a5db840d7d840407ed01ec2c9
Parents: 108865d
Author: Stefan Richter 
Authored: Fri Jan 13 15:19:37 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 16:10:25 2017 +0100

--
 .../streaming/state/RocksDBStateBackend.java| 13 +++
 .../savepoint/SavepointV0Serializer.java| 41 ++--
 2 files changed, 34 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d4313731/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
index 509eb4c..fa1cc45 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java
@@ -64,4 +64,17 @@ public class RocksDBStateBackend extends 
AbstractStateBackend {
stateHandle.close();
}
}
+
+   public static class FinalSemiAsyncSnapshot {
+
+   static {
+   throwExceptionOnLoadingThisClass();
+   }
+
+   private static void throwExceptionOnLoadingThisClass() {
+   throw new RuntimeException("Attempt to migrate RocksDB 
state created with semi async snapshot mode failed. "
+   + "Unfortunately, this is not 
supported. Please create a new savepoint for the job using fully "
+   + "async mode in Flink 1.1 and run 
migration again with the new savepoint.");
+   }
+   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d4313731/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
index 9e37dbb..2efe786 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java
@@ -106,16 +106,7 @@ public class SavepointV0Serializer implements 
SavepointSerializer {
for (int j = 0; j < numSubTaskStates; j++) {
int subtaskIndex = dis.readInt();
 
-   int length = dis.readInt();
-
-   SerializedValue> serializedValue;
-   if (length == -1) {
-   serializedValue = new 
SerializedValue<>(null);
-   } else {
-   byte[] serializedData = new 
byte[length];
-   dis.readFully(serializedData, 0, 
length);
-   serializedValue = 
SerializedValue.fromBytes(serializedData);
-   }
+   SerializedValue> serializedValue 
= readSerializedValueStateHandle(dis);
 
long stateSize = dis.readLong();
long duration = dis.readLong();
@@ -133,16 +124,7 @@ public class SavepointV0Serializer implements 
SavepointSerializer {
for (int j = 0; j < numKvStates; j++) {
int keyGroupIndex = dis.readInt();
 
-   int length = dis.readInt();
-
-   SerializedValue> serializedValue;
-   if (length == -1) {
-   serializedValue = new 
SerializedValue<>(null);
-

flink git commit: [FLINK-5561] [runtime] Fix DataInputDeserializer#available()

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.2 e2a4f3232 -> 108865db2


[FLINK-5561] [runtime] Fix DataInputDeserializer#available()

This closes #3171.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/108865db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/108865db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/108865db

Branch: refs/heads/release-1.2
Commit: 108865db23aec48e2b26509fb9e6ccd1342f0405
Parents: e2a4f32
Author: Nico Kruber 
Authored: Wed Jan 18 18:52:57 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 16:06:05 2017 +0100

--
 .../runtime/util/DataInputDeserializer.java |  4 +-
 .../runtime/util/DataInputDeserializerTest.java | 59 
 2 files changed, 61 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/108865db/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index 9822a83..0f99496 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -55,7 +55,7 @@ public class DataInputDeserializer implements DataInputView, 
java.io.Serializabl
}
 
// 

-   //  Chaning buffers
+   //  Changing buffers
// 


public void setBuffer(ByteBuffer buffer) {
@@ -98,7 +98,7 @@ public class DataInputDeserializer implements DataInputView, 
java.io.Serializabl
 
public int available() {
if (position < end) {
-   return end - position - 1;
+   return end - position;
} else {
return 0;
}

http://git-wip-us.apache.org/repos/asf/flink/blob/108865db/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java
new file mode 100644
index 000..2032d45
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Test suite for the {@link DataInputDeserializer} class.
+ */
+public class DataInputDeserializerTest {
+
+   @Test
+   public void testAvailable() throws Exception {
+   byte[] bytes;
+   DataInputDeserializer dis;
+
+   bytes = new byte[] {};
+   dis = new DataInputDeserializer(bytes, 0, bytes.length);
+   Assert.assertEquals(bytes.length, dis.available());
+
+   bytes = new byte[] {1, 2, 3};
+   dis = new DataInputDeserializer(bytes, 0, bytes.length);
+   Assert.assertEquals(bytes.length, dis.available());
+
+   dis.readByte();
+   Assert.assertEquals(2, dis.available());
+   dis.readByte();
+   Assert.assertEquals(1, dis.available());
+   dis.readByte();
+   Assert.assertEquals(0, dis.available());
+
+   try {
+   dis.readByte();
+   Assert.fail("Did not throw expected IOException");
+   } catch (IOException e) {
+   // ignore
+ 

flink git commit: [FLINK-5561] [runtime] Fix DataInputDeserializer#available()

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 2703d339a -> 6c4644de1


[FLINK-5561] [runtime] Fix DataInputDeserializer#available()

This closes #3171.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c4644de
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c4644de
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c4644de

Branch: refs/heads/master
Commit: 6c4644de19ef38ff96abe7ccbca8971f70258905
Parents: 2703d33
Author: Nico Kruber 
Authored: Wed Jan 18 18:52:57 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 16:05:49 2017 +0100

--
 .../runtime/util/DataInputDeserializer.java |  4 +-
 .../runtime/util/DataInputDeserializerTest.java | 59 
 2 files changed, 61 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6c4644de/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index 9822a83..0f99496 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -55,7 +55,7 @@ public class DataInputDeserializer implements DataInputView, 
java.io.Serializabl
}
 
// 

-   //  Chaning buffers
+   //  Changing buffers
// 


public void setBuffer(ByteBuffer buffer) {
@@ -98,7 +98,7 @@ public class DataInputDeserializer implements DataInputView, 
java.io.Serializabl
 
public int available() {
if (position < end) {
-   return end - position - 1;
+   return end - position;
} else {
return 0;
}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c4644de/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java
new file mode 100644
index 000..2032d45
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Test suite for the {@link DataInputDeserializer} class.
+ */
+public class DataInputDeserializerTest {
+
+   @Test
+   public void testAvailable() throws Exception {
+   byte[] bytes;
+   DataInputDeserializer dis;
+
+   bytes = new byte[] {};
+   dis = new DataInputDeserializer(bytes, 0, bytes.length);
+   Assert.assertEquals(bytes.length, dis.available());
+
+   bytes = new byte[] {1, 2, 3};
+   dis = new DataInputDeserializer(bytes, 0, bytes.length);
+   Assert.assertEquals(bytes.length, dis.available());
+
+   dis.readByte();
+   Assert.assertEquals(2, dis.available());
+   dis.readByte();
+   Assert.assertEquals(1, dis.available());
+   dis.readByte();
+   Assert.assertEquals(0, dis.available());
+
+   try {
+   dis.readByte();
+   Assert.fail("Did not throw expected IOException");
+   } catch (IOException e) {
+   // ignore
+   

flink git commit: [FLINK-5386] [table] Refactor window clause.

2017-01-20 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master 8ccedd103 -> 6bf556e60


[FLINK-5386] [table] Refactor window clause.

- move window() before groupBy()
- make window alias mandatory
- groupBy() must include window alias

This closes #3046.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bf556e6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bf556e6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bf556e6

Branch: refs/heads/master
Commit: 6bf556e60148917794f81088fa20c5cc7465823a
Parents: 8ccedd1
Author: Jincheng Sun 
Authored: Thu Dec 15 10:31:33 2016 +0800
Committer: Fabian Hueske 
Committed: Fri Jan 20 15:08:25 2017 +0100

--
 docs/dev/table_api.md   |  57 ++--
 .../org/apache/flink/table/api/table.scala  |  92 +--
 .../org/apache/flink/table/api/windows.scala| 175 
 .../flink/table/plan/logical/groupWindows.scala |  54 ++--
 .../scala/batch/table/FieldProjectionTest.scala |   5 +-
 .../api/scala/batch/table/GroupWindowTest.scala | 113 +---
 .../scala/stream/table/AggregationsITCase.scala |  15 +-
 .../scala/stream/table/GroupWindowTest.scala| 273 ++-
 .../dataset/DataSetWindowAggregateITCase.scala  |  31 ++-
 9 files changed, 501 insertions(+), 314 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6bf556e6/docs/dev/table_api.md
--
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index f2f398c..0efd258 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1022,69 +1022,72 @@ Temporal intervals can be represented as number of 
months (`Types.INTERVAL_MONTH
 
 ### Windows
 
-The Table API is a declarative API to define queries on batch and streaming 
tables. Projection, selection, and union operations can be applied both on 
streaming and batch tables without additional semantics. Aggregations on 
(possibly) infinite streaming tables, however, can only be computed on finite 
groups of records. Group-window aggregates group rows into finite groups based 
on time or row-count intervals and evaluate aggregation functions once per 
group. For batch tables, group-windows are a convenient shortcut to group 
records by time intervals.
+The Table API is a declarative API to define queries on batch and streaming 
tables. Projection, selection, and union operations can be applied both on 
streaming and batch tables without additional semantics. Aggregations on 
(possibly) infinite streaming tables, however, can only be computed on finite 
groups of records. Window aggregates group rows into finite groups based on 
time or row-count intervals and evaluate aggregation functions once per group. 
For batch tables, windows are a convenient shortcut to group records by time 
intervals.
 
-Group-windows are defined using the `window(w: GroupWindow)` clause. The 
following example shows how to define a group-window aggregation on a table.
+Windows are defined using the `window(w: Window)` clause and require an alias, 
which is specified using the `as` clause. In order to group a table by a 
window, the window alias must be referenced in the `groupBy(...)` clause like a 
regular grouping attribute. 
+The following example shows how to define a window aggregation on a table.
 
 
 
 {% highlight java %}
 Table table = input
-  .window(GroupWindow w)  // define window
-  .select("b.sum")// aggregate
+  .window([Window w].as("w"))  // define window with alias w
+  .groupBy("w")  // group the table by window w
+  .select("b.sum")  // aggregate
 {% endhighlight %}
 
 
 
 {% highlight scala %}
 val table = input
-  .window(w: GroupWindow) // define window
-  .select('b.sum) // aggregate
+  .window([w: Window] as 'w)  // define window with alias w
+  .groupBy('w)   // group the table by window w
+  .select('b.sum)  // aggregate
 {% endhighlight %}
 
 
 
-In streaming environments, group-window aggregates can only be computed in 
parallel, if they are *keyed*, i.e., there is an additional `groupBy` 
attribute. Group-window aggregates without additional `groupBy`, such as in the 
example above, can only be evaluated in a single, non-parallel task. The 
following example shows how to define a keyed group-window aggregation on a 
table. 
+In streaming environments, window aggregates can only be computed in parallel 
if they group on one or more attributes in addition to the window, i.e., the 
`groupBy(...)` clause references a window alias and at least one additional 
attribute. A `groupBy(...)` clause that only references a window alias (such as 
in the example above) can only be evaluated by a single, non-parallel task. 
+The following example shows how to define a window aggregation with additional 
grouping 
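
The example announced in the last sentence is cut off here. As a rough sketch
(not the documentation text itself) of a keyed window aggregation under the
refactored clause order, assuming the Tumble window class and the string
expression syntax used elsewhere on this page, with hypothetical fields a, b
and a rowtime attribute:

Table result = input
  .window(Tumble.over("10.minutes").on("rowtime").as("w"))  // window with its mandatory alias
  .groupBy("w, a")                                          // window alias plus a regular grouping attribute
  .select("a, w.start, b.sum");                             // evaluated once per key and window

Because the groupBy clause references an attribute in addition to the window
alias, this variant can be evaluated in parallel.
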

flink git commit: [FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators

2017-01-20 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.2 6e85106b1 -> e2a4f3232


[FLINK-5585] [jobmanager] Fix NullPointerException in 
JobManager.updateAccumulators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2a4f323
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2a4f323
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2a4f323

Branch: refs/heads/release-1.2
Commit: e2a4f323292dfdee4c69c9261b265860cba6f6a0
Parents: 6e85106
Author: Stephan Ewen 
Authored: Fri Jan 20 11:12:12 2017 +0100
Committer: Stephan Ewen 
Committed: Fri Jan 20 14:45:59 2017 +0100

--
 .../flink/runtime/jobmanager/JobManager.scala   | 23 ++--
 1 file changed, 12 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f323/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
--
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c5682e2..50a619c 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1844,18 +1844,19 @@ class JobManager(
*
* @param accumulators list of accumulator snapshots
*/
-  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
-accumulators foreach {
-  case accumulatorEvent =>
-currentJobs.get(accumulatorEvent.getJobID) match {
-  case Some((jobGraph, jobInfo)) =>
-future {
-  jobGraph.updateAccumulators(accumulatorEvent)
-}(context.dispatcher)
-  case None =>
-  // ignore accumulator values for old job
+  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]): 
Unit = {
+accumulators.foreach( snapshot => {
+if (snapshot != null) {
+  currentJobs.get(snapshot.getJobID) match {
+case Some((jobGraph, jobInfo)) =>
+  future {
+jobGraph.updateAccumulators(snapshot)
+  }(context.dispatcher)
+case None =>
+  // ignore accumulator values for old job
+  }
 }
-}
+})
   }
 
   /**



flink git commit: [hotfix] [table] Update documentation about limitations

2017-01-20 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 1fc34502e -> 8ccedd103


[hotfix] [table] Update documentation about limitations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ccedd10
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ccedd10
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ccedd10

Branch: refs/heads/master
Commit: 8ccedd103f1c6f31047e4dcd62b7bf4889e89d0f
Parents: 1fc3450
Author: twalthr 
Authored: Fri Jan 20 14:17:45 2017 +0100
Committer: twalthr 
Committed: Fri Jan 20 14:17:45 2017 +0100

--
 docs/dev/table_api.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8ccedd10/docs/dev/table_api.md
--
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 78f9a43..f2f398c 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1279,7 +1279,7 @@ A session window is defined by using the `Session` class 
as follows:
 Currently the following features are not supported yet:
 
 - Row-count windows on event-time
-- Session windows on batch tables
+- Non-grouped session windows on batch tables
 - Sliding windows on batch tables
 
 SQL



flink git commit: [FLINK-4693] [table] Add session group-windows for batch tables

2017-01-20 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master ddd7c36ec -> 1fc34502e


[FLINK-4693] [table] Add session group-windows for batch tables

This closes #3150.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1fc34502
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1fc34502
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1fc34502

Branch: refs/heads/master
Commit: 1fc34502e09bfde1f510fe8c22b34795170988ca
Parents: ddd7c36
Author: Jincheng Sun 
Authored: Wed Jan 18 12:34:34 2017 +0800
Committer: twalthr 
Committed: Fri Jan 20 13:57:13 2017 +0100

--
 .../nodes/dataset/DataSetWindowAggregate.scala  | 102 +++-
 .../AggregateAllTimeWindowFunction.scala|   3 +-
 .../aggregate/AggregateTimeWindowFunction.scala |   3 +-
 .../table/runtime/aggregate/AggregateUtil.scala | 122 +++---
 ...ionWindowAggregateCombineGroupFunction.scala | 128 ++
 ...sionWindowAggregateReduceGroupFunction.scala | 165 +++
 ...TumbleTimeWindowAggReduceGroupFunction.scala |   3 +-
 .../DataSetWindowAggregateMapFunction.scala |   2 +-
 ...rementalAggregateAllTimeWindowFunction.scala |   3 +-
 ...IncrementalAggregateTimeWindowFunction.scala |   3 +-
 .../aggregate/TimeWindowPropertyCollector.scala |   8 +-
 .../api/scala/batch/table/GroupWindowTest.scala |  23 +--
 .../dataset/DataSetWindowAggregateITCase.scala  |  32 
 13 files changed, 539 insertions(+), 58 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/1fc34502/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
index 79497e6..b165afa 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala
@@ -128,10 +128,8 @@ class DataSetWindowAggregate(
   inputDS,
   isTimeInterval(size.resultType),
   caseSensitive)
-
-  case EventTimeSessionGroupWindow(_, _, _) =>
-throw new UnsupportedOperationException(
-  "Event-time session windows in a batch environment are currently not 
supported")
+  case EventTimeSessionGroupWindow(_, _, gap) =>
+createEventTimeSessionWindowDataSet(inputDS, caseSensitive)
   case EventTimeSlidingGroupWindow(_, _, _, _) =>
 throw new UnsupportedOperationException(
   "Event-time sliding windows in a batch environment are currently not 
supported")
@@ -172,7 +170,7 @@ class DataSetWindowAggregate(
   grouping,
   inputType,
   isParserCaseSensitive)
-val groupReduceFunction = createDataSetWindowAggGroupReduceFunction(
+val groupReduceFunction = 
createDataSetWindowAggregationGroupReduceFunction(
   window,
   namedAggregates,
   inputType,
@@ -213,9 +211,101 @@ class DataSetWindowAggregate(
 // TODO: count tumbling all window on event-time should sort all the 
data set
 // on event time before applying the windowing logic.
 throw new UnsupportedOperationException(
-  "Count tumbling non-grouping window on event-time are currently not 
supported.")
+  "Count tumbling non-grouping windows on event-time are currently not 
supported.")
+  }
+}
+  }
+
+  private[this] def createEventTimeSessionWindowDataSet(
+inputDS: DataSet[Any],
+isParserCaseSensitive: Boolean): DataSet[Any] = {
+
+val groupingKeys = grouping.indices.toArray
+val rowTypeInfo = resultRowTypeInfo
+
+// grouping window
+if (groupingKeys.length > 0) {
+  // create mapFunction for initializing the aggregations
+  val mapFunction = createDataSetWindowPrepareMapFunction(
+window,
+namedAggregates,
+grouping,
+inputType,
+isParserCaseSensitive)
+
+  val mappedInput =
+inputDS
+.map(mapFunction)
+.name(prepareOperatorName)
+
+  val mapReturnType = 
mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType
+
+  // the position of the rowtime field in the intermediate result for map 
output
+  val rowTimeFieldPos = mapReturnType.getArity - 1
+
+  // do incremental aggregation
+  if (doAllSupportPartialAggregation(
+namedAggregates.map(_.getKey),
+inputType,
+grouping.length)) {
+
+// gets the window-start and window-end position  in the intermediate
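
To make the user-facing effect of this change concrete, here is a sketch (not
taken from the commit) of an event-time session window aggregation on a batch
Table, assuming the Session window class (Session.withGap(...).on(...).as(...))
and the Table API string expression syntax; the table and field names are
hypothetical.

Table result = batchTable
  .window(Session.withGap("30.minutes").on("rowtime").as("w"))  // sessions separated by 30 minutes of inactivity
  .groupBy("w, user")                                           // grouped session windows now work on batch tables
  .select("user, w.start, w.end, amount.sum");

As noted in the documentation hotfix elsewhere in this digest, the non-grouped
form, a groupBy on the window alias alone, remains unsupported on batch tables.
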

flink git commit: [FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators

2017-01-20 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-1.1 931929bf8 -> f6f1c244c


[FLINK-5585] [jobmanager] Fix NullPointerException in 
JobManager.updateAccumulators


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6f1c244
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6f1c244
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6f1c244

Branch: refs/heads/release-1.1
Commit: f6f1c244cf149d451a32fb3231a6bf1168bc31d1
Parents: 931929b
Author: Stephan Ewen 
Authored: Fri Jan 20 11:12:12 2017 +0100
Committer: Stephan Ewen 
Committed: Fri Jan 20 11:14:35 2017 +0100

--
 .../flink/runtime/jobmanager/JobManager.scala   | 23 ++--
 1 file changed, 12 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f6f1c244/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
--
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d6d23d9..1720d94 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1784,18 +1784,19 @@ class JobManager(
*
* @param accumulators list of accumulator snapshots
*/
-  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = {
-accumulators foreach {
-  case accumulatorEvent =>
-currentJobs.get(accumulatorEvent.getJobID) match {
-  case Some((jobGraph, jobInfo)) =>
-future {
-  jobGraph.updateAccumulators(accumulatorEvent)
-}(context.dispatcher)
-  case None =>
-  // ignore accumulator values for old job
+  private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]): 
Unit = {
+accumulators.foreach( snapshot => {
+if (snapshot != null) {
+  currentJobs.get(snapshot.getJobID) match {
+case Some((jobGraph, jobInfo)) =>
+  future {
+jobGraph.updateAccumulators(snapshot)
+  }(context.dispatcher)
+case None =>
+  // ignore accumulator values for old job
+  }
 }
-}
+})
   }
 
   /**



flink git commit: [FLINK-5515] [queryable state] Remove unused getSerializedValue call

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.2 1c8a48f8f -> 6e85106b1


[FLINK-5515] [queryable state] Remove unused getSerializedValue call

This closes #3131.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e85106b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e85106b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e85106b

Branch: refs/heads/release-1.2
Commit: 6e85106b1c6e5913825ee256b2a7fc147fcb001a
Parents: 1c8a48f
Author: Nico Kruber 
Authored: Mon Jan 16 18:45:49 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 12:39:01 2017 +0100

--
 .../org/apache/flink/runtime/query/netty/KvStateServerHandler.java | 2 --
 1 file changed, 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6e85106b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
index 8542099..34cf15f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
@@ -239,8 +239,6 @@ class KvStateServerHandler extends 
ChannelInboundHandlerAdapter {
 
success = true;
} else {
-   
kvState.getSerializedValue(serializedKeyAndNamespace);
-
// No data for the key/namespace. This 
is considered to be
// a failure.
ByteBuf unknownKey = 
KvStateRequestSerializer.serializeKvStateRequestFailure(



flink git commit: [FLINK-5515] [queryable state] Remove unused getSerializedValue call

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 63a6af3be -> ddd7c36ec


[FLINK-5515] [queryable state] Remove unused getSerializedValue call

This closes #3131.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ddd7c36e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ddd7c36e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ddd7c36e

Branch: refs/heads/master
Commit: ddd7c36ece20e3f265ae11863c660b79b06e35bd
Parents: 63a6af3
Author: Nico Kruber 
Authored: Mon Jan 16 18:45:49 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 12:38:51 2017 +0100

--
 .../org/apache/flink/runtime/query/netty/KvStateServerHandler.java | 2 --
 1 file changed, 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ddd7c36e/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
index 8542099..34cf15f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java
@@ -239,8 +239,6 @@ class KvStateServerHandler extends 
ChannelInboundHandlerAdapter {
 
success = true;
} else {
-   
kvState.getSerializedValue(serializedKeyAndNamespace);
-
// No data for the key/namespace. This 
is considered to be
// a failure.
ByteBuf unknownKey = 
KvStateRequestSerializer.serializeKvStateRequestFailure(



flink git commit: [FLINK-5507] [queryable state] Remove list variant of asQueryableState

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.2 1db810218 -> 1c8a48f8f


[FLINK-5507] [queryable state] Remove list variant of asQueryableState

The queryable state "sink" using ListState stores all incoming data
forever and is never cleaned up. Eventually, it consumes too much
memory and is thus of limited use.

This closes #3129.
This closes #3120 (left over).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c8a48f8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c8a48f8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c8a48f8

Branch: refs/heads/release-1.2
Commit: 1c8a48f8f70dea2d291aed5b5ebf19d4fafac168
Parents: 1db8102
Author: Nico Kruber 
Authored: Mon Jan 16 13:36:02 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 12:32:29 2017 +0100

--
 .../streaming/api/datastream/KeyedStream.java   |  25 -
 .../flink/streaming/api/scala/KeyedStream.scala |  28 +
 .../flink/test/query/QueryableStateITCase.java  | 104 ---
 3 files changed, 2 insertions(+), 155 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/1c8a48f8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 73d8926..3e3afd3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -682,30 +681,6 @@ public class KeyedStream extends DataStream {
}
 
/**
-* Publishes the keyed stream as a queryable ListStance instance.
-*
-* @param queryableStateName Name under which to the publish the 
queryable state instance
-* @param stateDescriptor State descriptor to create state instance from
-* @return Queryable state instance
-*/
-   @PublicEvolving
-   public QueryableStateStream asQueryableState(
-   String queryableStateName,
-   ListStateDescriptor stateDescriptor) {
-
-   transform("Queryable state: " + queryableStateName,
-   getType(),
-   new 
QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));
-
-   
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
-
-   return new QueryableStateStream<>(
-   queryableStateName,
-   
getType().createSerializer(getExecutionConfig()),
-   
getKeyType().createSerializer(getExecutionConfig()));
-   }
-
-   /**
 * Publishes the keyed stream as a queryable FoldingState instance.
 *
 * @param queryableStateName Name under which to the publish the 
queryable state instance

http://git-wip-us.apache.org/repos/asf/flink/blob/1c8a48f8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
--
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index b251ca6..99936e7 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -20,10 +20,10 @@ package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.functions._
-import org.apache.flink.api.common.state.{FoldingStateDescriptor, 
ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor}
+import org.apache.flink.api.common.state.{FoldingStateDescriptor, 
ReducingStateDescriptor, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 im

flink git commit: [FLINK-5507] [queryable state] Remove list variant of asQueryableState

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 7ff7f431d -> 63a6af3be


[FLINK-5507] [queryable state] Remove list variant of asQueryableState

The queryable state "sink" using ListState stores all incoming data
forever and is never cleaned up. Eventually, it consumes too much
memory and is thus of limited use.

This closes #3129.
This closes #3120 (left over).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63a6af3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63a6af3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63a6af3b

Branch: refs/heads/master
Commit: 63a6af3bec17f2e45563d41032d1e60b8ee97d81
Parents: 7ff7f43
Author: Nico Kruber 
Authored: Mon Jan 16 13:36:02 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 12:32:18 2017 +0100

--
 .../streaming/api/datastream/KeyedStream.java   |  25 -
 .../flink/streaming/api/scala/KeyedStream.scala |  28 +
 .../flink/test/query/QueryableStateITCase.java  | 104 ---
 3 files changed, 2 insertions(+), 155 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/63a6af3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 73d8926..3e3afd3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -682,30 +681,6 @@ public class KeyedStream extends DataStream {
}
 
/**
-* Publishes the keyed stream as a queryable ListStance instance.
-*
-* @param queryableStateName Name under which to the publish the 
queryable state instance
-* @param stateDescriptor State descriptor to create state instance from
-* @return Queryable state instance
-*/
-   @PublicEvolving
-   public QueryableStateStream asQueryableState(
-   String queryableStateName,
-   ListStateDescriptor stateDescriptor) {
-
-   transform("Queryable state: " + queryableStateName,
-   getType(),
-   new 
QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor));
-
-   
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
-
-   return new QueryableStateStream<>(
-   queryableStateName,
-   
getType().createSerializer(getExecutionConfig()),
-   
getKeyType().createSerializer(getExecutionConfig()));
-   }
-
-   /**
 * Publishes the keyed stream as a queryable FoldingState instance.
 *
 * @param queryableStateName Name under which to the publish the 
queryable state instance

http://git-wip-us.apache.org/repos/asf/flink/blob/63a6af3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
--
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index b251ca6..99936e7 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -20,10 +20,10 @@ package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
 import org.apache.flink.api.common.functions._
-import org.apache.flink.api.common.state.{FoldingStateDescriptor, 
ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor}
+import org.apache.flink.api.common.state.{FoldingStateDescriptor, 
ReducingStateDescriptor, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.a
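
With the ListState variant gone, a keyed stream is published for queries via
the value, folding or reducing state variants instead. Below is a minimal
sketch of the value-state form, assuming the Flink 1.2 APIs referenced in the
diff (KeyedStream#asQueryableState, ValueStateDescriptor, QueryableStateStream);
the state name and stream types are made up.

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.QueryableStateStream;

public class QueryableValueStateSketch {

    // 'keyed' is assumed to be a stream of (key, value) pairs keyed by the Integer field
    public static QueryableStateStream<Integer, Tuple2<Integer, Long>> publish(
            KeyedStream<Tuple2<Integer, Long>, Integer> keyed) {

        ValueStateDescriptor<Tuple2<Integer, Long>> descriptor =
            new ValueStateDescriptor<>("latest-value", keyed.getType(), Tuple2.of(0, 0L));

        // Unlike the removed ListState variant, ValueState holds exactly one value
        // per key (each update overwrites the previous one), so the published state
        // cannot grow without bound.
        return keyed.asQueryableState("latest-value", descriptor);
    }
}
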

[2/2] flink git commit: [FLINK-5482] [queryable state] Re-issue location lookup upon failure

2017-01-20 Thread uce
[FLINK-5482] [queryable state] Re-issue location lookup upon failure

Any failing lookup, e.g. when the job has not been started yet, previously
remained in the lookup cache, so future queries did not retry the lookup and
failed as well. This commit changes the lookup caching code so that futures
which completed with a failure are removed from the cache and replaced by new
lookups.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1db81021
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1db81021
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1db81021

Branch: refs/heads/release-1.2
Commit: 1db8102184a30a3df5448189cbc0a99938b906ab
Parents: da10a2e
Author: Nico Kruber 
Authored: Thu Jan 12 16:48:27 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 12:27:47 2017 +0100

--
 .../runtime/query/QueryableStateClient.java | 20 +-
 .../flink/test/query/QueryableStateITCase.java  | 73 
 2 files changed, 92 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/1db81021/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 98c3580..7ba3199 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -341,7 +341,25 @@ public class QueryableStateClient {
return previous;
}
} else {
-   return cachedFuture;
+   // do not retain futures which failed as they 
will remain in
+   // the cache even if the error cause is not 
present any more
+   // and a new lookup may succeed
+   if (cachedFuture.isCompleted() &&
+   cachedFuture.value().get().isFailure()) 
{
+   // issue a new lookup
+   Future lookupFuture = 
lookupService
+   .getKvStateLookupInfo(jobId, 
queryableStateName);
+
+   // replace the existing one if it has 
not been replaced yet
+   // otherwise return the one in the cache
+   if (lookupCache.replace(cacheKey, 
cachedFuture, lookupFuture)) {
+   return lookupFuture;
+   } else {
+   return 
lookupCache.get(cacheKey);
+   }
+   } else {
+   return cachedFuture;
+   }
}
}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/1db81021/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index eccd8e0..88e4f9a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -640,6 +640,79 @@ public class QueryableStateITCase extends TestLogger {
}
 
/**
+* Similar tests as {@link #testValueState()} but before submitting the
+* job, we already issue one request which fails.
+*/
+   @Test
+   public void testQueryNonStartedJobState() throws Exception {
+   // Config
+   final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+   final int numElements = 1024;
+
+   final QueryableStateClient client = new 
QueryableStateClient(cluster.configuration());
+
+   JobID jobId = null;
+   try {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(NUM_SLOTS);
+   // Very important, because cluster is shared between 
tests and we
+   // don't explicitly check that all slots are available 
before
+

[1/2] flink git commit: [FLINK-5482] [tests] Dedup code in QueryableStateITCase

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/release-1.2 9073c53f9 -> 1db810218


[FLINK-5482] [tests] Dedup code in QueryableStateITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da10a2e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da10a2e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da10a2e9

Branch: refs/heads/release-1.2
Commit: da10a2e9fccb19f1bec626b9907de4e3a93be76d
Parents: 9073c53
Author: Nico Kruber 
Authored: Thu Jan 12 16:41:30 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 12:27:43 2017 +0100

--
 .../flink/test/query/QueryableStateITCase.java  | 152 ++-
 1 file changed, 50 insertions(+), 102 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/da10a2e9/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index a5ed6ad..eccd8e0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -622,40 +622,8 @@ public class QueryableStateITCase extends TestLogger {
// Now query
long expected = numElements;
 
-   FiniteDuration retryDelay = new FiniteDuration(1, 
TimeUnit.SECONDS);
-   for (int key = 0; key < NUM_SLOTS; key++) {
-   final byte[] serializedKey = 
KvStateRequestSerializer.serializeKeyAndNamespace(
-   key,
-   
queryableState.getKeySerializer(),
-   VoidNamespace.INSTANCE,
-   
VoidNamespaceSerializer.INSTANCE);
-
-   boolean success = false;
-   while (deadline.hasTimeLeft() && !success) {
-   Future future = 
getKvStateWithRetries(client,
-   jobId,
-   
queryableState.getQueryableStateName(),
-   key,
-   serializedKey,
-   retryDelay);
-
-   byte[] serializedValue = 
Await.result(future, deadline.timeLeft());
-
-   Tuple2 value = 
KvStateRequestSerializer.deserializeValue(
-   serializedValue,
-   
queryableState.getValueSerializer());
-
-   assertEquals("Key mismatch", key, 
value.f0.intValue());
-   if (expected == value.f1) {
-   success = true;
-   } else {
-   // Retry
-   Thread.sleep(50);
-   }
-   }
-
-   assertTrue("Did not succeed query", success);
-   }
+   executeValueQuery(deadline, client, jobId, queryableState,
+   expected);
} finally {
// Free cluster resources
if (jobId != null) {
@@ -672,6 +640,50 @@ public class QueryableStateITCase extends TestLogger {
}
 
/**
+* Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
+* expected equals the value of the result tuple's second field.
+*/
+   private void executeValueQuery(final Deadline deadline,
+   final QueryableStateClient client, final JobID jobId,
+   final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState,
+   final long expected) throws Exception {
+   FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
+   for (int key = 0; key < NUM_SLOTS; key++) {
+   final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
+   key,
+   queryableState.getKeySerializer(),
+   VoidNamespace.INSTANCE,
+   VoidNamespaceSerializer.INSTANCE);

[1/2] flink git commit: [FLINK-5482] [tests] Dedup code in QueryableStateITCase

2017-01-20 Thread uce
Repository: flink
Updated Branches:
  refs/heads/master 883fc5a77 -> 7ff7f431d


[FLINK-5482] [tests] Dedup code in QueryableStateITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/661b3f90
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/661b3f90
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/661b3f90

Branch: refs/heads/master
Commit: 661b3f90e481f1de8a35041d5d136988414dc621
Parents: 883fc5a
Author: Nico Kruber 
Authored: Thu Jan 12 16:41:30 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 12:26:45 2017 +0100

--
 .../flink/test/query/QueryableStateITCase.java  | 152 ++-
 1 file changed, 50 insertions(+), 102 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/661b3f90/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
--
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index a5ed6ad..eccd8e0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -622,40 +622,8 @@ public class QueryableStateITCase extends TestLogger {
// Now query
long expected = numElements;
 
-   FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
-   for (int key = 0; key < NUM_SLOTS; key++) {
-   final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
-   key,
-   queryableState.getKeySerializer(),
-   VoidNamespace.INSTANCE,
-   VoidNamespaceSerializer.INSTANCE);
-
-   boolean success = false;
-   while (deadline.hasTimeLeft() && !success) {
-   Future<byte[]> future = getKvStateWithRetries(client,
-   jobId,
-   queryableState.getQueryableStateName(),
-   key,
-   serializedKey,
-   retryDelay);
-
-   byte[] serializedValue = Await.result(future, deadline.timeLeft());
-
-   Tuple2<Integer, Long> value = KvStateRequestSerializer.deserializeValue(
-   serializedValue,
-   queryableState.getValueSerializer());
-
-   assertEquals("Key mismatch", key, value.f0.intValue());
-   if (expected == value.f1) {
-   success = true;
-   } else {
-   // Retry
-   Thread.sleep(50);
-   }
-   }
-
-   assertTrue("Did not succeed query", success);
-   }
+   executeValueQuery(deadline, client, jobId, queryableState,
+   expected);
} finally {
// Free cluster resources
if (jobId != null) {
@@ -672,6 +640,50 @@ public class QueryableStateITCase extends TestLogger {
}
 
/**
+* Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until
+* expected equals the value of the result tuple's second field.
+*/
+   private void executeValueQuery(final Deadline deadline,
+   final QueryableStateClient client, final JobID jobId,
+   final QueryableStateStream<Integer, Tuple2<Integer, Long>> queryableState,
+   final long expected) throws Exception {
+   FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS);
+   for (int key = 0; key < NUM_SLOTS; key++) {
+   final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
+   key,
+   queryableState.getKeySerializer(),
+   VoidNamespace.INSTANCE,
+   VoidNamespaceSerializer.INSTANCE);

[2/2] flink git commit: [FLINK-5482] [queryable state] Re-issue location lookup upon failure

2017-01-20 Thread uce
[FLINK-5482] [queryable state] Re-issue location lookup upon failure

Any failing lookup, e.g. in case the job has not been started yet, previously
remained in the lookup cache and thus future queries did not retry the lookup
and failed. This commit changes the lookup caching code so that completed
and failed futures are removed from the cache and replaced by new lookups.
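
As an illustration of the caching pattern described above, the sketch below re-implements the idea with plain java.util.concurrent types. It is not Flink's actual code: the real client uses Scala futures and a KvStateLocationLookupService, and the class and method names here (LookupCache, getOrLookup, the Supplier-based lookup) are invented for this example. What it demonstrates is the same idea as the commit: a cached lookup future that has completed with a failure is atomically swapped for a fresh lookup instead of being served from the cache forever.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Illustrative sketch only; not Flink's API. Caches lookup futures per key and
// re-issues the lookup when the cached future has already failed.
public class LookupCache<K, V> {

    private final ConcurrentMap<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();

    public CompletableFuture<V> getOrLookup(K key, Supplier<CompletableFuture<V>> lookup) {
        CompletableFuture<V> cached = cache.get(key);

        if (cached == null) {
            // no cached lookup yet: issue one and publish it, preferring a
            // concurrently published future if another thread won the race
            CompletableFuture<V> fresh = lookup.get();
            CompletableFuture<V> previous = cache.putIfAbsent(key, fresh);
            return previous != null ? previous : fresh;
        }

        if (cached.isCompletedExceptionally()) {
            // do not retain failed futures: the original cause may be gone and
            // a new lookup may succeed now
            CompletableFuture<V> fresh = lookup.get();
            if (cache.replace(key, cached, fresh)) {
                return fresh;
            }
            // someone else already replaced the stale future; use theirs
            return cache.get(key);
        }

        return cached;
    }
}

The atomic replace(key, cached, fresh) mirrors the behaviour in the diff below: only the thread that actually swapped out the stale future returns the new one, and every other concurrent caller falls back to whatever future is in the cache at that moment, so a single failure triggers at most one replacement lookup.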


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ff7f431
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ff7f431
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ff7f431

Branch: refs/heads/master
Commit: 7ff7f431dab1d5fa70d71747cda619b9f6491bd2
Parents: 661b3f9
Author: Nico Kruber 
Authored: Thu Jan 12 16:48:27 2017 +0100
Committer: Ufuk Celebi 
Committed: Fri Jan 20 12:26:50 2017 +0100

--
 .../runtime/query/QueryableStateClient.java | 20 +-
 .../flink/test/query/QueryableStateITCase.java  | 73 
 2 files changed, 92 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7ff7f431/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
--
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
index 98c3580..7ba3199 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java
@@ -341,7 +341,25 @@ public class QueryableStateClient {
return previous;
}
} else {
-   return cachedFuture;
+   // do not retain futures which failed as they will remain in
+   // the cache even if the error cause is not present any more
+   // and a new lookup may succeed
+   if (cachedFuture.isCompleted() &&
+   cachedFuture.value().get().isFailure()) {
+   // issue a new lookup
+   Future<KvStateLocation> lookupFuture = lookupService
+   .getKvStateLookupInfo(jobId, queryableStateName);
+
+   // replace the existing one if it has not been replaced yet
+   // otherwise return the one in the cache
+   if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) {
+   return lookupFuture;
+   } else {
+   return lookupCache.get(cacheKey);
+   }
+   } else {
+   return cachedFuture;
+   }
}
}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/7ff7f431/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
--
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
index eccd8e0..88e4f9a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java
@@ -640,6 +640,79 @@ public class QueryableStateITCase extends TestLogger {
}
 
/**
+* Similar tests as {@link #testValueState()} but before submitting the
+* job, we already issue one request which fails.
+*/
+   @Test
+   public void testQueryNonStartedJobState() throws Exception {
+   // Config
+   final Deadline deadline = TEST_TIMEOUT.fromNow();
+
+   final int numElements = 1024;
+
+   final QueryableStateClient client = new QueryableStateClient(cluster.configuration());
+
+   JobID jobId = null;
+   try {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setParallelism(NUM_SLOTS);
+   // Very important, because cluster is shared between tests and we
+   // don't explicitly check that all slots are available before
+