buildbot success on flink-docs-release-0.10
The Buildbot has detected a restored build on builder flink-docs-release-0.10 while building. Full details are available at: https://ci.apache.org/builders/flink-docs-release-0.10/builds/459 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave2_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' triggered this build Build Source Stamp: [branch release-0.10] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot
[1/2] flink git commit: [FLINK-5459] [docs] Add troubleshooting guide for classloading issues
Repository: flink Updated Branches: refs/heads/master 6dffe282b -> 3b6267555 [FLINK-5459] [docs] Add troubleshooting guide for classloading issues Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b626755 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b626755 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b626755 Branch: refs/heads/master Commit: 3b626755588a461c8f195a48c4777fd533bf54ec Parents: 24ff9eb Author: Stephan Ewen Authored: Fri Jan 20 18:50:21 2017 +0100 Committer: Stephan Ewen Committed: Fri Jan 20 18:54:30 2017 +0100 -- docs/monitoring/debugging_classloading.md | 101 ++--- 1 file changed, 92 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3b626755/docs/monitoring/debugging_classloading.md -- diff --git a/docs/monitoring/debugging_classloading.md b/docs/monitoring/debugging_classloading.md index e4e908e..40822c0 100644 --- a/docs/monitoring/debugging_classloading.md +++ b/docs/monitoring/debugging_classloading.md @@ -27,19 +27,102 @@ under the License. ## Overview of Classloading in Flink - - What is in the Application Classloader for different deployment techs - - What is in the user code classloader +When running Flink applications, the JVM will load various classes over time. +These classes can be divided into two domains: - - Access to the user code classloader for applications + - The **Flink Framework** domain: This includes all code in the `/lib` directory in the Flink directory. +By default these are the classes of Apache Flink and its core dependencies. -## Classpath Setups + - The **User Code** domain: These are all classes that are included in the JAR file submitted via the CLI or web interface. +That includes the job's classes, and all libraries and connectors that are put into the uber JAR. + + +The class loading behaves slightly differently for various Flink setups: + +**Standalone** + +When starting the Flink cluster, the JobManagers and TaskManagers are started with the Flink framework classes in the +classpath. The classes from all jobs that are submitted against the cluster are loaded *dynamically*. + +**YARN** + +YARN classloading differs between single job deployments and sessions: + - When submitting a Flink job directly to YARN (via `bin/flink run -m yarn-cluster ...`), dedicated TaskManagers and +JobManagers are started for that job. Those JVMs have both Flink framework classes and user code classes in their classpath. +That means that there is *no dynamic classloading* involved in that case. + - When starting a YARN session, the JobManagers and TaskManagers are started with the Flink framework classes in the +classpath. The classes from all jobs that are submitted against the session are loaded dynamically. + +**Mesos** + +Mesos setups following [this documentation](../setup/mesos.html) currently behave very much like a +YARN session: The TaskManager and JobManager processes are started with the Flink framework classes in classpath, job +classes are loaded dynamically when the jobs are submitted. + + +## Avoiding Dynamic Classloading + +All components (JobManager, TaskManager, Client, ApplicationMaster, ...) log their classpath setting on startup. +They can be found as part of the environment information at the beginning of the log. + +When running a setup where the Flink JobManager and TaskManagers are exclusive to one particular job, one can put JAR files +directly into the `/lib` folder to make sure they are part of the classpath and not loaded dynamically. + +It usually works to put the job's JAR file into the `/lib` directory. The JAR will be part of both the classpath +(the *AppClassLoader*) and the dynamic class loader (*FlinkUserCodeClassLoader*). +Because the AppClassLoader is the parent of the FlinkUserCodeClassLoader (and Java loads parent-first), this should +result in classes being loaded only once. + +For setups where the job's JAR file cannot be put into the `/lib` folder (for example because the setup is a session that is +used by multiple jobs), it may still be possible to put common libraries into the `/lib` folder, and avoid dynamic class loading +for those. + + +## Manual Classloading in the Job + +In some cases, a transformation function, source, or sink needs to manually load classes (dynamically via reflection). +To do that, it needs the classloader that has access to the job's classes. + +In that case, the functions (or sources or sinks) can be made a `RichFunction` (for example `RichMapFunction` or `RichWindowFunction`) +and access the user code class loader via `getRuntimeContext().getUserCodeClassLoader()`. + + +## X cannot be
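The guide's last section above points to `getRuntimeContext().getUserCodeClassLoader()`. As a minimal sketch of that pattern in a rich function, where the class name `com.example.MyFormat` is a hypothetical placeholder for a class bundled in the job JAR (not from the commit):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ReflectiveMapper extends RichMapFunction<String, Object> {

    private transient Class<?> formatClass;

    @Override
    public void open(Configuration parameters) throws Exception {
        // The user code classloader sees the classes shipped in the job's uber JAR.
        ClassLoader userLoader = getRuntimeContext().getUserCodeClassLoader();
        // "com.example.MyFormat" stands in for a class that only exists in the job JAR.
        formatClass = Class.forName("com.example.MyFormat", true, userLoader);
    }

    @Override
    public Object map(String value) throws Exception {
        // Instantiate the dynamically loaded class (sketch only).
        return formatClass.newInstance();
    }
}
```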
[2/2] flink git commit: [FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators
[FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24ff9eba Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24ff9eba Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24ff9eba Branch: refs/heads/master Commit: 24ff9ebac926dda13c029c79c44c40580a5a1a2f Parents: 6dffe28 Author: Stephan Ewen Authored: Fri Jan 20 11:12:12 2017 +0100 Committer: Stephan Ewen Committed: Fri Jan 20 18:54:30 2017 +0100 -- .../flink/runtime/jobmanager/JobManager.scala | 23 ++-- 1 file changed, 12 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/24ff9eba/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala -- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4938cfc..d175c46 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1849,18 +1849,19 @@ class JobManager( * * @param accumulators list of accumulator snapshots */ - private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = { -accumulators foreach { - case accumulatorEvent => -currentJobs.get(accumulatorEvent.getJobID) match { - case Some((jobGraph, jobInfo)) => -future { - jobGraph.updateAccumulators(accumulatorEvent) -}(context.dispatcher) - case None => - // ignore accumulator values for old job + private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]): Unit = { +accumulators.foreach( snapshot => { +if (snapshot != null) { + currentJobs.get(snapshot.getJobID) match { +case Some((jobGraph, jobInfo)) => + future { +jobGraph.updateAccumulators(snapshot) + }(context.dispatcher) +case None => + // ignore accumulator values for old job + } } -} +}) } /**
flink git commit: [FLINK-5459] [docs] Add troubleshooting guide for classloading issues
Repository: flink Updated Branches: refs/heads/release-1.2 8c5edb2fc -> 289932cdb [FLINK-5459] [docs] Add troubleshooting guide for classloading issues Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/289932cd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/289932cd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/289932cd Branch: refs/heads/release-1.2 Commit: 289932cdbea695a529bbd618fe1764a41a06f3fb Parents: 8c5edb2 Author: Stephan Ewen Authored: Fri Jan 20 18:50:21 2017 +0100 Committer: Stephan Ewen Committed: Fri Jan 20 19:00:07 2017 +0100 -- docs/monitoring/debugging_classloading.md | 101 ++--- 1 file changed, 92 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/289932cd/docs/monitoring/debugging_classloading.md -- diff --git a/docs/monitoring/debugging_classloading.md b/docs/monitoring/debugging_classloading.md index e4e908e..40822c0 100644 --- a/docs/monitoring/debugging_classloading.md +++ b/docs/monitoring/debugging_classloading.md @@ -27,19 +27,102 @@ under the License. ## Overview of Classloading in Flink - - What is in the Application Classloader for different deployment techs - - What is in the user code classloader +When running Flink applications, the JVM will load various classes over time. +These classes can be divided into two domains: - - Access to the user code classloader for applications + - The **Flink Framework** domain: This includes all code in the `/lib` directory in the Flink directory. +By default these are the classes of Apache Flink and its core dependencies. -## Classpath Setups + - The **User Code** domain: These are all classes that are included in the JAR file submitted via the CLI or web interface. +That includes the job's classes, and all libraries and connectors that are put into the uber JAR. + + +The class loading behaves slightly differently for various Flink setups: + +**Standalone** + +When starting the Flink cluster, the JobManagers and TaskManagers are started with the Flink framework classes in the +classpath. The classes from all jobs that are submitted against the cluster are loaded *dynamically*. + +**YARN** + +YARN classloading differs between single job deployments and sessions: + - When submitting a Flink job directly to YARN (via `bin/flink run -m yarn-cluster ...`), dedicated TaskManagers and +JobManagers are started for that job. Those JVMs have both Flink framework classes and user code classes in their classpath. +That means that there is *no dynamic classloading* involved in that case. + - When starting a YARN session, the JobManagers and TaskManagers are started with the Flink framework classes in the +classpath. The classes from all jobs that are submitted against the session are loaded dynamically. + +**Mesos** + +Mesos setups following [this documentation](../setup/mesos.html) currently behave very much like a +YARN session: The TaskManager and JobManager processes are started with the Flink framework classes in classpath, job +classes are loaded dynamically when the jobs are submitted. + + +## Avoiding Dynamic Classloading + +All components (JobManager, TaskManager, Client, ApplicationMaster, ...) log their classpath setting on startup. +They can be found as part of the environment information at the beginning of the log. + +When running a setup where the Flink JobManager and TaskManagers are exclusive to one particular job, one can put JAR files +directly into the `/lib` folder to make sure they are part of the classpath and not loaded dynamically. + +It usually works to put the job's JAR file into the `/lib` directory. The JAR will be part of both the classpath +(the *AppClassLoader*) and the dynamic class loader (*FlinkUserCodeClassLoader*). +Because the AppClassLoader is the parent of the FlinkUserCodeClassLoader (and Java loads parent-first), this should +result in classes being loaded only once. + +For setups where the job's JAR file cannot be put into the `/lib` folder (for example because the setup is a session that is +used by multiple jobs), it may still be possible to put common libraries into the `/lib` folder, and avoid dynamic class loading +for those. + + +## Manual Classloading in the Job + +In some cases, a transformation function, source, or sink needs to manually load classes (dynamically via reflection). +To do that, it needs the classloader that has access to the job's classes. + +In that case, the functions (or sources or sinks) can be made a `RichFunction` (for example `RichMapFunction` or `RichWindowFunction`) +and access the user code class loader via `getRuntimeContext().getUserCodeClassLoader()`. + + +## X
flink git commit: [FLINK-2662] [optimizer] Fix translation of broadcasted unions.
Repository: flink Updated Branches: refs/heads/release-1.1 f6f1c244c -> 6566b63aa [FLINK-2662] [optimizer] Fix translation of broadcasted unions. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6566b63a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6566b63a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6566b63a Branch: refs/heads/release-1.1 Commit: 6566b63aa50af946ecf0d028c4e284fc6dc2a55b Parents: f6f1c24 Author: Fabian Hueske Authored: Fri Jan 6 00:00:30 2017 +0100 Committer: Fabian Hueske Committed: Fri Jan 20 17:35:44 2017 +0100 -- .../flink/optimizer/dag/BinaryUnionNode.java| 35 ++ .../operators/BinaryUnionOpDescriptor.java | 4 ++ .../flink/optimizer/UnionReplacementTest.java | 72 ++-- 3 files changed, 106 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6566b63a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java -- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java index d262cf6..cb496a2 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java @@ -104,6 +104,8 @@ public class BinaryUnionNode extends TwoInputNode { throw new CompilerException("BinaryUnionNode has more than one successor."); } + boolean childrenSkippedDueToReplicatedInput = false; + // check if we have a cached version if (this.cachedPlans != null) { return this.cachedPlans; @@ -143,7 +145,30 @@ public class BinaryUnionNode extends TwoInputNode { // create all candidates for (PlanNode child1 : subPlans1) { + + if (child1.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if parallelism is not changed + if (dopChange1) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.input1.setShipStrategy(ShipStrategyType.FORWARD); + } + } + for (PlanNode child2 : subPlans2) { + + if (child2.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if parallelism is not changed + if (dopChange2) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.input2.setShipStrategy(ShipStrategyType.FORWARD); + } + } // check that the children go together. that is the case if they build upon the same // candidate at the joined branch plan. @@ -249,6 +274,16 @@ public class BinaryUnionNode extends TwoInputNode { } } + if(outputPlans.isEmpty()) { + if(childrenSkippedDueToReplicatedInput) { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + + ". Most likely reason: Invalid use of replicated input."); + } else { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + + ". Most likely reason: Too restrictive plan hints."); + } + } + // cost and prune the plans for (PlanNode node : outputPlans) { estimator.costOperator(node); http://git-wip-us.apache.org/repos/asf/flink/blob/6566b63a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java -
flink git commit: [FLINK-2662] [optimizer] Fix translation of broadcasted unions.
Repository: flink Updated Branches: refs/heads/release-1.2 03a1f25fa -> 8c5edb2fc [FLINK-2662] [optimizer] Fix translation of broadcasted unions. This closes #3083. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c5edb2f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c5edb2f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c5edb2f Branch: refs/heads/release-1.2 Commit: 8c5edb2fc8e905b881401815a39941ce2aab3bbd Parents: 03a1f25 Author: Fabian Hueske Authored: Fri Jan 6 00:00:30 2017 +0100 Committer: Fabian Hueske Committed: Fri Jan 20 16:59:42 2017 +0100 -- .../flink/optimizer/dag/BinaryUnionNode.java| 35 ++ .../operators/BinaryUnionOpDescriptor.java | 4 ++ .../flink/optimizer/UnionReplacementTest.java | 72 ++-- 3 files changed, 106 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8c5edb2f/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java -- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java index d262cf6..cb496a2 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java @@ -104,6 +104,8 @@ public class BinaryUnionNode extends TwoInputNode { throw new CompilerException("BinaryUnionNode has more than one successor."); } + boolean childrenSkippedDueToReplicatedInput = false; + // check if we have a cached version if (this.cachedPlans != null) { return this.cachedPlans; @@ -143,7 +145,30 @@ public class BinaryUnionNode extends TwoInputNode { // create all candidates for (PlanNode child1 : subPlans1) { + + if (child1.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if parallelism is not changed + if (dopChange1) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.input1.setShipStrategy(ShipStrategyType.FORWARD); + } + } + for (PlanNode child2 : subPlans2) { + + if (child2.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if parallelism is not changed + if (dopChange2) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.input2.setShipStrategy(ShipStrategyType.FORWARD); + } + } // check that the children go together. that is the case if they build upon the same // candidate at the joined branch plan. @@ -249,6 +274,16 @@ public class BinaryUnionNode extends TwoInputNode { } } + if(outputPlans.isEmpty()) { + if(childrenSkippedDueToReplicatedInput) { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + + ". Most likely reason: Invalid use of replicated input."); + } else { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + + ". Most likely reason: Too restrictive plan hints."); + } + } + // cost and prune the plans for (PlanNode node : outputPlans) { estimator.costOperator(node); http://git-wip-us.apache.org/repos/asf/flink/blob/8c5edb2f/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java -
flink git commit: [FLINK-2662] [optimizer] Fix translation of broadcasted unions.
Repository: flink Updated Branches: refs/heads/master 8d6426320 -> 6dffe282b [FLINK-2662] [optimizer] Fix translation of broadcasted unions. This closes #3083. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6dffe282 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6dffe282 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6dffe282 Branch: refs/heads/master Commit: 6dffe282bc5459b9c099df9a77cc2c3f26b28ae5 Parents: 8d64263 Author: Fabian Hueske Authored: Fri Jan 6 00:00:30 2017 +0100 Committer: Fabian Hueske Committed: Fri Jan 20 16:58:24 2017 +0100 -- .../flink/optimizer/dag/BinaryUnionNode.java| 35 ++ .../operators/BinaryUnionOpDescriptor.java | 4 ++ .../flink/optimizer/UnionReplacementTest.java | 72 ++-- 3 files changed, 106 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6dffe282/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java -- diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java index d262cf6..cb496a2 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/BinaryUnionNode.java @@ -104,6 +104,8 @@ public class BinaryUnionNode extends TwoInputNode { throw new CompilerException("BinaryUnionNode has more than one successor."); } + boolean childrenSkippedDueToReplicatedInput = false; + // check if we have a cached version if (this.cachedPlans != null) { return this.cachedPlans; @@ -143,7 +145,30 @@ public class BinaryUnionNode extends TwoInputNode { // create all candidates for (PlanNode child1 : subPlans1) { + + if (child1.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if parallelism is not changed + if (dopChange1) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.input1.setShipStrategy(ShipStrategyType.FORWARD); + } + } + for (PlanNode child2 : subPlans2) { + + if (child2.getGlobalProperties().isFullyReplicated()) { + // fully replicated input is always locally forwarded if parallelism is not changed + if (dopChange2) { + // can not continue with this child + childrenSkippedDueToReplicatedInput = true; + continue; + } else { + this.input2.setShipStrategy(ShipStrategyType.FORWARD); + } + } // check that the children go together. that is the case if they build upon the same // candidate at the joined branch plan. @@ -249,6 +274,16 @@ public class BinaryUnionNode extends TwoInputNode { } } + if(outputPlans.isEmpty()) { + if(childrenSkippedDueToReplicatedInput) { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + + ". Most likely reason: Invalid use of replicated input."); + } else { + throw new CompilerException("No plan meeting the requirements could be created @ " + this + + ". Most likely reason: Too restrictive plan hints."); + } + } + // cost and prune the plans for (PlanNode node : outputPlans) { estimator.costOperator(node); http://git-wip-us.apache.org/repos/asf/flink/blob/6dffe282/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/BinaryUnionOpDescriptor.java ---
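For context on what this optimizer fix addresses: the problematic plan shape is a union whose result is broadcasted to another operator. A self-contained DataSet sketch of that shape (the broadcast name "bc" and class name are illustrative, not from the commit):

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class BroadcastedUnionExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Long> first = env.generateSequence(1, 10);
        DataSet<Long> second = env.generateSequence(11, 20);

        // The union result is used as a broadcast set -- the translation
        // pattern that FLINK-2662 concerns.
        DataSet<Long> union = first.union(second);

        env.generateSequence(1, 100)
            .map(new RichMapFunction<Long, Long>() {
                @Override
                public Long map(Long value) throws Exception {
                    // Access the fully replicated (broadcasted) union result.
                    int bcSize = getRuntimeContext().getBroadcastVariable("bc").size();
                    return value + bcSize;
                }
            })
            .withBroadcastSet(union, "bc")
            .output(new DiscardingOutputFormat<Long>());

        env.execute("broadcasted union");
    }
}
```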
flink git commit: [FLINK-5528] [tests] Reduce query retry delay in QueryableStateITCase
Repository: flink Updated Branches: refs/heads/release-1.2 694927634 -> 03a1f25fa [FLINK-5528] [tests] Reduce query retry delay in QueryableStateITCase Using 100ms instead of the 1s previously used does not impose too much additional query load and reduces the test suite's duration from 16-20s to 13-15s on my machine with the current set of unit tests. Further reductions in the retry delay do not yield more improvements so far. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03a1f25f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03a1f25f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03a1f25f Branch: refs/heads/release-1.2 Commit: 03a1f25fa63bda30aa8c83751a2c75fb44dc98e8 Parents: 6949276 Author: Nico Kruber Authored: Tue Jan 17 15:01:32 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 16:53:46 2017 +0100 -- .../flink/test/query/QueryableStateITCase.java| 18 +++--- 1 file changed, 7 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/03a1f25f/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index 113f5c6..327a715 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -94,6 +94,7 @@ import static org.junit.Assert.fail; public class QueryableStateITCase extends TestLogger { private final static FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS); + private final static FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS); private final static ActorSystem TEST_ACTOR_SYSTEM = AkkaUtils.createDefaultActorSystem(); @@ -200,8 +201,6 @@ public class QueryableStateITCase extends TestLogger { final AtomicLongArray counts = new AtomicLongArray(numKeys); - final FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS); - boolean allNonZero = false; while (!allNonZero && deadline.hasTimeLeft()) { allNonZero = true; @@ -230,7 +229,7 @@ public class QueryableStateITCase extends TestLogger { queryName, key, serializedKey, - retryDelay); + QUERY_RETRY_DELAY); serializedResult.onSuccess(new OnSuccess() { @Override @@ -347,14 +346,13 @@ public class QueryableStateITCase extends TestLogger { boolean success = false; while (!success && deadline.hasTimeLeft()) { - final FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS); Future serializedResultFuture = getKvStateWithRetries( client, jobId, queryName, key, serializedKey, - retryDelay); + QUERY_RETRY_DELAY); byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft()); @@ -451,14 +449,13 @@ public class QueryableStateITCase extends TestLogger { // Now start another task manager cluster.addTaskManager(); - final FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS); Future serializedResultFuture = getKvStateWithRetries( client, jobId, queryName, key, serializedKey, - retryDelay); + QUERY_RETRY_DELAY); byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft()); @@ -
flink git commit: [FLINK-5528] [tests] Reduce query retry delay in QueryableStateITCase
Repository: flink Updated Branches: refs/heads/master f266e8255 -> 8d6426320 [FLINK-5528] [tests] Reduce query retry delay in QueryableStateITCase Using 100ms instead of the 1s previously used does not impose too much additional query load and reduces the test suite's duration from 16-20s to 13-15s on my machine with the current set of unit tests. Further reductions in the retry delay do not yield more improvements so far. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8d642632 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8d642632 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8d642632 Branch: refs/heads/master Commit: 8d64263203b4af7f3fbbe2a30ef67ddf67cc45a5 Parents: f266e82 Author: Nico Kruber Authored: Tue Jan 17 15:01:32 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 16:44:19 2017 +0100 -- .../flink/test/query/QueryableStateITCase.java| 18 +++--- 1 file changed, 7 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8d642632/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index 113f5c6..327a715 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -94,6 +94,7 @@ import static org.junit.Assert.fail; public class QueryableStateITCase extends TestLogger { private final static FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS); + private final static FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS); private final static ActorSystem TEST_ACTOR_SYSTEM = AkkaUtils.createDefaultActorSystem(); @@ -200,8 +201,6 @@ public class QueryableStateITCase extends TestLogger { final AtomicLongArray counts = new AtomicLongArray(numKeys); - final FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS); - boolean allNonZero = false; while (!allNonZero && deadline.hasTimeLeft()) { allNonZero = true; @@ -230,7 +229,7 @@ public class QueryableStateITCase extends TestLogger { queryName, key, serializedKey, - retryDelay); + QUERY_RETRY_DELAY); serializedResult.onSuccess(new OnSuccess() { @Override @@ -347,14 +346,13 @@ public class QueryableStateITCase extends TestLogger { boolean success = false; while (!success && deadline.hasTimeLeft()) { - final FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS); Future serializedResultFuture = getKvStateWithRetries( client, jobId, queryName, key, serializedKey, - retryDelay); + QUERY_RETRY_DELAY); byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft()); @@ -451,14 +449,13 @@ public class QueryableStateITCase extends TestLogger { // Now start another task manager cluster.addTaskManager(); - final FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS); Future serializedResultFuture = getKvStateWithRetries( client, jobId, queryName, key, serializedKey, - retryDelay); + QUERY_RETRY_DELAY); byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft()); @@ -719,7 +716
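The diff replaces per-call `retryDelay` locals with a shared `QUERY_RETRY_DELAY` constant. Reduced to a standalone sketch, the surrounding deadline/retry shape looks roughly like this; the query itself is stubbed out, and only the two duration values come from the diff:

```java
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

public class RetryLoopSketch {

    private static final FiniteDuration TEST_TIMEOUT =
        new FiniteDuration(100, TimeUnit.SECONDS);
    private static final FiniteDuration QUERY_RETRY_DELAY =
        new FiniteDuration(100, TimeUnit.MILLISECONDS);

    public static void main(String[] args) throws InterruptedException {
        // Global deadline for the whole test, as in the ITCase.
        Deadline deadline = TEST_TIMEOUT.fromNow();

        boolean success = false;
        while (!success && deadline.hasTimeLeft()) {
            success = queryOnce(); // stand-in for the queryable state lookup
            if (!success) {
                // Retry after the (now much shorter) fixed delay.
                Thread.sleep(QUERY_RETRY_DELAY.toMillis());
            }
        }
    }

    private static boolean queryOnce() {
        return true; // stub
    }
}
```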
flink git commit: [FLINK-5368] [kafka] Log msg if kafka topic doesn't have any partitions
Repository: flink Updated Branches: refs/heads/release-1.2 5cbaf796d -> 694927634 [FLINK-5368] [kafka] Log msg if kafka topic doesn't have any partitions This closes #3036. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/69492763 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/69492763 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/69492763 Branch: refs/heads/release-1.2 Commit: 69492763444a9cc5efb1e26e2864abce71787211 Parents: 5cbaf79 Author: HungUnicorn Authored: Mon Jan 9 16:48:24 2017 +0100 Committer: Tzu-Li (Gordon) Tai Committed: Fri Jan 20 16:51:54 2017 +0100 -- .../flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java| 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/69492763/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java -- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index 29bb8e4..2b816c4 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -208,6 +208,9 @@ public class FlinkKafkaConsumer09 extends FlinkKafkaConsumerBase { if (partitionsForTopic != null) { partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); } + else{ + LOG.info("Unable to retrieve any partitions for the requested topic: {}", topic); + } } }
flink git commit: [FLINK-5580] [security] Fix path setting of shipped Kerberos keytabs in YARN mode
Repository: flink Updated Branches: refs/heads/release-1.2 3b5882afa -> 5cbaf796d [FLINK-5580] [security] Fix path setting of shipped Kerberos keytabs in YARN mode This closes #3177. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5cbaf796 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5cbaf796 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5cbaf796 Branch: refs/heads/release-1.2 Commit: 5cbaf796d2e40db26ccdcfc458f5f1baf0230bb6 Parents: 3b5882a Author: Tzu-Li (Gordon) Tai Authored: Fri Jan 20 01:41:05 2017 +0100 Committer: Tzu-Li (Gordon) Tai Committed: Fri Jan 20 16:50:55 2017 +0100 -- .../flink/yarn/YarnApplicationMasterRunner.java| 4 +++- .../apache/flink/yarn/YarnTaskManagerRunner.java | 17 + 2 files changed, 12 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5cbaf796/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index e4027d4..ad9bc10 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -169,7 +169,9 @@ public class YarnApplicationMasterRunner { LOG.debug("YARN dynamic properties: {}", dynamicProperties); final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties); - if(keytabPath != null && remoteKeytabPrincipal != null) { + + // set keytab principal and replace path with the local path of the shipped keytab file in NodeManager + if (keytabPath != null && remoteKeytabPrincipal != null) { flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath); flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal); } http://git-wip-us.apache.org/repos/asf/flink/blob/5cbaf796/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java index 059f1aa..e41869a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java @@ -93,11 +93,11 @@ public class YarnTaskManagerRunner { // tell akka to die in case of an error configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); - String keytabPath = null; + String localKeytabPath = null; if(remoteKeytabPath != null) { File f = new File(currDir, Utils.KEYTAB_FILE_NAME); - keytabPath = f.getAbsolutePath(); - LOG.info("keytabPath: {}", keytabPath); + localKeytabPath = f.getAbsolutePath(); + LOG.info("localKeytabPath: {}", localKeytabPath); } UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); @@ -124,6 +124,12 @@ public class YarnTaskManagerRunner { hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); } + // set keytab principal and replace path with the local path of the shipped keytab file in NodeManager + if (localKeytabPath != null && remoteKeytabPrincipal != null) { + configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath); + configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal); + } + SecurityUtils.SecurityConfiguration sc; if(hadoopConfiguration != null) { sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration); @@ -131,11 +137,6 @@ public class YarnTaskManagerRunner { sc = new SecurityUtils.SecurityConfiguration(configuration); } - if(keytabPath != null && remoteKeytabPrincipal != null) { -
flink git commit: [FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
Repository: flink Updated Branches: refs/heads/release-1.2 be09143cd -> 3b5882afa [FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector This closes #3078. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b5882af Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b5882af Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b5882af Branch: refs/heads/release-1.2 Commit: 3b5882afa0c5e60cfb39c2b098fcae2b112a5990 Parents: be09143c Author: Scott Kidder Authored: Fri Dec 16 08:46:54 2016 -0800 Committer: Tzu-Li (Gordon) Tai Committed: Fri Jan 20 16:48:53 2017 +0100 -- .../connectors/kinesis/proxy/KinesisProxy.java | 58 ++ .../kinesis/proxy/KinesisProxyTest.java | 63 2 files changed, 108 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3b5882af/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java -- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index 9ffc8e6..0b0fccf 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -17,14 +17,15 @@ package org.apache.flink.streaming.connectors.kinesis.proxy; +import com.amazonaws.AmazonServiceException; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.Shard; @@ -193,12 +194,16 @@ public class KinesisProxy implements KinesisProxyInterface { while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { try { getRecordsResult = kinesisClient.getRecords(getRecordsRequest); - } catch (ProvisionedThroughputExceededException ex) { - long backoffMillis = fullJitterBackoff( - getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); - LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " - + backoffMillis + " millis."); - Thread.sleep(backoffMillis); + } catch (AmazonServiceException ex) { + if (isRecoverableException(ex)) { + long backoffMillis = fullJitterBackoff( + getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); + LOG.warn("Got recoverable AmazonServiceException. Backing off for " + + backoffMillis + " millis (" + ex.getErrorMessage() + ")"); + Thread.sleep(backoffMillis); + } else { + throw ex; + } } } @@ -237,12 +242,16 @@ public class KinesisProxy implements KinesisProxyInterface { try { getShardIteratorResult = kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum); - } catch (ProvisionedThroughputExceededException ex) { - long backoffMillis = fullJitterBackoff( - getShardIteratorBaseBackoffMillis, getS
flink git commit: [FLINK-5386] [table] Refactor window clause.
Repository: flink Updated Branches: refs/heads/release-1.2 0a24611a8 -> be09143cd [FLINK-5386] [table] Refactor window clause. - move window() before groupBy() - make window alias mandatory - groupBy() must include window alias Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be09143c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be09143c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be09143c Branch: refs/heads/release-1.2 Commit: be09143cd323d478811447b10807ae7f7d6a4b7b Parents: 0a24611 Author: Jincheng Sun Authored: Thu Dec 15 10:31:33 2016 +0800 Committer: Fabian Hueske Committed: Fri Jan 20 16:40:21 2017 +0100 -- docs/dev/table_api.md | 71 ++--- .../org/apache/flink/table/api/table.scala | 100 --- .../org/apache/flink/table/api/windows.scala| 175 .../flink/table/plan/logical/groupWindows.scala | 69 +++-- .../scala/batch/table/FieldProjectionTest.scala | 14 +- .../scala/stream/table/AggregationsITCase.scala | 15 +- .../scala/stream/table/GroupWindowTest.scala| 277 ++- 7 files changed, 429 insertions(+), 292 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/be09143c/docs/dev/table_api.md -- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 0d7331b..80b61f9 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1006,69 +1006,74 @@ Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTH ### Windows -The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Group-window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, group-windows are a convenient shortcut to group records by time intervals. +The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. -Group-windows are defined using the `window(w: GroupWindow)` clause. The following example shows how to define a group-window aggregation on a table. +**Note:** Windows are currently only supported for streaming tables. Support for batch tables will be added in the next release. + +Windows are defined using the `window(w: Window)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute. +The following example shows how to define a window aggregation on a table. {% highlight java %} Table table = input - .window(GroupWindow w) // define window - .select("b.sum")// aggregate + .window([Window w].as("w")) // define window with alias w + .groupBy("w") // group the table by window w + .select("b.sum") // aggregate {% endhighlight %} {% highlight scala %} val table = input - .window(w: GroupWindow) // define window - .select('b.sum) // aggregate + .window([w: Window] as 'w) // define window with alias w + .groupBy('w) // group the table by window w + .select('b.sum) // aggregate {% endhighlight %} -In streaming environments, group-window aggregates can only be computed in parallel, if they are *keyed*, i.e., there is an additional `groupBy` attribute. Group-window aggregates without additional `groupBy`, such as in the example above, can only be evaluated in a single, non-parallel task. The following example shows how to define a keyed group-window aggregation on a table. +In streaming environments, window aggregates can only be computed in parallel if they group on one or more attributes in addition to the window, i.e., the `groupBy(...)` clause references a window alias and at least one additional attribute. A `groupBy(...)` clause that only references a window alias (such as in the example above) can only be evaluated by a single, non-parallel task. +The following example shows how to define a window aggregation with additional grouping attributes. {% highlight java %} Table table = input - .groupBy("a") - .windo
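Putting the refactored clauses together with a concrete window type, a tumbling event-time window would look roughly as follows. This is a sketch assuming the Table API's `Tumble` window class and a time attribute named `rowtime`; verify the import path against your Flink version:

```java
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble; // import location assumed

// 'input' is assumed to be a Table with fields a, b and time attribute rowtime.
Table result = input
    .window(Tumble.over("10.minutes").on("rowtime").as("w")) // window before groupBy, alias mandatory
    .groupBy("w, a")                                         // window alias referenced in groupBy
    .select("a, b.sum");
```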
[2/3] flink git commit: [FLINK-5580] [security] Fix path setting of shipped Kerberos keytabs in YARN mode
[FLINK-5580] [security] Fix path setting of shipped Kerberos keytabs in YARN mode This closes #3177. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/640a149e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/640a149e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/640a149e Branch: refs/heads/master Commit: 640a149ea69c0fc2314c6d5b422500c6c9587f43 Parents: b380bd3 Author: Tzu-Li (Gordon) Tai Authored: Fri Jan 20 01:41:05 2017 +0100 Committer: Tzu-Li (Gordon) Tai Committed: Fri Jan 20 16:30:52 2017 +0100 -- .../flink/yarn/YarnApplicationMasterRunner.java| 2 ++ .../apache/flink/yarn/YarnTaskManagerRunner.java | 17 + 2 files changed, 11 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/640a149e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java index 71be589..2193174 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java @@ -169,6 +169,8 @@ public class YarnApplicationMasterRunner { LOG.debug("YARN dynamic properties: {}", dynamicProperties); final Configuration flinkConfig = createConfiguration(currDir, dynamicProperties); + + // set keytab principal and replace path with the local path of the shipped keytab file in NodeManager if (keytabPath != null && remoteKeytabPrincipal != null) { flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath); flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal); } http://git-wip-us.apache.org/repos/asf/flink/blob/640a149e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java -- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java index 4a780e0..849a8a6 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java @@ -93,11 +93,11 @@ public class YarnTaskManagerRunner { // tell akka to die in case of an error configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); - String keytabPath = null; + String localKeytabPath = null; if(remoteKeytabPath != null) { File f = new File(currDir, Utils.KEYTAB_FILE_NAME); - keytabPath = f.getAbsolutePath(); - LOG.info("keytabPath: {}", keytabPath); + localKeytabPath = f.getAbsolutePath(); + LOG.info("localKeytabPath: {}", localKeytabPath); } UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); @@ -124,6 +124,12 @@ public class YarnTaskManagerRunner { hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true"); } + // set keytab principal and replace path with the local path of the shipped keytab file in NodeManager + if (localKeytabPath != null && remoteKeytabPrincipal != null) { + configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, localKeytabPath); + configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal); + } + SecurityUtils.SecurityConfiguration sc; if (hadoopConfiguration != null) { sc = new SecurityUtils.SecurityConfiguration(configuration, hadoopConfiguration); @@ -131,11 +137,6 @@ public class YarnTaskManagerRunner { sc = new SecurityUtils.SecurityConfiguration(configuration); } - if (keytabPath != null && remoteKeytabPrincipal != null) { - configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath); - configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPr
[1/3] flink git commit: [FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector
Repository: flink Updated Branches: refs/heads/master 988729ec1 -> f266e8255 [FLINK-5355] [kinesis] Handle AmazonKinesisException gracefully in Kinesis Streaming Connector This closes #3078. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b380bd31 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b380bd31 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b380bd31 Branch: refs/heads/master Commit: b380bd313e036f2bc66941367f014c12bf8baa6d Parents: 988729e Author: Scott Kidder Authored: Fri Dec 16 08:46:54 2016 -0800 Committer: Tzu-Li (Gordon) Tai Committed: Fri Jan 20 16:29:40 2017 +0100 -- .../connectors/kinesis/proxy/KinesisProxy.java | 58 ++ .../kinesis/proxy/KinesisProxyTest.java | 63 2 files changed, 108 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b380bd31/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java -- diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java index 9ffc8e6..0b0fccf 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java @@ -17,14 +17,15 @@ package org.apache.flink.streaming.connectors.kinesis.proxy; +import com.amazonaws.AmazonServiceException; import com.amazonaws.services.kinesis.AmazonKinesisClient; import com.amazonaws.services.kinesis.model.DescribeStreamRequest; import com.amazonaws.services.kinesis.model.DescribeStreamResult; import com.amazonaws.services.kinesis.model.GetRecordsRequest; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.GetShardIteratorResult; -import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.LimitExceededException; +import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.StreamStatus; import com.amazonaws.services.kinesis.model.Shard; @@ -193,12 +194,16 @@ public class KinesisProxy implements KinesisProxyInterface { while (attempt <= getRecordsMaxAttempts && getRecordsResult == null) { try { getRecordsResult = kinesisClient.getRecords(getRecordsRequest); - } catch (ProvisionedThroughputExceededException ex) { - long backoffMillis = fullJitterBackoff( - getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); - LOG.warn("Got ProvisionedThroughputExceededException. Backing off for " - + backoffMillis + " millis."); - Thread.sleep(backoffMillis); + } catch (AmazonServiceException ex) { + if (isRecoverableException(ex)) { + long backoffMillis = fullJitterBackoff( + getRecordsBaseBackoffMillis, getRecordsMaxBackoffMillis, getRecordsExpConstant, attempt++); + LOG.warn("Got recoverable AmazonServiceException. Backing off for " + + backoffMillis + " millis (" + ex.getErrorMessage() + ")"); + Thread.sleep(backoffMillis); + } else { + throw ex; + } } } @@ -237,12 +242,16 @@ public class KinesisProxy implements KinesisProxyInterface { try { getShardIteratorResult = kinesisClient.getShardIterator(shard.getStreamName(), shard.getShard().getShardId(), shardIteratorType, startingSeqNum); - } catch (ProvisionedThroughputExceededException ex) { - long backoffMillis = fullJitterBackoff( - getShardIteratorBaseBackoffMillis, getShardIterato
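The retry loops above call `fullJitterBackoff`, which is not shown in the truncated diff. The well-known full-jitter scheme picks a uniformly random delay below an exponentially growing cap; a plausible self-contained sketch (parameter names mirror the call sites above, but the body is an assumption, not the commit's code):

```java
import java.util.Random;

public final class FullJitterBackoff {

    private static final Random RANDOM = new Random();

    /**
     * Full-jitter exponential backoff: returns a random delay in
     * [0, min(maxMillis, baseMillis * expConst^attempt)].
     */
    public static long fullJitterBackoff(long baseMillis, long maxMillis,
                                         double expConst, int attempt) {
        double exponentialBackoff =
            Math.min(maxMillis, baseMillis * Math.pow(expConst, attempt));
        // Full jitter: sleep anywhere between 0 and the exponential cap,
        // which decorrelates retries from concurrent clients.
        return (long) (RANDOM.nextDouble() * exponentialBackoff);
    }
}
```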
[3/3] flink git commit: [FLINK-5368] [kafka] Log msg if kafka topic doesn't have any partitions
[FLINK-5368] [kafka] Log msg if kafka topic doesn't have any partitions This closes #3036. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f266e825 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f266e825 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f266e825 Branch: refs/heads/master Commit: f266e825557f4091094a866e6887f52ca54ff2d7 Parents: 640a149 Author: HungUnicorn Authored: Mon Jan 9 16:48:24 2017 +0100 Committer: Tzu-Li (Gordon) Tai Committed: Fri Jan 20 16:33:25 2017 +0100 -- .../flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java| 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f266e825/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java -- diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index 29bb8e4..2b816c4 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -208,6 +208,9 @@ public class FlinkKafkaConsumer09 extends FlinkKafkaConsumerBase { if (partitionsForTopic != null) { partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic)); } + else{ + LOG.info("Unable to retrieve any partitions for the requested topic: {}", topic); + } } }
flink git commit: [FLINK-5468] [savepoints] Improve error message for migrating semi async RocksDB snapshot
Repository: flink Updated Branches: refs/heads/master 6c4644de1 -> 988729ec1 [FLINK-5468] [savepoints] Improve error message for migrating semi async RocksDB snapshot This closes #3119. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/988729ec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/988729ec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/988729ec Branch: refs/heads/master Commit: 988729ec150713c584f9cefb0b5c7f7183de5181 Parents: 6c4644d Author: Stefan Richter Authored: Fri Jan 13 15:19:37 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 16:10:09 2017 +0100 -- .../streaming/state/RocksDBStateBackend.java| 13 +++ .../savepoint/SavepointV0Serializer.java| 41 ++-- 2 files changed, 34 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/988729ec/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java index 509eb4c..fa1cc45 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java @@ -64,4 +64,17 @@ public class RocksDBStateBackend extends AbstractStateBackend { stateHandle.close(); } } + + public static class FinalSemiAsyncSnapshot { + + static { + throwExceptionOnLoadingThisClass(); + } + + private static void throwExceptionOnLoadingThisClass() { + throw new RuntimeException("Attempt to migrate RocksDB state created with semi async snapshot mode failed. " + + "Unfortunately, this is not supported. Please create a new savepoint for the job using fully " + + "async mode in Flink 1.1 and run migration again with the new savepoint."); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/988729ec/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java index 9e37dbb..2efe786 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java @@ -106,16 +106,7 @@ public class SavepointV0Serializer implements SavepointSerializer { for (int j = 0; j < numSubTaskStates; j++) { int subtaskIndex = dis.readInt(); - int length = dis.readInt(); - - SerializedValue> serializedValue; - if (length == -1) { - serializedValue = new SerializedValue<>(null); - } else { - byte[] serializedData = new byte[length]; - dis.readFully(serializedData, 0, length); - serializedValue = SerializedValue.fromBytes(serializedData); - } + SerializedValue> serializedValue = readSerializedValueStateHandle(dis); long stateSize = dis.readLong(); long duration = dis.readLong(); @@ -133,16 +124,7 @@ public class SavepointV0Serializer implements SavepointSerializer { for (int j = 0; j < numKvStates; j++) { int keyGroupIndex = dis.readInt(); - int length = dis.readInt(); - - SerializedValue> serializedValue; - if (length == -1) { - serializedValue = new SerializedValue<>(null); -
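The `FinalSemiAsyncSnapshot` class in this diff fails on class load, so deserializing a legacy savepoint that references it surfaces the descriptive error instead of an obscure one. The pattern in isolation, as a minimal sketch with made-up names and message:

```java
public final class LoadGuard {

    static {
        // A static initializer may not contain an unconditional 'throw'
        // directly (it must be able to complete normally), hence the
        // indirection through a helper method -- as in the commit above.
        failOnLoad();
    }

    private static void failOnLoad() {
        throw new RuntimeException(
            "This state format cannot be migrated. Please recreate the "
                + "savepoint in a supported mode and retry.");
    }
}
```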
flink git commit: [FLINK-5549] [core] TypeExtractor fails with RuntimeException, but should use GenericTypeInfo
Repository: flink Updated Branches: refs/heads/master 6bf556e60 -> 2703d339a [FLINK-5549] [core] TypeExtractor fails with RuntimeException, but should use GenericTypeInfo This closes #3154. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2703d339 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2703d339 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2703d339 Branch: refs/heads/master Commit: 2703d339abafdd9f39bc35faa1eed718501f71a7 Parents: 6bf556e Author: twalthr Authored: Wed Jan 18 14:43:32 2017 +0100 Committer: twalthr Committed: Fri Jan 20 16:04:57 2017 +0100 -- .../flink/api/java/typeutils/TypeExtractor.java | 21 ++-- .../java/typeutils/PojoTypeExtractionTest.java | 5 +++-- 2 files changed, 18 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2703d339/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 8bf2867..a2664f9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -893,7 +893,7 @@ public class TypeExtractor { // build the entire type hierarchy for the pojo getTypeHierarchy(inputTypeHierarchy, inType, Object.class); // determine a field containing the type variable - List fields = getAllDeclaredFields(typeToClass(inType)); + List fields = getAllDeclaredFields(typeToClass(inType), false); for (Field field : fields) { Type fieldType = field.getGenericType(); if (fieldType instanceof TypeVariable && sameTypeVars(returnTypeVar, materializeTypeVariable(inputTypeHierarchy, (TypeVariable) fieldType))) { @@ -1738,7 +1738,7 @@ public class TypeExtractor { getTypeHierarchy(typeHierarchy, clazz, Object.class); } - List fields = getAllDeclaredFields(clazz); + List fields = getAllDeclaredFields(clazz, false); if (fields.size() == 0) { LOG.info("No fields detected for " + clazz + ". Cannot be used as a PojoType. Will be handled as GenericType"); return new GenericTypeInfo(clazz); @@ -1803,12 +1803,17 @@ public class TypeExtractor { } /** -* recursively determine all declared fields +* Recursively determine all declared fields * This is required because class.getFields() is not returning fields defined * in parent classes. +* +* @param clazz class to be analyzed +* @param ignoreDuplicates if true, in case of duplicate field names only the lowest one +*in a hierarchy will be returned; throws an exception otherwise +* @return list of fields */ @PublicEvolving - public static List getAllDeclaredFields(Class clazz) { + public static List getAllDeclaredFields(Class clazz, boolean ignoreDuplicates) { List result = new ArrayList(); while (clazz != null) { Field[] fields = clazz.getDeclaredFields(); @@ -1817,8 +1822,12 @@ public class TypeExtractor { continue; // we have no use for transient or static fields } if(hasFieldWithSameName(field.getName(), result)) { - throw new RuntimeException("The field "+field+" is already contained in the hierarchy of the "+clazz+"." + if (ignoreDuplicates) { + continue; + } else { + throw new InvalidTypesException("The field "+field+" is already contained in the hierarchy of the "+clazz+"." 
+ "Please use unique field names through your classes hierarchy"); + } } result.add(field); } @@ -1829,7 +1838,7 @@ public class TypeExtractor { @PublicEvolving public static Field getDeclaredField(Class clazz, String name) { - f
flink git commit: [FLINK-5549] [core] TypeExtractor fails with RuntimeException, but should use GenericTypeInfo
Repository: flink Updated Branches: refs/heads/release-1.2 d4313731c -> 0a24611a8 [FLINK-5549] [core] TypeExtractor fails with RuntimeException, but should use GenericTypeInfo This closes #3154. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a24611a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a24611a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a24611a Branch: refs/heads/release-1.2 Commit: 0a24611a865ae6e321016672c951a8a632363bd4 Parents: d431373 Author: twalthr Authored: Wed Jan 18 14:43:32 2017 +0100 Committer: twalthr Committed: Fri Jan 20 16:11:19 2017 +0100 -- .../flink/api/java/typeutils/TypeExtractor.java | 21 ++-- .../java/typeutils/PojoTypeExtractionTest.java | 5 +++-- 2 files changed, 18 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/0a24611a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java -- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java index 08f8c53..9b59a78 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java @@ -926,7 +926,7 @@ public class TypeExtractor { // build the entire type hierarchy for the pojo getTypeHierarchy(inputTypeHierarchy, inType, Object.class); // determine a field containing the type variable - List fields = getAllDeclaredFields(typeToClass(inType)); + List fields = getAllDeclaredFields(typeToClass(inType), false); for (Field field : fields) { Type fieldType = field.getGenericType(); if (fieldType instanceof TypeVariable && sameTypeVars(returnTypeVar, materializeTypeVariable(inputTypeHierarchy, (TypeVariable) fieldType))) { @@ -1799,7 +1799,7 @@ public class TypeExtractor { getTypeHierarchy(typeHierarchy, clazz, Object.class); } - List fields = getAllDeclaredFields(clazz); + List fields = getAllDeclaredFields(clazz, false); if (fields.size() == 0) { LOG.info("No fields detected for " + clazz + ". Cannot be used as a PojoType. Will be handled as GenericType"); return new GenericTypeInfo(clazz); @@ -1864,12 +1864,17 @@ public class TypeExtractor { } /** -* recursively determine all declared fields +* Recursively determine all declared fields * This is required because class.getFields() is not returning fields defined * in parent classes. +* +* @param clazz class to be analyzed +* @param ignoreDuplicates if true, in case of duplicate field names only the lowest one +*in a hierarchy will be returned; throws an exception otherwise +* @return list of fields */ @PublicEvolving - public static List getAllDeclaredFields(Class clazz) { + public static List getAllDeclaredFields(Class clazz, boolean ignoreDuplicates) { List result = new ArrayList(); while (clazz != null) { Field[] fields = clazz.getDeclaredFields(); @@ -1878,8 +1883,12 @@ public class TypeExtractor { continue; // we have no use for transient or static fields } if(hasFieldWithSameName(field.getName(), result)) { - throw new RuntimeException("The field "+field+" is already contained in the hierarchy of the "+clazz+"." + if (ignoreDuplicates) { + continue; + } else { + throw new InvalidTypesException("The field "+field+" is already contained in the hierarchy of the "+clazz+"." 
+ "Please use unique field names through your classes hierarchy"); + } } result.add(field); } @@ -1890,7 +1899,7 @@ public class TypeExtractor { @PublicEvolving public static Field getDeclaredField(Class clazz, String name) { -
flink git commit: [FLINK-5468] [savepoints] Improve error message for migrating semi async RocksDB snapshot
Repository: flink Updated Branches: refs/heads/release-1.2 108865db2 -> d4313731c [FLINK-5468] [savepoints] Improve error message for migrating semi async RocksDB snapshot This closes #3119. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4313731 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4313731 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4313731 Branch: refs/heads/release-1.2 Commit: d4313731c215296a5db840d7d840407ed01ec2c9 Parents: 108865d Author: Stefan Richter Authored: Fri Jan 13 15:19:37 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 16:10:25 2017 +0100 -- .../streaming/state/RocksDBStateBackend.java| 13 +++ .../savepoint/SavepointV0Serializer.java| 41 ++-- 2 files changed, 34 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d4313731/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java index 509eb4c..fa1cc45 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/migration/contrib/streaming/state/RocksDBStateBackend.java @@ -64,4 +64,17 @@ public class RocksDBStateBackend extends AbstractStateBackend { stateHandle.close(); } } + + public static class FinalSemiAsyncSnapshot { + + static { + throwExceptionOnLoadingThisClass(); + } + + private static void throwExceptionOnLoadingThisClass() { + throw new RuntimeException("Attempt to migrate RocksDB state created with semi async snapshot mode failed. " + + "Unfortunately, this is not supported. 
Please create a new savepoint for the job using fully " + + "async mode in Flink 1.1 and run migration again with the new savepoint."); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/d4313731/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java index 9e37dbb..2efe786 100644 --- a/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.java @@ -106,16 +106,7 @@ public class SavepointV0Serializer implements SavepointSerializer { for (int j = 0; j < numSubTaskStates; j++) { int subtaskIndex = dis.readInt(); - int length = dis.readInt(); - - SerializedValue> serializedValue; - if (length == -1) { - serializedValue = new SerializedValue<>(null); - } else { - byte[] serializedData = new byte[length]; - dis.readFully(serializedData, 0, length); - serializedValue = SerializedValue.fromBytes(serializedData); - } + SerializedValue> serializedValue = readSerializedValueStateHandle(dis); long stateSize = dis.readLong(); long duration = dis.readLong(); @@ -133,16 +124,7 @@ public class SavepointV0Serializer implements SavepointSerializer { for (int j = 0; j < numKvStates; j++) { int keyGroupIndex = dis.readInt(); - int length = dis.readInt(); - - SerializedValue> serializedValue; - if (length == -1) { - serializedValue = new SerializedValue<>(null); -
flink git commit: [FLINK-5561] [runtime] Fix DataInputDeserializer#available()
Repository: flink Updated Branches: refs/heads/release-1.2 e2a4f3232 -> 108865db2 [FLINK-5561] [runtime] Fix DataInputDeserializer#available() This closes #3171. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/108865db Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/108865db Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/108865db Branch: refs/heads/release-1.2 Commit: 108865db23aec48e2b26509fb9e6ccd1342f0405 Parents: e2a4f32 Author: Nico Kruber Authored: Wed Jan 18 18:52:57 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 16:06:05 2017 +0100 -- .../runtime/util/DataInputDeserializer.java | 4 +- .../runtime/util/DataInputDeserializerTest.java | 59 2 files changed, 61 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/108865db/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java index 9822a83..0f99496 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java @@ -55,7 +55,7 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl } // - // Chaning buffers + // Changing buffers // public void setBuffer(ByteBuffer buffer) { @@ -98,7 +98,7 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl public int available() { if (position < end) { - return end - position - 1; + return end - position; } else { return 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/108865db/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java new file mode 100644 index 000..2032d45 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +/** + * Test suite for the {@link DataInputDeserializer} class. 
+ */ +public class DataInputDeserializerTest { + + @Test + public void testAvailable() throws Exception { + byte[] bytes; + DataInputDeserializer dis; + + bytes = new byte[] {}; + dis = new DataInputDeserializer(bytes, 0, bytes.length); + Assert.assertEquals(bytes.length, dis.available()); + + bytes = new byte[] {1, 2, 3}; + dis = new DataInputDeserializer(bytes, 0, bytes.length); + Assert.assertEquals(bytes.length, dis.available()); + + dis.readByte(); + Assert.assertEquals(2, dis.available()); + dis.readByte(); + Assert.assertEquals(1, dis.available()); + dis.readByte(); + Assert.assertEquals(0, dis.available()); + + try { + dis.readByte(); + Assert.fail("Did not throw expected IOException"); + } catch (IOException e) { + // ignore +
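The one-character fix above is easy to gloss over, so a worked example helps: with three readable bytes and the read position at the start of the buffer, the old expression `end - position - 1` reports only 2 available bytes, so the final byte looks unreadable even though it is there. A tiny stand-alone illustration (not Flink code):

{% highlight java %}
public class AvailableOffByOneDemo {

    public static void main(String[] args) {
        int position = 0; // index of the next byte to read
        int end = 3;      // exclusive end: bytes 0, 1 and 2 are readable

        int buggy = (position < end) ? end - position - 1 : 0;
        int fixed = (position < end) ? end - position : 0;

        System.out.println("buggy available() = " + buggy); // prints 2
        System.out.println("fixed available() = " + fixed); // prints 3
    }
}
{% endhighlight %}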
flink git commit: [FLINK-5561] [runtime] Fix DataInputDeserializer#available()
Repository: flink Updated Branches: refs/heads/master 2703d339a -> 6c4644de1 [FLINK-5561] [runtime] Fix DataInputDeserializer#available() This closes #3171. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c4644de Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c4644de Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c4644de Branch: refs/heads/master Commit: 6c4644de19ef38ff96abe7ccbca8971f70258905 Parents: 2703d33 Author: Nico Kruber Authored: Wed Jan 18 18:52:57 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 16:05:49 2017 +0100 -- .../runtime/util/DataInputDeserializer.java | 4 +- .../runtime/util/DataInputDeserializerTest.java | 59 2 files changed, 61 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6c4644de/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java index 9822a83..0f99496 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java @@ -55,7 +55,7 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl } // - // Chaning buffers + // Changing buffers // public void setBuffer(ByteBuffer buffer) { @@ -98,7 +98,7 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl public int available() { if (position < end) { - return end - position - 1; + return end - position; } else { return 0; } http://git-wip-us.apache.org/repos/asf/flink/blob/6c4644de/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java new file mode 100644 index 000..2032d45 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DataInputDeserializerTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +/** + * Test suite for the {@link DataInputDeserializer} class. 
+ */ +public class DataInputDeserializerTest { + + @Test + public void testAvailable() throws Exception { + byte[] bytes; + DataInputDeserializer dis; + + bytes = new byte[] {}; + dis = new DataInputDeserializer(bytes, 0, bytes.length); + Assert.assertEquals(bytes.length, dis.available()); + + bytes = new byte[] {1, 2, 3}; + dis = new DataInputDeserializer(bytes, 0, bytes.length); + Assert.assertEquals(bytes.length, dis.available()); + + dis.readByte(); + Assert.assertEquals(2, dis.available()); + dis.readByte(); + Assert.assertEquals(1, dis.available()); + dis.readByte(); + Assert.assertEquals(0, dis.available()); + + try { + dis.readByte(); + Assert.fail("Did not throw expected IOException"); + } catch (IOException e) { + // ignore +
flink git commit: [FLINK-5386] [table] Refactor window clause.
Repository: flink Updated Branches: refs/heads/master 8ccedd103 -> 6bf556e60 [FLINK-5386] [table] Refactor window clause. - move window() before groupBy() - make window alias mandatory - groupBy() must include window alias This closes #3046. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bf556e6 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bf556e6 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bf556e6 Branch: refs/heads/master Commit: 6bf556e60148917794f81088fa20c5cc7465823a Parents: 8ccedd1 Author: Jincheng Sun Authored: Thu Dec 15 10:31:33 2016 +0800 Committer: Fabian Hueske Committed: Fri Jan 20 15:08:25 2017 +0100 -- docs/dev/table_api.md | 57 ++-- .../org/apache/flink/table/api/table.scala | 92 +-- .../org/apache/flink/table/api/windows.scala| 175 .../flink/table/plan/logical/groupWindows.scala | 54 ++-- .../scala/batch/table/FieldProjectionTest.scala | 5 +- .../api/scala/batch/table/GroupWindowTest.scala | 113 +--- .../scala/stream/table/AggregationsITCase.scala | 15 +- .../scala/stream/table/GroupWindowTest.scala| 273 ++- .../dataset/DataSetWindowAggregateITCase.scala | 31 ++- 9 files changed, 501 insertions(+), 314 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6bf556e6/docs/dev/table_api.md -- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index f2f398c..0efd258 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1022,69 +1022,72 @@ Temporal intervals can be represented as number of months (`Types.INTERVAL_MONTH ### Windows -The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Group-window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, group-windows are a convenient shortcut to group records by time intervals. +The Table API is a declarative API to define queries on batch and streaming tables. Projection, selection, and union operations can be applied both on streaming and batch tables without additional semantics. Aggregations on (possibly) infinite streaming tables, however, can only be computed on finite groups of records. Window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals. -Group-windows are defined using the `window(w: GroupWindow)` clause. The following example shows how to define a group-window aggregation on a table. +Windows are defined using the `window(w: Window)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute. +The following example shows how to define a window aggregation on a table. 
{% highlight java %} Table table = input - .window(GroupWindow w) // define window - .select("b.sum")// aggregate + .window([Window w].as("w")) // define window with alias w + .groupBy("w") // group the table by window w + .select("b.sum") // aggregate {% endhighlight %} {% highlight scala %} val table = input - .window(w: GroupWindow) // define window - .select('b.sum) // aggregate + .window([w: Window] as 'w) // define window with alias w + .groupBy('w) // group the table by window w + .select('b.sum) // aggregate {% endhighlight %} -In streaming environments, group-window aggregates can only be computed in parallel, if they are *keyed*, i.e., there is an additional `groupBy` attribute. Group-window aggregates without additional `groupBy`, such as in the example above, can only be evaluated in a single, non-parallel task. The following example shows how to define a keyed group-window aggregation on a table. +In streaming environments, window aggregates can only be computed in parallel if they group on one or more attributes in addition to the window, i.e., the `groupBy(...)` clause references a window alias and at least one additional attribute. A `groupBy(...)` clause that only references a window alias (such as in the example above) can only be evaluated by a single, non-parallel task. +The following example shows how to define a window aggregation with additional grouping
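Under the refactored clause order, a keyed (and therefore parallel) window aggregation reads roughly as follows. This is a sketch only: the field names and the choice of a `Tumble` window are illustrative, standing in for whatever window definition the query needs.

{% highlight java %}
// 'input' is assumed to be a Table with fields a, b and a "rowtime" attribute.
Table table = input
  .window(Tumble.over("10.minutes").on("rowtime").as("w")) // define window with alias w
  .groupBy("w, a")     // group by window w and the additional attribute a
  .select("a, b.sum"); // evaluate the aggregate once per key and window
{% endhighlight %}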
flink git commit: [FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators
Repository: flink Updated Branches: refs/heads/release-1.2 6e85106b1 -> e2a4f3232 [FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2a4f323 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2a4f323 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2a4f323 Branch: refs/heads/release-1.2 Commit: e2a4f323292dfdee4c69c9261b265860cba6f6a0 Parents: 6e85106 Author: Stephan Ewen Authored: Fri Jan 20 11:12:12 2017 +0100 Committer: Stephan Ewen Committed: Fri Jan 20 14:45:59 2017 +0100 -- .../flink/runtime/jobmanager/JobManager.scala | 23 ++-- 1 file changed, 12 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e2a4f323/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala -- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index c5682e2..50a619c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1844,18 +1844,19 @@ class JobManager( * * @param accumulators list of accumulator snapshots */ - private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = { -accumulators foreach { - case accumulatorEvent => -currentJobs.get(accumulatorEvent.getJobID) match { - case Some((jobGraph, jobInfo)) => -future { - jobGraph.updateAccumulators(accumulatorEvent) -}(context.dispatcher) - case None => - // ignore accumulator values for old job + private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]): Unit = { +accumulators.foreach( snapshot => { +if (snapshot != null) { + currentJobs.get(snapshot.getJobID) match { +case Some((jobGraph, jobInfo)) => + future { +jobGraph.updateAccumulators(snapshot) + }(context.dispatcher) +case None => + // ignore accumulator values for old job + } } -} +}) } /**
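The fix boils down to two guards: skip `null` snapshots (the direct cause of the NullPointerException) and ignore snapshots that belong to jobs which are no longer tracked. The same pattern in Java, with hypothetical stand-in types, since the real code above is Scala and uses Flink-internal classes:

{% highlight java %}
import java.util.List;
import java.util.Map;

public class UpdateAccumulatorsSketch {

    // Hypothetical stand-ins for AccumulatorSnapshot and the tracked job state.
    interface Snapshot { String jobId(); }
    interface Job { void updateAccumulators(Snapshot snapshot); }

    static void updateAccumulators(List<Snapshot> snapshots, Map<String, Job> currentJobs) {
        for (Snapshot snapshot : snapshots) {
            if (snapshot == null) {
                continue; // the missing guard: a null entry previously caused the NPE
            }
            Job job = currentJobs.get(snapshot.jobId());
            if (job != null) {
                job.updateAccumulators(snapshot); // runs asynchronously in the real code
            }
            // else: ignore accumulator values for an old job
        }
    }
}
{% endhighlight %}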
flink git commit: [hotfix] [table] Update documentation about limitations
Repository: flink Updated Branches: refs/heads/master 1fc34502e -> 8ccedd103 [hotfix] [table] Update documentation about limitations Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ccedd10 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ccedd10 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ccedd10 Branch: refs/heads/master Commit: 8ccedd103f1c6f31047e4dcd62b7bf4889e89d0f Parents: 1fc3450 Author: twalthr Authored: Fri Jan 20 14:17:45 2017 +0100 Committer: twalthr Committed: Fri Jan 20 14:17:45 2017 +0100 -- docs/dev/table_api.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8ccedd10/docs/dev/table_api.md -- diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md index 78f9a43..f2f398c 100644 --- a/docs/dev/table_api.md +++ b/docs/dev/table_api.md @@ -1279,7 +1279,7 @@ A session window is defined by using the `Session` class as follows: Currently the following features are not supported yet: - Row-count windows on event-time -- Session windows on batch tables +- Non-grouped session windows on batch tables - Sliding windows on batch tables SQL
flink git commit: [FLINK-4693] [table] Add session group-windows for batch tables
Repository: flink Updated Branches: refs/heads/master ddd7c36ec -> 1fc34502e [FLINK-4693] [table] Add session group-windows for batch tables This closes #3150. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1fc34502 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1fc34502 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1fc34502 Branch: refs/heads/master Commit: 1fc34502e09bfde1f510fe8c22b34795170988ca Parents: ddd7c36 Author: Jincheng Sun Authored: Wed Jan 18 12:34:34 2017 +0800 Committer: twalthr Committed: Fri Jan 20 13:57:13 2017 +0100 -- .../nodes/dataset/DataSetWindowAggregate.scala | 102 +++- .../AggregateAllTimeWindowFunction.scala| 3 +- .../aggregate/AggregateTimeWindowFunction.scala | 3 +- .../table/runtime/aggregate/AggregateUtil.scala | 122 +++--- ...ionWindowAggregateCombineGroupFunction.scala | 128 ++ ...sionWindowAggregateReduceGroupFunction.scala | 165 +++ ...TumbleTimeWindowAggReduceGroupFunction.scala | 3 +- .../DataSetWindowAggregateMapFunction.scala | 2 +- ...rementalAggregateAllTimeWindowFunction.scala | 3 +- ...IncrementalAggregateTimeWindowFunction.scala | 3 +- .../aggregate/TimeWindowPropertyCollector.scala | 8 +- .../api/scala/batch/table/GroupWindowTest.scala | 23 +-- .../dataset/DataSetWindowAggregateITCase.scala | 32 13 files changed, 539 insertions(+), 58 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1fc34502/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala index 79497e6..b165afa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetWindowAggregate.scala @@ -128,10 +128,8 @@ class DataSetWindowAggregate( inputDS, isTimeInterval(size.resultType), caseSensitive) - - case EventTimeSessionGroupWindow(_, _, _) => -throw new UnsupportedOperationException( - "Event-time session windows in a batch environment are currently not supported") + case EventTimeSessionGroupWindow(_, _, gap) => +createEventTimeSessionWindowDataSet(inputDS, caseSensitive) case EventTimeSlidingGroupWindow(_, _, _, _) => throw new UnsupportedOperationException( "Event-time sliding windows in a batch environment are currently not supported") @@ -172,7 +170,7 @@ class DataSetWindowAggregate( grouping, inputType, isParserCaseSensitive) -val groupReduceFunction = createDataSetWindowAggGroupReduceFunction( +val groupReduceFunction = createDataSetWindowAggregationGroupReduceFunction( window, namedAggregates, inputType, @@ -213,9 +211,101 @@ class DataSetWindowAggregate( // TODO: count tumbling all window on event-time should sort all the data set // on event time before applying the windowing logic. 
throw new UnsupportedOperationException( - "Count tumbling non-grouping window on event-time are currently not supported.") + "Count tumbling non-grouping windows on event-time are currently not supported.") + } +} + } + + private[this] def createEventTimeSessionWindowDataSet( +inputDS: DataSet[Any], +isParserCaseSensitive: Boolean): DataSet[Any] = { + +val groupingKeys = grouping.indices.toArray +val rowTypeInfo = resultRowTypeInfo + +// grouping window +if (groupingKeys.length > 0) { + // create mapFunction for initializing the aggregations + val mapFunction = createDataSetWindowPrepareMapFunction( +window, +namedAggregates, +grouping, +inputType, +isParserCaseSensitive) + + val mappedInput = +inputDS +.map(mapFunction) +.name(prepareOperatorName) + + val mapReturnType = mapFunction.asInstanceOf[ResultTypeQueryable[Row]].getProducedType + + // the position of the rowtime field in the intermediate result for map output + val rowTimeFieldPos = mapReturnType.getArity - 1 + + // do incremental aggregation + if (doAllSupportPartialAggregation( +namedAggregates.map(_.getKey), +inputType, +grouping.length)) { + +// gets the window-start and window-end position in the intermediate
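With this change, event-time session windows can be applied to batch tables, as long as the query also groups on at least one regular attribute (per the limitation noted in the documentation update above, non-grouped session windows on batch tables remain unsupported). A hedged usage sketch follows; the table and field names are illustrative:

{% highlight java %}
// 'sales' is assumed to be a batch Table with fields user, amount and a
// long/timestamp field "rowtime" that orders the records.
Table result = sales
  .window(Session.withGap("30.minutes").on("rowtime").as("w")) // sessions with a 30 min gap
  .groupBy("w, user")          // grouped session windows on batch tables work now
  .select("user, amount.sum"); // one aggregate per user and session
{% endhighlight %}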
flink git commit: [FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators
Repository: flink Updated Branches: refs/heads/release-1.1 931929bf8 -> f6f1c244c [FLINK-5585] [jobmanager] Fix NullPointerException in JobManager.updateAccumulators Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6f1c244 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6f1c244 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6f1c244 Branch: refs/heads/release-1.1 Commit: f6f1c244cf149d451a32fb3231a6bf1168bc31d1 Parents: 931929b Author: Stephan Ewen Authored: Fri Jan 20 11:12:12 2017 +0100 Committer: Stephan Ewen Committed: Fri Jan 20 11:14:35 2017 +0100 -- .../flink/runtime/jobmanager/JobManager.scala | 23 ++-- 1 file changed, 12 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f6f1c244/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala -- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d6d23d9..1720d94 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1784,18 +1784,19 @@ class JobManager( * * @param accumulators list of accumulator snapshots */ - private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]) = { -accumulators foreach { - case accumulatorEvent => -currentJobs.get(accumulatorEvent.getJobID) match { - case Some((jobGraph, jobInfo)) => -future { - jobGraph.updateAccumulators(accumulatorEvent) -}(context.dispatcher) - case None => - // ignore accumulator values for old job + private def updateAccumulators(accumulators : Seq[AccumulatorSnapshot]): Unit = { +accumulators.foreach( snapshot => { +if (snapshot != null) { + currentJobs.get(snapshot.getJobID) match { +case Some((jobGraph, jobInfo)) => + future { +jobGraph.updateAccumulators(snapshot) + }(context.dispatcher) +case None => + // ignore accumulator values for old job + } } -} +}) } /**
flink git commit: [FLINK-5515] [queryable state] Remove unused getSerializedValue call
Repository: flink Updated Branches: refs/heads/release-1.2 1c8a48f8f -> 6e85106b1 [FLINK-5515] [queryable state] Remove unused getSerializedValue call This closes #3131. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e85106b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e85106b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e85106b Branch: refs/heads/release-1.2 Commit: 6e85106b1c6e5913825ee256b2a7fc147fcb001a Parents: 1c8a48f Author: Nico Kruber Authored: Mon Jan 16 18:45:49 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 12:39:01 2017 +0100 -- .../org/apache/flink/runtime/query/netty/KvStateServerHandler.java | 2 -- 1 file changed, 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6e85106b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java index 8542099..34cf15f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java @@ -239,8 +239,6 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter { success = true; } else { - kvState.getSerializedValue(serializedKeyAndNamespace); - // No data for the key/namespace. This is considered to be // a failure. ByteBuf unknownKey = KvStateRequestSerializer.serializeKvStateRequestFailure(
flink git commit: [FLINK-5515] [queryable state] Remove unused getSerializedValue call
Repository: flink Updated Branches: refs/heads/master 63a6af3be -> ddd7c36ec [FLINK-5515] [queryable state] Remove unused getSerializedValue call This closes #3131. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ddd7c36e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ddd7c36e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ddd7c36e Branch: refs/heads/master Commit: ddd7c36ece20e3f265ae11863c660b79b06e35bd Parents: 63a6af3 Author: Nico Kruber Authored: Mon Jan 16 18:45:49 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 12:38:51 2017 +0100 -- .../org/apache/flink/runtime/query/netty/KvStateServerHandler.java | 2 -- 1 file changed, 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ddd7c36e/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java index 8542099..34cf15f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/KvStateServerHandler.java @@ -239,8 +239,6 @@ class KvStateServerHandler extends ChannelInboundHandlerAdapter { success = true; } else { - kvState.getSerializedValue(serializedKeyAndNamespace); - // No data for the key/namespace. This is considered to be // a failure. ByteBuf unknownKey = KvStateRequestSerializer.serializeKvStateRequestFailure(
flink git commit: [FLINK-5507] [queryable state] Remove list variant of asQueryableState
Repository: flink Updated Branches: refs/heads/release-1.2 1db810218 -> 1c8a48f8f [FLINK-5507] [queryable state] Remove list variant of asQueryableState The queryable state "sink" using ListState stores all incoming data forever and is never cleaned. Eventually, it will consume too much memory and is thus of limited use. This closes #3129. This closes #3120 (left over). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1c8a48f8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1c8a48f8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1c8a48f8 Branch: refs/heads/release-1.2 Commit: 1c8a48f8f70dea2d291aed5b5ebf19d4fafac168 Parents: 1db8102 Author: Nico Kruber Authored: Mon Jan 16 13:36:02 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 12:32:29 2017 +0100 -- .../streaming/api/datastream/KeyedStream.java | 25 - .../flink/streaming/api/scala/KeyedStream.scala | 28 + .../flink/test/query/QueryableStateITCase.java | 104 --- 3 files changed, 2 insertions(+), 155 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1c8a48f8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 73d8926..3e3afd3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -682,30 +681,6 @@ public class KeyedStream extends DataStream { } /** -* Publishes the keyed stream as a queryable ListStance instance. -* -* @param queryableStateName Name under which to the publish the queryable state instance -* @param stateDescriptor State descriptor to create state instance from -* @return Queryable state instance -*/ - @PublicEvolving - public QueryableStateStream asQueryableState( - String queryableStateName, - ListStateDescriptor stateDescriptor) { - - transform("Queryable state: " + queryableStateName, - getType(), - new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor)); - - stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig()); - - return new QueryableStateStream<>( - queryableStateName, - getType().createSerializer(getExecutionConfig()), - getKeyType().createSerializer(getExecutionConfig())); - } - - /** * Publishes the keyed stream as a queryable FoldingState instance.
* * @param queryableStateName Name under which to the publish the queryable state instance http://git-wip-us.apache.org/repos/asf/flink/blob/1c8a48f8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala -- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index b251ca6..99936e7 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -20,10 +20,10 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.annotation.{Internal, Public, PublicEvolving} import org.apache.flink.api.common.functions._ -import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor} +import org.apache.flink.api.common.state.{FoldingStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor} import org.apache.flink.api.common.typeinfo.TypeInformation im
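With the ListState variant gone, queryable state "sinks" should use a variant whose per-key footprint stays bounded, such as the plain ValueState variant that keeps only the latest value per key. A hedged sketch of that usage follows; the stream, key selector and state name are illustrative:

{% highlight java %}
// 'stream' is assumed to be a DataStream<Tuple2<String, Long>> of (key, value) records.
QueryableStateStream<String, Tuple2<String, Long>> queryable = stream
  .keyBy(record -> record.f0)          // queryable state requires a keyed stream
  .asQueryableState("latest-per-key"); // backed by ValueState: one value per key, so the
                                       // state cannot grow without bound like ListState did
{% endhighlight %}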
flink git commit: [FLINK-5507] [queryable state] Remove list variant of asQueryableState
Repository: flink Updated Branches: refs/heads/master 7ff7f431d -> 63a6af3be [FLINK-5507] [queryable state] Remove list variant of asQueryableState The queryable state "sink" using ListState stores all incoming data forever and is never cleaned. Eventually, it will consume too much memory and is thus of limited use. This closes #3129. This closes #3120 (left over). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/63a6af3b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/63a6af3b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/63a6af3b Branch: refs/heads/master Commit: 63a6af3bec17f2e45563d41032d1e60b8ee97d81 Parents: 7ff7f43 Author: Nico Kruber Authored: Mon Jan 16 13:36:02 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 12:32:18 2017 +0100 -- .../streaming/api/datastream/KeyedStream.java | 25 - .../flink/streaming/api/scala/KeyedStream.scala | 28 + .../flink/test/query/QueryableStateITCase.java | 104 --- 3 files changed, 2 insertions(+), 155 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/63a6af3b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java index 73d8926..3e3afd3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.FoldFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.FoldingStateDescriptor; -import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -682,30 +681,6 @@ public class KeyedStream extends DataStream { } /** -* Publishes the keyed stream as a queryable ListStance instance. -* -* @param queryableStateName Name under which to the publish the queryable state instance -* @param stateDescriptor State descriptor to create state instance from -* @return Queryable state instance -*/ - @PublicEvolving - public QueryableStateStream asQueryableState( - String queryableStateName, - ListStateDescriptor stateDescriptor) { - - transform("Queryable state: " + queryableStateName, - getType(), - new QueryableAppendingStateOperator<>(queryableStateName, stateDescriptor)); - - stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig()); - - return new QueryableStateStream<>( - queryableStateName, - getType().createSerializer(getExecutionConfig()), - getKeyType().createSerializer(getExecutionConfig())); - } - - /** * Publishes the keyed stream as a queryable FoldingState instance.
* * @param queryableStateName Name under which to the publish the queryable state instance http://git-wip-us.apache.org/repos/asf/flink/blob/63a6af3b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala -- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala index b251ca6..99936e7 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala @@ -20,10 +20,10 @@ package org.apache.flink.streaming.api.scala import org.apache.flink.annotation.{Internal, Public, PublicEvolving} import org.apache.flink.api.common.functions._ -import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor} +import org.apache.flink.api.common.state.{FoldingStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor} import org.apache.flink.api.common.typeinfo.TypeInformation import org.a
[2/2] flink git commit: [FLINK-5482] [queryable state] Re-issue location lookup upon failure
[FLINK-5482] [queryable state] Re-issue location lookup upon failure Any failing lookup, e.g. in case the job has not been started yet, previously remained in the lookup cache and thus future queries did not retry the lookup and failed. This commit changes the lookup caching code so that completed and failed futures are removed from the cache and replaced by new lookups. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1db81021 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1db81021 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1db81021 Branch: refs/heads/release-1.2 Commit: 1db8102184a30a3df5448189cbc0a99938b906ab Parents: da10a2e Author: Nico Kruber Authored: Thu Jan 12 16:48:27 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 12:27:47 2017 +0100 -- .../runtime/query/QueryableStateClient.java | 20 +- .../flink/test/query/QueryableStateITCase.java | 73 2 files changed, 92 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1db81021/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java index 98c3580..7ba3199 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java @@ -341,7 +341,25 @@ public class QueryableStateClient { return previous; } } else { - return cachedFuture; + // do not retain futures which failed as they will remain in + // the cache even if the error cause is not present any more + // and a new lookup may succeed + if (cachedFuture.isCompleted() && + cachedFuture.value().get().isFailure()) { + // issue a new lookup + Future lookupFuture = lookupService + .getKvStateLookupInfo(jobId, queryableStateName); + + // replace the existing one if it has not been replaced yet + // otherwise return the one in the cache + if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) { + return lookupFuture; + } else { + return lookupCache.get(cacheKey); + } + } else { + return cachedFuture; + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/1db81021/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index eccd8e0..88e4f9a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -640,6 +640,79 @@ public class QueryableStateITCase extends TestLogger { } /** +* Similar tests as {@link #testValueState()} but before submitting the +* job, we already issue one request which fails. 
+*/ + @Test + public void testQueryNonStartedJobState() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final int numElements = 1024; + + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(NUM_SLOTS); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before +
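The caching pattern in the change above is generally useful: cache the future for an in-flight lookup, but when a cached future has already completed with a failure, atomically swap in a fresh lookup so that a transient error (such as a job that had not started yet) does not poison the cache. Below is a hedged Java analogue using `CompletableFuture`; the real client uses Scala futures and Flink-internal lookup services:

{% highlight java %}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

public class RetryingLookupCache<K, V> {

    private final ConcurrentMap<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();

    public CompletableFuture<V> get(K key, Supplier<CompletableFuture<V>> lookup) {
        CompletableFuture<V> cached = cache.computeIfAbsent(key, k -> lookup.get());
        if (cached.isCompletedExceptionally()) {
            // Do not retain failed lookups: the failure cause may be gone by now,
            // so issue a new lookup and swap it in.
            CompletableFuture<V> fresh = lookup.get();
            if (cache.replace(key, cached, fresh)) {
                return fresh;
            }
            // Another thread replaced the failed future first; use that one.
            CompletableFuture<V> current = cache.get(key);
            return current != null ? current : fresh;
        }
        return cached;
    }
}
{% endhighlight %}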
[1/2] flink git commit: [FLINK-5482] [tests] Dedup code in QueryableStateITCase
Repository: flink Updated Branches: refs/heads/release-1.2 9073c53f9 -> 1db810218 [FLINK-5482] [tests] Dedup code in QueryableStateITCase Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da10a2e9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da10a2e9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da10a2e9 Branch: refs/heads/release-1.2 Commit: da10a2e9fccb19f1bec626b9907de4e3a93be76d Parents: 9073c53 Author: Nico Kruber Authored: Thu Jan 12 16:41:30 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 12:27:43 2017 +0100 -- .../flink/test/query/QueryableStateITCase.java | 152 ++- 1 file changed, 50 insertions(+), 102 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/da10a2e9/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index a5ed6ad..eccd8e0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -622,40 +622,8 @@ public class QueryableStateITCase extends TestLogger { // Now query long expected = numElements; - FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS); - for (int key = 0; key < NUM_SLOTS; key++) { - final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace( - key, - queryableState.getKeySerializer(), - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - Future future = getKvStateWithRetries(client, - jobId, - queryableState.getQueryableStateName(), - key, - serializedKey, - retryDelay); - - byte[] serializedValue = Await.result(future, deadline.timeLeft()); - - Tuple2 value = KvStateRequestSerializer.deserializeValue( - serializedValue, - queryableState.getValueSerializer()); - - assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected == value.f1) { - success = true; - } else { - // Retry - Thread.sleep(50); - } - } - - assertTrue("Did not succeed query", success); - } + executeValueQuery(deadline, client, jobId, queryableState, + expected); } finally { // Free cluster resources if (jobId != null) { @@ -672,6 +640,50 @@ public class QueryableStateITCase extends TestLogger { } /** +* Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until +* expected equals the value of the result tuple's second field. +*/ + private void executeValueQuery(final Deadline deadline, + final QueryableStateClient client, final JobID jobId, + final QueryableStateStream> queryableState, + final long expected) throws Exception { + FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS); + for (int key = 0; key < NUM_SLOTS; key++) { + final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace( + key, + queryableState.getKeySerializer(), + VoidNamespace.INSTANCE, + VoidName
[1/2] flink git commit: [FLINK-5482] [tests] Dedup code in QueryableStateITCase
Repository: flink Updated Branches: refs/heads/master 883fc5a77 -> 7ff7f431d [FLINK-5482] [tests] Dedup code in QueryableStateITCase Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/661b3f90 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/661b3f90 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/661b3f90 Branch: refs/heads/master Commit: 661b3f90e481f1de8a35041d5d136988414dc621 Parents: 883fc5a Author: Nico Kruber Authored: Thu Jan 12 16:41:30 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 12:26:45 2017 +0100 -- .../flink/test/query/QueryableStateITCase.java | 152 ++- 1 file changed, 50 insertions(+), 102 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/661b3f90/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index a5ed6ad..eccd8e0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -622,40 +622,8 @@ public class QueryableStateITCase extends TestLogger { // Now query long expected = numElements; - FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS); - for (int key = 0; key < NUM_SLOTS; key++) { - final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace( - key, - queryableState.getKeySerializer(), - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - boolean success = false; - while (deadline.hasTimeLeft() && !success) { - Future future = getKvStateWithRetries(client, - jobId, - queryableState.getQueryableStateName(), - key, - serializedKey, - retryDelay); - - byte[] serializedValue = Await.result(future, deadline.timeLeft()); - - Tuple2 value = KvStateRequestSerializer.deserializeValue( - serializedValue, - queryableState.getValueSerializer()); - - assertEquals("Key mismatch", key, value.f0.intValue()); - if (expected == value.f1) { - success = true; - } else { - // Retry - Thread.sleep(50); - } - } - - assertTrue("Did not succeed query", success); - } + executeValueQuery(deadline, client, jobId, queryableState, + expected); } finally { // Free cluster resources if (jobId != null) { @@ -672,6 +640,50 @@ public class QueryableStateITCase extends TestLogger { } /** +* Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until +* expected equals the value of the result tuple's second field. +*/ + private void executeValueQuery(final Deadline deadline, + final QueryableStateClient client, final JobID jobId, + final QueryableStateStream> queryableState, + final long expected) throws Exception { + FiniteDuration retryDelay = new FiniteDuration(1, TimeUnit.SECONDS); + for (int key = 0; key < NUM_SLOTS; key++) { + final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace( + key, + queryableState.getKeySerializer(), + VoidNamespace.INSTANCE, + VoidNamespaceSeria
[2/2] flink git commit: [FLINK-5482] [queryable state] Re-issue location lookup upon failure
[FLINK-5482] [queryable state] Re-issue location lookup upon failure Any failing lookup, e.g. in case the job has not been started yet, previously remained in the lookup cache and thus future queries did not retry the lookup and failed. This commit changes the lookup caching code so that completed and failed futures are removed from the cache and replaced by new lookups. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ff7f431 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ff7f431 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ff7f431 Branch: refs/heads/master Commit: 7ff7f431dab1d5fa70d71747cda619b9f6491bd2 Parents: 661b3f9 Author: Nico Kruber Authored: Thu Jan 12 16:48:27 2017 +0100 Committer: Ufuk Celebi Committed: Fri Jan 20 12:26:50 2017 +0100 -- .../runtime/query/QueryableStateClient.java | 20 +- .../flink/test/query/QueryableStateITCase.java | 73 2 files changed, 92 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/7ff7f431/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java index 98c3580..7ba3199 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java @@ -341,7 +341,25 @@ public class QueryableStateClient { return previous; } } else { - return cachedFuture; + // do not retain futures which failed as they will remain in + // the cache even if the error cause is not present any more + // and a new lookup may succeed + if (cachedFuture.isCompleted() && + cachedFuture.value().get().isFailure()) { + // issue a new lookup + Future lookupFuture = lookupService + .getKvStateLookupInfo(jobId, queryableStateName); + + // replace the existing one if it has not been replaced yet + // otherwise return the one in the cache + if (lookupCache.replace(cacheKey, cachedFuture, lookupFuture)) { + return lookupFuture; + } else { + return lookupCache.get(cacheKey); + } + } else { + return cachedFuture; + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/7ff7f431/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index eccd8e0..88e4f9a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -640,6 +640,79 @@ public class QueryableStateITCase extends TestLogger { } /** +* Similar tests as {@link #testValueState()} but before submitting the +* job, we already issue one request which fails. 
+*/ + @Test + public void testQueryNonStartedJobState() throws Exception { + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final int numElements = 1024; + + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(NUM_SLOTS); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before +