[jira] [Commented] (FLINK-7391) Normalize release entries

2018-07-16 Thread Neelesh Srinivas Salian (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546060#comment-16546060
 ] 

Neelesh Srinivas Salian commented on FLINK-7391:


[~Zentol], is anyone working on this? I could take it up.

> Normalize release entries
> -
>
> Key: FLINK-7391
> URL: https://issues.apache.org/jira/browse/FLINK-7391
> Project: Flink
>  Issue Type: Improvement
>  Components: Project Website
>Reporter: Chesnay Schepler
>Priority: Major
>  Labels: starter
>
> The release list at http://flink.apache.org/downloads.html is inconsistent 
> with regard to the Java/Scala docs links. For 1.1.3 and below we only include 
> a docs link for the latest version (i.e., 1.1.3, but not 1.1.2); for higher 
> versions we have a docs link for every release.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-6895) Add STR_TO_DATE supported in SQL

2018-07-16 Thread Neelesh Srinivas Salian (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546057#comment-16546057
 ] 

Neelesh Srinivas Salian edited comment on FLINK-6895 at 7/17/18 5:50 AM:
-

[~fhueske] [~twalthr] is this Jira being worked on?

I could assign it to myself and start working on it, and on what is needed for FLINK-6976.


was (Author: nssalian):
[~fhueske] [~twalthr] is this Jira being worked on?

I could assign it to myself and start working on it.

> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: buptljy
>Priority: Major
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]
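
The semantics described in the issue can be sketched in Java with `java.time`. This is only an illustration under stated assumptions, not Flink's implementation: the class `StrToDateSketch`, the method `strToDate`, and the supported specifier subset (`%Y`, `%m`, `%d`, `%H`, `%i`, `%s`) are hypothetical, and the 12-hour `%h` used in the example above is deliberately left out.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.format.ResolverStyle;

// Hypothetical sketch: translates a small subset of MySQL-style format
// specifiers into a java.time pattern and parses the input with it.
final class StrToDateSketch {

    // Returns a LocalDateTime, LocalDate or LocalTime depending on which
    // parts the format contains, or null if the input cannot be parsed.
    static Object strToDate(String str, String format) {
        StringBuilder pattern = new StringBuilder();
        StringBuilder literal = new StringBuilder();
        boolean hasDate = false;
        boolean hasTime = false;

        for (int i = 0; i < format.length(); i++) {
            char c = format.charAt(i);
            if (c == '%' && i + 1 < format.length()) {
                flushLiteral(literal, pattern);
                char spec = format.charAt(++i);
                switch (spec) {
                    case 'Y': pattern.append("uuuu"); hasDate = true; break;
                    case 'm': pattern.append("M"); hasDate = true; break;
                    case 'd': pattern.append("d"); hasDate = true; break;
                    case 'H': pattern.append("H"); hasTime = true; break;
                    case 'i': pattern.append("m"); hasTime = true; break;
                    case 's': pattern.append("s"); hasTime = true; break;
                    default: return null; // specifier not covered by this sketch
                }
            } else {
                literal.append(c); // everything else is matched literally
            }
        }
        flushLiteral(literal, pattern);

        try {
            DateTimeFormatter formatter = DateTimeFormatter
                .ofPattern(pattern.toString())
                .withResolverStyle(ResolverStyle.STRICT);
            if (hasDate && hasTime) {
                return LocalDateTime.parse(str, formatter);
            } else if (hasDate) {
                return LocalDate.parse(str, formatter);
            } else if (hasTime) {
                return LocalTime.parse(str, formatter);
            }
            return null;
        } catch (DateTimeParseException | IllegalArgumentException e) {
            return null; // mirrors "returns NULL" for illegal input
        }
    }

    // Appends the collected literal text as one quoted pattern section.
    private static void flushLiteral(StringBuilder literal, StringBuilder pattern) {
        if (literal.length() > 0) {
            pattern.append('\'')
                   .append(literal.toString().replace("'", "''"))
                   .append('\'');
            literal.setLength(0);
        }
    }
}
```

With this sketch, `strToDate("01,5,2013", "%d,%m,%Y")` yields the `LocalDate` 2013-05-01, and `strToDate("a09:30:17", "a%H:%i:%s")` yields the `LocalTime` 09:30:17. The warning that the issue mentions for illegal input is not modelled here.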





[jira] [Commented] (FLINK-6895) Add STR_TO_DATE supported in SQL

2018-07-16 Thread Neelesh Srinivas Salian (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546057#comment-16546057
 ] 

Neelesh Srinivas Salian commented on FLINK-6895:


[~fhueske] [~twalthr] is this Jira being worked on?

I could assign it to myself and start working on it.

> Add STR_TO_DATE supported in SQL
> 
>
> Key: FLINK-6895
> URL: https://issues.apache.org/jira/browse/FLINK-6895
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: buptljy
>Priority: Major
>
> STR_TO_DATE(str,format) This is the inverse of the DATE_FORMAT() function. It 
> takes a string str and a format string format. STR_TO_DATE() returns a 
> DATETIME value if the format string contains both date and time parts, or a 
> DATE or TIME value if the string contains only date or time parts. If the 
> date, time, or datetime value extracted from str is illegal, STR_TO_DATE() 
> returns NULL and produces a warning.
> * Syntax:
> STR_TO_DATE(str,format) 
> * Arguments
> **str: -
> **format: -
> * Return Types
>   DATETIME/DATE/TIME
> * Example:
>   STR_TO_DATE('01,5,2013','%d,%m,%Y') -> '2013-05-01'
>   SELECT STR_TO_DATE('a09:30:17','a%h:%i:%s') -> '09:30:17'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/date-and-time-functions.html#function_str-to-date]





[jira] [Assigned] (FLINK-9862) Update end-to-end test to use RocksDB backed timers

2018-07-16 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai reassigned FLINK-9862:
--

Assignee: Tzu-Li (Gordon) Tai

> Update end-to-end test to use RocksDB backed timers
> ---
>
> Key: FLINK-9862
> URL: https://issues.apache.org/jira/browse/FLINK-9862
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> We should add or modify an end-to-end test to use RocksDB backed timers.





[jira] [Commented] (FLINK-9829) The wrapper classes be compared by symbol of '==' directly in BigDecSerializer.java

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546036#comment-16546036
 ] 

ASF GitHub Bot commented on FLINK-9829:
---

Github user lamber-ken closed the pull request at:

https://github.com/apache/flink/pull/6321


> The wrapper classes be compared by symbol of '==' directly in 
> BigDecSerializer.java
> ---
>
> Key: FLINK-9829
> URL: https://issues.apache.org/jira/browse/FLINK-9829
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.5.0
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> The wrapper classes should be compared with the equals method rather than 
> directly with '==' in BigDecSerializer.java.
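
The difference between the two comparisons can be shown with a small, self-contained Java sketch. The class and method names below are hypothetical and only illustrate the general pitfall; this is not the code from `BigDecSerializer.java`.

```java
import java.math.BigDecimal;
import java.util.Objects;

// Hypothetical helper illustrating reference vs. value comparison of objects.
final class BoxedEqualitySketch {

    // '==' on objects compares references, not contents.
    static boolean sameReference(Object a, Object b) {
        return a == b;
    }

    // Objects.equals is null-safe and delegates to equals().
    static boolean sameValue(Object a, Object b) {
        return Objects.equals(a, b);
    }

    // BigDecimal.equals also compares the scale; compareTo ignores it.
    static boolean numericallyEqual(BigDecimal a, BigDecimal b) {
        if (a == null || b == null) {
            return a == b;
        }
        return a.compareTo(b) == 0;
    }
}
```

For two separately created `new BigDecimal("1.0")` instances, `sameReference` is false while `sameValue` is true. Note that `new BigDecimal("1.0").equals(new BigDecimal("1.00"))` is false because the scales differ, whereas `numericallyEqual` treats them as equal.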





[GitHub] flink pull request #6321: [FLINK-9829] fix the wrapper classes be compared b...

2018-07-16 Thread lamber-ken
Github user lamber-ken closed the pull request at:

https://github.com/apache/flink/pull/6321


---


[jira] [Commented] (FLINK-9866) Allow passing program arguments to StandaloneJobCluster

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546032#comment-16546032
 ] 

ASF GitHub Bot commented on FLINK-9866:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6344#discussion_r202895544
  
--- Diff: flink-container/kubernetes/README.md ---
@@ -17,6 +17,7 @@ The files contain the following variables:
 
 - `${FLINK_IMAGE_NAME}`: Name of the image to use for the container
 - `${FLINK_JOB}`: Name of the Flink job to start (the user code jar must 
be included in the container image)
+- `${FLINK_JOB_ARGUMENS}`: Job specific command line arguments
--- End diff --

Shall we give an example or more documentation on how to pass the 
command line arguments? For example, what is the format: "--arg val" or 
something else? There are many possible formats, such as "--key value" and "-Dxxx=xx".
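
To make the question concrete, here is a minimal Java sketch of how the two formats mentioned above could be told apart. It is purely illustrative: `JobArgsSketch` and `parse` are hypothetical names, and nothing here reflects how Flink or the container scripts actually interpret `${FLINK_JOB_ARGUMENS}`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for two common argument styles:
//   --key value     and     -Dkey=value
final class JobArgsSketch {

    static Map<String, String> parse(String[] args) {
        Map<String, String> result = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.startsWith("-D") && arg.contains("=")) {
                // -Dkey=value: split at the first '='
                int eq = arg.indexOf('=');
                result.put(arg.substring(2, eq), arg.substring(eq + 1));
            } else if (arg.startsWith("--") && i + 1 < args.length) {
                // --key value: the next token is taken as the value
                result.put(arg.substring(2), args[++i]);
            } else {
                throw new IllegalArgumentException("Unrecognized argument: " + arg);
            }
        }
        return result;
    }
}
```

Under these assumptions, `parse(new String[] {"--input", "/data", "-Dparallelism=4"})` maps `input` to `/data` and `parallelism` to `4`. Documenting one such convention next to the variable would answer the format question.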


> Allow passing program arguments to StandaloneJobCluster
> ---
>
> Key: FLINK-9866
> URL: https://issues.apache.org/jira/browse/FLINK-9866
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Right now an empty array is always passed as arguments to 
> {{StandaloneJobClusterEntryPoint}}. It should pass the parsed arguments. We 
> should also extend the run and docker scripts to allow passing arguments.





[GitHub] flink pull request #6344: [FLINK-9866] Allow passing command line arguments ...

2018-07-16 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6344#discussion_r202895544
  
--- Diff: flink-container/kubernetes/README.md ---
@@ -17,6 +17,7 @@ The files contain the following variables:
 
 - `${FLINK_IMAGE_NAME}`: Name of the image to use for the container
 - `${FLINK_JOB}`: Name of the Flink job to start (the user code jar must 
be included in the container image)
+- `${FLINK_JOB_ARGUMENS}`: Job specific command line arguments
--- End diff --

Shall we give an example or more documentation on how to pass the 
command line arguments? For example, what is the format: "--arg val" or 
something else? There are many possible formats, such as "--key value" and "-Dxxx=xx".


---


[jira] [Commented] (FLINK-9869) Send PartitionInfo in batch to Improve perfornance

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546030#comment-16546030
 ] 

ASF GitHub Bot commented on FLINK-9869:
---

Github user tison1 commented on the issue:

https://github.com/apache/flink/pull/6345
  
cc @tillrohrmann @fhueske 


> Send PartitionInfo in batch to Improve perfornance
> --
>
> Key: FLINK-9869
> URL: https://issues.apache.org/jira/browse/FLINK-9869
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.5.1
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> ... currently we send partition info as soon as one arrives. We could 
> `cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve 
> performance.





[GitHub] flink issue #6345: [FLINK-9869] Send PartitionInfo in batch to Improve perfo...

2018-07-16 Thread tison1
Github user tison1 commented on the issue:

https://github.com/apache/flink/pull/6345
  
cc @tillrohrmann @fhueske 


---


[jira] [Assigned] (FLINK-9868) Expose channel id to ProcessFunction

2018-07-16 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9868:
---

Assignee: vinoyang

> Expose channel id to ProcessFunction
> 
>
> Key: FLINK-9868
> URL: https://issues.apache.org/jira/browse/FLINK-9868
> Project: Flink
>  Issue Type: New Feature
>  Components: Local Runtime
>Reporter: Hequn Cheng
>Assignee: vinoyang
>Priority: Major
>
> Currently, the channel id is not exposed from {{StreamInputProcessor}} to 
> the {{ProcessOperator}} and {{ProcessFunction}}. There are cases where 
> users want the channel id, as discovered [here(mailing 
> list)|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallelism-and-keyed-streams-td21501.html].





[jira] [Updated] (FLINK-9869) Send PartitionInfo in batch to Improve perfornance

2018-07-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9869:
--
Labels: pull-request-available  (was: )

> Send PartitionInfo in batch to Improve perfornance
> --
>
> Key: FLINK-9869
> URL: https://issues.apache.org/jira/browse/FLINK-9869
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.5.1
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> ... currently we send partition info as soon as one arrives. We could 
> `cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve 
> performance.





[jira] [Commented] (FLINK-9869) Send PartitionInfo in batch to Improve perfornance

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16546007#comment-16546007
 ] 

ASF GitHub Bot commented on FLINK-9869:
---

GitHub user tison1 opened a pull request:

https://github.com/apache/flink/pull/6345

[FLINK-9869] Send PartitionInfo in batch to Improve perfornance

## What is the purpose of the change

Currently we send partition info as soon as one arrives. We could 
`cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve 
performance.

... also improve task deployment

## Brief change log

- `Execution`
  - now deploys tasks in another thread
  - as described above, we now first `cachePartitionInfo` and then 
`sendPartitionInfoAsync`
- add a config option 
`JobManagerOptions#UPDATE_PARTITION_INFO_SEND_INTERVAL`, which configures the 
time window for `cachePartitionInfo`
- update `ExecutionGraphDeploymentTest` and 
`ExecutionVertexDeploymentTest`, which also test the changes above
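
The cache-then-send idea from the change log can be sketched as follows. This is a simplified illustration under assumptions, not the code in the pull request: the class `PartitionInfoBatcherSketch`, its generic element type, and the `sender` callback are hypothetical stand-ins for whatever `Execution` actually uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: collect partition infos and hand them to a sender
// as one batch instead of sending each one as soon as it arrives.
final class PartitionInfoBatcherSketch<T> {

    private final Object lock = new Object();
    private final Consumer<List<T>> sender;
    private List<T> pending = new ArrayList<>();

    PartitionInfoBatcherSketch(Consumer<List<T>> sender) {
        this.sender = sender;
    }

    // Buffers one partition info; nothing is sent yet.
    void cachePartitionInfo(T info) {
        synchronized (lock) {
            pending.add(info);
        }
    }

    // Sends everything buffered so far as a single batch, if any.
    void sendPartitionInfos() {
        List<T> batch;
        synchronized (lock) {
            if (pending.isEmpty()) {
                return;
            }
            batch = pending;
            pending = new ArrayList<>();
        }
        // Invoke the sender outside the lock so caching is not blocked.
        sender.accept(batch);
    }
}
```

In such a design, a `ScheduledExecutorService` could call `sendPartitionInfos()` periodically, with the period playing the role of the configured send interval. The trade-off is a small added latency for each update in exchange for fewer messages.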

## Verifying this change

This change is already covered by existing tests, such as 
`ExecutionGraphDeploymentTest` and `ExecutionVertexDeploymentTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no, it's internal)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tison1/flink partition-improve

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6345.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6345


commit ca9ffbb99e91a8415d7469cba4bf2075615edc0d
Author: 陈梓立 
Date:   2018-07-17T04:11:36Z

[FLINK-9869] Send PartitionInfo in batch to Improve perfornance




> Send PartitionInfo in batch to Improve perfornance
> --
>
> Key: FLINK-9869
> URL: https://issues.apache.org/jira/browse/FLINK-9869
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.5.1
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> ... currently we send partition info as soon as one arrives. We could 
> `cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve 
> performance.





[GitHub] flink pull request #6345: [FLINK-9869] Send PartitionInfo in batch to Improv...

2018-07-16 Thread tison1
GitHub user tison1 opened a pull request:

https://github.com/apache/flink/pull/6345

[FLINK-9869] Send PartitionInfo in batch to Improve perfornance

## What is the purpose of the change

Currently we send partition info as soon as one arrives. We could 
`cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve 
performance.

... also improve task deployment

## Brief change log

- `Execution`
  - now deploys tasks in another thread
  - as described above, we now first `cachePartitionInfo` and then 
`sendPartitionInfoAsync`
- add a config option 
`JobManagerOptions#UPDATE_PARTITION_INFO_SEND_INTERVAL`, which configures the 
time window for `cachePartitionInfo`
- update `ExecutionGraphDeploymentTest` and 
`ExecutionVertexDeploymentTest`, which also test the changes above

## Verifying this change

This change is already covered by existing tests, such as 
`ExecutionGraphDeploymentTest` and `ExecutionVertexDeploymentTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no, it's internal)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tison1/flink partition-improve

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6345.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6345


commit ca9ffbb99e91a8415d7469cba4bf2075615edc0d
Author: 陈梓立 
Date:   2018-07-17T04:11:36Z

[FLINK-9869] Send PartitionInfo in batch to Improve perfornance




---


[jira] [Created] (FLINK-9869) Send PartitionInfo in batch to Improve perfornance

2018-07-16 Thread JIRA
陈梓立 created FLINK-9869:
--

 Summary: Send PartitionInfo in batch to Improve perfornance
 Key: FLINK-9869
 URL: https://issues.apache.org/jira/browse/FLINK-9869
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 1.5.1
Reporter: 陈梓立
Assignee: 陈梓立
 Fix For: 1.5.2


... currently we send partition info as soon as one arrives. We could 
`cachePartitionInfo` and then `sendPartitionInfoAsync`, which will improve 
performance.





[jira] [Updated] (FLINK-9868) Expose channel id to ProcessFunction

2018-07-16 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-9868:
---
Description: Currently, channel id has not been exposed from 
{{StreamInputProcessor}} to the {{ProcessOperator}} and {{ProcessFunction}}. 
There are some cases that users want the channel id, as discovered 
[here(mailing 
list)|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallelism-and-keyed-streams-td21501.html].
  (was: Currently, channel id has not been exposed from 
{{StreamInputProcessor}} to the {{ProcessOperator}} and {{ProcessFunction}}. 
There are some cases that users want the channel id(), as discovered 
[here(mailing 
list)|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallelism-and-keyed-streams-td21501.html].)

> Expose channel id to ProcessFunction
> 
>
> Key: FLINK-9868
> URL: https://issues.apache.org/jira/browse/FLINK-9868
> Project: Flink
>  Issue Type: New Feature
>  Components: Local Runtime
>Reporter: Hequn Cheng
>Priority: Major
>
> Currently, the channel id is not exposed from {{StreamInputProcessor}} to 
> the {{ProcessOperator}} and {{ProcessFunction}}. There are cases where 
> users want the channel id, as discovered [here(mailing 
> list)|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallelism-and-keyed-streams-td21501.html].





[jira] [Created] (FLINK-9868) Expose channel id to ProcessFunction

2018-07-16 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-9868:
--

 Summary: Expose channel id to ProcessFunction
 Key: FLINK-9868
 URL: https://issues.apache.org/jira/browse/FLINK-9868
 Project: Flink
  Issue Type: New Feature
  Components: Local Runtime
Reporter: Hequn Cheng


Currently, channel id has not been exposed from {{StreamInputProcessor}} to the 
{{ProcessOperator}} and {{ProcessFunction}}. There are some cases that users 
want the channel id(), as discovered [here(mailing 
list)|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Parallelism-and-keyed-streams-td21501.html].





[jira] [Commented] (FLINK-9859) Distinguish TM akka config with JM config

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545885#comment-16545885
 ] 

ASF GitHub Bot commented on FLINK-9859:
---

Github user tison1 commented on the issue:

https://github.com/apache/flink/pull/6339
  
cc @zentol @tzulitai 


> Distinguish TM akka config with JM config
> -
>
> Key: FLINK-9859
> URL: https://issues.apache.org/jira/browse/FLINK-9859
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.5.1
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> ... increase the number of akka threads on JM, to improve its performance; 
> decrease the number of akka threads on TM, to save resource.





[GitHub] flink issue #6339: [FLINK-9859][Runtime] Distinguish TM akka config with JM ...

2018-07-16 Thread tison1
Github user tison1 commented on the issue:

https://github.com/apache/flink/pull/6339
  
cc @zentol @tzulitai 


---


[jira] [Created] (FLINK-9867) Extend release notes for Flink 1.6

2018-07-16 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-9867:


 Summary: Extend release notes for Flink 1.6
 Key: FLINK-9867
 URL: https://issues.apache.org/jira/browse/FLINK-9867
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.6.0
Reporter: Till Rohrmann
 Fix For: 1.6.0


We should extend the release notes under {{/docs/release-notes/flink-1.6.md}}.





[jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545737#comment-16545737
 ] 

ASF GitHub Bot commented on FLINK-9489:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6333


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state and might expose it to user functions in the 
> future.





[GitHub] flink pull request #6333: [FLINK-9489] Checkpoint timers as part of managed ...

2018-07-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6333


---


[jira] [Resolved] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state

2018-07-16 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9489.
--
Resolution: Done

Done via dbddf00b75032c20df6e7aef26814da392347194

> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---
>
> Key: FLINK-9489
> URL: https://issues.apache.org/jira/browse/FLINK-9489
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Timer state should now become part of the keyed state backend snapshot, i.e., 
> stored inside the managed keyed state. This means that we have to connect our 
> preparation for asynchronous checkpoints with the backend, so that the timers 
> are written as part of the state for each key-group. This means that we will 
> also free up the raw keyed state and might expose it to user functions in the 
> future.





[jira] [Resolved] (FLINK-9490) Provide backwards compatibility for timer state of Flink 1.5

2018-07-16 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9490?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9490.
--
Resolution: Done

Done via dbddf00b75032c20df6e7aef26814da392347194

> Provide backwards compatibility for timer state of Flink 1.5 
> -
>
> Key: FLINK-9490
> URL: https://issues.apache.org/jira/browse/FLINK-9490
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Blocker
> Fix For: 1.6.0
>
>
> As we changed how timers are written to the snapshot, we also need to 
> implement a backwards compatibility path that reads timers from Flink 1.5 and 
> inserts them into our new timer state.





[jira] [Commented] (FLINK-9792) Cannot add html tags in options description

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545674#comment-16545674
 ] 

ASF GitHub Bot commented on FLINK-9792:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r202803069
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/LineBreakElement.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Represents a line break in the {@link Description}.
+ */
+public class LineBreakElement implements BlockElement {
+
+   /**
+* Creates a line break in the description.
+*/
+   public static LineBreakElement linebreak() {
+   return new LineBreakElement();
+   }
+
+   private LineBreakElement() {
+   }
+
+   @Override
+   public String format(Formatter formatter) {
--- End diff --

That's actually a really neat idea with the new instance of the formatter! 
I didn't think of this. It actually solves the part I had problems with.
I uploaded a slightly adjusted version of your second solution.


> Cannot add html tags in options description
> ---
>
> Key: FLINK-9792
> URL: https://issues.apache.org/jira/browse/FLINK-9792
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> Right now it is impossible to add any html tags in options description, 
> because all "<" and ">" are escaped. Therefore some links there do not work.





[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...

2018-07-16 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r202803069
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/LineBreakElement.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Represents a line break in the {@link Description}.
+ */
+public class LineBreakElement implements BlockElement {
+
+   /**
+* Creates a line break in the description.
+*/
+   public static LineBreakElement linebreak() {
+   return new LineBreakElement();
+   }
+
+   private LineBreakElement() {
+   }
+
+   @Override
+   public String format(Formatter formatter) {
--- End diff --

That's actually a really neat idea with the new instance of the formatter! 
I didn't think of this. It actually solves the part I had problems with.
I uploaded a slightly adjusted version of your second solution.


---


[jira] [Updated] (FLINK-9866) Allow passing program arguments to StandaloneJobCluster

2018-07-16 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-9866:

Component/s: Distributed Coordination

> Allow passing program arguments to StandaloneJobCluster
> ---
>
> Key: FLINK-9866
> URL: https://issues.apache.org/jira/browse/FLINK-9866
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Right now an empty array is always passed as arguments to 
> {{StandaloneJobClusterEntryPoint}}. It should pass the parsed arguments. We 
> should also extend the run and docker scripts to allow passing arguments.





[jira] [Commented] (FLINK-6997) SavepointITCase fails in master branch sometimes

2018-07-16 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545624#comment-16545624
 ] 

Chesnay Schepler commented on FLINK-6997:
-

Tried reproducing this locally but failed to do so in several hundred runs in 
both legacy and new mode.

At this point I'm inclined to close this as {{Cannot reproduce}}. In any case 
we should look into improving the error message to specify which task wasn't 
running and in which state it was.

> SavepointITCase fails in master branch sometimes
> 
>
> Key: FLINK-6997
> URL: https://issues.apache.org/jira/browse/FLINK-6997
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.3.0, 1.5.0
>Reporter: Ted Yu
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.5.2, 1.6.0
>
>
> I got the following test failure (with commit 
> a0b781461bcf8c2f1d00b93464995f03eda589f1)
> {code}
> testSavepointForJobWithIteration(org.apache.flink.test.checkpointing.SavepointITCase)
>   Time elapsed: 8.129 sec  <<< ERROR!
> java.io.IOException: java.lang.Exception: Failed to complete savepoint
>   at 
> org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:342)
>   at 
> org.apache.flink.runtime.testingUtils.TestingCluster.triggerSavepoint(TestingCluster.scala:316)
>   at 
> org.apache.flink.test.checkpointing.SavepointITCase.testSavepointForJobWithIteration(SavepointITCase.java:827)
> Caused by: java.lang.Exception: Failed to complete savepoint
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:821)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anon$7.apply(JobManager.scala:805)
>   at 
> org.apache.flink.runtime.concurrent.impl.FlinkFuture$5.onComplete(FlinkFuture.java:272)
>   at akka.dispatch.OnComplete.internal(Future.scala:247)
>   at akka.dispatch.OnComplete.internal(Future.scala:245)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:175)
>   at akka.dispatch.japi$CallbackBridge.apply(Future.scala:172)
>   at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
>   at 
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>   at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>   at 
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Failed to trigger savepoint: Not all required 
> tasks are currently running.
>   at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:382)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:800)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at 
> org.apache.flink.runtime.testingUtils.TestingJobManagerLike$$anonfun$handleTestingMessage$1.applyOrElse(TestingJobManagerLike.scala:95)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>   at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>   at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>   at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> {code}




[jira] [Updated] (FLINK-9866) Allow passing program arguments to StandaloneJobCluster

2018-07-16 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-9866:

Fix Version/s: 1.6.0

> Allow passing program arguments to StandaloneJobCluster
> ---
>
> Key: FLINK-9866
> URL: https://issues.apache.org/jira/browse/FLINK-9866
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> Right now an empty array is always passed as the arguments to 
> {{StandaloneJobClusterEntryPoint}}. The parsed arguments should be passed 
> instead. We should also extend the run and docker scripts to allow passing arguments.





[jira] [Updated] (FLINK-9815) YARNSessionCapacitySchedulerITCase flaky

2018-07-16 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-9815:

Priority: Blocker  (was: Major)

> YARNSessionCapacitySchedulerITCase flaky
> 
>
> Key: FLINK-9815
> URL: https://issues.apache.org/jira/browse/FLINK-9815
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.6.0
>
>
> The test fails because of dangling yarn applications.
> Logs: https://api.travis-ci.org/v3/job/402657694/log.txt
> It was also reported previously in [FLINK-8161] : 
> https://issues.apache.org/jira/browse/FLINK-8161?focusedCommentId=16480216=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16480216





[jira] [Updated] (FLINK-9815) YARNSessionCapacitySchedulerITCase flaky

2018-07-16 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-9815:

Fix Version/s: 1.6.0

> YARNSessionCapacitySchedulerITCase flaky
> 
>
> Key: FLINK-9815
> URL: https://issues.apache.org/jira/browse/FLINK-9815
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Priority: Blocker
> Fix For: 1.6.0
>
>
> The test fails because of dangling yarn applications.
> Logs: https://api.travis-ci.org/v3/job/402657694/log.txt
> It was also reported previously in [FLINK-8161] : 
> https://issues.apache.org/jira/browse/FLINK-8161?focusedCommentId=16480216=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16480216





[jira] [Commented] (FLINK-9815) YARNSessionCapacitySchedulerITCase flaky

2018-07-16 Thread Dawid Wysakowicz (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545619#comment-16545619
 ] 

Dawid Wysakowicz commented on FLINK-9815:
-

Another failure: https://api.travis-ci.org/v3/job/404478412/log.txt

> YARNSessionCapacitySchedulerITCase flaky
> 
>
> Key: FLINK-9815
> URL: https://issues.apache.org/jira/browse/FLINK-9815
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.6.0
>
>
> The test fails because of dangling yarn applications.
> Logs: https://api.travis-ci.org/v3/job/402657694/log.txt
> It was also reported previously in [FLINK-8161] : 
> https://issues.apache.org/jira/browse/FLINK-8161?focusedCommentId=16480216=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16480216





[jira] [Updated] (FLINK-9866) Allow passing program arguments to StandaloneJobCluster

2018-07-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9866:
--
Labels: pull-request-available  (was: )

> Allow passing program arguments to StandaloneJobCluster
> ---
>
> Key: FLINK-9866
> URL: https://issues.apache.org/jira/browse/FLINK-9866
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> Right now an empty array is always passed as the arguments to 
> {{StandaloneJobClusterEntryPoint}}. The parsed arguments should be passed 
> instead. We should also extend the run and docker scripts to allow passing arguments.





[jira] [Commented] (FLINK-9866) Allow passing program arguments to StandaloneJobCluster

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545618#comment-16545618
 ] 

ASF GitHub Bot commented on FLINK-9866:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6344

[FLINK-9866] Allow passing command line arguments to standalone job

## What is the purpose of the change

Allow passing program arguments to standalone job

## Brief change log

*(for example:)*
  - Pass parsed program arguments to StandaloneJobClusterEntryPoint
  - updated standalone-job.sh script
  - updated docker scripts


## Verifying this change

* extended 
org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPointTest to 
take parameters into account

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink FLINK-9866

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6344


commit 899b9eebe27cfff7348c6cde28428e19533ee2ed
Author: Dawid Wysakowicz 
Date:   2018-07-16T15:30:17Z

[FLINK-9866] Allow passing command line arguments to standalone job




> Allow passing program arguments to StandaloneJobCluster
> ---
>
> Key: FLINK-9866
> URL: https://issues.apache.org/jira/browse/FLINK-9866
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.0
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> Right now an empty array is always passed as the arguments to 
> {{StandaloneJobClusterEntryPoint}}. The parsed arguments should be passed 
> instead. We should also extend the run and docker scripts to allow passing arguments.





[GitHub] flink pull request #6344: [FLINK-9866] Allow passing command line arguments ...

2018-07-16 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6344

[FLINK-9866] Allow passing command line arguments to standalone job

## What is the purpose of the change

Allow passing program arguments to standalone job

## Brief change log

*(for example:)*
  - Pass parsed program arguments to StandaloneJobClusterEntryPoint
  - updated standalone-job.sh script
  - updated docker scripts


## Verifying this change

* extended 
org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPointTest to 
take parameters into account

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink FLINK-9866

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6344.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6344


commit 899b9eebe27cfff7348c6cde28428e19533ee2ed
Author: Dawid Wysakowicz 
Date:   2018-07-16T15:30:17Z

[FLINK-9866] Allow passing command line arguments to standalone job




---


[jira] [Created] (FLINK-9866) Allow passing program arguments to StandaloneJobCluster

2018-07-16 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-9866:
---

 Summary: Allow passing program arguments to StandaloneJobCluster
 Key: FLINK-9866
 URL: https://issues.apache.org/jira/browse/FLINK-9866
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.6.0
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz


Right now an empty array is always passed as the arguments to 
{{StandaloneJobClusterEntryPoint}}. The parsed arguments should be passed 
instead. We should also extend the run and docker scripts to allow passing arguments.
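
As a rough illustration of the requested behaviour, here is a minimal sketch that separates an option consumed by the entry point from the arguments forwarded to the user program. The class name and the {{--job-classname}} option are assumptions made for this example; the sketch does not reproduce the actual command line parsing in Flink.

```java
import java.util.ArrayList;
import java.util.List;

public class EntryPointArgsSketch {

    /** Parse result: the job class plus the arguments meant for the job itself. */
    public static final class Parsed {
        public final String jobClassName;
        public final String[] programArgs;

        Parsed(String jobClassName, String[] programArgs) {
            this.jobClassName = jobClassName;
            this.programArgs = programArgs;
        }
    }

    /**
     * Consumes the (hypothetical) "--job-classname <name>" option and keeps
     * every other argument, in order, as a program argument.
     */
    public static Parsed parse(String[] args) {
        String jobClassName = null;
        List<String> rest = new ArrayList<>();
        for (int i = 0; i < args.length; i++) {
            if ("--job-classname".equals(args[i]) && i + 1 < args.length) {
                jobClassName = args[++i];
            } else {
                rest.add(args[i]);
            }
        }
        if (jobClassName == null) {
            throw new IllegalArgumentException("Missing --job-classname");
        }
        // Forward the remaining arguments instead of an empty array.
        return new Parsed(jobClassName, rest.toArray(new String[0]));
    }

    public static void main(String[] args) {
        Parsed p = parse(new String[] {"--job-classname", "com.example.Job", "--input", "LICENSE"});
        System.out.println(p.jobClassName + " " + String.join(" ", p.programArgs));
    }
}
```

The point of the sketch is only the last step of parse(): the remaining arguments are handed on rather than dropped.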





[jira] [Closed] (FLINK-9839) End-to-end test: Streaming job with SSL

2018-07-16 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9839.
---
   Resolution: Fixed
Fix Version/s: 1.5.2

master: e2e090b1a105f9bd20b6e6d0d354fefd5ab0fce9
1.5: 1f36a66ec5be338645e2f37e6a4bf7afd415702a

> End-to-end test: Streaming job with SSL
> ---
>
> Key: FLINK-9839
> URL: https://issues.apache.org/jira/browse/FLINK-9839
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> None of the existing e2e tests run with an SSL configuration but there should 
> be such a test as well.





[jira] [Closed] (FLINK-9842) Job submission fails via CLI with SSL enabled

2018-07-16 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9842?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9842.
---
Resolution: Fixed

master: 0a5aebb0149d3660e549446a3d46df34ef1fb4d2
1.5: 44e5cb6671e637a9ea61744aa6db46b92743b96a

> Job submission fails via CLI with SSL enabled
> -
>
> Key: FLINK-9842
> URL: https://issues.apache.org/jira/browse/FLINK-9842
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Job-Submission
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Nico Kruber
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: pull-request-available, regression
> Fix For: 1.5.2, 1.6.0
>
>
> There's a regression in Flink 1.5.1 which leads to the job submission via CLI 
> failing with SSL enabled (1.5.0 works). Tried with the {{WordCount}} example:
> Client log:
> {code}
> 2018-07-16 11:11:12,688 INFO  org.apache.flink.client.cli.CliFrontend 
>   - 
> 
> 2018-07-16 11:11:12,690 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Starting Command Line Client (Version: 1.5.1, Rev:3488f8b, 
> Date:10.07.2018 @ 11:51:27 GMT)
> 2018-07-16 11:11:12,690 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  OS current user: nico
> 2018-07-16 11:11:12,690 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Current Hadoop/Kerberos user: 
> 2018-07-16 11:11:12,690 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 
> 1.8/25.171-b11
> 2018-07-16 11:11:12,690 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Maximum heap size: 3534 MiBytes
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  JAVA_HOME: /usr/lib64/jvm/java
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  No Hadoop Dependency available
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  JVM Options:
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   - 
> -Dlog.file=/home/nico/Downloads/flink-1.5.1/log/flink-nico-client-nico-work.fritz.box.log
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   - 
> -Dlog4j.configuration=file:/home/nico/Downloads/flink-1.5.1/conf/log4j-cli.properties
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   - 
> -Dlogback.configurationFile=file:/home/nico/Downloads/flink-1.5.1/conf/logback.xml
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Program Arguments:
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   - run
> 2018-07-16 11:11:12,692 INFO  org.apache.flink.client.cli.CliFrontend 
>   - ./examples/streaming/WordCount.jar
> 2018-07-16 11:11:12,692 INFO  org.apache.flink.client.cli.CliFrontend 
>   - --input
> 2018-07-16 11:11:12,692 INFO  org.apache.flink.client.cli.CliFrontend 
>   - LICENSE
> 2018-07-16 11:11:12,692 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Classpath: 
> /home/nico/Downloads/flink-1.5.1/lib/flink-python_2.11-1.5.1.jar:/home/nico/Downloads/flink-1.5.1/lib/log4j-1.2.17.jar:/home/nico/Downloads/flink-1.5.1/lib/slf4j-log4j12-1.7.7.jar:/home/nico/Downloads/flink-1.5.1/lib/flink-dist_2.11-1.5.1.jar:::
> 2018-07-16 11:11:12,692 INFO  org.apache.flink.client.cli.CliFrontend 
>   - 
> 
> 2018-07-16 11:11:12,698 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2018-07-16 11:11:12,698 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2018-07-16 11:11:12,698 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2018-07-16 11:11:12,699 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: taskmanager.heap.mb, 1024
> 2018-07-16 11:11:12,699 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-07-16 11:11:12,699 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> 

[jira] [Closed] (FLINK-9380) Failing end-to-end tests should not clean up logs

2018-07-16 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9380.
---
Resolution: Fixed

master: 053614ff2fad2847ba7e7c801a706eb06bed
1.5: dfc8d15841a86ec7d3b9c3b0788043a77fd6179a

> Failing end-to-end tests should not clean up logs
> -
>
> Key: FLINK-9380
> URL: https://issues.apache.org/jira/browse/FLINK-9380
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Deepak Sharma
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.5.2, 1.6.0
>
>
> Some of the end-to-end tests clean up their logs even in the failure case. 
> This makes debugging and understanding the problem extremely difficult. 
> Ideally, the script says where it stored the respective logs.





[jira] [Comment Edited] (FLINK-9860) Netty resource leak on receiver side

2018-07-16 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545495#comment-16545495
 ] 

Nico Kruber edited comment on FLINK-9860 at 7/16/18 5:04 PM:
-

oh, with our shaded netty version, I actually need to set 
{{org.apache.flink.shaded.netty4.io.netty.leakDetection.level}}
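
For reference, a small sketch of how the corresponding JVM flag could be assembled. The property key is the one quoted above and the four level names are the ones Netty documents; the class and method names are made up for this example, and how the flag reaches the Flink JVMs (for instance through the env.java.opts setting) is an assumption about the setup.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class ShadedNettyLeakDetectionSketch {

    /** Property key as quoted in the comment above (shaded Netty package prefix). */
    public static final String SHADED_KEY =
            "org.apache.flink.shaded.netty4.io.netty.leakDetection.level";

    /** Leak detection levels documented by Netty. */
    private static final List<String> LEVELS =
            Arrays.asList("disabled", "simple", "advanced", "paranoid");

    /** Returns the -D flag that sets the shaded leak detection level. */
    public static String jvmFlag(String level) {
        String normalized = level.toLowerCase(Locale.ROOT);
        if (!LEVELS.contains(normalized)) {
            throw new IllegalArgumentException("Unknown leak detection level: " + level);
        }
        return "-D" + SHADED_KEY + "=" + normalized;
    }

    public static void main(String[] args) {
        System.out.println(jvmFlag("paranoid"));
    }
}
```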

Anyway, I got several instances with access information now (with level 
{{paranoid}} - see http://netty.io/wiki/reference-counted-objects.html):

{code}
2018-07-16 19:01:14,477 ERROR 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector  - LEAK: 
ByteBuf.release() was not called before it's garbage-collected. See 
http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
Hint: 'FileUploadHandler#0' will handle the message from this point.

org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:88)

org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:24)

org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)

org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)

org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)

org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)

org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)

org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)

org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)

org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)

org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)

org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
Created at:

org.apache.flink.shaded.netty4.io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143)

org.apache.flink.shaded.netty4.io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)

org.apache.flink.shaded.netty4.io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:107)

org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:345)

org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder.decode(HttpServerCodec.java:101)

org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)

org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)


[jira] [Commented] (FLINK-9860) Netty resource leak on receiver side

2018-07-16 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545495#comment-16545495
 ] 

Nico Kruber commented on FLINK-9860:


oh, with our shaded netty version, I actually need to set 
{{org.apache.flink.shaded.netty4.io.netty.leakDetection.level}}...anyway, I got 
several instances with access information now:

{code}
2018-07-16 19:01:14,477 ERROR 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector  - LEAK: 
ByteBuf.release() was not called before it's garbage-collected. See 
http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
Hint: 'FileUploadHandler#0' will handle the message from this point.

org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:88)

org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpContent.touch(DefaultHttpContent.java:24)

org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)

org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)

org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)

org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)

org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)

org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)

org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)

org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)

org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)

org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)

org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:748)
Created at:

org.apache.flink.shaded.netty4.io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143)

org.apache.flink.shaded.netty4.io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)

org.apache.flink.shaded.netty4.io.netty.buffer.AdvancedLeakAwareByteBuf.readRetainedSlice(AdvancedLeakAwareByteBuf.java:107)

org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:345)

org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec$HttpServerRequestDecoder.decode(HttpServerCodec.java:101)

org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)

org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)

org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)


[jira] [Commented] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545487#comment-16545487
 ] 

ASF GitHub Bot commented on FLINK-9762:
---

Github user JTaky commented on a diff in the pull request:

https://github.com/apache/flink/pull/6284#discussion_r202756519
  
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---
@@ -181,12 +181,17 @@
 * The config parameter defining the directories for temporary files, separated by
 * ",", "|", or the system's {@link java.io.File#pathSeparator}.
 */
-   @Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
+   @Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn and '_FLINK_TMP_DIR' on Mesos.")
public static final ConfigOption TMP_DIRS =
key("io.tmp.dirs")
.defaultValue(System.getProperty("java.io.tmpdir"))
.withDeprecatedKeys("taskmanager.tmp.dirs");
 
+   /**
+* String key, which says if variable `java.io.tmpdir` has been overridden for the cluster.
+*/
+   public static final String TMP_DIRS_OVERRIDDEN = "io.tmp.dirs.overridden";
--- End diff --

done


> CoreOptions.TMP_DIRS wrongly managed on Yarn
> 
>
> Key: FLINK-9762
> URL: https://issues.apache.org/jira/browse/FLINK-9762
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> On Yarn it is impossible to have different LOCAL_DIRS on the JobManager and 
> the TaskManager, even though the LOCAL_DIRS value depends on the container.
> The cause is that CoreOptions.TMP_DIRS is set to its default value during 
> JobManager initialization and added to the configuration object. When a 
> TaskManager is launched, that configuration object is cloned together with a 
> LOCAL_DIRS value that makes sense only for the JobManager container. From the 
> point of view of the YARN container running the TaskManager, 
> CoreOptions.TMP_DIRS is therefore always equal either to the path in 
> flink.yml or to the LOCAL_DIRS of the JobManager (default behaviour). If the 
> TaskManager's container does not have access to folders other than those 
> allocated to it by YARN, the TaskManager cannot be started.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545489#comment-16545489
 ] 

ASF GitHub Bot commented on FLINK-9762:
---

Github user JTaky commented on a diff in the pull request:

https://github.com/apache/flink/pull/6284#discussion_r202756696
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
 
         log.debug("TaskManager configuration: {}", flinkConfig);
 
+        Configuration taskManagerConfig = flinkConfig.clone();
+        if (!flinkConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
+            taskManagerConfig.setString(CoreOptions.TMP_DIRS, "");  // HACK: emulate removal for the given key
+        }
--- End diff --

same question as before :( didn't get the idea


> CoreOptions.TMP_DIRS wrongly managed on Yarn
> 
>
> Key: FLINK-9762
> URL: https://issues.apache.org/jira/browse/FLINK-9762
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> The issue on Yarn is that it is impossible to have different LOCAL_DIRS on 
> JobManager and TaskManager, even though the LOCAL_DIRS value depends on the 
> container.
> CoreOptions.TMP_DIRS is set to its default value during JobManager 
> initialization and added to the configuration object. When a TaskManager is 
> launched, that configuration object is cloned together with a LOCAL_DIRS 
> value that makes sense only for the JobManager container. From the point of 
> view of the YARN container running the TaskManager, CoreOptions.TMP_DIRS is 
> therefore always equal either to the path in flink-conf.yaml or to the 
> LOCAL_DIRS of the JobManager (default behaviour). If the TaskManager's 
> container does not have access to folders other than those allocated to it 
> by YARN, the TaskManager cannot be started.





[jira] [Commented] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545488#comment-16545488
 ] 

ASF GitHub Bot commented on FLINK-9762:
---

Github user JTaky commented on a diff in the pull request:

https://github.com/apache/flink/pull/6284#discussion_r202756536
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
@@ -250,6 +250,10 @@ public static Configuration generateTaskManagerConfiguration(
             cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
         }
 
+        if (!baseConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
+            cfg.setString(CoreOptions.TMP_DIRS, "");    // HACK: emulate removal for the given key
+        }
--- End diff --

Agreed, magic values are the dirtiest way.
I will go with a 'clear' method (probably named 'remove', to mimic the Java 
collections API).

I didn't get the point of the clone method. Is it meant to factor out these 4 
lines, or do you want to make it generic and use it for all custom settings? In 
that case we would have to extract a list of predicates for each configuration, 
which looks quite complex as an API.


> CoreOptions.TMP_DIRS wrongly managed on Yarn
> 
>
> Key: FLINK-9762
> URL: https://issues.apache.org/jira/browse/FLINK-9762
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> The issue on Yarn is that it is impossible to have different LOCAL_DIRS on 
> JobManager and TaskManager, even though the LOCAL_DIRS value depends on the 
> container.
> CoreOptions.TMP_DIRS is set to its default value during JobManager 
> initialization and added to the configuration object. When a TaskManager is 
> launched, that configuration object is cloned together with a LOCAL_DIRS 
> value that makes sense only for the JobManager container. From the point of 
> view of the YARN container running the TaskManager, CoreOptions.TMP_DIRS is 
> therefore always equal either to the path in flink-conf.yaml or to the 
> LOCAL_DIRS of the JobManager (default behaviour). If the TaskManager's 
> container does not have access to folders other than those allocated to it 
> by YARN, the TaskManager cannot be started.





[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

2018-07-16 Thread JTaky
Github user JTaky commented on a diff in the pull request:

https://github.com/apache/flink/pull/6284#discussion_r202756696
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
 
         log.debug("TaskManager configuration: {}", flinkConfig);
 
+        Configuration taskManagerConfig = flinkConfig.clone();
+        if (!flinkConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
+            taskManagerConfig.setString(CoreOptions.TMP_DIRS, "");  // HACK: emulate removal for the given key
+        }
--- End diff --

same question as before :( didn't get the idea


---


[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

2018-07-16 Thread JTaky
Github user JTaky commented on a diff in the pull request:

https://github.com/apache/flink/pull/6284#discussion_r202756519
  
--- Diff: flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java ---
@@ -181,12 +181,17 @@
      * The config parameter defining the directories for temporary files, separated by
      * ",", "|", or the system's {@link java.io.File#pathSeparator}.
      */
-    @Documentation.OverrideDefault("System.getProperty(\"java.io.tmpdir\")")
+    @Documentation.OverrideDefault("'LOCAL_DIRS' on Yarn and '_FLINK_TMP_DIR' on Mesos.")
     public static final ConfigOption<String> TMP_DIRS =
         key("io.tmp.dirs")
             .defaultValue(System.getProperty("java.io.tmpdir"))
             .withDeprecatedKeys("taskmanager.tmp.dirs");
 
+    /**
+     * String key, which says if variable `java.io.tmpdir` has been overridden for the cluster.
+     */
+    public static final String TMP_DIRS_OVERRIDDEN = "io.tmp.dirs.overridden";
--- End diff --

done


---


[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

2018-07-16 Thread JTaky
Github user JTaky commented on a diff in the pull request:

https://github.com/apache/flink/pull/6284#discussion_r202756536
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---
@@ -250,6 +250,10 @@ public static Configuration generateTaskManagerConfiguration(
             cfg.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, numSlots);
         }
 
+        if (!baseConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
+            cfg.setString(CoreOptions.TMP_DIRS, "");    // HACK: emulate removal for the given key
+        }
--- End diff --

Agreed, magic values are the dirtiest way.
I will go with a 'clear' method (probably named 'remove', to mimic the Java 
collections API).

I didn't get the point of the clone method. Is it meant to factor out these 4 
lines, or do you want to make it generic and use it for all custom settings? In 
that case we would have to extract a list of predicates for each configuration, 
which looks quite complex as an API.


---


[jira] [Commented] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545481#comment-16545481
 ] 

ASF GitHub Bot commented on FLINK-9762:
---

Github user JTaky commented on a diff in the pull request:

https://github.com/apache/flink/pull/6284#discussion_r202755913
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
 
         log.debug("TaskManager configuration: {}", flinkConfig);
--- End diff --

done


> CoreOptions.TMP_DIRS wrongly managed on Yarn
> 
>
> Key: FLINK-9762
> URL: https://issues.apache.org/jira/browse/FLINK-9762
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> The issue on Yarn is that it is impossible to have different LOCAL_DIRS on 
> JobManager and TaskManager, even though the LOCAL_DIRS value depends on the 
> container.
> CoreOptions.TMP_DIRS is set to its default value during JobManager 
> initialization and added to the configuration object. When a TaskManager is 
> launched, that configuration object is cloned together with a LOCAL_DIRS 
> value that makes sense only for the JobManager container. From the point of 
> view of the YARN container running the TaskManager, CoreOptions.TMP_DIRS is 
> therefore always equal either to the path in flink-conf.yaml or to the 
> LOCAL_DIRS of the JobManager (default behaviour). If the TaskManager's 
> container does not have access to folders other than those allocated to it 
> by YARN, the TaskManager cannot be started.





[jira] [Commented] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545480#comment-16545480
 ] 

ASF GitHub Bot commented on FLINK-9762:
---

Github user JTaky commented on a diff in the pull request:

https://github.com/apache/flink/pull/6284#discussion_r202755897
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
 
         log.debug("TaskManager configuration: {}", flinkConfig);
 
+        Configuration taskManagerConfig = flinkConfig.clone();
+        if (!flinkConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
+            taskManagerConfig.setString(CoreOptions.TMP_DIRS, "");  // HACK: emulate removal for the given key
+        }
+
         ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
-            flinkConfig,
--- End diff --

Done. Thank you very much!
Since you seem fine with this hacky approach, I have tested and stabilized the 
latest PR revision on our Yarn cluster.


> CoreOptions.TMP_DIRS wrongly managed on Yarn
> 
>
> Key: FLINK-9762
> URL: https://issues.apache.org/jira/browse/FLINK-9762
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0
>Reporter: Oleksandr Nitavskyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2, 1.6.0
>
>
> The issue on Yarn is that it is impossible to have different LOCAL_DIRS on 
> JobManager and TaskManager, even though the LOCAL_DIRS value depends on the 
> container.
> CoreOptions.TMP_DIRS is set to its default value during JobManager 
> initialization and added to the configuration object. When a TaskManager is 
> launched, that configuration object is cloned together with a LOCAL_DIRS 
> value that makes sense only for the JobManager container. From the point of 
> view of the YARN container running the TaskManager, CoreOptions.TMP_DIRS is 
> therefore always equal either to the path in flink-conf.yaml or to the 
> LOCAL_DIRS of the JobManager (default behaviour). If the TaskManager's 
> container does not have access to folders other than those allocated to it 
> by YARN, the TaskManager cannot be started.





[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

2018-07-16 Thread JTaky
Github user JTaky commented on a diff in the pull request:

https://github.com/apache/flink/pull/6284#discussion_r202755913
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
 
         log.debug("TaskManager configuration: {}", flinkConfig);
--- End diff --

done


---


[GitHub] flink pull request #6284: [FLINK-9762] CoreOptions.TMP_DIRS ('io.tmp.dirs') ...

2018-07-16 Thread JTaky
Github user JTaky commented on a diff in the pull request:

https://github.com/apache/flink/pull/6284#discussion_r202755897
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -473,8 +474,13 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
 
         log.debug("TaskManager configuration: {}", flinkConfig);
 
+        Configuration taskManagerConfig = flinkConfig.clone();
+        if (!flinkConfig.containsKey(CoreOptions.TMP_DIRS_OVERRIDDEN)){
+            taskManagerConfig.setString(CoreOptions.TMP_DIRS, "");  // HACK: emulate removal for the given key
+        }
+
         ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
-            flinkConfig,
--- End diff --

Done. Thank you very much!
Since you seem fine with this hacky approach, I have tested and stabilized the 
latest PR revision on our Yarn cluster.


---


[jira] [Comment Edited] (FLINK-9860) Netty resource leak on receiver side

2018-07-16 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545457#comment-16545457
 ] 

Nico Kruber edited comment on FLINK-9860 at 7/16/18 4:46 PM:
-

The e2e test that was running when the leak occurred actually runs with 
parallelism 1 on 1 taskmanager. Therefore, it cannot be in the Flink-internal 
communication between TMs. Also, looking at the logs in more detail, it is 
reported from the JM log anyway.

The only call that is being executed at this stage (around job submission) is 
{{flink list -r}}.

I adapted the test to loop over the job submission in one process while 
another process constantly listed jobs as posted, and while I kept hitting 
"Perform GC" in an attached VisualVM session. I could reproduce the leak 
without any other settings, but so far not with 
{{env.java.opts: -Dio.netty.leakDetection.level=paranoid}}, which would give 
more details. I am trying {{env.java.opts: 
-Dio.netty.leakDetection.level=advanced}} now, but the required timing may 
already be extremely rare.
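
As background to the LEAK message quoted in the issue: it is reported when a reference-counted buffer is garbage-collected without `release()` having been called. A minimal sketch of the release discipline, assuming a simplified, single-threaded stand-in rather than Netty's `ByteBuf`; `CountedBuffer`, `ReleaseSketch`, and `handle` are hypothetical names.

```java
// Minimal stand-in for a reference-counted buffer; NOT Netty's ByteBuf and not thread-safe.
final class CountedBuffer {
    private int refCnt = 1; // a freshly allocated buffer starts with one reference

    int refCnt() { return refCnt; }

    // Adds a reference for a consumer that keeps the buffer beyond the current call.
    CountedBuffer retain() {
        if (refCnt == 0) {
            throw new IllegalStateException("buffer already released");
        }
        refCnt++;
        return this;
    }

    // Drops one reference; returns true when the count reaches zero and the buffer is freed.
    boolean release() {
        if (refCnt == 0) {
            throw new IllegalStateException("buffer already released");
        }
        refCnt--;
        return refCnt == 0;
    }
}

final class ReleaseSketch {
    // Runs the handler and always releases the buffer, even if the handler throws.
    static void handle(CountedBuffer buf, Runnable handler) {
        try {
            handler.run();
        } finally {
            buf.release();
        }
    }

    public static void main(String[] args) {
        CountedBuffer buf = new CountedBuffer();
        handle(buf, () -> { /* read from the buffer here */ });
        System.out.println("refCnt after handling: " + buf.refCnt());
    }
}
```

A code path that returns or throws before the release is the kind of bug the leak detector flags; this thread does not yet establish where that happens in this case.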
 


was (Author: nicok):
The e2e test that was running when the leak occurred actually runs with 
parallelism 1 on 1 taskmanager. Therefore, it cannot be in the Flink-internal 
communication between TMs. Also, looking at the logs in more detail, it is 
reported from the JM log anyway.

The only call that is being executed at this stage (around job submission) is 
{{flink list -r}} but, unfortunately, I was not able to reproduce this without 
or with {{env.java.opts: -Dio.netty.leakDetection.level=paranoid}} which would 
give more details.
(I tried constantly submitting new {{WordCount}} jobs from one process while 
the other was constantly listing jobs as posted and while I was hitting 
"Perform GC" in an attached VisualVM session.)

> Netty resource leak on receiver side
> 
>
> Key: FLINK-9860
> URL: https://issues.apache.org/jira/browse/FLINK-9860
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> The Hadoop-free Wordcount end-to-end test fails with the following exception:
> {code}
> ERROR org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector  - 
> LEAK: ByteBuf.release() was not called before it's garbage-collected. See 
> http://netty.io/wiki/reference-counted-objects.html for more information.
> Recent access records: 
> Created at:
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>   
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>   
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> {code}
> We might have a resource leak on the receiving side of our network stack.
> https://api.travis-ci.org/v3/job/404225956/log.txt





[jira] [Comment Edited] (FLINK-9860) Netty resource leak on receiver side

2018-07-16 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545457#comment-16545457
 ] 

Nico Kruber edited comment on FLINK-9860 at 7/16/18 4:38 PM:
-

The e2e test that was running when the leak occurred actually runs with 
parallelism 1 on 1 taskmanager. Therefore, it cannot be in the Flink-internal 
communication between TMs. Also, looking at the logs in more detail, it is 
reported from the JM log anyway.

The only call that is being executed at this stage (around job submission) is 
{{flink list -r}} but, unfortunately, I was not able to reproduce this without 
or with {{env.java.opts: -Dio.netty.leakDetection.level=paranoid}} which would 
give more details.
(I tried constantly submitting new {{WordCount}} jobs from one process while 
the other was constantly listing jobs as posted and while I was hitting 
"Perform GC" in an attached VisualVM session.)


was (Author: nicok):
The e2e test that was running when the leak occurred actually runs with 
parallelism 1 on 1 taskmanager. Therefore, it cannot be in the Flink-internal 
communication between TMs. Also, looking at the logs in more detail, it is 
reported from the JM log anyway.

The only call that is being executed at this stage (around job submission) is 
{{flink list -r}} but, unfortunately, I was not able to reproduce this without 
or with {{env.java.opts: -Dio.netty.leakDetection.level=paranoid}} which would 
give more details.
(I tried constantly submitting new {{WordCount}} jobs from one process while 
the other was constantly listing jobs as posted.)

> Netty resource leak on receiver side
> 
>
> Key: FLINK-9860
> URL: https://issues.apache.org/jira/browse/FLINK-9860
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> The Hadoop-free Wordcount end-to-end test fails with the following exception:
> {code}
> ERROR org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector  - 
> LEAK: ByteBuf.release() was not called before it's garbage-collected. See 
> http://netty.io/wiki/reference-counted-objects.html for more information.
> Recent access records: 
> Created at:
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>   
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>   
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> {code}
> We might have a resource leak on the receiving side of our network stack.
> https://api.travis-ci.org/v3/job/404225956/log.txt





[jira] [Comment Edited] (FLINK-9860) Netty resource leak on receiver side

2018-07-16 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545457#comment-16545457
 ] 

Nico Kruber edited comment on FLINK-9860 at 7/16/18 4:37 PM:
-

The e2e test that was running when the leak occurred actually runs with 
parallelism 1 on 1 taskmanager. Therefore, it cannot be in the Flink-internal 
communication between TMs. Also, looking at the logs in more detail, it is 
reported from the JM log anyway.

The only call that is being executed at this stage (around job submission) is 
{{flink list -r}} but, unfortunately, I was not able to reproduce this without 
or with {{env.java.opts: -Dio.netty.leakDetection.level=paranoid}} which would 
give more details.
(I tried constantly submitting new {{WordCount}} jobs from one process while 
the other was constantly listing jobs as posted.)


was (Author: nicok):
The e2e test that was running when the leak occurred actually runs with 
parallelism 1 on 1 taskmanager. Therefore, it cannot be in the Flink-internal 
communication between TMs. Also, looking at the logs in more detail, it is 
reported from the JM log anyway.

The only call that is being executed at this stage (around job submission) is 
{{flink list -r}} but, unfortunately, I was not able to reproduce this without 
or with {{env.java.opts: -Dio.netty.leakDetection.level=paranoid}} which would 
give more details.

> Netty resource leak on receiver side
> 
>
> Key: FLINK-9860
> URL: https://issues.apache.org/jira/browse/FLINK-9860
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> The Hadoop-free Wordcount end-to-end test fails with the following exception:
> {code}
> ERROR org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector  - 
> LEAK: ByteBuf.release() was not called before it's garbage-collected. See 
> http://netty.io/wiki/reference-counted-objects.html for more information.
> Recent access records: 
> Created at:
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>   
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>   
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> {code}
> We might have a resource leak on the receiving side of our network stack.
> https://api.travis-ci.org/v3/job/404225956/log.txt





[jira] [Commented] (FLINK-9860) Netty resource leak on receiver side

2018-07-16 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545457#comment-16545457
 ] 

Nico Kruber commented on FLINK-9860:


The e2e test that was running when the leak occurred actually runs with 
parallelism 1 on 1 taskmanager. Therefore, it cannot be in the Flink-internal 
communication between TMs. Also, looking at the logs in more detail, it is 
reported from the JM log anyway.

The only call that is being executed at this stage (around job submission) is 
{{flink list -r}} but, unfortunately, I was not able to reproduce this with or 
without {{env.java.opts: -Dio.netty.leakDetection.level=paranoid}} which would 
give more details.

> Netty resource leak on receiver side
> 
>
> Key: FLINK-9860
> URL: https://issues.apache.org/jira/browse/FLINK-9860
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> The Hadoop-free Wordcount end-to-end test fails with the following exception:
> {code}
> ERROR org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector  - 
> LEAK: ByteBuf.release() was not called before it's garbage-collected. See 
> http://netty.io/wiki/reference-counted-objects.html for more information.
> Recent access records: 
> Created at:
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>   
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>   
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> {code}
> We might have a resource leak on the receiving side of our network stack.
> https://api.travis-ci.org/v3/job/404225956/log.txt





[jira] [Comment Edited] (FLINK-9860) Netty resource leak on receiver side

2018-07-16 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545457#comment-16545457
 ] 

Nico Kruber edited comment on FLINK-9860 at 7/16/18 4:36 PM:
-

The e2e test that was running when the leak occurred actually runs with 
parallelism 1 on 1 taskmanager. Therefore, it cannot be in the Flink-internal 
communication between TMs. Also, looking at the logs in more detail, it is 
reported from the JM log anyway.

The only call that is being executed at this stage (around job submission) is 
{{flink list -r}} but, unfortunately, I was not able to reproduce this without 
or with {{env.java.opts: -Dio.netty.leakDetection.level=paranoid}} which would 
give more details.


was (Author: nicok):
The e2e test that was running when the leak occurred actually runs with 
parallelism 1 on 1 taskmanager. Therefore, it cannot be in the Flink-internal 
communication between TMs. Also, looking at the logs in more detail, it is 
reported from the JM log anyway.

The only call that is being executed at this stage (around job submission) is 
{{flink list -r}} but, unfortunately, I was not able to reproduce this with or 
without {{env.java.opts: -Dio.netty.leakDetection.level=paranoid}} which would 
give more details.

> Netty resource leak on receiver side
> 
>
> Key: FLINK-9860
> URL: https://issues.apache.org/jira/browse/FLINK-9860
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.6.0
>
>
> The Hadoop-free Wordcount end-to-end test fails with the following exception:
> {code}
> ERROR org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector  - 
> LEAK: ByteBuf.release() was not called before it's garbage-collected. See 
> http://netty.io/wiki/reference-counted-objects.html for more information.
> Recent access records: 
> Created at:
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:331)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:185)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:176)
>   
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:137)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:147)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>   
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>   
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
>   
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> {code}
> We might have a resource leak on the receiving side of our network stack.
> https://api.travis-ci.org/v3/job/404225956/log.txt





[jira] [Commented] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545413#comment-16545413
 ] 

ASF GitHub Bot commented on FLINK-9852:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6343

[FLINK-9852] [table] Expose descriptor-based sink creation

## What is the purpose of the change

This commit exposes the new unified sink creation through the table 
environments and the external catalog table. It introduces a new `update-mode` 
property in order to distinguish between append, retract, and upsert table 
sources and sinks. This commit refactors the top-level API classes a last time 
and adds more documentation. This commit completes the unified table 
sources/sinks story from an API point of view.

## Brief change log

- Introduction of `TableEnvironment.connect()` and corresponding API 
builder classes
- Introduction of property `update-mode: table` and update of existing 
connectors
- External catalog support with proper source/sink discovery and API

## Verifying this change

Existing tests were adapted.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not documented


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9852

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6343.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6343






> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.





[GitHub] flink pull request #6343: [FLINK-9852] [table] Expose descriptor-based sink ...

2018-07-16 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6343

[FLINK-9852] [table] Expose descriptor-based sink creation

## What is the purpose of the change

This commit exposes the new unified sink creation through the table 
environments and the external catalog table. It introduces a new `update-mode` 
property in order to distinguish between append, retract, and upsert table 
sources and sinks. This commit refactors the top-level API classes a last time 
and adds more documentation. This commit completes the unified table 
sources/sinks story from an API point of view.

## Brief change log

- Introduction of `TableEnvironment.connect()` and corresponding API 
builder classes
- Introduction of property `update-mode: table` and update of existing 
connectors
- External catalog support with proper source/sink discovery and API

## Verifying this change

Existing tests were adapted.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not documented


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9852

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6343.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6343






---


[jira] [Updated] (FLINK-9852) Expose descriptor-based sink creation in table environments

2018-07-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9852:
--
Labels: pull-request-available  (was: )

> Expose descriptor-based sink creation in table environments
> ---
>
> Key: FLINK-9852
> URL: https://issues.apache.org/jira/browse/FLINK-9852
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Currently, only a table source can be created using the unified table 
> descriptors with {{tableEnv.from(...)}}. A similar approach should be 
> supported for defining sinks or even both types at the same time.
> I suggest the following syntax:
> {code}
> tableEnv.connect(Kafka(...)).registerSource("name")
> tableEnv.connect(Kafka(...)).registerSink("name")
> tableEnv.connect(Kafka(...)).registerSourceAndSink("name")
> {code}
> A table could then access the registered source/sink.





[jira] [Assigned] (FLINK-9863) Add a built-in ingestion time TS extractor

2018-07-16 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9863:
---

Assignee: vinoyang

> Add a built-in ingestion time TS extractor
> --
>
> Key: FLINK-9863
> URL: https://issues.apache.org/jira/browse/FLINK-9863
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Timo Walther
>Assignee: vinoyang
>Priority: Major
>
> There are cases where ingestion time is also useful in the Table & SQL API. 
> As an example see FLINK-9857 and the linked mailing list discussion there. We 
> should provide an ingestion time timestamps extractor in 
> {{org.apache.flink.table.sources.tsextractors}}.
> The following classes should be updated as well:
> - org.apache.flink.table.descriptors.Rowtime
> - org.apache.flink.table.descriptors.RowtimeValidator
> - org.apache.flink.table.descriptors.SchemaValidator#deriveRowtimeAttributes





[jira] [Assigned] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure

2018-07-16 Thread Yun Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang reassigned FLINK-9598:
---

Assignee: Yun Tang

> [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when 
> there's a checkpoint failure
> -
>
> Key: FLINK-9598
> URL: https://issues.apache.org/jira/browse/FLINK-9598
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Prem Santosh
>Assignee: Yun Tang
>Priority: Major
> Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png
>
>
> We have set the config Minimum Pause Between Checkpoints to be 10 min but 
> noticed that when a checkpoint fails (because it times out before it 
> completes) the application immediately starts taking the next checkpoint. 
> This basically stalls the application's progress since it's always taking 
> checkpoints.
> [^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue.
> Details:
>  * Running Flink-1.3.2 on EMR
>  * checkpoint timeout duration: 40 min
>  * minimum pause between checkpoints: 10 min
> There is also a [relevant 
> thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
>  that I found on the Flink users group.
>  





[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and provide description field for the rest api

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545358#comment-16545358
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6266
  
@zentol could you review this PR, so that I can start part 2 of the task 
as soon as possible? Thanks.


> Add setDescription to execution environment and provide description field for 
> the rest api
> --
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be executing, 
> such as a development or test environment, identifying which running job is 
> of a specific version via the UI can be difficult unless the version is 
> embedded into the job name given to {{execute}}.  But the job name is used 
> for other purposes, such as for namespacing metrics.  Thus, it is not ideal 
> to modify the job name, as that could require modifying metric dashboards and 
> monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.





[GitHub] flink issue #6266: [FLINK-9682] Add setDescription to execution environment ...

2018-07-16 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6266
  
@zentol could you review this PR, so that I can start part 2 of the task 
as soon as possible? Thanks.


---


[jira] [Commented] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure

2018-07-16 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545352#comment-16545352
 ] 

vinoyang commented on FLINK-9598:
-

[~yunta] I agree with you. I will unassign myself; please feel free to 
assign the issue to yourself and revise the docs if you want.

> [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when 
> there's a checkpoint failure
> -
>
> Key: FLINK-9598
> URL: https://issues.apache.org/jira/browse/FLINK-9598
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Prem Santosh
>Assignee: vinoyang
>Priority: Major
> Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png
>
>
> We have set the config Minimum Pause Between Checkpoints to be 10 min but 
> noticed that when a checkpoint fails (because it times out before it 
> completes) the application immediately starts taking the next checkpoint. 
> This basically stalls the application's progress since it's always taking 
> checkpoints.
> [^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue.
> Details:
>  * Running Flink-1.3.2 on EMR
>  * checkpoint timeout duration: 40 min
>  * minimum pause between checkpoints: 10 min
> There is also a [relevant 
> thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
>  that I found on the Flink users group.
>  





[jira] [Assigned] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure

2018-07-16 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9598:
---

Assignee: (was: vinoyang)

> [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when 
> there's a checkpoint failure
> -
>
> Key: FLINK-9598
> URL: https://issues.apache.org/jira/browse/FLINK-9598
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Prem Santosh
>Priority: Major
> Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png
>
>
> We have set the config Minimum Pause Between Checkpoints to be 10 min but 
> noticed that when a checkpoint fails (because it times out before it 
> completes) the application immediately starts taking the next checkpoint. 
> This basically stalls the application's progress since it's always taking 
> checkpoints.
> [^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue.
> Details:
>  * Running Flink-1.3.2 on EMR
>  * checkpoint timeout duration: 40 min
>  * minimum pause between checkpoints: 10 min
> There is also a [relevant 
> thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
>  that I found on the Flink users group.
>  





[jira] [Commented] (FLINK-9013) Document yarn.containers.vcores only being effective when adapting YARN config

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545324#comment-16545324
 ] 

ASF GitHub Bot commented on FLINK-9013:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6294#discussion_r202719580
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 ---
@@ -65,7 +65,11 @@
key("yarn.containers.vcores")
.defaultValue(-1)
.withDescription("The number of virtual cores (vcores) per YARN 
container. By default, the number of vcores" +
-   " is set to the number of slots per TaskManager, if 
set, or to 1, otherwise.");
+   " is set to the number of slots per TaskManager, if 
set, or to 1, otherwise. In order for this parameter " +
+   "to be used your cluster must have CPU scheduling 
enabled. You can do this e.g. by setting the " +
+   
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler or 
enabling " +
+   
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator for " +
+   
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
--- End diff --

Have you actually tried this? Back when I created this issue, simply changing 
to `DominantResourceCalculator` did not lead to the desired effect.


> Document yarn.containers.vcores only being effective when adapting YARN config
> --
>
> Key: FLINK-9013
> URL: https://issues.apache.org/jira/browse/FLINK-9013
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> Even after specifying {{yarn.containers.vcores}} and having Flink request 
> such a container from YARN, it may not take these into account at all and 
> return a container with 1 vcore.
> The YARN configuration needs to be adapted to take the vcores into account, 
> e.g. by setting the {{FairScheduler}} in {{yarn-site.xml}}:
> {code}
> <property>
>   <name>yarn.resourcemanager.scheduler.class</name>
>   <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
> </property>
> {code}
> This fact should be documented at least at the configuration parameter 
> documentation of  {{yarn.containers.vcores}}.
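
The default described in the option text under review (vcores equal to the number of slots per TaskManager if set, otherwise 1) can be sketched as follows. This is a minimal illustration in plain Java; `VcoresDefaultSketch`, `resolveVcores`, and its parameters are hypothetical names, not Flink's implementation, and a non-positive value stands in for "not set".

```java
/** Illustrative sketch only; the names here are hypothetical and not part of Flink. */
final class VcoresDefaultSketch {

    /**
     * Resolves the number of vcores to request per YARN container.
     * Assumption: a value <= 0 means "not configured" (the option's default is -1).
     */
    static int resolveVcores(int configuredVcores, int slotsPerTaskManager) {
        if (configuredVcores > 0) {
            // An explicit yarn.containers.vcores value wins.
            return configuredVcores;
        }
        // Otherwise fall back to the slots per TaskManager, or to 1 if that is unset.
        return Math.max(slotsPerTaskManager, 1);
    }

    public static void main(String[] args) {
        System.out.println(resolveVcores(-1, 4)); // falls back to the slot count
        System.out.println(resolveVcores(-1, 0)); // nothing set: falls back to 1
        System.out.println(resolveVcores(2, 4));  // explicit value is used
    }
}
```

As the issue notes, the value requested this way may still be ignored by YARN, which can return a container with 1 vcore unless CPU scheduling is enabled on the cluster.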





[jira] [Commented] (FLINK-9013) Document yarn.containers.vcores only being effective when adapting YARN config

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545325#comment-16545325
 ] 

ASF GitHub Bot commented on FLINK-9013:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6294#discussion_r202722162
  
--- Diff: docs/ops/deployment/yarn_setup.md ---
@@ -132,7 +132,7 @@ Stop the YARN session by stopping the unix process 
(using CTRL+C) or by entering
 
 Flink on YARN will only start all requested containers if enough resources 
are available on the cluster. Most YARN schedulers account for the requested 
memory of the containers,
 some account also for the number of vcores. By default, the number of 
vcores is equal to the processing slots (`-s`) argument. The 
`yarn.containers.vcores` allows overwriting the
-number of vcores with a custom value.
+number of vcores with a custom value. In order for this parameter to work 
you should enable CPU scheduling in your cluster, see more [here]({{ 
site.baseurl }}/ops/config.html#yarn-containers-vcores)
--- End diff --

You should, however, never link on "here" - maybe put the link on "enable 
CPU scheduling" instead?


> Document yarn.containers.vcores only being effective when adapting YARN config
> --
>
> Key: FLINK-9013
> URL: https://issues.apache.org/jira/browse/FLINK-9013
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> Even after specifying {{yarn.containers.vcores}} and having Flink request 
> such a container from YARN, it may not take these into account at all and 
> return a container with 1 vcore.
> The YARN configuration needs to be adapted to take the vcores into account, 
> e.g. by setting the {{FairScheduler}} in {{yarn-site.xml}}:
> {code}
> <property>
>   <name>yarn.resourcemanager.scheduler.class</name>
>   <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
> </property>
> {code}
> This fact should be documented at least at the configuration parameter 
> documentation of  {{yarn.containers.vcores}}.





[jira] [Commented] (FLINK-9013) Document yarn.containers.vcores only being effective when adapting YARN config

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545323#comment-16545323
 ] 

ASF GitHub Bot commented on FLINK-9013:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6294#discussion_r202720821
  
--- Diff: docs/ops/deployment/yarn_setup.md ---
@@ -132,7 +132,7 @@ Stop the YARN session by stopping the unix process 
(using CTRL+C) or by entering
 
 Flink on YARN will only start all requested containers if enough resources 
are available on the cluster. Most YARN schedulers account for the requested 
memory of the containers,
 some account also for the number of vcores. By default, the number of 
vcores is equal to the processing slots (`-s`) argument. The 
`yarn.containers.vcores` allows overwriting the
--- End diff --

I agree with @zentol - a link here (in addition) would be better to simply 
click your way through the docs


> Document yarn.containers.vcores only being effective when adapting YARN config
> --
>
> Key: FLINK-9013
> URL: https://issues.apache.org/jira/browse/FLINK-9013
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> Even after specifying {{yarn.containers.vcores}} and having Flink request 
> such a container from YARN, it may not take these into account at all and 
> return a container with 1 vcore.
> The YARN configuration needs to be adapted to take the vcores into account, 
> e.g. by setting the {{FairScheduler}} in {{yarn-site.xml}}:
> {code}
> <property>
>   <name>yarn.resourcemanager.scheduler.class</name>
>   <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
> </property>
> {code}
> This fact should be documented at least at the configuration parameter 
> documentation of  {{yarn.containers.vcores}}.





[GitHub] flink pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores...

2018-07-16 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6294#discussion_r202720265
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 ---
@@ -65,7 +65,11 @@
key("yarn.containers.vcores")
.defaultValue(-1)
.withDescription("The number of virtual cores (vcores) per YARN 
container. By default, the number of vcores" +
-   " is set to the number of slots per TaskManager, if 
set, or to 1, otherwise.");
+   " is set to the number of slots per TaskManager, if 
set, or to 1, otherwise. In order for this parameter " +
+   "to be used your cluster must have CPU scheduling 
enabled. You can do this e.g. by setting the " +
+   
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler or 
enabling " +
+   
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator for " +
+   
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
--- End diff --

Also, please put the configuration parameters into some code environment, 
e.g. 
`org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler`,
 for the config.html page.


---


[jira] [Commented] (FLINK-9013) Document yarn.containers.vcores only being effective when adapting YARN config

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545326#comment-16545326
 ] 

ASF GitHub Bot commented on FLINK-9013:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6294#discussion_r202720265
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 ---
@@ -65,7 +65,11 @@
key("yarn.containers.vcores")
.defaultValue(-1)
.withDescription("The number of virtual cores (vcores) per YARN 
container. By default, the number of vcores" +
-   " is set to the number of slots per TaskManager, if 
set, or to 1, otherwise.");
+   " is set to the number of slots per TaskManager, if 
set, or to 1, otherwise. In order for this parameter " +
+   "to be used your cluster must have CPU scheduling 
enabled. You can do this e.g. by setting the " +
+   
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler or 
enabling " +
+   
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator for " +
+   
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
--- End diff --

Also, please put the configuration parameters into some code environment, 
e.g. 
`org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler`,
 for the config.html page.


> Document yarn.containers.vcores only being effective when adapting YARN config
> --
>
> Key: FLINK-9013
> URL: https://issues.apache.org/jira/browse/FLINK-9013
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.2
>
>
> Even after specifying {{yarn.containers.vcores}} and having Flink request 
> such a container from YARN, it may not take these into account at all and 
> return a container with 1 vcore.
> The YARN configuration needs to be adapted to take the vcores into account, 
> e.g. by setting the {{FairScheduler}} in {{yarn-site.xml}}:
> {code}
> <property>
>   <name>yarn.resourcemanager.scheduler.class</name>
>   <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
> </property>
> {code}
> This fact should be documented at least at the configuration parameter 
> documentation of  {{yarn.containers.vcores}}.





[GitHub] flink pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores...

2018-07-16 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6294#discussion_r202720821
  
--- Diff: docs/ops/deployment/yarn_setup.md ---
@@ -132,7 +132,7 @@ Stop the YARN session by stopping the unix process 
(using CTRL+C) or by entering
 
 Flink on YARN will only start all requested containers if enough resources 
are available on the cluster. Most YARN schedulers account for the requested 
memory of the containers,
 some account also for the number of vcores. By default, the number of 
vcores is equal to the processing slots (`-s`) argument. The 
`yarn.containers.vcores` allows overwriting the
--- End diff --

I agree with @zentol - a link here (in addition) would be better to simply 
click your way through the docs


---


[GitHub] flink pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores...

2018-07-16 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6294#discussion_r202719580
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 ---
@@ -65,7 +65,11 @@
key("yarn.containers.vcores")
.defaultValue(-1)
.withDescription("The number of virtual cores (vcores) per YARN 
container. By default, the number of vcores" +
-   " is set to the number of slots per TaskManager, if 
set, or to 1, otherwise.");
+   " is set to the number of slots per TaskManager, if 
set, or to 1, otherwise. In order for this parameter " +
+   "to be used your cluster must have CPU scheduling 
enabled. You can do this e.g. by setting the " +
+   
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler or 
enabling " +
+   
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator for " +
+   
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler");
--- End diff --

Have you actually tried this? Back when I created this issue, simply changing 
to `DominantResourceCalculator` did not lead to the desired effect.


---


[GitHub] flink pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores...

2018-07-16 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6294#discussion_r202722162
  
--- Diff: docs/ops/deployment/yarn_setup.md ---
@@ -132,7 +132,7 @@ Stop the YARN session by stopping the unix process 
(using CTRL+C) or by entering
 
 Flink on YARN will only start all requested containers if enough resources 
are available on the cluster. Most YARN schedulers account for the requested 
memory of the containers,
 some account also for the number of vcores. By default, the number of 
vcores is equal to the processing slots (`-s`) argument. The 
`yarn.containers.vcores` allows overwriting the
-number of vcores with a custom value.
+number of vcores with a custom value. In order for this parameter to work 
you should enable CPU scheduling in your cluster, see more [here]({{ 
site.baseurl }}/ops/config.html#yarn-containers-vcores)
--- End diff --

You should, however, never link on "here" - maybe put the link on "enable 
CPU scheduling" instead?


---


[jira] [Commented] (FLINK-9598) [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when there's a checkpoint failure

2018-07-16 Thread Yun Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545318#comment-16545318
 ] 

Yun Tang commented on FLINK-9598:
-

Hi [~premsantosh], the "Minimum Pause Between Checkpoints" is actually the 
initial delay between successful checkpoints. You can find the logic in the 
CheckpointCoordinator#_triggerCheckpoint_() method: once the 
expired-checkpoint cleaner detects that a checkpoint has expired, it triggers 
another checkpoint as soon as possible through the 
CheckpointCoordinator#_triggerQueuedRequests_() method. This holds for both 
Flink-1.3.2 and the latest Flink-1.5.1.

I think a user usually wants to get a successful checkpoint again as quickly as 
possible, and a running checkpoint does not generally stall your application, 
because sub-tasks only start their snapshot when the checkpoint barrier 
arrives; not all sub-tasks are executing the snapshot process at once.

In my view, it would be better to revise some of the javadocs, e.g. for the 
attribute _minPauseBetweenCheckpointsNanos_ in CheckpointCoordinator. What's 
your opinion, [~yanghua]? If you don't have time for this trivial work, I'd 
be happy to take some time to revise all related javadocs.
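
As a rough illustration of the behavior described above, the following toy model measures the pause from the end of the last completed checkpoint only. It is plain Java and not Flink's actual CheckpointCoordinator code; `MinPauseSketch` and `earliestNextTrigger` are names made up for this sketch. Under that assumption an expired checkpoint does not reset the pause, so with the reporter's settings (10 min pause, 40 min timeout) the next checkpoint can start right after the expiry.

```java
/** Toy model only; the names here are hypothetical and not part of Flink. */
final class MinPauseSketch {

    /**
     * Returns the earliest time (in ms) at which the next checkpoint may start.
     * Assumption: the minimum pause counts from the end of the last COMPLETED
     * checkpoint, so an expired (timed-out) checkpoint does not push it back.
     */
    static long earliestNextTrigger(long lastCompletedEndMs, long minPauseMs, long nowMs) {
        return Math.max(nowMs, lastCompletedEndMs + minPauseMs);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long minPause = 10 * minute;         // minimum pause between checkpoints
        long lastCompletedEnd = 5 * minute;  // the last successful checkpoint ended here

        // Shortly after a successful checkpoint the pause is still enforced.
        System.out.println(earliestNextTrigger(lastCompletedEnd, minPause, 6 * minute));

        // A later checkpoint started at minute 15 and expired after the 40 min timeout.
        long expiredAt = 55 * minute;
        // The pause since the last completion elapsed long ago, so the trigger is immediate.
        System.out.println(earliestNextTrigger(lastCompletedEnd, minPause, expiredAt));
    }
}
```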

> [Checkpoints] The config Minimum Pause Between Checkpoints doesn't work when 
> there's a checkpoint failure
> -
>
> Key: FLINK-9598
> URL: https://issues.apache.org/jira/browse/FLINK-9598
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
>Reporter: Prem Santosh
>Assignee: vinoyang
>Priority: Major
> Attachments: Screen Shot 2018-06-20 at 7.44.10 AM.png
>
>
> We have set the config Minimum Pause Between Checkpoints to be 10 min but 
> noticed that when a checkpoint fails (because it times out before it 
> completes) the application immediately starts taking the next checkpoint. 
> This basically stalls the application's progress since it's always taking 
> checkpoints.
> [^Screen Shot 2018-06-20 at 7.44.10 AM.png] is a screenshot of this issue.
> Details:
>  * Running Flink-1.3.2 on EMR
>  * checkpoint timeout duration: 40 min
>  * minimum pause between checkpoints: 10 min
> There is also a [relevant 
> thread|http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Having-a-backoff-while-experiencing-checkpointing-failures-td20618.html]
>  that I found on the Flink users group.
>  





[jira] [Created] (FLINK-9865) flink-hadoop-compatibility should assume Hadoop as provided

2018-07-16 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9865:
---

 Summary: flink-hadoop-compatibility should assume Hadoop as 
provided
 Key: FLINK-9865
 URL: https://issues.apache.org/jira/browse/FLINK-9865
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.5.1, 1.5.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.6.0


The {{flink-hadoop-compatibility}} project has a *compile* scope dependency on 
Hadoop ({{flink-hadoop-shaded}}). Because of that, the Hadoop dependencies are 
pulled into the user application.

Like in other Hadoop-dependent modules, we should assume that Hadoop is 
provided in the framework classpath already.





[jira] [Comment Edited] (FLINK-9813) Build xTableSource from Avro schemas

2018-07-16 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-9813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545255#comment-16545255
 ] 

François Lacombe edited comment on FLINK-9813 at 7/16/18 3:12 PM:
--

Hi Fabian,

I am suggesting building a CsvTableSource (or any other TableSource) from Avro 
schemas (not the Avro data format).
The point isn't to support more input formats, but to describe all structures 
with one common "language".

Avro schemas ([https://avro.apache.org/docs/1.8.1/spec.html#schemas]) define the 
structure of each record, just like the CsvTableSource Builder does when you call 
the .field() method.
The schema doesn't specify the CSV field separator or row separator, for 
instance, but only the expected columns and their types. That's why it is only 
about structure, not the whole format.

Avro schemas are highly versatile and increasingly widely supported. It's really 
convenient to write such a structure descriptor whatever the file format.

I think Flink would benefit strongly from supporting such schemas by building 
its sources from them.

 

Here is what I'm currently doing, which may be a bit awkward:

{{import java.util.HashMap;}}
{{import org.apache.avro.Schema;}}
{{import org.apache.flink.api.common.typeinfo.TypeInformation;}}
{{import org.apache.flink.table.api.Types;}}
{{import org.apache.flink.table.sources.CsvTableSource;}}

{{public static CsvTableSource getFromSchema(String path, Schema sch) {}}
{{    // Map Avro primitive types to Flink type information.}}
{{    HashMap<Schema.Type, TypeInformation<?>> primitiveTypes = new HashMap<>();}}
{{    primitiveTypes.put(Schema.Type.BOOLEAN, Types.BOOLEAN());}}
{{    primitiveTypes.put(Schema.Type.INT, Types.INT());}}
{{    primitiveTypes.put(Schema.Type.LONG, Types.LONG());}}
{{    primitiveTypes.put(Schema.Type.FLOAT, Types.FLOAT());}}
{{    primitiveTypes.put(Schema.Type.DOUBLE, Types.DOUBLE());}}
{{    primitiveTypes.put(Schema.Type.BYTES, Types.BYTE());}}
{{    primitiveTypes.put(Schema.Type.STRING, Types.STRING());}}

{{    CsvTableSource.Builder src_builder = CsvTableSource.builder().path(path);}}

{{    // Declare one CSV column per field of the Avro record schema.}}
{{    for (Schema.Field field_nfo : sch.getFields()) {}}
{{        src_builder.field(field_nfo.name(), primitiveTypes.get(field_nfo.schema().getType()));}}
{{    }}}

{{    return src_builder.build();}}
{{}}}

 

With FLINK-9814, I would be able to build sources from a schema description and 
get an Exception when the input file doesn't match the format specification.

 

All the best


was (Author: flacombe):
Hi Fabien,

I am suggesting to build a CsvTableSource (or any other TableSource) from Avro 
schemas (not Avro data format).
The point isn't to support more input format, but to describe all structures 
with one common "language".

Avro schema ([https://avro.apache.org/docs/1.8.1/spec.html#schemas)] define the 
structure of each record, just like CsvTableSource Builder do when you call 
.field() method.
The schema doesn't specify csv separator or row separator for instance, but 
only expected columns and their type. That's why it's only about structure but 
not whole format.

Avro schemas are highly versatile and get more and more supported. It's really 
convenient to write such a structure descriptor whatever the file format.

I think Flink can get strong benefit to add support of such schemas by building 
its sources from it.

 

Here is what I'm currently doing, which may be a bit awkward

{{import org.apache.avro.Schema;}}
{{import org.apache.flink.table.api.Types;}}
{{import org.apache.flink.table.sources.CsvTableSource}}


{{public static CsvTableSource getFromSchema(String path, Schema sch) {}}
{{        HashMap> primitiveTypes = new 
HashMap>();}}
{{    primitiveTypes.put(Schema.Type.BOOLEAN, Types.BOOLEAN());}}
{{    primitiveTypes.put(Schema.Type.INT, Types.INT());}}
{{    primitiveTypes.put(Schema.Type.LONG, Types.LONG());}}
{{    primitiveTypes.put(Schema.Type.FLOAT, Types.FLOAT());}}
{{    primitiveTypes.put(Schema.Type.DOUBLE, Types.DOUBLE());}}
{{    primitiveTypes.put(Schema.Type.BYTES, Types.BYTE());}}
{{    primitiveTypes.put(Schema.Type.STRING, Types.STRING());}}
{{    }}
{{        Builder src_builder = CsvTableSource.builder().path(path);}}
{{        }}
{{        for (Schema field_nfo : sch.getTypes()){}}
{{            src_builder.field(field_nfo.getName(), 
primitiveTypes.get(field_nfo.getType()));}}
{{        }}}
{{        }}
{{    return src_builder.build();}}
{{}}}

 

With FLINK-9814, I would be able to build sources from a schema description and 
get Exception when the input file doesn't match the format specification

 

All the best

> Build xTableSource from Avro schemas
> 
>
> Key: FLINK-9813
> URL: https://issues.apache.org/jira/browse/FLINK-9813
> Project: Flink
>  Issue Type: Wish
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: François Lacombe
>Priority: Trivial
>   Original 

[GitHub] flink issue #6337: [FLINK-9853][Tabel API & SQL] add HEX support

2018-07-16 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6337
  
+1, from my side


---


[jira] [Comment Edited] (FLINK-9814) CsvTableSource "lack of column" warning

2018-07-16 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-9814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545270#comment-16545270
 ] 

François Lacombe edited comment on FLINK-9814 at 7/16/18 3:11 PM:
--

Fabian,

My comments on each point below:

1) No, because when the main() method runs, the input files may not be known yet 
(especially with streaming processes).

2) Maybe, depending on the IO overhead it implies, according to what you say.

3) Yes, it can work. For CSV files and other flat formats, header knowledge is 
mandatory.

The check should ensure that the input file conforms to the structure we 
expect.
In CsvTableSource, we declare which fields should be in the file. I want 
to get an Exception when any input file doesn't have one of those fields.

Depending on the format, it may be possible to check types, but not by checking 
each row, which may imply a lot of processing.

Example:

Builder src_builder = CsvTableSource.builder().path(path);
src_builder.field("col1", Types.INT());
src_builder.field("col2", Types.STRING());
src_builder.field("col3", Types.STRING());

We expect a CSV file with 3 columns.
Then, if something else comes in as input:

||col1||col2||col4||
|Col A1|Col A2|blabla|

Exception: where is col3?

 
  
 All the best


was (Author: flacombe):
Fabien,

My comments on each point below :

1) No, because at the main() method run, the input files may be not known 
(especially with streaming processes)

2) Maybe, depending on what IO overhead it implies according to what you say

3) Yes it can work. For CSV files and other flat formats, header knowledge is 
mandatory

The check should ensure that the input file is conform to what structure we 
expect.
 In CsvTableSource, we use to declare what field should be in the file. I want 
to get an Exception when any input file doesn't have one of those fields.

Depending on the format, it may be possible to check types, but not by checking 
each row which may imply a lot of processing.

Example :

Builder src_builder = CsvTableSource.builder().path(path);
 src_builder.field("col1", Types.INT());
 src_builder.field("col2", Types.STRING());
 src_builder.field("col3", Types.STRING());

We except a CSV file with 3 columns.
 Then, if something else comes in input :

++
||col1||col2||col4||
|Col A1|Col A2|blabla|

Exception : where is col3 ?

 
  
 All the best

> CsvTableSource "lack of column" warning
> ---
>
> Key: FLINK-9814
> URL: https://issues.apache.org/jira/browse/FLINK-9814
> Project: Flink
>  Issue Type: Wish
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: François Lacombe
>Assignee: vinoyang
>Priority: Minor
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> The CsvTableSource class is built by defining the expected columns to be found in 
> the corresponding csv file.
>  
> It would be great to throw an Exception when the csv file doesn't have the 
> same structure as defined in the source. For backward compatibility's sake, 
> developers should explicitly set the builder to define columns strictly and 
> expect an Exception to be thrown in case of a structure difference.
> It can easily be checked against the file header, if one exists.
> Is this possible?
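
The header check requested in this issue can be sketched in plain Java as follows. This is only an illustration of the idea, under the assumption that the first line of the file is a header; `CsvHeaderCheck`, `missingColumns` and `requireColumns` are hypothetical names and are not part of the CsvTableSource API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper illustrating the requested check; not part of CsvTableSource. */
final class CsvHeaderCheck {

    /** Returns the expected columns that are absent from the header line. */
    static List<String> missingColumns(String headerLine, String delimiter, List<String> expected) {
        List<String> present = new ArrayList<>();
        // Quote the delimiter so that characters such as '|' are not treated as regex syntax.
        for (String name : headerLine.split(Pattern.quote(delimiter), -1)) {
            present.add(name.trim());
        }
        List<String> missing = new ArrayList<>(expected);
        missing.removeAll(present);
        return missing;
    }

    /** Strict mode: throws if any expected column is missing from the header. */
    static void requireColumns(String headerLine, String delimiter, List<String> expected) {
        List<String> missing = missingColumns(headerLine, delimiter, expected);
        if (!missing.isEmpty()) {
            throw new IllegalStateException("CSV header is missing columns: " + missing);
        }
    }
}
```

Only the header line is inspected, so the cost does not grow with the number of rows, in line with the concern about per-row checks raised in the comment.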





[jira] [Commented] (FLINK-9853) add hex support in table api and sql

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545300#comment-16545300
 ] 

ASF GitHub Bot commented on FLINK-9853:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6337
  
+1, from my side


> add hex support in table api and sql
> 
>
> Key: FLINK-9853
> URL: https://issues.apache.org/jira/browse/FLINK-9853
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: xueyu
>Priority: Major
>  Labels: pull-request-available
>
> Like in MySQL, HEX can take an integer or a string argument. For an integer 
> argument N, it returns a hexadecimal string representation of the value of N. 
> For a string argument str, it returns a hexadecimal string representation of 
> str where each byte of each character in str is converted to two hexadecimal 
> digits. 
> Syntax:
> HEX(100) = 64
> HEX('This is a test String.') = '546869732069732061207465737420537472696e672e'
> See more: [link 
> MySQL|https://dev.mysql.com/doc/refman/8.0/en/string-functions.html#function_hex]
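
The semantics described above can be illustrated with a small plain-Java sketch. It is not the implementation from the pull request; `HexSketch` is a hypothetical name, UTF-8 is an assumed string encoding, and the lower-case digits simply follow the example in the issue.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration of the HEX semantics described above; not the Flink implementation. */
final class HexSketch {

    /** HEX(N) for an integer argument. */
    static String hex(long n) {
        return Long.toHexString(n);
    }

    /** HEX(str): every byte of the (assumed UTF-8) encoding becomes two hex digits. */
    static String hex(String str) {
        StringBuilder sb = new StringBuilder();
        for (byte b : str.getBytes(StandardCharsets.UTF_8)) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }
}
```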





[jira] [Commented] (FLINK-9857) Processing-time timers fire too early

2018-07-16 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545286#comment-16545286
 ] 

Aljoscha Krettek commented on FLINK-9857:
-

On a side note: even if we fix this off-by-one bug, I think there can still 
be races because the current processing time is queried using 
System.currentTimeMillis() while we set timers using a 
ScheduledThreadPoolExecutor (currently). If there's any race between those two, 
you can also get weird results.

For these reasons, I would always suggest going with event time or ingestion 
time, but I think the latter is currently not possible with the Table API/SQL.
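
One way to picture the race mentioned above is a timer callback that re-reads the clock and waits again if it woke up too early. The following is only a sketch of that defensive pattern, which is not something proposed in this issue; `GuardedTimer` and `fireAfter` are hypothetical names and this is not Flink's timer service.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of a guarded processing-time timer; not Flink code. */
final class GuardedTimer {

    /** Completes the returned future with the wall-clock time at which the timer fired. */
    static CompletableFuture<Long> fireAfter(ScheduledThreadPoolExecutor executor, long timestamp) {
        CompletableFuture<Long> fired = new CompletableFuture<>();
        schedule(executor, timestamp, fired);
        return fired;
    }

    private static void schedule(ScheduledThreadPoolExecutor executor, long timestamp,
                                 CompletableFuture<Long> fired) {
        // The executor measures the delay on its own clock, so the callback re-checks the wall clock.
        long delay = Math.max(0L, timestamp - System.currentTimeMillis() + 1);
        executor.schedule(() -> {
            long now = System.currentTimeMillis();
            if (now > timestamp) {
                fired.complete(now);                  // strictly past the timestamp: fire
            } else {
                schedule(executor, timestamp, fired); // woke up early: wait again
            }
        }, delay, TimeUnit.MILLISECONDS);
    }
}
```

The guard trades a possible extra wake-up for the guarantee that the callback never observes a clock value at or before the timer's timestamp.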

> Processing-time timers fire too early
> -
>
> Key: FLINK-9857
> URL: https://issues.apache.org/jira/browse/FLINK-9857
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.3.4, 1.4.2, 1.5.1, 1.6.0
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.2, 1.6.0
>
>
> The firing of processing-time timers is off by one. This leads to problems in 
> edge cases, as discovered [here (mailing 
> list)|https://lists.apache.org/thread.html/e49748fa5fa1c9217b9dfb65eea7a37af1f2895c769528e77a1a93fa@%3Cuser.flink.apache.org%3E]
>  when elements arrive at the timestamp that is the end of the window.
> The problem is [here 
> (github)|https://github.com/apache/flink/blob/79b38f8f9a79b917d525842cf46087c5b8c40f3d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java#L231].
>  For event-time, we fire timers when the watermark is >= the timestamp, this 
> is correct because a watermark T says that we will not see elements with a 
> timestamp smaller or equal to T. For processing time, a time of T does not 
> say that we won't see an element with timestamp T, which makes 
> processing-time timers fire one ms too early.
> I think we can fix it by turning that {{<=}} into a {{<}}.
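
The difference between the two firing conditions can be written down as a short Java sketch. It only restates the reasoning from the description; `TimerFiring` and its method names are hypothetical and this is not the HeapInternalTimerService code.

```java
/** Hypothetical sketch of the two firing conditions; not the HeapInternalTimerService code. */
final class TimerFiring {

    /** Event time: a watermark T means no more elements with timestamp <= T will arrive. */
    static boolean shouldFireEventTime(long timerTimestamp, long watermark) {
        return timerTimestamp <= watermark;
    }

    /** Processing time: elements stamped with the current time may still arrive, so compare strictly. */
    static boolean shouldFireProcessingTime(long timerTimestamp, long currentTime) {
        return timerTimestamp < currentTime;
    }
}
```

With the strict comparison, a timer registered for timestamp T fires only once the clock has moved past T, so an element arriving exactly at the end of the window is still included.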





[jira] [Commented] (FLINK-9842) Job submission fails via CLI with SSL enabled

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545279#comment-16545279
 ] 

ASF GitHub Bot commented on FLINK-9842:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6340


> Job submission fails via CLI with SSL enabled
> -
>
> Key: FLINK-9842
> URL: https://issues.apache.org/jira/browse/FLINK-9842
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Job-Submission
>Affects Versions: 1.5.1, 1.6.0
>Reporter: Nico Kruber
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: pull-request-available, regression
> Fix For: 1.5.2, 1.6.0
>
>
> There's a regression in Flink 1.5.1 which leads to the job submission via CLI 
> failing with SSL enabled (1.5.0 works). Tried with the {{WordCount}} example:
> Client log:
> {code}
> 2018-07-16 11:11:12,688 INFO  org.apache.flink.client.cli.CliFrontend 
>   - 
> 
> 2018-07-16 11:11:12,690 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Starting Command Line Client (Version: 1.5.1, Rev:3488f8b, 
> Date:10.07.2018 @ 11:51:27 GMT)
> 2018-07-16 11:11:12,690 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  OS current user: nico
> 2018-07-16 11:11:12,690 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Current Hadoop/Kerberos user: 
> 2018-07-16 11:11:12,690 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 
> 1.8/25.171-b11
> 2018-07-16 11:11:12,690 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Maximum heap size: 3534 MiBytes
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  JAVA_HOME: /usr/lib64/jvm/java
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  No Hadoop Dependency available
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  JVM Options:
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   - 
> -Dlog.file=/home/nico/Downloads/flink-1.5.1/log/flink-nico-client-nico-work.fritz.box.log
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   - 
> -Dlog4j.configuration=file:/home/nico/Downloads/flink-1.5.1/conf/log4j-cli.properties
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   - 
> -Dlogback.configurationFile=file:/home/nico/Downloads/flink-1.5.1/conf/logback.xml
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Program Arguments:
> 2018-07-16 11:11:12,691 INFO  org.apache.flink.client.cli.CliFrontend 
>   - run
> 2018-07-16 11:11:12,692 INFO  org.apache.flink.client.cli.CliFrontend 
>   - ./examples/streaming/WordCount.jar
> 2018-07-16 11:11:12,692 INFO  org.apache.flink.client.cli.CliFrontend 
>   - --input
> 2018-07-16 11:11:12,692 INFO  org.apache.flink.client.cli.CliFrontend 
>   - LICENSE
> 2018-07-16 11:11:12,692 INFO  org.apache.flink.client.cli.CliFrontend 
>   -  Classpath: 
> /home/nico/Downloads/flink-1.5.1/lib/flink-python_2.11-1.5.1.jar:/home/nico/Downloads/flink-1.5.1/lib/log4j-1.2.17.jar:/home/nico/Downloads/flink-1.5.1/lib/slf4j-log4j12-1.7.7.jar:/home/nico/Downloads/flink-1.5.1/lib/flink-dist_2.11-1.5.1.jar:::
> 2018-07-16 11:11:12,692 INFO  org.apache.flink.client.cli.CliFrontend 
>   - 
> 
> 2018-07-16 11:11:12,698 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2018-07-16 11:11:12,698 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2018-07-16 11:11:12,698 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: jobmanager.heap.mb, 1024
> 2018-07-16 11:11:12,699 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: taskmanager.heap.mb, 1024
> 2018-07-16 11:11:12,699 INFO  
> org.apache.flink.configuration.GlobalConfiguration- Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2018-07-16 11:11:12,699 INFO  
> org.apache.flink.configuration.GlobalConfiguration- 

[jira] [Commented] (FLINK-9839) End-to-end test: Streaming job with SSL

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9839?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545280#comment-16545280
 ] 

ASF GitHub Bot commented on FLINK-9839:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6327


> End-to-end test: Streaming job with SSL
> ---
>
> Key: FLINK-9839
> URL: https://issues.apache.org/jira/browse/FLINK-9839
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> None of the existing e2e tests run with an SSL configuration but there should 
> be such a test as well.





[jira] [Commented] (FLINK-9380) Failing end-to-end tests should not clean up logs

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545278#comment-16545278
 ] 

ASF GitHub Bot commented on FLINK-9380:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6289


> Failing end-to-end tests should not clean up logs
> -
>
> Key: FLINK-9380
> URL: https://issues.apache.org/jira/browse/FLINK-9380
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Deepak Sharma
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.5.2, 1.6.0
>
>
> Some of the end-to-end tests clean up their logs even in the failure case. 
> This makes debugging and understanding the problem extremely difficult. 
> Ideally, the script says where it stored the respective logs.





[GitHub] flink pull request #6340: [FLINK-9842][rest] Pass actual configuration to Bl...

2018-07-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6340


---


[jira] [Comment Edited] (FLINK-9814) CsvTableSource "lack of column" warning

2018-07-16 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-9814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545270#comment-16545270
 ] 

François Lacombe edited comment on FLINK-9814 at 7/16/18 2:50 PM:
--

Fabien,

My comments on each point below:

1) No, because when the main() method runs, the input files may not be known yet 
(especially with streaming processes).

2) Maybe, depending on the IO overhead it implies, according to what you say.

3) Yes, it can work. For CSV files and other flat formats, header knowledge is 
mandatory.

The check should ensure that the input file conforms to the structure we 
expect.
In CsvTableSource, we declare which fields should be in the file. I want 
to get an Exception when any input file doesn't have one of those fields.

Depending on the format, it may be possible to check types, but not by checking 
each row, which may imply a lot of processing.

Example:

Builder src_builder = CsvTableSource.builder().path(path);
src_builder.field("col1", Types.INT());
src_builder.field("col2", Types.STRING());
src_builder.field("col3", Types.STRING());

We expect a CSV file with 3 columns.
Then, if something else comes in as input:

||col1||col2||col4||
|Col A1|Col A2|blabla|

Exception: where is col3?

 
  
 All the best


was (Author: flacombe):
Fabien,

My comments on each point below :

1) No, because at the main() method exception the input files may be not known 
(especially with streaming processes)

2) Maybe, depending on what IO overhead it implies according to what you say

3) Yes it can work. For CSV files and other flat formats, header knowledge is 
mandatory

The check should ensure that the input file is conform to what structure we 
expect.
 In CsvTableSource, we use to declare what field should be in the file. I want 
to get an Exception when any input file doesn't have one of those fields.

Depending on the format, it may be possible to check types, but not by checking 
each row which may imply a lot of processing.

Example :

Builder src_builder = CsvTableSource.builder().path(path);
 src_builder.field("col1", Types.INT());
 src_builder.field("col2", Types.STRING());
 src_builder.field("col3", Types.STRING());

We except a CSV file with 3 columns.
 Then, if something else comes in input :

++
||col1||col2||col4||
|Col A1|Col A2|blabla|
Exception : where is col3 ?

 
 
All the best

> CsvTableSource "lack of column" warning
> ---
>
> Key: FLINK-9814
> URL: https://issues.apache.org/jira/browse/FLINK-9814
> Project: Flink
>  Issue Type: Wish
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: François Lacombe
>Assignee: vinoyang
>Priority: Minor
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> The CsvTableSource class is built by defining the expected columns to be found in 
> the corresponding csv file.
>  
> It would be great to throw an Exception when the csv file doesn't have the 
> same structure as defined in the source. For backward compatibility's sake, 
> developers should explicitly set the builder to define columns strictly and 
> expect an Exception to be thrown in case of a structure difference.
> It can easily be checked against the file header, if one exists.
> Is this possible?





[GitHub] flink pull request #6327: [FLINK-9839][e2e] add end-to-end tests with SSL en...

2018-07-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6327


---


[GitHub] flink pull request #6289: [FLINK-9380]: Modified end to end test runner scri...

2018-07-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6289


---


[jira] [Commented] (FLINK-9814) CsvTableSource "lack of column" warning

2018-07-16 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-9814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545270#comment-16545270
 ] 

François Lacombe commented on FLINK-9814:
-

Fabien,

My comments on each point below:

1) No, because when the main() method executes, the input files may not be known yet 
(especially with streaming processes).

2) Maybe, depending on the IO overhead it implies, according to what you say.

3) Yes, it can work. For CSV files and other flat formats, header knowledge is 
mandatory.

The check should ensure that the input file conforms to the structure we 
expect.
In CsvTableSource, we declare which fields should be in the file. I want 
to get an Exception when any input file doesn't have one of those fields.

Depending on the format, it may be possible to check types, but not by checking 
each row, which may imply a lot of processing.

Example:

Builder src_builder = CsvTableSource.builder().path(path);
src_builder.field("col1", Types.INT());
src_builder.field("col2", Types.STRING());
src_builder.field("col3", Types.STRING());

We expect a CSV file with 3 columns.
Then, if something else comes in as input:

||col1||col2||col4||
|Col A1|Col A2|blabla|

Exception: where is col3?

 
 
All the best

> CsvTableSource "lack of column" warning
> ---
>
> Key: FLINK-9814
> URL: https://issues.apache.org/jira/browse/FLINK-9814
> Project: Flink
>  Issue Type: Wish
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: François Lacombe
>Assignee: vinoyang
>Priority: Minor
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> The CsvTableSource class is built by defining the expected columns to be found in 
> the corresponding csv file.
>  
> It would be great to throw an Exception when the csv file doesn't have the 
> same structure as defined in the source. For backward compatibility's sake, 
> developers should explicitly set the builder to define columns strictly and 
> expect an Exception to be thrown in case of a structure difference.
> It can easily be checked against the file header, if one exists.
> Is this possible?





[jira] [Commented] (FLINK-9813) Build xTableSource from Avro schemas

2018-07-16 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-9813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545255#comment-16545255
 ] 

François Lacombe commented on FLINK-9813:
-

Hi Fabien,

I am suggesting building a CsvTableSource (or any other TableSource) from Avro 
schemas (not the Avro data format).
The point isn't to support more input formats, but to describe all structures 
with one common "language".

Avro schemas ([https://avro.apache.org/docs/1.8.1/spec.html#schemas]) define the 
structure of each record, just like the CsvTableSource Builder does when you call 
the .field() method.
The schema doesn't specify the CSV field separator or row separator, for 
instance, but only the expected columns and their types. That's why it is only 
about structure, not the whole format.

Avro schemas are highly versatile and increasingly widely supported. It's really 
convenient to write such a structure descriptor whatever the file format.

I think Flink would benefit strongly from supporting such schemas by building 
its sources from them.

 

Here is what I'm currently doing, which may be a bit awkward:

{{import java.util.HashMap;}}
{{import org.apache.avro.Schema;}}
{{import org.apache.flink.api.common.typeinfo.TypeInformation;}}
{{import org.apache.flink.table.api.Types;}}
{{import org.apache.flink.table.sources.CsvTableSource;}}

{{public static CsvTableSource getFromSchema(String path, Schema sch) {}}
{{    // Map Avro primitive types to Flink type information.}}
{{    HashMap<Schema.Type, TypeInformation<?>> primitiveTypes = new HashMap<>();}}
{{    primitiveTypes.put(Schema.Type.BOOLEAN, Types.BOOLEAN());}}
{{    primitiveTypes.put(Schema.Type.INT, Types.INT());}}
{{    primitiveTypes.put(Schema.Type.LONG, Types.LONG());}}
{{    primitiveTypes.put(Schema.Type.FLOAT, Types.FLOAT());}}
{{    primitiveTypes.put(Schema.Type.DOUBLE, Types.DOUBLE());}}
{{    primitiveTypes.put(Schema.Type.BYTES, Types.BYTE());}}
{{    primitiveTypes.put(Schema.Type.STRING, Types.STRING());}}

{{    CsvTableSource.Builder src_builder = CsvTableSource.builder().path(path);}}

{{    // Declare one CSV column per field of the Avro record schema.}}
{{    for (Schema.Field field_nfo : sch.getFields()) {}}
{{        src_builder.field(field_nfo.name(), primitiveTypes.get(field_nfo.schema().getType()));}}
{{    }}}

{{    return src_builder.build();}}
{{}}}

 

With FLINK-9814, I would be able to build sources from a schema description and 
get an Exception when the input file doesn't match the format specification.

 

All the best

> Build xTableSource from Avro schemas
> 
>
> Key: FLINK-9813
> URL: https://issues.apache.org/jira/browse/FLINK-9813
> Project: Flink
>  Issue Type: Wish
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: François Lacombe
>Priority: Trivial
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> As Avro provides an efficient formalism for data schemas, it would be great to 
> be able to build Flink table sources from such files.
> More info about Avro schemas: 
> [https://avro.apache.org/docs/1.8.1/spec.html#schemas]
> For instance, with CsvTableSource:
> Schema.Parser schemaParser = new Schema.Parser();
> Schema tableSchema = schemaParser.parse(new File("avro.json"));
> Builder bld = CsvTableSource.builder().schema(tableSchema);
>  
> This would give me a fully usable CsvTableSource with the columns defined in 
> avro.json.
> It may be possible to do so for every TableSource, since the Avro format is 
> really common and versatile.





[jira] [Created] (FLINK-9864) Make timestamp extraction more flexible in SQL Client

2018-07-16 Thread Timo Walther (JIRA)
Timo Walther created FLINK-9864:
---

 Summary: Make timestamp extraction more flexible in SQL Client
 Key: FLINK-9864
 URL: https://issues.apache.org/jira/browse/FLINK-9864
 Project: Flink
  Issue Type: New Feature
  Components: Table API  SQL
Reporter: Timo Walther
Assignee: Timo Walther


Currently, a timestamp must be at the top level of a possibly nested row and 
must have a certain format. We should think about making this more flexible to 
cover most of the use cases.

A first solution could be to allow a DOT operator syntax: 
{{myfield.nested.timestamp}}
Other cases might be:
- The time could also be split into several fields
- Or it needs to be parsed using a [date format 
syntax|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql.html#date-format-specifier].
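
As a rough illustration of the DOT operator idea, the following plain-Java sketch resolves a dotted path against a nested row modelled as nested maps. The map-based row model, the class `DotPath` and the method `resolve` are assumptions made for this example only; they are not SQL Client code.

```java
import java.util.Map;

/** Hypothetical sketch of DOT-path resolution on nested rows modelled as maps; not SQL Client code. */
final class DotPath {

    /** Follows a path such as "myfield.nested.timestamp" and returns the value, or null if absent. */
    static Object resolve(Map<String, Object> row, String path) {
        Object current = row;
        for (String segment : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // the path goes deeper than the row structure
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }
}
```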





[jira] [Commented] (FLINK-9748) create_source_release pollutes flink root directory

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545204#comment-16545204
 ] 

ASF GitHub Bot commented on FLINK-9748:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6342

[FLINK-9748][release] Use dedicated directory for release artifacts

## What is the purpose of the change

With this PR, artifacts created during the release process are no longer 
placed in the root flink directory, but instead in a dedicated directory under 
`/tools/releasing`.
This makes it easier to reset the repository state in case of an error, as 
all you have to do is remove said directory. It also prevents accidentally 
committing release files.
In case of success this directory will contain all release artifacts that 
should be uploaded.

Additionally this PR introduces variables for commonly used directories 
(flink root directory, release directory, flink-clone directory) and reduces 
usages of relative paths.

## Brief change log

* modified source/binary release scripts to use a dedicated directory for 
storing release artifacts
* modified rat-plugin to exclude release directory
* modified .gitignore to exclude release directory

## Verifying this change

Manually verified.

@aljoscha @tillrohrmann I'd appreciate your input.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9748

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6342.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6342


commit 92bf5ce764dafa82fcdc2ad3c625d194979c76d9
Author: zentol 
Date:   2018-07-16T13:16:19Z

[FLINK-9748][release] Use dedicated directory for release artifacts




> create_source_release pollutes flink root directory
> ---
>
> Key: FLINK-9748
> URL: https://issues.apache.org/jira/browse/FLINK-9748
> Project: Flink
>  Issue Type: Improvement
>  Components: Release System
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> The {{create_source_release.sh}} moves generated files into the root 
> directory of the flink repository, and also creates the flink-clone in this 
> directory.
> It would be nicer if these files were placed under a dedicated directory (say 
> releasing/release).
> This would prevent release files from accidentally being committed, by adding 
> this directory to the {{.gitignore}} file, and would make it easier to clean 
> up a failed release attempt by making it obvious which files belong to the 
> release.
> I already implemented these changes in the {{flink-shaded}} release scripts.





[jira] [Updated] (FLINK-9748) create_source_release pollutes flink root directory

2018-07-16 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9748:
--
Labels: pull-request-available  (was: )

> create_source_release pollutes flink root directory
> ---
>
> Key: FLINK-9748
> URL: https://issues.apache.org/jira/browse/FLINK-9748
> Project: Flink
>  Issue Type: Improvement
>  Components: Release System
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> The {{create_source_release.sh}} moves generated files into the root 
> directory of the flink repository, and also creates the flink-clone in this 
> directory.
> It would be nicer if these files were placed under a dedicated directory (say 
> releasing/release).
> This would prevent release files from accidentally being committed, by adding 
> this directory to the {{.gitignore}} file, and would make it easier to clean 
> up a failed release attempt by making it obvious which files belong to the 
> release.
> I already implemented these changes in the {{flink-shaded}} release scripts.





[jira] [Assigned] (FLINK-9855) Check all calls to the TypeExtractor.getBinaryOperatorReturnType()

2018-07-16 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-9855:
---

Assignee: Timo Walther  (was: Florian Schmidt)

> Check all calls to the TypeExtractor.getBinaryOperatorReturnType()
> --
>
> Key: FLINK-9855
> URL: https://issues.apache.org/jira/browse/FLINK-9855
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.6.0
>
>
> KeyedStream.IntervalJoined is not calling type extraction functions 
> correctly. It should have an index.
> Other examples with potentially wrong usage are:
>  - process(final BroadcastProcessFunction function) and
>  - flatMap(CoFlatMapFunction coFlatMapper)





[GitHub] flink pull request #6342: [FLINK-9748][release] Use dedicated directory for ...

2018-07-16 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6342

[FLINK-9748][release] Use dedicated directory for release artifacts

## What is the purpose of the change

With this PR, artifacts created during the release process are no longer 
placed in the root flink directory, but instead in a dedicated directory under 
`/tools/releasing`.
This makes it easier to reset the repository state in case of an error, as 
all you have to do is remove said directory. It also prevents accidentally 
committing release files.
In case of success this directory will contain all release artifacts that 
should be uploaded.

Additionally this PR introduces variables for commonly used directories 
(flink root directory, release directory, flink-clone directory) and reduces 
usages of relative paths.

## Brief change log

* modified source/binary release scripts to use a dedicated directory for 
storing release artifacts
* modified rat-plugin to exclude release directory
* modified .gitignore to exclude release directory

## Verifying this change

Manually verified.

@aljoscha @tillrohrmann I'd appreciate your input.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9748

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6342.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6342


commit 92bf5ce764dafa82fcdc2ad3c625d194979c76d9
Author: zentol 
Date:   2018-07-16T13:16:19Z

[FLINK-9748][release] Use dedicated directory for release artifacts




---


[jira] [Commented] (FLINK-9855) Check all calls to the TypeExtractor.getBinaryOperatorReturnType()

2018-07-16 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545202#comment-16545202
 ] 

Timo Walther commented on FLINK-9855:
-

I will assign this issue to myself. I agree that this method is not easy to 
call, but it needs to be flexible enough for all kinds of function interfaces 
that Flink offers. And Flink offers a lot of them :D

> Check all calls to the TypeExtractor.getBinaryOperatorReturnType()
> --
>
> Key: FLINK-9855
> URL: https://issues.apache.org/jira/browse/FLINK-9855
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Timo Walther
>Assignee: Florian Schmidt
>Priority: Blocker
> Fix For: 1.6.0
>
>
> KeyedStream.IntervalJoined is not calling type extraction functions 
> correctly. It should have an index.
> Other examples with potentially wrong usage are:
>  - process(final BroadcastProcessFunction function) and
>  - flatMap(CoFlatMapFunction coFlatMapper)





[jira] [Commented] (FLINK-5750) Incorrect translation of n-ary Union

2018-07-16 Thread Alexander Koltsov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545193#comment-16545193
 ] 

Alexander Koltsov commented on FLINK-5750:
--

Now it's clear.
Thanks a lot.

> Incorrect translation of n-ary Union
> 
>
> Key: FLINK-5750
> URL: https://issues.apache.org/jira/browse/FLINK-5750
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0, 1.3.4, 1.5.0, 1.4.2, 1.6.0
>Reporter: Anton Mushin
>Assignee: Alexander Koltsov
>Priority: Critical
>  Labels: pull-request-available
>
> Calcite's union operator supports more than two input relations. However, 
> Flink's translation rules only consider the first two relations because we 
> assumed that Calcite's union is binary. 
> This problem exists for batch and streaming queries.
> It seems that Calcite only generates non-binary Unions in rare cases 
> ({{(SELECT * FROM t) UNION ALL (SELECT * FROM t) UNION ALL (SELECT * FROM 
> t)}} results in two binary union operators) but the problem definitely needs 
> to be fixed.
> The following query can be used to validate the problem. 
> {code:java}
> @Test
> public void testValuesWithCast() throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
>
>     // Query with explicit casts; per the output below, its result loses a row.
>     String sqlQuery = "VALUES (1, cast(1 as BIGINT) )," +
>         "(2, cast(2 as BIGINT))," +
>         "(3, cast(3 as BIGINT))";
>     // The same rows without casts.
>     String sqlQuery2 = "VALUES (1,1)," +
>         "(2, 2)," +
>         "(3, 3)";
>
>     Table result = tableEnv.sql(sqlQuery);
>     DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
>     List<Row> results = resultSet.collect();
>
>     Table result2 = tableEnv.sql(sqlQuery2);
>     DataSet<Row> resultSet2 = tableEnv.toDataSet(result2, Row.class);
>     List<Row> results2 = resultSet2.collect();
>
>     // Both queries should return the same three rows.
>     String expected = "1,1\n2,2\n3,3";
>     compareResultAsText(results2, expected);
>     compareResultAsText(results, expected);
> }
> {code}
> Actual result (AR) for the {{results}} variable:
> {noformat}
> java.lang.AssertionError: Different elements in arrays: expected 3 elements 
> and received 2
>  expected: [1,1, 2,2, 3,3]
>  received: [1,1, 2,2] 
> Expected :3
> Actual   :2
> {noformat}
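
The failure mode described in the issue can be illustrated outside Flink. The following is a minimal, self-contained sketch, not Flink's or Calcite's actual translation code: `UnionTranslationSketch`, `translateBinaryOnly`, and `translateAllInputs` are hypothetical names, and it assumes the three `VALUES` rows reach the translation as a single union with three inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnionTranslationSketch {

    /** Buggy variant: assumes the union is binary and reads only inputs 0 and 1. */
    public static List<String> translateBinaryOnly(List<List<String>> inputs) {
        List<String> out = new ArrayList<>(inputs.get(0));
        out.addAll(inputs.get(1));
        return out;
    }

    /** Fixed variant: visits every input of the n-ary union. */
    public static List<String> translateAllInputs(List<List<String>> inputs) {
        List<String> out = new ArrayList<>();
        for (List<String> input : inputs) {
            out.addAll(input);
        }
        return out;
    }

    public static void main(String[] args) {
        // One single-row input per VALUES row (an assumption made for illustration).
        List<List<String>> inputs = Arrays.asList(
                Arrays.asList("1,1"),
                Arrays.asList("2,2"),
                Arrays.asList("3,3"));

        System.out.println(translateBinaryOnly(inputs)); // prints [1,1, 2,2]
        System.out.println(translateAllInputs(inputs));  // prints [1,1, 2,2, 3,3]
    }
}
```

The binary-only variant drops the third input silently, which matches the shape of the reported assertion failure (three elements expected, two received).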





[jira] [Commented] (FLINK-5750) Incorrect translation of n-ary Union

2018-07-16 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545191#comment-16545191
 ] 

Fabian Hueske commented on FLINK-5750:
--

We prefer PRs with a single initial commit. Commits that are added later on top 
to address feedback are fine.

Thanks, Fabian

> Incorrect translation of n-ary Union
> 
>
> Key: FLINK-5750
> URL: https://issues.apache.org/jira/browse/FLINK-5750
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0, 1.3.4, 1.5.0, 1.4.2, 1.6.0
>Reporter: Anton Mushin
>Assignee: Alexander Koltsov
>Priority: Critical
>  Labels: pull-request-available
>
> Calcite's union operator supports more than two input relations. However, 
> Flink's translation rules only consider the first two relations because we 
> assumed that Calcite's union is binary. 
> This problem exists for batch and streaming queries.
> It seems that Calcite only generates non-binary Unions in rare cases 
> ({{(SELECT * FROM t) UNION ALL (SELECT * FROM t) UNION ALL (SELECT * FROM 
> t)}} results in two binary union operators) but the problem definitely needs 
> to be fixed.
> The following query can be used to validate the problem. 
> {code:java}
> @Test
> public void testValuesWithCast() throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
>
>     // Query with explicit casts; per the output below, its result loses a row.
>     String sqlQuery = "VALUES (1, cast(1 as BIGINT) )," +
>         "(2, cast(2 as BIGINT))," +
>         "(3, cast(3 as BIGINT))";
>     // The same rows without casts.
>     String sqlQuery2 = "VALUES (1,1)," +
>         "(2, 2)," +
>         "(3, 3)";
>
>     Table result = tableEnv.sql(sqlQuery);
>     DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
>     List<Row> results = resultSet.collect();
>
>     Table result2 = tableEnv.sql(sqlQuery2);
>     DataSet<Row> resultSet2 = tableEnv.toDataSet(result2, Row.class);
>     List<Row> results2 = resultSet2.collect();
>
>     // Both queries should return the same three rows.
>     String expected = "1,1\n2,2\n3,3";
>     compareResultAsText(results2, expected);
>     compareResultAsText(results, expected);
> }
> {code}
> Actual result (AR) for the {{results}} variable:
> {noformat}
> java.lang.AssertionError: Different elements in arrays: expected 3 elements 
> and received 2
>  expected: [1,1, 2,2, 3,3]
>  received: [1,1, 2,2] 
> Expected :3
> Actual   :2
> {noformat}





[jira] [Commented] (FLINK-9435) Remove per-key selection Tuple instantiation via reflection in ComparableKeySelector and ArrayKeySelector

2018-07-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545188#comment-16545188
 ] 

ASF GitHub Bot commented on FLINK-9435:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6115
  
Thanks for the update @NicoK. +1 to merge this


> Remove per-key selection Tuple instantiation via reflection in 
> ComparableKeySelector and ArrayKeySelector
> -
>
> Key: FLINK-9435
> URL: https://issues.apache.org/jira/browse/FLINK-9435
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.3.0, 1.3.1, 1.3.2, 1.3.3, 1.4.0, 1.5.0, 1.4.1, 1.4.2, 
> 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Inside {{KeySelectorUtil}}, every {{ComparableKeySelector#getKey()}} call 
> currently creates a new tuple from 
> {{Tuple.getTupleClass(keyLength).newInstance();}} which seems expensive. 
> Instead, we could get a template tuple and use {{Tuple#copy()}} which copies 
> the right sub-class in a more optimal way.
> Similarly, {{ArrayKeySelector}} instantiates new {{Tuple}} instances via 
> reflection which can be changed the same way.
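
The idea in the description, replacing per-call reflective instantiation with a copy of a template object, can be sketched without any Flink dependency. Everything below is a hypothetical stand-in for illustration: `Pair` is not Flink's `Tuple`, and `keyViaReflection` / `keyViaTemplate` are invented names rather than methods of `KeySelectorUtil`.

```java
public class TupleCopySketch {

    /** Hypothetical two-field tuple; stands in for a concrete Tuple subclass. */
    public static class Pair {
        public Object f0;
        public Object f1;

        public Pair() {}

        /** Creates a new instance of the same class without using reflection. */
        public Pair copy() {
            Pair p = new Pair();
            p.f0 = f0;
            p.f1 = f1;
            return p;
        }
    }

    /** Template created once and reused as the source of every copy. */
    private static final Pair TEMPLATE = new Pair();

    /** Pattern the issue calls expensive: reflective instantiation on every call. */
    public static Pair keyViaReflection(Object[] record) {
        try {
            Pair key = Pair.class.getDeclaredConstructor().newInstance();
            key.f0 = record[0];
            key.f1 = record[1];
            return key;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate key tuple", e);
        }
    }

    /** Proposed pattern: copy the template, then fill in the key fields. */
    public static Pair keyViaTemplate(Object[] record) {
        Pair key = TEMPLATE.copy();
        key.f0 = record[0];
        key.f1 = record[1];
        return key;
    }

    public static void main(String[] args) {
        Pair a = keyViaReflection(new Object[] {"user", 7});
        Pair b = keyViaTemplate(new Object[] {"user", 7});
        System.out.println(a.f0 + "," + a.f1);
        System.out.println(b.f0 + "," + b.f1);
    }
}
```

Both methods return a fresh object per call, so callers see the same behavior; only the way the instance is created differs. Whether the copy is measurably faster would need a benchmark on the real classes.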





[GitHub] flink issue #6115: [FLINK-9435][java] Remove per-key selection Tuple instant...

2018-07-16 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6115
  
Thanks for the update @NicoK. +1 to merge this


---


[jira] [Commented] (FLINK-5750) Incorrect translation of n-ary Union

2018-07-16 Thread Alexander Koltsov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16545184#comment-16545184
 ] 

Alexander Koltsov commented on FLINK-5750:
--

[~fhueske],
I agree with your point. But then the PR will contain several commits, and the 
rule of one PR = one commit will not be followed. That's why I asked about that.

> Incorrect translation of n-ary Union
> 
>
> Key: FLINK-5750
> URL: https://issues.apache.org/jira/browse/FLINK-5750
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0, 1.3.4, 1.5.0, 1.4.2, 1.6.0
>Reporter: Anton Mushin
>Assignee: Alexander Koltsov
>Priority: Critical
>  Labels: pull-request-available
>
> Calcite's union operator supports more than two input relations. However, 
> Flink's translation rules only consider the first two relations because we 
> assumed that Calcite's union is binary. 
> This problem exists for batch and streaming queries.
> It seems that Calcite only generates non-binary Unions in rare cases 
> ({{(SELECT * FROM t) UNION ALL (SELECT * FROM t) UNION ALL (SELECT * FROM 
> t)}} results in two binary union operators) but the problem definitely needs 
> to be fixed.
> The following query can be used to validate the problem. 
> {code:java}
> @Test
> public void testValuesWithCast() throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
>
>     // Query with explicit casts; per the output below, its result loses a row.
>     String sqlQuery = "VALUES (1, cast(1 as BIGINT) )," +
>         "(2, cast(2 as BIGINT))," +
>         "(3, cast(3 as BIGINT))";
>     // The same rows without casts.
>     String sqlQuery2 = "VALUES (1,1)," +
>         "(2, 2)," +
>         "(3, 3)";
>
>     Table result = tableEnv.sql(sqlQuery);
>     DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
>     List<Row> results = resultSet.collect();
>
>     Table result2 = tableEnv.sql(sqlQuery2);
>     DataSet<Row> resultSet2 = tableEnv.toDataSet(result2, Row.class);
>     List<Row> results2 = resultSet2.collect();
>
>     // Both queries should return the same three rows.
>     String expected = "1,1\n2,2\n3,3";
>     compareResultAsText(results2, expected);
>     compareResultAsText(results, expected);
> }
> {code}
> Actual result (AR) for the {{results}} variable:
> {noformat}
> java.lang.AssertionError: Different elements in arrays: expected 3 elements 
> and received 2
>  expected: [1,1, 2,2, 3,3]
>  received: [1,1, 2,2] 
> Expected :3
> Actual   :2
> {noformat}




