[jira] [Created] (FLINK-8830) YarnResourceManager throws NullPointerException
Piotr Nowojski created FLINK-8830: - Summary: YarnResourceManager throws NullPointerException Key: FLINK-8830 URL: https://issues.apache.org/jira/browse/FLINK-8830 Project: Flink Issue Type: Bug Components: YARN Affects Versions: 1.5.0 Reporter: Piotr Nowojski {code:java} java.lang.NullPointerException at java.io.File.(File.java:277) at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:502) at org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:445) at org.apache.flink.yarn.YarnResourceManager.onContainersAllocated(YarnResourceManager.java:338) at org.apache.flink.yarn.YarnResourceManagerTest$1.(YarnResourceManagerTest.java:340) at org.apache.flink.yarn.YarnResourceManagerTest.testStopWorker(YarnResourceManagerTest.java:317) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) at org.junit.rules.RunRules.evaluate(RunRules.java:20) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) {code} This exception is being thrown in `org.apache.flink.yarn.YarnResourceManagerTest#testStopWorker`. Exception apparently is being ignored, since the test completes. It seems like this line: {code:java} String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION){code} Is not guarded against returned null value. I don't know if that's a test or production code issue. CC [~till.rohrmann] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8806) Failure in UnionInputGate getNextBufferOrEvent()
[ https://issues.apache.org/jira/browse/FLINK-8806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber closed FLINK-8806. -- Resolution: Duplicate > Failure in UnionInputGate getNextBufferOrEvent() > > > Key: FLINK-8806 > URL: https://issues.apache.org/jira/browse/FLINK-8806 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.5.0, 1.6.0 >Reporter: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0, 1.6.0 > > > Error occurs in {{SelfConnectionITCase}}: > Full log: https://api.travis-ci.org/v3/job/346847455/log.txt > Exception Stack Trace > {code} > org.apache.flink.runtime.client.JobExecutionException: > java.lang.IllegalStateException > at > org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:527) > at > org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510) > at > org.apache.flink.test.streaming.runtime.SelfConnectionITCase.differentDataStreamDifferentChain(SelfConnectionITCase.java:158) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > 
at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runners.Suite.runChild(Suite.java:128) > at org.junit.runners.Suite.runChild(Suite.java:27) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at org.junit.runner.JUnitCore.run(JUnitCore.java:115) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78) > at > org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54) > at > org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203) > at > 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > Caused by: java.lang.IllegalStateException > at > org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) > at > org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173) > at > org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94) > at > org.apache.flink.streaming.runtime.io.StreamTwoInpu
[GitHub] flink pull request #5608: [FLINK-8821][TableAPI && SQL] Fix non-terminating ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5608 ---
[jira] [Commented] (FLINK-8821) Fix non-terminating decimal error
[ https://issues.apache.org/jira/browse/FLINK-8821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383351#comment-16383351 ] ASF GitHub Bot commented on FLINK-8821: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5608 > Fix non-terminating decimal error > - > > Key: FLINK-8821 > URL: https://issues.apache.org/jira/browse/FLINK-8821 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > > The DecimalAvgAggFunction lacks precision protection -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-8821) Fix non-terminating decimal error
[ https://issues.apache.org/jira/browse/FLINK-8821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-8821. - Resolution: Fixed Fix Version/s: 1.6.0 1.5.0 Fixed in 1.6.0: b31b707cb20f34633815718ff356e187f3397620 Fixed in 1.5.0: 347ec3848ec603ac452e394a5211cf888db6663f > Fix non-terminating decimal error > - > > Key: FLINK-8821 > URL: https://issues.apache.org/jira/browse/FLINK-8821 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Major > Fix For: 1.5.0, 1.6.0 > > > The DecimalAvgAggFunction lacks precision protection -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383356#comment-16383356 ] yanxiaobin commented on FLINK-8794: --- hi, [~pnowojski] ! Thank you for your suggestion! The downstream processor can ignore the files with "*pending" or "*in-progress" suffixes and "_" prefix, but I don't think it's a good way to deal with it. We can change this behaviour/add an option for BucketingSink to use temporary "in-progress" and "pending" directories instead of prefixes, but the temporary "in-progress" and "pending" directories is still also a subdirectory of the base directory, and the downstream processor may still read the base directory recursively, It also results in reading redundant dirty data. I think the temporary data produced during the program should be isolated from the final output data. Thanks! Also [~kkl0u] could you elaborate why rescaling forced us to keep lingering files? > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Bug > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383356#comment-16383356 ] yanxiaobin edited comment on FLINK-8794 at 3/2/18 8:40 AM: --- hi, [~pnowojski] Thank you for your suggestion! The downstream processor can ignore the files with "*pending" or "*in-progress" sufixes and "_" prefix, but I don't think it's a good way to deal with it. We can change this behaviour/add an option for BucketingSink to use temporary "in-progress" and "pending" directories instead of prefixes, but the temporary "in-progress" and "pending" directories is still also a subdirectory of the base directory, and the downstream processor may still read the base directory recursively, It also results in reading redundant dirty data. I think the temporary data produced during the program should be isolated from the final output data. Thanks! Also [~kkl0u] could you elaborate why rescaling forced us to keep lingering files? was (Author: backlight): hi, [~pnowojski] ! Thank you for your suggestion! The downstream processor can ignore the files with "*pending" or "*in-progress" sufixes and "_" prefix, but I don't think it's a good way to deal with it. We can change this behaviour/add an option for BucketingSink to use temporary "in-progress" and "pending" directories instead of prefixes, but the temporary "in-progress" and "pending" directories is still also a subdirectory of the base directory, and the downstream processor may still read the base directory recursively, It also results in reading redundant dirty data. I think the temporary data produced during the program should be isolated from the final output data. Thanks! Also [~kkl0u] could you elaborate why rescaling forced us to keep lingering files? 
> When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Bug > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8831) Create SQL Client dependencies
Timo Walther created FLINK-8831: --- Summary: Create SQL Client dependencies Key: FLINK-8831 URL: https://issues.apache.org/jira/browse/FLINK-8831 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther A first minimum version of FLIP-24 for the upcoming Flink SQL Client has been merged to the master. We also merged possibilities to discover and configure table sources without a single line of code using string-based properties and Java service provider discovery. We are now facing the issue of how to manage dependencies in this new environment. It is different from how regular Flink projects are created (by setting up a new Maven project and building a jar or fat jar). Ideally, a user should be able to select from a set of prepared connectors, catalogs, and formats. E.g., if a Kafka connector and Avro format is needed, all that should be required is to move a "flink-kafka.jar" and "flink-avro.jar" into the "sql_lib" directory that is shipped to a Flink cluster together with the SQL query. [As discussed on ML|http://mail-archives.apache.org/mod_mbox/flink-dev/201802.mbox/%3C9c73518b-ec8e-3b01-f200-dea816c75efc%40apache.org%3E], we will build fat jars for these modules with every Flink release that can be hosted somewhere (e.g. Apache infrastructure, but not Maven central). This would make it very easy to add a dependency by downloading the prepared JAR files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8832) Create a SQL Client Kafka fat-jar
Timo Walther created FLINK-8832: --- Summary: Create a SQL Client Kafka fat-jar Key: FLINK-8832 URL: https://issues.apache.org/jira/browse/FLINK-8832 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther Create fat-jars for Apache Kafka. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8833) Create SQL Client JSON format fat-jar
Timo Walther created FLINK-8833: --- Summary: Create SQL Client JSON format fat-jar Key: FLINK-8833 URL: https://issues.apache.org/jira/browse/FLINK-8833 Project: Flink Issue Type: Sub-task Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther Create a fat-jar for flink-json. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-7282) Credit-based Network Flow Control
[ https://issues.apache.org/jira/browse/FLINK-7282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhijiang reassigned FLINK-7282: --- Assignee: zhijiang > Credit-based Network Flow Control > - > > Key: FLINK-7282 > URL: https://issues.apache.org/jira/browse/FLINK-7282 > Project: Flink > Issue Type: New Feature > Components: Network >Reporter: zhijiang >Assignee: zhijiang >Priority: Major > > This is a part of work for network stack improvements proposed in > [~StephanEwen] 's > [FLIP|https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#] > Backpressure currently happens very naturally through the TCP network > connections and the bounded buffering capacity. The downsides are : > * All channels multiplexed into the same TCP connection stall together, as > soon as one channel has backpressure. > * Under backpressure, connections can not transport checkpoint barriers. > This flink-managed flow control is similar to the window-based advertisement > mechanism in TCP. The basic approaches are the following: > * Each RemoteInputChannel has fixed exclusive buffers as initial credits, and > SingleInputGate has a fixed buffer pool for managing floating buffers for all > RemoteInputChannels. > * RemoteInputChannel as receiver notifies the current available credits to > the sender side. > * Senders must never send buffers without credit, that means all the buffers > sent must be accepted by receivers so no buffers accumulated on the network > wire. > * Senders also send the current size of backlog that indicates how many > buffers are available on the sender side. The receivers use this information > to decide how to request floating buffers from the fixed buffer pool. > To avoid immediate commits affecting master branch, it will be implemented > into a separate feature branch. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8833) Create a SQL Client JSON format fat-jar
[ https://issues.apache.org/jira/browse/FLINK-8833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther updated FLINK-8833: Summary: Create a SQL Client JSON format fat-jar (was: Create SQL Client JSON format fat-jar) > Create a SQL Client JSON format fat-jar > --- > > Key: FLINK-8833 > URL: https://issues.apache.org/jira/browse/FLINK-8833 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > > Create a fat-jar for flink-json. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api
[ https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383391#comment-16383391 ] ASF GitHub Bot commented on FLINK-8828: --- GitHub user jelmerk opened a pull request: https://github.com/apache/flink/pull/5616 [FLINK-8828] [stream, dataset, scala] Introduce collect method ## What is the purpose of the change A collect function is a method that takes a Partial Function as its parameter and applies it to all the elements in the collection to create a new collection which satisfies the Partial Function. It makes certain things nicer to express ## Brief change log - added collect method on scala dataset and datastream api ## Verifying this change It seems to be hard to find a place where this could be tested in isolation, suggestions welcome ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: don't know - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper:don't know - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? 
JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/jelmerk/flink collect_support Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5616.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5616 commit 61b4bcb7c941950d62d0db0aa2041f1796fdbaaf Author: Jelmer Kuperus Date: 2018-03-02T09:17:54Z [FLINK-8828] [stream, dataset, scala] Introduce collect method > Add collect method to DataStream / DataSet scala api > > > Key: FLINK-8828 > URL: https://issues.apache.org/jira/browse/FLINK-8828 > Project: Flink > Issue Type: Improvement > Components: Core, DataSet API, DataStream API, Scala API >Affects Versions: 1.4.0 >Reporter: Jelmer Kuperus >Priority: Major > > A collect function is a method that takes a Partial Function as its parameter > and applies it to all the elements in the collection to create a new > collection which satisfies the Partial Function. > It can be found on all [core scala collection > classes|http://www.scala-lang.org/api/2.9.2/scala/collection/TraversableLike.html] > as well as on spark's [rdd > interface|https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD] > To understand its utility imagine the following scenario : > Given a DataStream that produces events of type _Purchase_ and _View_ > Transform this stream into a stream of purchase amounts over 1000 euros. 
> Currently an implementation might look like > {noformat} > val x = dataStream > .filter(_.isInstanceOf[Purchase]) > .map(_.asInstanceOf[Purchase]) > .filter(_.amount > 1000) > .map(_.amount){noformat} > Or alternatively you could do this > {noformat} > dataStream.flatMap(_ match { > case p: Purchase if p.amount > 1000 => Some(p.amount) > case _ => None > }){noformat} > But with collect implemented it could look like > {noformat} > dataStream.collect { > case p: Purchase if p.amount > 1000 => p.amount > }{noformat} > > Which is a lot nicer to both read and write -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5616: [FLINK-8828] [stream, dataset, scala] Introduce co...
GitHub user jelmerk opened a pull request: https://github.com/apache/flink/pull/5616 [FLINK-8828] [stream, dataset, scala] Introduce collect method ## What is the purpose of the change A collect function is a method that takes a Partial Function as its parameter and applies it to all the elements in the collection to create a new collection which satisfies the Partial Function. It makes certain things nicer to express ## Brief change log - added collect method on scala dataset and datastream api ## Verifying this change It seems to be hard to find a place where this could be tested in isolation, suggestions welcome ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes - The serializers: don't know - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper:don't know - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/jelmerk/flink collect_support Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5616.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5616 commit 61b4bcb7c941950d62d0db0aa2041f1796fdbaaf Author: Jelmer Kuperus Date: 2018-03-02T09:17:54Z [FLINK-8828] [stream, dataset, scala] Introduce collect method ---
[jira] [Updated] (FLINK-8799) Make AbstractYarnClusterDescriptor immutable
[ https://issues.apache.org/jira/browse/FLINK-8799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8799: Issue Type: Improvement (was: Bug) > Make AbstractYarnClusterDescriptor immutable > > > Key: FLINK-8799 > URL: https://issues.apache.org/jira/browse/FLINK-8799 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: vinoyang >Priority: Major > Fix For: 1.6.0 > > > {{AbstractYarnClusterDescriptor}} should be made immutable. Currently, its > internal configuration is modified from different places which makes it > difficult to reason about the code. For example, it should not be possible to > modify the {{zookeeperNamespace}} using a setter method. A user of this class > should be forced to provide all information prior to creating the instance, > e.g., by passing a {{org.apache.flink.configuration.Configuration}} object. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5617: [FLINK-8799][YARN] Make AbstractYarnClusterDescrip...
GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/5617 [FLINK-8799][YARN] Make AbstractYarnClusterDescriptor immutable ## What is the purpose of the change *This pull request Make AbstractYarnClusterDescriptor immutable* ## Brief change log - *removed or closed some setter accessor in class `AbstractYarnClusterDescriptor`* - *deleted some set property code and replaced with adding option to `Configuration` instance* - *fetch the config item from `Configuration` and init the field for `AbstractYarnClusterDescriptor`* - *add some config to `YarnConfigOptions`* - *fixed some old test cast and some new test case for refactored config properties* ## Verifying this change This change added tests and can be verified as follows: - *fixed some old test cast and some new test case for refactored config properties such as flink jar path and name and so on* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? ( no) - If yes, how is the feature documented? 
(not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yanghua/flink FLINK-8799 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5617.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5617 commit f04f8d68a0859923dcdba594ce22b5f420305df5 Author: vinoyang Date: 2018-03-02T09:22:54Z [FLINK-8799][YARN] Make AbstractYarnClusterDescriptor immutable ---
[jira] [Commented] (FLINK-8799) Make AbstractYarnClusterDescriptor immutable
[ https://issues.apache.org/jira/browse/FLINK-8799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383393#comment-16383393 ] ASF GitHub Bot commented on FLINK-8799: --- GitHub user yanghua opened a pull request: https://github.com/apache/flink/pull/5617 [FLINK-8799][YARN] Make AbstractYarnClusterDescriptor immutable ## What is the purpose of the change *This pull request Make AbstractYarnClusterDescriptor immutable* ## Brief change log - *removed or closed some setter accessor in class `AbstractYarnClusterDescriptor`* - *deleted some set property code and replaced with adding option to `Configuration` instance* - *fetch the config item from `Configuration` and init the field for `AbstractYarnClusterDescriptor`* - *add some config to `YarnConfigOptions`* - *fixed some old test cast and some new test case for refactored config properties* ## Verifying this change This change added tests and can be verified as follows: - *fixed some old test cast and some new test case for refactored config properties such as flink jar path and name and so on* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? ( no) - If yes, how is the feature documented? 
(not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/yanghua/flink FLINK-8799 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5617.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5617 commit f04f8d68a0859923dcdba594ce22b5f420305df5 Author: vinoyang Date: 2018-03-02T09:22:54Z [FLINK-8799][YARN] Make AbstractYarnClusterDescriptor immutable > Make AbstractYarnClusterDescriptor immutable > > > Key: FLINK-8799 > URL: https://issues.apache.org/jira/browse/FLINK-8799 > Project: Flink > Issue Type: Improvement > Components: YARN >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: vinoyang >Priority: Major > Fix For: 1.6.0 > > > {{AbstractYarnClusterDescriptor}} should be made immutable. Currently, its > internal configuration is modified from different places which makes it > difficult to reason about the code. For example, it should not be possible to > modify the {{zookeeperNamespace}} using a setter method. A user of this class > should be forced to provide all information prior to creating the instance, > e.g., by passing a {{org.apache.flink.configuration.Configuration}} object. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8830) YarnResourceManager throws NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-8830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-8830: --- Assignee: vinoyang > YarnResourceManager throws NullPointerException > --- > > Key: FLINK-8830 > URL: https://issues.apache.org/jira/browse/FLINK-8830 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: vinoyang >Priority: Major > > > {code:java} > java.lang.NullPointerException > at java.io.File.(File.java:277) > at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:502) > at > org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:445) > at > org.apache.flink.yarn.YarnResourceManager.onContainersAllocated(YarnResourceManager.java:338) > at > org.apache.flink.yarn.YarnResourceManagerTest$1.(YarnResourceManagerTest.java:340) > at > org.apache.flink.yarn.YarnResourceManagerTest.testStopWorker(YarnResourceManagerTest.java:317) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at 
org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > {code} > > This exception is being thrown in > `org.apache.flink.yarn.YarnResourceManagerTest#testStopWorker`. Exception > apparently is being ignored, since the test completes. It seems like this > line: > {code:java} > String fileLocation = > System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION){code} > Is not guarded against returned null value. I don't know if that's a test or > production code issue. > CC [~till.rohrmann] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state
Daniel Harper created FLINK-8834: Summary: Job fails to restart due to some tasks stuck in cancelling state Key: FLINK-8834 URL: https://issues.apache.org/jira/browse/FLINK-8834 Project: Flink Issue Type: Bug Affects Versions: 1.4.0 Environment: EMR 5.12 Flink 1.4.0 Beam 2.3.0 Reporter: Daniel Harper Our job threw an exception overnight, causing the job to commence attempting a restart. However it never managed to restart because 2 tasks are stuck in "Cancelling" state, with the following exception {code:java} 2018-03-02 02:29:31,604 WARN org.apache.flink.runtime.taskmanager.Task - Task 'PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to cancelling signal, but is stuck in method: java.lang.Thread.blockedOn(Thread.java:239) java.lang.System$2.blockedOn(System.java:1252) java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211) java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170) java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457) java.nio.channels.Channels.writeFullyImpl(Channels.java:78) java.nio.channels.Channels.writeFully(Channels.java:101) java.nio.channels.Channels.access$000(Channels.java:61) java.nio.channels.Channels$1.write(Channels.java:174) java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253) java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211) java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145) java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458) java.nio.channels.Channels.writeFullyImpl(Channels.java:78) java.nio.channels.Channels.writeFully(Channels.java:101) 
java.nio.channels.Channels.access$000(Channels.java:61) java.nio.channels.Channels$1.write(Channels.java:174) sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125) sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135) java.io.OutputStreamWriter.write(OutputStreamWriter.java:220) java.io.Writer.write(Writer.java:157) org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102) org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118) org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76) org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550) org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112) org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718) org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source) org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177) org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138) org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809) 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:888) org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:865) org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94) org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:87) org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1040) org.apache.beam.runners.core.ReduceFnRunner$$Lambda$163/1408647946.output(Unknown Source) org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:433) org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceF
[jira] [Updated] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state
[ https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Harper updated FLINK-8834: - Environment: AWS EMR 5.12 Flink 1.4.0 Beam 2.3.0 was: EMR 5.12 Flink 1.4.0 Beam 2.3.0 > Job fails to restart due to some tasks stuck in cancelling state > > > Key: FLINK-8834 > URL: https://issues.apache.org/jira/browse/FLINK-8834 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 > Environment: AWS EMR 5.12 > Flink 1.4.0 > Beam 2.3.0 >Reporter: Daniel Harper >Priority: Major > > Our job threw an exception overnight, causing the job to commence attempting > a restart. > However it never managed to restart because 2 tasks are stuck in "Cancelling" > state, with the following exception > {code:java} > 2018-03-02 02:29:31,604 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'PTransformTranslation.UnknownRawPTransform -> > ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> > uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out > -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to > cancelling signal, but is stuck in method: > java.lang.Thread.blockedOn(Thread.java:239) > java.lang.System$2.blockedOn(System.java:1252) > java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211) > java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457) > java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253) > 
java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211) > java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458) > java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) > sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125) > sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135) > java.io.OutputStreamWriter.write(OutputStreamWriter.java:220) > java.io.Writer.write(Writer.java:157) > org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76) > org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550) > org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown > Source) > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177) > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138) > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831) > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809) > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:888) > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:865) > org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94) > org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.out
[jira] [Updated] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state
[ https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Harper updated FLINK-8834: - Description: Our job threw an exception overnight, causing the job to commence attempting a restart. However it never managed to restart because 2 tasks on one of the Task Managers are stuck in "Cancelling" state, with the following exception {code:java} 2018-03-02 02:29:31,604 WARN org.apache.flink.runtime.taskmanager.Task - Task 'PTransformTranslation.UnknownRawPTransform -> ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to cancelling signal, but is stuck in method: java.lang.Thread.blockedOn(Thread.java:239) java.lang.System$2.blockedOn(System.java:1252) java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211) java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170) java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457) java.nio.channels.Channels.writeFullyImpl(Channels.java:78) java.nio.channels.Channels.writeFully(Channels.java:101) java.nio.channels.Channels.access$000(Channels.java:61) java.nio.channels.Channels$1.write(Channels.java:174) java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253) java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211) java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145) java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458) java.nio.channels.Channels.writeFullyImpl(Channels.java:78) java.nio.channels.Channels.writeFully(Channels.java:101) java.nio.channels.Channels.access$000(Channels.java:61) java.nio.channels.Channels$1.write(Channels.java:174) 
sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125) sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135) java.io.OutputStreamWriter.write(OutputStreamWriter.java:220) java.io.Writer.write(Writer.java:157) org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102) org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118) org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76) org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550) org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112) org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718) org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source) org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177) org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138) org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831) org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809) org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:888) 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:865) org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94) org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:87) org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1040) org.apache.beam.runners.core.ReduceFnRunner$$Lambda$163/1408647946.output(Unknown Source) org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:433) org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:127) org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1043) org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:911) org.apache.beam.runners.core.Red
[jira] [Closed] (FLINK-8751) Canceling a job results in a InterruptedException in the TM
[ https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Richter closed FLINK-8751. - Resolution: Fixed Assignee: Stefan Richter Fix Version/s: 1.5.0 Fixed in f9a583b. > Canceling a job results in a InterruptedException in the TM > --- > > Key: FLINK-8751 > URL: https://issues.apache.org/jira/browse/FLINK-8751 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.4.1 >Reporter: Elias Levy >Assignee: Stefan Richter >Priority: Major > Fix For: 1.5.0 > > > Canceling a job results in the following exception reported by the TM: > {code:java} > ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Could > not shut down timer service java.lang.InterruptedException > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown > Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown > Source) > at > org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source){code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5607: [hotfix][docs] Drop the incorrect parallel remark in wind...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/5607 Thanks :) ---
[jira] [Commented] (FLINK-8830) YarnResourceManager throws NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-8830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383435#comment-16383435 ] Till Rohrmann commented on FLINK-8830: -- Looks like a bug. We should add a guard. > YarnResourceManager throws NullPointerException > --- > > Key: FLINK-8830 > URL: https://issues.apache.org/jira/browse/FLINK-8830 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: vinoyang >Priority: Major > > > {code:java} > java.lang.NullPointerException > at java.io.File.(File.java:277) > at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:502) > at > org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:445) > at > org.apache.flink.yarn.YarnResourceManager.onContainersAllocated(YarnResourceManager.java:338) > at > org.apache.flink.yarn.YarnResourceManagerTest$1.(YarnResourceManagerTest.java:340) > at > org.apache.flink.yarn.YarnResourceManagerTest.testStopWorker(YarnResourceManagerTest.java:317) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at 
org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > {code} > > This exception is being thrown in > `org.apache.flink.yarn.YarnResourceManagerTest#testStopWorker`. Exception > apparently is being ignored, since the test completes. It seems like this > line: > {code:java} > String fileLocation = > System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION){code} > Is not guarded against returned null value. I don't know if that's a test or > production code issue. > CC [~till.rohrmann] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5600: [FLINK-8811] [flip6] Add initial implementation of...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5600#discussion_r171814808 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java --- @@ -178,6 +181,13 @@ public URI getRestAddress() { } } + public HighAvailabilityServices getHighAvailabilityServices() { + synchronized (lock) { + checkState(running, "MiniCluster is not yet running."); --- End diff -- Because all state accesses to the MiniCluster are guarded since it can be used by potentially multiple threads. ---
[jira] [Commented] (FLINK-8811) Add MiniClusterClient to allow fast MiniCluster operations
[ https://issues.apache.org/jira/browse/FLINK-8811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383436#comment-16383436 ] ASF GitHub Bot commented on FLINK-8811: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5600#discussion_r171814808 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java --- @@ -178,6 +181,13 @@ public URI getRestAddress() { } } + public HighAvailabilityServices getHighAvailabilityServices() { + synchronized (lock) { + checkState(running, "MiniCluster is not yet running."); --- End diff -- Because all state accesses to the MiniCluster are guarded since it can be used by potentially multiple threads. > Add MiniClusterClient to allow fast MiniCluster operations > -- > > Key: FLINK-8811 > URL: https://issues.apache.org/jira/browse/FLINK-8811 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: flip-6 > Fix For: 1.5.0, 1.6.0 > > > We should offer a {{ClusterClient}} implementation for the {{MiniCluster}}. > That way we would be able to submit and wait for result without polling how > it would be the case by using the {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8830) YarnResourceManager throws NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-8830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383435#comment-16383435 ] Till Rohrmann edited comment on FLINK-8830 at 3/2/18 10:39 AM: --- Looks like a bug. We should add a guard. The good thing is that it happens in a {{try-catch}} block. Therefore, it won't crash the system. was (Author: till.rohrmann): Looks like a bug. We should add a guard. > YarnResourceManager throws NullPointerException > --- > > Key: FLINK-8830 > URL: https://issues.apache.org/jira/browse/FLINK-8830 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: vinoyang >Priority: Major > > > {code:java} > java.lang.NullPointerException > at java.io.File.(File.java:277) > at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:502) > at > org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:445) > at > org.apache.flink.yarn.YarnResourceManager.onContainersAllocated(YarnResourceManager.java:338) > at > org.apache.flink.yarn.YarnResourceManagerTest$1.(YarnResourceManagerTest.java:340) > at > org.apache.flink.yarn.YarnResourceManagerTest.testStopWorker(YarnResourceManagerTest.java:317) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > {code} > > This exception is being thrown in > `org.apache.flink.yarn.YarnResourceManagerTest#testStopWorker`. Exception > apparently is being ignored, since the test completes. It seems like this > line: > {code:java} > String fileLocation = > System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION){code} > Is not guarded against returned null value. I don't know if that's a test or > production code issue. > CC [~till.rohrmann] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-6968) Store streaming, updating tables with unique key in queryable state
[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Renjie Liu reassigned FLINK-6968: - Assignee: Renjie Liu > Store streaming, updating tables with unique key in queryable state > --- > > Key: FLINK-6968 > URL: https://issues.apache.org/jira/browse/FLINK-6968 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Renjie Liu >Priority: Major > > Streaming tables with unique key are continuously updated. For example > queries with a non-windowed aggregation generate such tables. Commonly, such > updating tables are emitted via an upsert table sink to an external datastore > (k-v store, database) to make it accessible to applications. > This issue is about adding a feature to store and maintain such a table as > queryable state in Flink. By storing the table in Flnk's queryable state, we > do not need an external data store to access the results of the query but can > query the results directly from Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state
[ https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383441#comment-16383441 ] Piotr Nowojski commented on FLINK-8794: --- The temporary data is already separated from the final output - it's in different files. If we allow for different directory that should be already enough. Besides, writing to local disks would decrease performance, since you would need to write the same data twice (first locally then copy remotely, which is unnecessary, while moving files between directories is cheap) and still "pending" files would have to be copied to remote location, since in some cases "pending" files are committed during recovery. Thus it wouldn't solve your problem. > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state > > > Key: FLINK-8794 > URL: https://issues.apache.org/jira/browse/FLINK-8794 > Project: Flink > Issue Type: Bug > Components: filesystem-connector >Affects Versions: 1.4.0, 1.4.1 >Reporter: yanxiaobin >Priority: Major > > When using BucketingSink, it happens that one of the files is always in the > [.in-progress] state. And this state has never changed after that. The > underlying use of S3 as storage. > > {code:java} > // code placeholder > {code} > 2018-02-28 11:58:42 147341619 {color:#d04437}_part-28-0.in-progress{color} > 2018-02-28 12:06:27 147315059 part-0-0 > 2018-02-28 12:06:27 147462359 part-1-0 > 2018-02-28 12:06:27 147316006 part-10-0 > 2018-02-28 12:06:28 147349854 part-100-0 > 2018-02-28 12:06:27 147421625 part-101-0 > 2018-02-28 12:06:27 147443830 part-102-0 > 2018-02-28 12:06:27 147372801 part-103-0 > 2018-02-28 12:06:27 147343670 part-104-0 > .. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-5621) Flink should provide a mechanism to prevent scheduling tasks on TaskManagers with operational issues
[ https://issues.apache.org/jira/browse/FLINK-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383439#comment-16383439 ] Till Rohrmann commented on FLINK-5621: -- Hi [~yanghua], I think such a feature would indeed be a nice addition for Flink. Black-listing TMs with known issues could be done in the {{ResourceManager}}. We could also add a RPC call which tells the {{TMs}} to shut down in such a case. > Flink should provide a mechanism to prevent scheduling tasks on TaskManagers > with operational issues > > > Key: FLINK-5621 > URL: https://issues.apache.org/jira/browse/FLINK-5621 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.1.4 >Reporter: Jamie Grier >Priority: Critical > > There are cases where jobs can get into a state where no progress can be made > if there is something pathologically wrong with one of the TaskManager nodes > in the cluster. > An example of this would be a TaskManager on a machine that runs out of disk > space. Flink never considers the TM to be "bad" and will keep using it to > attempt to run tasks -- which will continue to fail. > A suggestion for overcoming this would be to allow an option where a TM will > commit suicide if that TM was the source of an exception that caused a job to > fail/restart. > I'm sure there are plenty of other approaches to solving this.. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-5206) Flakey PythonPlanBinderTest
[ https://issues.apache.org/jira/browse/FLINK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-5206: -- It seems to happen again with the following exception. {code:java} java.io.IOException: Mkdirs failed to create /tmp/flink at org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:271) at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121) at org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248) at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88) at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:202) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748){code} https://api.travis-ci.org/v3/job/347676340/log.txt > Flakey PythonPlanBinderTest > --- > > Key: FLINK-5206 > URL: https://issues.apache.org/jira/browse/FLINK-5206 > Project: Flink > Issue Type: Bug > Components: Python API >Affects Versions: 1.2.0 > Environment: in TravisCI >Reporter: Nico Kruber >Assignee: Chesnay Schepler >Priority: Major > Labels: test-stability > Fix For: 1.2.0 > > > {code:none} > --- > T E S T S > --- > Running org.apache.flink.python.api.PythonPlanBinderTest > Job execution failed. > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:903) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.io.IOException: Output path '/tmp/flink/result2' could not be > initialized. Canceling task... > at > org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:233) > at > org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:78) > at > org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:178) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654) > at java.lang.Thread.run(Thread.java:745) > Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 36.891 sec > <<< FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest > testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest) > Time elapsed: 11.53 sec <<< FAILURE! > java.lang.AssertionError: Error while calling the test program: Job execution > failed. 
> at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:180) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRun
[jira] [Updated] (FLINK-5206) Flakey PythonPlanBinderTest
[ https://issues.apache.org/jira/browse/FLINK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-5206: - Affects Version/s: 1.6.0 > Flakey PythonPlanBinderTest > --- > > Key: FLINK-5206 > URL: https://issues.apache.org/jira/browse/FLINK-5206 > Project: Flink > Issue Type: Bug > Components: Python API >Affects Versions: 1.2.0, 1.5.0, 1.6.0 > Environment: in TravisCI >Reporter: Nico Kruber >Assignee: Chesnay Schepler >Priority: Major > Labels: test-stability > Fix For: 1.2.0 > > > {code:none} > --- > T E S T S > --- > Running org.apache.flink.python.api.PythonPlanBinderTest > Job execution failed. > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:903) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.io.IOException: Output path '/tmp/flink/result2' could not be > initialized. Canceling task... 
> at > org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:233) > at > org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:78) > at > org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:178) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654) > at java.lang.Thread.run(Thread.java:745) > Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 36.891 sec > <<< FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest > testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest) > Time elapsed: 11.53 sec <<< FAILURE! > java.lang.AssertionError: Error while calling the test program: Job execution > failed. > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:180) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > org.a
[jira] [Updated] (FLINK-5206) Flakey PythonPlanBinderTest
[ https://issues.apache.org/jira/browse/FLINK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-5206: - Affects Version/s: 1.5.0 > Flakey PythonPlanBinderTest > --- > > Key: FLINK-5206 > URL: https://issues.apache.org/jira/browse/FLINK-5206 > Project: Flink > Issue Type: Bug > Components: Python API >Affects Versions: 1.2.0, 1.5.0, 1.6.0 > Environment: in TravisCI >Reporter: Nico Kruber >Assignee: Chesnay Schepler >Priority: Critical > Labels: test-stability > Fix For: 1.2.0 > > > {code:none} > --- > T E S T S > --- > Running org.apache.flink.python.api.PythonPlanBinderTest > Job execution failed. > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:903) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.io.IOException: Output path '/tmp/flink/result2' could not be > initialized. Canceling task... 
> at > org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:233) > at > org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:78) > at > org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:178) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654) > at java.lang.Thread.run(Thread.java:745) > Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 36.891 sec > <<< FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest > testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest) > Time elapsed: 11.53 sec <<< FAILURE! > java.lang.AssertionError: Error while calling the test program: Job execution > failed. > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:180) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) > at > or
[jira] [Updated] (FLINK-5206) Flakey PythonPlanBinderTest
[ https://issues.apache.org/jira/browse/FLINK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-5206: - Priority: Critical (was: Major) > Flakey PythonPlanBinderTest > --- > > Key: FLINK-5206 > URL: https://issues.apache.org/jira/browse/FLINK-5206 > Project: Flink > Issue Type: Bug > Components: Python API >Affects Versions: 1.2.0, 1.5.0, 1.6.0 > Environment: in TravisCI >Reporter: Nico Kruber >Assignee: Chesnay Schepler >Priority: Critical > Labels: test-stability > Fix For: 1.2.0 > > > {code:none} > --- > T E S T S > --- > Running org.apache.flink.python.api.PythonPlanBinderTest > Job execution failed. > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:903) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.io.IOException: Output path '/tmp/flink/result2' could not be > initialized. Canceling task... 
> at > org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:233) > at > org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:78) > at > org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:178) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654) > at java.lang.Thread.run(Thread.java:745) > Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 36.891 sec > <<< FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest > testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest) > Time elapsed: 11.53 sec <<< FAILURE! > java.lang.AssertionError: Error while calling the test program: Job execution > failed. > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:180) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173) >
[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5394 I have often handled it like one of the below variants. What do you think about that pattern? ### Variant 1: Handle interruption if still running ```java public void run(SourceContext ctx) throws Exception { while (running) { try { // do stuff Thread.sleep(20); } catch (InterruptedException e) { // restore interruption flag Thread.currentThread().interrupt(); if (running) { throw new FlinkException("interrupted while still running", e); } // else fall through the loop } } ``` ### Variant 2: Simply let InterruptedException bubble out This variant is also fine, because the Task status is set to CANCELED before the interruption, so any exception bubbling out will be suppressed. ```java public void run(SourceContext ctx) throws Exception { while (running) { // do stuff // the InterruptedException from here simply fails the execution Thread.sleep(20); } } ``` ---
[jira] [Commented] (FLINK-6571) InfiniteSource in SourceStreamOperatorTest should deal with InterruptedExceptions
[ https://issues.apache.org/jira/browse/FLINK-6571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383448#comment-16383448 ] ASF GitHub Bot commented on FLINK-6571: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5394 I have often handled it like one of the below variants. What do you think about that pattern? ### Variant 1: Handle interruption if still running ```java public void run(SourceContext ctx) throws Exception { while (running) { try { // do stuff Thread.sleep(20); } catch (InterruptedException e) { // restore interruption flag Thread.currentThread().interrupt(); if (running) { throw new FlinkException("interrupted while still running", e); } // else fall through the loop } } ``` ### Variant 2: Simple let InterruptedException bubble out This variant is also fine, because the Task status is set to CANCELED before the interruption, so any exception bubbling out be suppresses. ```java public void run(SourceContext ctx) throws Exception { while (running) { // do stuff // the InterruptedException from here simply fails the execution Thread.sleep(20); } } ``` > InfiniteSource in SourceStreamOperatorTest should deal with > InterruptedExceptions > - > > Key: FLINK-6571 > URL: https://issues.apache.org/jira/browse/FLINK-6571 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.3.0, 1.4.0, 1.5.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0 > > > So this is a new one: i got a failing test > ({{testNoMaxWatermarkOnAsyncStop}}) due to an uncatched InterruptedException. > {code} > [00:28:15] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: > 0.828 sec <<< FAILURE! 
- in > org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest > [00:28:15] > testNoMaxWatermarkOnAsyncStop(org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest) > Time elapsed: 0 sec <<< ERROR! > [00:28:15] java.lang.InterruptedException: sleep interrupted > [00:28:15]at java.lang.Thread.sleep(Native Method) > [00:28:15]at > org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest$InfiniteSource.run(StreamSourceOperatorTest.java:343) > [00:28:15]at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) > [00:28:15]at > org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest.testNoMaxWatermarkOnAsyncStop(StreamSourceOperatorTest.java:176) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383450#comment-16383450 ] ASF GitHub Bot commented on FLINK-6895: --- Github user buptljy closed the pull request at: https://github.com/apache/flink/pull/5615 > Add STR_TO_DATE supported in SQL > > > Key: FLINK-6895 > URL: https://issues.apache.org/jira/browse/FLINK-6895 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: Aegeaner >Priority: Major > > STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It > takes a string str and a format string format. STR_TO_DATE() returns a > DATETIME value if the format string contains both date and time parts, or a > DATE or TIME value if the string contains only date or time parts. If the > date, time, or datetime value extracted from str is illegal, STR_TO_DATE() > returns NULL and produces a warning. > * Syntax: > STR_TO_DATE(str,format) > * Arguments > **str: - > **format: - > * Return Types > DATAETIME/DATE/TIME > * Example: > STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01' > SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5615: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...
Github user buptljy closed the pull request at: https://github.com/apache/flink/pull/5615 ---
[GitHub] flink pull request #5618: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...
GitHub user buptljy opened a pull request: https://github.com/apache/flink/pull/5618 [FLINK-6895][table]Add STR_TO_DATE supported in SQL ## What is the purpose of the change Add STR_TO_DATE Function supported in SQL ## Brief change log * STR_TO_DATE(str string, format string) \- str is the string that need to be transformed. \- format is the pattern of "str" * Add tests in ScalarFunctionsTest.scala * Add docs in sql.md ## Verifying this change * Run unit tests in ScalarFunctionsTest.scala ## Does this pull request potentially affect one of the following parts: * A new sql function ## Documentation * Add docs in sql.md You can merge this pull request into a Git repository by running: $ git pull https://github.com/buptljy/flink str_to_date Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5618.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5618 commit 59752143ee438cb11969ae4bdda1fac5fc32813c Author: Liao Jiayi Date: 2018-03-01T11:58:08Z add str_to_date sql function commit 63f71e4b3d6378f2114aa04ba4d1128f1ec3bc38 Author: Liao Jiayi Date: 2018-03-01T11:58:41Z Merge branch 'master' of github.com:apache/flink commit 3ec6d2ec487151928032d144de94ca9113d63f01 Author: Liao Jiayi Date: 2018-03-01T15:30:04Z fix checkstyle error ---
[GitHub] flink issue #5618: [FLINK-6895][table]Add STR_TO_DATE supported in SQL
Github user buptljy commented on the issue: https://github.com/apache/flink/pull/5618 Actually I am not sure if it is appropriate to return "string" because it should be the inverse of the "DATE_FORMAT()". However, if I return DATE/TIME/DATETIME as the jira issue described, the type of data the user receives will be uncertain in one of DATE/TIME/DATETIME. Do you have some good ideas? I will optimize it. ---
[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383451#comment-16383451 ] ASF GitHub Bot commented on FLINK-6895: --- GitHub user buptljy opened a pull request: https://github.com/apache/flink/pull/5618 [FLINK-6895][table]Add STR_TO_DATE supported in SQL ## What is the purpose of the change Add STR_TO_DATE Function supported in SQL ## Brief change log * STR_TO_DATE(str string, format string) \- str is the string that need to be transformed. \- format is the pattern of "str" * Add tests in ScalarFunctionsTest.scala * Add docs in sql.md ## Verifying this change * Run unit tests in ScalarFunctionsTest.scala ## Does this pull request potentially affect one of the following parts: * A new sql function ## Documentation * Add docs in sql.md You can merge this pull request into a Git repository by running: $ git pull https://github.com/buptljy/flink str_to_date Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5618.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5618 commit 59752143ee438cb11969ae4bdda1fac5fc32813c Author: Liao Jiayi Date: 2018-03-01T11:58:08Z add str_to_date sql function commit 63f71e4b3d6378f2114aa04ba4d1128f1ec3bc38 Author: Liao Jiayi Date: 2018-03-01T11:58:41Z Merge branch 'master' of github.com:apache/flink commit 3ec6d2ec487151928032d144de94ca9113d63f01 Author: Liao Jiayi Date: 2018-03-01T15:30:04Z fix checkstyle error > Add STR_TO_DATE supported in SQL > > > Key: FLINK-6895 > URL: https://issues.apache.org/jira/browse/FLINK-6895 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: Aegeaner >Priority: Major > > STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It > takes a string str and a format string format. 
STR_TO_DATE() returns a > DATETIME value if the format string contains both date and time parts, or a > DATE or TIME value if the string contains only date or time parts. If the > date, time, or datetime value extracted from str is illegal, STR_TO_DATE() > returns NULL and produces a warning. > * Syntax: > STR_TO_DATE(str,format) > * Arguments > **str: - > **format: - > * Return Types > DATETIME/DATE/TIME > * Example: > STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01' > SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383452#comment-16383452 ] ASF GitHub Bot commented on FLINK-6895: --- Github user buptljy commented on the issue: https://github.com/apache/flink/pull/5618 Actually I am not sure if it is appropriate to return "string" because it should be the inverse of the "DATE_FORMAT()". However, If I return DATE/TIME/DATETIME as the jira issue described, the type of data user receives will be uncertain in one of DATE/TIME/DATETIME. Do you have some good ideas ? I will optimize it. > Add STR_TO_DATE supported in SQL > > > Key: FLINK-6895 > URL: https://issues.apache.org/jira/browse/FLINK-6895 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: Aegeaner >Priority: Major > > STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It > takes a string str and a format string format. STR_TO_DATE() returns a > DATETIME value if the format string contains both date and time parts, or a > DATE or TIME value if the string contains only date or time parts. If the > date, time, or datetime value extracted from str is illegal, STR_TO_DATE() > returns NULL and produces a warning. > * Syntax: > STR_TO_DATE(str,format) > * Arguments > **str: - > **format: - > * Return Types > DATAETIME/DATE/TIME > * Example: > STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01' > SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8811) Add MiniClusterClient to allow fast MiniCluster operations
[ https://issues.apache.org/jira/browse/FLINK-8811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8811. Resolution: Fixed Fixed with 1.6.0: 19a8d2ff361be8797d79074b39afb09d51cca671 96a176ac03bb1d188e173ba3bf14c29259d33377 8039464df8f315b1fd06831e11dfc2ef4466b888 1.5.0: f30ca2101f2151214b138f37f472f886fbdfd9f0 19e4f68ba9cfbf5d0f54b325db4c5d196d262d09 29b34e2255c41abc1c7c4af8ab268a39df57f0ff > Add MiniClusterClient to allow fast MiniCluster operations > -- > > Key: FLINK-8811 > URL: https://issues.apache.org/jira/browse/FLINK-8811 > Project: Flink > Issue Type: Improvement > Components: Tests >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Critical > Labels: flip-6 > Fix For: 1.5.0, 1.6.0 > > > We should offer a {{ClusterClient}} implementation for the {{MiniCluster}}. > That way we would be able to submit and wait for result without polling how > it would be the case by using the {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-8808) Enable RestClusterClient to submit jobs to local Dispatchers
[ https://issues.apache.org/jira/browse/FLINK-8808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-8808. Resolution: Fixed Fixed via 1.6.0: 63b3563e5404b89ad8fd20f1181b8247e7de3fa2 1.5.0: 02cb9e39cc19ebbf82f06d87035aead20780eb2b > Enable RestClusterClient to submit jobs to local Dispatchers > > > Key: FLINK-8808 > URL: https://issues.apache.org/jira/browse/FLINK-8808 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.5.0, 1.6.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0, 1.6.0 > > > The {{RestClusterClient}} should be able to submit a job to a {{Dispatcher}} > which runs in a local {{ActorSystem}} on the same host as the > {{RestClusterClient}}. This is the case for test cases. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8519) FileAlreadyExistsException on Start Flink Session
[ https://issues.apache.org/jira/browse/FLINK-8519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383469#comment-16383469 ] Nico Kruber commented on FLINK-8519: [~yew1eb], since I don't have your {{hadoop_user_login.sh}} script, I tried to reproduce the problem with the following command on AWS and did not see any exception in the logs or the command line outputs. {code} ./bin/yarn-session.sh -n 2 -nm job_name -jm 1024 -tm 4096 -s 4 -d {code} Can you try this command as well? Also, with your command (executed in a {{bash}} shell) and the given {{yarn.container-start-command-template}}, I got the following error: {code} Error: Could not find or load main class %-Xmx424m% {code} which indicates that the template may be wrong. Based on {{org.apache.flink.configuration.ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE}}, you should only use one percent sign. > FileAlreadyExistsException on Start Flink Session > -- > > Key: FLINK-8519 > URL: https://issues.apache.org/jira/browse/FLINK-8519 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Hai Zhou UTC+8 >Priority: Blocker > Fix For: 1.5.0 > > > *steps to reproduce:* > 1. build flink from source , git commit: c1734f4 > 2. run script: > source /path/hadoop/bin/hadoop_user_login.sh hadoop-launcher; > export YARN_CONF_DIR=/path/hadoop/etc/hadoop; > export HADOOP_CONF_DIR=/path/hadoop/etc/hadoop; > export JVM_ARGS="-Djava.security.krb5.conf=${HADOOP_CONF_DIR}/krb5.conf"; > /path/flink-1.5-SNAPSHOT/bin/yarn-session.sh -D > yarn.container-start-command-template="/usr/local/jdk1.8.0_112/bin/java > %%jvmmem%% %%jvmopts%% %%logging%% %%class%% %%args%% %%redirects%%" -n 4 -nm > job_name -qu root.rt.flink -jm 1024 -tm 4096 -s 4 -d > > *error infos:* > 2018-01-27 00:51:12,841 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - > Error while running the Flink Yarn session. 
> java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1571) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:786) > Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: > Couldn't deploy Yarn session cluster > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:594) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:786) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) > ... 2 more > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: Path /user > already exists as dir; cannot create link here > at org.apache.hadoop.fs.viewfs.InodeTree.createLink(InodeTree.java:244) > at org.apache.hadoop.fs.viewfs.InodeTree.(InodeTree.java:334) > at > org.apache.hadoop.fs.viewfs.ViewFileSystem$1.(ViewFileSystem.java:161) > at > org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:161) > at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397) > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) > at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) > at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:656) > at > 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:485) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:384) > ... 7 more -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8519) FileAlreadyExistsException on Start Flink Session
[ https://issues.apache.org/jira/browse/FLINK-8519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383469#comment-16383469 ] Nico Kruber edited comment on FLINK-8519 at 3/2/18 11:07 AM: - [~yew1eb], since I don't have your {{hadoop_user_login.sh}} script, I tried to reproduce the problem with the following command on AWS with Flink release-1.5 checkout of revision 347ec3848ec603ac452e394a5211cf888db6663f. I did not see any exception in the logs or the command line outputs. {code} ./bin/yarn-session.sh -n 2 -nm job_name -jm 1024 -tm 4096 -s 4 -d {code} Can you try this command as well? Also, with your command (executed in a {{bash}} shell) and the given {{yarn.container-start-command-template}}, I got the following error: {code} Error: Could not find or load main class %-Xmx424m% {code} which indicates that the template may be wrong. Based on {{org.apache.flink.configuration.ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE}}, you should only use one percent sign. was (Author: nicok): [~yew1eb], since I don't have your {{hadoop_user_login.sh}} script, I tried to reproduce the problem with the following command on AWS and did not see any exception in the logs or the command line outputs. {code} ./bin/yarn-session.sh -n 2 -nm job_name -jm 1024 -tm 4096 -s 4 -d {code} Can you try this command as well? Also, with your command (executed in a {{bash}} shell) and the given {{yarn.container-start-command-template}}, I got the following error: {code} Error: Could not find or load main class %-Xmx424m% {code} which indicates that the template may be wrong. Based on {{org.apache.flink.configuration.ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE}}, you should only use one percent sign. 
> FileAlreadyExistsException on Start Flink Session > -- > > Key: FLINK-8519 > URL: https://issues.apache.org/jira/browse/FLINK-8519 > Project: Flink > Issue Type: Bug > Components: YARN >Affects Versions: 1.5.0 >Reporter: Hai Zhou UTC+8 >Priority: Blocker > Fix For: 1.5.0 > > > *steps to reproduce:* > 1. build flink from source , git commit: c1734f4 > 2. run script: > source /path/hadoop/bin/hadoop_user_login.sh hadoop-launcher; > export YARN_CONF_DIR=/path/hadoop/etc/hadoop; > export HADOOP_CONF_DIR=/path/hadoop/etc/hadoop; > export JVM_ARGS="-Djava.security.krb5.conf=${HADOOP_CONF_DIR}/krb5.conf"; > /path/flink-1.5-SNAPSHOT/bin/yarn-session.sh -D > yarn.container-start-command-template="/usr/local/jdk1.8.0_112/bin/java > %%jvmmem%% %%jvmopts%% %%logging%% %%class%% %%args%% %%redirects%%" -n 4 -nm > job_name -qu root.rt.flink -jm 1024 -tm 4096 -s 4 -d > > *error infos:* > 2018-01-27 00:51:12,841 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - > Error while running the Flink Yarn session. > java.lang.reflect.UndeclaredThrowableException > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1571) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:786) > Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: > Couldn't deploy Yarn session cluster > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:594) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:786) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556) > ... 
2 more > Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: Path /user > already exists as dir; cannot create link here > at org.apache.hadoop.fs.viewfs.InodeTree.createLink(InodeTree.java:244) > at org.apache.hadoop.fs.viewfs.InodeTree.(InodeTree.java:334) > at > org.apache.hadoop.fs.viewfs.ViewFileSystem$1.(ViewFileSystem.java:161) > at > org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:161) > at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397) > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89) > at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431) > at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:656) >
[jira] [Commented] (FLINK-8813) AutoParallellismITCase fails with Flip6
[ https://issues.apache.org/jira/browse/FLINK-8813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383476#comment-16383476 ] Till Rohrmann commented on FLINK-8813: -- Auto max parallelism does not make much sense anymore in the Flip-6 world. Yarn clusters, for example, will no longer be started with a set of preallocated resources. Therefore, it is not clear what max parallelism would mean in such a case. I think we should discourage people from using it and instead motivate them to always specify the parallelism with which they want to run the job. As a fix for this concrete problem, we could say that in Flip-6 mode, the max parallelism is always {{1}}. What do you think? > AutoParallellismITCase fails with Flip6 > --- > > Key: FLINK-8813 > URL: https://issues.apache.org/jira/browse/FLINK-8813 > Project: Flink > Issue Type: Bug > Components: JobManager, Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > The {{AutoParallelismITCase}} fails when running against flip6. > ([https://travis-ci.org/zentol/flink/jobs/347373854)] > It appears that the {{JobMaster}} does not properly handle > {{ExecutionConfig#PARALLELISM_AUTO_MAX}}. > > Exception: > {code:java} > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could not > start JobManager. 
> at > org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:287) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not > set up JobManager > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:181) > at > org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:747) > at > org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:243) > ... 
20 more > Caused by: java.lang.IllegalArgumentException: The parallelism must be at > least one. > at > org.apache.flink.runtime.jobgraph.JobVertex.setParallelism(JobVertex.java:290) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:162) > at > org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:295) > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:170) > ... 22 more{code} > > The likely culprit is this call to {{ExecutionGraphBuilder#buildGraph}} in > the {{JobMaster}} constructor: > {code:java} > this.executionGraph = ExecutionGraphBuilder.buildGraph( >null, >jobGraph, >jobMasterConfiguration.getConfiguration(), >scheduledExecutorService, >scheduledExecutorService, >slotPool.getSlotProvider(), >userCodeLoader, >highAvailabilityServices.getCheckpointRecoveryFactory(), >rpcTimeout, >restartStrategy, >jobMetricGroup, >-1, // parallelismForAutoMax >blobServer, >jobMasterConfiguration.getSlotRequestTimeout(), >log);{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8835) Fix TaskManager config keys
Stephan Ewen created FLINK-8835: --- Summary: Fix TaskManager config keys Key: FLINK-8835 URL: https://issues.apache.org/jira/browse/FLINK-8835 Project: Flink Issue Type: Bug Components: TaskManager Reporter: Stephan Ewen Fix For: 1.5.0 Many new config keys in the TaskManager don't follow the proper naming scheme. We need to clear those up before the release. I would also suggest to keep the key names short, because that makes it easier for users. When doing this cleanup pass over the config keys, I would suggest to also make some of the existing keys more hierarchical and harmonize them with the common scheme in Flink. ## New Keys * {{taskmanager.network.credit-based-flow-control.enabled}} to {{taskmanager.network.credit-model}}. * {{taskmanager.exactly-once.blocking.data.enabled}} to {{task.checkpoint.alignment.blocking}} (we already have {{task.checkpoint.alignment.max-size}}) ## Existing Keys * {{taskmanager.debug.memory.startLogThread}} => {{taskmanager.debug.memory.log}} * {{taskmanager.debug.memory.logIntervalMs}} => {{taskmanager.debug.memory.log-interval}} * {{taskmanager.initial-registration-pause}} => {{taskmanager.registration.initial-backoff}} * {{taskmanager.max-registration-pause}} => {{taskmanager.registration.max-backoff}} * {{taskmanager.refused-registration-pause}} => {{taskmanager.registration.refused-backoff}} * {{taskmanager.maxRegistrationDuration}} => {{taskmanager.registration.timeout}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state
[ https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383481#comment-16383481 ] Stephan Ewen commented on FLINK-8834: - The second exception is not a problem, that is just noise in the log, see FLINK-8751 The TaskManagers should make a hard exit (process kill) after a while if graceful shutdown does not work. The timeout for that is by default 3 minutes, configured in {{task.cancellation.timeout}}. You should see something like this in the log: "Notifying TaskManager about fatal error". Can you check if you deactivated this cancellation timeout (set it to 0 or -1)? > Job fails to restart due to some tasks stuck in cancelling state > > > Key: FLINK-8834 > URL: https://issues.apache.org/jira/browse/FLINK-8834 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 > Environment: AWS EMR 5.12 > Flink 1.4.0 > Beam 2.3.0 >Reporter: Daniel Harper >Priority: Major > > Our job threw an exception overnight, causing the job to commence attempting > a restart. 
> However it never managed to restart because 2 tasks on one of the Task > Managers are stuck in "Cancelling" state, with the following exception > {code:java} > 2018-03-02 02:29:31,604 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'PTransformTranslation.UnknownRawPTransform -> > ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> > uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out > -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to > cancelling signal, but is stuck in method: > java.lang.Thread.blockedOn(Thread.java:239) > java.lang.System$2.blockedOn(System.java:1252) > java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211) > java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457) > java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253) > java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211) > java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458) > java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) > sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125) > 
sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135) > java.io.OutputStreamWriter.write(OutputStreamWriter.java:220) > java.io.Writer.write(Writer.java:157) > org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76) > org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550) > org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown > Source) > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177) > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138) > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831) > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809) > org.apache.bea
[jira] [Created] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
Tzu-Li (Gordon) Tai created FLINK-8836: -- Summary: Duplicating a KryoSerializer does not duplicate registered default serializers Key: FLINK-8836 URL: https://issues.apache.org/jira/browse/FLINK-8836 Project: Flink Issue Type: Bug Components: Type Serialization System Reporter: Tzu-Li (Gordon) Tai The {{duplicate()}} method of the {{KryoSerializer}} is as following: {code} public KryoSerializer duplicate() { return new KryoSerializer<>(this); } protected KryoSerializer(KryoSerializer toCopy) { defaultSerializers = toCopy.defaultSerializers; defaultSerializerClasses = toCopy.defaultSerializerClasses; kryoRegistrations = toCopy.kryoRegistrations; ... } {code} Shortly put, when duplicating a `KryoSerializer`, the `defaultSerializers` serializer instances are directly provided to the new `KryoSerializer` instance. This causes the fact that those default serializers are shared across two different `KryoSerializer` instances, and therefore not a correct duplicate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8836: --- Priority: Critical (was: Major) > Duplicating a KryoSerializer does not duplicate registered default serializers > -- > > Key: FLINK-8836 > URL: https://issues.apache.org/jira/browse/FLINK-8836 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > > The {{duplicate()}} method of the {{KryoSerializer}} is as following: > {code:java} > public KryoSerializer duplicate() { > return new KryoSerializer<>(this); > } > > protected KryoSerializer(KryoSerializer toCopy) { > defaultSerializers = toCopy.defaultSerializers; > defaultSerializerClasses = toCopy.defaultSerializerClasses; > kryoRegistrations = toCopy.kryoRegistrations; > ... > } > {code} > Shortly put, when duplicating a {{KryoSerializer}}, the > {{defaultSerializers}} serializer instances are directly provided to the new > {{KryoSerializer}} instance. > This causes the fact that those default serializers are shared across two > different {{KryoSerializer}} instances, and therefore not a correct duplicate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8517) StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-8517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383483#comment-16383483 ] Nico Kruber commented on FLINK-8517: ok, I can reproduce it locally running the test multiple times - possibly a race between {{TaskEventDispatcher#registerPartition}} and {{TaskEventDispatcher#subscribeToEvent}}. I'll investigate further. > StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis > --- > > Key: FLINK-8517 > URL: https://issues.apache.org/jira/browse/FLINK-8517 > Project: Flink > Issue Type: Bug > Components: DataSet API, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.3 > > > The {{StaticlyNestedIterationsITCase.testJobWithoutObjectReuse}} test case > fails on Travis. This exception might be relevant: > {code:java} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalStateException: Partition > 557b069f2b89f8ba599e6ab0956a3f5a@58f1a6b7d8ae10b9141f17c08d06cecb not > registered at task event dispatcher. > at > org.apache.flink.runtime.io.network.TaskEventDispatcher.subscribeToEvent(TaskEventDispatcher.java:107) > at > org.apache.flink.runtime.iterative.task.IterationHeadTask.initSuperstepBarrier(IterationHeadTask.java:242) > at > org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:266) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748){code} > > https://api.travis-ci.org/v3/job/60156/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8836: --- Description: The {{duplicate()}} method of the {{KryoSerializer}} is as following: {code:java} public KryoSerializer duplicate() { return new KryoSerializer<>(this); } protected KryoSerializer(KryoSerializer toCopy) { defaultSerializers = toCopy.defaultSerializers; defaultSerializerClasses = toCopy.defaultSerializerClasses; kryoRegistrations = toCopy.kryoRegistrations; ... } {code} Shortly put, when duplicating a {{KryoSerializer}}, the {{defaultSerializers}} serializer instances are directly provided to the new {{KryoSerializer}} instance. This causes the fact that those default serializers are shared across two different {{KryoSerializer}} instances, and therefore not a correct duplicate. was: The {{duplicate()}} method of the {{KryoSerializer}} is as following: {code} public KryoSerializer duplicate() { return new KryoSerializer<>(this); } protected KryoSerializer(KryoSerializer toCopy) { defaultSerializers = toCopy.defaultSerializers; defaultSerializerClasses = toCopy.defaultSerializerClasses; kryoRegistrations = toCopy.kryoRegistrations; ... } {code} Shortly put, when duplicating a `KryoSerializer`, the `defaultSerializers` serializer instances are directly provided to the new `KryoSerializer` instance. This causes the fact that those default serializers are shared across two different `KryoSerializer` instances, and therefore not a correct duplicate. 
> Duplicating a KryoSerializer does not duplicate registered default serializers > -- > > Key: FLINK-8836 > URL: https://issues.apache.org/jira/browse/FLINK-8836 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Priority: Major > > The {{duplicate()}} method of the {{KryoSerializer}} is as following: > {code:java} > public KryoSerializer duplicate() { > return new KryoSerializer<>(this); > } > > protected KryoSerializer(KryoSerializer toCopy) { > defaultSerializers = toCopy.defaultSerializers; > defaultSerializerClasses = toCopy.defaultSerializerClasses; > kryoRegistrations = toCopy.kryoRegistrations; > ... > } > {code} > Shortly put, when duplicating a {{KryoSerializer}}, the > {{defaultSerializers}} serializer instances are directly provided to the new > {{KryoSerializer}} instance. > This causes the fact that those default serializers are shared across two > different {{KryoSerializer}} instances, and therefore not a correct duplicate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tzu-Li (Gordon) Tai updated FLINK-8836: --- Description: The {{duplicate()}} method of the {{KryoSerializer}} is as following: {code:java} public KryoSerializer duplicate() { return new KryoSerializer<>(this); } protected KryoSerializer(KryoSerializer toCopy) { defaultSerializers = toCopy.defaultSerializers; defaultSerializerClasses = toCopy.defaultSerializerClasses; kryoRegistrations = toCopy.kryoRegistrations; ... } {code} Shortly put, when duplicating a {{KryoSerializer}}, the {{defaultSerializers}} serializer instances are directly provided to the new {{KryoSerializer}} instance. This causes the fact that those default serializers are shared across two different {{KryoSerializer}} instances, and therefore not a correct duplicate. was: The {{duplicate()}} method of the {{KryoSerializer}} is as following: {code:java} public KryoSerializer duplicate() { return new KryoSerializer<>(this); } protected KryoSerializer(KryoSerializer toCopy) { defaultSerializers = toCopy.defaultSerializers; defaultSerializerClasses = toCopy.defaultSerializerClasses; kryoRegistrations = toCopy.kryoRegistrations; ... } {code} Shortly put, when duplicating a {{KryoSerializer}}, the {{defaultSerializers}} serializer instances are directly provided to the new {{KryoSerializer}} instance. This causes the fact that those default serializers are shared across two different {{KryoSerializer}} instances, and therefore not a correct duplicate. 
> Duplicating a KryoSerializer does not duplicate registered default serializers > -- > > Key: FLINK-8836 > URL: https://issues.apache.org/jira/browse/FLINK-8836 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > > The {{duplicate()}} method of the {{KryoSerializer}} is as following: > {code:java} > public KryoSerializer duplicate() { > return new KryoSerializer<>(this); > } > protected KryoSerializer(KryoSerializer toCopy) { > defaultSerializers = toCopy.defaultSerializers; > defaultSerializerClasses = toCopy.defaultSerializerClasses; > kryoRegistrations = toCopy.kryoRegistrations; > ... > } > {code} > Shortly put, when duplicating a {{KryoSerializer}}, the > {{defaultSerializers}} serializer instances are directly provided to the new > {{KryoSerializer}} instance. > This causes the fact that those default serializers are shared across two > different {{KryoSerializer}} instances, and therefore not a correct duplicate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api
[ https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383491#comment-16383491 ] Stephan Ewen commented on FLINK-8828: - Interesting suggestion, and it looks like a pretty lightweight self-contained addition, so that's nice! One thing I would raise is the name "collect". The DataSet API has "collect" as 'pull the data set back to the client', and the streaming api has an experimental feature that does the same for the data stream, also using the name "collect", see {{org.apache.flink.streaming.api.datastream.DataStreamUtils}}. > Add collect method to DataStream / DataSet scala api > > > Key: FLINK-8828 > URL: https://issues.apache.org/jira/browse/FLINK-8828 > Project: Flink > Issue Type: Improvement > Components: Core, DataSet API, DataStream API, Scala API >Affects Versions: 1.4.0 >Reporter: Jelmer Kuperus >Priority: Major > > A collect function is a method that takes a Partial Function as its parameter > and applies it to all the elements in the collection to create a new > collection which satisfies the Partial Function. > It can be found on all [core scala collection > classes|http://www.scala-lang.org/api/2.9.2/scala/collection/TraversableLike.html] > as well as on spark's [rdd > interface|https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD] > To understand its utility imagine the following scenario : > Given a DataStream that produces events of type _Purchase_ and _View_ > Transform this stream into a stream of purchase amounts over 1000 euros. 
> Currently an implementation might look like > {noformat} > val x = dataStream > .filter(_.isInstanceOf[Purchase]) > .map(_.asInstanceOf[Purchase]) > .filter(_.amount > 1000) > .map(_.amount){noformat} > Or alternatively you could do this > {noformat} > dataStream.flatMap(_ match { > case p: Purchase if p.amount > 1000 => Some(p.amount) > case _ => None > }){noformat} > But with collect implemented it could look like > {noformat} > dataStream.collect { > case p: Purchase if p.amount > 1000 => p.amount > }{noformat} > > Which is a lot nicer to both read and write -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8837) Move DataStreamUtils to package 'experimental'.
Stephan Ewen created FLINK-8837: --- Summary: Move DataStreamUtils to package 'experimental'. Key: FLINK-8837 URL: https://issues.apache.org/jira/browse/FLINK-8837 Project: Flink Issue Type: Bug Components: Streaming Reporter: Stephan Ewen Fix For: 1.5.0 The class {{DataStreamUtils}} came from 'flink-contrib' and now accidentally moved to the fully supported API packages. It should be in package 'experimental' to properly communicate that it is not guaranteed to be API stable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8813) AutoParallellismITCase fails with Flip6
[ https://issues.apache.org/jira/browse/FLINK-8813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383498#comment-16383498 ] Chesnay Schepler commented on FLINK-8813: - That sounds like unexpected behavior to me (for a user) and it may be better to outright fail with a proper error message. > AutoParallellismITCase fails with Flip6 > --- > > Key: FLINK-8813 > URL: https://issues.apache.org/jira/browse/FLINK-8813 > Project: Flink > Issue Type: Bug > Components: JobManager, Tests >Affects Versions: 1.5.0 >Reporter: Chesnay Schepler >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > The {{AutoParallelismITCase}} fails when running against flip6. > ([https://travis-ci.org/zentol/flink/jobs/347373854)] > It appears that the {{JobMaster}} does not properly handle > {{ExecutionConfig#PARALLELISM_AUTO_MAX}}. > > Exception: > {code:java} > Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could not > start JobManager. > at > org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:287) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132) > at > akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544) > at akka.actor.Actor$class.aroundReceive(Actor.scala:502) > at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95) > at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:526) > at akka.actor.ActorCell.invoke(ActorCell.scala:495) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not > set up JobManager > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:181) > at > org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:747) > at > org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:243) > ... 20 more > Caused by: java.lang.IllegalArgumentException: The parallelism must be at > least one. > at > org.apache.flink.runtime.jobgraph.JobVertex.setParallelism(JobVertex.java:290) > at > org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:162) > at > org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:295) > at > org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:170) > ... 
22 more{code} > > The likely culprit is this call to {{ExecutionGraphBuilder#buildGraph}} in > the {{JobMaster}} constructor: > {code:java} > this.executionGraph = ExecutionGraphBuilder.buildGraph( >null, >jobGraph, >jobMasterConfiguration.getConfiguration(), >scheduledExecutorService, >scheduledExecutorService, >slotPool.getSlotProvider(), >userCodeLoader, >highAvailabilityServices.getCheckpointRecoveryFactory(), >rpcTimeout, >restartStrategy, >jobMetricGroup, >-1, // parallelismForAutoMax >blobServer, >jobMasterConfiguration.getSlotRequestTimeout(), >log);{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8837) Move DataStreamUtils to package 'experimental'.
[ https://issues.apache.org/jira/browse/FLINK-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383502#comment-16383502 ] Stefan Richter commented on FLINK-8837: --- I think there is one problem with keeping it in an experimental package: this class is used to expose functionality that goes through package-private methods in data stream classes. If we move it to a different package, those methods need to become public. If the methods are public, then there is also little use in exposing them over `DataStreamUtils`. > Move DataStreamUtils to package 'experimental'. > --- > > Key: FLINK-8837 > URL: https://issues.apache.org/jira/browse/FLINK-8837 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > > The class {{DataStreamUtils}} came from 'flink-contrib' and now accidentally > moved to the fully supported API packages. It should be in package > 'experimental' to properly communicate that it is not guaranteed to be API > stable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8837) Move DataStreamUtils to package 'experimental'.
[ https://issues.apache.org/jira/browse/FLINK-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383502#comment-16383502 ] Stefan Richter edited comment on FLINK-8837 at 3/2/18 11:47 AM: I think there is one problem with keeping it in an experimental package: this class is used to expose functionality that goes through package-private methods in data stream classes. If we move it to a different package, those methods need to become public. If the methods are public, then there is also little use in exposing them over {{DataStreamUtils}}. was (Author: srichter): I think there is one problem with keeping it in an experimental package: this class is used to expose functionality that goes through package-private methods in data stream classes. If we move it to a different package, those methods need to become public. If the methods are public, then there is also little use in exposing them over `DataStreamUtils`. > Move DataStreamUtils to package 'experimental'. > --- > > Key: FLINK-8837 > URL: https://issues.apache.org/jira/browse/FLINK-8837 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > > The class {{DataStreamUtils}} came from 'flink-contrib' and now accidentally > moved to the fully supported API packages. It should be in package > 'experimental' to properly communicate that it is not guaranteed to be API > stable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8837) Move DataStreamUtils to package 'experimental'.
[ https://issues.apache.org/jira/browse/FLINK-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383508#comment-16383508 ] Chesnay Schepler commented on FLINK-8837: - why not annotate them with {{PublicEvolving}} instead? Or introduce a separate {{Experimental}} annotation for API's exposed to users that could be removed at any time. > Move DataStreamUtils to package 'experimental'. > --- > > Key: FLINK-8837 > URL: https://issues.apache.org/jira/browse/FLINK-8837 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > > The class {{DataStreamUtils}} came from 'flink-contrib' and now accidentally > moved to the fully supported API packages. It should be in package > 'experimental' to properly communicate that it is not guaranteed to be API > stable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8838) Add Support UNNEST a MultiSet type field
lincoln.lee created FLINK-8838: -- Summary: Add Support UNNEST a MultiSet type field Key: FLINK-8838 URL: https://issues.apache.org/jira/browse/FLINK-8838 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: lincoln.lee Assignee: lincoln.lee {{MultiSetTypeInfo}} was introduced by FLINK-7491, and {{UNNEST}} supports {{Array}} type only, so it would be nice to support {{UNNEST}} a {{MultiSet}} type field. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8837) Move DataStreamUtils to package 'experimental'.
[ https://issues.apache.org/jira/browse/FLINK-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383509#comment-16383509 ] Stefan Richter commented on FLINK-8837: --- The class is annotated as evolving and as you said, there is no experimental annotation. > Move DataStreamUtils to package 'experimental'. > --- > > Key: FLINK-8837 > URL: https://issues.apache.org/jira/browse/FLINK-8837 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Stephan Ewen >Priority: Blocker > Fix For: 1.5.0 > > > The class {{DataStreamUtils}} came from 'flink-contrib' and now accidentally > moved to the fully supported API packages. It should be in package > 'experimental' to properly communicate that it is not guaranteed to be API > stable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383510#comment-16383510 ] Stephan Ewen commented on FLINK-8836: - Is the concern that the default serializers themselves are stateful? As far as I understood it so far, the Kryo object is stateful and not thread safe, hence needs to be exclusive to one thread at a time. Kryo does the tracking of object graphs and generic scopes, which makes it not thread safe during serialization / deserialization. The serializers used by Kryo should be stateless and can thus be shared. Please double check this assumption - if the serializers are actually stateful, then we need to see how we can handle that. If they are not, we should be able to close this issue. > Duplicating a KryoSerializer does not duplicate registered default serializers > -- > > Key: FLINK-8836 > URL: https://issues.apache.org/jira/browse/FLINK-8836 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > > The {{duplicate()}} method of the {{KryoSerializer}} is as following: > {code:java} > public KryoSerializer duplicate() { > return new KryoSerializer<>(this); > } > protected KryoSerializer(KryoSerializer toCopy) { > defaultSerializers = toCopy.defaultSerializers; > defaultSerializerClasses = toCopy.defaultSerializerClasses; > kryoRegistrations = toCopy.kryoRegistrations; > ... > } > {code} > Shortly put, when duplicating a {{KryoSerializer}}, the > {{defaultSerializers}} serializer instances are directly provided to the new > {{KryoSerializer}} instance. > This causes the fact that those default serializers are shared across two > different {{KryoSerializer}} instances, and therefore not a correct duplicate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383515#comment-16383515 ] Tzu-Li (Gordon) Tai commented on FLINK-8836: [~StephanEwen] All built-in Kryo default serializers, AFAIK, are stateless. However, users can also register their own serializer implementations via the `ExecutionConfig`, making them potentially stateful. The \{{KryoSerializer}} recognizes these user registrations and adds them to the Kryo object. > Duplicating a KryoSerializer does not duplicate registered default serializers > -- > > Key: FLINK-8836 > URL: https://issues.apache.org/jira/browse/FLINK-8836 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > > The {{duplicate()}} method of the {{KryoSerializer}} is as following: > {code:java} > public KryoSerializer duplicate() { > return new KryoSerializer<>(this); > } > protected KryoSerializer(KryoSerializer toCopy) { > defaultSerializers = toCopy.defaultSerializers; > defaultSerializerClasses = toCopy.defaultSerializerClasses; > kryoRegistrations = toCopy.kryoRegistrations; > ... > } > {code} > Shortly put, when duplicating a {{KryoSerializer}}, the > {{defaultSerializers}} serializer instances are directly provided to the new > {{KryoSerializer}} instance. > This causes the fact that those default serializers are shared across two > different {{KryoSerializer}} instances, and therefore not a correct duplicate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383515#comment-16383515 ] Tzu-Li (Gordon) Tai edited comment on FLINK-8836 at 3/2/18 12:03 PM: - [~StephanEwen] All built-in Kryo default serializers, AFAIK, are stateless. However, users can also register their own serializer implementations via the {{ExecutionConfig}}, making them potentially stateful. The {{KryoSerializer}} recognizes these user registrations and adds them to the Kryo object. was (Author: tzulitai): [~StephanEwen] All built-in Kryo default serializers, AFAIK, are stateless. However, users can also register their own serializer implementations via the `ExecutionConfig`, making them potentially stateful. The \{{KryoSerializer}} recognizes these user registrations and adds them to the Kryo object. > Duplicating a KryoSerializer does not duplicate registered default serializers > -- > > Key: FLINK-8836 > URL: https://issues.apache.org/jira/browse/FLINK-8836 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > > The {{duplicate()}} method of the {{KryoSerializer}} is as following: > {code:java} > public KryoSerializer duplicate() { > return new KryoSerializer<>(this); > } > protected KryoSerializer(KryoSerializer toCopy) { > defaultSerializers = toCopy.defaultSerializers; > defaultSerializerClasses = toCopy.defaultSerializerClasses; > kryoRegistrations = toCopy.kryoRegistrations; > ... > } > {code} > Shortly put, when duplicating a {{KryoSerializer}}, the > {{defaultSerializers}} serializer instances are directly provided to the new > {{KryoSerializer}} instance. > This causes the fact that those default serializers are shared across two > different {{KryoSerializer}} instances, and therefore not a correct duplicate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8838) Add Support UNNEST a MultiSet type field
[ https://issues.apache.org/jira/browse/FLINK-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln.lee updated FLINK-8838: --- Description: MultiSetTypeInfo was introduced by FLINK-7491, and UNNEST support Array type only, so it would be nice to support UNNEST a MultiSet type field. (was: {code}MultiSetTypeInfo\{code} was introduced by FLINK-7491, and \{code}UNNEST\{code} support \{code}Array\{code} type only, so it would be nice to support `UNNEST` a `MultiSet` type field.) > Add Support UNNEST a MultiSet type field > > > Key: FLINK-8838 > URL: https://issues.apache.org/jira/browse/FLINK-8838 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Major > > MultiSetTypeInfo was introduced by FLINK-7491, and UNNEST support Array type > only, so it would be nice to support UNNEST a MultiSet type field. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8838) Add Support for UNNEST a MultiSet type field
[ https://issues.apache.org/jira/browse/FLINK-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln.lee updated FLINK-8838: --- Summary: Add Support for UNNEST a MultiSet type field (was: Add Support UNNEST a MultiSet type field) > Add Support for UNNEST a MultiSet type field > > > Key: FLINK-8838 > URL: https://issues.apache.org/jira/browse/FLINK-8838 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Major > > MultiSetTypeInfo was introduced by FLINK-7491, and UNNEST support Array type > only, so it would be nice to support UNNEST a MultiSet type field. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state
[ https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen updated FLINK-8834: Fix Version/s: 1.5.0 > Job fails to restart due to some tasks stuck in cancelling state > > > Key: FLINK-8834 > URL: https://issues.apache.org/jira/browse/FLINK-8834 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 > Environment: AWS EMR 5.12 > Flink 1.4.0 > Beam 2.3.0 >Reporter: Daniel Harper >Priority: Major > Fix For: 1.5.0 > > > Our job threw an exception overnight, causing the job to commence attempting > a restart. > However it never managed to restart because 2 tasks on one of the Task > Managers are stuck in "Cancelling" state, with the following exception > {code:java} > 2018-03-02 02:29:31,604 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'PTransformTranslation.UnknownRawPTransform -> > ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> > uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out > -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to > cancelling signal, but is stuck in method: > java.lang.Thread.blockedOn(Thread.java:239) > java.lang.System$2.blockedOn(System.java:1252) > java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211) > java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457) > java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253) > java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211) > 
java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458) > java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) > sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125) > sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135) > java.io.OutputStreamWriter.write(OutputStreamWriter.java:220) > java.io.Writer.write(Writer.java:157) > org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76) > org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550) > org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown > Source) > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177) > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138) > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) > 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831) > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809) > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:888) > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:865) > org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94) > org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(
[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...
GitHub user lincoln-lil opened a pull request: https://github.com/apache/flink/pull/5619 [FLINK-8838] [table] Add Support for UNNEST a MultiSet type field. ## What is the purpose of the change *This PR add support for UNNEST a MultiSet type field according to SQL standard UNNEST a collection value ( ::= | ) ## Brief change log - *Add support for UNNEST a MultiSet type field ## Verifying this change - *See added unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with @Public(Evolving): no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/lincoln-lil/flink FLINK-8838 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5619.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5619 commit 961582b28ef20bfab1ea47ceb548b6e6e104e1f7 Author: lincoln-lil Date: 2018-03-02T12:05:44Z [FLINK-8838] [table] Add Support for UNNEST a MultiSet type field. ---
[jira] [Commented] (FLINK-8838) Add Support for UNNEST a MultiSet type field
[ https://issues.apache.org/jira/browse/FLINK-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383525#comment-16383525 ] ASF GitHub Bot commented on FLINK-8838: --- GitHub user lincoln-lil opened a pull request: https://github.com/apache/flink/pull/5619 [FLINK-8838] [table] Add Support for UNNEST a MultiSet type field. ## What is the purpose of the change *This PR add support for UNNEST a MultiSet type field according to SQL standard UNNEST a collection value ( ::= | ) ## Brief change log - *Add support for UNNEST a MultiSet type field ## Verifying this change - *See added unit tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with @Public(Evolving): no - The serializers: no - The runtime per-record code paths (performance sensitive): yes - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? JavaDocs You can merge this pull request into a Git repository by running: $ git pull https://github.com/lincoln-lil/flink FLINK-8838 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5619.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5619 commit 961582b28ef20bfab1ea47ceb548b6e6e104e1f7 Author: lincoln-lil Date: 2018-03-02T12:05:44Z [FLINK-8838] [table] Add Support for UNNEST a MultiSet type field. 
> Add Support for UNNEST a MultiSet type field > > > Key: FLINK-8838 > URL: https://issues.apache.org/jira/browse/FLINK-8838 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: lincoln.lee >Assignee: lincoln.lee >Priority: Major > > MultiSetTypeInfo was introduced by FLINK-7491, and UNNEST support Array type > only, so it would be nice to support UNNEST a MultiSet type field. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8824) In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'
[ https://issues.apache.org/jira/browse/FLINK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383535#comment-16383535 ] mingleizhang commented on FLINK-8824: - Hi, [~StephanEwen] The only problematic case I can see is like the following. Other than that, I think all good. {code:java} //inner class System.out.println(HashMap.SimpleEntry.class.getName()); System.out.println(HashMap.SimpleEntry.class.getCanonicalName()); {code} Will print as follows {code:java} java.util.AbstractMap$SimpleEntry // correct way java.util.AbstractMap.SimpleEntry // wrong way {code} > In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()' > -- > > Key: FLINK-8824 > URL: https://issues.apache.org/jira/browse/FLINK-8824 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Stephan Ewen >Assignee: mingleizhang >Priority: Major > Fix For: 1.5.0 > > > The connector uses {{getCanonicalName()}} in all places, rather than > {{getClassName()}}. > {{getCanonicalName()}}'s intention is to normalize class names for arrays, > etc, but is problematic when instantiating classes from class names. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state
[ https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383537#comment-16383537 ] Daniel Harper commented on FLINK-8834: -- We are not setting this configuration parameter. These are the config parameters we see via the Job Manager configuration tab on the UI {code:java} classloader.resolve-order containerized.heap-cutoff-ratio env.hadoop.conf.dir env.java.opts.taskmanager env.yarn.conf.dir high-availability high-availability.cluster-id high-availability.zookeeper.path.root high-availability.zookeeper.quorum high-availability.zookeeper.storageDir jobmanager.rpc.address jobmanager.rpc.port jobmanager.web.checkpoints.history parallelism.default state.backend state.backend.fs.checkpointdir state.checkpoints.dir state.savepoints.dir taskmanager.network.numberOfBuffers web.port yarn.application-attempts yarn.maximum-failed-containers zookeeper.sasl.disable {code} I've had a look in the job manager logs for the word "fatal" and it doesn't seem to yield any results {code:java} ▶ gzcat ~/Downloads/jobmanager.log.gz | grep -i fatal ▶ {code} > Job fails to restart due to some tasks stuck in cancelling state > > > Key: FLINK-8834 > URL: https://issues.apache.org/jira/browse/FLINK-8834 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 > Environment: AWS EMR 5.12 > Flink 1.4.0 > Beam 2.3.0 >Reporter: Daniel Harper >Priority: Major > Fix For: 1.5.0 > > > Our job threw an exception overnight, causing the job to commence attempting > a restart. 
> However it never managed to restart because 2 tasks on one of the Task > Managers are stuck in "Cancelling" state, with the following exception > {code:java} > 2018-03-02 02:29:31,604 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'PTransformTranslation.UnknownRawPTransform -> > ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> > uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out > -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to > cancelling signal, but is stuck in method: > java.lang.Thread.blockedOn(Thread.java:239) > java.lang.System$2.blockedOn(System.java:1252) > java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211) > java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457) > java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253) > java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211) > java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458) > java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) > sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125) > 
sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135) > java.io.OutputStreamWriter.write(OutputStreamWriter.java:220) > java.io.Writer.write(Writer.java:157) > org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76) > org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550) > org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown > Source) > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177) > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138) > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperato
[GitHub] flink pull request #5620: [FLINK-8824] [kafka] Replace getCanonicalName with...
GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/5620 [FLINK-8824] [kafka] Replace getCanonicalName with getName ## What is the purpose of the change Using ```getCanonicalName``` to get the class name can cause problems, since it is mainly intended for normalizing class names for arrays. ## Brief change log Replace it with ```getName``` You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-8824 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5620.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5620 commit 634262b27b90cc1383c38ff8ac642f7aae522e71 Author: zhangminglei Date: 2018-03-02T12:31:49Z [FLINK-8824] [kafka] Replace getCanonicalName with getName ---
[jira] [Commented] (FLINK-8824) In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'
[ https://issues.apache.org/jira/browse/FLINK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383539#comment-16383539 ] ASF GitHub Bot commented on FLINK-8824: --- GitHub user zhangminglei opened a pull request: https://github.com/apache/flink/pull/5620 [FLINK-8824] [kafka] Replace getCanonicalName with getName ## What is the purpose of the change Using ```getCanonicalName``` to get the class name can cause problems, since it is mainly intended for normalizing class names for arrays. ## Brief change log Replace it with ```getName``` You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhangminglei/flink flink-8824 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5620.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5620 commit 634262b27b90cc1383c38ff8ac642f7aae522e71 Author: zhangminglei Date: 2018-03-02T12:31:49Z [FLINK-8824] [kafka] Replace getCanonicalName with getName > In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()' > -- > > Key: FLINK-8824 > URL: https://issues.apache.org/jira/browse/FLINK-8824 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Stephan Ewen >Assignee: mingleizhang >Priority: Major > Fix For: 1.5.0 > > > The connector uses {{getCanonicalName()}} in all places, rather than > {{getClassName()}}. > {{getCanonicalName()}}'s intention is to normalize class names for arrays, > etc, but is problematic when instantiating classes from class names. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-6160) Retry JobManager/ResourceManager connection in case of timeout
[ https://issues.apache.org/jira/browse/FLINK-6160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383541#comment-16383541 ] mingleizhang commented on FLINK-6160: - Sup ? > Retry JobManager/ResourceManager connection in case of timeout > --- > > Key: FLINK-6160 > URL: https://issues.apache.org/jira/browse/FLINK-6160 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.3.0 >Reporter: Till Rohrmann >Priority: Major > Labels: flip-6 > > In case of a heartbeat timeout, the {{TaskExecutor}} closes the connection to > the remote component. Furthermore, it assumes that the component has actually > failed and, thus, it will only start trying to connect to the component if it > is notified about a new leader address and leader session id. This is > brittle, because the heartbeat could also time out without the component > having crashed. Thus, we should add an automatic retry to the latest known > leader address information in case of a timeout. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers
[ https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383515#comment-16383515 ] Tzu-Li (Gordon) Tai edited comment on FLINK-8836 at 3/2/18 12:44 PM: - [~StephanEwen] I think they can be stateful. All built-in Kryo default serializers, AFAIK, are stateless. However, users can also register their own serializer implementations via the {{ExecutionConfig}}, making them potentially stateful. The {{KryoSerializer}} recognizes these user registrations and adds them to the Kryo object. was (Author: tzulitai): [~StephanEwen] All built-in Kryo default serializers, AFAIK, are stateless. However, users can also register their own serializer implementations via the {{ExecutionConfig}}, making them potentially stateful. The {{KryoSerializer}} recognizes these user registrations and adds them to the Kryo object. > Duplicating a KryoSerializer does not duplicate registered default serializers > -- > > Key: FLINK-8836 > URL: https://issues.apache.org/jira/browse/FLINK-8836 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Reporter: Tzu-Li (Gordon) Tai >Priority: Critical > > The {{duplicate()}} method of the {{KryoSerializer}} is as following: > {code:java} > public KryoSerializer duplicate() { > return new KryoSerializer<>(this); > } > protected KryoSerializer(KryoSerializer toCopy) { > defaultSerializers = toCopy.defaultSerializers; > defaultSerializerClasses = toCopy.defaultSerializerClasses; > kryoRegistrations = toCopy.kryoRegistrations; > ... > } > {code} > Shortly put, when duplicating a {{KryoSerializer}}, the > {{defaultSerializers}} serializer instances are directly provided to the new > {{KryoSerializer}} instance. > This causes the fact that those default serializers are shared across two > different {{KryoSerializer}} instances, and therefore not a correct duplicate. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-8835) Fix TaskManager config keys
[ https://issues.apache.org/jira/browse/FLINK-8835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] mingleizhang reassigned FLINK-8835: --- Assignee: mingleizhang > Fix TaskManager config keys > --- > > Key: FLINK-8835 > URL: https://issues.apache.org/jira/browse/FLINK-8835 > Project: Flink > Issue Type: Bug > Components: TaskManager >Reporter: Stephan Ewen >Assignee: mingleizhang >Priority: Blocker > Fix For: 1.5.0 > > > Many new config keys in the TaskManager don't follow the proper naming > scheme. We need to clear those up before the release. I would also suggest to > keep the key names short, because that makes it easier for users. > When doing this cleanup pass over the config keys, I would suggest to also > make some of the existing keys more hierarchical harmonize them with the > common scheme in Flink. > ## New Keys > * {{taskmanager.network.credit-based-flow-control.enabled}} to > {{taskmanager.network.credit-model}}. > * {{taskmanager.exactly-once.blocking.data.enabled}} to > {{task.checkpoint.alignment.blocking}} (we already have > {{task.checkpoint.alignment.max-size}}) > ## Existing Keys > * {{taskmanager.debug.memory.startLogThread}} => > {{taskmanager.debug.memory.log}} > * {{taskmanager.debug.memory.logIntervalMs}} => > {{taskmanager.debug.memory.log-interval}} > * {{taskmanager.initial-registration-pause}} => > {{taskmanager.registration.initial-backoff}} > * {{taskmanager.max-registration-pause}} => > {{taskmanager.registration.max-backoff}} > * {{taskmanager.refused-registration-pause}} > {{taskmanager.registration.refused-backoff}} > * {{taskmanager.maxRegistrationDuration}} ==> * > {{taskmanager.registration.timeout}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8274) Fix Java 64K method compiling limitation for CommonCalc
[ https://issues.apache.org/jira/browse/FLINK-8274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383545#comment-16383545 ] ASF GitHub Bot commented on FLINK-8274: --- Github user Xpray commented on the issue: https://github.com/apache/flink/pull/5613 It looks great, how about splitting reuseInputUnboxCode as well? I found UnboxingCode might be oversized and has to split. > Fix Java 64K method compiling limitation for CommonCalc > --- > > Key: FLINK-8274 > URL: https://issues.apache.org/jira/browse/FLINK-8274 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.5.0 >Reporter: Ruidong Li >Assignee: Ruidong Li >Priority: Critical > > For complex SQL Queries, the generated code for {code}DataStreamCalc{code}, > {code}DataSetCalc{code} may exceed Java's method length limitation 64kb. > > This issue will split long method to several sub method calls. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink issue #5613: [FLINK-8274] [table] Split generated methods for preventi...
Github user Xpray commented on the issue: https://github.com/apache/flink/pull/5613 It looks great, how about splitting reuseInputUnboxCode as well? I found the UnboxingCode might be oversized and has to be split. ---
[jira] [Comment Edited] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state
[ https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383548#comment-16383548 ] Daniel Harper edited comment on FLINK-8834 at 3/2/18 12:56 PM: --- I can see this code where the log line is created [https://github.com/apache/flink/blob/f9a583b727c9aecbec3213b12266f1d598223400/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L1578] So it looks like the {{if}} condition is never reaching the first branch, which in turn logs the message [[~StephanEwen]] was talking about. was (Author: djharper): I can see this code where the log line is created [https://github.com/apache/flink/blob/f9a583b727c9aecbec3213b12266f1d598223400/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L1578] So it looks like the {{if}} condition is never reaching the first branch, which in turn logs the message [[~StephanEwen]] was talking about. > Job fails to restart due to some tasks stuck in cancelling state > > > Key: FLINK-8834 > URL: https://issues.apache.org/jira/browse/FLINK-8834 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 > Environment: AWS EMR 5.12 > Flink 1.4.0 > Beam 2.3.0 >Reporter: Daniel Harper >Priority: Major > Fix For: 1.5.0 > > > Our job threw an exception overnight, causing the job to commence attempting > a restart. 
> However it never managed to restart because 2 tasks on one of the Task > Managers are stuck in "Cancelling" state, with the following exception > {code:java} > 2018-03-02 02:29:31,604 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'PTransformTranslation.UnknownRawPTransform -> > ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> > uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out > -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to > cancelling signal, but is stuck in method: > java.lang.Thread.blockedOn(Thread.java:239) > java.lang.System$2.blockedOn(System.java:1252) > java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211) > java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457) > java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253) > java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211) > java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458) > java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) > sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125) > 
sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135) > java.io.OutputStreamWriter.write(OutputStreamWriter.java:220) > java.io.Writer.write(Writer.java:157) > org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76) > org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550) > org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown > Source) > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177) > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138) > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) > org.apa
[jira] [Commented] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state
[ https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383548#comment-16383548 ] Daniel Harper commented on FLINK-8834: -- I can see this code where the log line is created [https://github.com/apache/flink/blob/f9a583b727c9aecbec3213b12266f1d598223400/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L1578] So it looks like the {{if}} condition is never reaching the first branch, which in turn logs the message [[~StephanEwen]] was talking about. > Job fails to restart due to some tasks stuck in cancelling state > > > Key: FLINK-8834 > URL: https://issues.apache.org/jira/browse/FLINK-8834 > Project: Flink > Issue Type: Bug >Affects Versions: 1.4.0 > Environment: AWS EMR 5.12 > Flink 1.4.0 > Beam 2.3.0 >Reporter: Daniel Harper >Priority: Major > Fix For: 1.5.0 > > > Our job threw an exception overnight, causing the job to commence attempting > a restart. > However it never managed to restart because 2 tasks on one of the Task > Managers are stuck in "Cancelling" state, with the following exception > {code:java} > 2018-03-02 02:29:31,604 WARN org.apache.flink.runtime.taskmanager.Task > - Task 'PTransformTranslation.UnknownRawPTransform -> > ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> > uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out > -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to > cancelling signal, but is stuck in method: > java.lang.Thread.blockedOn(Thread.java:239) > java.lang.System$2.blockedOn(System.java:1252) > java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211) > java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457) > 
java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253) > java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211) > java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145) > java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458) > java.nio.channels.Channels.writeFullyImpl(Channels.java:78) > java.nio.channels.Channels.writeFully(Channels.java:101) > java.nio.channels.Channels.access$000(Channels.java:61) > java.nio.channels.Channels$1.write(Channels.java:174) > sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221) > sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282) > sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125) > sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135) > java.io.OutputStreamWriter.write(OutputStreamWriter.java:220) > java.io.Writer.write(Writer.java:157) > org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118) > org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76) > org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550) > org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718) > org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown > Source) > org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177) > org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138) > org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65) > 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) > org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831) > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809) > org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOut
[jira] [Updated] (FLINK-8517) StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-8517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-8517: --- Component/s: TaskManager > StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis > --- > > Key: FLINK-8517 > URL: https://issues.apache.org/jira/browse/FLINK-8517 > Project: Flink > Issue Type: Bug > Components: DataSet API, TaskManager, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.3 > > > The {{StaticlyNestedIterationsITCase.testJobWithoutObjectReuse}} test case > fails on Travis. This exception might be relevant: > {code:java} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalStateException: Partition > 557b069f2b89f8ba599e6ab0956a3f5a@58f1a6b7d8ae10b9141f17c08d06cecb not > registered at task event 
dispatcher. > at > org.apache.flink.runtime.io.network.TaskEventDispatcher.subscribeToEvent(TaskEventDispatcher.java:107) > at > org.apache.flink.runtime.iterative.task.IterationHeadTask.initSuperstepBarrier(IterationHeadTask.java:242) > at > org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:266) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748){code} > > https://api.travis-ci.org/v3/job/60156/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5621: [FLINK-8517] fix missing synchronization in TaskEv...
GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5621 [FLINK-8517] fix missing synchronization in TaskEventDispatcher ## What is the purpose of the change The `TaskEventDispatcher` was missing synchronization accessing the `registeredHandlers` field for the new `subscribeToEvent()` and `publish()` methods. This was causing the `StaticlyNestedIterationsITCase.testJobWithoutObjectReuse()` test to sporadically fail (reproducible after running a couple of times). Please merge into `master` and `release-1.5` after accepting. ## Brief change log - add synchronization around `TaskEventDispatcher#subscribeToEvent()`'s access to `registeredHandlers` - add synchronization around `TaskEventDispatcher#publish()`'s access to `registeredHandlers` ## Verifying this change This change is already covered by existing tests (indirectly), such as `StaticlyNestedIterationsITCase.testJobWithoutObjectReuse()`. I ran it almost 24000 times and could not reproduce it anymore with the change ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? 
**not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-8517 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5621.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5621 commit aabeb89dd1259174c786f19b7e97c4c50038610f Author: Nico Kruber Date: 2018-03-02T13:38:20Z [FLINK-8517] fix missing synchronization in TaskEventDispatcher ---
[jira] [Commented] (FLINK-8517) StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-8517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383580#comment-16383580 ] ASF GitHub Bot commented on FLINK-8517: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5621 [FLINK-8517] fix missing synchronization in TaskEventDispatcher ## What is the purpose of the change The `TaskEventDispatcher` was missing synchronization accessing the `registeredHandlers` field for the new `subscribeToEvent()` and `publish()` methods. This was causing the `StaticlyNestedIterationsITCase.testJobWithoutObjectReuse()` test to sporadically fail (reproducible after running a couple of times). Please merge into `master` and `release-1.5` after accepting. ## Brief change log - add synchronization around `TaskEventDispatcher#subscribeToEvent()`'s access to `registeredHandlers` - add synchronization around `TaskEventDispatcher#publish()`'s access to `registeredHandlers` ## Verifying this change This change is already covered by existing tests (indirectly), such as `StaticlyNestedIterationsITCase.testJobWithoutObjectReuse()`. I ran it almost 24000 times and could not reproduce it anymore with the change ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? 
**not applicable** You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-8517 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5621.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5621 commit aabeb89dd1259174c786f19b7e97c4c50038610f Author: Nico Kruber Date: 2018-03-02T13:38:20Z [FLINK-8517] fix missing synchronization in TaskEventDispatcher > StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis > --- > > Key: FLINK-8517 > URL: https://issues.apache.org/jira/browse/FLINK-8517 > Project: Flink > Issue Type: Bug > Components: DataSet API, TaskManager, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0, 1.4.3 > > > The {{StaticlyNestedIterationsITCase.testJobWithoutObjectReuse}} test case > fails on Travis. This exception might be relevant: > {code:java} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalStateException: Partition > 557b069f2b89f8ba599e6ab0956a3f5a@58f1a6b7d8ae10b9141f17c08d06cecb not > registered at task event dispatcher. > at > org.apache.flink.runtime.io.network.TaskEventDispatcher.subscribeToEvent(TaskEventDispatcher.java:107) > at > org.apache.flink.runtime.iterative.task.IterationHeadTask.initSuperstepBarrier(IterationHeadTask.java:242) > at > org.apache.flink.runtime.iterative.
[jira] [Updated] (FLINK-8517) StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis
[ https://issues.apache.org/jira/browse/FLINK-8517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber updated FLINK-8517: --- Fix Version/s: (was: 1.4.3) > StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis > --- > > Key: FLINK-8517 > URL: https://issues.apache.org/jira/browse/FLINK-8517 > Project: Flink > Issue Type: Bug > Components: DataSet API, TaskManager, Tests >Affects Versions: 1.5.0 >Reporter: Till Rohrmann >Priority: Blocker > Labels: test-stability > Fix For: 1.5.0 > > > The {{StaticlyNestedIterationsITCase.testJobWithoutObjectReuse}} test case > fails on Travis. This exception might be relevant: > {code:java} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalStateException: Partition > 557b069f2b89f8ba599e6ab0956a3f5a@58f1a6b7d8ae10b9141f17c08d06cecb not > registered at task event 
dispatcher. > at > org.apache.flink.runtime.io.network.TaskEventDispatcher.subscribeToEvent(TaskEventDispatcher.java:107) > at > org.apache.flink.runtime.iterative.task.IterationHeadTask.initSuperstepBarrier(IterationHeadTask.java:242) > at > org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:266) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) > at java.lang.Thread.run(Thread.java:748){code} > > https://api.travis-ci.org/v3/job/60156/log.txt -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-8839) Table source factory discovery is broken in SQL Client
Timo Walther created FLINK-8839: --- Summary: Table source factory discovery is broken in SQL Client Key: FLINK-8839 URL: https://issues.apache.org/jira/browse/FLINK-8839 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Assignee: Timo Walther Table source factories cannot be discovered if they were added using a jar file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383610#comment-16383610 ] ASF GitHub Bot commented on FLINK-8459: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5622 [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint ## What is the purpose of the change *Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher and JobMaster. Stop checkpoint scheduler before taking savepoint to make sure that the savepoint created by this command is the last one.* cc: @tillrohrmann ## Brief change log - *Implement RestClusterClient.cancelWithSavepoint* ## Verifying this change This change added tests and can be verified as follows: - *Added `JobMasterTriggerSavepointIT`.* - *Manually tested.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8459-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5622.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5622 commit 7e913b0d1eab8453279ffacc11f4633b9263190d Author: gyao Date: 2018-03-02T14:11:36Z [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher and JobMaster. Stop checkpoint scheduler before taking savepoint to make sure that the savepoint created by this command is the last one. > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5622 [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint ## What is the purpose of the change *Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher and JobMaster. Stop checkpoint scheduler before taking savepoint to make sure that the savepoint created by this command is the last one.* cc: @tillrohrmann ## Brief change log - *Implement RestClusterClient.cancelWithSavepoint* ## Verifying this change This change added tests and can be verified as follows: - *Added `JobMasterTriggerSavepointIT`.* - *Manually tested.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8459-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5622.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5622 commit 7e913b0d1eab8453279ffacc11f4633b9263190d Author: gyao Date: 2018-03-02T14:11:36Z [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher and JobMaster. Stop checkpoint scheduler before taking savepoint to make sure that the savepoint created by this command is the last one. ---
[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171857387 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { @Override public CompletableFuture triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() - .triggerSavepoint(System.currentTimeMillis(), targetDirectory) - .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally(new IllegalStateException( + String.format("Job %s is not a streaming job.", jobGraph.getJobID()))); --- End diff -- If the job is in a terminal state, the coordinator will be `null`ed as well. ---
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383612#comment-16383612 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171857387 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { @Override public CompletableFuture triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() - .triggerSavepoint(System.currentTimeMillis(), targetDirectory) - .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally(new IllegalStateException( + String.format("Job %s is not a streaming job.", jobGraph.getJobID(; --- End diff -- If the job is in a terminal state, the coordinator will be `null`ed as well. > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. 
The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171857517 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { @Override public CompletableFuture triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() - .triggerSavepoint(System.currentTimeMillis(), targetDirectory) - .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally(new IllegalStateException( + String.format("Job %s is not a streaming job.", jobGraph.getJobID(; + } + + if (cancelJob) { + checkpointCoordinator.stopCheckpointScheduler(); + } + return checkpointCoordinator + .triggerSavepoint(System.currentTimeMillis(), targetDirectory) + .thenApply(CompletedCheckpoint::getExternalPointer) + .thenApplyAsync(path -> { + if (cancelJob) { + log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID()); + cancel(timeout); + } + return path; + }, getMainThreadExecutor()) + .exceptionally(throwable -> { + if (cancelJob) { + startCheckpointScheduler(checkpointCoordinator); --- End diff -- If the cancelation failed, we restart the scheduler as well. I think this differs from the previous implementation. ---
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383616#comment-16383616 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171857517 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { @Override public CompletableFuture triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() - .triggerSavepoint(System.currentTimeMillis(), targetDirectory) - .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally(new IllegalStateException( + String.format("Job %s is not a streaming job.", jobGraph.getJobID(; + } + + if (cancelJob) { + checkpointCoordinator.stopCheckpointScheduler(); + } + return checkpointCoordinator + .triggerSavepoint(System.currentTimeMillis(), targetDirectory) + .thenApply(CompletedCheckpoint::getExternalPointer) + .thenApplyAsync(path -> { + if (cancelJob) { + log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID()); + cancel(timeout); + } + return path; + }, getMainThreadExecutor()) + .exceptionally(throwable -> { + if (cancelJob) { + startCheckpointScheduler(checkpointCoordinator); --- End diff -- If the cancelation failed, we restart the scheduler as well. I think this differs from the previous implementation. 
> Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171857970 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { @Override public CompletableFuture triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() - .triggerSavepoint(System.currentTimeMillis(), targetDirectory) - .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally(new IllegalStateException( + String.format("Job %s is not a streaming job.", jobGraph.getJobID(; + } + + if (cancelJob) { + checkpointCoordinator.stopCheckpointScheduler(); + } + return checkpointCoordinator + .triggerSavepoint(System.currentTimeMillis(), targetDirectory) + .thenApply(CompletedCheckpoint::getExternalPointer) + .thenApplyAsync(path -> { + if (cancelJob) { + log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID()); + cancel(timeout); + } + return path; + }, getMainThreadExecutor()) + .exceptionally(throwable -> { + if (cancelJob) { + startCheckpointScheduler(checkpointCoordinator); + } + throw new CompletionException(throwable); + }); + } + + private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) { --- End diff -- Method can be reused in the job rescaling logic. ---
[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171858158 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java --- @@ -34,8 +34,8 @@ } @Override - protected SavepointTriggerRequestBody getTestRequestInstance() throws Exception { - return new SavepointTriggerRequestBody("/tmp"); + protected SavepointTriggerRequestBody getTestRequestInstance() { + return new SavepointTriggerRequestBody("/tmp", true); --- End diff -- strictly speaking the `false` case should be tested as well ---
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383618#comment-16383618 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171857970 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { @Override public CompletableFuture triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() - .triggerSavepoint(System.currentTimeMillis(), targetDirectory) - .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally(new IllegalStateException( + String.format("Job %s is not a streaming job.", jobGraph.getJobID(; + } + + if (cancelJob) { + checkpointCoordinator.stopCheckpointScheduler(); + } + return checkpointCoordinator + .triggerSavepoint(System.currentTimeMillis(), targetDirectory) + .thenApply(CompletedCheckpoint::getExternalPointer) + .thenApplyAsync(path -> { + if (cancelJob) { + log.info("Savepoint stored in {}. 
Now cancelling {}.", path, jobGraph.getJobID()); + cancel(timeout); + } + return path; + }, getMainThreadExecutor()) + .exceptionally(throwable -> { + if (cancelJob) { + startCheckpointScheduler(checkpointCoordinator); + } + throw new CompletionException(throwable); + }); + } + + private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) { --- End diff -- Method can be reused in the job rescaling logic. > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383619#comment-16383619 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171858158 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java --- @@ -34,8 +34,8 @@ } @Override - protected SavepointTriggerRequestBody getTestRequestInstance() throws Exception { - return new SavepointTriggerRequestBody("/tmp"); + protected SavepointTriggerRequestBody getTestRequestInstance() { + return new SavepointTriggerRequestBody("/tmp", true); --- End diff -- strictly speaking the `false` case should be tested as well > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383622#comment-16383622 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171858593 --- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.isOneOf; + 
+/** + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}. + * + * @see org.apache.flink.runtime.jobmaster.JobMaster + */ +@Category(Flip6.class) +public class JobMasterTriggerSavepointIT extends AbstractTestBase { + + private static CountDownLatch invokeLatch; + + private static volatile CountDownLatch triggerCheckpointLatch; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Path savepointDirectory; + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + triggerCheckpointLatch = new CountDownLatch(1); + savepointDirectory = temporaryFolder.newFolder().toPath(); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + miniClusterResource.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient(); + clusterClient.setDetached(true); + + jobGraph = new JobGraph(); + + final JobVertex vertex = new JobVertex("testVertex")
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383623#comment-16383623 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171858662 --- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.isOneOf; + 
+/** + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}. + * + * @see org.apache.flink.runtime.jobmaster.JobMaster + */ +@Category(Flip6.class) +public class JobMasterTriggerSavepointIT extends AbstractTestBase { + + private static CountDownLatch invokeLatch; + + private static volatile CountDownLatch triggerCheckpointLatch; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Path savepointDirectory; + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + triggerCheckpointLatch = new CountDownLatch(1); + savepointDirectory = temporaryFolder.newFolder().toPath(); + + Assume.assumeTrue( --- End diff -- shouldn't happen if category is `flip6` > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >
[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171858662 --- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.isOneOf; + 
+/** + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}. + * + * @see org.apache.flink.runtime.jobmaster.JobMaster + */ +@Category(Flip6.class) +public class JobMasterTriggerSavepointIT extends AbstractTestBase { + + private static CountDownLatch invokeLatch; + + private static volatile CountDownLatch triggerCheckpointLatch; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Path savepointDirectory; + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + triggerCheckpointLatch = new CountDownLatch(1); + savepointDirectory = temporaryFolder.newFolder().toPath(); + + Assume.assumeTrue( --- End diff -- shouldn't happen if category is `flip6` ---
[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171858593 --- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.isOneOf; + 
+/** + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}. + * + * @see org.apache.flink.runtime.jobmaster.JobMaster + */ +@Category(Flip6.class) +public class JobMasterTriggerSavepointIT extends AbstractTestBase { + + private static CountDownLatch invokeLatch; + + private static volatile CountDownLatch triggerCheckpointLatch; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Path savepointDirectory; + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + triggerCheckpointLatch = new CountDownLatch(1); + savepointDirectory = temporaryFolder.newFolder().toPath(); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + miniClusterResource.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient(); + clusterClient.setDetached(true); + + jobGraph = new JobGraph(); + + final JobVertex vertex = new JobVertex("testVertex"); + vertex.setInvokableClass(NoOpBlockingInvokable.class); + jobGraph.addVertex(vertex); + + jobGraph.setSnapshotSettings(new JobCheckpointingSettings( + Collections.singletonList(vertex.g