[jira] [Created] (FLINK-8552) CliFrontend does not exit normally after a job has been submitted with 'bin/flink run some_jar_ball'

2018-02-02 Thread Lynch Lee (JIRA)
Lynch Lee created FLINK-8552:


 Summary: CliFrontend does not exit normally after a job has been 
submitted with 'bin/flink run some_jar_ball'
 Key: FLINK-8552
 URL: https://issues.apache.org/jira/browse/FLINK-8552
 Project: Flink
  Issue Type: Bug
  Components: Client
Affects Versions: 1.4.0
Reporter: Lynch Lee
 Fix For: 1.4.0


I used the command 'bin/flink run some_jar_ball' to submit my job to a remote 
cluster, but the Java process did not exit normally even though my submission 
completed and the job status changed to RUNNING.

Is this a bug to be fixed?
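
For context: a JVM cannot exit while any non-daemon thread is still alive, so a
lingering non-daemon thread started during submission (or by user code running
on the client) would produce exactly this symptom. A minimal, Flink-independent
sketch of that behavior:

{code:java}
// Standalone illustration: main() returns, yet the process keeps running
// because a non-daemon thread is still alive.
public class LingeringJvm {
    public static void main(String[] args) {
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(60_000); // pretend to do background work
            } catch (InterruptedException ignored) {
            }
        });
        worker.setDaemon(false); // non-daemon: blocks JVM shutdown
        worker.start();
        System.out.println("main() is done, but the JVM lives on");
    }
}
{code}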

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8551) BIGINT in Flink SQL should be enhanced

2018-02-02 Thread Lynch Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lynch Lee updated FLINK-8551:
-
Summary: BIGINT in Flink SQL should be enhanced  (was: Should BIGINT 
in Flink SQL will be enchance to )

> BIGINT in Flink SQL should be enhanced
> 
>
> Key: FLINK-8551
> URL: https://issues.apache.org/jira/browse/FLINK-8551
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
> Fix For: 1.4.0
>
>
> As we all know (see 
> [https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html]), 
> the SQL data type BIGINT in MySQL can be declared UNSIGNED, and it can receive 
> values from columns typed Long or BigInteger if we use a Java/Scala driver to 
> write data into the database.
> ||MySQL Type Name||Return value of {{getColumnTypeName}}||Return value of {{getColumnClassName}}||
> |{{BIGINT[(M)] [UNSIGNED]}}|{{BIGINT [UNSIGNED]}}|{{java.lang.Long}}, if UNSIGNED {{java.math.BigInteger}}|
> But currently, BIGINT in Flink SQL represents only SIGNED BIGINT; that is to 
> say, it can only receive values from columns typed Long. (All types supported 
> by the Table API are defined in the class org.apache.flink.table.api.Types.)
> So, should we make BIGINT in Flink SQL match the BIGINT of SQL engines such as 
> MySQL? Or, as a new feature, should we support a new Flink SQL type named 
> BIGINTEGER that maps to the Java type BigInteger and to MySQL's BIGINT?
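
A quick way to observe the driver mapping the table above describes (a minimal
sketch; it assumes a reachable MySQL instance, a table t_unsigned(id BIGINT
UNSIGNED), and Connector/J on the classpath; the connection details are
placeholders):

{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class UnsignedBigintProbe {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:mysql://localhost:3306/test", "user", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id FROM t_unsigned")) {
            // Prints java.math.BigInteger for BIGINT UNSIGNED,
            // java.lang.Long for a plain (signed) BIGINT column.
            System.out.println(rs.getMetaData().getColumnClassName(1));
            while (rs.next()) {
                Object v = rs.getObject(1); // BigInteger for UNSIGNED values
                System.out.println(v.getClass().getName() + ": " + v);
            }
        }
    }
}
{code}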



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8551) Should BIGINT in Flink SQL will be enchance to

2018-02-02 Thread Lynch Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lynch Lee updated FLINK-8551:
-
Description: 
As we all know (see 
[https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html]), 
the SQL data type BIGINT in MySQL can be declared UNSIGNED, and it can receive 
values from columns typed Long or BigInteger if we use a Java/Scala driver to 
write data into the database.
||MySQL Type Name||Return value of {{getColumnTypeName}}||Return value of {{getColumnClassName}}||
|{{BIGINT[(M)] [UNSIGNED]}}|{{BIGINT [UNSIGNED]}}|{{java.lang.Long}}, if UNSIGNED {{java.math.BigInteger}}|

But currently, BIGINT in Flink SQL represents only SIGNED BIGINT; that is to 
say, it can only receive values from columns typed Long. (All types supported 
by the Table API are defined in the class org.apache.flink.table.api.Types.)

So, should we make BIGINT in Flink SQL match the BIGINT of SQL engines such as 
MySQL? Or, as a new feature, should we support a new Flink SQL type named 
BIGINTEGER that maps to the Java type BigInteger and to MySQL's BIGINT?

  was:
As we all known , see 
[https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html]

SQL data type BIGINT in mysql represents UNSIGNED BIGINT by default , it can 
receive value from some columns typed Long or BigInteger if we use java/scala 
driver to write data into db.
||MySQL Type Name||Return value of {{GetColumnTyneName}}||Return value of 
{{GetColumnClassName}}||
|{{BIGINT[(M)] [UNSIGNED]}}|{{BIGINT [UNSIGNED]}}|{{java.lang.Long}}, if 
UNSIGNED {{java.math.BigInteger}}|

 

But now , in Flink SQL BIGINT just represents SIGNED BIGINT, that to say it can 
only receive value from some columns typed Long .

all supported types of the Table API defined in this class 
org.apache.flink.table.api.Types

 

so if we should let BIGINT in Flink SQL match the BIGINT in some SQL Engine , 
like MySQL.


> Should BIGINT in Flink SQL will be enchance to 
> ---
>
> Key: FLINK-8551
> URL: https://issues.apache.org/jira/browse/FLINK-8551
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
> Fix For: 1.4.0
>
>
> As we all know (see 
> [https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html]), 
> the SQL data type BIGINT in MySQL can be declared UNSIGNED, and it can receive 
> values from columns typed Long or BigInteger if we use a Java/Scala driver to 
> write data into the database.
> ||MySQL Type Name||Return value of {{getColumnTypeName}}||Return value of {{getColumnClassName}}||
> |{{BIGINT[(M)] [UNSIGNED]}}|{{BIGINT [UNSIGNED]}}|{{java.lang.Long}}, if UNSIGNED {{java.math.BigInteger}}|
> But currently, BIGINT in Flink SQL represents only SIGNED BIGINT; that is to 
> say, it can only receive values from columns typed Long. (All types supported 
> by the Table API are defined in the class org.apache.flink.table.api.Types.)
> So, should we make BIGINT in Flink SQL match the BIGINT of SQL engines such as 
> MySQL? Or, as a new feature, should we support a new Flink SQL type named 
> BIGINTEGER that maps to the Java type BigInteger and to MySQL's BIGINT?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8506) fullRestarts gauge not incremented when the JobManager is killed

2018-02-02 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu closed FLINK-8506.
-
Resolution: Not A Problem

> fullRestarts gauge not incremented when the JobManager is killed
> -
>
> Key: FLINK-8506
> URL: https://issues.apache.org/jira/browse/FLINK-8506
> Project: Flink
>  Issue Type: Bug
>Reporter: Steven Zhen Wu
>Priority: Major
>
> [~till.rohrmann] When the JobManager node is killed, it causes a job restart. 
> But in this case, we did not see the _fullRestarts_ gauge get incremented. Is 
> this expected, or a bug?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8551) Should BIGINT in Flink SQL will be enchance to

2018-02-02 Thread Lynch Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lynch Lee updated FLINK-8551:
-
Description: 
As we all know (see 
[https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html]), 
the SQL data type BIGINT in MySQL can be declared UNSIGNED, and it can receive 
values from columns typed Long or BigInteger if we use a Java/Scala driver to 
write data into the database.
||MySQL Type Name||Return value of {{getColumnTypeName}}||Return value of {{getColumnClassName}}||
|{{BIGINT[(M)] [UNSIGNED]}}|{{BIGINT [UNSIGNED]}}|{{java.lang.Long}}, if UNSIGNED {{java.math.BigInteger}}|

But currently, BIGINT in Flink SQL represents only SIGNED BIGINT; that is to 
say, it can only receive values from columns typed Long. (All types supported 
by the Table API are defined in the class org.apache.flink.table.api.Types.)

So, should we make BIGINT in Flink SQL match the BIGINT of SQL engines such as 
MySQL?

  was:
As we all known , see 
[https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html]

SQL data type BIGINT in mysql represents UNSIGNED BIGINT by default , it can 
receive value from some columns typed Long or BigInteger if we use java/scala 
driver to write data into db.
||MySQL Type Name||Return value of {{GetColumnTyneName}}||Return value of 
{{GetColumnClassName}}||
|{{BIGINT[(M)] [UNSIGNED]}}|{{BIGINT [UNSIGNED]}}|{{java.lang.Long}}, if 
UNSIGNED {{java.math.BigInteger}}|

 

By now , in Flink SQL BIGINT just represents SIGNED BIGINT, that to say it can 
only receive value from some columns typed Long .

all supported types of the Table API defined in this class 
org.apache.flink.table.api.Types

 

so if we should let BIGINT in Flink SQL match the BIGINT in some SQL Engine , 
like MySQL.


> Should BIGINT in Flink SQL will be enchance to 
> ---
>
> Key: FLINK-8551
> URL: https://issues.apache.org/jira/browse/FLINK-8551
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
> Fix For: 1.4.0
>
>
> As we all know (see 
> [https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html]), 
> the SQL data type BIGINT in MySQL can be declared UNSIGNED, and it can receive 
> values from columns typed Long or BigInteger if we use a Java/Scala driver to 
> write data into the database.
> ||MySQL Type Name||Return value of {{getColumnTypeName}}||Return value of {{getColumnClassName}}||
> |{{BIGINT[(M)] [UNSIGNED]}}|{{BIGINT [UNSIGNED]}}|{{java.lang.Long}}, if UNSIGNED {{java.math.BigInteger}}|
> But currently, BIGINT in Flink SQL represents only SIGNED BIGINT; that is to 
> say, it can only receive values from columns typed Long. (All types supported 
> by the Table API are defined in the class org.apache.flink.table.api.Types.)
> So, should we make BIGINT in Flink SQL match the BIGINT of SQL engines such as 
> MySQL?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8551) Should BIGINT in Flink SQL will be enchance to

2018-02-02 Thread Lynch Lee (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lynch Lee updated FLINK-8551:
-
Description: 
As we all know (see 
[https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html]), 
the SQL data type BIGINT in MySQL can be declared UNSIGNED, and it can receive 
values from columns typed Long or BigInteger if we use a Java/Scala driver to 
write data into the database.
||MySQL Type Name||Return value of {{getColumnTypeName}}||Return value of {{getColumnClassName}}||
|{{BIGINT[(M)] [UNSIGNED]}}|{{BIGINT [UNSIGNED]}}|{{java.lang.Long}}, if UNSIGNED {{java.math.BigInteger}}|

Currently, BIGINT in Flink SQL represents only SIGNED BIGINT; that is to say, 
it can only receive values from columns typed Long. (All types supported by 
the Table API are defined in the class org.apache.flink.table.api.Types.)

So, should we make BIGINT in Flink SQL match the BIGINT of SQL engines such as 
MySQL?

  was:
As we all known , see 
[https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html]

SQL data type BIGINT in mysql represents UNSIGNED BIGINT by default , it can 
receive value from some columns typed Long or BigInteger if we use java/scala 
driver to write data into db.
|{{BIGINT[(M)] [UNSIGNED]}}|{{BIGINT [UNSIGNED]}}|{{java.lang.Long}}, if 
UNSIGNED {{java.math.BigInteger}}|

 

By now , in Flink SQL BIGINT just represents SIGNED BIGINT, that to say it can 
only receive value from some columns typed Long .

all supported types of the Table API defined in this class 
org.apache.flink.table.api.Types

 

so if we should let BIGINT in Flink SQL match the BIGINT in some SQL Engine , 
like MySQL.


> Should BIGINT in Flink SQL will be enchance to 
> ---
>
> Key: FLINK-8551
> URL: https://issues.apache.org/jira/browse/FLINK-8551
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
> Fix For: 1.4.0
>
>
> As we all know (see 
> [https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html]), 
> the SQL data type BIGINT in MySQL can be declared UNSIGNED, and it can receive 
> values from columns typed Long or BigInteger if we use a Java/Scala driver to 
> write data into the database.
> ||MySQL Type Name||Return value of {{getColumnTypeName}}||Return value of {{getColumnClassName}}||
> |{{BIGINT[(M)] [UNSIGNED]}}|{{BIGINT [UNSIGNED]}}|{{java.lang.Long}}, if UNSIGNED {{java.math.BigInteger}}|
> Currently, BIGINT in Flink SQL represents only SIGNED BIGINT; that is to say, 
> it can only receive values from columns typed Long. (All types supported by 
> the Table API are defined in the class org.apache.flink.table.api.Types.)
> So, should we make BIGINT in Flink SQL match the BIGINT of SQL engines such as 
> MySQL?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8551) Should BIGINT in Flink SQL will be enchance to

2018-02-02 Thread Lynch Lee (JIRA)
Lynch Lee created FLINK-8551:


 Summary: Should BIGINT in Flink SQL will be enchance to 
 Key: FLINK-8551
 URL: https://issues.apache.org/jira/browse/FLINK-8551
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.4.0
Reporter: Lynch Lee
 Fix For: 1.4.0


As we all know (see 
[https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-type-conversions.html]), 
the SQL data type BIGINT in MySQL can be declared UNSIGNED, and it can receive 
values from columns typed Long or BigInteger if we use a Java/Scala driver to 
write data into the database.
|{{BIGINT[(M)] [UNSIGNED]}}|{{BIGINT [UNSIGNED]}}|{{java.lang.Long}}, if UNSIGNED {{java.math.BigInteger}}|

Currently, BIGINT in Flink SQL represents only SIGNED BIGINT; that is to say, 
it can only receive values from columns typed Long. (All types supported by 
the Table API are defined in the class org.apache.flink.table.api.Types.)

So, should we make BIGINT in Flink SQL match the BIGINT of SQL engines such as 
MySQL?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7525) Add config option to disable Cancel functionality on UI

2018-02-02 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351287#comment-16351287
 ] 

Chesnay Schepler commented on FLINK-7525:
-

Nah, various FLIP-6 issues aren't assigned as subtasks of FLINK-4319. Look for 
tickets with the "flip-6" label to see what is happening.

> Add config option to disable Cancel functionality on UI
> ---
>
> Key: FLINK-7525
> URL: https://issues.apache.org/jira/browse/FLINK-7525
> Project: Flink
>  Issue Type: Improvement
>  Components: Web Client, Webfrontend
>Reporter: Ted Yu
>Priority: Major
>
> In this email thread 
> http://search-hadoop.com/m/Flink/VkLeQlf0QOnc7YA?subj=Security+Control+of+running+Flink+Jobs+on+Flink+UI
>  , Raja was asking for a way to control how users cancel Job(s).
> Robert proposed adding a config option which disables the Cancel 
> functionality.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5407: [hotfix][build] Fix duplicate maven enforcer plugin...

2018-02-02 Thread yew1eb
GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/5407

[hotfix][build] Fix duplicate maven enforcer plugin declaration

## What is the purpose of the change
Fix the duplicate `maven-enforcer-plugin` declaration.

## Brief change log

 - *Merge the duplicate `maven-enforcer-plugin` declarations.*

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink 
duplicate_enforce_plugin_declaration

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5407.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5407


commit bbed22f71ff6c5792d6eeb182a6a488c36a10b94
Author: zhouhai02 
Date:   2018-02-03T06:52:51Z

[hotfix][build] Fix duplicate maven enforcer plugin declaration




---


[GitHub] flink pull request #5406: [hotfix] Fix typos in comments.

2018-02-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5406#discussion_r165807233
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ---
@@ -90,11 +90,11 @@
	CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId);

	/**
-	 * Offers a slot to the {@link SlotPool}. The slot offer can be accepted or
+	 * Offers a slot to the {@link SlotPool}. The slot offers can be accepted or
	 * rejected.
	 *
-	 * @param taskManagerLocation from which the slot offer originates
-	 * @param taskManagerGateway to talk to the slot offerer
+	 * @param taskManagerLocation from which the slot offers originate
+	 * @param taskManagerGateway to talk to the slot offers
--- End diff --

the previous version was correct


---


[GitHub] flink pull request #5406: [hotfix] Fix typos in comments.

2018-02-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5406#discussion_r165807228
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ---
@@ -109,7 +109,7 @@
 * slot offers.
 *
 * @param taskManagerLocation from which the slot offers originate
-* @param taskManagerGateway to talk to the slot offerer
+* @param taskManagerGateway to talk to the slot offers
--- End diff --

the previous version was correct


---


[GitHub] flink pull request #5406: [hotfix] Fix typos in comments.

2018-02-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5406#discussion_r165807220
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolGateway.java
 ---
@@ -90,11 +90,11 @@
	CompletableFuture<Acknowledge> releaseTaskManager(final ResourceID resourceId);

	/**
-	 * Offers a slot to the {@link SlotPool}. The slot offer can be accepted or
+	 * Offers a slot to the {@link SlotPool}. The slot offers can be accepted or
	 * rejected.
	 *
-	 * @param taskManagerLocation from which the slot offer originates
-	 * @param taskManagerGateway to talk to the slot offerer
+	 * @param taskManagerLocation from which the slot offers originate
--- End diff --

given that this method offers a single slot, using the singular makes more sense


---


[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351279#comment-16351279
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5155


> Report Watermark metrics in all operators
> -
>
> Key: FLINK-4812
> URL: https://issues.apache.org/jira/browse/FLINK-4812
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> As reported by a user, Flink currently does not export the current low 
> watermark for sources 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).
> This JIRA is for adding such a metric for the sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5155: [FLINK-4812][metrics] Expose currentLowWatermark f...

2018-02-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5155


---


[jira] [Closed] (FLINK-4812) Report Watermark metrics in all operators

2018-02-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-4812.
---
Resolution: Fixed

master: 92ad53e6a7e4c9df7055aac89b49ae95d50c5b4c

> Report Watermark metrics in all operators
> -
>
> Key: FLINK-4812
> URL: https://issues.apache.org/jira/browse/FLINK-4812
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> As reported by a user, Flink currently does not export the current low 
> watermark for sources 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).
> This JIRA is for adding such a metric for the sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5406: [hotfix][docs] Fix typos

2018-02-02 Thread yew1eb
GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/5406

[hotfix][docs] Fix typos

Hotfix some typos in comments.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink fix_typos

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5406.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5406


commit 05b708e7a417b1337ec91c19b0f66a951635dd1d
Author: zhouhai02 
Date:   2018-02-03T06:08:29Z

[hotfix][docs] Fix typos




---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351270#comment-16351270
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai What is the proposed migration test going to assert? The assigner 
does not influence how state is saved and restored. Even when the assigner 
returns an invalid index, the modulus will ensure that the shard gets assigned.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round-robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case, and 
> the distribution can become skewed.
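
For reference, a minimal sketch of the hash-modulo assignment described above
(simplified and illustrative, not the connector's actual code):

{code:java}
public class ShardAssignmentSketch {
    // Map a shard id to a subtask index by hashing modulo the parallelism.
    static int assignShard(String shardId, int numSubtasks) {
        // Mask the sign bit so the modulus is never negative.
        return (shardId.hashCode() & 0x7FFFFFFF) % numSubtasks;
    }

    public static void main(String[] args) {
        // Sequential ids spread evenly; ids produced by a reshard may not.
        for (String id : new String[] {"shardId-000", "shardId-001", "shardId-002"}) {
            System.out.println(id + " -> subtask " + assignShard(id, 4));
        }
    }
}
{code}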



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-02 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai What is the proposed migration test going to assert? The assigner 
does not influence how state is saved and restored. Even when the assigner 
returns an invalid index, the modulus will ensure that the shard gets assigned.


---


[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-02 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350548#comment-16350548
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/3/18 2:33 AM:


Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, an upsert stream source should have state so that it can generate retract 
messages. As for the problem of deletes on non-inserted keys, if I understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was thinking of the scenario where 
ordered delete messages are ingested. For example, users use a Flink job to 
ingest delete messages from a source database and output them to a target 
database.


was (Author: hequn8128):
Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, upsert stream source should have state so that to generate retract 
messages. As for the problem of deletes on non-inserted keys, if i understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was thinking the scenario that only 
delete messages are ingested. For example, users use flink job to ingest delete 
messages from a source database and output to a target database.

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager to ingest data in upsert mode in Flink 
> SQL / the Table API, it is valuable to enable a table source with upsert mode. I will 
> provide a design doc later, and we can have more discussions. Any suggestions 
> are warmly welcome!
>  
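
For reference, a minimal sketch of the message shape this discussion assumes,
mirroring the existing {{UpsertStreamTableSink}} convention where the Boolean
flag distinguishes upsert from delete (the key and values are illustrative):

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;

public class UpsertMessageShape {
    public static void main(String[] args) {
        // true  -> insert or update the row under its key
        Tuple2<Boolean, Row> upsert = Tuple2.of(true, Row.of("key-1", 42));
        // false -> delete the row under its key
        Tuple2<Boolean, Row> delete = Tuple2.of(false, Row.of("key-1", null));
        System.out.println(upsert + " / " + delete);
    }
}
{code}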



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8003) Support named ROW in Flink SQL

2018-02-02 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen updated FLINK-8003:
--
Summary: Support named ROW in Flink SQL  (was: Support Calcite's ROW value 
constructor in Flink SQL)

> Support named ROW in Flink SQL
> --
>
> Key: FLINK-8003
> URL: https://issues.apache.org/jira/browse/FLINK-8003
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> We want to use the row value constructor and CAST to create a named row, or 
> even a nested named row, e.g.
> {code:sql}
> CREATE TYPE myrowtype AS (f1 INTEGER, f2 VARCHAR(10))
> SELECT CAST(ROW(intField, "test") AS myrowtype) AS myRow FROM myTable;
> {code}
> So if converted to JSON, the output will be 
> {code:java}
> {"myRow":{"f1": ${intField}, "f2":"test"}}
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351189#comment-16351189
 ] 

ASF GitHub Bot commented on FLINK-7923:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/5367
  
@hequn8128 @twalthr could you please take another look? I've refactored the 
code to reuse the generateFieldAccess() code and also added a null test. Nested 
tuples won't work right now due to https://issues.apache.org/jira/browse/CALCITE-2162; 
I've submitted a PR to fix it in Calcite.


> SQL parser exception when accessing subfields of a Composite element in an 
> Object Array type column
> ---
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> An access expression such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause a problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...

2018-02-02 Thread suez1224
Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/5367
  
@hequn8128 @twalthr could you please take another look? I've refactored the 
code to reuse the generateFieldAccess() code and also added a null test. Nested 
tuples won't work right now due to https://issues.apache.org/jira/browse/CALCITE-2162; 
I've submitted a PR to fix it in Calcite.


---


[jira] [Commented] (FLINK-8477) Add API to let users skip the first incomplete window data

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351172#comment-16351172
 ] 

ASF GitHub Bot commented on FLINK-8477:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/5405
  
cc @aljoscha please help review this patch.

![image](https://user-images.githubusercontent.com/9486140/35761522-6e00f4b8-08c4-11e8-8063-7ec015802428.png)
See the picture above: when a user chooses to run without checkpoints (to 
avoid catching up on old data after a crash) and uses kafka#setStartFromLatest 
to consume only the latest data, then without the skip API the job can produce 
broken window data, which may trigger false alerts in a monitoring scenario. 
If users want to skip the broken window, this gives them the choice to skip 
several windows after the first fire.



> Add API to let users skip the first incomplete window data
> 
>
> Key: FLINK-8477
> URL: https://issues.apache.org/jira/browse/FLINK-8477
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
> Fix For: 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5405: [FLINK-8477][Window]Add api to support user to skip serva...

2018-02-02 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/5405
  
cc @aljoscha please help review this patch.

![image](https://user-images.githubusercontent.com/9486140/35761522-6e00f4b8-08c4-11e8-8063-7ec015802428.png)
See the picture above: when a user chooses to run without checkpoints (to 
avoid catching up on old data after a crash) and uses kafka#setStartFromLatest 
to consume only the latest data, then without the skip API the job can produce 
broken window data, which may trigger false alerts in a monitoring scenario. 
If users want to skip the broken window, this gives them the choice to skip 
several windows after the first fire.



---


[jira] [Commented] (FLINK-8477) Add API to let users skip the first incomplete window data

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351163#comment-16351163
 ] 

ASF GitHub Bot commented on FLINK-8477:
---

GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/5405

[FLINK-8477][Window] Add API to let users skip several broken windows

In production, some applications, such as monitoring, need accurate data. But 
consider this scenario: if we start a job at 10:45:20 with a 1-minute window 
aggregate, we may produce broken data for the 10:45 window, which can be 
misleading. We can provide a user API that lets users choose to skip several 
windows and so avoid the broken data; the sketch below illustrates the scenario.
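
A small worked illustration of the arithmetic behind the broken first window
(the timestamp is illustrative; the window-start formula is the standard
tumbling-window alignment):

{code:java}
public class BrokenFirstWindow {
    public static void main(String[] args) {
        long windowSize = 60_000L;              // 1-minute tumbling window
        long firstEventTs = 1_517_553_920_000L; // job starts at hh:45:20
        // Standard tumbling-window start: align down to the window size.
        long windowStart = firstEventTs - (firstEventTs % windowSize);
        // windowStart corresponds to hh:45:00, but the job only saw events
        // from hh:45:20 on, so the first window fires with ~40 seconds of
        // data missing -- the "broken" window described above.
        System.out.println("first window starts at " + windowStart);
    }
}
{code}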

## Brief change log

  - add a streaming api 




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-8477

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5405.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5405


commit 9c6b77077bac2e0dfa4ea3bddf11bd27831ba3e4
Author: minwenjun 
Date:   2018-02-02T15:46:11Z

Add API to let users skip several broken windows




> Add API to let users skip the first incomplete window data
> 
>
> Key: FLINK-8477
> URL: https://issues.apache.org/jira/browse/FLINK-8477
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
> Fix For: 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5405: [FLINK-8477][Window]Add api to support user to ski...

2018-02-02 Thread Aitozi
GitHub user Aitozi opened a pull request:

https://github.com/apache/flink/pull/5405

[FLINK-8477][Window] Add API to let users skip several broken windows

In production, some applications, such as monitoring, need accurate data. But 
consider this scenario: if we start a job at 10:45:20 with a 1-minute window 
aggregate, we may produce broken data for the 10:45 window, which can be 
misleading. We can provide a user API that lets users choose to skip several 
windows and so avoid the broken data.

## Brief change log

  - add a streaming api 




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Aitozi/flink FLINK-8477

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5405.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5405


commit 9c6b77077bac2e0dfa4ea3bddf11bd27831ba3e4
Author: minwenjun 
Date:   2018-02-02T15:46:11Z

Add API to let users skip several broken windows




---


[jira] [Commented] (FLINK-7525) Add config option to disable Cancel functionality on UI

2018-02-02 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16351081#comment-16351081
 ] 

Ted Yu commented on FLINK-7525:
---

Looks like activity on FLINK-4319 has stalled.

> Add config option to disable Cancel functionality on UI
> ---
>
> Key: FLINK-7525
> URL: https://issues.apache.org/jira/browse/FLINK-7525
> Project: Flink
>  Issue Type: Improvement
>  Components: Web Client, Webfrontend
>Reporter: Ted Yu
>Priority: Major
>
> In this email thread 
> http://search-hadoop.com/m/Flink/VkLeQlf0QOnc7YA?subj=Security+Control+of+running+Flink+Jobs+on+Flink+UI
>  , Raja was asking for a way to control how users cancel Job(s).
> Robert proposed adding a config option which disables the Cancel 
> functionality.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7588) Document RocksDB tuning for spinning disks

2018-02-02 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-7588:
--
Labels: performance  (was: )

> Document RocksDB tuning for spinning disks
> --
>
> Key: FLINK-7588
> URL: https://issues.apache.org/jira/browse/FLINK-7588
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Ted Yu
>Priority: Major
>  Labels: performance
>
> In docs/ops/state/large_state_tuning.md , it was mentioned that:
> bq. the default configuration is tailored towards SSDs and performs 
> suboptimal on spinning disks
> We should add recommendations targeting spinning disks:
> https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide#difference-of-spinning-disk
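
For reference, Flink already ships a predefined option set for this case,
which such documentation could point to (a minimal sketch; the checkpoint URI
is a placeholder and flink-statebackend-rocksdb must be on the classpath):

{code:java}
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SpinningDiskSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        RocksDBStateBackend backend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints");
        // Predefined tuning profile for spinning disks.
        backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        env.setStateBackend(backend);
    }
}
{code}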



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350872#comment-16350872
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
erf, I see what you mean, as well as the creation of all those Time objects.


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user-defined function. This eliminates the need to create 
> unwieldy workarounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)
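
A minimal sketch of the shape such a per-element gap could take (the interface
and names here are illustrative; PR #5295 under review defines the actual API):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class DynamicGapSketch {
    /** Returns the session gap in milliseconds for a given element. */
    interface GapExtractor<T> {
        long extract(T element);
    }

    static class IotEvent {
        final String deviceType;
        IotEvent(String deviceType) { this.deviceType = deviceType; }
    }

    public static void main(String[] args) {
        // Per-device-type gaps for the IoT scenario described above.
        Map<String, Long> gapByType = new HashMap<>();
        gapByType.put("thermostat", 60_000L);
        gapByType.put("doorbell", 5_000L);
        GapExtractor<IotEvent> extractor =
                e -> gapByType.getOrDefault(e.deviceType, 30_000L);
        System.out.println(extractor.extract(new IotEvent("doorbell"))); // 5000
    }
}
{code}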



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5295: [FLINK-8384] [streaming] Session Window Assigner with Dyn...

2018-02-02 Thread dyanarose
Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
erf, I see what you mean, as well as the creation of all those Time objects.


---


[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-02 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350548#comment-16350548
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/2/18 4:01 PM:


Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, an upsert stream source should have state so that it can generate retract 
messages. As for the problem of deletes on non-inserted keys, if I understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was thinking of the scenario where only 
delete messages are ingested. For example, users use a Flink job to sync delete 
messages from a source database to a target database.


was (Author: hequn8128):
Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
\{{UpsertStreamTableSink}}.

Yes, upsert stream source should have state so that to generate retract 
messages. As for the problem of deletes on non-inserted keys, if i understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was think the scenario that only delete 
messages are ingested. 

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager to ingest data in upsert mode in Flink 
> SQL / the Table API, it is valuable to enable a table source with upsert mode. I will 
> provide a design doc later, and we can have more discussions. Any suggestions 
> are warmly welcome!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-02 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350548#comment-16350548
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/2/18 4:09 PM:


Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, an upsert stream source should have state so that it can generate retract 
messages. As for the problem of deletes on non-inserted keys, if I understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was thinking of the scenario where only 
delete messages are ingested. For example, users use a Flink job to ingest 
delete messages from a source database and output them to a target database.


was (Author: hequn8128):
Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, upsert stream source should have state so that to generate retract 
messages. As for the problem of deletes on non-inserted keys, if i understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was think the scenario that only delete 
messages are ingested. For example, users use flink job to ingest delete 
messages from a source database and output to a target database.

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager to ingest data in upsert mode in Flink 
> SQL / the Table API, it is valuable to enable a table source with upsert mode. I will 
> provide a design doc later, and we can have more discussions. Any suggestions 
> are warmly welcome!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-02 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350548#comment-16350548
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/2/18 4:08 PM:


Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, an upsert stream source should have state so that it can generate retract 
messages. As for the problem of deletes on non-inserted keys, if I understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was thinking of the scenario where only 
delete messages are ingested. For example, users use a Flink job to ingest 
delete messages from a source database and output them to a target database.


was (Author: hequn8128):
Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, upsert stream source should have state so that to generate retract 
messages. As for the problem of deletes on non-inserted keys, if i understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was think the scenario that only delete 
messages are ingested. For example, users use flink job to sync delete messages 
from a source database to a target database.

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager to ingest data in upsert mode in Flink 
> SQL / the Table API, it is valuable to enable a table source with upsert mode. I will 
> provide a design doc later, and we can have more discussions. Any suggestions 
> are warmly welcome!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8545) Implement upsert stream table source

2018-02-02 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350548#comment-16350548
 ] 

Hequn Cheng commented on FLINK-8545:


Hi, the {{Tuple}} input format is a good choice. It is consistent with the 
{{UpsertStreamTableSink}}.

Yes, an upsert stream source should have state so that it can generate retract 
messages. As for the problem of deletes on non-inserted keys, if I understand 
correctly, do you mean caching the unordered delete message until the 
corresponding insert message comes? I was thinking of the scenario where only 
delete messages are ingested.

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager to ingest data in upsert mode in Flink 
> SQL / the Table API, it is valuable to enable a table source with upsert mode. I will 
> provide a design doc later, and we can have more discussions. Any suggestions 
> are warmly welcome!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-4812) Report Watermark metrics in all operators

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350488#comment-16350488
 ] 

ASF GitHub Bot commented on FLINK-4812:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5155
  
merging.


> Report Watermark metrics in all operators
> -
>
> Key: FLINK-4812
> URL: https://issues.apache.org/jira/browse/FLINK-4812
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> As reported by a user, Flink currently does not export the current low 
> watermark for sources 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/currentLowWatermark-metric-not-reported-for-all-tasks-td13770.html).
> This JIRA is for adding such a metric for the sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5155: [FLINK-4812][metrics] Expose currentLowWatermark for all ...

2018-02-02 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5155
  
merging.


---


[jira] [Commented] (FLINK-6109) Add "consumer lag" report metric to FlinkKafkaConsumer

2018-02-02 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350468#comment-16350468
 ] 

aitozi commented on FLINK-6109:
---

[~tzulitai] you mentioned that calculating the lag is too heavy, but since 
Kafka 0.10.2 we can fetch the lag from Kafka directly and don't need to 
calculate it ourselves, as I mentioned in https://github.com/apache/flink/pull/4935
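
A minimal sketch of reading that client-side lag metric (assumes a
kafka-clients consumer that is already subscribed and polling; 0.10.2+ also
exposes per-partition lag metrics, per KIP-92):

{code:java}
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class LagMetricsSketch {
    // Print the consumer's own max-lag metric; no manual offset math needed.
    static void printMaxLag(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> e
                : consumer.metrics().entrySet()) {
            if ("records-lag-max".equals(e.getKey().name())) {
                System.out.println("records-lag-max = " + e.getValue().value());
            }
        }
    }
}
{code}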

> Add "consumer lag" report metric to FlinkKafkaConsumer
> --
>
> Key: FLINK-6109
> URL: https://issues.apache.org/jira/browse/FLINK-6109
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: aitozi
>Priority: Major
>
> This is a feature discussed in this ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html.
> As discussed, we can expose two kinds of "consumer lag" metrics for this:
>  - *current consumer lag per partition:* the current difference between the 
> latest offset and the last collected record. This metric is calculated and 
> updated at a configurable interval. This metric basically serves as an 
> indicator of how the consumer is keeping up with the head of partitions. I 
> propose to name this {{currentOffsetLag}}.
>  - *Consumer lag of last checkpoint per partition:* the difference between 
> the latest offset and the offset stored in the checkpoint. This metric is 
> only updated when checkpoints are completed. It serves as an indicator of how 
> much data may need to be replayed in case of a failure. I propose to name 
> this {{lastCheckpointedOffsetLag}}.
> I don't think it is reasonable to define a metric of whether or not a 
> consumer has "caught up" with the HEAD. That would imply a threshold for the 
> offset difference. We should probably leave this "caught up" logic for the 
> user to determine themselves when they query this metric.
> The granularity of the metric is per-FlinkKafkaConsumer, and independent of 
> the consumer group.id used (the offset used to calculate consumer lag is the 
> internal offset state of the FlinkKafkaConsumer, not the consumer group's 
> committed offsets in Kafka).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8489) Data is not emitted by second ElasticSearch connector

2018-02-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8489.
---
   Resolution: Fixed
Fix Version/s: 1.5.0, 1.4.1

1.4: dfa050c01adc559a3ed4df4c2c3273903a37ff79

The user confirmed that using separate configs resolved the issue; a minimal sketch of that fix follows.
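
A sketch of the confirmed workaround (illustrative values; the point is that
each sink gets its own config map rather than sharing one mutable instance):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class SeparateSinkConfigs {
    public static void main(String[] args) {
        Map<String, String> config1 = new HashMap<>();
        config1.put("bulk.flush.max.actions", "1");
        config1.put("cluster.name", "my-cluster");

        // Independent copy for the second ElasticsearchSink, instead of
        // passing the same map instance to both sinks.
        Map<String, String> config2 = new HashMap<>(config1);

        System.out.println("distinct instances: " + (config1 != config2));
    }
}
{code}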

> Data is not emitted by second ElasticSearch connector
> -
>
> Key: FLINK-8489
> URL: https://issues.apache.org/jira/browse/FLINK-8489
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> A user reported [this 
> issue|https://lists.apache.org/thread.html/e91c71beb45d6df879effa16c52f2c71aa6ef1a54ef0a8ac4ccc72ee@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list.
> *Setup:*
>  * A program with two pipelines that write to ElasticSearch. The pipelines 
> can be connected or completely separate.
>  * ElasticSearch 5.6.4, connector {{flink-connector-elasticsearch5_2.11}}
> *Problem:*
>  Only one of the ES connectors correctly emits data. The other connector 
> writes a single record and then stops emitting data (or does not write any 
> data at all). The problem does not exist if the second ES connector is 
> replaced by a different connector (for example Cassandra).
> Below is a program to reproduce the issue:
> {code:java}
> public class ElasticSearchTest1 {
> 	public static void main(String[] args) throws Exception {
> 
> 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 		// set elasticsearch connection details
> 		Map<String, String> config = new HashMap<>();
> 		config.put("bulk.flush.max.actions", "1");
> 		config.put("cluster.name", "<cluster name>");
> 		List<InetSocketAddress> transports = new ArrayList<>();
> 		transports.add(new InetSocketAddress(InetAddress.getByName("<es ip>"), 9300));
> 
> 		// Set properties for Kafka Streaming
> 		Properties properties = new Properties();
> 		properties.setProperty("bootstrap.servers", "<kafka ip>" + ":9092");
> 		properties.setProperty("group.id", "testGroup");
> 		properties.setProperty("auto.offset.reset", "latest");
> 
> 		// Create consumer for log records
> 		FlinkKafkaConsumer011<ObjectNode> inputConsumer1 =
> 			new FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), properties);
> 
> 		DataStream<RecordOne> firstStream = env
> 			.addSource(inputConsumer1)
> 			.flatMap(new CreateRecordOne());
> 
> 		firstStream
> 			.addSink(new ElasticsearchSink<>(config,
> 				transports,
> 				new ElasticSearchOutputRecord("elastic_test_index1", "elastic_test_index1")));
> 
> 		FlinkKafkaConsumer011<ObjectNode> inputConsumer2 =
> 			new FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), properties);
> 
> 		DataStream<RecordTwo> secondStream = env
> 			.addSource(inputConsumer2)
> 			.flatMap(new CreateRecordTwo());
> 
> 		secondStream
> 			.addSink(new ElasticsearchSink<>(config,
> 				transports,
> 				new ElasticSearchOutputRecord2("elastic_test_index2", "elastic_test_index2")));
> 
> 		env.execute("Elastic Search Test");
> 	}
> }
> 
> public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction<RecordOne> {
> 
> 	String index;
> 	String type;
> 
> 	// Initialize filter function
> 	public ElasticSearchOutputRecord(String index, String type) {
> 		this.index = index;
> 		this.type = type;
> 	}
> 
> 	// construct index request
> 	@Override
> 	public void process(
> 			RecordOne record,
> 			RuntimeContext ctx,
> 			RequestIndexer indexer) {
> 
> 		// construct JSON document to index
> 		Map<String, Object> json = new HashMap<>();
> 
> 		json.put("item_one", record.item1);
> 		json.put("item_two", record.item2);
> 
> 		IndexRequest rqst = Requests.indexRequest()

[jira] [Commented] (FLINK-7856) Port JobVertexBackPressureHandler to REST endpoint

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350465#comment-16350465
 ] 

ASF GitHub Bot commented on FLINK-7856:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165666773
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 ---
@@ -75,6 +76,17 @@ public void 
setDisconnectJobManagerConsumer(Consumer> d
return CompletableFuture.completedFuture(Acknowledge.get());
}
 
+   @Override
+	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
--- End diff --

or maybe not


> Port JobVertexBackPressureHandler to REST endpoint
> --
>
> Key: FLINK-7856
> URL: https://issues.apache.org/jira/browse/FLINK-7856
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Gary Yao
>Priority: Major
>
> Port JobVertexBackPressureHandler to REST endpoint



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5397: [FLINK-7856][flip6] WIP JobVertexBackPressureHandl...

2018-02-02 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5397#discussion_r165666773
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 ---
@@ -75,6 +76,17 @@ public void 
setDisconnectJobManagerConsumer(Consumer> d
return CompletableFuture.completedFuture(Acknowledge.get());
}
 
+   @Override
+	public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
--- End diff --

or maybe not


---


[jira] [Updated] (FLINK-8489) Data is not emitted by second ElasticSearch connector

2018-02-02 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8489?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8489:

Affects Version/s: 1.5.0

> Data is not emitted by second ElasticSearch connector
> -
>
> Key: FLINK-8489
> URL: https://issues.apache.org/jira/browse/FLINK-8489
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Chesnay Schepler
>Priority: Critical
>
> A user reported [this 
> issue|https://lists.apache.org/thread.html/e91c71beb45d6df879effa16c52f2c71aa6ef1a54ef0a8ac4ccc72ee@%3Cuser.flink.apache.org%3E]
>  on the user@f.a.o mailing list.
> *Setup:*
>  * A program with two pipelines that write to ElasticSearch. The pipelines 
> can be connected or completely separate.
>  * ElasticSearch 5.6.4, connector {{flink-connector-elasticsearch5_2.11}}
> *Problem:*
>  Only one of the ES connectors correctly emits data. The other connector 
> writes a single record and then stops emitting data (or does not write any 
> data at all). The problem does not exist if the second ES connector is 
> replaced by a different connector (for example Cassandra).
> Below is a program to reproduce the issue:
> {code:java}
> public class ElasticSearchTest1 {
> 	public static void main(String[] args) throws Exception {
> 
> 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 		// set elasticsearch connection details
> 		Map<String, String> config = new HashMap<>();
> 		config.put("bulk.flush.max.actions", "1");
> 		config.put("cluster.name", "<cluster name>");
> 		List<InetSocketAddress> transports = new ArrayList<>();
> 		transports.add(new InetSocketAddress(InetAddress.getByName("<es ip>"), 9300));
> 
> 		// Set properties for Kafka Streaming
> 		Properties properties = new Properties();
> 		properties.setProperty("bootstrap.servers", "<kafka ip>" + ":9092");
> 		properties.setProperty("group.id", "testGroup");
> 		properties.setProperty("auto.offset.reset", "latest");
> 
> 		// Create consumer for log records
> 		FlinkKafkaConsumer011<ObjectNode> inputConsumer1 =
> 			new FlinkKafkaConsumer011<>("elastic_test1", new JSONDeserializationSchema(), properties);
> 
> 		DataStream<RecordOne> firstStream = env
> 			.addSource(inputConsumer1)
> 			.flatMap(new CreateRecordOne());
> 
> 		firstStream
> 			.addSink(new ElasticsearchSink<>(config,
> 				transports,
> 				new ElasticSearchOutputRecord("elastic_test_index1", "elastic_test_index1")));
> 
> 		FlinkKafkaConsumer011<ObjectNode> inputConsumer2 =
> 			new FlinkKafkaConsumer011<>("elastic_test2", new JSONDeserializationSchema(), properties);
> 
> 		DataStream<RecordTwo> secondStream = env
> 			.addSource(inputConsumer2)
> 			.flatMap(new CreateRecordTwo());
> 
> 		secondStream
> 			.addSink(new ElasticsearchSink<>(config,
> 				transports,
> 				new ElasticSearchOutputRecord2("elastic_test_index2", "elastic_test_index2")));
> 
> 		env.execute("Elastic Search Test");
> 	}
> }
> 
> public class ElasticSearchOutputRecord implements ElasticsearchSinkFunction<RecordOne> {
> 
> 	String index;
> 	String type;
> 
> 	// Initialize filter function
> 	public ElasticSearchOutputRecord(String index, String type) {
> 		this.index = index;
> 		this.type = type;
> 	}
> 
> 	// construct index request
> 	@Override
> 	public void process(
> 			RecordOne record,
> 			RuntimeContext ctx,
> 			RequestIndexer indexer) {
> 
> 		// construct JSON document to index
> 		Map<String, Object> json = new HashMap<>();
> 
> 		json.put("item_one", record.item1);
> 		json.put("item_two", record.item2);
> 
> 		IndexRequest rqst = Requests.indexRequest()
> 			.index(index)   // index name
> 			.type(type)     // mapping name
> 			.source(json);
> 		indexer.add(rqst);

[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350431#comment-16350431
 ] 

ASF GitHub Bot commented on FLINK-8275:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5172#discussion_r165635517
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -170,9 +160,10 @@ protected int run(String[] args) {
 
final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
 
-   // set keytab principal and replace path with the local 
path of the shipped keytab file in NodeManager
-   if (keytabPath != null && remoteKeytabPrincipal != 
null) {
-   
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+   File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+   if (remoteKeytabPrincipal != null && f.exists()) {
--- End diff --

Can we re-add the debug log `LOG.debug("keytabPath: {}", keytabPath);` to 
provide more visibility to this?


> Flink YARN deployment with Kerberos enabled not working 
> 
>
> Key: FLINK-8275
> URL: https://issues.apache.org/jira/browse/FLINK-8275
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The local keytab path in YarnTaskManagerRunner is incorrectly set to the 
> ApplicationMaster's local keytab path. This causes jobs to fail because the 
> TaskManager can't read the keytab.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5172: [FLINK-8275] [Security] fix keytab local path in Y...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5172#discussion_r165636080
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---
@@ -142,19 +153,10 @@ public static void runYarnTaskManager(String[] args, 
final Class() {
-   @Override
-   public Integer call() {
-   try {
-   
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, 
taskManager);
-   }
-   catch (Throwable t) {
-   LOG.error("Error while starting 
the TaskManager", t);
-   
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-   }
-   return null;
-   }
-   });
+   if (mainRunner == null) {
--- End diff --

Not sure about this.
This basically adds a code path to production code that exists only for testing 
purposes (i.e., it is only ever non-null in the `YarnTaskManagerRunnerTest`).

I think it would be better to have a `protected createMainRunner(...)` 
method that can be overridden to inject the mock runner dependency for testing.


---


[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350438#comment-16350438
 ] 

ASF GitHub Bot commented on FLINK-8275:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5172
  
@suez1224 Thanks a lot for the contribution!
I've had a look and the changes LGTM. I did have a comment regarding 
injecting a dependency for the runner, which I've added a commit for.

Can you take a look at 2ffa659 before I actually merge this, and let me 
know what you think? Thanks!
Will make sure this gets in for Flink 1.4.1 ..


> Flink YARN deployment with Kerberos enabled not working 
> 
>
> Key: FLINK-8275
> URL: https://issues.apache.org/jira/browse/FLINK-8275
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The local keytab path in YarnTaskManagerRunner is incorrectly set to the 
> ApplicationMaster's local keytab path. This causes jobs to fail because the 
> TaskManager can't read the keytab.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5172: [FLINK-8275] [Security] fix keytab local path in YarnTask...

2018-02-02 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5172
  
@suez1224 Thanks a lot for the contribution!
I've had a look and the changes LGTM. I did have a comment regarding 
injecting a dependency for the runner, which I've added a commit for.

Can you take a look at 2ffa659 before I actually merge this, and let me 
know what you think? Thanks!
Will make sure this gets in for Flink 1.4.1 ..


---


[jira] [Commented] (FLINK-6109) Add "consumer lag" report metric to FlinkKafkaConsumer

2018-02-02 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350437#comment-16350437
 ] 

aitozi commented on FLINK-6109:
---

OK, I will take a look at the PR later.

> Add "consumer lag" report metric to FlinkKafkaConsumer
> --
>
> Key: FLINK-6109
> URL: https://issues.apache.org/jira/browse/FLINK-6109
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector, Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: aitozi
>Priority: Major
>
> This is a feature discussed in this ML: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Telling-if-a-job-has-caught-up-with-Kafka-td12261.html.
> As discussed, we can expose two kinds of "consumer lag" metrics for this:
>  - *current consumer lag per partition:* the current difference between the 
> latest offset and the last collected record. This metric is calculated and 
> updated at a configurable interval. This metric basically serves as an 
> indicator of how the consumer is keeping up with the head of partitions. I 
> propose to name this {{currentOffsetLag}}.
>  - *Consumer lag of last checkpoint per partition:* the difference between 
> the latest offset and the offset stored in the checkpoint. This metric is 
> only updated when checkpoints are completed. It serves as an indicator of how 
> much data may need to be replayed in case of a failure. I propose to name 
> this {{lastCheckpointedOffsetLag}}.
> I don't think it is reasonable to define a metric of whether or not a 
> consumer has "caught up" with the HEAD. That would imply a threshold for the 
> offset difference. We should probably leave this "caught up" logic for the 
> user to determine themselves when they query this metric.
> The granularity of the metric is per-FlinkKafkaConsumer, and independent of 
> the consumer group.id used (the offset used to calculate consumer lag is the 
> internal offset state of the FlinkKafkaConsumer, not the consumer group's 
> committed offsets in Kafka).
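
For illustration, a minimal sketch of the per-partition bookkeeping behind the two proposed metrics (class and field names are hypothetical, not the actual connector code):

{code:java}
public class PartitionLagSketch {

    long latestOffset;           // head of the partition, refreshed at a configurable interval
    long lastCollectedOffset;    // offset of the last record emitted by the consumer
    long lastCheckpointedOffset; // offset stored in the last completed checkpoint

    // How far the consumer trails the head of the partition (currentOffsetLag).
    long currentOffsetLag() {
        return latestOffset - lastCollectedOffset;
    }

    // How much data might be replayed after a failure (lastCheckpointedOffsetLag).
    long lastCheckpointedOffsetLag() {
        return latestOffset - lastCheckpointedOffset;
    }
}
{code}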



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350430#comment-16350430
 ] 

ASF GitHub Bot commented on FLINK-8275:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5172#discussion_r165636080
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---
@@ -142,19 +153,10 @@ public static void runYarnTaskManager(String[] args, 
final Class() {
-   @Override
-   public Integer call() {
-   try {
-   
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, 
taskManager);
-   }
-   catch (Throwable t) {
-   LOG.error("Error while starting 
the TaskManager", t);
-   
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-   }
-   return null;
-   }
-   });
+   if (mainRunner == null) {
--- End diff --

Not sure about this.
This basically adds a code path to production code that exists only for testing 
purposes (i.e., it is only ever non-null in the `YarnTaskManagerRunnerTest`).

I think it would be better to have a `protected createMainRunner(...)` 
method that can be overridden to inject the mock runner dependency for testing.


> Flink YARN deployment with Kerberos enabled not working 
> 
>
> Key: FLINK-8275
> URL: https://issues.apache.org/jira/browse/FLINK-8275
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The local keytab path in YarnTaskManagerRunner is incorrectly set to the 
> ApplicationMaster's local keytab path. This causes jobs to fail because the 
> TaskManager can't read the keytab.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8275) Flink YARN deployment with Kerberos enabled not working

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350432#comment-16350432
 ] 

ASF GitHub Bot commented on FLINK-8275:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5172#discussion_r165663546
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---
@@ -142,19 +153,10 @@ public static void runYarnTaskManager(String[] args, 
final Class() {
-   @Override
-   public Integer call() {
-   try {
-   
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, 
taskManager);
-   }
-   catch (Throwable t) {
-   LOG.error("Error while starting 
the TaskManager", t);
-   
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-   }
-   return null;
-   }
-   });
+   if (mainRunner == null) {
--- End diff --

I don't think it is a good idea to add a code path to production code 
that exists only for the purpose of injecting a mock runner dependency in tests.

I've added another commit on top of @suez1224's changes that makes 
`YarnTaskManagerRunner` a factory-like class which creates a `Runner` 
containing all the final configurations. The unit test can then test against 
that configuration.
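
A minimal sketch of the pattern being proposed (hypothetical class and method names, not the actual Flink code): production code always goes through an overridable factory method, so tests can substitute a runner without a null check in the main path.

{code:java}
public class RunnerBootstrapSketch {

    public void run(String[] args) {
        // Production path: no branching on an injected mock.
        Runnable runner = createMainRunner(args);
        runner.run();
    }

    // Tests subclass RunnerBootstrapSketch and override this method
    // to inject a mock runner; production code stays untouched.
    protected Runnable createMainRunner(String[] args) {
        return () -> System.out.println("starting with " + java.util.Arrays.toString(args));
    }
}
{code}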


> Flink YARN deployment with Kerberos enabled not working 
> 
>
> Key: FLINK-8275
> URL: https://issues.apache.org/jira/browse/FLINK-8275
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.4.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The local keytab path in YarnTaskManagerRunner is incorrectly set to the 
> ApplicationMaster's local keytab path. This causes jobs to fail because the 
> TaskManager can't read the keytab.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5172: [FLINK-8275] [Security] fix keytab local path in Y...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5172#discussion_r165635517
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
@@ -170,9 +160,10 @@ protected int run(String[] args) {
 
final Configuration flinkConfig = 
createConfiguration(currDir, dynamicProperties);
 
-   // set keytab principal and replace path with the local 
path of the shipped keytab file in NodeManager
-   if (keytabPath != null && remoteKeytabPrincipal != 
null) {
-   
flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+   File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+   if (remoteKeytabPrincipal != null && f.exists()) {
--- End diff --

Can we re-add the debug log `LOG.debug("keytabPath: {}", keytabPath);` to 
provide more visibility to this?


---


[GitHub] flink pull request #5172: [FLINK-8275] [Security] fix keytab local path in Y...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5172#discussion_r165663546
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---
@@ -142,19 +153,10 @@ public static void runYarnTaskManager(String[] args, 
final Class() {
-   @Override
-   public Integer call() {
-   try {
-   
TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, resourceId, 
taskManager);
-   }
-   catch (Throwable t) {
-   LOG.error("Error while starting 
the TaskManager", t);
-   
System.exit(TaskManager.STARTUP_FAILURE_RETURN_CODE());
-   }
-   return null;
-   }
-   });
+   if (mainRunner == null) {
--- End diff --

I don't think it is a good idea to add a code path to production code 
that exists only for the purpose of injecting a mock runner dependency in tests.

I've added another commit on top of @suez1224's changes that makes 
`YarnTaskManagerRunner` a factory-like class which creates a `Runner` 
containing all the final configurations. The unit test can then test against 
that configuration.


---


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350425#comment-16350425
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5230
  
@aljoscha I addressed your comments. Please have a look.


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5230: [FLINK-8345] Add iterator of keyed state on broadcast sid...

2018-02-02 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5230
  
@aljoscha I addressed your comments. Please have a look.


---


[jira] [Commented] (FLINK-8477) Add api to support for user to skip the first incomplete window data

2018-02-02 Thread aitozi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350403#comment-16350403
 ] 

aitozi commented on FLINK-8477:
---

[~aljoscha] In production, some applications, such as monitoring jobs, need 
accurate data. In this scenario, if we start a job at 10:45:20 with a 1-minute 
window aggregate, we may produce broken data for the 10:45 window, which can be 
misleading. We could offer a user-facing API that lets users choose to skip the 
first several windows, so they can avoid the broken data themselves.
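
A small worked example of the broken first window described above (times are illustrative; assumes tumbling windows aligned to the epoch, as Flink's default assigners do):

{code:java}
public class FirstWindowSketch {

    public static void main(String[] args) {
        long windowSizeMs = 60_000L;          // 1-minute tumbling window
        long jobStartMs = hhmmss(10, 45, 20); // job starts at 10:45:20

        // Window start is aligned down to a multiple of the window size.
        long firstWindowStart = jobStartMs - (jobStartMs % windowSizeMs);

        // The first window covers [10:45:00, 10:46:00) but only sees data
        // from 10:45:20 on, so its aggregate is computed from partial data.
        System.out.println("first window start: " + hms(firstWindowStart)
                + ", data observed from: " + hms(jobStartMs));
    }

    static long hhmmss(int h, int m, int s) {
        return ((h * 60L + m) * 60L + s) * 1000L;
    }

    static String hms(long ms) {
        long s = ms / 1000;
        return String.format("%02d:%02d:%02d", s / 3600, (s % 3600) / 60, s % 60);
    }
}
{code}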

> Add api to support for user to skip the first incomplete window data
> 
>
> Key: FLINK-8477
> URL: https://issues.apache.org/jira/browse/FLINK-8477
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
> Fix For: 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8364) Add iterator() to ListState which returns empty iterator when it has no value

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350371#comment-16350371
 ] 

ASF GitHub Bot commented on FLINK-8364:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5356
  
I still do not believe that this is a significant improvement, and it may even 
increase confusion: now you have two slightly different ways to do almost the 
same thing; in one case you need to take care of `null` and in the other case 
not. It is always nicer to keep the core user interfaces slim. I would prefer 
not to include this change, and to remember that this should be fixed if we come 
to the point of an API-breaking release.
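
To make the "two slightly different ways" concrete, a simplified stub (not the actual `ListState` implementation) of the two access patterns under discussion:

{code:java}
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

class ListStateSketch<T> {

    private List<T> values; // null when the state is empty

    // Existing pattern: may return null, caller must check.
    Iterable<T> get() {
        return values;
    }

    // Proposed pattern: never null, possibly empty.
    Iterator<T> iterator() {
        return values == null ? Collections.<T>emptyIterator() : values.iterator();
    }
}
{code}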


> Add iterator() to ListState which returns empty iterator when it has no value
> -
>
> Key: FLINK-8364
> URL: https://issues.apache.org/jira/browse/FLINK-8364
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Add iterator() to ListState which returns empty iterator when it has no value



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5356: [FLINK-8364][state backend] Add iterator() to ListState w...

2018-02-02 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5356
  
I still do not believe that this is a significant improvement, and it may even 
increase confusion: now you have two slightly different ways to do almost the 
same thing; in one case you need to take care of `null` and in the other case 
not. It is always nicer to keep the core user interfaces slim. I would prefer 
not to include this change, and to remember that this should be fixed if we come 
to the point of an API-breaking release.


---


[GitHub] flink issue #5357: [hotfix][JobGraph] Eliminate the conditions of parallelis...

2018-02-02 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5357
  
In this case, could you please close the PR? (we cannot easily do that)


---


[jira] [Commented] (FLINK-8357) enable rolling in default log settings

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350363#comment-16350363
 ] 

ASF GitHub Bot commented on FLINK-8357:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5371
  
Thanks for the review, @StephanEwen. You are correct: DailyRollingFileAppender 
doesn't support MaxFileSize, while RollingFileAppender does. I will change the 
code accordingly and wait for the CI run to finish.
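
For reference, a minimal sketch of a size-based rolling setup with log4j 1.x (file name and limits below are placeholders, not the proposed defaults); unlike `DailyRollingFileAppender`, `RollingFileAppender` honors `MaxFileSize`:

{code}
# Sketch only: size-based rotation with log4j 1.x (placeholder file name and limits).
log4j.rootLogger=INFO, file
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.File=flink.log
log4j.appender.file.MaxFileSize=100MB
log4j.appender.file.MaxBackupIndex=10
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
{code}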


> enable rolling in default log settings
> --
>
> Key: FLINK-8357
> URL: https://issues.apache.org/jira/browse/FLINK-8357
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Reporter: Xu Mingmin
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> The release packages uses {{org.apache.log4j.FileAppender}} for log4j and 
> {{ch.qos.logback.core.FileAppender}} for logback, which could results in very 
> large log files. 
> For most cases, if not all, we need to enable rotation in a production 
> cluster, and I suppose it's a good idea to make rotation as default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5371: [FLINK-8357] [conf] Enable rolling in default log setting...

2018-02-02 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5371
  
Thanks for the review, @StephanEwen. You are correct: DailyRollingFileAppender 
doesn't support MaxFileSize, while RollingFileAppender does. I will change the 
code accordingly and wait for the CI run to finish.


---


[jira] [Commented] (FLINK-8550) Iterate over entryset instead of keys

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350339#comment-16350339
 ] 

ASF GitHub Bot commented on FLINK-8550:
---

GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/5404

[FLINK-8550][table] Iterate over entryset instead of keys


## What is the purpose of the change

Iterate over entrysets instead of keys when we want to get both key and 
value. I went over the code in Flink. Luckily, there are not many places we 
need to optimize.


## Brief change log

  - Iterate over entrysets instead of keys when we want to get both key and 
value


## Verifying this change

This change is already covered by existing tests, such as 
`OverWindowHarnessTest` for changes in `ProcTimeBoundedRangeOver`
`OverWindowITCase` for changes in `RowTimeBoundedRangeOver`
`GroupWindowITCase` for changes in `JavaUserDefinedAggFunctions`
`CorrelateITCase` for changes in `UserDefinedTableFunctions`


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 8550

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5404.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5404


commit 24a44c4c160e11f85d422b1acdfba20ea10a1a45
Author: hequn8128 
Date:   2018-02-02T09:33:50Z

[FLINK-8550][table] Iterate over entryset instead of keys




> Iterate over entryset instead of keys
> -
>
> Key: FLINK-8550
> URL: https://issues.apache.org/jira/browse/FLINK-8550
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Iterate over entrysets instead of keys when we want to get both key and 
> value. 
> For example, in {{ProcTimeBoundedRangeOver}}:
> {code:java}
> // code placeholder
> val iter = rowMapState.keys.iterator
> val markToRemove = new ArrayList[Long]()
> while (iter.hasNext) {
> val elementKey = iter.next
> if (elementKey < limit) {
> ...
> val elementsRemove = rowMapState.get(elementKey)
> ...
> }
> }
> {code}
>  
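
For comparison, a minimal sketch of the entry-set variant on a plain `java.util.Map` (names mirror the snippet above; the actual change targets Flink's map state):

{code:java}
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class EntrySetSketch {

    public static void main(String[] args) {
        Map<Long, String> rowMap = new HashMap<>();
        rowMap.put(1L, "row-1");
        rowMap.put(42L, "row-42");
        long limit = 10L;

        // Key and value arrive together, avoiding the extra get() per key.
        Iterator<Map.Entry<Long, String>> iter = rowMap.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Long, String> entry = iter.next();
            if (entry.getKey() < limit) {
                String elementsRemove = entry.getValue(); // no second lookup
                System.out.println("would remove: " + elementsRemove);
            }
        }
    }
}
{code}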



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350342#comment-16350342
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5362#discussion_r165646464
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
 ---
@@ -52,7 +52,7 @@
 
private final ProcessingTimeService processingTimeService;
 
-   private final Map<String, HeapInternalTimerService<?, ?>> timerServices;
+   public final Map<String, HeapInternalTimerService<?, ?>> timerServices;
--- End diff --

no, this is a mistake that should be reverted.


> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on 
> restored / newly provided serializers for compatibility checks. This should 
> be replaced with the {{TypeSerializer::ensureCompatibility}} checks instead, 
> so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and 
> namespace serializer in the {{HeapInternalTimerService}} also needs to be 
> written to the raw state.
> For Flink 1.4.0 release and current master, this is a critical bug since the 
> {{KryoSerializer}} has different default base registrations than before due 
> to FLINK-7420. i.e if the key of a window is serialized using the 
> {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the 
> {{HeapInternalTimerService}} restore will make use of serializer 
> reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from 
> **raw** state. Apparently, the serializer compatibility checks were only 
> implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test 
> job that uses a key type which required the {{KryoSerializer}}, and uses 
> windows, would have caught this issue.
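
A simplified sketch of the restore-time check described above (stub types, not the actual Flink classes): instead of a plain `equals` comparison with the restored serializer, the new serializer is asked whether it can reconfigure itself against the restored config snapshot.

{code:java}
// Stub types for illustration only.
class CompatibilityResultSketch {
    final boolean compatible;
    private CompatibilityResultSketch(boolean compatible) { this.compatible = compatible; }
    static CompatibilityResultSketch compatible() { return new CompatibilityResultSketch(true); }
    static CompatibilityResultSketch requiresMigration() { return new CompatibilityResultSketch(false); }
}

abstract class SerializerSketch<T> {
    // May internally reconfigure (e.g., reorder Kryo registrations) and report
    // compatible, even when equals() with the old serializer would be false.
    abstract CompatibilityResultSketch ensureCompatibility(Object restoredConfigSnapshot);
}

class TimerServiceRestoreSketch {
    static <T> SerializerSketch<T> resolve(SerializerSketch<T> newSerializer, Object restoredSnapshot) {
        if (!newSerializer.ensureCompatibility(restoredSnapshot).compatible) {
            throw new IllegalStateException("timer key/namespace serializer is incompatible");
        }
        return newSerializer; // possibly reconfigured
    }
}
{code}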



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5362: [FLINK-8421] [DataStream] Make timer serializers r...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5362#discussion_r165646464
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
 ---
@@ -52,7 +52,7 @@
 
private final ProcessingTimeService processingTimeService;
 
-   private final Map<String, HeapInternalTimerService<?, ?>> timerServices;
+   public final Map<String, HeapInternalTimerService<?, ?>> timerServices;
--- End diff --

no, this is a mistake that should be reverted.


---


[GitHub] flink pull request #5404: [FLINK-8550][table] Iterate over entryset instead ...

2018-02-02 Thread hequn8128
GitHub user hequn8128 opened a pull request:

https://github.com/apache/flink/pull/5404

[FLINK-8550][table] Iterate over entryset instead of keys


## What is the purpose of the change

Iterate over entrysets instead of keys when we want to get both key and 
value. I went over the code in Flink. Luckily, there are not many places we 
need to optimize.


## Brief change log

  - Iterate over entrysets instead of keys when we want to get both key and 
value


## Verifying this change

This change is already covered by existing tests, such as 
`OverWindowHarnessTest` for changes in `ProcTimeBoundedRangeOver`
`OverWindowITCase` for changes in `RowTimeBoundedRangeOver`
`GroupWindowITCase` for changes in `JavaUserDefinedAggFunctions`
`CorrelateITCase` for changes in `UserDefinedTableFunctions`


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hequn8128/flink 8550

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5404.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5404


commit 24a44c4c160e11f85d422b1acdfba20ea10a1a45
Author: hequn8128 
Date:   2018-02-02T09:33:50Z

[FLINK-8550][table] Iterate over entryset instead of keys




---


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350278#comment-16350278
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5295
  
I get that logic, but the existing `TimestampAssigner` also returns a 
`long` and if we return `Time` we always have to wrap/unwrap that long. What do 
you think?
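
For concreteness, a sketch of the extractor with the long-returning signature discussed here (interface and class names are hypothetical until the PR settles):

{code:java}
// Hypothetical names; the gap is in milliseconds, mirroring TimestampAssigner.
interface SessionGapExtractorSketch<T> {
    long extract(T element);
}

class DeviceEvent {
    String deviceType;
    DeviceEvent(String deviceType) { this.deviceType = deviceType; }
}

// Per-device-type gaps, as in the IoT scenario from the issue.
class DeviceGapExtractor implements SessionGapExtractorSketch<DeviceEvent> {
    @Override
    public long extract(DeviceEvent element) {
        return "sensor".equals(element.deviceType) ? 5_000L : 60_000L;
    }
}
{code}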


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5295: [FLINK-8384] [streaming] Session Window Assigner with Dyn...

2018-02-02 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5295
  
I get that logic, but the existing `TimestampAssigner` also returns a 
`long` and if we return `Time` we always have to wrap/unwrap that long. What do 
you think?


---


[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350259#comment-16350259
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5362#discussion_r165636384
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
 ---
@@ -52,7 +52,7 @@
 
private final ProcessingTimeService processingTimeService;
 
-   private final Map<String, HeapInternalTimerService<?, ?>> timerServices;
+   public final Map<String, HeapInternalTimerService<?, ?>> timerServices;
--- End diff --

Is this only public for testing?


> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on 
> restored / newly provided serializers for compatibility checks. This should 
> be replaced with the {{TypeSerializer::ensureCompatibility}} checks instead, 
> so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and 
> namespace serializer in the {{HeapInternalTimerService}} also needs to be 
> written to the raw state.
> For Flink 1.4.0 release and current master, this is a critical bug since the 
> {{KryoSerializer}} has different default base registrations than before due 
> to FLINK-7420. i.e if the key of a window is serialized using the 
> {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the 
> {{HeapInternalTimerService}} restore will make use of serializer 
> reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from 
> **raw** state. Apparently, the serializer compatibility checks were only 
> implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test 
> job that uses a key type which required the {{KryoSerializer}}, and uses 
> windows, would have caught this issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5362: [FLINK-8421] [DataStream] Make timer serializers r...

2018-02-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5362#discussion_r165636384
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
 ---
@@ -52,7 +52,7 @@
 
private final ProcessingTimeService processingTimeService;
 
-   private final Map<String, HeapInternalTimerService<?, ?>> timerServices;
+   public final Map<String, HeapInternalTimerService<?, ?>> timerServices;
--- End diff --

Is this only public for testing?


---


[GitHub] flink pull request #5403: [WIP] Reschedule failed tasks to previous allocati...

2018-02-02 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/5403

[WIP] Reschedule failed tasks to previous allocation (if possible).

This PR is a preview for early feedback on scheduler changes that make 
allocations sticky under failures for local recovery.

The core idea is that we consider previous allocations in our scheduling, and 
all tasks obey the following rule:

If there was a previous allocation, try to find the same slot again, or 
request a new slot that cannot be owned by another task. We do this to prevent 
tasks that cannot find their previous slot (e.g. after a machine failure) from 
stealing the previous slot of another failed task that could otherwise recover 
locally.

`SlotProfile` now specifies all requirements for a slot, and a matcher is 
used to identify the right candidate.

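A rough sketch of the matching rule (hypothetical types, simplified from the PR): first try the exact previous allocation, otherwise take only a slot that no other failed task is waiting to reclaim.

{code:java}
import java.util.Collection;
import java.util.Optional;
import java.util.Set;

class SlotMatchingSketch {

    static Optional<String> match(
            Optional<String> previousAllocationId,   // empty on first schedule
            Set<String> reservedForOtherFailedTasks, // previous allocations of other tasks
            Collection<String> availableAllocationIds) {

        // 1. Prefer the task's own previous slot (enables local recovery).
        if (previousAllocationId.isPresent()
                && availableAllocationIds.contains(previousAllocationId.get())) {
            return previousAllocationId;
        }
        // 2. Otherwise avoid stealing a slot another failed task could reclaim.
        return availableAllocationIds.stream()
                .filter(id -> !reservedForOtherFailedTasks.contains(id))
                .findFirst();
    }
}
{code}
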
CC @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink 
task-local-recovery-scheduler-wip

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5403.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5403


commit 4dd57b17f24e7611ba0b68f90a6c1593cd496225
Author: Stefan Richter 
Date:   2018-02-01T15:02:28Z

[WIP] Reschedule failed tasks to previous allocation (if possible).




---


[GitHub] flink pull request #5398: [hotfix][cep] Remove migration from 1.5 test

2018-02-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5398


---


[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350108#comment-16350108
 ] 

ASF GitHub Bot commented on FLINK-8308:
---

Github user uce commented on the issue:

https://github.com/apache/flink/pull/5395
  
https://issues.apache.org/jira/browse/INFRA-15959

`ruby2.3.1` has been installed on the buildbot. I will check whether the 
build works as expected and report back here (probably only by next week).



> Update yajl-ruby dependency to 1.3.1 or higher
> --
>
> Key: FLINK-8308
> URL: https://issues.apache.org/jira/browse/FLINK-8308
> Project: Flink
>  Issue Type: Task
>  Components: Project Website
>Reporter: Fabian Hueske
>Assignee: Steven Langbroek
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> We got notified that yajl-ruby < 1.3.1, a dependency which is used to build 
> the Flink website, has a  security vulnerability of high severity.
> We should update yajl-ruby to 1.3.1 or higher.
> Since the website is built offline and served as static HTML, I don't think 
> this is a super critical issue (please correct me if I'm wrong), but we 
> should resolve this soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-02 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/5395
  
https://issues.apache.org/jira/browse/INFRA-15959

`ruby2.3.1` has been installed on the buildbot. I will check whether the 
build works as expected and report back here (probably only by next week).



---


[jira] [Commented] (FLINK-8549) Move TimerServiceOptions to TaskManagerOptions

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350209#comment-16350209
 ] 

ASF GitHub Bot commented on FLINK-8549:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5402
  
Thanks for the heads up - nice to know that the generator also supports 
different projects now.

I am still leaning towards making this change, because all other shutdown 
options are already in the `TaskManagerOptions`, and this class just felt out 
of place in `flink-streaming-java`.


> Move TimerServiceOptions to TaskManagerOptions
> --
>
> Key: FLINK-8549
> URL: https://issues.apache.org/jira/browse/FLINK-8549
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> The {{TimerServiceOptions}} are in the wrong place (prohibiting generation of 
> config docs) and cause over-fragmentation of the options in the code base.
> I propose to simply move the one option from that class to the 
> {{TaskManagerOptions}}, as it relates to task execution. Other shutdown 
> related options are in there already.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5402: [FLINK-8549] [config] Move TimerServiceOptions into TaskM...

2018-02-02 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5402
  
Thanks for the heads up - nice to know that the generator also supports 
different projects now.

I am still leaning towards making this change, because all other shutdown 
options are already in the `TaskManagerOptions`, and this class just felt out 
of place in `flink-streaming-java`.


---


[jira] [Commented] (FLINK-8549) Move TimerServiceOptions to TaskManagerOptions

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350207#comment-16350207
 ] 

ASF GitHub Bot commented on FLINK-8549:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5402#discussion_r165626728
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -206,6 +206,14 @@
key("task.cancellation.timeout")
.defaultValue(18L);
 
+   /**
+* This configures how long we wait for the timers to finish all 
pending timer threads
+* when the stream task is cancelled .
+*/
+   public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT_TIMERS 
= ConfigOptions
+   .key("task.cancellation.timeout.timers")
+   .defaultValue(7500L);
--- End diff --

Good point - I just checked, it was in fact already part of 1.4 (I had 
falsely in my mind that it was new in 1.5)


> Move TimerServiceOptions to TaskManagerOptions
> --
>
> Key: FLINK-8549
> URL: https://issues.apache.org/jira/browse/FLINK-8549
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> The {{TimerServiceOptions}} are in the wrong place (prohibiting generation of 
> config docs) and cause over-fragmentation of the options in the code base.
> I propose to simply move the one option from that class to the 
> {{TaskManagerOptions}}, as it relates to task execution. Other shutdown 
> related options are in there already.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5402: [FLINK-8549] [config] Move TimerServiceOptions int...

2018-02-02 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/5402#discussion_r165626728
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -206,6 +206,14 @@
key("task.cancellation.timeout")
.defaultValue(18L);
 
+   /**
+* This configures how long we wait for the timers to finish all 
pending timer threads
+* when the stream task is cancelled .
+*/
+   public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT_TIMERS 
= ConfigOptions
+   .key("task.cancellation.timeout.timers")
+   .defaultValue(7500L);
--- End diff --

Good point - I just checked, it was in fact already part of 1.4 (I had 
falsely in my mind that it was new in 1.5)


---


[GitHub] flink issue #5364: [FLINK-8472] [test] Extend all migration tests for Flink ...

2018-02-02 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5364
  
Thanks for the reviews, I'll address Chesnay's comment and then merge this 
(to `master` and `release-1.4`).


---


[jira] [Commented] (FLINK-8472) Extend migration tests for Flink 1.4

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350146#comment-16350146
 ] 

ASF GitHub Bot commented on FLINK-8472:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5364
  
Thanks for the reviews, I'll address Chesnay's comment and then merge this 
(to `master` and `release-1.4`).


> Extend migration tests for Flink 1.4
> 
>
> Key: FLINK-8472
> URL: https://issues.apache.org/jira/browse/FLINK-8472
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The following migration tests should be extended to cover migrating Flink 
> jobs with a 1.4 savepoint.
>  * {{WindowOperatorMigrationTest}}
>  * {{CEPMigrationTest}}
>  * {{StatefulJobSavepointMigrationTestITCase}}
>  * {{FlinkKinesisConsumerMigrationTest}}
>  * {{FlinkKafkaConsumerBaseMigrationTest}}
>  * {{ContinuousFileProcessingMigrationTest}}
>  * {{BucketingSinkMigrationTest}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350138#comment-16350138
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
I like long myself, but I think that's only because I'm quite used to 
working in milliseconds. As the existing static session windows take `Time` as 
the gap, I think it made sense to have the extract method also produce a `Time`.

If it returns a `Time`, we don't have to worry about an implementer getting 
confused about which time unit they need to return, or always having to 
look it up just to check that they're right.


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5295: [FLINK-8384] [streaming] Session Window Assigner with Dyn...

2018-02-02 Thread dyanarose
Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
I like long myself, but I think that's only because I'm quite used to 
working in milliseconds. As the existing static session windows take `Time` as 
the gap, I think it made sense to have the extract method also produce a `Time`.

If it returns a `Time`, we don't have to worry about an implementer getting 
confused about which time unit they need to return, or always having to 
look it up just to check that they're right.


---


[jira] [Commented] (FLINK-8549) Move TimerServiceOptions to TaskManagerOptions

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350191#comment-16350191
 ] 

ASF GitHub Bot commented on FLINK-8549:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5402#discussion_r165623427
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -206,6 +206,14 @@
key("task.cancellation.timeout")
.defaultValue(18L);
 
+   /**
+* This configures how long we wait for the timers to finish all 
pending timer threads
+* when the stream task is cancelled .
+*/
+   public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT_TIMERS 
= ConfigOptions
+   .key("task.cancellation.timeout.timers")
+   .defaultValue(7500L);
--- End diff --

add deprecated key? (I'm not quite sure whether the previous option was 
part of a release)
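
For context, a sketch of what carrying the old key along could look like via `withDeprecatedKeys` (the deprecated key name below is purely hypothetical, and whether it is needed depends on whether the old option ever shipped):

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class TimerOptionsSketch {
    public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT_TIMERS = ConfigOptions
            .key("task.cancellation.timeout.timers")
            .defaultValue(7500L)
            .withDeprecatedKeys("timerservice.exceptional.shutdown.timeout"); // hypothetical old key
}
{code}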


> Move TimerServiceOptions to TaskManagerOptions
> --
>
> Key: FLINK-8549
> URL: https://issues.apache.org/jira/browse/FLINK-8549
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> The {{TimerServiceOptions}} are in the wrong place (prohibiting generation of 
> config docs) and cause over-fragmentation of the options in the code base.
> I propose to simply move the one option from that class to the 
> {{TaskManagerOptions}}, as it relates to task execution. Other shutdown 
> related options are in there already.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5402: [FLINK-8549] [config] Move TimerServiceOptions int...

2018-02-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5402#discussion_r165623427
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -206,6 +206,14 @@
key("task.cancellation.timeout")
.defaultValue(18L);
 
+   /**
+* This configures how long we wait for the timers to finish all 
pending timer threads
+* when the stream task is cancelled .
+*/
+   public static final ConfigOption<Long> TASK_CANCELLATION_TIMEOUT_TIMERS 
= ConfigOptions
+   .key("task.cancellation.timeout.timers")
+   .defaultValue(7500L);
--- End diff --

add deprecated key? (I'm not quite sure whether the previous option was 
part of a release)


---


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350119#comment-16350119
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5295
  
I think the changes are good! Thanks for working on this.  

As a final change before merging, I would annotate the new classes/methods 
as `@PublicEvolving`, would you be ok with that? And I would also like to 
change `SessionWindowTimeGapExtractor.extract()` to return a long instead of 
`Time`. What do you think?


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8543:

Fix Version/s: 1.5.0

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Major
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> this.fs.newObjectMetadata(this.backupFile.length());
> Upload upload = 
> this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
> ProgressableProgressListener listener = new 
> ProgressableProgressListener(this.fs, 

[jira] [Updated] (FLINK-8543) Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen

2018-02-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8543:

Priority: Blocker  (was: Major)

> Output Stream closed at org.apache.hadoop.fs.s3a.S3AOutputStream.checkOpen
> --
>
> Key: FLINK-8543
> URL: https://issues.apache.org/jira/browse/FLINK-8543
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.0
> Environment: IBM Analytics Engine - 
> [https://console.bluemix.net/docs/services/AnalyticsEngine/index.html#introduction]
> The cluster is based on Hortonworks Data Platform 2.6.2. The following 
> components are made available.
> Apache Spark 2.1.1 Hadoop 2.7.3
> Apache Livy 0.3.0
> Knox 0.12.0
> Ambari 2.5.2
> Anaconda with Python 2.7.13 and 3.5.2 
> Jupyter Enterprise Gateway 0.5.0 
> HBase 1.1.2 * 
> Hive 1.2.1 *
> Oozie 4.2.0 *
> Flume 1.5.2 * 
> Tez 0.7.0 * 
> Pig 0.16.0 * 
> Sqoop 1.4.6 * 
> Slider 0.92.0 * 
>Reporter: chris snow
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: Screen Shot 2018-01-30 at 18.34.51.png
>
>
> I'm hitting an issue with my BucketingSink from a streaming job.
>  
> {code:java}
> return new BucketingSink<Tuple2<String, Object>>(path)
>  .setWriter(writer)
>  .setBucketer(new DateTimeBucketer<Tuple2<String, Object>>(formatString));
> {code}
>  
> I can see that a few files have run into issues with uploading to S3:
> !Screen Shot 2018-01-30 at 18.34.51.png!   
> The Flink console output is showing an exception being thrown by 
> S3AOutputStream, so I've grabbed the S3AOutputStream class from my cluster 
> and added some additional logging to the checkOpen() method to log the 'key' 
> just before the exception is thrown:
>  
> {code:java}
> /*
>  * Decompiled with CFR.
>  */
> package org.apache.hadoop.fs.s3a;
> import com.amazonaws.AmazonClientException;
> import com.amazonaws.event.ProgressListener;
> import com.amazonaws.services.s3.model.ObjectMetadata;
> import com.amazonaws.services.s3.model.PutObjectRequest;
> import com.amazonaws.services.s3.transfer.Upload;
> import com.amazonaws.services.s3.transfer.model.UploadResult;
> import java.io.BufferedOutputStream;
> import java.io.File;
> import java.io.FileOutputStream;
> import java.io.IOException;
> import java.io.InterruptedIOException;
> import java.io.OutputStream;
> import java.util.concurrent.atomic.AtomicBoolean;
> import org.apache.hadoop.classification.InterfaceAudience;
> import org.apache.hadoop.classification.InterfaceStability;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
> import org.apache.hadoop.fs.s3a.S3AFileSystem;
> import org.apache.hadoop.fs.s3a.S3AUtils;
> import org.apache.hadoop.util.Progressable;
> import org.slf4j.Logger;
> @InterfaceAudience.Private
> @InterfaceStability.Evolving
> public class S3AOutputStream
> extends OutputStream {
> private final OutputStream backupStream;
> private final File backupFile;
> private final AtomicBoolean closed = new AtomicBoolean(false);
> private final String key;
> private final Progressable progress;
> private final S3AFileSystem fs;
> public static final Logger LOG = S3AFileSystem.LOG;
> public S3AOutputStream(Configuration conf, S3AFileSystem fs, String key, 
> Progressable progress) throws IOException {
> this.key = key;
> this.progress = progress;
> this.fs = fs;
> this.backupFile = fs.createTmpFileForWrite("output-", -1, conf);
> LOG.debug("OutputStream for key '{}' writing to tempfile: {}", 
> (Object)key, (Object)this.backupFile);
> this.backupStream = new BufferedOutputStream(new 
> FileOutputStream(this.backupFile));
> }
> void checkOpen() throws IOException {
> if (!this.closed.get()) return;
> // vv-- Additional logging --vvv
> LOG.error("OutputStream for key '{}' closed.", (Object)this.key);
> throw new IOException("Output Stream closed");
> }
> @Override
> public void flush() throws IOException {
> this.checkOpen();
> this.backupStream.flush();
> }
> @Override
> public void close() throws IOException {
> if (this.closed.getAndSet(true)) {
> return;
> }
> this.backupStream.close();
> LOG.debug("OutputStream for key '{}' closed. Now beginning upload", 
> (Object)this.key);
> try {
> ObjectMetadata om = 
> this.fs.newObjectMetadata(this.backupFile.length());
> Upload upload = 
> this.fs.putObject(this.fs.newPutObjectRequest(this.key, om, this.backupFile));
> ProgressableProgressListener listener = new 
> 

[GitHub] flink issue #5295: [FLINK-8384] [streaming] Session Window Assigner with Dyn...

2018-02-02 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5295
  
I think the changes are good! Thanks for working on this. 👍 

As a final change before merging, I would annotate the new classes/methods 
as `@PublicEvolving`, would you be ok with that? And I would also like to 
change `SessionWindowTimeGapExtractor.extract()` to return a long instead of 
`Time`. What do you think?
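For illustration, that change would give the interface roughly this shape; a sketch only, with the generics and Javadoc wording assumed rather than taken from the PR:

```java
import java.io.Serializable;

import org.apache.flink.annotation.PublicEvolving;

/**
 * Sketch of the proposed extractor: annotated as evolving public API and
 * returning the gap as a long (milliseconds) instead of a Time object.
 */
@PublicEvolving
public interface SessionWindowTimeGapExtractor<T> extends Serializable {

    /**
     * @param element the element for which to determine the session gap
     * @return the session time gap in milliseconds
     */
    long extract(T element);
}
```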


---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350040#comment-16350040
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588395
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
--- End diff --

I think it makes sense to move this class to the same level as 
`FlinkKinesisConsumer`, since it is part of the public API.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.
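As a toy illustration of that skew (the shard numbers, the String-based hash, and the subtask count below are assumptions for demonstration; the consumer actually hashes the shard handle itself):

{code:java}
import java.util.HashMap;
import java.util.Map;

public class ShardSkewDemo {
    public static void main(String[] args) {
        int numSubtasks = 8;
        // Hypothetical shard numbers surviving several reshards:
        // still sequential, but no longer consecutive.
        int[] shardNumbers = {3, 7, 11, 15, 19, 23, 27, 31};
        Map<Integer, Integer> shardsPerSubtask = new HashMap<>();
        for (int n : shardNumbers) {
            String shardId = String.format("shardId-%012d", n);
            int subtask = Math.abs(shardId.hashCode() % numSubtasks);
            shardsPerSubtask.merge(subtask, 1, Integer::sum);
        }
        // With consecutive shard numbers the counts come out uniform; with
        // these gaps one subtask gets two shards and another gets none.
        System.out.println(shardsPerSubtask);
    }
}
{code}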



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350046#comment-16350046
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165591410
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -192,6 +198,14 @@ public FlinkKinesisConsumer(List<String> streams, 
KinesisDeserializationSchema<T> `shardAssigner`


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350048#comment-16350048
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165589453
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
--- End diff --

The resulting distribution of shards "**should**" have the following 
contract.

i.e., we can't guarantee it, instead the user implementation should 
guarantee it.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350045#comment-16350045
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165590209
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
+* 
+* 1. Uniform distribution across subtasks
+* 2. Deterministic, calls for a given shard always return the same index.
+* 
+*
+* The above contract is crucial and cannot be broken. Consumer 
subtasks rely on this
+* contract to filter out partitions that they should not subscribe to, 
guaranteeing
+* that all partitions of a single topic will always be assigned to 
some subtask in a
+* uniformly distributed manner.
+*
+* Kinesis and the consumer support dynamic re-sharding, and shard IDs, while sequential,
+* cannot be assumed to be consecutive. There is no perfect generic 
default assignment function.
+* Default subtask index assignment, which is based on hash code, may 
result in skew,
+* with some subtasks having many shards assigned and others none.
--- End diff --

I feel like this section of the Javadoc should be part of the Javadoc for 
the original consumer constructors, and should guide users to the 
`setShardAssigner` method if they do encounter serious shard skew.
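For reference, a hedged sketch of what such constructor-level guidance might point users at; the stream name, region, schema, and the suffix-based strategy are illustrative assumptions, not part of the PR:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class ShardAssignerExample {

    static FlinkKinesisConsumer<String> buildConsumer() {
        Properties config = new Properties();
        config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");

        FlinkKinesisConsumer<String> consumer =
            new FlinkKinesisConsumer<>("example-stream", new SimpleStringSchema(), config);

        // One illustrative strategy: assign by the numeric suffix of the shard
        // ID rather than the default hashCode(); out-of-range return values are
        // mapped into the subtask range by the modulus the interface documents.
        consumer.setShardAssigner((shard, numParallelSubtasks) -> {
            String shardId = shard.getShard().getShardId(); // e.g. "shardId-000000000042"
            return Integer.parseInt(shardId.substring(shardId.lastIndexOf('-') + 1));
        });
        return consumer;
    }
}
```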


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350050#comment-16350050
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588489
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
--- End diff --

mentions Kafka partition, should be Kinesis shard


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350043#comment-16350043
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165587909
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
+* 
+* 1. Uniform distribution across subtasks
+* 2. Deterministic, calls for a given shard always return the same index.
+* 
+*
+* The above contract is crucial and cannot be broken. Consumer 
subtasks rely on this
+* contract to filter out partitions that they should not subscribe to, 
guaranteeing
+* that all partitions of a single topic will always be assigned to 
some subtask in a
+* uniformly distributed manner.
+*
+* Kinesis and the consumer support dynamic re-sharding, and shard IDs, while sequential,
+* cannot be assumed to be consecutive. There is no perfect generic 
default assignment function.
+* Default subtask index assignment, which is based on hash code, may 
result in skew,
+* with some subtasks having many shards assigned and others none.
+*
+* It is recommended to monitor the shard distribution and adjust 
assignment appropriately.
+* Custom implementation may optimize the hash function or use static 
overrides to limit skew.
+ *
+ * @param shard the shard to determine
+ * @param numParallelSubtasks total number of subtasks
+ * @return index or hash code
--- End diff --

nit:
I think it would be more straightforward if we just say this returns the 
"target index".
If the index value is outside the subtask range, we perform a modulus 
operation.

So basically what the PR is already doing, just re-terming it to be more 
straightforward.
What do you think? This is also just a personal preference :)
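As an aside, the normalization being described could look like this; a minimal sketch, assuming abs-plus-modulus is how the fetcher brings the value into range:

```java
public final class SubtaskIndexUtil {

    private SubtaskIndexUtil() {}

    /** Maps a raw assigner value (e.g. a hash code) into [0, numParallelSubtasks). */
    static int toSubtaskIndex(int assignerValue, int numParallelSubtasks) {
        // Java's % keeps the dividend's sign, so take the absolute value to
        // stay inside the valid subtask range.
        return Math.abs(assignerValue % numParallelSubtasks);
    }
}
```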


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350041#comment-16350041
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588295
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
--- End diff --

Add `@PublicEvolving` annotation, to make it clear this is a public API.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350037#comment-16350037
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165588081
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
--- End diff --

nit: add one empty line above comment block (just a personal preference, 
though)


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350044#comment-16350044
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165589748
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
+* 
+* 1. Uniform distribution across subtasks
+* 2. Deterministic, calls for a given shard always return the same index.
+* 
+*
+* The above contract is crucial and cannot be broken. Consumer 
subtasks rely on this
+* contract to filter out partitions that they should not subscribe to, 
guaranteeing
+* that all partitions of a single topic will always be assigned to 
some subtask in a
--- End diff --

of a single "**Kinesis stream**", not topic (which is Kafka terms)


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350039#comment-16350039
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165587359
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -182,13 +191,15 @@ public KinesisDataFetcher(List<String> streams,

SourceFunction.SourceContext<T> sourceContext,
RuntimeContext runtimeContext,
Properties configProps,
-   KinesisDeserializationSchema<T> deserializationSchema) {
+   KinesisDeserializationSchema<T> deserializationSchema,
+   KinesisShardAssigner kinesisShardToSubTaskIndexFn) {
--- End diff --

mismatching variable name: `kinesisShardToSubTaskIndexFn` --> 
`kinesisShardAssigner` or just `shardAssigner`


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350042#comment-16350042
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165586434
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -76,6 +77,9 @@
 @Internal
 public class KinesisDataFetcher<T> {
 
+   public static final KinesisShardAssigner DEFAULT_SHARD_ASSIGNER = 
(shard, subtasks) -> shard.hashCode();
+
+
--- End diff --

nit: unnecessary empty line


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350047#comment-16350047
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165594279
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java
 ---
@@ -192,6 +198,14 @@ public FlinkKinesisConsumer(List<String> streams, KinesisDeserializationSchema<T>

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350049#comment-16350049
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165590401
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
--- End diff --

The overview class Javadoc could probably be less generic.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16350038#comment-16350038
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165586994
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 ---
@@ -214,6 +226,7 @@ protected KinesisDataFetcher(List streams,
this.totalNumberOfConsumerSubtasks = 
runtimeContext.getNumberOfParallelSubtasks();
this.indexOfThisConsumerSubtask = 
runtimeContext.getIndexOfThisSubtask();
this.deserializationSchema = 
checkNotNull(deserializationSchema);
+   this.shardAssigner = checkNotNull(shardAssigner);
--- End diff --

We also need to try cleaning the closure of the given object (if it is a 
non-static inner class):
```
ClosureCleaner.clean(shardAssigner, true);
```
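For context, a runnable shape of that suggestion; the class and field names here are stand-ins, not the PR's code:

```java
import java.util.Objects;

import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.streaming.connectors.kinesis.util.KinesisShardAssigner;

// Hedged sketch: where the closure cleaning could sit relative to the null check.
public class AssignerHolder {

    private final KinesisShardAssigner shardAssigner;

    public AssignerHolder(KinesisShardAssigner shardAssigner) {
        this.shardAssigner = Objects.requireNonNull(shardAssigner);
        // Strip references to a non-serializable enclosing scope in case the
        // assigner is a lambda or a non-static inner class.
        ClosureCleaner.clean(this.shardAssigner, true);
    }
}
```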


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165590401
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
--- End diff --

The overview class Javadoc could probably be less generic.


---


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-02 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5393#discussion_r165590209
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisShardAssigner.java
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.util;
+
+import 
org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import java.io.Serializable;
+
+/**
+ * Utility to map Kinesis shards to Flink subtask indices.
+ */
+public interface KinesisShardAssigner extends Serializable {
+/**
+* Returns the index of the target subtask that a specific Kafka 
partition should be
+* assigned to. For return values outside the subtask range, modulus 
operation will
+* be applied automatically, hence it is also valid to just return a 
hash code.
+*
+* The resulting distribution of shards has the following contract:
+* 
+* 1. Uniform distribution across subtasks
+* 2. Deterministic, calls for a given shard always return the same index.
+* 
+*
+* The above contract is crucial and cannot be broken. Consumer 
subtasks rely on this
+* contract to filter out partitions that they should not subscribe to, 
guaranteeing
+* that all partitions of a single topic will always be assigned to 
some subtask in a
+* uniformly distributed manner.
+*
+* Kinesis and the consumer support dynamic re-sharding, and shard IDs, while sequential,
+* cannot be assumed to be consecutive. There is no perfect generic 
default assignment function.
+* Default subtask index assignment, which is based on hash code, may 
result in skew,
+* with some subtasks having many shards assigned and others none.
--- End diff --

I feel like this section of the Javadoc should be part of the Javadoc for 
the original consumer constructors, and should guide users to the 
`setShardAssigner` method if they do encounter serious shard skew.


---

