[jira] [Created] (FLINK-8830) YarnResourceManager throws NullPointerException

2018-03-02 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-8830:
-

 Summary: YarnResourceManager throws NullPointerException
 Key: FLINK-8830
 URL: https://issues.apache.org/jira/browse/FLINK-8830
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.5.0
Reporter: Piotr Nowojski


 
{code:java}
java.lang.NullPointerException
at java.io.File.<init>(File.java:277)
at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:502)
at 
org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:445)
at 
org.apache.flink.yarn.YarnResourceManager.onContainersAllocated(YarnResourceManager.java:338)
at 
org.apache.flink.yarn.YarnResourceManagerTest$1.<init>(YarnResourceManagerTest.java:340)
at 
org.apache.flink.yarn.YarnResourceManagerTest.testStopWorker(YarnResourceManagerTest.java:317)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at 
com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at 
com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
{code}
 

This exception is being thrown in 
`org.apache.flink.yarn.YarnResourceManagerTest#testStopWorker`. The exception 
is apparently ignored, since the test still completes. It seems that this line:
{code:java}
String fileLocation = 
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION){code}
is not guarded against a null return value. I don't know whether that is a test 
or production code issue.
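
A minimal sketch of the missing guard (the helper and its name are illustrative 
only, not the actual fix):
{code:java}
import java.io.File;
import org.apache.hadoop.security.UserGroupInformation;

final class TokenFileGuard {
    /** Returns the token file, or null if HADOOP_TOKEN_FILE_LOCATION is unset (as in tests). */
    static File tokenFileOrNull() {
        String fileLocation =
            System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
        // guard: File's constructor throws NullPointerException on a null path
        return fileLocation == null ? null : new File(fileLocation);
    }
}
{code}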

CC [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8806) Failure in UnionInputGate getNextBufferOrEvent()

2018-03-02 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8806?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber closed FLINK-8806.
--
Resolution: Duplicate

> Failure in UnionInputGate getNextBufferOrEvent()
> 
>
> Key: FLINK-8806
> URL: https://issues.apache.org/jira/browse/FLINK-8806
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.6.0
>
>
> Error occurs in {{SelfConnectionITCase}}:
> Full log: https://api.travis-ci.org/v3/job/346847455/log.txt
> Exception Stack Trace
> {code}
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.IllegalStateException
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:527)
>   at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
>   at 
> org.apache.flink.test.streaming.runtime.SelfConnectionITCase.differentDataStreamDifferentChain(SelfConnectionITCase.java:158)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.createRequestAndRun(JUnitCoreWrapper.java:108)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.executeEager(JUnitCoreWrapper.java:78)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreWrapper.execute(JUnitCoreWrapper.java:54)
>   at 
> org.apache.maven.surefire.junitcore.JUnitCoreProvider.invoke(JUnitCoreProvider.java:144)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> Caused by: java.lang.IllegalStateException
>   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:173)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94)
>   at 
> org.apache.flink.streaming.runtime.io.StreamTwoInpu

[GitHub] flink pull request #5608: [FLINK-8821][TableAPI && SQL] Fix non-terminating ...

2018-03-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5608


---


[jira] [Commented] (FLINK-8821) Fix non-terminating decimal error

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383351#comment-16383351
 ] 

ASF GitHub Bot commented on FLINK-8821:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5608


> Fix non-terminating decimal error
> -
>
> Key: FLINK-8821
> URL: https://issues.apache.org/jira/browse/FLINK-8821
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> The DecimalAvgAggFunction lacks precision protection
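
For background, this is the standard {{BigDecimal}} failure mode: dividing 
without an explicit precision throws once the quotient has a non-terminating 
decimal expansion. A generic JDK sketch of the error and the usual fix (the 
aggregate-function wiring itself is not shown):
{code:java}
import java.math.BigDecimal;
import java.math.MathContext;

public class NonTerminatingDecimalDemo {
    public static void main(String[] args) {
        BigDecimal sum = BigDecimal.ONE;
        BigDecimal count = new BigDecimal(3);

        // Throws ArithmeticException: "Non-terminating decimal expansion;
        // no exact representable decimal result."
        // BigDecimal avg = sum.divide(count);

        // Bounding the precision with a MathContext avoids the error.
        BigDecimal avg = sum.divide(count, MathContext.DECIMAL128);
        System.out.println(avg); // 0.3333... (34 significant digits)
    }
}
{code}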



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8821) Fix non-terminating decimal error

2018-03-02 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8821.
-
   Resolution: Fixed
Fix Version/s: 1.6.0
   1.5.0

Fixed in 1.6.0: b31b707cb20f34633815718ff356e187f3397620
Fixed in 1.5.0: 347ec3848ec603ac452e394a5211cf888db6663f

> Fix non-terminating decimal error
> -
>
> Key: FLINK-8821
> URL: https://issues.apache.org/jira/browse/FLINK-8821
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> The DecimalAvgAggFunction lacks precision protection



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-02 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383356#comment-16383356
 ] 

yanxiaobin commented on FLINK-8794:
---

hi [~pnowojski]! Thank you for your suggestion! The downstream processor can 
ignore the files with "*pending" or "*in-progress" suffixes and the "_" prefix, 
but I don't think that is a good way to deal with it. We could change this 
behaviour/add an option for BucketingSink to use temporary "in-progress" and 
"pending" directories instead of prefixes, but those temporary directories 
would still be subdirectories of the base directory, and a downstream processor 
that reads the base directory recursively would still pick up redundant dirty 
data. I think the temporary data produced while the program runs should be 
isolated from the final output data. Thanks!

Also [~kkl0u] could you elaborate why rescaling forced us to keep lingering 
files? 
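
For anyone who does want the suffix/prefix filtering in the meantime, a hedged 
sketch of a Hadoop {{PathFilter}} a downstream job could apply when listing the 
base directory (the prefix and suffixes below are BucketingSink's defaults; 
adapt them if they were reconfigured):
{code:java}
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/** Skips BucketingSink's temporary files when listing the base directory. */
public class FinishedFilesFilter implements PathFilter {
    @Override
    public boolean accept(Path path) {
        String name = path.getName();
        // "_" prefix plus ".in-progress"/".pending" suffixes are the sink's defaults
        return !name.startsWith("_")
            && !name.endsWith(".in-progress")
            && !name.endsWith(".pending");
    }
}
{code}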

 

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state, and this state never changes after that. The underlying 
> storage is S3.
>  
> {code}
> 2018-02-28 11:58:42  147341619 _part-28-0.in-progress
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-02 Thread yanxiaobin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383356#comment-16383356
 ] 

yanxiaobin edited comment on FLINK-8794 at 3/2/18 8:40 AM:
---

hi [~pnowojski], thank you for your suggestion! The downstream processor can 
ignore the files with "*pending" or "*in-progress" suffixes and the "_" prefix, 
but I don't think that is a good way to deal with it. We could change this 
behaviour/add an option for BucketingSink to use temporary "in-progress" and 
"pending" directories instead of prefixes, but those temporary directories 
would still be subdirectories of the base directory, and a downstream processor 
that reads the base directory recursively would still pick up redundant dirty 
data. I think the temporary data produced while the program runs should be 
isolated from the final output data. Thanks!


Also [~kkl0u] could you elaborate why rescaling forced us to keep lingering 
files? 

 


was (Author: backlight):
hi [~pnowojski]! Thank you for your suggestion! The downstream processor can 
ignore the files with "*pending" or "*in-progress" suffixes and the "_" prefix, 
but I don't think that is a good way to deal with it. We could change this 
behaviour/add an option for BucketingSink to use temporary "in-progress" and 
"pending" directories instead of prefixes, but those temporary directories 
would still be subdirectories of the base directory, and a downstream processor 
that reads the base directory recursively would still pick up redundant dirty 
data. I think the temporary data produced while the program runs should be 
isolated from the final output data. Thanks!

Also [~kkl0u] could you elaborate why rescaling forced us to keep lingering 
files? 

 

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state, and this state never changes after that. The underlying 
> storage is S3.
>  
> {code}
> 2018-02-28 11:58:42  147341619 _part-28-0.in-progress
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8831) Create SQL Client dependencies

2018-03-02 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8831:
---

 Summary: Create SQL Client dependencies
 Key: FLINK-8831
 URL: https://issues.apache.org/jira/browse/FLINK-8831
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


A first minimum version of FLIP-24 for the upcoming 
Flink SQL Client has been merged into master. We also merged the 
ability to discover and configure table sources without a single 
line of code, using string-based properties and Java service provider 
discovery.

We are now facing the issue of how to manage dependencies in this new 
environment. It is different from how regular Flink projects are created 
(by setting up a new Maven project and building a jar or fat jar). 
Ideally, a user should be able to select from a set of prepared 
connectors, catalogs, and formats. E.g., if a Kafka connector and an Avro 
format are needed, all that should be required is to move a 
"flink-kafka.jar" and a "flink-avro.jar" into the "sql_lib" directory that 
is shipped to a Flink cluster together with the SQL query.

[As discussed on 
ML|http://mail-archives.apache.org/mod_mbox/flink-dev/201802.mbox/%3C9c73518b-ec8e-3b01-f200-dea816c75efc%40apache.org%3E],
 we will build fat jars for these modules with every Flink release; they can be 
hosted somewhere (e.g. on Apache infrastructure, but not on Maven Central). 
This would make it very easy to add a dependency by downloading the prepared 
JAR files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8832) Create a SQL Client Kafka fat-jar

2018-03-02 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8832:
---

 Summary: Create a SQL Client Kafka fat-jar
 Key: FLINK-8832
 URL: https://issues.apache.org/jira/browse/FLINK-8832
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


Create fat-jars for Apache Kafka.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8833) Create SQL Client JSON format fat-jar

2018-03-02 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8833:
---

 Summary: Create SQL Client JSON format fat-jar
 Key: FLINK-8833
 URL: https://issues.apache.org/jira/browse/FLINK-8833
 Project: Flink
  Issue Type: Sub-task
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


Create a fat-jar for flink-json.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-7282) Credit-based Network Flow Control

2018-03-02 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang reassigned FLINK-7282:
---

Assignee: zhijiang

> Credit-based Network Flow Control
> -
>
> Key: FLINK-7282
> URL: https://issues.apache.org/jira/browse/FLINK-7282
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> This is part of the work on network stack improvements proposed in 
> [~StephanEwen]'s 
> [FLIP|https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#].
> Backpressure currently happens very naturally through the TCP network 
> connections and the bounded buffering capacity. The downsides are:
> * All channels multiplexed into the same TCP connection stall together as 
> soon as one channel has backpressure.
> * Under backpressure, connections cannot transport checkpoint barriers.
> This Flink-managed flow control is similar to the window-based advertisement 
> mechanism in TCP. The basic approaches are the following:
> * Each RemoteInputChannel has a fixed number of exclusive buffers as its 
> initial credits, and the SingleInputGate has a fixed buffer pool for managing 
> floating buffers for all RemoteInputChannels.
> * The RemoteInputChannel, as the receiver, notifies the sender side of its 
> currently available credits.
> * Senders must never send buffers without credit; this means every buffer 
> sent can be accepted by the receiver, so no buffers accumulate on the network 
> wire.
> * Senders also send the current backlog size, which indicates how many 
> buffers are waiting on the sender side. The receivers use this information to 
> decide how many floating buffers to request from the fixed buffer pool.
> To avoid intermediate commits affecting the master branch, this will be 
> implemented in a separate feature branch.
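
To make the accounting concrete, a toy sketch of the receiver-side credit 
bookkeeping described above (class and member names are illustrative; this is 
not Flink's actual implementation):
{code:java}
/** Toy model of receiver-side credit bookkeeping for one channel (names illustrative). */
public class CreditBookkeeping {
    private int availableCredits;    // exclusive + floating buffers currently free
    private int unannouncedCredits;  // credits not yet advertised to the sender

    public CreditBookkeeping(int exclusiveBuffers) {
        // each channel starts with its exclusive buffers as its initial credit
        this.availableCredits = exclusiveBuffers;
        this.unannouncedCredits = exclusiveBuffers;
    }

    /** A buffer arrived from the sender, consuming one credit. */
    public void onBufferReceived() {
        availableCredits--;
    }

    /** A buffer was consumed and recycled, freeing one credit to re-announce. */
    public void onBufferRecycled() {
        availableCredits++;
        unannouncedCredits++;
    }

    /** Credits to announce to the sender; it may send at most this many more buffers. */
    public int drainUnannouncedCredits() {
        int credits = unannouncedCredits;
        unannouncedCredits = 0;
        return credits;
    }
}
{code}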



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8833) Create a SQL Client JSON format fat-jar

2018-03-02 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8833:

Summary: Create a SQL Client JSON format fat-jar  (was: Create SQL Client 
JSON format fat-jar)

> Create a SQL Client JSON format fat-jar
> ---
>
> Key: FLINK-8833
> URL: https://issues.apache.org/jira/browse/FLINK-8833
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> Create a fat-jar for flink-json.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383391#comment-16383391
 ] 

ASF GitHub Bot commented on FLINK-8828:
---

GitHub user jelmerk opened a pull request:

https://github.com/apache/flink/pull/5616

[FLINK-8828] [stream, dataset, scala] Introduce collect method

## What is the purpose of the change

A collect function is a method that takes a partial function as its 
parameter and applies it to all the elements in the collection, creating a new 
collection from the elements for which the partial function is defined. It 
makes certain things nicer to express.

## Brief change log

- added collect method on scala dataset and datastream api

## Verifying this change

It seems hard to find a place where this could be tested in isolation; 
suggestions welcome.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: don't know
  - The runtime per-record code paths (performance sensitive): yes
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jelmerk/flink collect_support

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5616


commit 61b4bcb7c941950d62d0db0aa2041f1796fdbaaf
Author: Jelmer Kuperus 
Date:   2018-03-02T09:17:54Z

[FLINK-8828] [stream, dataset, scala] Introduce collect method




> Add collect method to DataStream / DataSet scala api
> 
>
> Key: FLINK-8828
> URL: https://issues.apache.org/jira/browse/FLINK-8828
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataSet API, DataStream API, Scala API
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Priority: Major
>
> A collect function is a method that takes a partial function as its parameter 
> and applies it to all the elements in the collection, creating a new 
> collection from the elements for which the partial function is defined.
> It can be found on all [core scala collection 
> classes|http://www.scala-lang.org/api/2.9.2/scala/collection/TraversableLike.html]
>  as well as on spark's [rdd 
> interface|https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD]
> To understand its utility, imagine the following scenario:
> Given a DataStream that produces events of type _Purchase_ and _View_, 
> transform this stream into a stream of purchase amounts over 1000 euros.
> Currently an implementation might look like
> {noformat}
> val x = dataStream
>   .filter(_.isInstanceOf[Purchase])
>   .map(_.asInstanceOf[Purchase])
>   .filter(_.amount > 1000)
>   .map(_.amount){noformat}
> Or alternatively you could do this
> {noformat}
> dataStream.flatMap(_ match {
>   case p: Purchase if p.amount > 1000 => Some(p.amount)
>   case _ => None
> }){noformat}
> But with collect implemented it could look like
> {noformat}
> dataStream.collect {
>   case p: Purchase if p.amount > 1000 => p.amount
> }{noformat}
>  
> This is a lot nicer to both read and write.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5616: [FLINK-8828] [stream, dataset, scala] Introduce co...

2018-03-02 Thread jelmerk
GitHub user jelmerk opened a pull request:

https://github.com/apache/flink/pull/5616

[FLINK-8828] [stream, dataset, scala] Introduce collect method

## What is the purpose of the change

A collect function is a method that takes a partial function as its 
parameter and applies it to all the elements in the collection, creating a new 
collection from the elements for which the partial function is defined. It 
makes certain things nicer to express.

## Brief change log

- added collect method on scala dataset and datastream api

## Verifying this change

It seems hard to find a place where this could be tested in isolation; 
suggestions welcome.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: don't know
  - The runtime per-record code paths (performance sensitive): yes
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jelmerk/flink collect_support

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5616


commit 61b4bcb7c941950d62d0db0aa2041f1796fdbaaf
Author: Jelmer Kuperus 
Date:   2018-03-02T09:17:54Z

[FLINK-8828] [stream, dataset, scala] Introduce collect method




---


[jira] [Updated] (FLINK-8799) Make AbstractYarnClusterDescriptor immutable

2018-03-02 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8799?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8799:

Issue Type: Improvement  (was: Bug)

> Make AbstractYarnClusterDescriptor immutable
> 
>
> Key: FLINK-8799
> URL: https://issues.apache.org/jira/browse/FLINK-8799
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.6.0
>
>
> {{AbstractYarnClusterDescriptor}} should be made immutable. Currently, its 
> internal configuration is modified from different places which makes it 
> difficult to reason about the code. For example, it should not be possible to 
> modify the {{zookeeperNamespace}} using a setter method. A user of this class 
> should be forced to provide all information prior to creating the instance, 
> e.g., by passing a {{org.apache.flink.configuration.Configuration}} object.
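
A hedged sketch of the intended pattern: all inputs are captured at 
construction time from a {{Configuration}}, and no setters exist (class, field, 
and option names below are placeholders, not the actual refactoring):
{code:java}
import org.apache.flink.configuration.Configuration;

/** Sketch of an immutable descriptor: all state is fixed at construction time. */
public final class ImmutableClusterDescriptor {
    private final Configuration flinkConfiguration;
    private final String zookeeperNamespace;

    public ImmutableClusterDescriptor(Configuration configuration) {
        // defensive copy, so later external mutation cannot leak in
        this.flinkConfiguration = new Configuration(configuration);
        // "high-availability.cluster-id" stands in for whichever key carries the namespace
        this.zookeeperNamespace =
            configuration.getString("high-availability.cluster-id", "default");
    }

    public String getZookeeperNamespace() {
        // read-only: there is deliberately no setter
        return zookeeperNamespace;
    }
}
{code}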



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5617: [FLINK-8799][YARN] Make AbstractYarnClusterDescrip...

2018-03-02 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5617

[FLINK-8799][YARN] Make AbstractYarnClusterDescriptor immutable

## What is the purpose of the change

*This pull request makes AbstractYarnClusterDescriptor immutable.*


## Brief change log

  - *removed some setter accessors from the class 
`AbstractYarnClusterDescriptor`*
  - *deleted some property-setting code and replaced it by adding options to 
the `Configuration` instance*
  - *fetched the config items from the `Configuration` and initialized the 
fields of `AbstractYarnClusterDescriptor`*
  - *added some config options to `YarnConfigOptions`*
  - *fixed some old test cases and added some new test cases for the refactored 
config properties*


## Verifying this change

This change added tests and can be verified as follows:

  - *fixed some old test cases and added some new test cases for the refactored 
config properties, such as the Flink jar path and name*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8799

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5617.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5617


commit f04f8d68a0859923dcdba594ce22b5f420305df5
Author: vinoyang 
Date:   2018-03-02T09:22:54Z

[FLINK-8799][YARN] Make AbstractYarnClusterDescriptor immutable




---


[jira] [Commented] (FLINK-8799) Make AbstractYarnClusterDescriptor immutable

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383393#comment-16383393
 ] 

ASF GitHub Bot commented on FLINK-8799:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5617

[FLINK-8799][YARN] Make AbstractYarnClusterDescriptor immutable

## What is the purpose of the change

*This pull request makes AbstractYarnClusterDescriptor immutable.*


## Brief change log

  - *removed some setter accessors from the class 
`AbstractYarnClusterDescriptor`*
  - *deleted some property-setting code and replaced it by adding options to 
the `Configuration` instance*
  - *fetched the config items from the `Configuration` and initialized the 
fields of `AbstractYarnClusterDescriptor`*
  - *added some config options to `YarnConfigOptions`*
  - *fixed some old test cases and added some new test cases for the refactored 
config properties*


## Verifying this change

This change added tests and can be verified as follows:

  - *fixed some old test cases and added some new test cases for the refactored 
config properties, such as the Flink jar path and name*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8799

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5617.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5617


commit f04f8d68a0859923dcdba594ce22b5f420305df5
Author: vinoyang 
Date:   2018-03-02T09:22:54Z

[FLINK-8799][YARN] Make AbstractYarnClusterDescriptor immutable




> Make AbstractYarnClusterDescriptor immutable
> 
>
> Key: FLINK-8799
> URL: https://issues.apache.org/jira/browse/FLINK-8799
> Project: Flink
>  Issue Type: Improvement
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.6.0
>
>
> {{AbstractYarnClusterDescriptor}} should be made immutable. Currently, its 
> internal configuration is modified from different places which makes it 
> difficult to reason about the code. For example, it should not be possible to 
> modify the {{zookeeperNamespace}} using a setter method. A user of this class 
> should be forced to provide all information prior to creating the instance, 
> e.g., by passing a {{org.apache.flink.configuration.Configuration}} object.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8830) YarnResourceManager throws NullPointerException

2018-03-02 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-8830:
---

Assignee: vinoyang

> YarnResourceManager throws NullPointerException
> ---
>
> Key: FLINK-8830
> URL: https://issues.apache.org/jira/browse/FLINK-8830
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: vinoyang
>Priority: Major
>
>  
> {code:java}
> java.lang.NullPointerException
> at java.io.File.<init>(File.java:277)
> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:502)
> at 
> org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:445)
> at 
> org.apache.flink.yarn.YarnResourceManager.onContainersAllocated(YarnResourceManager.java:338)
> at 
> org.apache.flink.yarn.YarnResourceManagerTest$1.<init>(YarnResourceManagerTest.java:340)
> at 
> org.apache.flink.yarn.YarnResourceManagerTest.testStopWorker(YarnResourceManagerTest.java:317)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
> at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
>  
> This exception is being thrown in 
> `org.apache.flink.yarn.YarnResourceManagerTest#testStopWorker`. The exception 
> is apparently ignored, since the test still completes. It seems that this 
> line:
> {code:java}
> String fileLocation = 
> System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION){code}
> is not guarded against a null return value. I don't know whether that is a 
> test or production code issue.
> CC [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state

2018-03-02 Thread Daniel Harper (JIRA)
Daniel Harper created FLINK-8834:


 Summary: Job fails to restart due to some tasks stuck in 
cancelling state
 Key: FLINK-8834
 URL: https://issues.apache.org/jira/browse/FLINK-8834
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.4.0
 Environment: EMR 5.12

Flink 1.4.0

Beam 2.3.0
Reporter: Daniel Harper


Our job threw an exception overnight, causing the job to attempt a restart.

However, it never managed to restart because two tasks are stuck in the 
"Cancelling" state, with the following exception:
{code:java}
2018-03-02 02:29:31,604 WARN  org.apache.flink.runtime.taskmanager.Task 
    - Task 'PTransformTranslation.UnknownRawPTransform -> 
ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> 
uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out
 -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to 
cancelling signal, but is stuck in method:
 java.lang.Thread.blockedOn(Thread.java:239)
java.lang.System$2.blockedOn(System.java:1252)
java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211)
java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170)
java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457)
java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
java.nio.channels.Channels.writeFully(Channels.java:101)
java.nio.channels.Channels.access$000(Channels.java:61)
java.nio.channels.Channels$1.write(Channels.java:174)
java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
java.nio.channels.Channels.writeFully(Channels.java:101)
java.nio.channels.Channels.access$000(Channels.java:61)
java.nio.channels.Channels$1.write(Channels.java:174)
sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)
java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
java.io.Writer.write(Writer.java:157)
org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102)
org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118)
org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76)
org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)
org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:888)
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:865)
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94)
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:87)
org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1040)
org.apache.beam.runners.core.ReduceFnRunner$$Lambda$163/1408647946.output(Unknown
 Source)
org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:433)
org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceF

[jira] [Updated] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state

2018-03-02 Thread Daniel Harper (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Harper updated FLINK-8834:
-
Environment: 
AWS EMR 5.12

Flink 1.4.0

Beam 2.3.0

  was:
EMR 5.12

Flink 1.4.0

Beam 2.3.0


> Job fails to restart due to some tasks stuck in cancelling state
> 
>
> Key: FLINK-8834
> URL: https://issues.apache.org/jira/browse/FLINK-8834
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: AWS EMR 5.12
> Flink 1.4.0
> Beam 2.3.0
>Reporter: Daniel Harper
>Priority: Major
>
> Our job threw an exception overnight, causing the job to attempt a restart.
> However, it never managed to restart because two tasks are stuck in the 
> "Cancelling" state, with the following exception:
> {code:java}
> 2018-03-02 02:29:31,604 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'PTransformTranslation.UnknownRawPTransform -> 
> ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> 
> uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out
>  -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to 
> cancelling signal, but is stuck in method:
>  java.lang.Thread.blockedOn(Thread.java:239)
> java.lang.System$2.blockedOn(System.java:1252)
> java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211)
> java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)
> java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
> java.io.Writer.write(Writer.java:157)
> org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76)
> org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
> org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:888)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:865)
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94)
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.out

[jira] [Updated] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state

2018-03-02 Thread Daniel Harper (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Harper updated FLINK-8834:
-
Description: 
Our job threw an exception overnight, causing the job to attempt a restart.

However, it never managed to restart because two tasks on one of the Task 
Managers are stuck in the "Cancelling" state, with the following exception:
{code:java}
2018-03-02 02:29:31,604 WARN  org.apache.flink.runtime.taskmanager.Task 
    - Task 'PTransformTranslation.UnknownRawPTransform -> 
ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> 
uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out
 -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to 
cancelling signal, but is stuck in method:
 java.lang.Thread.blockedOn(Thread.java:239)
java.lang.System$2.blockedOn(System.java:1252)
java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211)
java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170)
java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457)
java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
java.nio.channels.Channels.writeFully(Channels.java:101)
java.nio.channels.Channels.access$000(Channels.java:61)
java.nio.channels.Channels$1.write(Channels.java:174)
java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
java.nio.channels.Channels.writeFully(Channels.java:101)
java.nio.channels.Channels.access$000(Channels.java:61)
java.nio.channels.Channels$1.write(Channels.java:174)
sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)
java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
java.io.Writer.write(Writer.java:157)
org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102)
org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118)
org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76)
org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)
org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown
 Source)
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:888)
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:865)
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94)
org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:87)
org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1(ReduceFnRunner.java:1040)
org.apache.beam.runners.core.ReduceFnRunner$$Lambda$163/1408647946.output(Unknown
 Source)
org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output(ReduceFnContextFactory.java:433)
org.apache.beam.runners.core.SystemReduceFn.onTrigger(SystemReduceFn.java:127)
org.apache.beam.runners.core.ReduceFnRunner.onTrigger(ReduceFnRunner.java:1043)
org.apache.beam.runners.core.ReduceFnRunner.emit(ReduceFnRunner.java:911)
org.apache.beam.runners.core.Red

[jira] [Closed] (FLINK-8751) Canceling a job results in a InterruptedException in the TM

2018-03-02 Thread Stefan Richter (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-8751.
-
   Resolution: Fixed
 Assignee: Stefan Richter
Fix Version/s: 1.5.0

Fixed in f9a583b.

> Canceling a job results in a InterruptedException in the TM
> ---
>
> Key: FLINK-8751
> URL: https://issues.apache.org/jira/browse/FLINK-8751
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.4.1
>Reporter: Elias Levy
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> Canceling a job results in the following exception reported by the TM: 
> {code:java}
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Could 
> not shut down timer service java.lang.InterruptedException 
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown
>  Source) at java.util.concurrent.ThreadPoolExecutor.awaitTermination(Unknown 
> Source) 
>   at 
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService.shutdownAndAwaitPending(SystemProcessingTimeService.java:197)
>  
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:317)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) 
>   at java.lang.Thread.run(Unknown Source){code}
>  
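> 
> For background, the exception comes from the standard shutdown-and-await 
> idiom: {{awaitTermination()}} throws {{InterruptedException}} when the waiting 
> thread is interrupted, which is exactly what a job cancellation does. A 
> generic JDK sketch of the pattern (not Flink's code):
> {code:java}
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
> 
> public class ShutdownDemo {
>     public static void main(String[] args) {
>         ExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
>         timerService.shutdown();
>         try {
>             // awaitTermination() throws InterruptedException if the waiting
>             // thread is interrupted, as happens during job cancellation.
>             if (!timerService.awaitTermination(5, TimeUnit.SECONDS)) {
>                 timerService.shutdownNow();
>             }
>         } catch (InterruptedException e) {
>             // expected during cancellation: force shutdown and restore the flag
>             timerService.shutdownNow();
>             Thread.currentThread().interrupt();
>         }
>     }
> }
> {code}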



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5607: [hotfix][docs] Drop the incorrect parallel remark in wind...

2018-03-02 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5607
  
Thanks :)


---


[jira] [Commented] (FLINK-8830) YarnResourceManager throws NullPointerException

2018-03-02 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383435#comment-16383435
 ] 

Till Rohrmann commented on FLINK-8830:
--

Looks like a bug. We should add a guard.

> YarnResourceManager throws NullPointerException
> ---
>
> Key: FLINK-8830
> URL: https://issues.apache.org/jira/browse/FLINK-8830
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: vinoyang
>Priority: Major
>
>  
> {code:java}
> java.lang.NullPointerException
> at java.io.File.<init>(File.java:277)
> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:502)
> at 
> org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:445)
> at 
> org.apache.flink.yarn.YarnResourceManager.onContainersAllocated(YarnResourceManager.java:338)
> at 
> org.apache.flink.yarn.YarnResourceManagerTest$1.<init>(YarnResourceManagerTest.java:340)
> at 
> org.apache.flink.yarn.YarnResourceManagerTest.testStopWorker(YarnResourceManagerTest.java:317)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
> at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
>  
> This exception is being thrown in 
> `org.apache.flink.yarn.YarnResourceManagerTest#testStopWorker`. The exception 
> is apparently being ignored, since the test completes. It seems like this 
> line:
> {code:java}
> String fileLocation = 
> System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION){code}
> is not guarded against a null return value. I don't know whether that's a test 
> or a production code issue.
> CC [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5600: [FLINK-8811] [flip6] Add initial implementation of...

2018-03-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5600#discussion_r171814808
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ---
@@ -178,6 +181,13 @@ public URI getRestAddress() {
}
}
 
+   public HighAvailabilityServices getHighAvailabilityServices() {
+   synchronized (lock) {
+   checkState(running, "MiniCluster is not yet running.");
--- End diff --

Because all state accesses to the MiniCluster are guarded, since it can 
potentially be used by multiple threads.


---


[jira] [Commented] (FLINK-8811) Add MiniClusterClient to allow fast MiniCluster operations

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383436#comment-16383436
 ] 

ASF GitHub Bot commented on FLINK-8811:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5600#discussion_r171814808
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ---
@@ -178,6 +181,13 @@ public URI getRestAddress() {
}
}
 
+   public HighAvailabilityServices getHighAvailabilityServices() {
+   synchronized (lock) {
+   checkState(running, "MiniCluster is not yet running.");
--- End diff --

Because all state accesses to the MiniCluster are guarded, since it can 
potentially be used by multiple threads.


> Add MiniClusterClient to allow fast MiniCluster operations
> --
>
> Key: FLINK-8811
> URL: https://issues.apache.org/jira/browse/FLINK-8811
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> We should offer a {{ClusterClient}} implementation for the {{MiniCluster}}. 
> That way we would be able to submit a job and wait for its result without 
> polling, as would be necessary when using the {{RestClusterClient}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8830) YarnResourceManager throws NullPointerException

2018-03-02 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383435#comment-16383435
 ] 

Till Rohrmann edited comment on FLINK-8830 at 3/2/18 10:39 AM:
---

Looks like a bug. We should add a guard. The good thing is that it happens in a 
{{try-catch}} block. Therefore, it won't crash the system.


was (Author: till.rohrmann):
Looks like a bug. We should add a guard.

> YarnResourceManager throws NullPointerException
> ---
>
> Key: FLINK-8830
> URL: https://issues.apache.org/jira/browse/FLINK-8830
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: vinoyang
>Priority: Major
>
>  
> {code:java}
> java.lang.NullPointerException
> at java.io.File.(File.java:277)
> at org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:502)
> at 
> org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:445)
> at 
> org.apache.flink.yarn.YarnResourceManager.onContainersAllocated(YarnResourceManager.java:338)
> at 
> org.apache.flink.yarn.YarnResourceManagerTest$1.(YarnResourceManagerTest.java:340)
> at 
> org.apache.flink.yarn.YarnResourceManagerTest.testStopWorker(YarnResourceManagerTest.java:317)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
> at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}
>  
> This exception is being thrown in 
> `org.apache.flink.yarn.YarnResourceManagerTest#testStopWorker`. The exception 
> is apparently being ignored, since the test completes. It seems like this 
> line:
> {code:java}
> String fileLocation = 
> System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION){code}
> is not guarded against a null return value. I don't know whether that's a test 
> or a production code issue.
> CC [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-6968) Store streaming, updating tables with unique key in queryable state

2018-03-02 Thread Renjie Liu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Renjie Liu reassigned FLINK-6968:
-

Assignee: Renjie Liu

> Store streaming, updating tables with unique key in queryable state
> ---
>
> Key: FLINK-6968
> URL: https://issues.apache.org/jira/browse/FLINK-6968
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Renjie Liu
>Priority: Major
>
> Streaming tables with unique key are continuously updated. For example, 
> queries with a non-windowed aggregation generate such tables. Commonly, such 
> updating tables are emitted via an upsert table sink to an external datastore 
> (k-v store, database) to make them accessible to applications.
> This issue is about adding a feature to store and maintain such a table as 
> queryable state in Flink. By storing the table in Flink's queryable state, we 
> do not need an external data store to access the results of the query but can 
> query the results directly from Flink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8794) When using BucketingSink, it happens that one of the files is always in the [.in-progress] state

2018-03-02 Thread Piotr Nowojski (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383441#comment-16383441
 ] 

Piotr Nowojski commented on FLINK-8794:
---

The temporary data is already separated from the final output - it's in 
different files. If we allow for a different directory, that should already be 
enough.

Besides, writing to local disks would decrease performance, since you would 
need to write the same data twice (first locally, then copy remotely, which is 
unnecessary, while moving files between directories is cheap). Moreover, the 
"pending" files would still have to be copied to the remote location, since in 
some cases "pending" files are committed during recovery. Thus it wouldn't 
solve your problem.

> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state
> 
>
> Key: FLINK-8794
> URL: https://issues.apache.org/jira/browse/FLINK-8794
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector
>Affects Versions: 1.4.0, 1.4.1
>Reporter: yanxiaobin
>Priority: Major
>
> When using BucketingSink, it happens that one of the files is always in the 
> [.in-progress] state. And this state has never changed after that.  The 
> underlying use of S3 as storage.
>  
> {code:java}
> // code placeholder
> {code}
> 2018-02-28 11:58:42  147341619 {color:#d04437}_part-28-0.in-progress{color}
> 2018-02-28 12:06:27  147315059 part-0-0
> 2018-02-28 12:06:27  147462359 part-1-0
> 2018-02-28 12:06:27  147316006 part-10-0
> 2018-02-28 12:06:28  147349854 part-100-0
> 2018-02-28 12:06:27  147421625 part-101-0
> 2018-02-28 12:06:27  147443830 part-102-0
> 2018-02-28 12:06:27  147372801 part-103-0
> 2018-02-28 12:06:27  147343670 part-104-0
> ..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5621) Flink should provide a mechanism to prevent scheduling tasks on TaskManagers with operational issues

2018-03-02 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383439#comment-16383439
 ] 

Till Rohrmann commented on FLINK-5621:
--

Hi [~yanghua], I think such a feature would indeed be a nice addition for 
Flink. Black-listing TMs with known issues could be done in the 
{{ResourceManager}}. We could also add an RPC call which tells the {{TMs}} to 
shut down in such a case.
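
Purely as a hypothetical illustration (neither the gateway nor the method below 
exist in Flink; the names are made up for this sketch), such an RPC could look 
like this:
{code:java}
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch only. Illustrates a "shut yourself down" RPC that the
 * ResourceManager could issue to a black-listed TaskManager.
 */
public interface BlacklistedTaskManagerGateway {

    /**
     * Asks the TaskManager to terminate itself.
     *
     * @param cause human-readable reason, e.g. "black-listed after repeated failures"
     * @return future that completes once the shutdown request is acknowledged
     */
    CompletableFuture<Void> requestShutdown(String cause);
}
{code}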

> Flink should provide a mechanism to prevent scheduling tasks on TaskManagers 
> with operational issues
> 
>
> Key: FLINK-5621
> URL: https://issues.apache.org/jira/browse/FLINK-5621
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.4
>Reporter: Jamie Grier
>Priority: Critical
>
> There are cases where jobs can get into a state where no progress can be made 
> if there is something pathologically wrong with one of the TaskManager nodes 
> in the cluster.
> An example of this would be a TaskManager on a machine that runs out of disk 
> space.  Flink never considers the TM to be "bad" and will keep using it to 
> attempt to run tasks -- which will continue to fail.
> A suggestion for overcoming this would be to allow an option where a TM will 
> commit suicide if that TM was the source of an exception that caused a job to 
> fail/restart.
> I'm sure there are plenty of other approaches to solving this..



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-5206) Flakey PythonPlanBinderTest

2018-03-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-5206:
--

It seems to happen again with the following exception.
{code:java}
java.io.IOException: Mkdirs failed to create /tmp/flink
at 
org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:271)
at 
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
at 
org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:248)
at org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:202)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748){code}
https://api.travis-ci.org/v3/job/347676340/log.txt

> Flakey PythonPlanBinderTest
> ---
>
> Key: FLINK-5206
> URL: https://issues.apache.org/jira/browse/FLINK-5206
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.2.0
> Environment: in TravisCI
>Reporter: Nico Kruber
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: test-stability
> Fix For: 1.2.0
>
>
> {code:none}
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> Job execution failed.
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:903)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Output path '/tmp/flink/result2' could not be 
> initialized. Canceling task...
>   at 
> org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:233)
>   at 
> org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:78)
>   at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:178)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>   at java.lang.Thread.run(Thread.java:745)
> Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 36.891 sec 
> <<< FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest
> testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest)  
> Time elapsed: 11.53 sec  <<< FAILURE!
> java.lang.AssertionError: Error while calling the test program: Job execution 
> failed.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:180)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRun

[jira] [Updated] (FLINK-5206) Flakey PythonPlanBinderTest

2018-03-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5206:
-
Affects Version/s: 1.6.0

> Flakey PythonPlanBinderTest
> ---
>
> Key: FLINK-5206
> URL: https://issues.apache.org/jira/browse/FLINK-5206
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.2.0, 1.5.0, 1.6.0
> Environment: in TravisCI
>Reporter: Nico Kruber
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: test-stability
> Fix For: 1.2.0
>
>
> {code:none}
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> Job execution failed.
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:903)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Output path '/tmp/flink/result2' could not be 
> initialized. Canceling task...
>   at 
> org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:233)
>   at 
> org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:78)
>   at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:178)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>   at java.lang.Thread.run(Thread.java:745)
> Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 36.891 sec 
> <<< FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest
> testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest)  
> Time elapsed: 11.53 sec  <<< FAILURE!
> java.lang.AssertionError: Error while calling the test program: Job execution 
> failed.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:180)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.a

[jira] [Updated] (FLINK-5206) Flakey PythonPlanBinderTest

2018-03-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5206:
-
Affects Version/s: 1.5.0

> Flakey PythonPlanBinderTest
> ---
>
> Key: FLINK-5206
> URL: https://issues.apache.org/jira/browse/FLINK-5206
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.2.0, 1.5.0, 1.6.0
> Environment: in TravisCI
>Reporter: Nico Kruber
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.2.0
>
>
> {code:none}
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> Job execution failed.
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:903)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Output path '/tmp/flink/result2' could not be 
> initialized. Canceling task...
>   at 
> org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:233)
>   at 
> org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:78)
>   at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:178)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>   at java.lang.Thread.run(Thread.java:745)
> Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 36.891 sec 
> <<< FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest
> testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest)  
> Time elapsed: 11.53 sec  <<< FAILURE!
> java.lang.AssertionError: Error while calling the test program: Job execution 
> failed.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:180)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> or

[jira] [Updated] (FLINK-5206) Flakey PythonPlanBinderTest

2018-03-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5206:
-
Priority: Critical  (was: Major)

> Flakey PythonPlanBinderTest
> ---
>
> Key: FLINK-5206
> URL: https://issues.apache.org/jira/browse/FLINK-5206
> Project: Flink
>  Issue Type: Bug
>  Components: Python API
>Affects Versions: 1.2.0, 1.5.0, 1.6.0
> Environment: in TravisCI
>Reporter: Nico Kruber
>Assignee: Chesnay Schepler
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.2.0
>
>
> {code:none}
> ---
>  T E S T S
> ---
> Running org.apache.flink.python.api.PythonPlanBinderTest
> Job execution failed.
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:903)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:846)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Output path '/tmp/flink/result2' could not be 
> initialized. Canceling task...
>   at 
> org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:233)
>   at 
> org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:78)
>   at 
> org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:178)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>   at java.lang.Thread.run(Thread.java:745)
> Tests run: 3, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 36.891 sec 
> <<< FAILURE! - in org.apache.flink.python.api.PythonPlanBinderTest
> testJobWithoutObjectReuse(org.apache.flink.python.api.PythonPlanBinderTest)  
> Time elapsed: 11.53 sec  <<< FAILURE!
> java.lang.AssertionError: Error while calling the test program: Job execution 
> failed.
>   at org.junit.Assert.fail(Assert.java:88)
>   at 
> org.apache.flink.test.util.JavaProgramTestBase.testJobWithoutObjectReuse(JavaProgramTestBase.java:180)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   

[GitHub] flink issue #5394: [FLINK-6571][tests] Catch InterruptedException in StreamS...

2018-03-02 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5394
  
I have often handled it like one of the below variants. What do you think 
about that pattern?

### Variant 1: Handle interruption if still running
```java
public void run(SourceContext ctx) throws Exception {
    while (running) {
        try {
            // do stuff
            Thread.sleep(20);
        } catch (InterruptedException e) {
            // restore interruption flag
            Thread.currentThread().interrupt();
            if (running) {
                throw new FlinkException("interrupted while still running", e);
            }
            // else fall through the loop
        }
    }
}
```

### Variant 2: Simply let the InterruptedException bubble out

This variant is also fine, because the Task status is set to CANCELED 
before the interruption, so any exception bubbling out will be suppressed.

```java
public void run(SourceContext ctx) throws Exception {
    while (running) {
        // do stuff

        // the InterruptedException from here simply fails the execution
        Thread.sleep(20);
    }
}
```


---


[jira] [Commented] (FLINK-6571) InfiniteSource in SourceStreamOperatorTest should deal with InterruptedExceptions

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383448#comment-16383448
 ] 

ASF GitHub Bot commented on FLINK-6571:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5394
  
I have often handled it like one of the below variants. What do you think 
about that pattern?

### Variant 1: Handle interruption if still running
```java
public void run(SourceContext ctx) throws Exception {
    while (running) {
        try {
            // do stuff
            Thread.sleep(20);
        } catch (InterruptedException e) {
            // restore interruption flag
            Thread.currentThread().interrupt();
            if (running) {
                throw new FlinkException("interrupted while still running", e);
            }
            // else fall through the loop
        }
    }
}
```

### Variant 2: Simply let the InterruptedException bubble out

This variant is also fine, because the Task status is set to CANCELED 
before the interruption, so any exception bubbling out will be suppressed.

```java
public void run(SourceContext ctx) throws Exception {
    while (running) {
        // do stuff

        // the InterruptedException from here simply fails the execution
        Thread.sleep(20);
    }
}
```


> InfiniteSource in SourceStreamOperatorTest should deal with 
> InterruptedExceptions
> -
>
> Key: FLINK-6571
> URL: https://issues.apache.org/jira/browse/FLINK-6571
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.3.0, 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> So this is a new one: I got a failing test 
> ({{testNoMaxWatermarkOnAsyncStop}}) due to an uncaught InterruptedException.
> {code}
> [00:28:15] Tests run: 7, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 
> 0.828 sec <<< FAILURE! - in 
> org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest
> [00:28:15] 
> testNoMaxWatermarkOnAsyncStop(org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest)
>   Time elapsed: 0 sec  <<< ERROR!
> [00:28:15] java.lang.InterruptedException: sleep interrupted
> [00:28:15]at java.lang.Thread.sleep(Native Method)
> [00:28:15]at 
> org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest$InfiniteSource.run(StreamSourceOperatorTest.java:343)
> [00:28:15]at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
> [00:28:15]at 
> org.apache.flink.streaming.runtime.operators.StreamSourceOperatorTest.testNoMaxWatermarkOnAsyncStop(StreamSourceOperatorTest.java:176)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383450#comment-16383450
 ] 

ASF GitHub Bot commented on FLINK-6895:
---

Github user buptljy closed the pull request at:

https://github.com/apache/flink/pull/5615


> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: Aegeaner
>Priority: Major
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5615: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...

2018-03-02 Thread buptljy
Github user buptljy closed the pull request at:

https://github.com/apache/flink/pull/5615


---


[GitHub] flink pull request #5618: [FLINK-6895][table]Add STR_TO_DATE supported in SQ...

2018-03-02 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/5618

[FLINK-6895][table]Add STR_TO_DATE supported in SQL

## What is the purpose of the change
Add STR_TO_DATE Function supported in SQL
## Brief change log
 * STR_TO_DATE(str string, format string)  
\-  str is the string that needs to be transformed.
\- format is the pattern of "str"
 * Add tests in ScalarFunctionsTest.scala
 * Add docs in sql.md
## Verifying this change
 * Run unit tests in  ScalarFunctionsTest.scala
## Does this pull request potentially affect one of the following parts:
 * A new sql function
## Documentation
  * Add docs in sql.md

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink str_to_date

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5618.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5618


commit 59752143ee438cb11969ae4bdda1fac5fc32813c
Author: Liao Jiayi 
Date:   2018-03-01T11:58:08Z

add str_to_date sql function

commit 63f71e4b3d6378f2114aa04ba4d1128f1ec3bc38
Author: Liao Jiayi 
Date:   2018-03-01T11:58:41Z

Merge branch 'master' of github.com:apache/flink

commit 3ec6d2ec487151928032d144de94ca9113d63f01
Author: Liao Jiayi 
Date:   2018-03-01T15:30:04Z

fix checkstyle error




---


[GitHub] flink issue #5618: [FLINK-6895][table]Add STR_TO_DATE supported in SQL

2018-03-02 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5618
  
Actually I am not sure whether it is appropriate to return "string", because it 
should be the inverse of "DATE_FORMAT()". However, if I return 
DATE/TIME/DATETIME as the JIRA issue describes, the type of data the user 
receives will be uncertain, being any one of DATE/TIME/DATETIME.
Do you have any good ideas? I will optimize it.


---


[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383451#comment-16383451
 ] 

ASF GitHub Bot commented on FLINK-6895:
---

GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/5618

[FLINK-6895][table]Add STR_TO_DATE supported in SQL

## What is the purpose of the change
Add STR_TO_DATE Function supported in SQL
## Brief change log
 * STR_TO_DATE(str string, format string)  
\-  str is the string that needs to be transformed.
\- format is the pattern of "str"
 * Add tests in ScalarFunctionsTest.scala
 * Add docs in sql.md
## Verifying this change
 * Run unit tests in  ScalarFunctionsTest.scala
## Does this pull request potentially affect one of the following parts:
 * A new sql function
## Documentation
  * Add docs in sql.md

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink str_to_date

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5618.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5618


commit 59752143ee438cb11969ae4bdda1fac5fc32813c
Author: Liao Jiayi 
Date:   2018-03-01T11:58:08Z

add str_to_date sql function

commit 63f71e4b3d6378f2114aa04ba4d1128f1ec3bc38
Author: Liao Jiayi 
Date:   2018-03-01T11:58:41Z

Merge branch 'master' of github.com:apache/flink

commit 3ec6d2ec487151928032d144de94ca9113d63f01
Author: Liao Jiayi 
Date:   2018-03-01T15:30:04Z

fix checkstyle error




> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: Aegeaner
>Priority: Major
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383452#comment-16383452
 ] 

ASF GitHub Bot commented on FLINK-6895:
---

Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5618
  
Actually I am not sure whether it is appropriate to return "string", because it 
should be the inverse of "DATE_FORMAT()". However, if I return 
DATE/TIME/DATETIME as the JIRA issue describes, the type of data the user 
receives will be uncertain, being any one of DATE/TIME/DATETIME.
Do you have any good ideas? I will optimize it.


> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: Aegeaner
>Priority: Major
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8811) Add MiniClusterClient to allow fast MiniCluster operations

2018-03-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8811.

Resolution: Fixed

Fixed with

1.6.0:

19a8d2ff361be8797d79074b39afb09d51cca671

96a176ac03bb1d188e173ba3bf14c29259d33377

8039464df8f315b1fd06831e11dfc2ef4466b888

1.5.0:

f30ca2101f2151214b138f37f472f886fbdfd9f0

19e4f68ba9cfbf5d0f54b325db4c5d196d262d09

29b34e2255c41abc1c7c4af8ab268a39df57f0ff

> Add MiniClusterClient to allow fast MiniCluster operations
> --
>
> Key: FLINK-8811
> URL: https://issues.apache.org/jira/browse/FLINK-8811
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> We should offer a {{ClusterClient}} implementation for the {{MiniCluster}}. 
> That way we would be able to submit a job and wait for its result without 
> polling, as would be necessary when using the {{RestClusterClient}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8808) Enable RestClusterClient to submit jobs to local Dispatchers

2018-03-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8808.

Resolution: Fixed

Fixed via

1.6.0: 63b3563e5404b89ad8fd20f1181b8247e7de3fa2

1.5.0: 02cb9e39cc19ebbf82f06d87035aead20780eb2b

> Enable RestClusterClient to submit jobs to local Dispatchers
> 
>
> Key: FLINK-8808
> URL: https://issues.apache.org/jira/browse/FLINK-8808
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> The {{RestClusterClient}} should be able to submit a job to a {{Dispatcher}} 
> which runs in a local {{ActorSystem}} on the same host as the 
> {{RestClusterClient}}. This is the case for test cases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8519) FileAlreadyExistsException on Start Flink Session

2018-03-02 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383469#comment-16383469
 ] 

Nico Kruber commented on FLINK-8519:


[~yew1eb], since I don't have your {{hadoop_user_login.sh}} script, I tried to 
reproduce the problem with the following command on AWS and did not see any 
exception in the logs or the command line outputs.
{code}
./bin/yarn-session.sh -n 2 -nm job_name -jm 1024 -tm 4096 -s 4 -d
{code}

Can you try this command as well?

Also, with your command (executed in a {{bash}} shell) and the given 
{{yarn.container-start-command-template}}, I got the following error:
{code}
Error: Could not find or load main class %-Xmx424m%
{code}
which indicates that the template may be wrong. Based on 
{{org.apache.flink.configuration.ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE}},
 you should only use one percent sign.
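
For reference, that would be your command's template with each doubled percent 
sign collapsed to a single one (untested; shown only for illustration):
{code}
yarn.container-start-command-template="/usr/local/jdk1.8.0_112/bin/java %jvmmem% %jvmopts% %logging% %class% %args% %redirects%"
{code}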

> FileAlreadyExistsException on Start Flink Session 
> --
>
> Key: FLINK-8519
> URL: https://issues.apache.org/jira/browse/FLINK-8519
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Hai Zhou UTC+8
>Priority: Blocker
> Fix For: 1.5.0
>
>
> *steps to reproduce:*
>  1. build Flink from source, git commit: c1734f4
>  2. run script:
> source /path/hadoop/bin/hadoop_user_login.sh hadoop-launcher;
> export YARN_CONF_DIR=/path/hadoop/etc/hadoop;
> export HADOOP_CONF_DIR=/path/hadoop/etc/hadoop;
> export JVM_ARGS="-Djava.security.krb5.conf=${HADOOP_CONF_DIR}/krb5.conf"; 
> /path/flink-1.5-SNAPSHOT/bin/yarn-session.sh -D 
> yarn.container-start-command-template="/usr/local/jdk1.8.0_112/bin/java 
> %%jvmmem%% %%jvmopts%% %%logging%% %%class%% %%args%% %%redirects%%" -n 4 -nm 
> job_name -qu root.rt.flink -jm 1024 -tm 4096 -s 4 -d
>  
>  *error infos:*
> 2018-01-27 00:51:12,841 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> Error while running the Flink Yarn session.
>  java.lang.reflect.UndeclaredThrowableException
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1571)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:786)
>  Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
> Couldn't deploy Yarn session cluster
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:594)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:786)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>  ... 2 more
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: Path /user 
> already exists as dir; cannot create link here
>  at org.apache.hadoop.fs.viewfs.InodeTree.createLink(InodeTree.java:244)
>  at org.apache.hadoop.fs.viewfs.InodeTree.(InodeTree.java:334)
>  at 
> org.apache.hadoop.fs.viewfs.ViewFileSystem$1.(ViewFileSystem.java:161)
>  at 
> org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:161)
>  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
>  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
>  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
>  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
>  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
>  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:656)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:485)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:384)
>  ... 7 more



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8519) FileAlreadyExistsException on Start Flink Session

2018-03-02 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383469#comment-16383469
 ] 

Nico Kruber edited comment on FLINK-8519 at 3/2/18 11:07 AM:
-

[~yew1eb], since I don't have your {{hadoop_user_login.sh}} script, I tried to 
reproduce the problem with the following command on AWS, using a Flink 
release-1.5 checkout at revision 347ec3848ec603ac452e394a5211cf888db6663f. I 
did not see any exception in the logs or the command line outputs.
{code}
./bin/yarn-session.sh -n 2 -nm job_name -jm 1024 -tm 4096 -s 4 -d
{code}

Can you try this command as well?

Also, with your command (executed in a {{bash}} shell) and the given 
{{yarn.container-start-command-template}}, I got the following error:
{code}
Error: Could not find or load main class %-Xmx424m%
{code}
which indicates that the template may be wrong. Based on 
{{org.apache.flink.configuration.ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE}},
 you should only use one percent sign.


was (Author: nicok):
[~yew1eb], since I don't have your {{hadoop_user_login.sh}} script, I tried to 
reproduce the problem with the following command on AWS and did not see any 
exception in the logs or the command line outputs.
{code}
./bin/yarn-session.sh -n 2 -nm job_name -jm 1024 -tm 4096 -s 4 -d
{code}

Can you try this command as well?

Also, with your command (executed in a {{bash}} shell) and the given 
{{yarn.container-start-command-template}}, I got the following error:
{code}
Error: Could not find or load main class %-Xmx424m%
{code}
which indicates that the template may be wrong. Based on 
{{org.apache.flink.configuration.ConfigConstants#DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE}},
 you should only use one percent sign.

> FileAlreadyExistsException on Start Flink Session 
> --
>
> Key: FLINK-8519
> URL: https://issues.apache.org/jira/browse/FLINK-8519
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Hai Zhou UTC+8
>Priority: Blocker
> Fix For: 1.5.0
>
>
> *steps to reproduce:*
>  1. build Flink from source, git commit: c1734f4
>  2. run script:
> source /path/hadoop/bin/hadoop_user_login.sh hadoop-launcher;
> export YARN_CONF_DIR=/path/hadoop/etc/hadoop;
> export HADOOP_CONF_DIR=/path/hadoop/etc/hadoop;
> export JVM_ARGS="-Djava.security.krb5.conf=${HADOOP_CONF_DIR}/krb5.conf"; 
> /path/flink-1.5-SNAPSHOT/bin/yarn-session.sh -D 
> yarn.container-start-command-template="/usr/local/jdk1.8.0_112/bin/java 
> %%jvmmem%% %%jvmopts%% %%logging%% %%class%% %%args%% %%redirects%%" -n 4 -nm 
> job_name -qu root.rt.flink -jm 1024 -tm 4096 -s 4 -d
>  
>  *error infos:*
> 2018-01-27 00:51:12,841 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli - 
> Error while running the Flink Yarn session.
>  java.lang.reflect.UndeclaredThrowableException
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1571)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:786)
>  Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: 
> Couldn't deploy Yarn session cluster
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:389)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:594)
>  at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$2(FlinkYarnSessionCli.java:786)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>  ... 2 more
>  Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: Path /user 
> already exists as dir; cannot create link here
>  at org.apache.hadoop.fs.viewfs.InodeTree.createLink(InodeTree.java:244)
>  at org.apache.hadoop.fs.viewfs.InodeTree.(InodeTree.java:334)
>  at 
> org.apache.hadoop.fs.viewfs.ViewFileSystem$1.(ViewFileSystem.java:161)
>  at 
> org.apache.hadoop.fs.viewfs.ViewFileSystem.initialize(ViewFileSystem.java:161)
>  at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
>  at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
>  at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
>  at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
>  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
>  at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:167)
>  at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:656)
> 

[jira] [Commented] (FLINK-8813) AutoParallellismITCase fails with Flip6

2018-03-02 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383476#comment-16383476
 ] 

Till Rohrmann commented on FLINK-8813:
--

Auto max parallelism does not make much sense anymore in the Flip-6 world. Yarn 
clusters, for example, will no longer be started with a set of preallocated 
resources. Therefore, it is not clear what max parallelism would mean in such a 
case. I think we should discourage people from using it and instead motivate 
them to always specify the parallelism with which they want to run the job.

As a fix for this concrete problem, we could say that in Flip-6 mode, the max 
parallelism is always {{1}}. What do you think?

> AutoParallellismITCase fails with Flip6
> ---
>
> Key: FLINK-8813
> URL: https://issues.apache.org/jira/browse/FLINK-8813
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{AutoParallelismITCase}} fails when running against flip6. 
> ([https://travis-ci.org/zentol/flink/jobs/347373854])
> It appears that the {{JobMaster}} does not properly handle 
> {{ExecutionConfig#PARALLELISM_AUTO_MAX}}.
>  
> Exception:
> {code:java}
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could not 
> start JobManager.
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:287)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at 
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
> set up JobManager
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:181)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:747)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:243)
>   ... 20 more
> Caused by: java.lang.IllegalArgumentException: The parallelism must be at 
> least one.
>   at 
> org.apache.flink.runtime.jobgraph.JobVertex.setParallelism(JobVertex.java:290)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:162)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:295)
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:170)
>   ... 22 more{code}
>  
> The likely culprit is this call to {{ExecutionGraphBuilder#buildGraph}} in 
> the {{JobMaster}} constructor:
> {code:java}
> this.executionGraph = ExecutionGraphBuilder.buildGraph(
>null,
>jobGraph,
>jobMasterConfiguration.getConfiguration(),
>scheduledExecutorService,
>scheduledExecutorService,
>slotPool.getSlotProvider(),
>userCodeLoader,
>highAvailabilityServices.getCheckpointRecoveryFactory(),
>rpcTimeout,
>restartStrategy,
>jobMetricGroup,
>-1, // parallelismForAutoMax
>blobServer,
>jobMasterConfiguration.getSlotRequestTimeout(),
>log);{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8835) Fix TaskManager config keys

2018-03-02 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8835:
---

 Summary: Fix TaskManager config keys
 Key: FLINK-8835
 URL: https://issues.apache.org/jira/browse/FLINK-8835
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Reporter: Stephan Ewen
 Fix For: 1.5.0


Many new config keys in the TaskManager don't follow the proper naming scheme. 
We need to clear those up before the release. I would also suggest to keep the 
key names short, because that makes it easier for users.

When doing this cleanup pass over the config keys, I would suggest to also make 
some of the existing keys more hierarchical and harmonize them with the common 
scheme in Flink.

## New Keys

* {{taskmanager.network.credit-based-flow-control.enabled}} to 
{{taskmanager.network.credit-model}}.

* {{taskmanager.exactly-once.blocking.data.enabled}} to 
{{task.checkpoint.alignment.blocking}} (we already have 
{{task.checkpoint.alignment.max-size}})

## Existing Keys

* {{taskmanager.debug.memory.startLogThread}} => 
{{taskmanager.debug.memory.log}}

* {{taskmanager.debug.memory.logIntervalMs}} => 
{{taskmanager.debug.memory.log-interval}}

* {{taskmanager.initial-registration-pause}} => 
{{taskmanager.registration.initial-backoff}}

* {{taskmanager.max-registration-pause}} => 
{{taskmanager.registration.max-backoff}}

* {{taskmanager.refused-registration-pause}} => 
{{taskmanager.registration.refused-backoff}}

* {{taskmanager.maxRegistrationDuration}} => 
{{taskmanager.registration.timeout}}
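
For the renames, a minimal sketch of how a new key could keep the old one working as a deprecated fallback (assuming we use the existing {{ConfigOptions}} builder and its deprecated-key support; the default value shown is illustrative):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class TaskManagerOptionsSketch {
    // New hierarchical key; the old key remains readable as a deprecated alias.
    public static final ConfigOption<String> INITIAL_REGISTRATION_BACKOFF =
        ConfigOptions.key("taskmanager.registration.initial-backoff")
            .defaultValue("500 ms")
            .withDeprecatedKeys("taskmanager.initial-registration-pause");
}
{code}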




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state

2018-03-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383481#comment-16383481
 ] 

Stephan Ewen commented on FLINK-8834:
-

The second exception is not a problem; it is just noise in the log, see 
FLINK-8751.

The TaskManagers should make a hard exit (process kill) after a while if 
graceful shutdown does not work. The timeout for that is by default 3 minutes, 
configured in {{task.cancellation.timeout}}. You should see something like this 
in the log: "Notifying TaskManager about fatal error".

Can you check if you deactivated this cancellation timeout (set it to 0 or -1)?
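For reference, the setting in question as it would appear in {{flink-conf.yaml}} (180000 ms is the 3-minute default mentioned above):

{code}
# flink-conf.yaml
# Timeout in milliseconds after which task cancellation escalates to a
# hard TaskManager exit; 0 (or a negative value) disables the watchdog.
task.cancellation.timeout: 180000
{code}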





> Job fails to restart due to some tasks stuck in cancelling state
> 
>
> Key: FLINK-8834
> URL: https://issues.apache.org/jira/browse/FLINK-8834
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: AWS EMR 5.12
> Flink 1.4.0
> Beam 2.3.0
>Reporter: Daniel Harper
>Priority: Major
>
> Our job threw an exception overnight, causing the job to commence attempting 
> a restart.
> However, it never managed to restart because 2 tasks on one of the Task 
> Managers are stuck in the "Cancelling" state, with the following exception:
> {code:java}
> 2018-03-02 02:29:31,604 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'PTransformTranslation.UnknownRawPTransform -> 
> ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> 
> uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out
>  -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to 
> cancelling signal, but is stuck in method:
>  java.lang.Thread.blockedOn(Thread.java:239)
> java.lang.System$2.blockedOn(System.java:1252)
> java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211)
> java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)
> java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
> java.io.Writer.write(Writer.java:157)
> org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76)
> org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
> org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
> org.apache.bea

[jira] [Created] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-02 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-8836:
--

 Summary: Duplicating a KryoSerializer does not duplicate 
registered default serializers
 Key: FLINK-8836
 URL: https://issues.apache.org/jira/browse/FLINK-8836
 Project: Flink
  Issue Type: Bug
  Components: Type Serialization System
Reporter: Tzu-Li (Gordon) Tai


The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
{code}
public KryoSerializer<T> duplicate() {
    return new KryoSerializer<>(this);
}

protected KryoSerializer(KryoSerializer<T> toCopy) {
    defaultSerializers = toCopy.defaultSerializers;
    defaultSerializerClasses = toCopy.defaultSerializerClasses;

    kryoRegistrations = toCopy.kryoRegistrations;

    ...
}
{code}

In short, when duplicating a `KryoSerializer`, the `defaultSerializers` 
serializer instances are handed directly to the new `KryoSerializer` instance.
As a result, those default serializers are shared across two different 
`KryoSerializer` instances, and the copy is therefore not a correct duplicate.
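
A possible direction for a fix is to duplicate the stateful pieces in the copy constructor instead of sharing them - a rough sketch only, assuming the field is a map from registered class to serializer instance; the {{copySerializer}} helper is hypothetical:

{code}
// Sketch (imports assumed: java.util.LinkedHashMap, java.util.Map,
// com.esotericsoftware.kryo.Serializer). Build a fresh map and duplicate each
// serializer instance, so the copy does not share potentially stateful
// serializers with the original.
LinkedHashMap<Class<?>, Serializer<?>> duplicated =
    new LinkedHashMap<>(toCopy.defaultSerializers.size());
for (Map.Entry<Class<?>, Serializer<?>> entry : toCopy.defaultSerializers.entrySet()) {
    duplicated.put(entry.getKey(), copySerializer(entry.getValue())); // hypothetical helper
}
defaultSerializers = duplicated;
{code}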



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-02 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8836:
---
Priority: Critical  (was: Major)

> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> 
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are handed directly to the new 
> {{KryoSerializer}} instance.
> As a result, those default serializers are shared across two different 
> {{KryoSerializer}} instances, and the copy is therefore not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8517) StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis

2018-03-02 Thread Nico Kruber (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383483#comment-16383483
 ] 

Nico Kruber commented on FLINK-8517:


OK, I can reproduce it locally by running the test multiple times - possibly a 
race between {{TaskEventDispatcher#registerPartition}} and 
{{TaskEventDispatcher#subscribeToEvent}}. I'll investigate further.

> StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis
> ---
>
> Key: FLINK-8517
> URL: https://issues.apache.org/jira/browse/FLINK-8517
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> The {{StaticlyNestedIterationsITCase.testJobWithoutObjectReuse}} test case 
> fails on Travis. This exception might be relevant:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Partition 
> 557b069f2b89f8ba599e6ab0956a3f5a@58f1a6b7d8ae10b9141f17c08d06cecb not 
> registered at task event dispatcher.
>   at 
> org.apache.flink.runtime.io.network.TaskEventDispatcher.subscribeToEvent(TaskEventDispatcher.java:107)
>   at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initSuperstepBarrier(IterationHeadTask.java:242)
>   at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:266)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748){code}
>  
> https://api.travis-ci.org/v3/job/60156/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-02 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8836:
---
Description: 
The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
{code:java}
public KryoSerializer<T> duplicate() {
    return new KryoSerializer<>(this);
}

protected KryoSerializer(KryoSerializer<T> toCopy) {
    defaultSerializers = toCopy.defaultSerializers;
    defaultSerializerClasses = toCopy.defaultSerializerClasses;

    kryoRegistrations = toCopy.kryoRegistrations;

    ...
}
{code}
In short, when duplicating a {{KryoSerializer}}, the {{defaultSerializers}} 
serializer instances are handed directly to the new {{KryoSerializer}} 
instance.
As a result, those default serializers are shared across two different 
{{KryoSerializer}} instances, and the copy is therefore not a correct duplicate.

  was:
The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
{code}
public KryoSerializer<T> duplicate() {
    return new KryoSerializer<>(this);
}

protected KryoSerializer(KryoSerializer<T> toCopy) {
    defaultSerializers = toCopy.defaultSerializers;
    defaultSerializerClasses = toCopy.defaultSerializerClasses;

    kryoRegistrations = toCopy.kryoRegistrations;

    ...
}
{code}

In short, when duplicating a `KryoSerializer`, the `defaultSerializers` 
serializer instances are handed directly to the new `KryoSerializer` instance.
As a result, those default serializers are shared across two different 
`KryoSerializer` instances, and the copy is therefore not a correct duplicate.


> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> 
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are handed directly to the new 
> {{KryoSerializer}} instance.
> As a result, those default serializers are shared across two different 
> {{KryoSerializer}} instances, and the copy is therefore not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-02 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8836:
---
Description: 
The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
{code:java}
public KryoSerializer<T> duplicate() {
    return new KryoSerializer<>(this);
}

protected KryoSerializer(KryoSerializer<T> toCopy) {
    defaultSerializers = toCopy.defaultSerializers;
    defaultSerializerClasses = toCopy.defaultSerializerClasses;

    kryoRegistrations = toCopy.kryoRegistrations;

    ...
}
{code}
In short, when duplicating a {{KryoSerializer}}, the {{defaultSerializers}} 
serializer instances are handed directly to the new {{KryoSerializer}} 
instance.
As a result, those default serializers are shared across two different 
{{KryoSerializer}} instances, and the copy is therefore not a correct duplicate.

  was:
The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
{code:java}
public KryoSerializer<T> duplicate() {
    return new KryoSerializer<>(this);
}

protected KryoSerializer(KryoSerializer<T> toCopy) {
    defaultSerializers = toCopy.defaultSerializers;
    defaultSerializerClasses = toCopy.defaultSerializerClasses;

    kryoRegistrations = toCopy.kryoRegistrations;

    ...
}
{code}
In short, when duplicating a {{KryoSerializer}}, the {{defaultSerializers}} 
serializer instances are handed directly to the new {{KryoSerializer}} 
instance.
As a result, those default serializers are shared across two different 
{{KryoSerializer}} instances, and the copy is therefore not a correct duplicate.


> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are handed directly to the new 
> {{KryoSerializer}} instance.
> As a result, those default serializers are shared across two different 
> {{KryoSerializer}} instances, and the copy is therefore not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8828) Add collect method to DataStream / DataSet scala api

2018-03-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383491#comment-16383491
 ] 

Stephan Ewen commented on FLINK-8828:
-

Interesting suggestion, and it looks like a pretty lightweight self-contained 
addition, so that's nice!

One thing I would raise is the name "collect". The DataSet API has "collect" as 
'pull the data set back to the client', and the streaming API has an 
experimental feature that does the same for the data stream, also using the 
name "collect", see 
{{org.apache.flink.streaming.api.datastream.DataStreamUtils}}.
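
For comparison, the existing experimental 'collect' pulls the stream's results back to the client, along these lines ({{Purchase}}/{{purchaseStream}} reuse the ticket's example type and are assumed names):

{code:java}
import java.util.Iterator;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;

// Existing meaning of 'collect' in the streaming API: fetch results to the
// client (may need IOException handling depending on the version).
Iterator<Purchase> results = DataStreamUtils.collect(purchaseStream);
{code}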

> Add collect method to DataStream / DataSet scala api
> 
>
> Key: FLINK-8828
> URL: https://issues.apache.org/jira/browse/FLINK-8828
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataSet API, DataStream API, Scala API
>Affects Versions: 1.4.0
>Reporter: Jelmer Kuperus
>Priority: Major
>
> A collect function is a method that takes a Partial Function as its parameter 
> and applies it to every element on which it is defined, creating a new 
> collection of the results.
> It can be found on all [core scala collection 
> classes|http://www.scala-lang.org/api/2.9.2/scala/collection/TraversableLike.html]
>  as well as on spark's [rdd 
> interface|https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.rdd.RDD]
> To understand its utility imagine the following scenario :
> Given a DataStream that produces events of type _Purchase_ and _View_ 
> Transform this stream into a stream of purchase amounts over 1000 euros.
> Currently an implementation might look like
> {noformat}
> val x = dataStream
>   .filter(_.isInstanceOf[Purchase])
>   .map(_.asInstanceOf[Purchase])
>   .filter(_.amount > 1000)
>   .map(_.amount){noformat}
> Or alternatively you could do this
> {noformat}
> dataStream.flatMap(_ match {
>   case p: Purchase if p.amount > 1000 => Some(p.amount)
>   case _ => None
> }){noformat}
> But with collect implemented it could look like
> {noformat}
> dataStream.collect {
>   case p: Purchase if p.amount > 1000 => p.amount
> }{noformat}
>  
> Which is a lot nicer to both read and write



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8837) Move DataStreamUtils to package 'experimental'.

2018-03-02 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-8837:
---

 Summary: Move DataStreamUtils to package 'experimental'.
 Key: FLINK-8837
 URL: https://issues.apache.org/jira/browse/FLINK-8837
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Stephan Ewen
 Fix For: 1.5.0


The class {{DataStreamUtils}} came from 'flink-contrib' and now accidentally 
moved to the fully supported API packages. It should be in package 
'experimental' to properly communicate that it is not guaranteed to be API 
stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8813) AutoParallellismITCase fails with Flip6

2018-03-02 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383498#comment-16383498
 ] 

Chesnay Schepler commented on FLINK-8813:
-

That sounds like unexpected behavior to me (for a user) and it may be better to 
outright fail with a proper error message.

> AutoParallellismITCase fails with Flip6
> ---
>
> Key: FLINK-8813
> URL: https://issues.apache.org/jira/browse/FLINK-8813
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{AutoParallelismITCase}} fails when running against flip6. 
> ([https://travis-ci.org/zentol/flink/jobs/347373854)]
> It appears that the {{JobMaster}} does not properly handle 
> {{ExecutionConfig#PARALLELISM_AUTO_MAX}}.
>  
> Exception:
> {code:java}
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could not 
> start JobManager.
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:287)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at 
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
> set up JobManager
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:181)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:747)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:243)
>   ... 20 more
> Caused by: java.lang.IllegalArgumentException: The parallelism must be at 
> least one.
>   at 
> org.apache.flink.runtime.jobgraph.JobVertex.setParallelism(JobVertex.java:290)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:162)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:295)
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:170)
>   ... 22 more{code}
>  
> The likely culprit is this call to {{ExecutionGraphBuilder#buildGraph}} in 
> the {{JobMaster}} constructor:
> {code:java}
> this.executionGraph = ExecutionGraphBuilder.buildGraph(
>null,
>jobGraph,
>jobMasterConfiguration.getConfiguration(),
>scheduledExecutorService,
>scheduledExecutorService,
>slotPool.getSlotProvider(),
>userCodeLoader,
>highAvailabilityServices.getCheckpointRecoveryFactory(),
>rpcTimeout,
>restartStrategy,
>jobMetricGroup,
>-1, // parallelismForAutoMax
>blobServer,
>jobMasterConfiguration.getSlotRequestTimeout(),
>log);{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8837) Move DataStreamUtils to package 'experimental'.

2018-03-02 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383502#comment-16383502
 ] 

Stefan Richter commented on FLINK-8837:
---

I think there is one problem with keeping it in an experimental package: this 
class is used to expose functionality that goes through package-private methods 
in data stream classes. If we move it to a different package, those methods 
need to become public. If the methods are public, then there is also little use 
in exposing them over `DataStreamUtils`.
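
A minimal illustration of the constraint (all names made up):

{code:java}
// In package org.apache.flink.streaming.api.datastream:
class StreamInternals {                        // package-private
    static void wire() { /* internal wiring */ }
}

public final class UtilsInSamePackage {
    public static void use() { StreamInternals.wire(); }  // compiles: same package
}

// From a hypothetical ...api.datastream.experimental package, StreamInternals
// and its methods are invisible unless they become public.
{code}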

> Move DataStreamUtils to package 'experimental'.
> ---
>
> Key: FLINK-8837
> URL: https://issues.apache.org/jira/browse/FLINK-8837
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The class {{DataStreamUtils}} came from 'flink-contrib' and now accidentally 
> moved to the fully supported API packages. It should be in package 
> 'experimental' to properly communicate that it is not guaranteed to be API 
> stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8837) Move DataStreamUtils to package 'experimental'.

2018-03-02 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383502#comment-16383502
 ] 

Stefan Richter edited comment on FLINK-8837 at 3/2/18 11:47 AM:


I think there is one problem with keeping it in an experimental package: this 
class is used to expose functionality that goes through package-private methods 
in data stream classes. If we move it to a different package, those methods 
need to become public. If the methods are public, then there is also little use 
in exposing them over {{DataStreamUtils}}.


was (Author: srichter):
I think there is one problem with keeping it in an experimental package: this 
class is used to expose functionality that goes through package-private methods 
in data stream classes. If we move it to a different package, those methods 
need to become public. If the methods are public, then there is also little use 
in exposing them over `DataStreamUtils`.

> Move DataStreamUtils to package 'experimental'.
> ---
>
> Key: FLINK-8837
> URL: https://issues.apache.org/jira/browse/FLINK-8837
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The class {{DataStreamUtils}} came from 'flink-contrib' and now accidentally 
> moved to the fully supported API packages. It should be in package 
> 'experimental' to properly communicate that it is not guaranteed to be API 
> stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8837) Move DataStreamUtils to package 'experimental'.

2018-03-02 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383508#comment-16383508
 ] 

Chesnay Schepler commented on FLINK-8837:
-

Why not annotate them with {{PublicEvolving}} instead? Or introduce a separate 
{{Experimental}} annotation for APIs that are exposed to users but could be removed at 
any time.

> Move DataStreamUtils to package 'experimental'.
> ---
>
> Key: FLINK-8837
> URL: https://issues.apache.org/jira/browse/FLINK-8837
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The class {{DataStreamUtils}} came from 'flink-contrib' and now accidentally 
> moved to the fully supported API packages. It should be in package 
> 'experimental' to properly communicate that it is not guaranteed to be API 
> stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8838) Add Support UNNEST a MultiSet type field

2018-03-02 Thread lincoln.lee (JIRA)
lincoln.lee created FLINK-8838:
--

 Summary: Add Support UNNEST a MultiSet type field
 Key: FLINK-8838
 URL: https://issues.apache.org/jira/browse/FLINK-8838
 Project: Flink
  Issue Type: New Feature
  Components: Table API & SQL
Reporter: lincoln.lee
Assignee: lincoln.lee


{code}MultiSetTypeInfo{code} was introduced by FLINK-7491, and 
{code}UNNEST{code} supports {code}Array{code} type only, so it would be 
nice to support `UNNEST` on a `MultiSet` type field.
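
For illustration, what this would enable (a sketch via the Table API; the table and column names are made up):

{code:java}
// Assuming a registered table 't' with a MultiSet-typed column 'tags'
// (org.apache.flink.table.api.Table; 'tableEnv' is a TableEnvironment):
Table result = tableEnv.sqlQuery(
    "SELECT id, tag FROM t CROSS JOIN UNNEST(t.tags) AS A (tag)");
{code}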



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8837) Move DataStreamUtils to package 'experimental'.

2018-03-02 Thread Stefan Richter (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383509#comment-16383509
 ] 

Stefan Richter commented on FLINK-8837:
---

The class is annotated as evolving and, as you said, there is no experimental 
annotation.

> Move DataStreamUtils to package 'experimental'.
> ---
>
> Key: FLINK-8837
> URL: https://issues.apache.org/jira/browse/FLINK-8837
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The class {{DataStreamUtils}} came from 'flink-contrib' and now accidentally 
> moved to the fully supported API packages. It should be in package 
> 'experimental' to properly communicate that it is not guaranteed to be API 
> stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-02 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383510#comment-16383510
 ] 

Stephan Ewen commented on FLINK-8836:
-

Is the concern that the default serializers themselves are stateful?

As far as I understood it so far, the Kryo object is stateful and not thread 
safe, hence needs to be exclusive to one thread at a time. Kryo does the 
tracking of object graphs and generic scopes, which makes it not thread safe 
during serialization / deserialization.
The serializers used by Kryo should be stateless and thus sharable.

Please double check this assumption - if the serializers are actually stateful, 
then we need to see how we can handle that.
If they are not, we should be able to close this issue.

> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are handed directly to the new 
> {{KryoSerializer}} instance.
> As a result, those default serializers are shared across two different 
> {{KryoSerializer}} instances, and the copy is therefore not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383515#comment-16383515
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8836:


[~StephanEwen]

All built-in Kryo default serializers, AFAIK, are stateless. However, users can 
also register their own serializer implementations via the `ExecutionConfig`, 
making them potentially stateful. The {{KryoSerializer}} recognizes these user 
registrations and adds them to the Kryo object.
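
For example, an instance-based registration like the following hands the exact serializer object to the {{KryoSerializer}} - and that object is what {{duplicate()}} currently shares ({{MyType}} and {{MyTypeSerializer}} are placeholders):

{code:java}
// Instance-based registration: the given (possibly stateful) serializer object
// is picked up by the KryoSerializer and, today, shared by its duplicates.
env.getConfig().addDefaultKryoSerializer(MyType.class, new MyTypeSerializer());
{code}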

> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are handed directly to the new 
> {{KryoSerializer}} instance.
> As a result, those default serializers are shared across two different 
> {{KryoSerializer}} instances, and the copy is therefore not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383515#comment-16383515
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-8836 at 3/2/18 12:03 PM:
-

[~StephanEwen]

All built-in Kryo default serializers, AFAIK, are stateless. However, users can 
also register their own serializer implementations via the {{ExecutionConfig}}, 
making them potentially stateful. The {{KryoSerializer}} recognizes these user 
registrations and adds them to the Kryo object.


was (Author: tzulitai):
[~StephanEwen]

All built-in Kryo default serializers, AFAIK, are stateless. However, users can 
also register their own serializer implementations via the `ExecutionConfig`, 
making them potentially stateful. The {{KryoSerializer}} recognizes these user 
registrations and adds them to the Kryo object.

> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as follows:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> In short, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are handed directly to the new 
> {{KryoSerializer}} instance.
> As a result, those default serializers are shared across two different 
> {{KryoSerializer}} instances, and the copy is therefore not a correct duplicate.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8838) Add Support UNNEST a MultiSet type field

2018-03-02 Thread lincoln.lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln.lee updated FLINK-8838:
---
Description: MultiSetTypeInfo was introduced by FLINK-7491, and UNNEST 
supports the Array type only, so it would be nice to support UNNEST on a MultiSet type 
field.  (was: {code}MultiSetTypeInfo{code} was introduced by FLINK-7491, and 
{code}UNNEST{code} supports {code}Array{code} type only, so it would be 
nice to support `UNNEST` on a `MultiSet` type field.)

> Add Support UNNEST a MultiSet type field
> 
>
> Key: FLINK-8838
> URL: https://issues.apache.org/jira/browse/FLINK-8838
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Major
>
> MultiSetTypeInfo was introduced by FLINK-7491, and UNNEST supports the Array type 
> only, so it would be nice to support UNNEST on a MultiSet type field.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8838) Add Support for UNNEST a MultiSet type field

2018-03-02 Thread lincoln.lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln.lee updated FLINK-8838:
---
Summary: Add Support for UNNEST a MultiSet type field  (was: Add Support 
UNNEST a MultiSet type field)

> Add Support for UNNEST a MultiSet type field
> 
>
> Key: FLINK-8838
> URL: https://issues.apache.org/jira/browse/FLINK-8838
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Major
>
> MultiSetTypeInfo was introduced by FLINK-7491, and UNNEST supports the Array type 
> only, so it would be nice to support UNNEST on a MultiSet type field.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state

2018-03-02 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-8834:

Fix Version/s: 1.5.0

> Job fails to restart due to some tasks stuck in cancelling state
> 
>
> Key: FLINK-8834
> URL: https://issues.apache.org/jira/browse/FLINK-8834
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: AWS EMR 5.12
> Flink 1.4.0
> Beam 2.3.0
>Reporter: Daniel Harper
>Priority: Major
> Fix For: 1.5.0
>
>
> Our job threw an exception overnight, causing the job to commence attempting 
> a restart.
> However, it never managed to restart because 2 tasks on one of the Task 
> Managers are stuck in the "Cancelling" state, with the following exception:
> {code:java}
> 2018-03-02 02:29:31,604 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'PTransformTranslation.UnknownRawPTransform -> 
> ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> 
> uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out
>  -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to 
> cancelling signal, but is stuck in method:
>  java.lang.Thread.blockedOn(Thread.java:239)
> java.lang.System$2.blockedOn(System.java:1252)
> java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211)
> java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)
> java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
> java.io.Writer.write(Writer.java:157)
> org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76)
> org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
> org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:888)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:865)
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(GroupAlsoByWindowViaWindowSetNewDoFn.java:94)
> org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$1.outputWindowedValue(

[GitHub] flink pull request #5619: [FLINK-8838] [table] Add Support for UNNEST a Mult...

2018-03-02 Thread lincoln-lil
GitHub user lincoln-lil opened a pull request:

https://github.com/apache/flink/pull/5619

[FLINK-8838] [table] Add Support for UNNEST a MultiSet type field.

## What is the purpose of the change
This PR adds support for UNNEST on a MultiSet type field, following the SQL 
standard's UNNEST of a collection value 
(<collection value expression> ::= <array value expression> | <multiset value expression>).

## Brief change log
- Add support for UNNEST on a MultiSet type field.
## Verifying this change
- See the added unit tests.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with 
@Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lincoln-lil/flink FLINK-8838

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5619.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5619


commit 961582b28ef20bfab1ea47ceb548b6e6e104e1f7
Author: lincoln-lil 
Date:   2018-03-02T12:05:44Z

[FLINK-8838] [table] Add Support for UNNEST a MultiSet type field.




---


[jira] [Commented] (FLINK-8838) Add Support for UNNEST a MultiSet type field

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383525#comment-16383525
 ] 

ASF GitHub Bot commented on FLINK-8838:
---

GitHub user lincoln-lil opened a pull request:

https://github.com/apache/flink/pull/5619

[FLINK-8838] [table] Add Support for UNNEST a MultiSet type field.

## What is the purpose of the change
This PR adds support for UNNEST on a MultiSet type field, following the SQL 
standard's UNNEST of a collection value 
(<collection value expression> ::= <array value expression> | <multiset value expression>).

## Brief change log
- Add support for UNNEST on a MultiSet type field.
## Verifying this change
- See the added unit tests.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with 
@Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): yes
- Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
- The S3 file system connector: no

## Documentation
- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lincoln-lil/flink FLINK-8838

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5619.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5619


commit 961582b28ef20bfab1ea47ceb548b6e6e104e1f7
Author: lincoln-lil 
Date:   2018-03-02T12:05:44Z

[FLINK-8838] [table] Add Support for UNNEST a MultiSet type field.




> Add Support for UNNEST a MultiSet type field
> 
>
> Key: FLINK-8838
> URL: https://issues.apache.org/jira/browse/FLINK-8838
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: lincoln.lee
>Assignee: lincoln.lee
>Priority: Major
>
> MultiSetTypeInfo was introduced by FLINK-7491, and UNNEST supports the Array type 
> only, so it would be nice to support UNNEST on a MultiSet type field.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8824) In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'

2018-03-02 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383535#comment-16383535
 ] 

mingleizhang commented on FLINK-8824:
-

Hi, [~StephanEwen] The only problematic case I can find is the following. 
Other than that, I think it's all good.


{code:java}
//inner class
System.out.println(HashMap.SimpleEntry.class.getName());
System.out.println(HashMap.SimpleEntry.class.getCanonicalName());
{code}

Will print as follows


{code:java}
java.util.AbstractMap$SimpleEntry // getName() - the JVM-internal form, usable for instantiation
java.util.AbstractMap.SimpleEntry // getCanonicalName() - not resolvable via reflection
{code}
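
The practical consequence is that only the {{getName()}} form round-trips through reflection:

{code:java}
Class.forName("java.util.AbstractMap$SimpleEntry");  // resolves
Class.forName("java.util.AbstractMap.SimpleEntry");  // throws ClassNotFoundException
{code}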








 

> In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'
> --
>
> Key: FLINK-8824
> URL: https://issues.apache.org/jira/browse/FLINK-8824
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Stephan Ewen
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The connector uses {{getCanonicalName()}} in all places, rather than 
> {{getClassName()}}.
> {{getCanonicalName()}}'s intention is to normalize class names for arrays, 
> etc, but is problematic when instantiating classes from class names.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state

2018-03-02 Thread Daniel Harper (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383537#comment-16383537
 ] 

Daniel Harper commented on FLINK-8834:
--

We are not setting this configuration parameter. These are the config 
parameters we see via the Job Manager configuration tab on the UI

 
{code:java}
classloader.resolve-order
containerized.heap-cutoff-ratio
env.hadoop.conf.dir
env.java.opts.taskmanager
env.yarn.conf.dir
high-availability
high-availability.cluster-id
high-availability.zookeeper.path.root
high-availability.zookeeper.quorum
high-availability.zookeeper.storageDir
jobmanager.rpc.address
jobmanager.rpc.port
jobmanager.web.checkpoints.history
parallelism.default
state.backend
state.backend.fs.checkpointdir
state.checkpoints.dir
state.savepoints.dir
taskmanager.network.numberOfBuffers
web.port
yarn.application-attempts
yarn.maximum-failed-containers
zookeeper.sasl.disable
{code}
 

I've had a look in the job manager logs for the word "fatal" and it doesn't 
seem to yield any results

 
{code:java}
 ▶ gzcat ~/Downloads/jobmanager.log.gz | grep -i fatal
 ▶ {code}

> Job fails to restart due to some tasks stuck in cancelling state
> 
>
> Key: FLINK-8834
> URL: https://issues.apache.org/jira/browse/FLINK-8834
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: AWS EMR 5.12
> Flink 1.4.0
> Beam 2.3.0
>Reporter: Daniel Harper
>Priority: Major
> Fix For: 1.5.0
>
>
> Our job threw an exception overnight, causing the job to commence attempting 
> a restart.
> However, it never managed to restart because 2 tasks on one of the Task 
> Managers are stuck in the "Cancelling" state, with the following exception:
> {code:java}
> 2018-03-02 02:29:31,604 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'PTransformTranslation.UnknownRawPTransform -> 
> ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> 
> uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out
>  -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to 
> cancelling signal, but is stuck in method:
>  java.lang.Thread.blockedOn(Thread.java:239)
> java.lang.System$2.blockedOn(System.java:1252)
> java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211)
> java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)
> java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
> java.io.Writer.write(Writer.java:157)
> org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76)
> org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
> org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperato

[GitHub] flink pull request #5620: [FLINK-8824] [kafka] Replace getCanonicalName with...

2018-03-02 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5620

[FLINK-8824] [kafka] Replace getCanonicalName with getName

## What is the purpose of the change
Using ```getCanonicalName``` to obtain a class name causes potential 
problems, since it is mainly intended for normalizing class names for arrays.


## Brief change log

Replace it with ```getName```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8824

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5620.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5620


commit 634262b27b90cc1383c38ff8ac642f7aae522e71
Author: zhangminglei 
Date:   2018-03-02T12:31:49Z

[FLINK-8824] [kafka] Replace getCanonicalName with getName




---


[jira] [Commented] (FLINK-8824) In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383539#comment-16383539
 ] 

ASF GitHub Bot commented on FLINK-8824:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5620

[FLINK-8824] [kafka] Replace getCanonicalName with getName

## What is the purpose of the change
Using ```getCanonicalName``` to obtain a class name is potentially 
problematic: it is mainly intended to normalize class names for arrays, and it 
breaks when instantiating classes from those names.


## Brief change log

Replace it with ```getName```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8824

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5620.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5620


commit 634262b27b90cc1383c38ff8ac642f7aae522e71
Author: zhangminglei 
Date:   2018-03-02T12:31:49Z

[FLINK-8824] [kafka] Replace getCanonicalName with getName




> In Kafka Consumers, replace 'getCanonicalName()' with 'getClassName()'
> --
>
> Key: FLINK-8824
> URL: https://issues.apache.org/jira/browse/FLINK-8824
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Reporter: Stephan Ewen
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The connector uses {{getCanonicalName()}} in all places, rather than 
> {{getClassName()}}.
> {{getCanonicalName()}}'s intention is to normalize class names for arrays, 
> etc., but it is problematic when instantiating classes from class names.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6160) Retry JobManager/ResourceManager connection in case of timeout

2018-03-02 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383541#comment-16383541
 ] 

mingleizhang commented on FLINK-6160:
-

Sup ?

>  Retry JobManager/ResourceManager connection in case of timeout
> ---
>
> Key: FLINK-6160
> URL: https://issues.apache.org/jira/browse/FLINK-6160
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.3.0
>Reporter: Till Rohrmann
>Priority: Major
>  Labels: flip-6
>
> In case of a heartbeat timeout, the {{TaskExecutor}} closes the connection to 
> the remote component. Furthermore, it assumes that the component has actually 
> failed and, thus, it will only start trying to connect to the component if it 
> is notified about a new leader address and leader session id. This is 
> brittle, because the heartbeat could also time out without the component 
> having crashed. Thus, we should add an automatic retry to the latest known 
> leader address information in case of a timeout.
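
A rough sketch of the suggested behavior (hypothetical code, not from the 
Flink code base; the names and backoff values are illustrative only):

{code:java}
// Sketch: on heartbeat timeout, retry the last known leader address with
// capped exponential backoff before falling back to leader notifications.
public class ReconnectSketch {

    public static void reconnect(String lastKnownLeaderAddress) throws InterruptedException {
        long backoffMs = 500;
        for (int attempt = 1; attempt <= 5; attempt++) {
            if (tryConnect(lastKnownLeaderAddress)) {
                return; // connection re-established
            }
            Thread.sleep(backoffMs);
            backoffMs = Math.min(backoffMs * 2, 10_000); // cap the backoff
        }
        // Give up; wait for a new leader address notification, as before.
    }

    private static boolean tryConnect(String address) {
        return false; // placeholder for the actual RPC connection attempt
    }
}
{code}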



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8836) Duplicating a KryoSerializer does not duplicate registered default serializers

2018-03-02 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383515#comment-16383515
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-8836 at 3/2/18 12:44 PM:
-

[~StephanEwen] I think they can be stateful.

All built-in Kryo default serializers, AFAIK, are stateless. However, users can 
also register their own serializer implementations via the {{ExecutionConfig}}, 
making them potentially stateful. The {{KryoSerializer}} recognizes these user 
registrations and adds them to the Kryo object.


was (Author: tzulitai):
[~StephanEwen]

All built-in Kryo default serializers, AFAIK, are stateless. However, users can 
also register their own serializer implementations via the {{ExecutionConfig}}, 
making them potentially stateful. The {{KryoSerializer}} recognizes these user 
registrations and adds them to the Kryo object.

> Duplicating a KryoSerializer does not duplicate registered default serializers
> --
>
> Key: FLINK-8836
> URL: https://issues.apache.org/jira/browse/FLINK-8836
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
>
> The {{duplicate()}} method of the {{KryoSerializer}} is as following:
> {code:java}
> public KryoSerializer<T> duplicate() {
>     return new KryoSerializer<>(this);
> }
> protected KryoSerializer(KryoSerializer<T> toCopy) {
>     defaultSerializers = toCopy.defaultSerializers;
>     defaultSerializerClasses = toCopy.defaultSerializerClasses;
>     kryoRegistrations = toCopy.kryoRegistrations;
>     ...
> }
> {code}
> Shortly put, when duplicating a {{KryoSerializer}}, the 
> {{defaultSerializers}} serializer instances are directly provided to the new 
> {{KryoSerializer}} instance.
>  As a result, those default serializers are shared across the two 
> {{KryoSerializer}} instances, and the copy is therefore not a correct duplicate.
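
A minimal sketch of the aliasing problem (hypothetical classes, not the Flink 
code): copying a stateful field by reference leaves both "duplicates" mutating 
the same object.

{code:java}
// Hypothetical illustration of the shared-state bug described above.
import java.util.ArrayList;
import java.util.List;

class StatefulSerializer {
    final List<String> scratch = new ArrayList<>(); // mutable per-instance state
}

class Holder {
    final StatefulSerializer serializer;

    Holder(StatefulSerializer serializer) {
        this.serializer = serializer;
    }

    Holder duplicate() {
        // Copies only the reference, so both holders share one serializer;
        // this is the same flaw as in KryoSerializer's copy constructor.
        return new Holder(this.serializer);
    }
}
{code}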



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8835) Fix TaskManager config keys

2018-03-02 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-8835:
---

Assignee: mingleizhang

> Fix TaskManager config keys
> ---
>
> Key: FLINK-8835
> URL: https://issues.apache.org/jira/browse/FLINK-8835
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Reporter: Stephan Ewen
>Assignee: mingleizhang
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Many new config keys in the TaskManager don't follow the proper naming 
> scheme. We need to clean those up before the release. I would also suggest 
> keeping the key names short, because that makes it easier for users.
> When doing this cleanup pass over the config keys, I would suggest also 
> making some of the existing keys more hierarchical and harmonizing them with 
> the common scheme in Flink.
> ## New Keys
> * {{taskmanager.network.credit-based-flow-control.enabled}} to 
> {{taskmanager.network.credit-model}}.
> * {{taskmanager.exactly-once.blocking.data.enabled}} to 
> {{task.checkpoint.alignment.blocking}} (we already have 
> {{task.checkpoint.alignment.max-size}})
> ## Existing Keys
> * {{taskmanager.debug.memory.startLogThread}} => 
> {{taskmanager.debug.memory.log}}
> * {{taskmanager.debug.memory.logIntervalMs}} => 
> {{taskmanager.debug.memory.log-interval}}
> * {{taskmanager.initial-registration-pause}} => 
> {{taskmanager.registration.initial-backoff}}
> * {{taskmanager.max-registration-pause}} => 
> {{taskmanager.registration.max-backoff}}
> * {{taskmanager.refused-registration-pause}} => 
> {{taskmanager.registration.refused-backoff}}
> * {{taskmanager.maxRegistrationDuration}} => 
> {{taskmanager.registration.timeout}}
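
For illustration, a minimal sketch of reading one of the renamed settings 
(hypothetical; the key name follows the proposal above, which is not final):

{code:java}
// Hypothetical sketch using Flink's generic Configuration accessors.
import org.apache.flink.configuration.Configuration;

public class ConfigKeyDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.registration.initial-backoff", "500 ms");
        // Falls back to the default ("1 s") if the key is absent.
        System.out.println(conf.getString("taskmanager.registration.initial-backoff", "1 s"));
    }
}
{code}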



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8274) Fix Java 64K method compiling limitation for CommonCalc

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383545#comment-16383545
 ] 

ASF GitHub Bot commented on FLINK-8274:
---

Github user Xpray commented on the issue:

https://github.com/apache/flink/pull/5613
  
It looks great. How about splitting reuseInputUnboxCode as well? I found the 
unboxing code might be oversized and has to be split too.


> Fix Java 64K method compiling limitation for CommonCalc
> ---
>
> Key: FLINK-8274
> URL: https://issues.apache.org/jira/browse/FLINK-8274
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Critical
>
> For complex SQL queries, the generated code for {code}DataStreamCalc{code} and 
> {code}DataSetCalc{code} may exceed Java's method length limit of 64 KB.
>  
> This issue will split long methods into several sub-method calls.
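
A minimal sketch of the splitting pattern (hypothetical hand-written code, not 
the actual generated output): the code generator emits bounded-size 
sub-methods and chains calls to them, so no single method exceeds the JVM's 
64 KB bytecode limit.

{code:java}
// Hypothetical shape of the split generated code.
public class SplitCalcDemo {

    public void processElement(Object row) {
        // Delegate to bounded-size chunks instead of one huge method body.
        split0(row);
        split1(row);
        // ... further splits as the generated logic grows
    }

    private void split0(Object row) {
        // first chunk of the generated expression code
    }

    private void split1(Object row) {
        // second chunk of the generated expression code
    }
}
{code}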



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5613: [FLINK-8274] [table] Split generated methods for preventi...

2018-03-02 Thread Xpray
Github user Xpray commented on the issue:

https://github.com/apache/flink/pull/5613
  
It looks great. How about splitting reuseInputUnboxCode as well? I found the 
unboxing code might be oversized and has to be split too.


---


[jira] [Comment Edited] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state

2018-03-02 Thread Daniel Harper (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383548#comment-16383548
 ] 

Daniel Harper edited comment on FLINK-8834 at 3/2/18 12:56 PM:
---

I can see this code where the log line is created

[https://github.com/apache/flink/blob/f9a583b727c9aecbec3213b12266f1d598223400/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L1578]

So it looks like the {{if}} condition never reaches the first branch, which 
in turn logs the message [~StephanEwen] was talking about.

 


was (Author: djharper):
I can see this code where the log line is created

[https://github.com/apache/flink/blob/f9a583b727c9aecbec3213b12266f1d598223400/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L1578]

So it looks like the {{if}} condition never reaches the first branch, which 
in turn logs the message [~StephanEwen] was talking about.

 

> Job fails to restart due to some tasks stuck in cancelling state
> 
>
> Key: FLINK-8834
> URL: https://issues.apache.org/jira/browse/FLINK-8834
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: AWS EMR 5.12
> Flink 1.4.0
> Beam 2.3.0
>Reporter: Daniel Harper
>Priority: Major
> Fix For: 1.5.0
>
>
> Our job threw an exception overnight, causing it to attempt a restart.
> However, it never managed to restart because 2 tasks on one of the Task 
> Managers are stuck in the "Cancelling" state, with the following exception:
> {code:java}
> 2018-03-02 02:29:31,604 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'PTransformTranslation.UnknownRawPTransform -> 
> ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> 
> uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out
>  -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to 
> cancelling signal, but is stuck in method:
>  java.lang.Thread.blockedOn(Thread.java:239)
> java.lang.System$2.blockedOn(System.java:1252)
> java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211)
> java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)
> java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
> java.io.Writer.write(Writer.java:157)
> org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76)
> org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
> org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)

[jira] [Commented] (FLINK-8834) Job fails to restart due to some tasks stuck in cancelling state

2018-03-02 Thread Daniel Harper (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383548#comment-16383548
 ] 

Daniel Harper commented on FLINK-8834:
--

I can see this code where the log line is created

[https://github.com/apache/flink/blob/f9a583b727c9aecbec3213b12266f1d598223400/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L1578]

So it looks like the {{if}} condition never reaches the first branch, which 
in turn logs the message [~StephanEwen] was talking about.

 

> Job fails to restart due to some tasks stuck in cancelling state
> 
>
> Key: FLINK-8834
> URL: https://issues.apache.org/jira/browse/FLINK-8834
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0
> Environment: AWS EMR 5.12
> Flink 1.4.0
> Beam 2.3.0
>Reporter: Daniel Harper
>Priority: Major
> Fix For: 1.5.0
>
>
> Our job threw an exception overnight, causing it to attempt a restart.
> However, it never managed to restart because 2 tasks on one of the Task 
> Managers are stuck in the "Cancelling" state, with the following exception:
> {code:java}
> 2018-03-02 02:29:31,604 WARN  org.apache.flink.runtime.taskmanager.Task   
>   - Task 'PTransformTranslation.UnknownRawPTransform -> 
> ParDoTranslation.RawParDo -> ParDoTranslation.RawParDo -> 
> uk.co.bbc.sawmill.streaming.pipeline.output.io.file.WriteWindowToFile-RDotRecord/TextIO.Write/WriteFiles/GatherTempFileResults/Reshuffle/Window.Into()/Window.Assign.out
>  -> ParDoTranslation.RawParDo -> ToKeyedWorkItem (24/32)' did not react to 
> cancelling signal, but is stuck in method:
>  java.lang.Thread.blockedOn(Thread.java:239)
> java.lang.System$2.blockedOn(System.java:1252)
> java.nio.channels.spi.AbstractInterruptibleChannel.blockedOn(AbstractInterruptibleChannel.java:211)
> java.nio.channels.spi.AbstractInterruptibleChannel.begin(AbstractInterruptibleChannel.java:170)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:457)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
> java.nio.channels.Channels.writeFullyImpl(Channels.java:78)
> java.nio.channels.Channels.writeFully(Channels.java:101)
> java.nio.channels.Channels.access$000(Channels.java:61)
> java.nio.channels.Channels$1.write(Channels.java:174)
> sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
> sun.nio.cs.StreamEncoder.implWrite(StreamEncoder.java:282)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:125)
> sun.nio.cs.StreamEncoder.write(StreamEncoder.java:135)
> java.io.OutputStreamWriter.write(OutputStreamWriter.java:220)
> java.io.Writer.write(Writer.java:157)
> org.apache.beam.sdk.io.TextSink$TextWriter.writeLine(TextSink.java:102)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:118)
> org.apache.beam.sdk.io.TextSink$TextWriter.write(TextSink.java:76)
> org.apache.beam.sdk.io.WriteFiles.writeOrClose(WriteFiles.java:550)
> org.apache.beam.sdk.io.WriteFiles.access$1000(WriteFiles.java:112)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:718)
> org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown
>  Source)
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:425)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOut

[jira] [Updated] (FLINK-8517) StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis

2018-03-02 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-8517:
---
Component/s: TaskManager

> StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis
> ---
>
> Key: FLINK-8517
> URL: https://issues.apache.org/jira/browse/FLINK-8517
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, TaskManager, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> The {{StaticlyNestedIterationsITCase.testJobWithoutObjectReuse}} test case 
> fails on Travis. This exception might be relevant:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Partition 
> 557b069f2b89f8ba599e6ab0956a3f5a@58f1a6b7d8ae10b9141f17c08d06cecb not 
> registered at task event dispatcher.
>   at 
> org.apache.flink.runtime.io.network.TaskEventDispatcher.subscribeToEvent(TaskEventDispatcher.java:107)
>   at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initSuperstepBarrier(IterationHeadTask.java:242)
>   at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:266)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748){code}
>  
> https://api.travis-ci.org/v3/job/60156/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5621: [FLINK-8517] fix missing synchronization in TaskEv...

2018-03-02 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5621

[FLINK-8517] fix missing synchronization in TaskEventDispatcher

## What is the purpose of the change

The `TaskEventDispatcher` was missing synchronization when accessing the 
`registeredHandlers` field from the new `subscribeToEvent()` and `publish()` 
methods. This caused the 
`StaticlyNestedIterationsITCase.testJobWithoutObjectReuse()` test to fail 
sporadically (reproducible after a couple of runs); a minimal sketch of the 
fix pattern follows below.
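
A minimal sketch of the fix pattern (a hypothetical stand-in class, not the 
actual diff): every access to the shared map goes through the same lock, so 
concurrent subscribe/publish calls cannot race.

```java
// Hypothetical stand-in for the dispatcher, showing the locking pattern.
import java.util.HashMap;
import java.util.Map;

public class DispatcherSketch {
    private final Map<String, Runnable> registeredHandlers = new HashMap<>();

    public void subscribeToEvent(String id, Runnable handler) {
        synchronized (registeredHandlers) {
            registeredHandlers.put(id, handler);
        }
    }

    public void publish(String id) {
        final Runnable handler;
        synchronized (registeredHandlers) {
            handler = registeredHandlers.get(id);
        }
        if (handler != null) {
            handler.run(); // run outside the lock to avoid holding it during user code
        }
    }
}
```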

Please merge into `master` and `release-1.5` after accepting.

## Brief change log

- add synchronization around `TaskEventDispatcher#subscribeToEvent()`'s 
access to `registeredHandlers`
- add synchronization around `TaskEventDispatcher#publish()`'s access to 
`registeredHandlers`

## Verifying this change

This change is already covered by existing tests (indirectly), such as 
`StaticlyNestedIterationsITCase.testJobWithoutObjectReuse()`. I ran it almost 
24000 times and could not reproduce the failure anymore with the change.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-8517

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5621.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5621


commit aabeb89dd1259174c786f19b7e97c4c50038610f
Author: Nico Kruber 
Date:   2018-03-02T13:38:20Z

[FLINK-8517] fix missing synchronization in TaskEventDispatcher




---


[jira] [Commented] (FLINK-8517) StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383580#comment-16383580
 ] 

ASF GitHub Bot commented on FLINK-8517:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5621

[FLINK-8517] fix missing synchronization in TaskEventDispatcher

## What is the purpose of the change

The `TaskEventDispatcher` was missing synchronization when accessing the 
`registeredHandlers` field from the new `subscribeToEvent()` and `publish()` 
methods. This caused the 
`StaticlyNestedIterationsITCase.testJobWithoutObjectReuse()` test to fail 
sporadically (reproducible after a couple of runs).

Please merge into `master` and `release-1.5` after accepting.

## Brief change log

- add synchronization around `TaskEventDispatcher#subscribeToEvent()`'s 
access to `registeredHandlers`
- add synchronization around `TaskEventDispatcher#publish()`'s access to 
`registeredHandlers`

## Verifying this change

This change is already covered by existing tests (indirectly), such as 
`StaticlyNestedIterationsITCase.testJobWithoutObjectReuse()`. I ran it almost 
24000 times and could not reproduce the failure anymore with the change.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-8517

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5621.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5621


commit aabeb89dd1259174c786f19b7e97c4c50038610f
Author: Nico Kruber 
Date:   2018-03-02T13:38:20Z

[FLINK-8517] fix missing synchronization in TaskEventDispatcher




> StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis
> ---
>
> Key: FLINK-8517
> URL: https://issues.apache.org/jira/browse/FLINK-8517
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, TaskManager, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> The {{StaticlyNestedIterationsITCase.testJobWithoutObjectReuse}} test case 
> fails on Travis. This exception might be relevant:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Partition 
> 557b069f2b89f8ba599e6ab0956a3f5a@58f1a6b7d8ae10b9141f17c08d06cecb not 
> registered at task event dispatcher.
>   at 
> org.apache.flink.runtime.io.network.TaskEventDispatcher.subscribeToEvent(TaskEventDispatcher.java:107)
>   at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initSuperstepBarrier(IterationHeadTask.java:242)
>   at 
> org.apache.flink.runtime.iterative.

[jira] [Updated] (FLINK-8517) StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis

2018-03-02 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-8517:
---
Fix Version/s: (was: 1.4.3)

> StaticlyNestedIterationsITCase.testJobWithoutObjectReuse unstable on Travis
> ---
>
> Key: FLINK-8517
> URL: https://issues.apache.org/jira/browse/FLINK-8517
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, TaskManager, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> The {{StaticlyNestedIterationsITCase.testJobWithoutObjectReuse}} test case 
> fails on Travis. This exception might be relevant:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:891)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:834)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalStateException: Partition 
> 557b069f2b89f8ba599e6ab0956a3f5a@58f1a6b7d8ae10b9141f17c08d06cecb not 
> registered at task event dispatcher.
>   at 
> org.apache.flink.runtime.io.network.TaskEventDispatcher.subscribeToEvent(TaskEventDispatcher.java:107)
>   at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.initSuperstepBarrier(IterationHeadTask.java:242)
>   at 
> org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:266)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748){code}
>  
> https://api.travis-ci.org/v3/job/60156/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8839) Table source factory discovery is broken in SQL Client

2018-03-02 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8839:
---

 Summary: Table source factory discovery is broken in SQL Client
 Key: FLINK-8839
 URL: https://issues.apache.org/jira/browse/FLINK-8839
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


Table source factories cannot be discovered if they were added using a jar 
file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383610#comment-16383610
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5622

[FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint

## What is the purpose of the change

*Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher 
and JobMaster. Stop checkpoint scheduler before taking savepoint to make sure 
that the savepoint created by this command is the last one.*

cc: @tillrohrmann 

## Brief change log

  - *Implement RestClusterClient.cancelWithSavepoint*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added `JobMasterTriggerSavepointIT`.*
  - *Manually tested.*
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8459-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5622.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5622


commit 7e913b0d1eab8453279ffacc11f4633b9263190d
Author: gyao 
Date:   2018-03-02T14:11:36Z

[FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint

Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher 
and JobMaster. Stop checkpoint scheduler before taking savepoint to make sure 
that the savepoint created by this command is the last one.




> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}
> by either taking a savepoint and cancelling the job separately, or by 
> migrating the logic in {{JobCancellationWithSavepointHandlers}}. The former 
> will have different semantics because the checkpoint scheduler is not 
> stopped. Thus it is not guaranteed that there won't be additional 
> checkpoints between the savepoint and the job cancellation.
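
For reference, a hypothetical usage sketch (the method shape is taken from the 
description above; the String return value is assumed to be the savepoint 
path, matching the pre-FLIP-6 {{ClusterClient}} behavior):

{code:java}
// Hypothetical client-side usage of cancel-with-savepoint.
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;

public class CancelWithSavepointDemo {

    public static void cancel(RestClusterClient<?> client, String jobIdHex) throws Exception {
        JobID jobId = JobID.fromHexString(jobIdHex);
        // Takes a savepoint first, then cancels the job; returns the savepoint path.
        String savepointPath = client.cancelWithSavepoint(jobId, "/tmp/savepoints");
        System.out.println("Job cancelled; savepoint at " + savepointPath);
    }
}
{code}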



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

2018-03-02 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5622

[FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint

## What is the purpose of the change

*Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher 
and JobMaster. Stop checkpoint scheduler before taking savepoint to make sure 
that the savepoint created by this command is the last one.*

cc: @tillrohrmann 

## Brief change log

  - *Implement RestClusterClient.cancelWithSavepoint*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added `JobMasterTriggerSavepointIT`.*
  - *Manually tested.*
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8459-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5622.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5622


commit 7e913b0d1eab8453279ffacc11f4633b9263190d
Author: gyao 
Date:   2018-03-02T14:11:36Z

[FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint

Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher 
and JobMaster. Stop checkpoint scheduler before taking savepoint to make sure 
that the savepoint created by this command is the last one.




---


[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

2018-03-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171857387
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final 
ResourceID resourceID) {
 
@Override
public CompletableFuture<String> triggerSavepoint(
-   @Nullable final String targetDirectory,
-   final Time timeout) {
-   try {
-   return executionGraph.getCheckpointCoordinator()
-   .triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
-   
.thenApply(CompletedCheckpoint::getExternalPointer);
-   } catch (Exception e) {
-   return FutureUtils.completedExceptionally(e);
+   @Nullable final String targetDirectory,
+   final boolean cancelJob,
+   final Time timeout) {
+
+   final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+   if (checkpointCoordinator == null) {
+   return FutureUtils.completedExceptionally(new 
IllegalStateException(
+   String.format("Job %s is not a streaming job.", 
jobGraph.getJobID())));
--- End diff --

If the job is in a terminal state, the coordinator will be `null`ed as well.


---


[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383612#comment-16383612
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171857387
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final 
ResourceID resourceID) {
 
@Override
public CompletableFuture<String> triggerSavepoint(
-   @Nullable final String targetDirectory,
-   final Time timeout) {
-   try {
-   return executionGraph.getCheckpointCoordinator()
-   .triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
-   
.thenApply(CompletedCheckpoint::getExternalPointer);
-   } catch (Exception e) {
-   return FutureUtils.completedExceptionally(e);
+   @Nullable final String targetDirectory,
+   final boolean cancelJob,
+   final Time timeout) {
+
+   final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+   if (checkpointCoordinator == null) {
+   return FutureUtils.completedExceptionally(new 
IllegalStateException(
+   String.format("Job %s is not a streaming job.", 
jobGraph.getJobID())));
--- End diff --

If the job is in a terminal state, the coordinator will be `null`ed as well.


> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}
> by either taking a savepoint and cancelling the job separately, or by 
> migrating the logic in {{JobCancellationWithSavepointHandlers}}. The former 
> will have different semantics because the checkpoint scheduler is not 
> stopped. Thus it is not guaranteed that there won't be additional 
> checkpoints between the savepoint and the job cancellation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

2018-03-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171857517
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final 
ResourceID resourceID) {
 
@Override
public CompletableFuture<String> triggerSavepoint(
-   @Nullable final String targetDirectory,
-   final Time timeout) {
-   try {
-   return executionGraph.getCheckpointCoordinator()
-   .triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
-   
.thenApply(CompletedCheckpoint::getExternalPointer);
-   } catch (Exception e) {
-   return FutureUtils.completedExceptionally(e);
+   @Nullable final String targetDirectory,
+   final boolean cancelJob,
+   final Time timeout) {
+
+   final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+   if (checkpointCoordinator == null) {
+   return FutureUtils.completedExceptionally(new 
IllegalStateException(
+   String.format("Job %s is not a streaming job.", 
jobGraph.getJobID())));
+   }
+
+   if (cancelJob) {
+   checkpointCoordinator.stopCheckpointScheduler();
+   }
+   return checkpointCoordinator
+   .triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
+   .thenApply(CompletedCheckpoint::getExternalPointer)
+   .thenApplyAsync(path -> {
+   if (cancelJob) {
+   log.info("Savepoint stored in {}. Now 
cancelling {}.", path, jobGraph.getJobID());
+   cancel(timeout);
+   }
+   return path;
+   }, getMainThreadExecutor())
+   .exceptionally(throwable -> {
+   if (cancelJob) {
+   
startCheckpointScheduler(checkpointCoordinator);
--- End diff --

If the cancellation fails, we restart the scheduler as well. I think this 
differs from the previous implementation.


---


[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383616#comment-16383616
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171857517
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final 
ResourceID resourceID) {
 
@Override
public CompletableFuture<String> triggerSavepoint(
-   @Nullable final String targetDirectory,
-   final Time timeout) {
-   try {
-   return executionGraph.getCheckpointCoordinator()
-   .triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
-   
.thenApply(CompletedCheckpoint::getExternalPointer);
-   } catch (Exception e) {
-   return FutureUtils.completedExceptionally(e);
+   @Nullable final String targetDirectory,
+   final boolean cancelJob,
+   final Time timeout) {
+
+   final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+   if (checkpointCoordinator == null) {
+   return FutureUtils.completedExceptionally(new 
IllegalStateException(
+   String.format("Job %s is not a streaming job.", 
jobGraph.getJobID())));
+   }
+
+   if (cancelJob) {
+   checkpointCoordinator.stopCheckpointScheduler();
+   }
+   return checkpointCoordinator
+   .triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
+   .thenApply(CompletedCheckpoint::getExternalPointer)
+   .thenApplyAsync(path -> {
+   if (cancelJob) {
+   log.info("Savepoint stored in {}. Now 
cancelling {}.", path, jobGraph.getJobID());
+   cancel(timeout);
+   }
+   return path;
+   }, getMainThreadExecutor())
+   .exceptionally(throwable -> {
+   if (cancelJob) {
+   
startCheckpointScheduler(checkpointCoordinator);
--- End diff --

If the cancellation fails, we restart the scheduler as well. I think this 
differs from the previous implementation.


> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}
> by either taking a savepoint and cancelling the job separately, or by 
> migrating the logic in {{JobCancellationWithSavepointHandlers}}. The former 
> will have different semantics because the checkpoint scheduler is not 
> stopped. Thus it is not guaranteed that there won't be additional 
> checkpoints between the savepoint and the job cancellation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

2018-03-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171857970
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final 
ResourceID resourceID) {
 
@Override
public CompletableFuture<String> triggerSavepoint(
-   @Nullable final String targetDirectory,
-   final Time timeout) {
-   try {
-   return executionGraph.getCheckpointCoordinator()
-   .triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
-   
.thenApply(CompletedCheckpoint::getExternalPointer);
-   } catch (Exception e) {
-   return FutureUtils.completedExceptionally(e);
+   @Nullable final String targetDirectory,
+   final boolean cancelJob,
+   final Time timeout) {
+
+   final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+   if (checkpointCoordinator == null) {
+   return FutureUtils.completedExceptionally(new 
IllegalStateException(
+   String.format("Job %s is not a streaming job.", 
jobGraph.getJobID())));
+   }
+
+   if (cancelJob) {
+   checkpointCoordinator.stopCheckpointScheduler();
+   }
+   return checkpointCoordinator
+   .triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
+   .thenApply(CompletedCheckpoint::getExternalPointer)
+   .thenApplyAsync(path -> {
+   if (cancelJob) {
+   log.info("Savepoint stored in {}. Now 
cancelling {}.", path, jobGraph.getJobID());
+   cancel(timeout);
+   }
+   return path;
+   }, getMainThreadExecutor())
+   .exceptionally(throwable -> {
+   if (cancelJob) {
+   
startCheckpointScheduler(checkpointCoordinator);
+   }
+   throw new CompletionException(throwable);
+   });
+   }
+
+   private void startCheckpointScheduler(final CheckpointCoordinator 
checkpointCoordinator) {
--- End diff --

Method can be reused in the job rescaling logic.


---


[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

2018-03-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171858158
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java
 ---
@@ -34,8 +34,8 @@
}
 
@Override
-   protected SavepointTriggerRequestBody getTestRequestInstance() throws 
Exception {
-   return new SavepointTriggerRequestBody("/tmp");
+   protected SavepointTriggerRequestBody getTestRequestInstance() {
+   return new SavepointTriggerRequestBody("/tmp", true);
--- End diff --

Strictly speaking, the `false` case should be tested as well.


---


[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383618#comment-16383618
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171857970
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final 
ResourceID resourceID) {
 
@Override
public CompletableFuture<String> triggerSavepoint(
-   @Nullable final String targetDirectory,
-   final Time timeout) {
-   try {
-   return executionGraph.getCheckpointCoordinator()
-   .triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
-   
.thenApply(CompletedCheckpoint::getExternalPointer);
-   } catch (Exception e) {
-   return FutureUtils.completedExceptionally(e);
+   @Nullable final String targetDirectory,
+   final boolean cancelJob,
+   final Time timeout) {
+
+   final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+   if (checkpointCoordinator == null) {
+   return FutureUtils.completedExceptionally(new 
IllegalStateException(
+   String.format("Job %s is not a streaming job.", 
jobGraph.getJobID())));
+   }
+
+   if (cancelJob) {
+   checkpointCoordinator.stopCheckpointScheduler();
+   }
+   return checkpointCoordinator
+   .triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
+   .thenApply(CompletedCheckpoint::getExternalPointer)
+   .thenApplyAsync(path -> {
+   if (cancelJob) {
+   log.info("Savepoint stored in {}. Now 
cancelling {}.", path, jobGraph.getJobID());
+   cancel(timeout);
+   }
+   return path;
+   }, getMainThreadExecutor())
+   .exceptionally(throwable -> {
+   if (cancelJob) {
+   
startCheckpointScheduler(checkpointCoordinator);
+   }
+   throw new CompletionException(throwable);
+   });
+   }
+
+   private void startCheckpointScheduler(final CheckpointCoordinator 
checkpointCoordinator) {
--- End diff --

Method can be reused in the job rescaling logic.


> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}
> by either taking a savepoint and cancelling the job separately, or by 
> migrating the logic in {{JobCancellationWithSavepointHandlers}}. The former 
> will have different semantics because the checkpoint scheduler is not 
> stopped. Thus it is not guaranteed that there won't be additional 
> checkpoints between the savepoint and the job cancellation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383619#comment-16383619
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171858158
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java
 ---
@@ -34,8 +34,8 @@
}
 
@Override
-   protected SavepointTriggerRequestBody getTestRequestInstance() throws 
Exception {
-   return new SavepointTriggerRequestBody("/tmp");
+   protected SavepointTriggerRequestBody getTestRequestInstance() {
+   return new SavepointTriggerRequestBody("/tmp", true);
--- End diff --

Strictly speaking, the `false` case should be tested as well.


> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}
> by either taking a savepoint and cancelling the job separately, or by 
> migrating the logic in {{JobCancellationWithSavepointHandlers}}. The former 
> will have different semantics because the checkpoint scheduler is not 
> stopped. Thus it is not guaranteed that there won't be additional 
> checkpoints between the savepoint and the job cancellation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383622#comment-16383622
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171858593
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.isOneOf;
+
+/**
+ * Tests for {@link 
org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, 
Time)}.
+ *
+ * @see org.apache.flink.runtime.jobmaster.JobMaster
+ */
+@Category(Flip6.class)
+public class JobMasterTriggerSavepointIT extends AbstractTestBase {
+
+   private static CountDownLatch invokeLatch;
+
+   private static volatile CountDownLatch triggerCheckpointLatch;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private Path savepointDirectory;
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+   triggerCheckpointLatch = new CountDownLatch(1);
+   savepointDirectory = temporaryFolder.newFolder().toPath();
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   miniClusterResource.getClusterClient() instanceof MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient();
+   clusterClient.setDetached(true);
+
+   jobGraph = new JobGraph();
+
+   final JobVertex vertex = new JobVertex("testVertex")

[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383623#comment-16383623 ]

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171858662
  
--- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.isOneOf;
+
+/**
+ * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}.
+ *
+ * @see org.apache.flink.runtime.jobmaster.JobMaster
+ */
+@Category(Flip6.class)
+public class JobMasterTriggerSavepointIT extends AbstractTestBase {
+
+   private static CountDownLatch invokeLatch;
+
+   private static volatile CountDownLatch triggerCheckpointLatch;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private Path savepointDirectory;
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+   triggerCheckpointLatch = new CountDownLatch(1);
+   savepointDirectory = temporaryFolder.newFolder().toPath();
+
+   Assume.assumeTrue(
--- End diff --

This shouldn't happen if the category is `Flip6`.
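(Background for the comment above: JUnit's {{Assume}} marks a test as skipped rather than failed when the condition does not hold, so the guard degrades gracefully. A minimal, self-contained illustration, independent of the Flink classes in this diff:)
{code:java}
import org.junit.Assume;
import org.junit.Test;

public class AssumeIllustration {

    @Test
    public void skippedWhenAssumptionFails() {
        final Object client = "stand-in client";
        // When the condition is false, JUnit reports the test as skipped,
        // not failed. That is why the guard is harmless even if the Flip6
        // category already guarantees the expected client type.
        Assume.assumeTrue("unexpected client type", client instanceof Integer);
        // Not reached in this illustration.
    }
}
{code}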


> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
> 

[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

2018-03-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171858662
  
--- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.isOneOf;
+
+/**
+ * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}.
+ *
+ * @see org.apache.flink.runtime.jobmaster.JobMaster
+ */
+@Category(Flip6.class)
+public class JobMasterTriggerSavepointIT extends AbstractTestBase {
+
+   private static CountDownLatch invokeLatch;
+
+   private static volatile CountDownLatch triggerCheckpointLatch;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private Path savepointDirectory;
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+   triggerCheckpointLatch = new CountDownLatch(1);
+   savepointDirectory = temporaryFolder.newFolder().toPath();
+
+   Assume.assumeTrue(
--- End diff --

This shouldn't happen if the category is `Flip6`.


---


[GitHub] flink pull request #5622: [FLINK-8459][flip6] Implement RestClusterClient.ca...

2018-03-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171858593
  
--- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.isOneOf;
+
+/**
+ * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}.
+ *
+ * @see org.apache.flink.runtime.jobmaster.JobMaster
+ */
+@Category(Flip6.class)
+public class JobMasterTriggerSavepointIT extends AbstractTestBase {
+
+   private static CountDownLatch invokeLatch;
+
+   private static volatile CountDownLatch triggerCheckpointLatch;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private Path savepointDirectory;
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+   triggerCheckpointLatch = new CountDownLatch(1);
+   savepointDirectory = temporaryFolder.newFolder().toPath();
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   miniClusterResource.getClusterClient() instanceof MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient();
+   clusterClient.setDetached(true);
+
+   jobGraph = new JobGraph();
+
+   final JobVertex vertex = new JobVertex("testVertex");
+   vertex.setInvokableClass(NoOpBlockingInvokable.class);
+   jobGraph.addVertex(vertex);
+
+   jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
+   Collections.singletonList(vertex.g
