[GitHub] flink issue #2811: [FLINK-5159] Improve performance of inner joins with a sin...

2016-11-30 Thread AlexanderShoshin
Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
Hi @fhueske,
I've made the corrections that you asked for.




[jira] [Commented] (FLINK-5159) Improve performance of inner joins with a single row input

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15711203#comment-15711203
 ] 

ASF GitHub Bot commented on FLINK-5159:
---

Github user AlexanderShoshin commented on the issue:

https://github.com/apache/flink/pull/2811
  
Hi @fhueske,
I've made the corrections that you asked for.


> Improve performance of inner joins with a single row input
> -
>
> Key: FLINK-5159
> URL: https://issues.apache.org/jira/browse/FLINK-5159
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Alexander Shoshin
>Assignee: Alexander Shoshin
>Priority: Minor
>
> All inner joins (including cross joins) can be implemented as a 
> {{MapFunction}} if one of their inputs is a single row. This row can be 
> passed to the {{MapFunction}} as a {{BroadcastSet}}.
> This approach would be more lightweight than the other current join 
> strategies.
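
For illustration, a minimal sketch of this pattern in the DataSet API (the 
schema and the broadcast-set name "singleRow" are made up for the example, not 
taken from the PR):

{code}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Integer, String>> large = env.fromElements(
    new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));
DataSet<Tuple1<Long>> singleRow = env.fromElements(new Tuple1<>(42L));

// The join degenerates to a map over the large side; the single-row side
// is shipped to every task as a broadcast set.
DataSet<Tuple3<Integer, String, Long>> joined = large
    .map(new RichMapFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Long>>() {
        private Tuple1<Long> row;

        @Override
        public void open(Configuration parameters) {
            row = getRuntimeContext()
                .<Tuple1<Long>>getBroadcastVariable("singleRow").get(0);
        }

        @Override
        public Tuple3<Integer, String, Long> map(Tuple2<Integer, String> value) {
            return new Tuple3<>(value.f0, value.f1, row.f0);
        }
    })
    .withBroadcastSet(singleRow, "singleRow");
{code}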





[jira] [Assigned] (FLINK-4523) Allow Kinesis Consumer to start from specific timestamp / Date

2016-11-30 Thread Wei-Che Wei (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Che Wei reassigned FLINK-4523:
--

Assignee: Wei-Che Wei

> Allow Kinesis Consumer to start from specific timestamp / Date
> --
>
> Key: FLINK-4523
> URL: https://issues.apache.org/jira/browse/FLINK-4523
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Wei-Che Wei
> Fix For: 1.2.0
>
>
> We had a Kinesis user requesting this feature in an offline chat.
> To be specific, all initial Kinesis shards would be iterated starting from 
> records at the given timestamp.
> The AWS Java SDK we're using already provides an API for this, so we can add 
> this functionality with fairly low overhead: 
> http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/kinesis/model/GetShardIteratorRequest.html#setTimestamp-java.util.Date-
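
For example, a sketch of the relevant SDK call (the client setup, stream name, 
and shard ID below are placeholders, not from the issue):

{code}
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import java.util.Date;

AmazonKinesisClient kinesisClient = new AmazonKinesisClient();

// Request an iterator that starts at the first record at or after the timestamp.
GetShardIteratorRequest request = new GetShardIteratorRequest()
    .withStreamName("my-stream")
    .withShardId("shardId-000000000000")
    .withShardIteratorType(ShardIteratorType.AT_TIMESTAMP)
    .withTimestamp(new Date(1480464000000L));

String shardIterator = kinesisClient.getShardIterator(request).getShardIterator();
{code}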





[jira] [Comment Edited] (FLINK-5185) Decouple BatchTableSourceScan with TableSourceTable

2016-11-30 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15710644#comment-15710644
 ] 

Kurt Young edited comment on FLINK-5185 at 12/1/16 2:45 AM:


Hi [~fhueske], thanks for the reply.

Actually, this issue reflects a fundamental question we need to answer: "who 
decides the RowType when you scan the table?", or "who is the schema authority?".

Your suggestion effectively makes the answer {{BatchTableSourceScan}}, but I 
think the answer should be {{TableSource}}, which is responsible for all the 
real work. Letting {{BatchTableSourceScan}} hold the optional parameters for 
projection or filter will do the work for now, but once we begin to introduce 
complex {{TableSources}}, which can even let you push part of the query down, 
{{BatchTableSourceScan}} will no longer be competent for this job. 

So I propose to make {{TableSource}} the RowType authority: RowType information 
can be obtained from {{TableSource}}, and only from {{TableSource}}. Likewise, 
only {{TableSource}} can decide how to react to the provided projection columns, 
filter conditions, or even the original query. {{BatchTableSourceScan}} should 
pass this information to the {{TableSource}} and simply wait for the new RowType 
from the new {{TableSource}}. After all, RowType is the only thing 
{{BatchTableSourceScan}} cares about, and should care about. 


was (Author: ykt836):
Hi [~fhueske], thanks for the reply.

Actually, this issue reflects a fundamental question we need to answer: "who 
decides the RowType when you scan the table?", or "who is the schema authority?".

Your suggestion effectively makes the answer {{BatchTableSourceScan}}, but I 
think the answer should be {{TableSource}}, which is responsible for all the 
real work. Letting {{BatchTableSourceScan}} hold the optional parameters for 
projection or filter will do the work for now, but once we begin to introduce 
complex {{TableSources}}, which can even let you push part of the query down, 
{{BatchTableSourceScan}} will no longer be competent for its job. 

So I propose to make {{TableSource}} the RowType authority: RowType information 
can be obtained from {{TableSource}}, and only from {{TableSource}}. Likewise, 
only {{TableSource}} can decide how to react to the provided projection columns, 
filter conditions, or even the original query. {{BatchTableSourceScan}} should 
pass this information to the {{TableSource}} and simply wait for the new RowType 
from the new {{TableSource}}. After all, RowType is the only thing 
{{BatchTableSourceScan}} cares about, and should care about. 

> Decouple BatchTableSourceScan with TableSourceTable
> ---
>
> Key: FLINK-5185
> URL: https://issues.apache.org/jira/browse/FLINK-5185
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Kurt Young
>Assignee: zhangjing
>Priority: Minor
>
> As the component relationships in this design doc show:
> https://docs.google.com/document/d/1PBnEbOcFHlEF1qGGAUgJvINdEXzzFTIRElgvs4-Tdeo/
> we found it has become awkward for {{BatchTableSourceScan}} to directly hold 
> a {{TableSourceTable}} and refer to the {{TableSource}} through it. This is 
> fine as long as the relationship is immutable, but when we want to change the 
> {{TableSource}} while applying optimizations, it causes conflicts and 
> confusion. 
> The only way to change the {{TableSource}} is to create a new 
> {{TableSourceTable}} to hold the new {{TableSource}} and a new 
> {{BatchTableSourceScan}} pointing to the just-created {{TableSourceTable}}. 
> The annoying part is that the {{RelOptTable}} inherited from the super class 
> {{TableScan}} still holds a connection to the original {{TableSourceTable}} 
> and {{TableSource}}. This raises confusing questions: which table should the 
> {{Scan}} rely on, and what is the difference between these tables? 
> Besides, {{TableSourceTable}} is not very useful to {{BatchTableSourceScan}}: 
> the only thing the {{Scan}} cares about is the {{RowType}} it returns, and 
> this is (and should be) decided by the {{TableSource}}. So we can let 
> {{BatchTableSourceScan}} directly hold the {{TableSource}} instead of the 
> {{TableSourceTable}}. If any original information is needed, the table can be 
> found through {{RelOptTable}}. 





[jira] [Commented] (FLINK-5185) Decouple BatchTableSourceScan with TableSourceTable

2016-11-30 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15710644#comment-15710644
 ] 

Kurt Young commented on FLINK-5185:
---

Hi [~fhueske], thanks for the reply.

Actually, this issue reflects a fundamental question we need to answer: "who 
decides the RowType when you scan the table?", or "who is the schema authority?".

Your suggestion effectively makes the answer {{BatchTableSourceScan}}, but I 
think the answer should be {{TableSource}}, which is responsible for all the 
real work. Letting {{BatchTableSourceScan}} hold the optional parameters for 
projection or filter will do the work for now, but once we begin to introduce 
complex {{TableSource}}s, which can even let you push part of the query down, 
{{BatchTableSourceScan}} will no longer be competent for its job. 

So I propose to make {{TableSource}} the RowType authority: RowType information 
can be obtained from {{TableSource}}, and only from {{TableSource}}. Likewise, 
only {{TableSource}} can decide how to react to the provided projection columns, 
filter conditions, or even the original query. {{BatchTableSourceScan}} should 
pass this information to the {{TableSource}} and simply wait for the new RowType 
from the new {{TableSource}}. After all, RowType is the only thing 
{{BatchTableSourceScan}} cares about, and should care about. 
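
To make this concrete, a purely hypothetical sketch of such a contract (none of 
these method names are existing Flink API; they only illustrate the proposed 
division of responsibility):

{code}
// Hypothetical only: the TableSource owns the RowType and reacts to
// push-downs by returning a new TableSource with a (possibly) new RowType.
public interface SchemaAuthorityTableSource<T> {

    // The one and only place RowType information comes from.
    TypeInformation<T> getReturnType();

    // The scan passes the pushed-down projection/filter here and simply
    // waits for the new TableSource (and thus the new RowType).
    SchemaAuthorityTableSource<T> applyProjection(int[] projectedFields);
    SchemaAuthorityTableSource<T> applyPredicate(String filterCondition);
}
{code}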

> Decouple BatchTableSourceScan with TableSourceTable
> ---
>
> Key: FLINK-5185
> URL: https://issues.apache.org/jira/browse/FLINK-5185
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Kurt Young
>Assignee: zhangjing
>Priority: Minor
>
> As the component relationships in this design doc show:
> https://docs.google.com/document/d/1PBnEbOcFHlEF1qGGAUgJvINdEXzzFTIRElgvs4-Tdeo/
> we found it has become awkward for {{BatchTableSourceScan}} to directly hold 
> a {{TableSourceTable}} and refer to the {{TableSource}} through it. This is 
> fine as long as the relationship is immutable, but when we want to change the 
> {{TableSource}} while applying optimizations, it causes conflicts and 
> confusion. 
> The only way to change the {{TableSource}} is to create a new 
> {{TableSourceTable}} to hold the new {{TableSource}} and a new 
> {{BatchTableSourceScan}} pointing to the just-created {{TableSourceTable}}. 
> The annoying part is that the {{RelOptTable}} inherited from the super class 
> {{TableScan}} still holds a connection to the original {{TableSourceTable}} 
> and {{TableSource}}. This raises confusing questions: which table should the 
> {{Scan}} rely on, and what is the difference between these tables? 
> Besides, {{TableSourceTable}} is not very useful to {{BatchTableSourceScan}}: 
> the only thing the {{Scan}} cares about is the {{RowType}} it returns, and 
> this is (and should be) decided by the {{TableSource}}. So we can let 
> {{BatchTableSourceScan}} directly hold the {{TableSource}} instead of the 
> {{TableSourceTable}}. If any original information is needed, the table can be 
> found through {{RelOptTable}}. 





[jira] [Comment Edited] (FLINK-5185) Decouple BatchTableSourceScan with TableSourceTable

2016-11-30 Thread Kurt Young (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15710644#comment-15710644
 ] 

Kurt Young edited comment on FLINK-5185 at 12/1/16 2:44 AM:


Hi [~fhueske], thanks for the reply.

Actually, this issue reflects a fundamental question we need to answer: "who 
decides the RowType when you scan the table?", or "who is the schema authority?".

Your suggestion effectively makes the answer {{BatchTableSourceScan}}, but I 
think the answer should be {{TableSource}}, which is responsible for all the 
real work. Letting {{BatchTableSourceScan}} hold the optional parameters for 
projection or filter will do the work for now, but once we begin to introduce 
complex {{TableSources}}, which can even let you push part of the query down, 
{{BatchTableSourceScan}} will no longer be competent for its job. 

So I propose to make {{TableSource}} the RowType authority: RowType information 
can be obtained from {{TableSource}}, and only from {{TableSource}}. Likewise, 
only {{TableSource}} can decide how to react to the provided projection columns, 
filter conditions, or even the original query. {{BatchTableSourceScan}} should 
pass this information to the {{TableSource}} and simply wait for the new RowType 
from the new {{TableSource}}. After all, RowType is the only thing 
{{BatchTableSourceScan}} cares about, and should care about. 


was (Author: ykt836):
Hi [~fhueske], thanks for the reply.

Actually, this issue reflects a fundamental question we need to answer: "who 
decides the RowType when you scan the table?", or "who is the schema authority?".

Your suggestion effectively makes the answer {{BatchTableSourceScan}}, but I 
think the answer should be {{TableSource}}, which is responsible for all the 
real work. Letting {{BatchTableSourceScan}} hold the optional parameters for 
projection or filter will do the work for now, but once we begin to introduce 
complex {{TableSource}}s, which can even let you push part of the query down, 
{{BatchTableSourceScan}} will no longer be competent for its job. 

So I propose to make {{TableSource}} the RowType authority: RowType information 
can be obtained from {{TableSource}}, and only from {{TableSource}}. Likewise, 
only {{TableSource}} can decide how to react to the provided projection columns, 
filter conditions, or even the original query. {{BatchTableSourceScan}} should 
pass this information to the {{TableSource}} and simply wait for the new RowType 
from the new {{TableSource}}. After all, RowType is the only thing 
{{BatchTableSourceScan}} cares about, and should care about. 

> Decouple BatchTableSourceScan with TableSourceTable
> ---
>
> Key: FLINK-5185
> URL: https://issues.apache.org/jira/browse/FLINK-5185
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Kurt Young
>Assignee: zhangjing
>Priority: Minor
>
> As the component relationships in this design doc show:
> https://docs.google.com/document/d/1PBnEbOcFHlEF1qGGAUgJvINdEXzzFTIRElgvs4-Tdeo/
> we found it has become awkward for {{BatchTableSourceScan}} to directly hold 
> a {{TableSourceTable}} and refer to the {{TableSource}} through it. This is 
> fine as long as the relationship is immutable, but when we want to change the 
> {{TableSource}} while applying optimizations, it causes conflicts and 
> confusion. 
> The only way to change the {{TableSource}} is to create a new 
> {{TableSourceTable}} to hold the new {{TableSource}} and a new 
> {{BatchTableSourceScan}} pointing to the just-created {{TableSourceTable}}. 
> The annoying part is that the {{RelOptTable}} inherited from the super class 
> {{TableScan}} still holds a connection to the original {{TableSourceTable}} 
> and {{TableSource}}. This raises confusing questions: which table should the 
> {{Scan}} rely on, and what is the difference between these tables? 
> Besides, {{TableSourceTable}} is not very useful to {{BatchTableSourceScan}}: 
> the only thing the {{Scan}} cares about is the {{RowType}} it returns, and 
> this is (and should be) decided by the {{TableSource}}. So we can let 
> {{BatchTableSourceScan}} directly hold the {{TableSource}} instead of the 
> {{TableSourceTable}}. If any original information is needed, the table can be 
> found through {{RelOptTable}}. 





[jira] [Commented] (FLINK-5214) Clean up checkpoint files when failing checkpoint operation on TM

2016-11-30 Thread Xiaogang Shi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15710512#comment-15710512
 ] 

Xiaogang Shi commented on FLINK-5214:
-

I opened FLINK-5086 to report a similar problem, but I do not have a good idea 
how to resolve it. 

Because the JM does not know about the existence of these checkpoint files, it 
seems only the TM can delete them. But since a failed TM may not be recovered 
by the JM if the number of retries exceeds the given limit, these files will 
not be deleted in such cases.

One possible solution, I think, is to let each TM return a handle to the JM 
when the TM is registered. The JM can use the handle to clean up the files 
even when the TM fails. 

Another solution is to recover the TM when the number of retries exceeds the 
limit. Once the TM is recovered, the only thing it does is clean up the 
checkpoint files.

Do you have any better ideas?
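
A rough sketch of the first idea (all names here are hypothetical, only to make 
the proposal concrete):

{code}
import org.apache.flink.api.common.JobID;
import java.io.IOException;
import java.io.Serializable;

// Hypothetical: handed from the TM to the JM at registration time, so the
// JM can remove orphaned checkpoint files even after the TM is gone.
public interface CheckpointFileCleanupHandle extends Serializable {

    // Deletes all checkpoint files this TaskManager created for the given job.
    void discardCheckpointFiles(JobID jobId) throws IOException;
}
{code}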

> Clean up checkpoint files when failing checkpoint operation on TM
> -
>
> Key: FLINK-5214
> URL: https://issues.apache.org/jira/browse/FLINK-5214
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> When the {{StreamTask#performCheckpoint}} operation fails on a 
> {{TaskManager}}, checkpoint files that may already have been created are not 
> cleaned up. This should be changed.





[jira] [Commented] (FLINK-5091) Formalize the AppMaster environment for docker compatibility

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15710479#comment-15710479
 ] 

ASF GitHub Bot commented on FLINK-5091:
---

GitHub user EronWright opened a pull request:

https://github.com/apache/flink/pull/2915

[FLINK-5091] Formalize the Mesos AppMaster environment for docker 
compatibility

Fixes FLINK-5091.

- introduced ContainerSpecification.
- reworked how the TM container environment is constructed; eliminated
special-case environment variables, file layout.
- added dynamic configuration support to GlobalConfiguration.
- integrated the SecurityContext into AM/TM runners.
- added config setting for Mesos framework user.
- support DCOS side-channel authentication.
- set the FS default scheme.
- made the artifact server more generic (no assumption about existence
of dispatcher, Path-based).
- moved some test code related to overriding the JVM’s env.
- moved the Mesos containerizer config code to the 
MesosTaskManagerParameters.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EronWright/flink feature-FLINK-5091

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2915


commit 82342491b2996f5086fd17f3b24ca588e783b8d4
Author: wrighe3 
Date:   2016-11-17T19:33:40Z

[FLINK-4921] Upgrade to Mesos 1.0.1

Updated the Mesos dependency, to unlock some new features (notably the
ability to fetch into sandbox sub-directories).

Shaded the protobuf dependency because the new Mesos library depends on
a newer version than does akka-remoting.

commit abaf2b4b3eb7c9978bdbcdc4d8c59617b86be664
Author: wrighe3 
Date:   2016-11-30T08:34:58Z

[FLINK-5091] Formalize the Mesos AppMaster environment for docker 
compatibility

Fixes FLINK-5091 and FLINK-4826.

- introduced ContainerSpecification.
- reworked how the TM container environment is constructed; eliminated
special-case environment variables, file layout.
- added dynamic configuration support to GlobalConfiguration.
- integrated the SecurityContext into AM/TM runners.
- added config setting for Mesos framework user.
- support DCOS side-channel authentication.
- set the FS default scheme.
- made the artifact server more generic (no assumption about existence
of dispatcher, Path-based).
- moved some test code related to overriding the JVM’s env.

commit e0db6f4c3d8630b11ed022453de2296e0d5c03ef
Author: wrighe3 
Date:   2016-11-30T23:22:00Z

Merge branch 'master' of https://github.com/apache/flink into 
feature-FLINK-5091

commit 08f9921eb3d5883d5ab1c1f786a4cac9f777b310
Author: wrighe3 
Date:   2016-12-01T01:25:41Z

[FLINK-5091] Formalize the Mesos AppMaster environment for docker 
compatibility

- remove dead code




> Formalize the AppMaster environment for docker compatibility
> --
>
> Key: FLINK-5091
> URL: https://issues.apache.org/jira/browse/FLINK-5091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
> Fix For: 1.2.0
>
>
> For scenarios where the AppMaster is launched from a docker image, it would 
> be ideal to use the installed Flink rather than rely on a special file layout 
> in the sandbox directory.
> This is related to DCOS integration, which (in 1.2) will launch the AppMaster 
> via Marathon (as a top-level DCOS service).  The existing code assumed that 
> only the dispatcher (coming in 1.3) would launch the AppMaster.  





[GitHub] flink pull request #2915: [FLINK-5091] Formalize the Mesos AppMaster environ...

2016-11-30 Thread EronWright
GitHub user EronWright opened a pull request:

https://github.com/apache/flink/pull/2915

[FLINK-5091] Formalize the Mesos AppMaster environment for docker 
compatibility

Fixes FLINK-5091.

- introduced ContainerSpecification.
- reworked how the TM container environment is constructed; eliminated
special-case environment variables, file layout.
- added dynamic configuration support to GlobalConfiguration.
- integrated the SecurityContext into AM/TM runners.
- added config setting for Mesos framework user.
- support DCOS side-channel authentication.
- set the FS default scheme.
- made the artifact server more generic (no assumption about existence
of dispatcher, Path-based).
- moved some test code related to overriding the JVM’s env.
- moved the Mesos containerizer config code to the 
MesosTaskManagerParameters.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/EronWright/flink feature-FLINK-5091

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2915


commit 82342491b2996f5086fd17f3b24ca588e783b8d4
Author: wrighe3 
Date:   2016-11-17T19:33:40Z

[FLINK-4921] Upgrade to Mesos 1.0.1

Updated the Mesos dependency, to unlock some new features (notably the
ability to fetch into sandbox sub-directories).

Shaded the protobuf dependency because the new Mesos library depends on
a newer version than does akka-remoting.

commit abaf2b4b3eb7c9978bdbcdc4d8c59617b86be664
Author: wrighe3 
Date:   2016-11-30T08:34:58Z

[FLINK-5091] Formalize the Mesos AppMaster environment for docker 
compatibility

Fixes FLINK-5091 and FLINK-4826.

- introduced ContainerSpecification.
- reworked how the TM container environment is constructed; eliminated
special-case environment variables, file layout.
- added dynamic configuration support to GlobalConfiguration.
- integrated the SecurityContext into AM/TM runners.
- added config setting for Mesos framework user.
- support DCOS side-channel authentication.
- set the FS default scheme.
- made the artifact server more generic (no assumption about existence
of dispatcher, Path-based).
- moved some test code related to overriding the JVM’s env.

commit e0db6f4c3d8630b11ed022453de2296e0d5c03ef
Author: wrighe3 
Date:   2016-11-30T23:22:00Z

Merge branch 'master' of https://github.com/apache/flink into 
feature-FLINK-5091

commit 08f9921eb3d5883d5ab1c1f786a4cac9f777b310
Author: wrighe3 
Date:   2016-12-01T01:25:41Z

[FLINK-5091] Formalize the Mesos AppMaster environment for docker 
compatibility

- remove dead code






[jira] [Comment Edited] (FLINK-5185) Decouple BatchTableSourceScan with TableSourceTable

2016-11-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709903#comment-15709903
 ] 

Fabian Hueske edited comment on FLINK-5185 at 11/30/16 9:56 PM:


I assume you are proposing this in order to create a {{BatchTableSourceScan}} 
with a pushed-down predicate or projection, right?
In this case, you would need to create a new {{TableSourceTable}} to hold the 
modified {{TableSource}} which projects and/or filters in the {{RelOptRule}} 
that translates {{LogicalScan}} and {{LogicalCalc}} to {{BatchTableSourceScan}}.
I see that this is actually a problem which needs to be fixed.

How about we just add optional parameters for projection columns and filter 
conditions to the {{BatchTableSourceScan}} and generate the modified 
{{TableSource}} inside the {{BatchTableSourceScan.translateToPlan()}} method? 
Then we would not need to create a new {{TableSource}} in a {{RelOptRule}} at 
all but would do this when the plan is generated.


was (Author: fhueske):
I assume you are proposing this in order to create a {{BatchTableSourceScan}} 
with a pushed-down predicate or projection, right?
In this case, you would need to create a new {{TableSourceTable}} to hold the 
modified {{TableSource}} which projects and/or filters in the {{RelOptRule}} 
that translates {{LogicalScan}} and {{LogicalCalc}} to {{BatchTableSourceScan}}.

How about we just add optional parameters for projection columns and filter 
conditions to the {{BatchTableSourceScan}} and generate the modified 
{{TableSource}} inside the {{BatchTableSourceScan.translateToPlan()}} method? 
Then we would not need to create a new {{TableSource}} in a {{RelOptRule}} at 
all but would do this when the plan is generated.

> Decouple BatchTableSourceScan with TableSourceTable
> ---
>
> Key: FLINK-5185
> URL: https://issues.apache.org/jira/browse/FLINK-5185
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Kurt Young
>Assignee: zhangjing
>Priority: Minor
>
> As the component relationships in this design doc show:
> https://docs.google.com/document/d/1PBnEbOcFHlEF1qGGAUgJvINdEXzzFTIRElgvs4-Tdeo/
> we found it has become awkward for {{BatchTableSourceScan}} to directly hold 
> a {{TableSourceTable}} and refer to the {{TableSource}} through it. This is 
> fine as long as the relationship is immutable, but when we want to change the 
> {{TableSource}} while applying optimizations, it causes conflicts and 
> confusion. 
> The only way to change the {{TableSource}} is to create a new 
> {{TableSourceTable}} to hold the new {{TableSource}} and a new 
> {{BatchTableSourceScan}} pointing to the just-created {{TableSourceTable}}. 
> The annoying part is that the {{RelOptTable}} inherited from the super class 
> {{TableScan}} still holds a connection to the original {{TableSourceTable}} 
> and {{TableSource}}. This raises confusing questions: which table should the 
> {{Scan}} rely on, and what is the difference between these tables? 
> Besides, {{TableSourceTable}} is not very useful to {{BatchTableSourceScan}}: 
> the only thing the {{Scan}} cares about is the {{RowType}} it returns, and 
> this is (and should be) decided by the {{TableSource}}. So we can let 
> {{BatchTableSourceScan}} directly hold the {{TableSource}} instead of the 
> {{TableSourceTable}}. If any original information is needed, the table can be 
> found through {{RelOptTable}}. 





[jira] [Commented] (FLINK-5185) Decouple BatchTableSourceScan with TableSourceTable

2016-11-30 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709903#comment-15709903
 ] 

Fabian Hueske commented on FLINK-5185:
--

I assume you are proposing this in order to create a {{BatchTableSourceScan}} 
with a pushed-down predicate or projection, right?
In this case, you would need to create a new {{TableSourceTable}} to hold the 
modified {{TableSource}} which projects and/or filters in the {{RelOptRule}} 
that translates {{LogicalScan}} and {{LogicalCalc}} to {{BatchTableSourceScan}}.

How about we just add optional parameters for projection columns and filter 
conditions to the {{BatchTableSourceScan}} and generate the modified 
{{TableSource}} inside the {{BatchTableSourceScan.translateToPlan()}} method? 
Then we would not need to create a new {{TableSource}} in a {{RelOptRule}} at 
all but would do this when the plan is generated.
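
A sketch of what this could look like; {{BatchTableSource.getDataSet()}} is 
existing API, while the class shape and {{applyPushDowns}} are illustrative 
only:

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.types.Row;

// Hypothetical sketch: the scan carries optional push-down parameters and
// derives the modified TableSource only when the plan is generated.
public class BatchTableSourceScanSketch {

    private final BatchTableSource<Row> original;
    private final int[] projectedFields;   // optional projection, may be null
    private final String filterCondition;  // optional filter, may be null

    public BatchTableSourceScanSketch(
            BatchTableSource<Row> original,
            int[] projectedFields,
            String filterCondition) {
        this.original = original;
        this.projectedFields = projectedFields;
        this.filterCondition = filterCondition;
    }

    public DataSet<Row> translateToPlan(ExecutionEnvironment env) {
        // The modified TableSource is created here, at plan generation time,
        // rather than in a RelOptRule.
        BatchTableSource<Row> source =
            applyPushDowns(original, projectedFields, filterCondition);
        return source.getDataSet(env);
    }

    private static BatchTableSource<Row> applyPushDowns(
            BatchTableSource<Row> source, int[] fields, String filter) {
        // Hypothetical: return a new TableSource with the projection and
        // filter applied; unchanged if both are null.
        return source;
    }
}
{code}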

> Decouple BatchTableSourceScan with TableSourceTable
> ---
>
> Key: FLINK-5185
> URL: https://issues.apache.org/jira/browse/FLINK-5185
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Kurt Young
>Assignee: zhangjing
>Priority: Minor
>
> As the component relationships in this design doc show:
> https://docs.google.com/document/d/1PBnEbOcFHlEF1qGGAUgJvINdEXzzFTIRElgvs4-Tdeo/
> we found it has become awkward for {{BatchTableSourceScan}} to directly hold 
> a {{TableSourceTable}} and refer to the {{TableSource}} through it. This is 
> fine as long as the relationship is immutable, but when we want to change the 
> {{TableSource}} while applying optimizations, it causes conflicts and 
> confusion. 
> The only way to change the {{TableSource}} is to create a new 
> {{TableSourceTable}} to hold the new {{TableSource}} and a new 
> {{BatchTableSourceScan}} pointing to the just-created {{TableSourceTable}}. 
> The annoying part is that the {{RelOptTable}} inherited from the super class 
> {{TableScan}} still holds a connection to the original {{TableSourceTable}} 
> and {{TableSource}}. This raises confusing questions: which table should the 
> {{Scan}} rely on, and what is the difference between these tables? 
> Besides, {{TableSourceTable}} is not very useful to {{BatchTableSourceScan}}: 
> the only thing the {{Scan}} cares about is the {{RowType}} it returns, and 
> this is (and should be) decided by the {{TableSource}}. So we can let 
> {{BatchTableSourceScan}} directly hold the {{TableSource}} instead of the 
> {{TableSourceTable}}. If any original information is needed, the table can be 
> found through {{RelOptTable}}. 





[jira] [Closed] (FLINK-3719) WebInterface: Moving the barrier between graph and stats

2016-11-30 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-3719.
-
Resolution: Implemented

Implemented in bd62fe14ac3e8c382b72c7c16ce1f0a6ee7db77d

> WebInterface: Moving the barrier between graph and stats
> 
>
> Key: FLINK-3719
> URL: https://issues.apache.org/jira/browse/FLINK-3719
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Niels Basjes
>Assignee: Ivan Mushketyk
> Fix For: 1.2.0
>
>
> It would be really useful if the separator between the graphical view of a 
> job topology at the top and the textual overview of the counters at the 
> bottom could be moved up/down. 





[jira] [Closed] (FLINK-3680) Remove or improve (not set) text in the Job Plan UI

2016-11-30 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan closed FLINK-3680.
-
Resolution: Fixed

1.1: 7b5d769aded2b653ac4e573bf0d9c63e250ef684
1.2: 16c08b54c4cce8801981eb6d4c79cf972b5e85b6

> Remove or improve (not set) text in the Job Plan UI
> ---
>
> Key: FLINK-3680
> URL: https://issues.apache.org/jira/browse/FLINK-3680
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Jamie Grier
>Assignee: Ivan Mushketyk
> Fix For: 1.2.0, 1.1.4
>
> Attachments: Screen Shot 2016-03-29 at 8.12.17 PM.png, Screen Shot 
> 2016-03-29 at 8.13.12 PM.png
>
>
> When running streaming jobs, the UI displays "(not set)" in a few 
> different places. This is not the case for batch jobs.
> To illustrate, I've included screenshots of the UI for the batch and 
> streaming WordCount examples.





[jira] [Updated] (FLINK-3719) WebInterface: Moving the barrier between graph and stats

2016-11-30 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-3719:
--
Fix Version/s: 1.2.0

> WebInterface: Moving the barrier between graph and stats
> 
>
> Key: FLINK-3719
> URL: https://issues.apache.org/jira/browse/FLINK-3719
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Niels Basjes
>Assignee: Ivan Mushketyk
> Fix For: 1.2.0
>
>
> It would be really useful if the separator between the graphical view of a 
> job topology at the top and the textual overview of the counters at the 
> bottom could be moved up/down. 





[GitHub] flink pull request #2457: [FLINK-3680][web frontend] Remove "(not set)" text...

2016-11-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2457




[jira] [Commented] (FLINK-3680) Remove or improve (not set) text in the Job Plan UI

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709893#comment-15709893
 ] 

ASF GitHub Bot commented on FLINK-3680:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2457


> Remove or improve (not set) text in the Job Plan UI
> ---
>
> Key: FLINK-3680
> URL: https://issues.apache.org/jira/browse/FLINK-3680
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Jamie Grier
>Assignee: Ivan Mushketyk
> Fix For: 1.2.0, 1.1.4
>
> Attachments: Screen Shot 2016-03-29 at 8.12.17 PM.png, Screen Shot 
> 2016-03-29 at 8.13.12 PM.png
>
>
> When running streaming jobs, the UI displays "(not set)" in a few 
> different places. This is not the case for batch jobs.
> To illustrate, I've included screenshots of the UI for the batch and 
> streaming WordCount examples.





[GitHub] flink pull request #2467: [FLINK-3719][web frontend] Moving the barrier betw...

2016-11-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2467




[jira] [Commented] (FLINK-3719) WebInterface: Moving the barrier between graph and stats

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709894#comment-15709894
 ] 

ASF GitHub Bot commented on FLINK-3719:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2467


> WebInterface: Moving the barrier between graph and stats
> 
>
> Key: FLINK-3719
> URL: https://issues.apache.org/jira/browse/FLINK-3719
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Niels Basjes
>Assignee: Ivan Mushketyk
>
> It would be really useful if the separator between the graphical view of a 
> job topology at the top and the textual overview of the counters at the 
> bottom could be moved up/down. 





[jira] [Comment Edited] (FLINK-4848) keystoreFilePath should be checked against null in SSLUtils#createSSLServerContext

2016-11-30 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15582930#comment-15582930
 ] 

Ted Yu edited comment on FLINK-4848 at 11/30/16 9:38 PM:
-

There is a similar issue with trustStoreFilePath:
{code}
trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
{code}


was (Author: yuzhih...@gmail.com):
There is a similar issue with trustStoreFilePath:

{code}
trustStoreFile = new FileInputStream(new File(trustStoreFilePath));
{code}

> keystoreFilePath should be checked against null in 
> SSLUtils#createSSLServerContext
> --
>
> Key: FLINK-4848
> URL: https://issues.apache.org/jira/browse/FLINK-4848
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
>   String keystoreFilePath = sslConfig.getString(
> ConfigConstants.SECURITY_SSL_KEYSTORE,
> null);
> ...
>   try {
> keyStoreFile = new FileInputStream(new File(keystoreFilePath));
> {code}
> If keystoreFilePath is null, the {{File}} constructor will throw an NPE.
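
A minimal sketch of a possible fix, failing fast with a descriptive message 
instead of the NPE:

{code}
String keystoreFilePath = sslConfig.getString(
    ConfigConstants.SECURITY_SSL_KEYSTORE,
    null);

// Fail fast with a clear message instead of an NPE from the File ctor.
if (keystoreFilePath == null) {
    throw new IllegalConfigurationException(
        ConfigConstants.SECURITY_SSL_KEYSTORE + " is not configured.");
}

keyStoreFile = new FileInputStream(new File(keystoreFilePath));
{code}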





[jira] [Commented] (FLINK-5002) Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers

2016-11-30 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709854#comment-15709854
 ] 

Ted Yu commented on FLINK-5002:
---

lgtm

> Lack of synchronization in LocalBufferPool#getNumberOfUsedBuffers
> -
>
> Key: FLINK-5002
> URL: https://issues.apache.org/jira/browse/FLINK-5002
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: Roman Maier
>Priority: Minor
>  Labels: easyfix, starter
>
> {code}
>   public int getNumberOfUsedBuffers() {
> return numberOfRequestedMemorySegments - availableMemorySegments.size();
>   }
> {code}
> Access to availableMemorySegments should be protected with proper 
> synchronization as other methods do.
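
A sketch of the likely fix, assuming {{availableMemorySegments}} is the monitor 
the mutating methods synchronize on:

{code}
public int getNumberOfUsedBuffers() {
    // Guard the read with the same lock the other methods use when
    // mutating availableMemorySegments.
    synchronized (availableMemorySegments) {
        return numberOfRequestedMemorySegments - availableMemorySegments.size();
    }
}
{code}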





[jira] [Commented] (FLINK-3719) WebInterface: Moving the barrier between graph and stats

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709735#comment-15709735
 ] 

ASF GitHub Bot commented on FLINK-3719:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2467
  
Merging this ...


> WebInterface: Moving the barrier between graph and stats
> 
>
> Key: FLINK-3719
> URL: https://issues.apache.org/jira/browse/FLINK-3719
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Reporter: Niels Basjes
>Assignee: Ivan Mushketyk
>
> It would be really useful if the separator between the graphical view of a 
> job topology at the top and the textual overview of the counters at the 
> bottom could be moved up/down. 





[GitHub] flink issue #2467: [FLINK-3719][web frontend] Moving the barrier between gra...

2016-11-30 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2467
  
Merging this ...




[jira] [Commented] (FLINK-5188) Shade dependency

2016-11-30 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709688#comment-15709688
 ] 

Chesnay Schepler commented on FLINK-5188:
-

Which dependency in which project are you referring to?

> Shade dependency
> 
>
> Key: FLINK-5188
> URL: https://issues.apache.org/jira/browse/FLINK-5188
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Table API & SQL
>Reporter: Anton Solovev
>






[jira] [Commented] (FLINK-5069) Pending checkpoint statistics gauge

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709675#comment-15709675
 ] 

ASF GitHub Bot commented on FLINK-5069:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2809
  
I don't think there is a benefit in having a separate custom class (in this 
case, PendingCheckpointStats) to encapsulate the data.

Most reporters turn Gauge values into strings. This means that the list and 
the PendingCheckpointStats objects are pretty much immediately discarded. We 
can save some overhead by creating the String straight away.

The only reporter that doesn't turn them into strings is the JMXReporter, 
which, however, would not be able to expose this object at all since it 
doesn't fulfill the necessary requirements.
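
For illustration, a sketch of this suggestion ({{renderPendingCheckpoints()}} 
is a hypothetical helper that formats the pending-checkpoint information):

{code}
// Build the string eagerly instead of exposing a custom stats object.
metrics.gauge("pendingCheckpointStat", new Gauge<String>() {
    @Override
    public String getValue() {
        return renderPendingCheckpoints();
    }
});
{code}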


> Pending checkpoint statistics gauge
> ---
>
> Key: FLINK-5069
> URL: https://issues.apache.org/jira/browse/FLINK-5069
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> Add the statistics of pending checkpoints as a gauge metric. When a 
> checkpoint appears not to complete, this metric would help to inspect the 
> state of the pending checkpoint, e.g. which task did not complete the 
> checkpoint. 
> The statistics will be as follows:
> checkpointID, 
> Number Of Acknowledged Tasks,
> Number Of Not yet Acknowledged Tasks, 
> The detail of the not yet acknowledged JobVertexID, taskID





[GitHub] flink issue #2809: [FLINK-5069] [Metrics] Pending checkpoint statistics gaug...

2016-11-30 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2809
  
I don't think there is a benefit in having a separate custom class (in this 
case, PendingCheckpointStats) to encapsulate the data.

Most reporters turn Gauge values into strings. This means that the list and 
the PendingCheckpointStats objects are pretty much immediately discarded. We 
can save some overhead by creating the String straight away.

The only reporter that doesn't turn them into strings is the JMXReporter, 
which, however, would not be able to expose this object at all since it 
doesn't fulfill the necessary requirements.




[jira] [Commented] (FLINK-3680) Remove or improve (not set) text in the Job Plan UI

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709650#comment-15709650
 ] 

ASF GitHub Bot commented on FLINK-3680:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2457
  
Merging this ...


> Remove or improve (not set) text in the Job Plan UI
> ---
>
> Key: FLINK-3680
> URL: https://issues.apache.org/jira/browse/FLINK-3680
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Reporter: Jamie Grier
>Assignee: Ivan Mushketyk
> Fix For: 1.2.0, 1.1.4
>
> Attachments: Screen Shot 2016-03-29 at 8.12.17 PM.png, Screen Shot 
> 2016-03-29 at 8.13.12 PM.png
>
>
> When running streaming jobs, the UI displays "(not set)" in a few 
> different places. This is not the case for batch jobs.
> To illustrate, I've included screenshots of the UI for the batch and 
> streaming WordCount examples.





[GitHub] flink issue #2457: [FLINK-3680][web frontend] Remove "(not set)" text in the...

2016-11-30 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2457
  
Merging this ...




[jira] [Commented] (FLINK-4705) Instrument FixedLengthRecordSorter

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709603#comment-15709603
 ] 

ASF GitHub Bot commented on FLINK-4705:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2617
  
Hi @StephanEwen, should we endeavor to get this into 1.2 or defer to the 
next release?


> Instrument FixedLengthRecordSorter
> --
>
> Key: FLINK-4705
> URL: https://issues.apache.org/jira/browse/FLINK-4705
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> The {{NormalizedKeySorter}} sorts on the concatenation of (potentially 
> partial) keys plus an 8-byte pointer to the record. After sorting, each 
> pointer must be dereferenced, which is not cache friendly.
> The {{FixedLengthRecordSorter}} sorts on the concatenation of full keys 
> followed by the remainder of the record. The records can then be deserialized 
> in sequence.
> Instrumenting the {{FixedLengthRecordSorter}} requires implementing the 
> comparator methods {{writeWithKeyNormalization}} and 
> {{readWithKeyNormalization}}.
> Testing {{JaccardIndex}} on an m4.16xlarge, the scale 18 runtime dropped from 
> 71.8 to 68.8 s (4.3% faster) and the scale 20 runtime dropped from 546.1 to 
> 501.8 s (8.8% faster).
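
For a fixed-length {{Long}} key, this pair of methods looks roughly like the 
following (a sketch following the pattern of Flink's existing comparators; the 
shift by {{Long.MIN_VALUE}} flips the sign bit so byte-wise order matches 
numeric order):

{code}
@Override
public void writeWithKeyNormalization(Long record, DataOutputView target) throws IOException {
    // Write the record in full key-normalized form.
    target.writeLong(record - Long.MIN_VALUE);
}

@Override
public Long readWithKeyNormalization(DataInputView source, Long reuse) throws IOException {
    // Undo the normalization when deserializing in sequence.
    return source.readLong() + Long.MIN_VALUE;
}
{code}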





[GitHub] flink issue #2617: [FLINK-4705] Instrument FixedLengthRecordSorter

2016-11-30 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/2617
  
Hi @StephanEwen, should we endeavor to get this into 1.2 or defer to the 
next release?




[jira] [Commented] (FLINK-5188) Shade dependency

2016-11-30 Thread Anton Solovev (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709586#comment-15709586
 ] 

Anton Solovev commented on FLINK-5188:
--

Exactly

> Shade dependency
> 
>
> Key: FLINK-5188
> URL: https://issues.apache.org/jira/browse/FLINK-5188
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Table API & SQL
>Reporter: Anton Solovev
>






[jira] [Updated] (FLINK-5188) Shade dependency

2016-11-30 Thread Anton Solovev (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Solovev updated FLINK-5188:
-
Summary: Shade dependency  (was: Redirect dependencies)

> Shade dependency
> 
>
> Key: FLINK-5188
> URL: https://issues.apache.org/jira/browse/FLINK-5188
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Table API & SQL
>Reporter: Anton Solovev
>






[jira] [Commented] (FLINK-5069) Pending checkpoint statistics gauge

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709567#comment-15709567
 ] 

ASF GitHub Bot commented on FLINK-5069:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90306783
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/PendingCheckpointStats.java
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.stats;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * Statistics for a pending checkpoint.
+ */
+public class PendingCheckpointStats implements Serializable {
--- End diff --

Can this be refactored into `org.apache.flink.runtime.metrics.groups`?


> Pending checkpoint statistics gauge
> ---
>
> Key: FLINK-5069
> URL: https://issues.apache.org/jira/browse/FLINK-5069
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> Add the statistics of pending checkpoints as a gauge metric. When a 
> checkpoint appears not to complete, this metric would help to inspect the 
> state of the pending checkpoint, e.g. which task did not complete the 
> checkpoint. 
> The statistics will be as follows:
> checkpointID, 
> Number Of Acknowledged Tasks,
> Number Of Not yet Acknowledged Tasks, 
> The detail of the not yet acknowledged JobVertexID, taskID





[jira] [Commented] (FLINK-5069) Pending checkpoint statistics gauge

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709566#comment-15709566
 ] 

ASF GitHub Bot commented on FLINK-5069:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90303696
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -40,15 +43,21 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.ArrayDeque;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.LinkedHashMap;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
--- End diff --

Let's keep these sorted. I use and recommend @StephanEwen's configuration 
from https://github.com/apache/flink/pull/2352/files


> Pending checkpoint statistics gauge
> ---
>
> Key: FLINK-5069
> URL: https://issues.apache.org/jira/browse/FLINK-5069
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> Add the statistics of pending checkpoints as a gauge metric. When a 
> checkpoint appears not to complete, this metric would help to inspect the 
> state of the pending checkpoint, e.g. which task did not complete the 
> checkpoint. 
> The statistics will be as follows:
> checkpointID, 
> Number Of Acknowledged Tasks,
> Number Of Not yet Acknowledged Tasks, 
> The detail of the not yet acknowledged JobVertexID, taskID





[jira] [Commented] (FLINK-5069) Pending checkpoint statistics gauge

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709571#comment-15709571
 ] 

ASF GitHub Bot commented on FLINK-5069:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90305721
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/PendingCheckpointStats.java
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.stats;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * Statistics for a pending checkpoint.
+ */
+public class PendingCheckpointStats implements Serializable {
+
+   /** ID of the checkpoint. */
+   private final long checkpointId;
+
+   /** Timestamp when the checkpoint was triggered. */
+   private final long triggerTimestamp;
+
+   /** Number Of Acknowledged Tasks. */
+   private final int numberOfAcknowledgedTasks;
+
+   /** Number Of Not yet Acknowledged Tasks. */
+   private final int numberOfNonAcknowledgedTasks;
+
+   /** Not yet Acknowledged Tasks. */
+   private final Map<JobVertexID, Set<Integer>> notYetAcknowledgedTasks;
+
+   /**
+* Creates a pending checkpoint statistic.
+*
+* @param checkpointId   Checkpoint ID
+* @param triggerTimestamp   Timestamp when the checkpoint 
was triggered
+* @param numberOfAcknowledgedTasks  Number Of Acknowledged Tasks
+* @param numberOfNonAcknowledgedTasks   Number Of Not yet Acknowledged 
Tasks
+* @param notYetAcknowledgedTasksNot yet Acknowledged Tasks
+*/
+   public PendingCheckpointStats(
+   long checkpointId,
+   long triggerTimestamp,
+   int numberOfAcknowledgedTasks,
+   int numberOfNonAcknowledgedTasks,
+   Map<JobVertexID, Set<Integer>> notYetAcknowledgedTasks) {
+
+   this.checkpointId = checkpointId;
+   this.triggerTimestamp = triggerTimestamp;
+   this.numberOfAcknowledgedTasks = numberOfAcknowledgedTasks;
+   this.numberOfNonAcknowledgedTasks = 
numberOfNonAcknowledgedTasks;
+   this.notYetAcknowledgedTasks = notYetAcknowledgedTasks;
+   }
+
+   /**
+* Returns the ID of the checkpoint.
+*
+* @return ID of the checkpoint.
+*/
+   public long getCheckpointId() {
+   return checkpointId;
+   }
+
+   /**
+* Returns the timestamp when the checkpoint was triggered.
+*
+* @return Timestamp when the checkpoint was triggered.
+*/
+   public long getTriggerTimestamp() {
+   return triggerTimestamp;
+   }
+
+   /**
+* Returns the number of acknowledged tasks of the checkpoint.
+*
+* @return Number Of acknowledged tasks of the checkpoint.
+*/
+   public int getNumberOfAcknowledgedTasks() {
+   return numberOfAcknowledgedTasks;
+   }
+
+   /**
+* Returns the number of not yet acknowledged tasks of the checkpoint.
+*
+* @return Number of not yet acknowledged tasks of the checkpoint.
+*/
+   public int getNumberOfNonAcknowledgedTasks() {
+   return numberOfNonAcknowledgedTasks;
+   }
+
+   /**
+* Returns the not yet acknowledged tasks of the checkpoint.
+*
+* @return Not yet acknowledged tasks of the checkpoint.
+*/
+   public Map<JobVertexID, Set<Integer>> getNotYetAcknowledgedTasks() {
+   return notYetAcknowledgedTasks;
+   }
+
+   @Override
+   public String toString() {
+   StringBuilder sb = new StringBuilder();
+   sb.append("Checkpoint{ID=").append(ch

[jira] [Commented] (FLINK-5069) Pending checkpoint statistics gauge

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709570#comment-15709570
 ] 

ASF GitHub Bot commented on FLINK-5069:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90306654
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -957,4 +969,65 @@ public void run() {
}
});
}
+
+   // ------------------------------------------------------------------------
+   //  pending checkpoints stats metrics
+   // ------------------------------------------------------------------------
+   private class PendingCheckpointStatGauge implements Gauge<List<PendingCheckpointStats>> {
--- End diff --

Can this be refactored into 
`org.apache.flink.runtime.metrics.util.MetricUtils`?


> Pending checkpoint statistics gauge
> ---
>
> Key: FLINK-5069
> URL: https://issues.apache.org/jira/browse/FLINK-5069
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> Add the statistics of pending checkpoints as a gauge metric. When the 
> checkpoint appears not to be completed, this metric would help to get the 
> state of a pending checkpoint, e.g. which task did not complete the 
> checkpoint. 
> The statistic will be as follows:
> checkpointID, 
> Number Of Acknowledged Tasks,
> Number Of Not yet Acknowledged Tasks, 
> The detail of not yet acknowledged JobVertexID, taskID
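A minimal sketch of the gauge pattern proposed here, assuming the PR's
{{PendingCheckpointStats}} class; the wiring below is illustrative, not the
final implementation:

{code}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import java.util.ArrayList;
import java.util.List;

// Hypothetical registration: expose a snapshot of the coordinator's
// pending checkpoints through a single gauge.
public class PendingCheckpointStatsRegistration {

    public static void register(MetricGroup metrics, final Iterable<PendingCheckpointStats> pending) {
        metrics.gauge("pendingCheckpointStat", new Gauge<List<PendingCheckpointStats>>() {
            @Override
            public List<PendingCheckpointStats> getValue() {
                // copy into a list so the reporter sees a stable snapshot
                List<PendingCheckpointStats> snapshot = new ArrayList<>();
                for (PendingCheckpointStats stats : pending) {
                    snapshot.add(stats);
                }
                return snapshot;
            }
        });
    }
}
{code}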



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5069) Pending checkpoint statistics gauge

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709569#comment-15709569
 ] 

ASF GitHub Bot commented on FLINK-5069:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90308555
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -214,6 +224,8 @@ public CheckpointCoordinator(
checkpointProperties = 
CheckpointProperties.forStandardCheckpoint();
}
 
+   metrics.gauge("pendingCheckpointStat", new PendingCheckpointStatGauge());
--- End diff --

Need to document in `metrics.md`.


> Pending checkpoint statistics gauge
> ---
>
> Key: FLINK-5069
> URL: https://issues.apache.org/jira/browse/FLINK-5069
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> Add the statistics of pending checkpoints as a gauge metric. When the 
> checkpoint appears not to be completed, this metric would help to get the 
> state of a pending checkpoint, e.g. which task did not complete the 
> checkpoint. 
> The statistic will be as follows:
> checkpointID, 
> Number Of Acknowledged Tasks,
> Number Of Not yet Acknowledged Tasks, 
> The detail of not yet acknowledged JobVertexID, taskID



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5069) Pending checkpoint statistics gauge

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709568#comment-15709568
 ] 

ASF GitHub Bot commented on FLINK-5069:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90312273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -957,4 +969,65 @@ public void run() {
}
});
}
+
+   // ------------------------------------------------------------------------
+   //  pending checkpoints stats metrics
+   // ------------------------------------------------------------------------
+   private class PendingCheckpointStatGauge implements Gauge<List<PendingCheckpointStats>> {
+
+       @Override
+       public List<PendingCheckpointStats> getValue() {
+           List<PendingCheckpointStats> pendingCheckpointStatsList = new ArrayList<>();
+
+           for (PendingCheckpoint checkpoint : pendingCheckpoints.values()) {
+               long checkpointId = checkpoint.getCheckpointId();
+               long triggerTime = checkpoint.getCheckpointTimestamp();
+               int numberOfAckTasks = checkpoint.getNumberOfAcknowledgedTasks();
+               int numberOfNonAckTasks = checkpoint.getNumberOfNonAcknowledgedTasks();
+
+               // Acknowledged tasks for double check NonAcknowledged tasks
+               Set<String> ackTasks = new HashSet<>();
+               for (TaskState taskState : checkpoint.getTaskStates().values()) {
+                   JobVertexID jobVertexID = taskState.getJobVertexID();
+                   for (int subtaskId : taskState.getSubtaskStates().keySet()) {
+                       ackTasks.add(jobVertexID + "_" + subtaskId);
+                   }
+               }
+
+               // Not yet Acknowledged tasks
+               Map<JobVertexID, Set<Integer>> nonAckTasks = new HashMap<>();

Can we make use of `PendingCheckpoint#notYetAcknowledgedTasks`?


> Pending checkpoint statistics gauge
> ---
>
> Key: FLINK-5069
> URL: https://issues.apache.org/jira/browse/FLINK-5069
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Reporter: zhuhaifeng
>Assignee: zhuhaifeng
>Priority: Minor
> Fix For: 1.2.0
>
>
> Add the statistics of pending checkpoints as a gauge metric. When the 
> checkpoint appears not to be completed, this metric would help to get the 
> state of a pending checkpoint, e.g. which task did not complete the 
> checkpoint. 
> The statistic will be as follows:
> checkpointID, 
> Number Of Acknowledged Tasks,
> Number Of Not yet Acknowledged Tasks, 
> The detail of not yet acknowledged JobVertexID, taskID



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2809: [FLINK-5069] [Metrics] Pending checkpoint statisti...

2016-11-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90306654
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -957,4 +969,65 @@ public void run() {
}
});
}
+
+   // ------------------------------------------------------------------------
+   //  pending checkpoints stats metrics
+   // ------------------------------------------------------------------------
+   private class PendingCheckpointStatGauge implements Gauge<List<PendingCheckpointStats>> {
--- End diff --

Can this be refactored into 
`org.apache.flink.runtime.metrics.util.MetricUtils`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2809: [FLINK-5069] [Metrics] Pending checkpoint statisti...

2016-11-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90303696
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -40,15 +43,21 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.ArrayDeque;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.LinkedHashMap;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
--- End diff --

Let's keep these sorted. I use and recommend @StephanEwen's configuration 
from https://github.com/apache/flink/pull/2352/files
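For illustration, the `java.util` imports from the hunk above in the sorted
order that configuration produces:

{code}
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
{code}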


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2809: [FLINK-5069] [Metrics] Pending checkpoint statisti...

2016-11-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90305721
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/PendingCheckpointStats.java
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.stats;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * Statistics for a pending checkpoint.
+ */
+public class PendingCheckpointStats implements Serializable {
+
+   /** ID of the checkpoint. */
+   private final long checkpointId;
+
+   /** Timestamp when the checkpoint was triggered. */
+   private final long triggerTimestamp;
+
+   /** Number Of Acknowledged Tasks. */
+   private final int numberOfAcknowledgedTasks;
+
+   /** Number Of Not yet Acknowledged Tasks. */
+   private final int numberOfNonAcknowledgedTasks;
+
+   /** Not yet Acknowledged Tasks. */
+   private final Map<JobVertexID, Set<Integer>> notYetAcknowledgedTasks;
+
+   /**
+* Creates a pending checkpoint statistic.
+*
+* @param checkpointId   Checkpoint ID
+* @param triggerTimestamp   Timestamp when the checkpoint was triggered
+* @param numberOfAcknowledgedTasks  Number Of Acknowledged Tasks
+* @param numberOfNonAcknowledgedTasks   Number Of Not yet Acknowledged Tasks
+* @param notYetAcknowledgedTasks    Not yet Acknowledged Tasks
+*/
+   public PendingCheckpointStats(
+   long checkpointId,
+   long triggerTimestamp,
+   int numberOfAcknowledgedTasks,
+   int numberOfNonAcknowledgedTasks,
+   Map<JobVertexID, Set<Integer>> notYetAcknowledgedTasks) {
+
+   this.checkpointId = checkpointId;
+   this.triggerTimestamp = triggerTimestamp;
+   this.numberOfAcknowledgedTasks = numberOfAcknowledgedTasks;
+   this.numberOfNonAcknowledgedTasks = numberOfNonAcknowledgedTasks;
+   this.notYetAcknowledgedTasks = notYetAcknowledgedTasks;
+   }
+
+   /**
+* Returns the ID of the checkpoint.
+*
+* @return ID of the checkpoint.
+*/
+   public long getCheckpointId() {
+   return checkpointId;
+   }
+
+   /**
+* Returns the timestamp when the checkpoint was triggered.
+*
+* @return Timestamp when the checkpoint was triggered.
+*/
+   public long getTriggerTimestamp() {
+   return triggerTimestamp;
+   }
+
+   /**
+* Returns the number of acknowledged tasks of the checkpoint.
+*
+* @return Number Of acknowledged tasks of the checkpoint.
+*/
+   public int getNumberOfAcknowledgedTasks() {
+   return numberOfAcknowledgedTasks;
+   }
+
+   /**
+* Returns the number of not yet acknowledged tasks of the checkpoint.
+*
+* @return Number of not yet acknowledged tasks of the checkpoint.
+*/
+   public int getNumberOfNonAcknowledgedTasks() {
+   return numberOfNonAcknowledgedTasks;
+   }
+
+   /**
+* Returns the not yet acknowledged tasks of the checkpoint.
+*
+* @return Not yet acknowledged tasks of the checkpoint.
+*/
+   public Map<JobVertexID, Set<Integer>> getNotYetAcknowledgedTasks() {
+   return notYetAcknowledgedTasks;
+   }
+
+   @Override
+   public String toString() {
+   StringBuilder sb = new StringBuilder();
+   sb.append("Checkpoint{ID=").append(checkpointId).
+       append(",triggerTime=").append(triggerTimestamp).
+       append(",numOfAckTasks=").append(numberOfAcknowledgedTasks).
+       append(",numOfNonAckTasks=").append(numberOfNonAcknowledgedTasks).

[GitHub] flink pull request #2809: [FLINK-5069] [Metrics] Pending checkpoint statisti...

2016-11-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90312273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -957,4 +969,65 @@ public void run() {
}
});
}
+
+   // ------------------------------------------------------------------------
+   //  pending checkpoints stats metrics
+   // ------------------------------------------------------------------------
+   private class PendingCheckpointStatGauge implements Gauge<List<PendingCheckpointStats>> {
+
+       @Override
+       public List<PendingCheckpointStats> getValue() {
+           List<PendingCheckpointStats> pendingCheckpointStatsList = new ArrayList<>();
+
+           for (PendingCheckpoint checkpoint : pendingCheckpoints.values()) {
+               long checkpointId = checkpoint.getCheckpointId();
+               long triggerTime = checkpoint.getCheckpointTimestamp();
+               int numberOfAckTasks = checkpoint.getNumberOfAcknowledgedTasks();
+               int numberOfNonAckTasks = checkpoint.getNumberOfNonAcknowledgedTasks();
+
+               // Acknowledged tasks for double check NonAcknowledged tasks
+               Set<String> ackTasks = new HashSet<>();
+               for (TaskState taskState : checkpoint.getTaskStates().values()) {
+                   JobVertexID jobVertexID = taskState.getJobVertexID();
+                   for (int subtaskId : taskState.getSubtaskStates().keySet()) {
+                       ackTasks.add(jobVertexID + "_" + subtaskId);
+                   }
+               }
+
+               // Not yet Acknowledged tasks
+               Map<JobVertexID, Set<Integer>> nonAckTasks = new HashMap<>();
--- End diff --

Can we make use of `PendingCheckpoint#notYetAcknowledgedTasks`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2809: [FLINK-5069] [Metrics] Pending checkpoint statisti...

2016-11-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90308555
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -214,6 +224,8 @@ public CheckpointCoordinator(
checkpointProperties = 
CheckpointProperties.forStandardCheckpoint();
}
 
+   metrics.gauge("pendingCheckpointStat", new PendingCheckpointStatGauge());
--- End diff --

Need to document in `metrics.md`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2809: [FLINK-5069] [Metrics] Pending checkpoint statisti...

2016-11-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2809#discussion_r90306783
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/PendingCheckpointStats.java
 ---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.stats;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * Statistics for a pending checkpoint.
+ */
+public class PendingCheckpointStats implements Serializable {
--- End diff --

Can this be refactored into `org.apache.flink.runtime.metrics.groups`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-5216) CheckpointCoordinator's 'minPauseBetweenCheckpoints' refers to checkpoint start rather than checkpoint completion

2016-11-30 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-5216:
---

 Summary: CheckpointCoordinator's 'minPauseBetweenCheckpoints' 
refers to checkpoint start rather than checkpoint completion
 Key: FLINK-5216
 URL: https://issues.apache.org/jira/browse/FLINK-5216
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.1.3
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.2.0, 1.1.4






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5188) Redirect dependencies

2016-11-30 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709313#comment-15709313
 ] 

Stephan Ewen commented on FLINK-5188:
-

What does "redirect dependencies" mean?

> Redirect dependencies
> -
>
> Key: FLINK-5188
> URL: https://issues.apache.org/jira/browse/FLINK-5188
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Table API & SQL
>Reporter: Anton Solovev
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-5188) Redirect dependencies

2016-11-30 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5188?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709313#comment-15709313
 ] 

Stephan Ewen edited comment on FLINK-5188 at 11/30/16 6:14 PM:
---

What does "redirect dependencies" mean? Do you mean "dependency shading"?


was (Author: stephanewen):
What does "redirect dependencies" mean?

> Redirect dependencies
> -
>
> Key: FLINK-5188
> URL: https://issues.apache.org/jira/browse/FLINK-5188
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core, Table API & SQL
>Reporter: Anton Solovev
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5091) Formalize the AppMaster environment for docker compability

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709299#comment-15709299
 ] 

ASF GitHub Bot commented on FLINK-5091:
---

Github user EronWright closed the pull request at:

https://github.com/apache/flink/pull/2904


> Formalize the AppMaster environment for docker compability
> --
>
> Key: FLINK-5091
> URL: https://issues.apache.org/jira/browse/FLINK-5091
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
> Fix For: 1.2.0
>
>
> For scenarios where the AppMaster is launched from a docker image, it would 
> be ideal to use the installed Flink rather than rely on a special file layout 
> in the sandbox directory.
> This is related to DCOS integration, which (in 1.2) will launch the AppMaster 
> via Marathon (as a top-level DCOS service).  The existing code assumed that 
> only the dispatcher (coming in 1.3) would launch the AppMaster.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2904: [FLINK-5091] Formalize the Mesos AppMaster environ...

2016-11-30 Thread EronWright
Github user EronWright closed the pull request at:

https://github.com/apache/flink/pull/2904


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4826) Add keytab based kerberos support for Mesos environment

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709198#comment-15709198
 ] 

ASF GitHub Bot commented on FLINK-4826:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2734


> Add keytab based kerberos support for Mesos environment
> ---
>
> Key: FLINK-4826
> URL: https://issues.apache.org/jira/browse/FLINK-4826
> Project: Flink
>  Issue Type: Sub-task
>  Components: Mesos, Security
>Reporter: Vijay Srinivasaraghavan
>Assignee: Vijay Srinivasaraghavan
> Fix For: 1.2.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2900: Rebased: Keytab & TLS support for Flink on Mesos S...

2016-11-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2900


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2734: Keytab & TLS support for Flink on Mesos Setup

2016-11-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2734


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2900: Rebased: Keytab & TLS support for Flink on Mesos Setup

2016-11-30 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2900
  
Tests passed. Merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2914: [FLINK-4611] Make "AUTO" credential provider as de...

2016-11-30 Thread tony810430
GitHub user tony810430 opened a pull request:

https://github.com/apache/flink/pull/2914

[FLINK-4611] Make "AUTO" credential provider as default for Kinesis C…



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tony810430/flink FLINK-4611

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2914.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2914


commit be8b22fc3d8dd1c5aa97800c988fe4492b94993e
Author: 魏偉哲 
Date:   2016-11-30T10:17:24Z

[FLINK-4611] Make "AUTO" credential provider as default for Kinesis 
Connector




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-5214) Clean up checkpoint files when failing checkpoint operation on TM

2016-11-30 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-5214:


Assignee: Till Rohrmann

> Clean up checkpoint files when failing checkpoint operation on TM
> -
>
> Key: FLINK-5214
> URL: https://issues.apache.org/jira/browse/FLINK-5214
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> When the {{StreamTask#performCheckpoint}} operation fails on a 
> {{TaskManager}}, potentially created checkpoint files are not cleaned up. This 
> should be changed.
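A minimal sketch of the intended cleanup, with hypothetical names; the point is
that a failed checkpoint write deletes whatever it created before rethrowing:

{code}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper: never leave a partially written checkpoint file behind.
static void writeCheckpointFile(Path checkpointFile, byte[] snapshot) throws IOException {
    try {
        Files.write(checkpointFile, snapshot);
    } catch (IOException e) {
        // remove the partial file so nothing lingers on the TaskManager
        Files.deleteIfExists(checkpointFile);
        throw e;
    }
}
{code}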



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5215) Close checkpoint streams upon cancellation

2016-11-30 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5215:


 Summary: Close checkpoint streams upon cancellation
 Key: FLINK-5215
 URL: https://issues.apache.org/jira/browse/FLINK-5215
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Affects Versions: 1.1.3
Reporter: Till Rohrmann
 Fix For: 1.1.4


Closing checkpoint streams upon cancellation of a {{Task}} could help to 
interrupt a main thread that is stuck in the {{snapshotState}} method of a 
stateful operator. Otherwise, if the {{snapshotState}} method takes too long to 
complete, the watchdog will kill the whole {{TaskManager}}, terminating all 
other running tasks as well.
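A minimal sketch of the idea, with hypothetical names: register every open
checkpoint stream, and close them all on cancellation so that a thread blocked
in a write fails fast instead of tripping the watchdog.

{code}
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of open checkpoint streams for one Task.
public class CheckpointStreamRegistry {

    private final Set<Closeable> openStreams =
        Collections.newSetFromMap(new ConcurrentHashMap<Closeable, Boolean>());

    public void register(Closeable stream) {
        openStreams.add(stream);
    }

    /** Called from the cancellation path; a thread blocked in a write then fails fast. */
    public void closeAllOnCancel() {
        for (Closeable stream : openStreams) {
            try {
                stream.close();
            } catch (IOException ignored) {
                // best effort during cancellation
            }
        }
        openStreams.clear();
    }
}
{code}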



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4611) Make "AUTO" credential provider as default for Kinesis Connector

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709154#comment-15709154
 ] 

ASF GitHub Bot commented on FLINK-4611:
---

GitHub user tony810430 opened a pull request:

https://github.com/apache/flink/pull/2914

[FLINK-4611] Make "AUTO" credential provider as default for Kinesis C…



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tony810430/flink FLINK-4611

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2914.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2914


commit be8b22fc3d8dd1c5aa97800c988fe4492b94993e
Author: 魏偉哲 
Date:   2016-11-30T10:17:24Z

[FLINK-4611] Make "AUTO" credential provider as default for Kinesis 
Connector




> Make "AUTO" credential provider as default for Kinesis Connector
> 
>
> Key: FLINK-4611
> URL: https://issues.apache.org/jira/browse/FLINK-4611
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Wei-Che Wei
> Fix For: 1.2.0
>
>
> Right now, the Kinesis Consumer / Producer by default directly expects the 
> access key id and secret access key to be given in the config properties.
> This isn't a good practice for accessing AWS services, and usually Kinesis 
> users would most likely be running their Flink application on AWS instances 
> that have embedded credentials that can be accessed via the default credential 
> provider chain. Therefore, it makes sense to change the default 
> {{AWS_CREDENTIALS_PROVIDER}} to {{AUTO}} instead of {{BASIC}}.
> To avoid breaking user code, we only use directly supplied AWS credentials if 
> both access key and secret key are given through {{AWS_ACCESS_KEY}} and 
> {{AWS_SECRET_KEY}}. Otherwise, the default credential provider chain is used.
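A minimal sketch of the selection logic described above, assuming AWS SDK 1.x
classes; the property key names below are placeholders standing in for the
connector's actual constants:

{code}
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

import java.util.Properties;

// Sketch: keep the old BASIC behavior only when both keys are supplied.
static AWSCredentialsProvider getCredentialsProvider(Properties config) {
    // placeholder key names standing in for AWS_ACCESS_KEY / AWS_SECRET_KEY
    String accessKey = config.getProperty("aws.access.key");
    String secretKey = config.getProperty("aws.secret.key");

    if (accessKey != null && secretKey != null) {
        // both keys given: use them directly, so existing user code keeps working
        return new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey));
    } else {
        // AUTO: environment variables, system properties, instance profile, etc.
        return new DefaultAWSCredentialsProviderChain();
    }
}
{code}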



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-5214) Clean up checkpoint files when failing checkpoint operation on TM

2016-11-30 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-5214:


 Summary: Clean up checkpoint files when failing checkpoint 
operation on TM
 Key: FLINK-5214
 URL: https://issues.apache.org/jira/browse/FLINK-5214
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Affects Versions: 1.1.3, 1.2.0
Reporter: Till Rohrmann
 Fix For: 1.2.0, 1.1.4


When the {{StreamTask#performCheckpoint}} operation fails on a {{TaskManager}}, 
potentially created checkpoint files are not cleaned up. This should be changed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3160) Aggregate operator statistics by TaskManager

2016-11-30 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709120#comment-15709120
 ] 

Greg Hogan commented on FLINK-3160:
---

For large parallelism (> 1000, depending on device and browser) the number of 
subtasks is unwieldy. The TaskManager tab aggregates subtasks per TaskManager 
so the table is generally much smaller. Also, the aggregated statistics can be 
quite useful.

How useful would it be to sort by subtask ID when that value is not included in 
the table? A user would need to count down the list to locate a specific index.

> Aggregate operator statistics by TaskManager
> 
>
> Key: FLINK-3160
> URL: https://issues.apache.org/jira/browse/FLINK-3160
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.0.0
>
>
> The web client job info page presents a table of the following per task 
> statistics: start time, end time, duration, bytes received, records received, 
> bytes sent, records sent, attempt, host, status.
> Flink supports clusters with thousands of slots and a job setting a high 
> parallelism renders this job info page unwieldy and difficult to analyze in 
> real-time.
> It would be helpful to optionally or automatically aggregate statistics by 
> TaskManager. These rows could then be expanded to reveal the current per task 
> statistics.
> Start time, end time, duration, and attempt are not applicable to a 
> TaskManager since new tasks for repeated attempts may be started. Bytes 
> received, records received, bytes sent, and records sent are summed. Any 
> throughput metrics can be averaged over the total task time or time window. 
> Status could reference the number of running tasks on the TaskManager or an 
> idle state.
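A minimal sketch, with hypothetical types, of the summing described above:
per-subtask counters rolled up by TaskManager host.

{code}
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-subtask record; in the web UI these values come from the REST API.
class SubtaskStats {
    final String host;
    final long bytesReceived, recordsReceived, bytesSent, recordsSent;

    SubtaskStats(String host, long bytesReceived, long recordsReceived, long bytesSent, long recordsSent) {
        this.host = host;
        this.bytesReceived = bytesReceived;
        this.recordsReceived = recordsReceived;
        this.bytesSent = bytesSent;
        this.recordsSent = recordsSent;
    }
}

class TaskManagerRollup {
    /** Sums the four counters for all subtasks running on the same TaskManager. */
    static Map<String, long[]> aggregateByHost(Iterable<SubtaskStats> subtasks) {
        Map<String, long[]> perHost = new HashMap<>();
        for (SubtaskStats s : subtasks) {
            long[] sums = perHost.get(s.host);
            if (sums == null) {
                sums = new long[4]; // bytesIn, recordsIn, bytesOut, recordsOut
                perHost.put(s.host, sums);
            }
            sums[0] += s.bytesReceived;
            sums[1] += s.recordsReceived;
            sums[2] += s.bytesSent;
            sums[3] += s.recordsSent;
        }
        return perHost;
    }
}
{code}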



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3160) Aggregate operator statistics by TaskManager

2016-11-30 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709119#comment-15709119
 ] 

Greg Hogan commented on FLINK-3160:
---

For large parallelism (> 1000, depending on device and browser) the number of 
subtasks is unwieldy. The TaskManager tab aggregates subtasks per TaskManager 
so the table is generally much smaller. Also, the aggregated statistics can be 
quite useful.

How useful would it be to sort by subtask ID when that value is not included in 
the table? A user would need to count down the list to locate a specific index.

> Aggregate operator statistics by TaskManager
> 
>
> Key: FLINK-3160
> URL: https://issues.apache.org/jira/browse/FLINK-3160
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.0.0
>
>
> The web client job info page presents a table of the following per task 
> statistics: start time, end time, duration, bytes received, records received, 
> bytes sent, records sent, attempt, host, status.
> Flink supports clusters with thousands of slots and a job setting a high 
> parallelism renders this job info page unwieldy and difficult to analyze in 
> real-time.
> It would be helpful to optionally or automatically aggregate statistics by 
> TaskManager. These rows could then be expanded to reveal the current per task 
> statistics.
> Start time, end time, duration, and attempt are not applicable to a 
> TaskManager since new tasks for repeated attempts may be started. Bytes 
> received, records received, bytes sent, and records sent are summed. Any 
> throughput metrics can be averaged over the total task time or time window. 
> Status could reference the number of running tasks on the TaskManager or an 
> idle state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2644) State partitioning does not respect the different partitioning of multiple inputs

2016-11-30 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2644.
-
   Resolution: Fixed
Fix Version/s: 1.0.1

Fixed a few versions back...

> State partitioning does not respect the different partitioning of multiple 
> inputs
> -
>
> Key: FLINK-2644
> URL: https://issues.apache.org/jira/browse/FLINK-2644
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Gyula Fora
> Fix For: 1.0.1
>
>
> Currently state partitioning is only available for one-input stream operators 
> and even for those, the partitioning will only depend on the first input.
> This happens because the keyselector used to extract the partitioning key 
> from each element is used from the first input stream. We need to make sure 
> that for each input stream the corresponding key selector is used.
> To do this, ctx.nextRecord(record), should be extended by adding a parameter 
> denoting the index of the input channel which can be used by the partitioner 
> to select the appropriate key selector.
> The same mechanics can be extended to allow state partitioning for TwoInput 
> stream operators as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2644) State partitioning does not respect the different partitioning of multiple inputs

2016-11-30 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2644.
---

> State partitioning does not respect the different partitioning of multiple 
> inputs
> -
>
> Key: FLINK-2644
> URL: https://issues.apache.org/jira/browse/FLINK-2644
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Gyula Fora
> Fix For: 1.0.1
>
>
> Currently state partitioning is only available for one-input stream operators 
> and even for those, the partitioning will only depend on the first input.
> This happens because the keyselector used to extract the partitioning key 
> from each element is used from the first input stream. We need to make sure 
> that for each input stream the corresponding key selector is used.
> To do this, ctx.nextRecord(record), should be extended by adding a parameter 
> denoting the index of the input channel which can be used by the partitioner 
> to select the appropriate key selector.
> The same mechanics can be extended to allow state partitioning for TwoInput 
> stream operators as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-2999) Support connected keyed streams

2016-11-30 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2999.
-
   Resolution: Fixed
Fix Version/s: 1.0.1

This has been implemented a while back already.

> Support connected keyed streams
> ---
>
> Key: FLINK-2999
> URL: https://issues.apache.org/jira/browse/FLINK-2999
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Fabian Hueske
>Assignee: Stephan Ewen
> Fix For: 1.0.1
>
>
> It would be nice to add support for connected keyed streams to enable 
> key-partitioned state in Co*Functions.
> This could be done by simply connecting two keyed Streams or adding a new 
> method to connect and key two streams as one operation.
> {code}
> DataStream s1 = ...
> DataStream s2 = ...
> // alternative 1
> s1
>   .keyBy(0)
>   .connect(s2.keyBy(1))
>   .map(new KeyedCoMap());
> // alternative 2
> s1
>   .connectByKey(s2, 0, 1)
>   .map(new KeyedCoMap());
> public class KeyedCoMap implements RichCoMapFunction {
>   
>   OperatorState<A> s;
>   public void open() {
>     s = getRuntimeContext().getKeyValueState("abc", A.class, new A());
>   }
>   // ...
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4855) Add partitionedKeyBy to DataStream

2016-11-30 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709090#comment-15709090
 ] 

Stephan Ewen commented on FLINK-4855:
-

I think this is a good idea; I have heard it being asked for a couple of times.

[~greghogan] It could be discussed on the mailing list, true. In this case, it 
is a very small addition, so discussing it in this issue would probably be okay.
Probably the biggest remaining question is what to call the new function.

My suggestion for the name would be `reKeyBy()`, because it expresses that this 
should be used on streams that were keyed before.

I recently wrote a utility function for a user that does re-keying for window 
functions. Here is the code, usable as a utility function and a temporary way to 
realize this.
{code}
public static <T, R, K, W extends Window> SingleOutputStreamOperator<R> reKeyAndWindow(
        DataStream<T> input,
        KeySelector<T, K> keySelector,
        WindowAssigner<? super T, W> windowAssigner,
        WindowFunction<T, R, K, W> function) {

    Trigger<? super T, ? super W> trigger =
            windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());

    return reKeyAndWindow(input, keySelector, windowAssigner, trigger, function);
}

public static <T, R, K, W extends Window> SingleOutputStreamOperator<R> reKeyAndWindow(
        DataStream<T> input,
        KeySelector<T, K> keySelector,
        WindowAssigner<? super T, W> windowAssigner,
        Trigger<? super T, ? super W> trigger,
        WindowFunction<T, R, K, W> function) {

    TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
            function, WindowFunction.class, true, true, input.getType(), null, false);

    TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input.getType());

    return reKeyAndWindow(input, keySelector, windowAssigner, trigger, function, resultType, keyType);
}

public static <T, R, K, W extends Window> SingleOutputStreamOperator<R> reKeyAndWindow(
        DataStream<T> input,
        KeySelector<T, K> keySelector,
        WindowAssigner<? super T, W> windowAssigner,
        Trigger<? super T, ? super W> trigger,
        WindowFunction<T, R, K, W> function,
        TypeInformation<R> resultType,
        TypeInformation<K> keyType) {

    StreamExecutionEnvironment env = input.getExecutionEnvironment();
    String callLocation = Utils.getCallLocationName();
    String udfName = "WindowedStream." + callLocation;

    ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
            input.getType().createSerializer(env.getConfig()));

    String opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")";

    WindowOperator<K, T, Iterable<T>, R, W> operator =
            new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(env.getConfig()),
                    keySelector,
                    keyType.createSerializer(env.getConfig()),
                    stateDesc,
                    new InternalIterableWindowFunction<>(function),
                    trigger,
                    0L); // last parameter is the allowed lateness

    SingleOutputStreamOperator<R> result = input.transform(opName, resultType, operator);

    OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) result.getTransformation();
    transform.setStateKeySelector(keySelector);
    transform.setStateKeyType(keyType);

    return result;
}

// -- example usage ---

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    final KeySelector<Tuple2<Long, Long>, Long> keySelector = (value) -> value.f0;

    DataStream<Tuple2<Long, Long>> stream = env
            .generateSequence(1, 1000L)
            .map( (value) -> new Tuple2<>(value / 1, 1L) );

    DataStream<Tuple2<Long, Long>> perSecondWindows = stream
            .keyBy(keySelector)
            .timeWindow(Time.seconds(1))
            .sum(1);

    DataStream<Tuple2<Long, Long>> perFiveSecondWindows = reKeyAndWindow(
            perSecondWindows,
            keySelector,
            TumblingEventTimeWindows.of(Time.seconds(5)),
            new WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, TimeWindow>() { ... }
    );
}
{code}


> Add partitionedKeyBy to DataStream
> --
>
> Key: FLINK-4855
> URL: https://issues.apache.org/jira/browse/FLINK-4855
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Xiaowei Jiang
>Assignee: MaGuowei
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> After we do any interesting operati

[jira] [Commented] (FLINK-5209) Fix TaskManager metrics

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709072#comment-15709072
 ] 

ASF GitHub Bot commented on FLINK-5209:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2902#discussion_r90274186
  
--- Diff: 
flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade
 ---
@@ -75,18 +75,18 @@ div(ng-if="metrics.id")
 tbody
   tr
 td Direct
-td {{ metrics.metrics.directCount }}
-td {{ metrics.metrics.directUsed }}
-td {{ metrics.metrics.directTotal }}
+td {{ metrics.metrics.directCount | humanizeBytes }}
+td {{ metrics.metrics.directUsed | humanizeBytes }}
+td {{ metrics.metrics.directMax | humanizeBytes }}
   tr
 td Mapped
-td {{ metrics.metrics.mappedCount }}
-td {{ metrics.metrics.mappedUsed }}
-td {{ metrics.metrics.mappedMax }}
+td {{ metrics.metrics.mappedCount | humanizeBytes }}
--- End diff --

I'll add a Coffee filter to localize the numbers with commas / periods.


> Fix TaskManager metrics
> ---
>
> Key: FLINK-5209
> URL: https://issues.apache.org/jira/browse/FLINK-5209
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Properly propagate the network and non-JVM memory metrics to the web UI.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5209) Fix TaskManager metrics

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709073#comment-15709073
 ] 

ASF GitHub Bot commented on FLINK-5209:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2902#discussion_r90272935
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
 ---
@@ -124,13 +124,21 @@ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String>

> Fix TaskManager metrics
> ---
>
> Key: FLINK-5209
> URL: https://issues.apache.org/jira/browse/FLINK-5209
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Properly propagate the network and non-JVM memory metrics to the web UI.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5209) Fix TaskManager metrics

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15709074#comment-15709074
 ] 

ASF GitHub Bot commented on FLINK-5209:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2902#discussion_r90273043
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
 ---
@@ -124,13 +124,21 @@ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String>

> Fix TaskManager metrics
> ---
>
> Key: FLINK-5209
> URL: https://issues.apache.org/jira/browse/FLINK-5209
> Project: Flink
>  Issue Type: Bug
>  Components: Webfrontend
>Affects Versions: 1.2.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.2.0
>
>
> Properly propagate the network and non-JVM memory metrics to the web UI.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2902: [FLINK-5209] [webfrontend] Fix TaskManager metrics

2016-11-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2902#discussion_r90274186
  
--- Diff: 
flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.metrics.jade
 ---
@@ -75,18 +75,18 @@ div(ng-if="metrics.id")
 tbody
   tr
 td Direct
-td {{ metrics.metrics.directCount }}
-td {{ metrics.metrics.directUsed }}
-td {{ metrics.metrics.directTotal }}
+td {{ metrics.metrics.directCount | humanizeBytes }}
+td {{ metrics.metrics.directUsed | humanizeBytes }}
+td {{ metrics.metrics.directMax | humanizeBytes }}
   tr
 td Mapped
-td {{ metrics.metrics.mappedCount }}
-td {{ metrics.metrics.mappedUsed }}
-td {{ metrics.metrics.mappedMax }}
+td {{ metrics.metrics.mappedCount | humanizeBytes }}
--- End diff --

I'll add a Coffee filter to localize the numbers with commas / periods.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2902: [FLINK-5209] [webfrontend] Fix TaskManager metrics

2016-11-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2902#discussion_r90273043
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
 ---
@@ -124,13 +124,21 @@ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String>

[GitHub] flink pull request #2902: [FLINK-5209] [webfrontend] Fix TaskManager metrics

2016-11-30 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/2902#discussion_r90272935
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
 ---
@@ -124,13 +124,21 @@ public String handleJsonRequest(Map<String, String> pathParams, Map<String, String>

[jira] [Commented] (FLINK-5158) Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708912#comment-15708912
 ] 

ASF GitHub Bot commented on FLINK-5158:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2873#discussion_r90258544
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -731,46 +700,100 @@ public boolean 
receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
 

discardState(message.getState());
}
+
+   return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded 
but non-removed checkpoint " + checkpointId);
}
else {
+   boolean wasPendingCheckpoint;
+
// message is for an unknown checkpoint, or 
comes too late (checkpoint disposed)
if 
(recentPendingCheckpoints.contains(checkpointId)) {
-   isPendingCheckpoint = true;
+   wasPendingCheckpoint = true;
LOG.warn("Received late message for now 
expired checkpoint attempt {}.", checkpointId);
}
else {
LOG.debug("Received message for an 
unknown checkpoint {}.", checkpointId);
-   isPendingCheckpoint = false;
+   wasPendingCheckpoint = false;
}
 
// try to discard the state so that we don't 
have lingering state lying around
discardState(message.getState());
+
+   return wasPendingCheckpoint;
+   }
+   }
+   }
+
+   private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) throws CheckpointException {
--- End diff --

Missing JavaDocs, maybe add that this needs to be called in checkpoint lock 
scope
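For illustration, a JavaDoc along these lines would cover both points (a
sketch, not the final wording):

{code}
/**
 * Finalizes the given pending checkpoint and registers it as a completed checkpoint.
 *
 * <p>Important: the caller must hold the coordinator's checkpoint lock when
 * invoking this method.
 *
 * @param pendingCheckpoint the fully acknowledged checkpoint to complete
 * @throws CheckpointException if the checkpoint could not be finalized or stored
 */
{code}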


> Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator
> 
>
> Key: FLINK-5158
> URL: https://issues.apache.org/jira/browse/FLINK-5158
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> The checkpoint coordinator does not properly handle exceptions when trying to 
> store completed checkpoints. As a result, completed checkpoints are not 
> properly cleaned up and, even worse, the {{CheckpointCoordinator}} might get 
> stuck and stop triggering checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5158) Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708913#comment-15708913
 ] 

ASF GitHub Bot commented on FLINK-5158:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2873#discussion_r90258638
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -731,46 +700,100 @@ public boolean 
receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
 

discardState(message.getState());
}
+
+   return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded 
but non-removed checkpoint " + checkpointId);
}
else {
+   boolean wasPendingCheckpoint;
+
// message is for an unknown checkpoint, or 
comes too late (checkpoint disposed)
if 
(recentPendingCheckpoints.contains(checkpointId)) {
-   isPendingCheckpoint = true;
+   wasPendingCheckpoint = true;
LOG.warn("Received late message for now 
expired checkpoint attempt {}.", checkpointId);
}
else {
LOG.debug("Received message for an 
unknown checkpoint {}.", checkpointId);
-   isPendingCheckpoint = false;
+   wasPendingCheckpoint = false;
}
 
// try to discard the state so that we don't 
have lingering state lying around
discardState(message.getState());
+
+   return wasPendingCheckpoint;
+   }
+   }
+   }
+
+   private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) throws CheckpointException {
+   final long checkpointId = pendingCheckpoint.getCheckpointId();
+   CompletedCheckpoint completedCheckpoint = null;
+
+   try {
+   completedCheckpoint = 
pendingCheckpoint.finalizeCheckpoint();
+
+   
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+
+   rememberRecentCheckpointId(checkpointId);
+   
dropSubsumedCheckpoints(completedCheckpoint.getTimestamp());
+
+   onFullyAcknowledgedCheckpoint(completedCheckpoint);
+   } catch (Exception exception) {
+   // abort the current pending checkpoint if it has not 
been discarded yet
+   if(!pendingCheckpoint.isDiscarded()) {
--- End diff --

missing whitespace after if


> Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator
> 
>
> Key: FLINK-5158
> URL: https://issues.apache.org/jira/browse/FLINK-5158
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> The checkpoint coordinator does not properly handle exceptions when trying to 
> store completed checkpoints. As a result, completed checkpoints are not 
> properly cleaned up and, even worse, the {{CheckpointCoordinator}} might get 
> stuck and stop triggering checkpoints.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-5158) Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708911#comment-15708911
 ] 

ASF GitHub Bot commented on FLINK-5158:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2873#discussion_r90258796
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -731,46 +700,100 @@ public boolean 
receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
 

discardState(message.getState());
}
+
+   return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded 
but non-removed checkpoint " + checkpointId);
}
else {
+   boolean wasPendingCheckpoint;
+
// message is for an unknown checkpoint, or 
comes too late (checkpoint disposed)
if 
(recentPendingCheckpoints.contains(checkpointId)) {
-   isPendingCheckpoint = true;
+   wasPendingCheckpoint = true;
LOG.warn("Received late message for now 
expired checkpoint attempt {}.", checkpointId);
}
else {
LOG.debug("Received message for an 
unknown checkpoint {}.", checkpointId);
-   isPendingCheckpoint = false;
+   wasPendingCheckpoint = false;
}
 
// try to discard the state so that we don't 
have lingering state lying around
discardState(message.getState());
+
+   return wasPendingCheckpoint;
+   }
+   }
+   }
+
+   private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) throws CheckpointException {
+   final long checkpointId = pendingCheckpoint.getCheckpointId();
+   CompletedCheckpoint completedCheckpoint = null;
+
+   try {
+   completedCheckpoint = 
pendingCheckpoint.finalizeCheckpoint();
+
+   
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+
+   rememberRecentCheckpointId(checkpointId);
+   
dropSubsumedCheckpoints(completedCheckpoint.getTimestamp());
+
+   onFullyAcknowledgedCheckpoint(completedCheckpoint);
+   } catch (Exception exception) {
+   // abort the current pending checkpoint if it has not 
been discarded yet
+   if(!pendingCheckpoint.isDiscarded()) {
+   pendingCheckpoint.discard(userClassLoader);
+   }
+
+   if (completedCheckpoint != null) {
+   // we failed to store the completed checkpoint. 
Let's clean up
+   final CompletedCheckpoint cc = 
completedCheckpoint;
+
+   executor.execute(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   
cc.discard(userClassLoader);
+   } catch (Exception 
nestedException) {
+   LOG.warn("Could not 
properly discard completed checkpoint {}.", cc.getCheckpointID(), 
nestedException);
+   }
+   }
+   });
}
+
+   throw new CheckpointException("Could not complete the 
pending checkpoint " + checkpointId + '.', exception);
+   } finally {
+   pendingCheckpoints.remove(checkpointId);
+
+   triggerQueuedRequests();
+   }
+
+   LOG.info("Completed checkpoint {} (in {} ms).", checkpointId, 
completedCheckpoint.getDuration());
+
+   if (LOG.isDebugEnabled()) {
--- End diff --

While rebasing you have to make sure to copy the updated string builder here


> Handle ZooKeeperCompletedCheckpointStore exceptions

[jira] [Commented] (FLINK-5158) Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708914#comment-15708914
 ] 

ASF GitHub Bot commented on FLINK-5158:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2873#discussion_r90257889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -651,64 +651,33 @@ public boolean 
receiveDeclineMessage(DeclineCheckpoint message) {
 *
 * @throws Exception If the checkpoint cannot be added to the completed 
checkpoint store.
 */
-   public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) 
throws Exception {
+   public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) 
throws CheckpointException {
if (shutdown || message == null) {
return false;
}
if (!job.equals(message.getJob())) {
-   LOG.error("Received AcknowledgeCheckpoint message for 
wrong job: {}", message);
+   LOG.error("Received wrong AcknowledgeCheckpoint message 
for job {}: {}", job, message);
return false;
}
 
final long checkpointId = message.getCheckpointId();
 
-   CompletedCheckpoint completed = null;
-   PendingCheckpoint checkpoint;
-
-   // Flag indicating whether the ack message was for a known 
pending
-   // checkpoint.
-   boolean isPendingCheckpoint;
-
synchronized (lock) {
// we need to check inside the lock for being shutdown 
as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
}
 
-   checkpoint = pendingCheckpoints.get(checkpointId);
+   final PendingCheckpoint checkpoint = 
pendingCheckpoints.get(checkpointId);
 
if (checkpoint != null && !checkpoint.isDiscarded()) {
-   isPendingCheckpoint = true;
 
switch 
(checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), 
message.getStateSize(), null)) {
case SUCCESS:
// TODO: Give KV-state to the 
acknowledgeTask method
--- End diff --

Unrelated, but could you remove this TODO since this has been addressed for 
1.2, but probably won't be addressed in 1.1


> Handle ZooKeeperCompletedCheckpointStore exceptions in CheckpointCoordinator
> 
>
> Key: FLINK-5158
> URL: https://issues.apache.org/jira/browse/FLINK-5158
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0, 1.1.3
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.2.0, 1.1.4
>
>
> The checkpoint coordinator does not properly handle exceptions when trying to 
> store completed checkpoints. As a result, completed checkpoints are not 
> properly cleaned up and, even worse, the {{CheckpointCoordinator}} might get 
> stuck and stop triggering checkpoints.





[GitHub] flink pull request #2873: [backport] [FLINK-5158] [ckPtCoord] Handle excepti...

2016-11-30 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2873#discussion_r90258638
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -731,46 +700,100 @@ public boolean 
receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
 

discardState(message.getState());
}
+
+   return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded 
but non-removed checkpoint " + checkpointId);
}
else {
+   boolean wasPendingCheckpoint;
+
// message is for an unknown checkpoint, or 
comes too late (checkpoint disposed)
if 
(recentPendingCheckpoints.contains(checkpointId)) {
-   isPendingCheckpoint = true;
+   wasPendingCheckpoint = true;
LOG.warn("Received late message for now 
expired checkpoint attempt {}.", checkpointId);
}
else {
LOG.debug("Received message for an 
unknown checkpoint {}.", checkpointId);
-   isPendingCheckpoint = false;
+   wasPendingCheckpoint = false;
}
 
// try to discard the state so that we don't 
have lingering state lying around
discardState(message.getState());
+
+   return wasPendingCheckpoint;
+   }
+   }
+   }
+
+   private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) throws CheckpointException {
+   final long checkpointId = pendingCheckpoint.getCheckpointId();
+   CompletedCheckpoint completedCheckpoint = null;
+
+   try {
+   completedCheckpoint = 
pendingCheckpoint.finalizeCheckpoint();
+
+   
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+
+   rememberRecentCheckpointId(checkpointId);
+   
dropSubsumedCheckpoints(completedCheckpoint.getTimestamp());
+
+   onFullyAcknowledgedCheckpoint(completedCheckpoint);
+   } catch (Exception exception) {
+   // abort the current pending checkpoint if it has not 
been discarded yet
+   if(!pendingCheckpoint.isDiscarded()) {
--- End diff --

missing whitespace after if




[GitHub] flink pull request #2873: [backport] [FLINK-5158] [ckPtCoord] Handle excepti...

2016-11-30 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2873#discussion_r90258544
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -731,46 +700,100 @@ public boolean 
receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
 

discardState(message.getState());
}
+
+   return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded 
but non-removed checkpoint " + checkpointId);
}
else {
+   boolean wasPendingCheckpoint;
+
// message is for an unknown checkpoint, or 
comes too late (checkpoint disposed)
if 
(recentPendingCheckpoints.contains(checkpointId)) {
-   isPendingCheckpoint = true;
+   wasPendingCheckpoint = true;
LOG.warn("Received late message for now 
expired checkpoint attempt {}.", checkpointId);
}
else {
LOG.debug("Received message for an 
unknown checkpoint {}.", checkpointId);
-   isPendingCheckpoint = false;
+   wasPendingCheckpoint = false;
}
 
// try to discard the state so that we don't 
have lingering state lying around
discardState(message.getState());
+
+   return wasPendingCheckpoint;
+   }
+   }
+   }
+
+   private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) throws CheckpointException {
--- End diff --

Missing JavaDocs, maybe add that this needs to be called in checkpoint lock 
scope

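A minimal sketch of such a JavaDoc (the wording is only a suggestion; the lock-scope note is the essential part):

```java
/**
 * Tries to complete the given pending checkpoint by finalizing it and
 * registering it at the completed checkpoint store.
 *
 * <p>Important: this method must only be called while holding the
 * checkpoint coordinator's lock.
 *
 * @param pendingCheckpoint the pending checkpoint to complete
 * @throws CheckpointException if the checkpoint cannot be completed
 */
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
        throws CheckpointException {
    // ... body as in the diff above ...
}
```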



[GitHub] flink pull request #2873: [backport] [FLINK-5158] [ckPtCoord] Handle excepti...

2016-11-30 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2873#discussion_r90258796
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -731,46 +700,100 @@ public boolean 
receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws E
 

discardState(message.getState());
}
+
+   return true;
}
else if (checkpoint != null) {
// this should not happen
throw new IllegalStateException(
"Received message for discarded 
but non-removed checkpoint " + checkpointId);
}
else {
+   boolean wasPendingCheckpoint;
+
// message is for an unknown checkpoint, or 
comes too late (checkpoint disposed)
if 
(recentPendingCheckpoints.contains(checkpointId)) {
-   isPendingCheckpoint = true;
+   wasPendingCheckpoint = true;
LOG.warn("Received late message for now 
expired checkpoint attempt {}.", checkpointId);
}
else {
LOG.debug("Received message for an 
unknown checkpoint {}.", checkpointId);
-   isPendingCheckpoint = false;
+   wasPendingCheckpoint = false;
}
 
// try to discard the state so that we don't 
have lingering state lying around
discardState(message.getState());
+
+   return wasPendingCheckpoint;
+   }
+   }
+   }
+
+   private void completePendingCheckpoint(PendingCheckpoint 
pendingCheckpoint) throws CheckpointException {
+   final long checkpointId = pendingCheckpoint.getCheckpointId();
+   CompletedCheckpoint completedCheckpoint = null;
+
+   try {
+   completedCheckpoint = 
pendingCheckpoint.finalizeCheckpoint();
+
+   
completedCheckpointStore.addCheckpoint(completedCheckpoint);
+
+   rememberRecentCheckpointId(checkpointId);
+   
dropSubsumedCheckpoints(completedCheckpoint.getTimestamp());
+
+   onFullyAcknowledgedCheckpoint(completedCheckpoint);
+   } catch (Exception exception) {
+   // abort the current pending checkpoint if it has not 
been discarded yet
+   if(!pendingCheckpoint.isDiscarded()) {
+   pendingCheckpoint.discard(userClassLoader);
+   }
+
+   if (completedCheckpoint != null) {
+   // we failed to store the completed checkpoint. 
Let's clean up
+   final CompletedCheckpoint cc = 
completedCheckpoint;
+
+   executor.execute(new Runnable() {
+   @Override
+   public void run() {
+   try {
+   
cc.discard(userClassLoader);
+   } catch (Exception 
nestedException) {
+   LOG.warn("Could not 
properly discard completed checkpoint {}.", cc.getCheckpointID(), 
nestedException);
+   }
+   }
+   });
}
+
+   throw new CheckpointException("Could not complete the 
pending checkpoint " + checkpointId + '.', exception);
+   } finally {
+   pendingCheckpoints.remove(checkpointId);
+
+   triggerQueuedRequests();
+   }
+
+   LOG.info("Completed checkpoint {} (in {} ms).", checkpointId, 
completedCheckpoint.getDuration());
+
+   if (LOG.isDebugEnabled()) {
--- End diff --

While rebasing you have to make sure to copy the updated string builder here



[GitHub] flink pull request #2873: [backport] [FLINK-5158] [ckPtCoord] Handle excepti...

2016-11-30 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/2873#discussion_r90257889
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -651,64 +651,33 @@ public boolean 
receiveDeclineMessage(DeclineCheckpoint message) {
 *
 * @throws Exception If the checkpoint cannot be added to the completed 
checkpoint store.
 */
-   public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) 
throws Exception {
+   public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) 
throws CheckpointException {
if (shutdown || message == null) {
return false;
}
if (!job.equals(message.getJob())) {
-   LOG.error("Received AcknowledgeCheckpoint message for 
wrong job: {}", message);
+   LOG.error("Received wrong AcknowledgeCheckpoint message 
for job {}: {}", job, message);
return false;
}
 
final long checkpointId = message.getCheckpointId();
 
-   CompletedCheckpoint completed = null;
-   PendingCheckpoint checkpoint;
-
-   // Flag indicating whether the ack message was for a known 
pending
-   // checkpoint.
-   boolean isPendingCheckpoint;
-
synchronized (lock) {
// we need to check inside the lock for being shutdown 
as well, otherwise we
// get races and invalid error log messages
if (shutdown) {
return false;
}
 
-   checkpoint = pendingCheckpoints.get(checkpointId);
+   final PendingCheckpoint checkpoint = 
pendingCheckpoints.get(checkpointId);
 
if (checkpoint != null && !checkpoint.isDiscarded()) {
-   isPendingCheckpoint = true;
 
switch 
(checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), 
message.getStateSize(), null)) {
case SUCCESS:
// TODO: Give KV-state to the 
acknowledgeTask method
--- End diff --

Unrelated, but could you remove this TODO since this has been addressed for 
1.2, but probably won't be addressed in 1.1




[GitHub] flink pull request #2899: Various logging improvements

2016-11-30 Thread uce
Github user uce closed the pull request at:

https://github.com/apache/flink/pull/2899




[jira] [Commented] (FLINK-3160) Aggregate operator statistics by TaskManager

2016-11-30 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708872#comment-15708872
 ] 

Ufuk Celebi commented on FLINK-3160:


Maybe I'm misunderstanding something, too. ;-) To me it looks like there is 
currently no difference between viewing subtasks via the Subtasks or the 
TaskManagers tab. I thought that we introduced the TaskManagers tab for users 
who want to order all subtasks by host. But I think we should have kept the 
Subtasks tab sorted by subtask index, which is what we had before. 
Does this make sense?

---

My last sentence ("Ideally, ...") was meant as a future improvement we could 
do: there is only one tab, but the user can change the sort order dynamically 
by clicking on the header line. But this is an independent thing.

> Aggregate operator statistics by TaskManager
> 
>
> Key: FLINK-3160
> URL: https://issues.apache.org/jira/browse/FLINK-3160
> Project: Flink
>  Issue Type: Improvement
>  Components: Webfrontend
>Affects Versions: 1.0.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.0.0
>
>
> The web client job info page presents a table of the following per task 
> statistics: start time, end time, duration, bytes received, records received, 
> bytes sent, records sent, attempt, host, status.
> Flink supports clusters with thousands of slots and a job setting a high 
> parallelism renders this job info page unwieldy and difficult to analyze in 
> real-time.
> It would be helpful to optionally or automatically aggregate statistics by 
> TaskManager. These rows could then be expanded to reveal the current per task 
> statistics.
> Start time, end time, duration, and attempt are not applicable to a 
> TaskManager since new tasks for repeated attempts may be started. Bytes 
> received, records received, bytes sent, and records sent are summed. Any 
> throughput metrics can be averaged over the total task time or time window. 
> Status could reference the number of running tasks on the TaskManager or an 
> idle state.

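As a rough illustration of the aggregation proposed in the description above (a sketch with hypothetical types, not the web frontend's actual data model):

{code:java}
import java.util.HashMap;
import java.util.Map;

class SubtaskStats {
    String host;          // TaskManager host the subtask runs on
    boolean running;
    long bytesReceived, recordsReceived, bytesSent, recordsSent;
}

class TaskManagerStats {
    long bytesReceived, recordsReceived, bytesSent, recordsSent;
    int runningTasks;     // "status" of the TaskManager row
}

class StatsAggregator {
    // Sum the I/O statistics of all subtasks per TaskManager host.
    static Map<String, TaskManagerStats> aggregateByHost(Iterable<SubtaskStats> subtasks) {
        Map<String, TaskManagerStats> byHost = new HashMap<>();
        for (SubtaskStats s : subtasks) {
            TaskManagerStats agg = byHost.computeIfAbsent(s.host, h -> new TaskManagerStats());
            agg.bytesReceived += s.bytesReceived;
            agg.recordsReceived += s.recordsReceived;
            agg.bytesSent += s.bytesSent;
            agg.recordsSent += s.recordsSent;
            if (s.running) {
                agg.runningTasks++;
            }
        }
        return byHost;
    }
}
{code}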




[jira] [Closed] (FLINK-5213) Missing @Override in Task

2016-11-30 Thread Ufuk Celebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ufuk Celebi closed FLINK-5213.
--
Resolution: Fixed

Fixed in 8cdb406 (master).

> Missing @Override in Task
> -
>
> Key: FLINK-5213
> URL: https://issues.apache.org/jira/browse/FLINK-5213
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>Priority: Trivial
> Fix For: 1.2.0
>
>
> {{Task}} implements {{TaskActions}} and overrides {{failExternally}}, but it 
> doesn't have an {{@Override}} annotation.


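For illustration, the fix boils down to the following (a stripped-down sketch, not the real Flink classes):

{code:java}
interface TaskActions {
    void failExternally(Throwable cause);
}

class Task implements TaskActions {
    @Override // this annotation was missing
    public void failExternally(Throwable cause) {
        // existing failure handling
    }
}
{code}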



[jira] [Created] (FLINK-5213) Missing @Override in Task

2016-11-30 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-5213:
--

 Summary: Missing @Override in Task
 Key: FLINK-5213
 URL: https://issues.apache.org/jira/browse/FLINK-5213
 Project: Flink
  Issue Type: Improvement
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
Priority: Trivial
 Fix For: 1.2.0


{{Task}} implements {{TaskActions}} and overrides {{failExternally}}, but it 
doesn't have an {{@Override}} annotation.





[GitHub] flink issue #2899: Various logging improvements

2016-11-30 Thread uce
Github user uce commented on the issue:

https://github.com/apache/flink/pull/2899
  
Added `FlinkUntypedActor` as well and addressed the comments, Till. I would 
like to merge this now.




[GitHub] flink pull request #2912: [FLINK-5114] [network] Handle partition producer s...

2016-11-30 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2912

[FLINK-5114] [network] Handle partition producer state check for 
unregistered executions

If a partition state request is triggered for a producer that terminates 
before the request arrives, the execution is unregistered and the producer 
cannot be found. In this case the partition state returns `null` and the job 
fails although this is perfectly legal.

For these cases, we look up the respective intermediate result partition 
and find the producing execution manually instead of looking it up via the 
registered executions.

I've removed some unused message parameters that have become obsolete with 
other recent refactorings.

This adds a hash map to `IntermediateResult` for lookups by partition ID. I 
didn't dare to change the partition connect logic in other places that is 
tightly coupled to the partitions being held as an array. As an alternative, we 
could do a linear scan over the partitions as this happens rarely. The memory 
overhead for the hash map should be acceptable as it's created per produced 
result and only has entries for each partition.

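A rough sketch of the described lookup structure (illustrative names, not the actual Flink classes):

```java
import java.util.HashMap;
import java.util.Map;

// The partitions stay in an array for the existing connect logic;
// the map only adds an O(1) lookup by partition ID on top of it.
class IntermediateResultSketch {
    private final PartitionSketch[] partitions;
    private final Map<String, PartitionSketch> partitionsById = new HashMap<>();

    IntermediateResultSketch(PartitionSketch[] partitions) {
        this.partitions = partitions;
        for (PartitionSketch p : partitions) {
            partitionsById.put(p.id, p); // one entry per produced partition
        }
    }

    PartitionSketch byId(String partitionId) {
        return partitionsById.get(partitionId);
    }

    // The alternative mentioned above: a linear scan, acceptable since
    // these lookups happen rarely.
    PartitionSketch byIdLinearScan(String partitionId) {
        for (PartitionSketch p : partitions) {
            if (p.id.equals(partitionId)) {
                return p;
            }
        }
        return null;
    }
}

class PartitionSketch {
    final String id;
    PartitionSketch(String id) { this.id = id; }
}
```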
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 5114-partition_state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2912.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2912


commit 6308ff0aba49f026c23c67af4a2f3943b16f2b31
Author: Ufuk Celebi 
Date:   2016-11-22T15:15:04Z

[FLINK-5114] [network] Handle partition producer state check for 
unregistered executions






[GitHub] flink pull request #2913: [backport] [FLINK-5114] [network] Handle partition...

2016-11-30 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2913

[backport] [FLINK-5114] [network] Handle partition producer state check for 
unregistered executions

This is a backport of #2912.

The code changed slightly between 1.1 and 1.2. I decided to further backport 
the new callback method via `TaskActions`.

This would be important to get into 1.1.4.

\cc @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 5114-partition_state-1.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2913.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2913


commit 628c6e63c424ef11f7d650f8e88ea50af515fb84
Author: Ufuk Celebi 
Date:   2016-11-30T14:09:44Z

[FLINK-5114] [network] Handle partition producer state check for 
unregistered executions






[jira] [Commented] (FLINK-5114) PartitionState update with finished execution fails

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708824#comment-15708824
 ] 

ASF GitHub Bot commented on FLINK-5114:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2913

[backport] [FLINK-5114] [network] Handle partition producer state check for 
unregistered executions

This is a backport of #2912.

The code changed slightly between 1.1 and 1.2. I decided to further backport 
the new callback method via `TaskActions`.

This would be important to get into 1.1.4.

\cc @tillrohrmann 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 5114-partition_state-1.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2913.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2913


commit 628c6e63c424ef11f7d650f8e88ea50af515fb84
Author: Ufuk Celebi 
Date:   2016-11-30T14:09:44Z

[FLINK-5114] [network] Handle partition producer state check for 
unregistered executions




> PartitionState update with finished execution fails
> ---
>
> Key: FLINK-5114
> URL: https://issues.apache.org/jira/browse/FLINK-5114
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> If a partition state request is triggered for a producer that finishes before 
> the request arrives, the execution is unregistered and the producer cannot be 
> found. In this case the PartitionState returns null and the job fails.
> We need to check the producer location via the intermediate result partition 
> in this case.
> See here: https://api.travis-ci.org/jobs/177668505/log.txt?deansi=true





[jira] [Commented] (FLINK-5166) TextInputFormatTest.testNestedFileRead

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708815#comment-15708815
 ] 

ASF GitHub Bot commented on FLINK-5166:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2876#discussion_r90249472
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java 
---
@@ -90,23 +92,23 @@ public void testSimpleRead() {
 
@Test
public void testNestedFileRead() {
-   String[] dirs = new String[] {"tmp/first/", "tmp/second/"};
-   List expectedFiles = new ArrayList<>();
-
try {
-   for (String dir: dirs) {
+   String tmpDirPath = 
CommonTestUtils.createTempDirectory().getPath();
+   String[] dirs = new String[]{tmpDirPath + "/first/", 
tmpDirPath + "/second/"};
+   List expectedFiles = new ArrayList<>();
+   for (String dir : dirs) {
// create input file
File tmpDir = new File(dir);
-   if (!tmpDir.exists()) {
-   tmpDir.mkdirs();
+   if (!tmpDir.exists() && tmpDir.mkdirs()) {
+   tmpDir.deleteOnExit();
}
 
--- End diff --

this section could be a bit cleaner imo:
```
File parentDir = CommonTestUtils.createTempDirectory();
String[] dirs = new String[]{"first", "second"};
...
File tmpDir = new File(parentDir, dir);
```


> TextInputFormatTest.testNestedFileRead
> --
>
> Key: FLINK-5166
> URL: https://issues.apache.org/jira/browse/FLINK-5166
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Tests
>Reporter: shijinkui
>
> `mvn clean package -P \!scala-2.11,scala-2.11  -U`
> Failed tests:
>   TextInputFormatTest.testNestedFileRead:140 Test erroneous
> Tests run: 846, Failures: 1, Errors: 0, Skipped: 0





[GitHub] flink pull request #2876: [FLINK-5166] TextInputFormatTest.testNestedFileRea...

2016-11-30 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2876#discussion_r90249472
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java 
---
@@ -90,23 +92,23 @@ public void testSimpleRead() {
 
@Test
public void testNestedFileRead() {
-   String[] dirs = new String[] {"tmp/first/", "tmp/second/"};
-   List expectedFiles = new ArrayList<>();
-
try {
-   for (String dir: dirs) {
+   String tmpDirPath = 
CommonTestUtils.createTempDirectory().getPath();
+   String[] dirs = new String[]{tmpDirPath + "/first/", 
tmpDirPath + "/second/"};
+   List expectedFiles = new ArrayList<>();
+   for (String dir : dirs) {
// create input file
File tmpDir = new File(dir);
-   if (!tmpDir.exists()) {
-   tmpDir.mkdirs();
+   if (!tmpDir.exists() && tmpDir.mkdirs()) {
+   tmpDir.deleteOnExit();
}
 
--- End diff --

this section could be a bit cleaner imo:
```
File parentDir = CommonTestUtils.createTempDirectory();
String[] dirs = new String[]{"first", "second"};
...
File tmpDir = new File(parentDir, dir);
```

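Expanded into the test's loop, the suggestion could look roughly like this (a sketch, assuming the `CommonTestUtils.createTempDirectory()` helper added in this PR):

```java
File parentDir = CommonTestUtils.createTempDirectory();
String[] dirs = new String[]{"first", "second"};
List<File> expectedFiles = new ArrayList<>();

for (String dir : dirs) {
    // nested input directory inside the unique temp directory
    File tmpDir = new File(parentDir, dir);
    if (!tmpDir.exists() && tmpDir.mkdirs()) {
        tmpDir.deleteOnExit();
    }
    // ... create the input files in tmpDir and record them in expectedFiles ...
}
```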



[jira] [Commented] (FLINK-5114) PartitionState update with finished execution fails

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708810#comment-15708810
 ] 

ASF GitHub Bot commented on FLINK-5114:
---

GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/2912

[FLINK-5114] [network] Handle partition producer state check for 
unregistered executions

If a partition state request is triggered for a producer that terminates 
before the request arrives, the execution is unregistered and the producer 
cannot be found. In this case the partition state returns `null` and the job 
fails although this is perfectly legal.

For these cases, we look up the respective intermediate result partition 
and find the producing execution manually instead of looking it up via the 
registered executions.

I've removed some unused message parameters that have become obsolete with 
other recent refactorings.

This adds a hash map to `IntermediateResult` for lookups by partition ID. I 
didn't dare to change the partition connect logic in other places that is 
tightly coupled to the partitions being held as an array. As an alternative, we 
could do a linear scan over the partitions as this happens rarely. The memory 
overhead for the hash map should be acceptable as it's created per produced 
result and only has entries for each partition.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/flink 5114-partition_state

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2912.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2912


commit 6308ff0aba49f026c23c67af4a2f3943b16f2b31
Author: Ufuk Celebi 
Date:   2016-11-22T15:15:04Z

[FLINK-5114] [network] Handle partition producer state check for 
unregistered executions




> PartitionState update with finished execution fails
> ---
>
> Key: FLINK-5114
> URL: https://issues.apache.org/jira/browse/FLINK-5114
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Reporter: Ufuk Celebi
>Assignee: Ufuk Celebi
>
> If a partition state request is triggered for a producer that finishes before 
> the request arrives, the execution is unregistered and the producer cannot be 
> found. In this case the PartitionState returns null and the job fails.
> We need to check the producer location via the intermediate result partition 
> in this case.
> See here: https://api.travis-ci.org/jobs/177668505/log.txt?deansi=true





[jira] [Created] (FLINK-5212) Flakey ScalaShellITCase#testPreventRecreationBatch (out of Java heap space)

2016-11-30 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-5212:
--

 Summary: Flakey ScalaShellITCase#testPreventRecreationBatch (out 
of Java heap space)
 Key: FLINK-5212
 URL: https://issues.apache.org/jira/browse/FLINK-5212
 Project: Flink
  Issue Type: Bug
  Components: Flink on Tez, Scala Shell
Affects Versions: 2.0.0
 Environment: TravisCI
Reporter: Nico Kruber


{code:none}
Tests run: 9, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 255.399 sec <<< 
FAILURE! - in org.apache.flink.api.scala.ScalaShellITCase
testPreventRecreationBatch(org.apache.flink.api.scala.ScalaShellITCase)  Time 
elapsed: 198.128 sec  <<< ERROR!
java.lang.OutOfMemoryError: Java heap space
at scala.reflect.internal.Names$class.enterChars(Names.scala:70)
at scala.reflect.internal.Names$class.body$1(Names.scala:116)
at scala.reflect.internal.Names$class.newTermName(Names.scala:127)
at scala.reflect.internal.SymbolTable.newTermName(SymbolTable.scala:16)
at scala.reflect.internal.Names$class.newTermName(Names.scala:83)
at scala.reflect.internal.SymbolTable.newTermName(SymbolTable.scala:16)
at scala.reflect.internal.Names$class.newTermName(Names.scala:144)
at scala.reflect.internal.SymbolTable.newTermName(SymbolTable.scala:16)
at 
scala.tools.nsc.symtab.classfile.ClassfileParser$ConstantPool.getName(ClassfileParser.scala:206)
at 
scala.tools.nsc.symtab.classfile.ClassfileParser$ConstantPool.getExternalName(ClassfileParser.scala:216)
at 
scala.tools.nsc.symtab.classfile.ClassfileParser$ConstantPool.getType(ClassfileParser.scala:286)
at 
scala.tools.nsc.symtab.classfile.ClassfileParser.parseMethod(ClassfileParser.scala:565)
at 
scala.tools.nsc.symtab.classfile.ClassfileParser.scala$tools$nsc$symtab$classfile$ClassfileParser$$queueLoad$1(ClassfileParser.scala:480)
at 
scala.tools.nsc.symtab.classfile.ClassfileParser$$anonfun$parseClass$1.apply$mcV$sp(ClassfileParser.scala:490)
at 
scala.tools.nsc.symtab.classfile.ClassfileParser.parseClass(ClassfileParser.scala:495)
at 
scala.tools.nsc.symtab.classfile.ClassfileParser.parse(ClassfileParser.scala:136)
at 
scala.tools.nsc.symtab.SymbolLoaders$ClassfileLoader$$anonfun$doComplete$2.apply$mcV$sp(SymbolLoaders.scala:347)
at 
scala.tools.nsc.symtab.SymbolLoaders$ClassfileLoader$$anonfun$doComplete$2.apply(SymbolLoaders.scala:347)
at 
scala.tools.nsc.symtab.SymbolLoaders$ClassfileLoader$$anonfun$doComplete$2.apply(SymbolLoaders.scala:347)
at 
scala.reflect.internal.SymbolTable.enteringPhase(SymbolTable.scala:235)
at 
scala.tools.nsc.symtab.SymbolLoaders$ClassfileLoader.doComplete(SymbolLoaders.scala:347)
at 
scala.tools.nsc.symtab.SymbolLoaders$SymbolLoader.complete(SymbolLoaders.scala:211)
at 
scala.tools.nsc.symtab.SymbolLoaders$SymbolLoader.load(SymbolLoaders.scala:227)
at scala.reflect.internal.Symbols$Symbol.typeParams(Symbols.scala:1708)
at 
scala.reflect.internal.Types$NoArgsTypeRef.typeParams(Types.scala:1926)
at 
scala.reflect.internal.Types$NoArgsTypeRef.isHigherKinded(Types.scala:1925)
at 
scala.reflect.internal.transform.UnCurry$class.scala$reflect$internal$transform$UnCurry$$expandAlias(UnCurry.scala:22)
at 
scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:26)
at 
scala.reflect.internal.tpe.TypeMaps$TypeMap.applyToSymbolInfo(TypeMaps.scala:218)
at 
scala.reflect.internal.tpe.TypeMaps$TypeMap.loop$1(TypeMaps.scala:227)
at 
scala.reflect.internal.tpe.TypeMaps$TypeMap.noChangeToSymbols(TypeMaps.scala:229)
at 
scala.reflect.internal.tpe.TypeMaps$TypeMap.mapOver(TypeMaps.scala:243)


Results :

Tests in error: 
  ScalaShellITCase.testPreventRecreationBatch » OutOfMemory Java heap space
{code}

stdout:
https://api.travis-ci.org/jobs/180090640/log.txt?deansi=true

full logs:
https://transfer.sh/nu2wr/34.1.tar.gz





[jira] [Created] (FLINK-5211) Include an example configuration for all reporters

2016-11-30 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-5211:
---

 Summary: Include an example configuration for all reporters
 Key: FLINK-5211
 URL: https://issues.apache.org/jira/browse/FLINK-5211
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Metrics
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler
 Fix For: 1.2.0


We should extend the reporter documentation to include an example configuration 
for every reporter.

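For example, such a snippet (here for the JMX reporter; the reporter name and port range are illustrative) could look like:

{code:none}
metrics.reporters: my_jmx
metrics.reporter.my_jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx.port: 9020-9040
{code}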




[jira] [Commented] (FLINK-5166) TextInputFormatTest.testNestedFileRead

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708780#comment-15708780
 ] 

ASF GitHub Bot commented on FLINK-5166:
---

Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/2876#discussion_r90246314
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java 
---
@@ -90,7 +91,9 @@ public void testSimpleRead() {
 
@Test
public void testNestedFileRead() {
-   String[] dirs = new String[] {"tmp/first/", "tmp/second/"};
+   String tmpDirName = "tmp-" + 
UUID.randomUUID().toString().substring(0, 8);
--- End diff --

CommonTestUtils in the core package has no createTempDirectory function, so I 
have added one.


> TextInputFormatTest.testNestedFileRead
> --
>
> Key: FLINK-5166
> URL: https://issues.apache.org/jira/browse/FLINK-5166
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Tests
>Reporter: shijinkui
>
> `mvn clean package -P \!scala-2.11,scala-2.11  -U`
> Failed tests:
>   TextInputFormatTest.testNestedFileRead:140 Test erroneous
> Tests run: 846, Failures: 1, Errors: 0, Skipped: 0





[GitHub] flink pull request #2876: [FLINK-5166] TextInputFormatTest.testNestedFileRea...

2016-11-30 Thread shijinkui
Github user shijinkui commented on a diff in the pull request:

https://github.com/apache/flink/pull/2876#discussion_r90246314
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java 
---
@@ -90,7 +91,9 @@ public void testSimpleRead() {
 
@Test
public void testNestedFileRead() {
-   String[] dirs = new String[] {"tmp/first/", "tmp/second/"};
+   String tmpDirName = "tmp-" + 
UUID.randomUUID().toString().substring(0, 8);
--- End diff --

CommonTestUtils in the core package has no createTempDirectory function, so I 
have added one.

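One possible shape for such a helper (a sketch of the idea, not necessarily the exact code added here):

```java
// Belongs in CommonTestUtils; creates a uniquely named directory
// under java.io.tmpdir, retrying on (very unlikely) name collisions.
public static File createTempDirectory() throws IOException {
    File tempDir = new File(System.getProperty("java.io.tmpdir"));
    for (int attempt = 0; attempt < 10; attempt++) {
        File dir = new File(tempDir, UUID.randomUUID().toString());
        if (!dir.exists() && dir.mkdirs()) {
            dir.deleteOnExit();
            return dir;
        }
    }
    throw new IOException("Could not create a temporary directory in " + tempDir);
}
```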



[jira] [Commented] (FLINK-5178) allow BLOB_STORAGE_DIRECTORY_KEY to point to a distributed file system

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708766#comment-15708766
 ] 

ASF GitHub Bot commented on FLINK-5178:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/2911
  
@uce can you have a look after processing #2891 (FLINK-5129)?


> allow BLOB_STORAGE_DIRECTORY_KEY to point to a distributed file system
> --
>
> Key: FLINK-5178
> URL: https://issues.apache.org/jira/browse/FLINK-5178
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> After FLINK-5129, high availability (HA) mode adds the ability for the 
> BlobCache instances at the task managers to download blobs directly from the 
> distributed file system. It would be nice if this also worked in non-HA mode 
> and BLOB_STORAGE_DIRECTORY_KEY could point to a distributed file system.





[jira] [Commented] (FLINK-4895) Drop support for Hadoop 1

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708769#comment-15708769
 ] 

ASF GitHub Bot commented on FLINK-4895:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2850


> Drop support for Hadoop 1
> -
>
> Key: FLINK-4895
> URL: https://issues.apache.org/jira/browse/FLINK-4895
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.2.0
>
>
> As per this mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Drop-Hadoop-1-support-with-Flink-1-2-td9530.html
>  the community agreed to drop support for Hadoop 1.
> The task includes
> - removing the hadoop-1 / hadoop-2 build profiles, 
> - removing the scripts for generating hadoop-x poms
> - updating the release script
> - updating the nightly build script
> - updating the travis configuration file
> - updating the documentation
> - updating the website





[jira] [Resolved] (FLINK-4895) Drop support for Hadoop 1

2016-11-30 Thread Robert Metzger (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Metzger resolved FLINK-4895.
---
   Resolution: Fixed
Fix Version/s: 1.2.0

Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/ac7d8715

> Drop support for Hadoop 1
> -
>
> Key: FLINK-4895
> URL: https://issues.apache.org/jira/browse/FLINK-4895
> Project: Flink
>  Issue Type: Task
>  Components: Build System
>Affects Versions: 1.2.0
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.2.0
>
>
> As per this mailing list discussion: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/DISCUSS-Drop-Hadoop-1-support-with-Flink-1-2-td9530.html
>  the community agreed to drop support for Hadoop 1.
> The task includes
> - removing the hadoop-1 / hadoop-2 build profiles, 
> - removing the scripts for generating hadoop-x poms
> - updating the release script
> - updating the nightly build script
> - updating the travis configuration file
> - updating the documentation
> - updating the website





[GitHub] flink issue #2911: [FLINK-5178] allow BLOB_STORAGE_DIRECTORY_KEY to point to...

2016-11-30 Thread NicoK
Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/2911
  
@uce can you have a look after processing #2891 (FLINK-5129)?




[GitHub] flink pull request #2850: [FLINK-4895] Drop Hadoop1 support

2016-11-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2850




[jira] [Commented] (FLINK-5178) allow BLOB_STORAGE_DIRECTORY_KEY to point to a distributed file system

2016-11-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15708761#comment-15708761
 ] 

ASF GitHub Bot commented on FLINK-5178:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/2911

[FLINK-5178] allow BLOB_STORAGE_DIRECTORY_KEY to point to a distributed 
file system

Previously, this was restricted to a local file system path. Now it may also
point to a distributed file system, so this feature is no longer restricted
to HA mode.

Example for hdfs: `blob.storage.directory=hdfs:///flink/data/`

Unfortunately, we cannot detect the case when a locally-mounted distributed
file system is used. In this case, we require the user to give us a hint, 
e.g.:
`blob.storage.directory=dfs:///flink/data/`
for a file system mounted to `/flink/data/`. If this hint is missing, each
job manager and task manager will create its own unique storage directory 
under
this path and files will be requested from the blob server at the job 
manager
as usual, i.e. the task manager requests a blob from the blob server which
reads the file from the file system and sends it back where it is stored in
the task manager's individual storage path of the same file system.

BEWARE: If HA mode is configured and a local file system without the 
`dfs://`
hint is given as HA_STORAGE_PATH, an IllegalConfigurationException will be
thrown!

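Summarizing the two cases from the description as configuration entries (values taken from the examples above):

```
# blobs stored in a remote distributed file system
blob.storage.directory=hdfs:///flink/data/

# distributed file system locally mounted at /flink/data/ (hint required)
blob.storage.directory=dfs:///flink/data/
```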
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink FLINK-5178

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2911.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2911


commit b65e74dd92bdf74b2816a0d8a26a5ebaa25ca586
Author: Nico Kruber 
Date:   2016-11-22T11:49:03Z

[hotfix] remove unused package-private BlobUtils#copyFromRecoveryPath

This was actually the same implementation as
FileSystemBlobStore#get(java.lang.String, java.io.File) and either of the 
two
could have been removed but the implementation makes most sense at the
concrete file system abstraction layer, i.e. in FileSystemBlobStore.

commit 09bdd49e6282268fd9c1b2672f0ea6222e097ca2
Author: Nico Kruber 
Date:   2016-11-23T15:11:35Z

[hotfix] do not create intermediate strings inside String.format in 
BlobUtils

commit 93938ff97fef9e39c17ac795e1e89ca9de25e028
Author: Nico Kruber 
Date:   2016-11-24T16:11:19Z

[hotfix] properly shut down the BlobServer in BlobServerRangeTest

commit c0c9d2239a767154d6071171d4c33e762e01aa62
Author: Nico Kruber 
Date:   2016-11-24T17:50:43Z

[FLINK-5129] BlobServer: include the cluster id in the HA storage path for 
blobs

Also use JUnit's TemporaryFolder in BlobRecoveryITCase, too. This makes
cleaning up simpler.

commit 8b9c7d9fd6e1ab3c7f2175a31d0e29b41b01cc61
Author: Nico Kruber 
Date:   2016-11-23T18:50:52Z

[FLINK-5129] make the BlobCache use the HA filesystem back-end properly

Previously, the BlobServer held a local copy and, in case high availability (HA)
was set, also copied jar files to a distributed file system. Upon restore,
these files were copied to the local store from which they were used.

This commit abstracts the BlobServer's backing file system and makes it use 
the
distributed file system directly in HA mode, i.e. without the local file 
system
copy. Other than that the behaviour does not change.

commit 249b2ea48f19c54498faa56ad45d299efaad4521
Author: Nico Kruber 
Date:   2016-11-25T16:42:05Z

[FLINK-5129] make the BlobCache also use a distributed file system in HA 
mode

* re-factor the file system abstraction in FileSystemBlobStore so that it 
can
  be used by the task managers, too, which should not be able to delete 
files
  in a distributed file system shared among different nodes
* only download blobs from the blob server if not in HA mode or the 
distributed
  file system is not accessible by the BlobCache, e.g. at the task managers

commit dd69f65a47205eb55ac8cc2c8f3aa9f7232dc8ba
Author: Nico Kruber 
Date:   2016-11-28T10:42:13Z

[FLINK-5129] restore non-HA mode unique directory setup in the blob server 
and cache

If not in high availability mode, local (and now also distributed) file 
systems
again try to set up a unique directory structure so that other instances 
with
the same configuration file or storage path do not interfere.

This was lost in 8b9c7d9fd6.

commit 76ccc9ffaaa63d6e0bd55ba7f6c08f8c1cff98cb
Author: Nico Kruber 
Date:   2016-11-28T15:19:20Z

[hotfix] add a missing "'" to FileSystemBlobStore

commit 53702add38d1087062e84a7e804b08920dfc0c23
Author: Nico Kruber 
Date:   2016

[GitHub] flink pull request #2911: [FLINK-5178] allow BLOB_STORAGE_DIRECTORY_KEY to p...

2016-11-30 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/2911

[FLINK-5178] allow BLOB_STORAGE_DIRECTORY_KEY to point to a distributed 
file system

Previously, this was restricted to a local file system path. Now it may also
point to a distributed file system, so this feature is no longer restricted
to HA mode.

Example for hdfs: `blob.storage.directory=hdfs:///flink/data/`

Unfortunately, we cannot detect the case when a locally-mounted distributed
file system is used. In this case, we require the user to give us a hint, 
e.g.:
`blob.storage.directory=dfs:///flink/data/`
for a file system mounted to `/flink/data/`. If this hint is missing, each
job manager and task manager will create its own unique storage directory 
under
this path and files will be requested from the blob server at the job 
manager
as usual, i.e. the task manager requests a blob from the blob server which
reads the file from the file system and sends it back where it is stored in
the task manager's individual storage path of the same file system.

BEWARE: If HA mode is configured and a local file system without the 
`dfs://`
hint is given as HA_STORAGE_PATH, an IllegalConfigurationException will be
thrown!

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink FLINK-5178

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2911.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2911


commit b65e74dd92bdf74b2816a0d8a26a5ebaa25ca586
Author: Nico Kruber 
Date:   2016-11-22T11:49:03Z

[hotfix] remove unused package-private BlobUtils#copyFromRecoveryPath

This was actually the same implementation as
FileSystemBlobStore#get(java.lang.String, java.io.File) and either of the 
two
could have been removed but the implementation makes most sense at the
concrete file system abstraction layer, i.e. in FileSystemBlobStore.

commit 09bdd49e6282268fd9c1b2672f0ea6222e097ca2
Author: Nico Kruber 
Date:   2016-11-23T15:11:35Z

[hotfix] do not create intermediate strings inside String.format in 
BlobUtils

commit 93938ff97fef9e39c17ac795e1e89ca9de25e028
Author: Nico Kruber 
Date:   2016-11-24T16:11:19Z

[hotfix] properly shut down the BlobServer in BlobServerRangeTest

commit c0c9d2239a767154d6071171d4c33e762e01aa62
Author: Nico Kruber 
Date:   2016-11-24T17:50:43Z

[FLINK-5129] BlobServer: include the cluster id in the HA storage path for 
blobs

Also use JUnit's TemporaryFolder in BlobRecoveryITCase, too. This makes
cleaning up simpler.

commit 8b9c7d9fd6e1ab3c7f2175a31d0e29b41b01cc61
Author: Nico Kruber 
Date:   2016-11-23T18:50:52Z

[FLINK-5129] make the BlobCache use the HA filesystem back-end properly

Previously, the BlobServer held a local copy and, in case high availability (HA)
was set, also copied jar files to a distributed file system. Upon restore,
these files were copied to the local store from which they were used.

This commit abstracts the BlobServer's backing file system and makes it use 
the
distributed file system directly in HA mode, i.e. without the local file 
system
copy. Other than that the behaviour does not change.

commit 249b2ea48f19c54498faa56ad45d299efaad4521
Author: Nico Kruber 
Date:   2016-11-25T16:42:05Z

[FLINK-5129] make the BlobCache also use a distributed file system in HA 
mode

* re-factor the file system abstraction in FileSystemBlobStore so that it 
can
  be used by the task managers, too, which should not be able to delete 
files
  in a distributed file system shared among different nodes
* only download blobs from the blob server if not in HA mode or the 
distributed
  file system is not accessible by the BlobCache, e.g. at the task managers

commit dd69f65a47205eb55ac8cc2c8f3aa9f7232dc8ba
Author: Nico Kruber 
Date:   2016-11-28T10:42:13Z

[FLINK-5129] restore non-HA mode unique directory setup in the blob server 
and cache

If not in high availability mode, local (and now also distributed) file 
systems
again try to set up a unique directory structure so that other instances 
with
the same configuration file or storage path do not interfere.

This was lost in 8b9c7d9fd6.

commit 76ccc9ffaaa63d6e0bd55ba7f6c08f8c1cff98cb
Author: Nico Kruber 
Date:   2016-11-28T15:19:20Z

[hotfix] add a missing "'" to FileSystemBlobStore

commit 53702add38d1087062e84a7e804b08920dfc0c23
Author: Nico Kruber 
Date:   2016-11-28T15:41:11Z

[FLINK-5129] move path-related methods from BlobUtils to 
FileSystemBlobStore and cleanup unused methods

commit d45e4615f422ff3cf1b66e6388a0929e366df128
Author: Nico Kruber 
Date:   2016-11-29T15:50:57Z

[FLINK-5129] BlobService#

[GitHub] flink issue #2899: Various logging improvements

2016-11-30 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2899
  
We should also log the received and handled messages in 
`FlinkUntypedActor#onReceive` at trace level.

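A minimal sketch of what that could look like (assuming an SLF4J logger; the real `FlinkUntypedActor` additionally filters messages by leader session ID before delegating):

```java
import akka.actor.UntypedActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class FlinkUntypedActorSketch extends UntypedActor {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkUntypedActorSketch.class);

    @Override
    public void onReceive(Object message) throws Exception {
        // guard the trace calls so the message is not stringified needlessly
        if (LOG.isTraceEnabled()) {
            LOG.trace("Received message {} at {}.", message, getSelf().path());
        }
        handleMessage(message);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Handled message {}.", message);
        }
    }

    protected abstract void handleMessage(Object message) throws Exception;
}
```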


