[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7970:

Description: 
Now all slot allocation is one by one from execution to slot pool. If batch 
allocate a number of slots to slot pool, then slot pool can assign its cached 
slots according to the global slot requests of all executions, so it can make 
an optimal matching between slot and execution, this is especially usefully for 
failover. For example, it can assign a slot to the execution whose state is 
just on the machine when failover.
!https://issues.apache.org/jira/secure/attachment/12895566/sp.jpg!

  was:
Now all slot allocation is one by one from execution to slot pool. If batch 
allocate a number of slots to slot pool, then slot pool can assign its cached 
slots according to the global slot requests of all executions, so it can make 
an optimal matching between slot and execution, this is especially usefully for 
failover. For example, it can assign a slot to the execution whose state is 
just on the machine when failover.
!sp.jpg|thumbnail!


> SlotPool support batch allocating slots
> ---
>
> Key: FLINK-7970
> URL: https://issues.apache.org/jira/browse/FLINK-7970
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: sp.jpg
>
>
> Now all slot allocation is one by one from execution to slot pool. If batch 
> allocate a number of slots to slot pool, then slot pool can assign its cached 
> slots according to the global slot requests of all executions, so it can make 
> an optimal matching between slot and execution, this is especially usefully 
> for failover. For example, it can assign a slot to the execution whose state 
> is just on the machine when failover.
> !https://issues.apache.org/jira/secure/attachment/12895566/sp.jpg!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7970:

Attachment: sp.jpg

> SlotPool support batch allocating slots
> ---
>
> Key: FLINK-7970
> URL: https://issues.apache.org/jira/browse/FLINK-7970
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: sp.jpg
>
>
> Now all slot allocation is one by one from execution to slot pool. If batch 
> allocate a number of slots to slot pool, then slot pool can assign its cached 
> slots according to the global slot requests of all executions, so it can make 
> an optimal matching between slot and execution, this is especially usefully 
> for failover. For example, it can assign a slot to the execution whose state 
> is just on the machine when failover.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7970) SlotPool support batch allocating slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7970:

Description: 
Now all slot allocation is one by one from execution to slot pool. If batch 
allocate a number of slots to slot pool, then slot pool can assign its cached 
slots according to the global slot requests of all executions, so it can make 
an optimal matching between slot and execution, this is especially usefully for 
failover. For example, it can assign a slot to the execution whose state is 
just on the machine when failover.
!sp.jpg|thumbnail!

  was:
Now all slot allocation is one by one from execution to slot pool. If batch 
allocate a number of slots to slot pool, then slot pool can assign its cached 
slots according to the global slot requests of all executions, so it can make 
an optimal matching between slot and execution, this is especially usefully for 
failover. For example, it can assign a slot to the execution whose state is 
just on the machine when failover.



> SlotPool support batch allocating slots
> ---
>
> Key: FLINK-7970
> URL: https://issues.apache.org/jira/browse/FLINK-7970
> Project: Flink
>  Issue Type: Improvement
>  Components: JobManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: sp.jpg
>
>
> Now all slot allocation is one by one from execution to slot pool. If batch 
> allocate a number of slots to slot pool, then slot pool can assign its cached 
> slots according to the global slot requests of all executions, so it can make 
> an optimal matching between slot and execution, this is especially usefully 
> for failover. For example, it can assign a slot to the execution whose state 
> is just on the machine when failover.
> !sp.jpg|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7970) SlotPool support batch allocating slots

2017-11-02 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7970:
---

 Summary: SlotPool support batch allocating slots
 Key: FLINK-7970
 URL: https://issues.apache.org/jira/browse/FLINK-7970
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Reporter: shuai.xu
Assignee: shuai.xu


Now all slot allocation is one by one from execution to slot pool. If batch 
allocate a number of slots to slot pool, then slot pool can assign its cached 
slots according to the global slot requests of all executions, so it can make 
an optimal matching between slot and execution, this is especially usefully for 
failover. For example, it can assign a slot to the execution whose state is 
just on the machine when failover.




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7866) Weigh list of preferred locations for scheduling

2017-11-02 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16237111#comment-16237111
 ] 

Sihua Zhou commented on FLINK-7866:
---

[~till.rohrmann] thanks and so happy for thinking of me when you plan to 
revision scheduler, I do have some thought about a properly scheduler, works 
incrementally is a nice requirement as you have explained. IMO, it also need 
the follow features.

1, Strong Extendibility.
This means that scheduler should be easy to extends for other 
`schedule-factor` not only just for state & inputs, E.g: TM's status and 
cluster's status.

2, Consider the Runtime Information of the job.
This means that when do scheduling we need to consider the previous 
runtime information of the execution, the runtime information should contains 
but only `task manager location`, `state size`, `input flow rate`, 
`thoughtput`, I think these will be helpful for scheduler. For example, imagine 
that vertex `A`,`B` both connect to vertex 'C' with `forward` and if A's 
`thoughtput` was `1M \ s` and B's `thoughput` was `100M\s`, than B's slot will 
be picked for 'C'. Currently, runtime information can only be filled when we do 
recover, is there a chance that the scheduler can Self-Regulating, dynamic 
change the schdule result. Ah, what I want to express is runtime information of 
execution is helpful.

I am looking forward to your plan and interest in it :)

> Weigh list of preferred locations for scheduling
> 
>
> Key: FLINK-7866
> URL: https://issues.apache.org/jira/browse/FLINK-7866
> Project: Flink
>  Issue Type: Improvement
>  Components: Scheduler
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> [~sihuazhou] proposed to not only use the list of preferred locations to 
> decide where to schedule a task, but to also weigh the list according to how 
> often a location appeared and then select the location based on the weight. 
> That way, we would obtain better locality in some cases.
> Example:
> Preferred locations list: {{[location1, location2, location2]}}
> Weighted preferred locations list {{[(location2 , 2), (location1, 1)]}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu reassigned FLINK-7969:
---

Assignee: shuai.xu

> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Now resource manager only support requesting slot one by one, it's better to 
> make it support batch allocating alots, so that it can make a global decision 
> with all the resource requests. For example: it can decide how many slots 
> should be put into one task manager.
> !attachment-name.jpg|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Labels: flip-6  (was: )

> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>  Labels: flip-6
>
> Now resource manager only support requesting slot one by one, it's better to 
> make it support batch allocating alots, so that it can make a global decision 
> with all the resource requests. For example: it can decide how many slots 
> should be put into one task manager.
> !attachment-name.jpg|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)
shuai.xu created FLINK-7969:
---

 Summary: Resource manager support batch request slots
 Key: FLINK-7969
 URL: https://issues.apache.org/jira/browse/FLINK-7969
 Project: Flink
  Issue Type: Improvement
  Components: ResourceManager
Reporter: shuai.xu


Now resource manager only support requesting slot one by one, it's better to 
make it support batch allocating alots, so that it can make a global decision 
with all the resource requests. For example: it can decide how many slots 
should be put into one task manager.
!attachment-name.jpg|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Description: 
!https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! Now resource 
manager only support requesting slot one by one, it's better to make it support 
batch allocating alots, so that it can make a global decision with all the 
resource requests. For example: it can decide how many slots should be put into 
one task manager.
!rm.jpg|thumbnail!

  was:
!https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg!Now resource 
manager only support requesting slot one by one, it's better to make it support 
batch allocating alots, so that it can make a global decision with all the 
resource requests. For example: it can decide how many slots should be put into 
one task manager.
!rm.jpg|thumbnail!


> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! Now 
> resource manager only support requesting slot one by one, it's better to make 
> it support batch allocating alots, so that it can make a global decision with 
> all the resource requests. For example: it can decide how many slots should 
> be put into one task manager.
> !rm.jpg|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Description: 
!https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg!Now resource 
manager only support requesting slot one by one, it's better to make it support 
batch allocating alots, so that it can make a global decision with all the 
resource requests. For example: it can decide how many slots should be put into 
one task manager.
!rm.jpg|thumbnail!

  was:
Now resource manager only support requesting slot one by one, it's better to 
make it support batch allocating alots, so that it can make a global decision 
with all the resource requests. For example: it can decide how many slots 
should be put into one task manager.
!rm.jpg|thumbnail!


> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg!Now 
> resource manager only support requesting slot one by one, it's better to make 
> it support batch allocating alots, so that it can make a global decision with 
> all the resource requests. For example: it can decide how many slots should 
> be put into one task manager.
> !rm.jpg|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Description: 
Now resource manager only support requesting slot one by one, it's better to 
make it support batch allocating alots, so that it can make a global decision 
with all the resource requests. For example: it can decide how many slots 
should be put into one task manager.
!https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! 

  was:
!https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! Now resource 
manager only support requesting slot one by one, it's better to make it support 
batch allocating alots, so that it can make a global decision with all the 
resource requests. For example: it can decide how many slots should be put into 
one task manager.
!rm.jpg|thumbnail!


> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> Now resource manager only support requesting slot one by one, it's better to 
> make it support batch allocating alots, so that it can make a global decision 
> with all the resource requests. For example: it can decide how many slots 
> should be put into one task manager.
> !https://issues.apache.org/jira/secure/attachment/12895559/rm.jpg! 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Attachment: (was: rm.png)

> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> Now resource manager only support requesting slot one by one, it's better to 
> make it support batch allocating alots, so that it can make a global decision 
> with all the resource requests. For example: it can decide how many slots 
> should be put into one task manager.
> !rm.png|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Description: 
Now resource manager only support requesting slot one by one, it's better to 
make it support batch allocating alots, so that it can make a global decision 
with all the resource requests. For example: it can decide how many slots 
should be put into one task manager.
!rm.jpg|thumbnail!

  was:
Now resource manager only support requesting slot one by one, it's better to 
make it support batch allocating alots, so that it can make a global decision 
with all the resource requests. For example: it can decide how many slots 
should be put into one task manager.
!rm.png|thumbnail!


> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> Now resource manager only support requesting slot one by one, it's better to 
> make it support batch allocating alots, so that it can make a global decision 
> with all the resource requests. For example: it can decide how many slots 
> should be put into one task manager.
> !rm.jpg|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Attachment: rm.jpg

> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.jpg
>
>
> Now resource manager only support requesting slot one by one, it's better to 
> make it support batch allocating alots, so that it can make a global decision 
> with all the resource requests. For example: it can decide how many slots 
> should be put into one task manager.
> !rm.png|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Attachment: rm.png

> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.png
>
>
> Now resource manager only support requesting slot one by one, it's better to 
> make it support batch allocating alots, so that it can make a global decision 
> with all the resource requests. For example: it can decide how many slots 
> should be put into one task manager.
> !attachment-name.jpg|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7969) Resource manager support batch request slots

2017-11-02 Thread shuai.xu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7969?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

shuai.xu updated FLINK-7969:

Description: 
Now resource manager only support requesting slot one by one, it's better to 
make it support batch allocating alots, so that it can make a global decision 
with all the resource requests. For example: it can decide how many slots 
should be put into one task manager.
!rm.png|thumbnail!

  was:
Now resource manager only support requesting slot one by one, it's better to 
make it support batch allocating alots, so that it can make a global decision 
with all the resource requests. For example: it can decide how many slots 
should be put into one task manager.
!attachment-name.jpg|thumbnail!


> Resource manager support batch request slots
> 
>
> Key: FLINK-7969
> URL: https://issues.apache.org/jira/browse/FLINK-7969
> Project: Flink
>  Issue Type: Improvement
>  Components: ResourceManager
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
> Attachments: rm.png
>
>
> Now resource manager only support requesting slot one by one, it's better to 
> make it support batch allocating alots, so that it can make a global decision 
> with all the resource requests. For example: it can decide how many slots 
> should be put into one task manager.
> !rm.png|thumbnail!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7941) Port SubtasksTimesHandler to new REST endpoint

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236998#comment-16236998
 ] 

ASF GitHub Bot commented on FLINK-7941:
---

Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4930
  
@zentol Thank you for your remind, I'v updated jackson imports. thanks


> Port SubtasksTimesHandler to new REST endpoint
> --
>
> Key: FLINK-7941
> URL: https://issues.apache.org/jira/browse/FLINK-7941
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Port *SubtasksTimesHandler* to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4930: [FLINK-7941][flip6] Port SubtasksTimesHandler to new REST...

2017-11-02 Thread zjureel
Github user zjureel commented on the issue:

https://github.com/apache/flink/pull/4930
  
@zentol Thank you for your remind, I'v updated jackson imports. thanks


---


[jira] [Commented] (FLINK-7941) Port SubtasksTimesHandler to new REST endpoint

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236996#comment-16236996
 ] 

ASF GitHub Bot commented on FLINK-7941:
---

GitHub user zjureel reopened a pull request:

https://github.com/apache/flink/pull/4930

[FLINK-7941][flip6] Port SubtasksTimesHandler to new REST endpoint

## What is the purpose of the change

Port SubtasksTimesHandler to new REST endpoint

## Brief change log
  - *Add new SubtasksTimesHandler class*
  - *Add SubtasksTimesInfo/SubtaskTimesHeaders*

## Verifying this change

This change added tests and can be verified as follows:

  - *Add SubtasksTimesInfoTest for testing json response*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7941

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4930.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4930


commit 1dfc1131fd5325a0f2284237657d3027f19e4fcf
Author: zjureel 
Date:   2017-11-03T02:27:27Z

[FLINK-7941][flip6] Port SubtasksTimesHandler to new REST endpoint




> Port SubtasksTimesHandler to new REST endpoint
> --
>
> Key: FLINK-7941
> URL: https://issues.apache.org/jira/browse/FLINK-7941
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Port *SubtasksTimesHandler* to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4930: [FLINK-7941][flip6] Port SubtasksTimesHandler to n...

2017-11-02 Thread zjureel
GitHub user zjureel reopened a pull request:

https://github.com/apache/flink/pull/4930

[FLINK-7941][flip6] Port SubtasksTimesHandler to new REST endpoint

## What is the purpose of the change

Port SubtasksTimesHandler to new REST endpoint

## Brief change log
  - *Add new SubtasksTimesHandler class*
  - *Add SubtasksTimesInfo/SubtaskTimesHeaders*

## Verifying this change

This change added tests and can be verified as follows:

  - *Add SubtasksTimesInfoTest for testing json response*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zjureel/flink FLINK-7941

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4930.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4930


commit 1dfc1131fd5325a0f2284237657d3027f19e4fcf
Author: zjureel 
Date:   2017-11-03T02:27:27Z

[FLINK-7941][flip6] Port SubtasksTimesHandler to new REST endpoint




---


[jira] [Commented] (FLINK-7941) Port SubtasksTimesHandler to new REST endpoint

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236994#comment-16236994
 ] 

ASF GitHub Bot commented on FLINK-7941:
---

Github user zjureel closed the pull request at:

https://github.com/apache/flink/pull/4930


> Port SubtasksTimesHandler to new REST endpoint
> --
>
> Key: FLINK-7941
> URL: https://issues.apache.org/jira/browse/FLINK-7941
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Port *SubtasksTimesHandler* to new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4930: [FLINK-7941][flip6] Port SubtasksTimesHandler to n...

2017-11-02 Thread zjureel
Github user zjureel closed the pull request at:

https://github.com/apache/flink/pull/4930


---


[jira] [Commented] (FLINK-6243) Continuous Joins: True Sliding Window Joins

2017-11-02 Thread Elias Levy (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236751#comment-16236751
 ] 

Elias Levy commented on FLINK-6243:
---

[~StephanEwen] anything to review?

> Continuous Joins:  True Sliding Window Joins
> 
>
> Key: FLINK-6243
> URL: https://issues.apache.org/jira/browse/FLINK-6243
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.1.4
>Reporter: Elias Levy
>Priority: Major
>
> Flink defines sliding window joins as the join of elements of two streams 
> that share a window of time, where the windows are defined by advancing them 
> forward some amount of time that is less than the window time span.  More 
> generally, such windows are just overlapping hopping windows. 
> Other systems, such as Kafka Streams, support a different notion of sliding 
> window joins.  In these systems, two elements of a stream are joined if the 
> absolute time difference between the them is less or equal the time window 
> length.
> This alternate notion of sliding window joins has some advantages in some 
> applications over the current implementation.  
> Elements to be joined may both fall within multiple overlapping sliding 
> windows, leading them to be joined multiple times, when we only wish them to 
> be joined once.
> The implementation need not instantiate window objects to keep track of 
> stream elements, which becomes problematic in the current implementation if 
> the window size is very large and the slide is very small.
> It allows for asymmetric time joins.  E.g. join if elements from stream A are 
> no more than X time behind and Y time head of an element from stream B.
> It is currently possible to implement a join with these semantics using 
> {{CoProcessFunction}}, but the capability should be a first class feature, 
> such as it is in Kafka Streams.
> To perform the join, elements of each stream must be buffered for at least 
> the window time length.  To allow for large window sizes and high volume of 
> elements, the state, possibly optionally, should be buffered such as it can 
> spill to disk (e.g. by using RocksDB).
> The same stream may be joined multiple times in a complex topology.  As an 
> optimization, it may be wise to reuse any element buffer among colocated join 
> operators.  Otherwise, there may write amplification and increased state that 
> must be snapshotted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7339) aggregationToString fails when user defined aggregation contains constants

2017-11-02 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-7339:


Assignee: Fabian Hueske

> aggregationToString fails when user defined aggregation contains constants
> --
>
> Key: FLINK-7339
> URL: https://issues.apache.org/jira/browse/FLINK-7339
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Stefano Bortoli
>Assignee: Fabian Hueske
>Priority: Major
>
> Issue related to FLINK-7338, when the user defined aggregation contains a 
> constant it breaks the aggregation translation to string, which are mapped 1 
> to 1 to the input fields. 
> OverAggregates.scala aggregationToString function fails to find a parameter 
> of the function among the input fields, and therefore throws a 
> RuntimeException. 
> {code}
> private[flink] def aggregationToString(
> inputType: RelDataType,
> rowType: RelDataType,
> namedAggregates: Seq[CalcitePair[AggregateCall, String]]): String = {
> val inFields = inputType.getFieldNames.asScala
> val outFields = rowType.getFieldNames.asScala
> val aggStrings = namedAggregates.map(_.getKey).map(
>   a => s"${a.getAggregation}(${
> if (a.getArgList.size() > 0) {
>   // ERROR HAPPENS HERE!
>   a.getArgList.asScala.map(inFields(_)).mkString(", ")
> } else {
>   "*"
> }
>   })")
> (inFields ++ aggStrings).zip(outFields).map {
>   case (f, o) => if (f == o) {
> f
>   } else {
> s"$f AS $o"
>   }
> }.mkString(", ")
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7338) User Defined aggregation with constants causes error under in lowerbound over window extraction

2017-11-02 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7338?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236687#comment-16236687
 ] 

Fabian Hueske commented on FLINK-7338:
--

Thank you [~stefano.bortoli] for reporting the bug and providing the fix!
I'll fix this for the 1.4.0 release.

Thanks, Fabian

> User Defined aggregation with constants causes error under in lowerbound over 
> window extraction
> ---
>
> Key: FLINK-7338
> URL: https://issues.apache.org/jira/browse/FLINK-7338
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Stefano Bortoli
>Assignee: Fabian Hueske
>Priority: Critical
>
> A user defined aggregation that passes a constant among the arguments causes 
> a RuntimeException extracting the lower boundary over window.
> {code}
> val sqlQuery = "SELECT a, " +
>   "  myAgg(a, CAST('1' as BIGINT)) "+
>   "   OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND 
>PRECEDING AND CURRENT ROW) " +
>   "FROM MyTable"
> {code}
> The error is in the org.apache.flink.table.plan.nodes.OverAggregate.scala
> we do : field count - lower bound index
> --  which causes a -1 get, and subsequent RuntimeException. 
> We should do: lower bound offset - field count to find the value in the 
> constant array.
> The code below should fix the problem.
> {code}
> private[flink] def getLowerBoundary(
> logicWindow: Window,
> overWindow: Group,
> input: RelNode): Long = {
> val ref: RexInputRef = 
> overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
> val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount
> val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
> lowerBound match {
>   case x: java.math.BigDecimal => 
> x.asInstanceOf[java.math.BigDecimal].longValue()
>   case _ => lowerBound.asInstanceOf[Long]
> }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7338) User Defined aggregation with constants causes error under in lowerbound over window extraction

2017-11-02 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-7338:


Assignee: Fabian Hueske

> User Defined aggregation with constants causes error under in lowerbound over 
> window extraction
> ---
>
> Key: FLINK-7338
> URL: https://issues.apache.org/jira/browse/FLINK-7338
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Stefano Bortoli
>Assignee: Fabian Hueske
>Priority: Critical
>
> A user defined aggregation that passes a constant among the arguments causes 
> a RuntimeException extracting the lower boundary over window.
> {code}
> val sqlQuery = "SELECT a, " +
>   "  myAgg(a, CAST('1' as BIGINT)) "+
>   "   OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND 
>PRECEDING AND CURRENT ROW) " +
>   "FROM MyTable"
> {code}
> The error is in the org.apache.flink.table.plan.nodes.OverAggregate.scala
> we do : field count - lower bound index
> --  which causes a -1 get, and subsequent RuntimeException. 
> We should do: lower bound offset - field count to find the value in the 
> constant array.
> The code below should fix the problem.
> {code}
> private[flink] def getLowerBoundary(
> logicWindow: Window,
> overWindow: Group,
> input: RelNode): Long = {
> val ref: RexInputRef = 
> overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
> val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount
> val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
> lowerBound match {
>   case x: java.math.BigDecimal => 
> x.asInstanceOf[java.math.BigDecimal].longValue()
>   case _ => lowerBound.asInstanceOf[Long]
> }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7968) Deduplicate serializer classes between runtime and queryable state

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236615#comment-16236615
 ] 

ASF GitHub Bot commented on FLINK-7968:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4938


> Deduplicate serializer classes between runtime and queryable state
> --
>
> Key: FLINK-7968
> URL: https://issues.apache.org/jira/browse/FLINK-7968
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> Some serializer classes where duplicated into {{flink-queryable-state}} to 
> avoid a dependency on {{flink-runtime}}.
> The proper solution here is to move the classes to the shared {{flink-core}} 
> project, because these classes are actually useful in a series of API 
> utilities and they do not have any dependency on other flink classes at all.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4938: [FLINK-7968] [core] Move DataOutputSerializer and ...

2017-11-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4938


---


[GitHub] flink issue #4938: [FLINK-7968] [core] Move DataOutputSerializer and DataInp...

2017-11-02 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4938
  
+1


---


[jira] [Commented] (FLINK-7968) Deduplicate serializer classes between runtime and queryable state

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236601#comment-16236601
 ] 

ASF GitHub Bot commented on FLINK-7968:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4938
  
+1


> Deduplicate serializer classes between runtime and queryable state
> --
>
> Key: FLINK-7968
> URL: https://issues.apache.org/jira/browse/FLINK-7968
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> Some serializer classes where duplicated into {{flink-queryable-state}} to 
> avoid a dependency on {{flink-runtime}}.
> The proper solution here is to move the classes to the shared {{flink-core}} 
> project, because these classes are actually useful in a series of API 
> utilities and they do not have any dependency on other flink classes at all.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236587#comment-16236587
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r148656504
  
--- Diff: flink-filesystems/flink-s3-fs-hadoop/pom.xml ---
@@ -182,6 +182,21 @@ under the License.
${project.version}
test

+   
--- End diff --

Would be great if we can avoid adding these dependencies.
This couples projects that were really meant to be independent, even if 
just in test scope.

If this is about testing recursive upload, can this be written properly as 
a test case in this project?
Or can the Yarn upload test be completely in the yarn test project, adding 
a dependency on this s3 project?


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236588#comment-16236588
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r148656918
  
--- Diff: 
flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
 ---
@@ -57,11 +62,52 @@
private static final String ACCESS_KEY = 
System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
private static final String SECRET_KEY = 
System.getenv("ARTIFACTS_AWS_SECRET_KEY");
 
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
@BeforeClass
-   public static void checkIfCredentialsArePresent() {
+   public static void checkCredentialsAndSetup() throws IOException {
+   // check whether credentials exist
Assume.assumeTrue("AWS S3 bucket not configured, skipping 
test...", BUCKET != null);
Assume.assumeTrue("AWS S3 access key not configured, skipping 
test...", ACCESS_KEY != null);
Assume.assumeTrue("AWS S3 secret key not configured, skipping 
test...", SECRET_KEY != null);
+
+   // initialize configuration with valid credentials
--- End diff --

I would suggest to move this out of the "setup" method into the actual test.
The setup logic is not shared (all other test methods don't assume that 
setup) and it also assumes existence of functionality that is tested in other 
test methods..


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> 

[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-02 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r148656504
  
--- Diff: flink-filesystems/flink-s3-fs-hadoop/pom.xml ---
@@ -182,6 +182,21 @@ under the License.
${project.version}
test

+   
--- End diff --

Would be great if we can avoid adding these dependencies.
This couples projects that were really meant to be independent, even if 
just in test scope.

If this is about testing recursive upload, can this be written properly as 
a test case in this project?
Or can the Yarn upload test be completely in the yarn test project, adding 
a dependency on this s3 project?


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-02 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r148656918
  
--- Diff: 
flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
 ---
@@ -57,11 +62,52 @@
private static final String ACCESS_KEY = 
System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
private static final String SECRET_KEY = 
System.getenv("ARTIFACTS_AWS_SECRET_KEY");
 
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
@BeforeClass
-   public static void checkIfCredentialsArePresent() {
+   public static void checkCredentialsAndSetup() throws IOException {
+   // check whether credentials exist
Assume.assumeTrue("AWS S3 bucket not configured, skipping 
test...", BUCKET != null);
Assume.assumeTrue("AWS S3 access key not configured, skipping 
test...", ACCESS_KEY != null);
Assume.assumeTrue("AWS S3 secret key not configured, skipping 
test...", SECRET_KEY != null);
+
+   // initialize configuration with valid credentials
--- End diff --

I would suggest to move this out of the "setup" method into the actual test.
The setup logic is not shared (all other test methods don't assume that 
setup) and it also assumes existence of functionality that is tested in other 
test methods..


---


[jira] [Assigned] (FLINK-7421) AvroRow(De)serializationSchema not serializable

2017-11-02 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-7421:


Assignee: Fabian Hueske  (was: Timo Walther)

> AvroRow(De)serializationSchema not serializable
> ---
>
> Key: FLINK-7421
> URL: https://issues.apache.org/jira/browse/FLINK-7421
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors, Table API & SQL
>Reporter: Timo Walther
>Assignee: Fabian Hueske
>Priority: Critical
>
> Both {{AvroRowDeserializationSchema}} and {{AvroRowSerializationSchema}} 
> contain fields that are not serializable. Those fields should be made 
> transient and both schemas need to be tested in practice.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-6226) ScalarFunction and TableFunction do not support parameters of byte, short and float

2017-11-02 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-6226:


Assignee: Fabian Hueske  (was: Zhuoluo Yang)

> ScalarFunction and TableFunction do not support parameters of byte, short and 
> float
> ---
>
> Key: FLINK-6226
> URL: https://issues.apache.org/jira/browse/FLINK-6226
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Zhuoluo Yang
>Assignee: Fabian Hueske
>Priority: Major
>
> It seems to be a problem that ScalarFunction and TableFunction do not support 
> types of byte, short or float.
> It will throw some exceptions like following;
> {panel}
> org.apache.flink.table.api.ValidationException: Given parameters of function 
> 'org$apache$flink$table$expressions$utils$Func18$$98a126fbdab73f43d640516da603291a'
>  do not match any signature. 
> Actual: (java.lang.String, java.lang.Integer, java.lang.Integer, 
> java.lang.Integer, java.lang.Long) 
> Expected: (java.lang.String, byte, short, int, long)
>   at 
> org.apache.flink.table.functions.utils.ScalarSqlFunction$$anon$1.inferReturnType(ScalarSqlFunction.scala:82)
>   at 
> org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:469)
>   at 
> org.apache.calcite.rex.RexBuilder.deriveReturnType(RexBuilder.java:271)
>   at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:518)
>   at 
> org.apache.flink.table.expressions.ScalarFunctionCall.toRexNode(call.scala:68)
>   at 
> org.apache.flink.table.expressions.Alias.toRexNode(fieldExpression.scala:76)
>   at 
> org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:95)
>   at 
> org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:95)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>   at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.table.plan.logical.Project.construct(operators.scala:95)
>   at 
> org.apache.flink.table.plan.logical.LogicalNode.toRelNode(LogicalNode.scala:77)
>   at org.apache.flink.table.api.Table.getRelNode(table.scala:72)
>   at 
> org.apache.flink.table.expressions.utils.ExpressionTestBase.addTableApiTestExpr(ExpressionTestBase.scala:215)
>   at 
> org.apache.flink.table.expressions.utils.ExpressionTestBase.testAllApis(ExpressionTestBase.scala:241)
>   at 
> org.apache.flink.table.expressions.UserDefinedScalarFunctionTest.testVariableArgs(UserDefinedScalarFunctionTest.scala:240)
> {panel}
> Testing code looks like following:
> {code:java}
> object Func18 extends ScalarFunction {
>   def eval(a: String, b: Byte, c: Short, d: Int, e: Long): String = {
> a + "," + b + "," + c + "," + d + "," + e
>   }
> }
> class TableFunc4 extends  TableFunction[Row] {
>   def eval(data: String, tinyInt: Byte, smallInt: Short, int: Int, long: 
> Long): Unit = {
> val row = new Row(5)
> row.setField(0, data)
> row.setField(1, tinyInt)
> row.setField(2, smallInt)
> row.setField(3, int)
> row.setField(4, long)
> collect(row)
>   }
>   override def getResultType: TypeInformation[Row] = {
> new RowTypeInfo(
>   BasicTypeInfo.STRING_TYPE_INFO,
>   BasicTypeInfo.BYTE_TYPE_INFO,
>   BasicTypeInfo.SHORT_TYPE_INFO,
>   BasicTypeInfo.INT_TYPE_INFO,
>   BasicTypeInfo.LONG_TYPE_INFO
> )
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4794: [build][minor] Add missing licenses

2017-11-02 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/4794
  
+1


---


[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236512#comment-16236512
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4939

[FLINK-4228][yarn/s3a] fix yarn resource upload s3a defaultFs

## What is the purpose of the change

If YARN is configured to use the `s3a` default file system, upload of the 
Flink jars will fail since its 
`org.apache.hadoop.fs.FileSystem#copyFromLocalFile()` does not work recursively 
on the given `lib` folder.

## Brief change log

- implement our own recursive upload (based on #2288)
- add unit tests to verify its behaviour for both `hdfs://` and `s3://` 
(via S3A) resource uploads

## Verifying this change

This change added tests and can be verified as follows:

- added a unit test for HDFS uploads via our `MiniDFSCluster`
- added integration test to verify S3 uploads (via the S3A filesystem 
implementation of the `flink-s3-fs-hadoop` sub-project)
- manually verified the test on YARN with both S3A and HDFS default file 
systems being set

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes - internally)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-4228

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4939.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4939


commit 5d31f41e0e480820e9fec1efa84e5725364a136d
Author: Nico Kruber 
Date:   2017-11-02T18:38:48Z

[hotfix][s3] fix HadoopS3FileSystemITCase leaving test directories behind 
in S3

commit bf47d376397a8e64625a031468d5f5d0a5486238
Author: Nico Kruber 
Date:   2016-11-09T20:04:50Z

[FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs

+ includes a new unit tests for recursive uploads to hfds:// targets
+ add a unit test for recursive file uploads to s3:// via s3a




> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. If so, the Flink session will fail on staging itself because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> 

[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-02 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4939

[FLINK-4228][yarn/s3a] fix yarn resource upload s3a defaultFs

## What is the purpose of the change

If YARN is configured to use the `s3a` default file system, upload of the 
Flink jars will fail since its 
`org.apache.hadoop.fs.FileSystem#copyFromLocalFile()` does not work recursively 
on the given `lib` folder.

## Brief change log

- implement our own recursive upload (based on #2288)
- add unit tests to verify its behaviour for both `hdfs://` and `s3://` 
(via S3A) resource uploads

## Verifying this change

This change added tests and can be verified as follows:

- added a unit test for HDFS uploads via our `MiniDFSCluster`
- added integration test to verify S3 uploads (via the S3A filesystem 
implementation of the `flink-s3-fs-hadoop` sub-project)
- manually verified the test on YARN with both S3A and HDFS default file 
systems being set

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes - internally)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-4228

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4939.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4939


commit 5d31f41e0e480820e9fec1efa84e5725364a136d
Author: Nico Kruber 
Date:   2017-11-02T18:38:48Z

[hotfix][s3] fix HadoopS3FileSystemITCase leaving test directories behind 
in S3

commit bf47d376397a8e64625a031468d5f5d0a5486238
Author: Nico Kruber 
Date:   2016-11-09T20:04:50Z

[FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs

+ includes a new unit tests for recursive uploads to hfds:// targets
+ add a unit test for recursive file uploads to s3:// via s3a




---


[jira] [Updated] (FLINK-7961) Docker-Flink with Docker Swarm doesn't work when machines are in different clouds

2017-11-02 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7961?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-7961:
--
Description: 
Task Managers can't find Job Manager by name. Maybe some additional Docker 
configuration is needed?

I am running the standard setup and create-docker-swarm-service.sh script from 
the Docker Flink project:
https://github.com/apache/flink/blob/master/flink-contrib/docker-flink/create-docker-swarm-service.sh

This is the log from one of the Task Manager's containers:

{noformat}
Starting Task Manager
config file:
jobmanager.rpc.address: flink-jobmanager
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 2
taskmanager.memory.preallocate: false
parallelism.default: 1
jobmanager.web.port: 8081
blob.server.port: 6124
query.server.port: 6125
Starting taskmanager as a console application on host c42a6093f7bb.
2017-11-01 11:20:51,459 WARN  org.apache.hadoop.util.NativeCodeLoader   
- Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2017-11-01 11:20:51,522 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 

2017-11-01 11:20:51,522 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Starting TaskManager (Version: 1.3.2, Rev:0399bee, 
Date:03.08.2017 @ 10:23:11 UTC)
2017-11-01 11:20:51,522 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Current user: flink
2017-11-01 11:20:51,522 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 
1.8/25.141-b15
2017-11-01 11:20:51,522 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Maximum heap size: 1024 MiBytes
2017-11-01 11:20:51,522 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  JAVA_HOME: /docker-java-home/jre
2017-11-01 11:20:51,526 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Hadoop version: 2.7.2
2017-11-01 11:20:51,526 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  JVM Options:
2017-11-01 11:20:51,526 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -XX:+UseG1GC
2017-11-01 11:20:51,526 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -Xms1024M
2017-11-01 11:20:51,526 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -Xmx1024M
2017-11-01 11:20:51,526 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- -XX:MaxDirectMemorySize=8388607T
2017-11-01 11:20:51,526 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 
-Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2017-11-01 11:20:51,526 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 
-Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2017-11-01 11:20:51,526 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Program Arguments:
2017-11-01 11:20:51,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- --configDir
2017-11-01 11:20:51,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- /opt/flink/conf
2017-11-01 11:20:51,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
-  Classpath: 
/opt/flink/lib/flink-python_2.11-1.3.2.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.3.2.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.3.2.jar:::
2017-11-01 11:20:51,527 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- 

2017-11-01 11:20:51,528 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Registered UNIX signal handlers for [TERM, HUP, INT]
2017-11-01 11:20:51,532 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Maximum number of open file descriptors is 1048576
2017-11-01 11:20:51,548 INFO  org.apache.flink.runtime.taskmanager.TaskManager  
- Loading configuration from /opt/flink/conf
2017-11-01 11:20:51,551 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.address, flink-jobmanager
2017-11-01 11:20:51,551 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.port, 6123
2017-11-01 11:20:51,551 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.heap.mb, 1024
2017-11-01 11:20:51,551 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.heap.mb, 1024

[jira] [Commented] (FLINK-7947) Let ParameterTool return a dedicated GlobalJobParameters object

2017-11-02 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236479#comment-16236479
 ] 

Greg Hogan commented on FLINK-7947:
---

Do these changes necessitate modifying the {{@Public}} API?

> Let ParameterTool return a dedicated GlobalJobParameters object
> ---
>
> Key: FLINK-7947
> URL: https://issues.apache.org/jira/browse/FLINK-7947
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Bowen Li
>Priority: Major
>
> The {{ParameterTool}} directly implements the {{GlobalJobParameters}} 
> interface. Additionally it has grown over time to not only store the 
> configuration parameters but also to record which parameters have been 
> requested and what default value was set. This information is irrelevant on 
> the server side when setting a {{GlobalJobParameters}} object via 
> {{ExecutionConfig#setGlobalJobParameters}}.
> Since we don't separate the {{ParameterTool}} logic and the actual data view, 
> users ran into problems when reusing the same {{ParameterTool}} to start 
> multiple jobs concurrently (see FLINK-7943). I think it would be a much 
> clearer separation of concerns if we would actually split the 
> {{GlobalJobParameters}} from the {{ParameterTool}}.
> Furthermore, we should think about whether {{ParameterTool#get}} should have 
> side effects or not as it does right now.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6226) ScalarFunction and TableFunction do not support parameters of byte, short and float

2017-11-02 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236297#comment-16236297
 ] 

Fabian Hueske commented on FLINK-6226:
--

I tried to reproduce the issue but the provided UDFs work on the current master 
branch.
Seems like the issue was fixed in the mean time.

I'd suggest to modify two existing test cases to use UDFs with {{byte}}, 
{{short}}, and {{float}} types.

> ScalarFunction and TableFunction do not support parameters of byte, short and 
> float
> ---
>
> Key: FLINK-6226
> URL: https://issues.apache.org/jira/browse/FLINK-6226
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Zhuoluo Yang
>Assignee: Zhuoluo Yang
>Priority: Major
>
> It seems to be a problem that ScalarFunction and TableFunction do not support 
> types of byte, short or float.
> It will throw some exceptions like following;
> {panel}
> org.apache.flink.table.api.ValidationException: Given parameters of function 
> 'org$apache$flink$table$expressions$utils$Func18$$98a126fbdab73f43d640516da603291a'
>  do not match any signature. 
> Actual: (java.lang.String, java.lang.Integer, java.lang.Integer, 
> java.lang.Integer, java.lang.Long) 
> Expected: (java.lang.String, byte, short, int, long)
>   at 
> org.apache.flink.table.functions.utils.ScalarSqlFunction$$anon$1.inferReturnType(ScalarSqlFunction.scala:82)
>   at 
> org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:469)
>   at 
> org.apache.calcite.rex.RexBuilder.deriveReturnType(RexBuilder.java:271)
>   at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:518)
>   at 
> org.apache.flink.table.expressions.ScalarFunctionCall.toRexNode(call.scala:68)
>   at 
> org.apache.flink.table.expressions.Alias.toRexNode(fieldExpression.scala:76)
>   at 
> org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:95)
>   at 
> org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:95)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>   at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.table.plan.logical.Project.construct(operators.scala:95)
>   at 
> org.apache.flink.table.plan.logical.LogicalNode.toRelNode(LogicalNode.scala:77)
>   at org.apache.flink.table.api.Table.getRelNode(table.scala:72)
>   at 
> org.apache.flink.table.expressions.utils.ExpressionTestBase.addTableApiTestExpr(ExpressionTestBase.scala:215)
>   at 
> org.apache.flink.table.expressions.utils.ExpressionTestBase.testAllApis(ExpressionTestBase.scala:241)
>   at 
> org.apache.flink.table.expressions.UserDefinedScalarFunctionTest.testVariableArgs(UserDefinedScalarFunctionTest.scala:240)
> {panel}
> Testing code looks like following:
> {code:java}
> object Func18 extends ScalarFunction {
>   def eval(a: String, b: Byte, c: Short, d: Int, e: Long): String = {
> a + "," + b + "," + c + "," + d + "," + e
>   }
> }
> class TableFunc4 extends  TableFunction[Row] {
>   def eval(data: String, tinyInt: Byte, smallInt: Short, int: Int, long: 
> Long): Unit = {
> val row = new Row(5)
> row.setField(0, data)
> row.setField(1, tinyInt)
> row.setField(2, smallInt)
> row.setField(3, int)
> row.setField(4, long)
> collect(row)
>   }
>   override def getResultType: TypeInformation[Row] = {
> new RowTypeInfo(
>   BasicTypeInfo.STRING_TYPE_INFO,
>   BasicTypeInfo.BYTE_TYPE_INFO,
>   BasicTypeInfo.SHORT_TYPE_INFO,
>   BasicTypeInfo.INT_TYPE_INFO,
>   BasicTypeInfo.LONG_TYPE_INFO
> )
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7968) Deduplicate serializer classes between runtime and queryable state

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236262#comment-16236262
 ] 

ASF GitHub Bot commented on FLINK-7968:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/4938

[FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializer to 
'flink-core'

## What is the purpose of the change

This moves the classes `DataInputDeserializer` and `DataOutputSerializer` 
to `flink-core`, because these classes are used across difference projects 
(`flink-runtime` and `flink-queryable-state`) and were previously duplicated.
This is also needed for future cleanup in the proper scoping of 
`SerializationSchema`.
It is easily possible, because these classes have no dependencies on other 
Flink classes anyways.

## Changelog
  - Move `DataInputDeserializer` and `DataOutputSerializer` to `flink-core`
  - Move associated tests and test utils to `flink-core`
  - Delete the duplicate serializers in `flink-queryable-state`.

## Verifying this change

This change is a trivial rework / code cleanup which is covered by existing 
tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink move_data_in_out

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4938.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4938


commit 6afd68b100edab4689a6e7a39b5aee7743e05853
Author: Stephan Ewen 
Date:   2017-11-02T16:19:19Z

[hotfix] [tests] Improve TypeInfoTestCoverageTest

commit 4d808a456d698253b2a77cc79e7d4386ceeab706
Author: Stephan Ewen 
Date:   2017-11-02T17:27:23Z

[FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializer to 
'flink-core'

These core flink utils are independent of any other runtime classes and
are also used both in flink-runtime and in flink-queryable-state (which 
duplicated
the code).




> Deduplicate serializer classes between runtime and queryable state
> --
>
> Key: FLINK-7968
> URL: https://issues.apache.org/jira/browse/FLINK-7968
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>
> Some serializer classes where duplicated into {{flink-queryable-state}} to 
> avoid a dependency on {{flink-runtime}}.
> The proper solution here is to move the classes to the shared {{flink-core}} 
> project, because these classes are actually useful in a series of API 
> utilities and they do not have any dependency on other flink classes at all.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7421) AvroRow(De)serializationSchema not serializable

2017-11-02 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-7421:
-
Priority: Critical  (was: Major)

> AvroRow(De)serializationSchema not serializable
> ---
>
> Key: FLINK-7421
> URL: https://issues.apache.org/jira/browse/FLINK-7421
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors, Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Critical
>
> Both {{AvroRowDeserializationSchema}} and {{AvroRowSerializationSchema}} 
> contain fields that are not serializable. Those fields should be made 
> transient and both schemas need to be tested in practice.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4938: [FLINK-7968] [core] Move DataOutputSerializer and ...

2017-11-02 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/4938

[FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializer to 
'flink-core'

## What is the purpose of the change

This moves the classes `DataInputDeserializer` and `DataOutputSerializer` 
to `flink-core`, because these classes are used across difference projects 
(`flink-runtime` and `flink-queryable-state`) and were previously duplicated.
This is also needed for future cleanup in the proper scoping of 
`SerializationSchema`.
It is easily possible, because these classes have no dependencies on other 
Flink classes anyways.

## Changelog
  - Move `DataInputDeserializer` and `DataOutputSerializer` to `flink-core`
  - Move associated tests and test utils to `flink-core`
  - Delete the duplicate serializers in `flink-queryable-state`.

## Verifying this change

This change is a trivial rework / code cleanup which is covered by existing 
tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink move_data_in_out

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4938.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4938


commit 6afd68b100edab4689a6e7a39b5aee7743e05853
Author: Stephan Ewen 
Date:   2017-11-02T16:19:19Z

[hotfix] [tests] Improve TypeInfoTestCoverageTest

commit 4d808a456d698253b2a77cc79e7d4386ceeab706
Author: Stephan Ewen 
Date:   2017-11-02T17:27:23Z

[FLINK-7968] [core] Move DataOutputSerializer and DataInputDeserializer to 
'flink-core'

These core flink utils are independent of any other runtime classes and
are also used both in flink-runtime and in flink-queryable-state (which 
duplicated
the code).




---


[jira] [Created] (FLINK-7968) Deduplicate serializer classes between runtime and queryable state

2017-11-02 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-7968:
---

 Summary: Deduplicate serializer classes between runtime and 
queryable state
 Key: FLINK-7968
 URL: https://issues.apache.org/jira/browse/FLINK-7968
 Project: Flink
  Issue Type: Bug
  Components: Queryable State
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.4.0


Some serializer classes where duplicated into {{flink-queryable-state}} to 
avoid a dependency on {{flink-runtime}}.

The proper solution here is to move the classes to the shared {{flink-core}} 
project, because these classes are actually useful in a series of API utilities 
and they do not have any dependency on other flink classes at all.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236251#comment-16236251
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148609899
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -32,8 +46,11 @@
  * The handler will then return a list containing the values of the 
requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
  */
-public class JobMetricsHandler extends AbstractMetricsHandler {
+public class JobMetricsHandler extends AbstractMetricsHandler
--- End diff --

Why aren't we simply implementing a new handler? I think that we don't 
reuse any of `AbstractMetricsHandler` functionality and, thus, there is no need 
to use it as a base class. 


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236247#comment-16236247
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606039
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricMessageParameters.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for getting metrics.
+ */
+public class MetricMessageParameters extends MessageParameters {
--- End diff --

Let's extend from `JobMessageParameters`


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236250#comment-16236250
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148610350
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
 ---
@@ -120,6 +121,16 @@ public synchronized ComponentMetricStore 
getJobMetricStore(String jobID) {
}
 
/**
+* Returns the {@link ComponentMetricStore} for the given job ID.
+*
+* @param jobID job ID
+* @return ComponentMetricStore for the given ID, or null if no store 
for the given argument exists
+*/
+   public synchronized ComponentMetricStore getJobMetricStore(JobID jobID) 
{
+   return jobID == null ? null : 
ComponentMetricStore.unmodifiable(jobs.get(jobID.toString()));
--- End diff --

`jobID` should not be nullable.


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236252#comment-16236252
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606599
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java
 ---
@@ -65,15 +65,28 @@ public void getMapFor() throws Exception {
 
assertEquals("2", metrics.get("abc.metric3"));
assertEquals("3", metrics.get("abc.metric4"));
+   assertEquals(
+   "[" +
+   "{\"id\":\"abc.metric4\"}," +
+   "{\"id\":\"abc.metric3\"}" +
+   "]",
+   handler.getAvailableMetricsList(pathParams));
+   assertEquals("", handler.getMetricsValues(pathParams, ""));
+   assertEquals(
+   "[" +
+   
"{\"id\":\"abc.metric3\",\"value\":\"2\"}," +
+   
"{\"id\":\"abc.metric4\",\"value\":\"3\"}" +
+   "]",
+   handler.getMetricsValues(pathParams, 
"abc.metric3,abc.metric4"));
}
 
@Test
public void getMapForNull() {
MetricFetcher fetcher = new MetricFetcher(
-   mock(GatewayRetriever.class),
-   mock(MetricQueryServiceRetriever.class),
-   Executors.directExecutor(),
-   TestingUtils.TIMEOUT());
+   
mock(GatewayRetriever.class),
+   
mock(MetricQueryServiceRetriever.class),
+   
Executors.directExecutor(),
+   
TestingUtils.TIMEOUT());
--- End diff --

Please revert


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236246#comment-16236246
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606465
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsOverview.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.metrics;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Response of metrics handlers, represented as a list of {@link 
MetricEntry}.
+ */
+public class MetricsOverview extends ArrayList implements 
ResponseBody {
--- End diff --

Let's not directly extend from `ArrayList` but instead use composition.


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236249#comment-16236249
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606524
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java
 ---
@@ -50,10 +50,10 @@ public void testGetPaths() {
@Test
public void getMapFor() throws Exception {
MetricFetcher fetcher = new MetricFetcher(
-   mock(GatewayRetriever.class),
-   mock(MetricQueryServiceRetriever.class),
-   Executors.directExecutor(),
-   TestingUtils.TIMEOUT());
+   
mock(GatewayRetriever.class),
+   
mock(MetricQueryServiceRetriever.class),
+   
Executors.directExecutor(),
+   
TestingUtils.TIMEOUT());
--- End diff --

Please revert


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236248#comment-16236248
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148604013
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 ---
@@ -304,22 +304,21 @@ public WebRuntimeMonitor(
get(router, new JobAccumulatorsHandler(executionGraphCache, 
scheduledExecutor));
 
get(router, new TaskManagersHandler(scheduledExecutor, 
DEFAULT_REQUEST_TIMEOUT, metricFetcher));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.LOG,
-   config));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.STDOUT,
-   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   
TaskManagerLogHandler.FileMode.LOG,
+   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   
TaskManagerLogHandler.FileMode.STDOUT,
+   config));
+   get(router, new TaskManagerMetricsHandler(scheduledExecutor, 
metricFetcher));
--- End diff --

why are you adding things to the old `WebRuntimeMonitor`?


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236254#comment-16236254
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606332
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsHeaders.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message header for metrics handler.
+ */
+public final class MetricsHeaders implements 
MessageHeaders {
+
+   private static final MetricsHeaders INSTANCE = new MetricsHeaders();
+
+   public static final String PARAMETER_JOB_ID = "jobid";
--- End diff --

where is this field used?


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236253#comment-16236253
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148609556
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -48,8 +65,43 @@ public JobMetricsHandler(Executor executor, 
MetricFetcher fetcher) {
@Override
protected Map getMapFor(Map pathParams, 
MetricStore metrics) {
MetricStore.ComponentMetricStore job = 
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
-   return job != null
-   ? job.metrics
-   : null;
+   return job != null ? job.metrics : null;
+   }
+
+   @Override
+   public CompletableFuture 
handleRequest(HandlerRequest 
request, DispatcherGateway gateway) {
+   return CompletableFuture.supplyAsync(
+   () -> {
+   fetcher.update();
+   JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+   List requestedMetrics = 
request.getQueryParameter(MetricNameParameter.class);
+   return getMetricsOverview(jobID, 
requestedMetrics);
+   },
+   executor);
+   }
+
+   protected MetricsOverview getMetricsOverview(JobID jobID, List 
requestedMetrics) {
+   Map metricsMap = getMetricsMapByJobId(jobID, 
fetcher.getMetricStore());
+   if (metricsMap == null) {
+   return new MetricsOverview();
+   }
+
+   if (requestedMetrics == null || requestedMetrics.isEmpty()) {
+   return new MetricsOverview(
+   metricsMap.entrySet().stream()
+   .map(e -> new 
MetricEntry(e.getKey(), e.getValue()))
+   
.collect(Collectors.toList()));
+   } else {
+   return new MetricsOverview(
+   requestedMetrics.stream()
+   .filter(e -> 
metricsMap.get(e) != null)
+   .map(e -> new 
MetricEntry(e, metricsMap.get(e)))
+   
.collect(Collectors.toList()));
--- End diff --

I think by not using Java streams we can avoid to do for every `e in 
requestedMetrics` two `HashMap` lookups and instead do it with a single lookup.


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7694) Port JobMetricsHandler to new REST handler

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236245#comment-16236245
 ] 

ASF GitHub Bot commented on FLINK-7694:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148604078
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 ---
@@ -304,22 +304,21 @@ public WebRuntimeMonitor(
get(router, new JobAccumulatorsHandler(executionGraphCache, 
scheduledExecutor));
 
get(router, new TaskManagersHandler(scheduledExecutor, 
DEFAULT_REQUEST_TIMEOUT, metricFetcher));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.LOG,
-   config));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.STDOUT,
-   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   
TaskManagerLogHandler.FileMode.LOG,
+   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   
TaskManagerLogHandler.FileMode.STDOUT,
+   config));
--- End diff --

Please revert formatting changes.


> Port JobMetricsHandler to new REST handler
> --
>
> Key: FLINK-7694
> URL: https://issues.apache.org/jira/browse/FLINK-7694
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606599
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java
 ---
@@ -65,15 +65,28 @@ public void getMapFor() throws Exception {
 
assertEquals("2", metrics.get("abc.metric3"));
assertEquals("3", metrics.get("abc.metric4"));
+   assertEquals(
+   "[" +
+   "{\"id\":\"abc.metric4\"}," +
+   "{\"id\":\"abc.metric3\"}" +
+   "]",
+   handler.getAvailableMetricsList(pathParams));
+   assertEquals("", handler.getMetricsValues(pathParams, ""));
+   assertEquals(
+   "[" +
+   
"{\"id\":\"abc.metric3\",\"value\":\"2\"}," +
+   
"{\"id\":\"abc.metric4\",\"value\":\"3\"}" +
+   "]",
+   handler.getMetricsValues(pathParams, 
"abc.metric3,abc.metric4"));
}
 
@Test
public void getMapForNull() {
MetricFetcher fetcher = new MetricFetcher(
-   mock(GatewayRetriever.class),
-   mock(MetricQueryServiceRetriever.class),
-   Executors.directExecutor(),
-   TestingUtils.TIMEOUT());
+   
mock(GatewayRetriever.class),
+   
mock(MetricQueryServiceRetriever.class),
+   
Executors.directExecutor(),
+   
TestingUtils.TIMEOUT());
--- End diff --

Please revert


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606524
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandlerTest.java
 ---
@@ -50,10 +50,10 @@ public void testGetPaths() {
@Test
public void getMapFor() throws Exception {
MetricFetcher fetcher = new MetricFetcher(
-   mock(GatewayRetriever.class),
-   mock(MetricQueryServiceRetriever.class),
-   Executors.directExecutor(),
-   TestingUtils.TIMEOUT());
+   
mock(GatewayRetriever.class),
+   
mock(MetricQueryServiceRetriever.class),
+   
Executors.directExecutor(),
+   
TestingUtils.TIMEOUT());
--- End diff --

Please revert


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606332
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsHeaders.java
 ---
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.metrics;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message header for metrics handler.
+ */
+public final class MetricsHeaders implements 
MessageHeaders {
+
+   private static final MetricsHeaders INSTANCE = new MetricsHeaders();
+
+   public static final String PARAMETER_JOB_ID = "jobid";
--- End diff --

where is this field used?


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148609899
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -32,8 +46,11 @@
  * The handler will then return a list containing the values of the 
requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
  */
-public class JobMetricsHandler extends AbstractMetricsHandler {
+public class JobMetricsHandler extends AbstractMetricsHandler
--- End diff --

Why aren't we simply implementing a new handler? I think that we don't 
reuse any of `AbstractMetricsHandler` functionality and, thus, there is no need 
to use it as a base class. 


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148604013
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 ---
@@ -304,22 +304,21 @@ public WebRuntimeMonitor(
get(router, new JobAccumulatorsHandler(executionGraphCache, 
scheduledExecutor));
 
get(router, new TaskManagersHandler(scheduledExecutor, 
DEFAULT_REQUEST_TIMEOUT, metricFetcher));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.LOG,
-   config));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.STDOUT,
-   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   
TaskManagerLogHandler.FileMode.LOG,
+   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   
TaskManagerLogHandler.FileMode.STDOUT,
+   config));
+   get(router, new TaskManagerMetricsHandler(scheduledExecutor, 
metricFetcher));
--- End diff --

why are you adding things to the old `WebRuntimeMonitor`?


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606465
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricsOverview.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.metrics;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Response of metrics handlers, represented as a list of {@link 
MetricEntry}.
+ */
+public class MetricsOverview extends ArrayList implements 
ResponseBody {
--- End diff --

Let's not directly extend from `ArrayList` but instead use composition.


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148609556
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobMetricsHandler.java
 ---
@@ -48,8 +65,43 @@ public JobMetricsHandler(Executor executor, 
MetricFetcher fetcher) {
@Override
protected Map getMapFor(Map pathParams, 
MetricStore metrics) {
MetricStore.ComponentMetricStore job = 
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
-   return job != null
-   ? job.metrics
-   : null;
+   return job != null ? job.metrics : null;
+   }
+
+   @Override
+   public CompletableFuture 
handleRequest(HandlerRequest 
request, DispatcherGateway gateway) {
+   return CompletableFuture.supplyAsync(
+   () -> {
+   fetcher.update();
+   JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+   List requestedMetrics = 
request.getQueryParameter(MetricNameParameter.class);
+   return getMetricsOverview(jobID, 
requestedMetrics);
+   },
+   executor);
+   }
+
+   protected MetricsOverview getMetricsOverview(JobID jobID, List 
requestedMetrics) {
+   Map metricsMap = getMetricsMapByJobId(jobID, 
fetcher.getMetricStore());
+   if (metricsMap == null) {
+   return new MetricsOverview();
+   }
+
+   if (requestedMetrics == null || requestedMetrics.isEmpty()) {
+   return new MetricsOverview(
+   metricsMap.entrySet().stream()
+   .map(e -> new 
MetricEntry(e.getKey(), e.getValue()))
+   
.collect(Collectors.toList()));
+   } else {
+   return new MetricsOverview(
+   requestedMetrics.stream()
+   .filter(e -> 
metricsMap.get(e) != null)
+   .map(e -> new 
MetricEntry(e, metricsMap.get(e)))
+   
.collect(Collectors.toList()));
--- End diff --

I think by not using Java streams we can avoid to do for every `e in 
requestedMetrics` two `HashMap` lookups and instead do it with a single lookup.


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148606039
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/metrics/MetricMessageParameters.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.metrics;
+
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Parameters for getting metrics.
+ */
+public class MetricMessageParameters extends MessageParameters {
--- End diff --

Let's extend from `JobMessageParameters`


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148604078
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 ---
@@ -304,22 +304,21 @@ public WebRuntimeMonitor(
get(router, new JobAccumulatorsHandler(executionGraphCache, 
scheduledExecutor));
 
get(router, new TaskManagersHandler(scheduledExecutor, 
DEFAULT_REQUEST_TIMEOUT, metricFetcher));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.LOG,
-   config));
-   get(router,
-   new TaskManagerLogHandler(
-   retriever,
-   scheduledExecutor,
-   localRestAddress,
-   timeout,
-   TaskManagerLogHandler.FileMode.STDOUT,
-   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   
TaskManagerLogHandler.FileMode.LOG,
+   config));
+   get(router, new TaskManagerLogHandler(
+   retriever,
+   scheduledExecutor,
+   localRestAddress,
+   timeout,
+   
TaskManagerLogHandler.FileMode.STDOUT,
+   config));
--- End diff --

Please revert formatting changes.


---


[GitHub] flink pull request #4757: [FLINK-7694][REST][Webfrontend]Port JobMetricsHand...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4757#discussion_r148610350
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/MetricStore.java
 ---
@@ -120,6 +121,16 @@ public synchronized ComponentMetricStore 
getJobMetricStore(String jobID) {
}
 
/**
+* Returns the {@link ComponentMetricStore} for the given job ID.
+*
+* @param jobID job ID
+* @return ComponentMetricStore for the given ID, or null if no store 
for the given argument exists
+*/
+   public synchronized ComponentMetricStore getJobMetricStore(JobID jobID) 
{
+   return jobID == null ? null : 
ComponentMetricStore.unmodifiable(jobs.get(jobID.toString()));
--- End diff --

`jobID` should not be nullable.


---


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236191#comment-16236191
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148599674
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
 ---
@@ -86,10 +85,16 @@
// 

 
CompletableFuture allocateSlot(
-   ScheduledUnit task,
+   AllocationID allocationID,
ResourceProfile resources,
Iterable locationPreferences,
@RpcTimeout Time timeout);
 
void returnAllocatedSlot(Slot slot);
+
+   /**
+* Cancel a slot allocation.
+* This method should be called when the CompletableFuture returned by 
allocateSlot completed exceptionally.
--- End diff --

Params description is missing.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236185#comment-16236185
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148591926
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   ScheduledUnit task,
--- End diff --

We should not remove the `ScheduledUnit` parameter here.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236180#comment-16236180
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148571514
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = 
SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
+   //verify(resourceManagerGateway, 
times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
+
+   // 3. test the allocation is timed out in client side but the 
request is fulfilled in slot pool
+   AllocationID allocationID3 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID3, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   ResourceID resourceID = ResourceID.generate();
+   AllocatedSlot allocatedSlot = 
SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, 
DEFAULT_TESTING_PROFILE);
+   slotPoolGateway.registerTaskManager(resourceID);
+   assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+
+   assertEquals(0, pool.getNumOfPendingRequests());
+   assertTrue(pool.getAllocatedSlots().contains(allocationID3));
+
+   pool.cancelSlotAllocation(allocationID3);
+   assertFalse(pool.getAllocatedSlots().contains(allocationID3));
+   assertTrue(pool.getAvailableSlots().contains(allocationID3));
+   }
+
+   @Test
+   public void testProviderAndOwner() throws 

[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236182#comment-16236182
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570658
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = 
SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
+   //verify(resourceManagerGateway, 
times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
+
+   // 3. test the allocation is timed out in client side but the 
request is fulfilled in slot pool
--- End diff --

separate test


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236184#comment-16236184
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148598542
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -1006,7 +1044,13 @@ public boolean returnAllocatedSlot(Slot slot) {
Iterable locationPreferences = 

task.getTaskToExecute().getVertex().getPreferredLocations();
 
-   return gateway.allocateSlot(task, 
ResourceProfile.UNKNOWN, locationPreferences, timeout);
+   final AllocationID allocationID = new AllocationID();
+   CompletableFuture slotFuture = 
gateway.allocateSlot(allocationID, ResourceProfile.UNKNOWN, 
locationPreferences, timeout);
+   slotFuture.exceptionally((Throwable failure) -> {
--- End diff --

I think `slotFuture.whenComplete` would better fit here.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236187#comment-16236187
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148596041
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   ScheduledUnit task,
+   public CompletableFuture allocateSlot(AllocationID 
allocationID,
ResourceProfile resources,
Iterable locationPreferences,
Time timeout) {
 
-   return internalAllocateSlot(task, resources, 
locationPreferences);
+   return internalAllocateSlot(allocationID, resources, 
locationPreferences);
}
 
@Override
public void returnAllocatedSlot(Slot slot) {
internalReturnAllocatedSlot(slot);
}
 
+   @Override
+   public void cancelSlotAllocation(AllocationID allocationID) {
+   waitingForResourceManager.remove(allocationID);
--- End diff --

we should fail the pending request properly. E.g. check if the slot is in 
`waitingForResourceManager` or `pendingRequests`. If yes, then remove and call 
`failPendingRequest`.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236174#comment-16236174
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148571851
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---
@@ -294,11 +296,11 @@ public void returnAllocatedSlot(Slot slot) {
}
}
 
-   private static ResourceManagerGateway 
createResourceManagerGatewayMock() {
+   static ResourceManagerGateway createResourceManagerGatewayMock() {
ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
when(resourceManagerGateway
-   .requestSlot(any(JobMasterId.class), 
any(SlotRequest.class), any(Time.class)))
-   .thenReturn(mock(CompletableFuture.class, 
RETURNS_MOCKS));
+   .requestSlot(any(JobMasterId.class), 
any(SlotRequest.class), any(Time.class)))
+   .thenReturn(mock(CompletableFuture.class, 
RETURNS_MOCKS));
--- End diff --

Why not returning a proper `CompletableFuture` here?


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236183#comment-16236183
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148571183
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = 
SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
+   //verify(resourceManagerGateway, 
times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
+
+   // 3. test the allocation is timed out in client side but the 
request is fulfilled in slot pool
+   AllocationID allocationID3 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID3, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   ResourceID resourceID = ResourceID.generate();
+   AllocatedSlot allocatedSlot = 
SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, 
DEFAULT_TESTING_PROFILE);
+   slotPoolGateway.registerTaskManager(resourceID);
+   assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+
+   assertEquals(0, pool.getNumOfPendingRequests());
+   assertTrue(pool.getAllocatedSlots().contains(allocationID3));
+
+   pool.cancelSlotAllocation(allocationID3);
+   assertFalse(pool.getAllocatedSlots().contains(allocationID3));
+   assertTrue(pool.getAvailableSlots().contains(allocationID3));
+   }
+
+   @Test
+   public void testProviderAndOwner() throws 

[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236170#comment-16236170
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148569315
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -361,9 +374,19 @@ private void 
slotRequestToResourceManagerFailed(AllocationID allocationID, Throw
}
 
private void checkTimeoutSlotAllocation(AllocationID allocationID) {
+   removePendingRequestWithException(allocationID, new 
TimeoutException("Slot allocation request " + allocationID + " timed out"));
+   }
+
+   private void removePendingRequestWithException(AllocationID 
allocationID, Exception e) {
PendingRequest request = pendingRequests.remove(allocationID);
-   if (request != null && !request.getFuture().isDone()) {
-   request.getFuture().completeExceptionally(new 
TimeoutException("Slot allocation request timed out"));
+   if (request != null && (!request.getFuture().isDone() || 
request.getFuture().isCompletedExceptionally())) {
+   //TODO: the following line depends on the pr: 
https://github.com/apache/flink/pull/4887
+   //if (resourceManagerGateway != null) {
+   //  resourceManagerGateway.cancelSlotRequest(jobId, 
jobMasterId, allocationID);
+   //}
--- End diff --

This should be removed and added once #4887 has been merged.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236190#comment-16236190
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148602423
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
--- End diff --

I think the test could look the following:

```
slotPoolGateway.allocateSlot();
CompletableFuture numberPendingRequestsFuture = 
slotPoolGateway.requestNumberPendingRequests();
assertEquals(1, numberPendingRequestsFuture.get());
slotPoolGateway.cancelAllocation();
CompletableFuture numberPendingRequestsFuture = 
slotPoolGateway.requestNumberPendingRequests();
assertEquals(0, numberPendingRequestsFuture.get());
```


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236186#comment-16236186
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148601504
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -645,6 +668,21 @@ AllocatedSlots getAllocatedSlots() {
return allocatedSlots;
}
 
+   @VisibleForTesting
+   AvailableSlots getAvailableSlots() {
+   return availableSlots;
+   }
+
+   @VisibleForTesting
+   int getNumOfWaitingForResourceRequests() {
+   return waitingForResourceManager.size();
+   }
+
+   @VisibleForTesting
+   int getNumOfPendingRequests() {
+   return pendingRequests.size();
+   }
--- End diff --

I think we should not make internal state easily accessible because it will 
usually be modified by the main thread. Also when checking a certain 
interleaving you might be falsely entrapped that you can do something like
```
slotPool.asyncAddPendingRequest()
slotPool.getNumOfPendingRequests() // this returns +1 pending requests
```

This is might work but sometimes it also does not work because the 
concurrent operation has not been completed. I would like to make concurrent 
operations explicit by, for example, returning a `CompletableFuture 
getNumberOfPendingRequests` if at all.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236176#comment-16236176
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570094
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
--- End diff --

Better to check with `instanceOf` I think.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236189#comment-16236189
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148602544
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = 
SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
--- End diff --

It's not guaranteed that `pool.getNumofPendingRequests()` is executed after 
`pool.cancelSlotAllocation` has been executed.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236171#comment-16236171
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570278
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
--- End diff --

This should be a separate test.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236181#comment-16236181
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570875
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = 
SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
+   //verify(resourceManagerGateway, 
times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
+
+   // 3. test the allocation is timed out in client side but the 
request is fulfilled in slot pool
+   AllocationID allocationID3 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID3, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   ResourceID resourceID = ResourceID.generate();
+   AllocatedSlot allocatedSlot = 
SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, 
DEFAULT_TESTING_PROFILE);
+   slotPoolGateway.registerTaskManager(resourceID);
+   assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+
+   assertEquals(0, pool.getNumOfPendingRequests());
+   assertTrue(pool.getAllocatedSlots().contains(allocationID3));
+
+   pool.cancelSlotAllocation(allocationID3);
+   assertFalse(pool.getAllocatedSlots().contains(allocationID3));
+   assertTrue(pool.getAvailableSlots().contains(allocationID3));
+   }
+
+   @Test
+   public void testProviderAndOwner() throws 

[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236178#comment-16236178
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148596180
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   ScheduledUnit task,
+   public CompletableFuture allocateSlot(AllocationID 
allocationID,
ResourceProfile resources,
Iterable locationPreferences,
Time timeout) {
 
-   return internalAllocateSlot(task, resources, 
locationPreferences);
+   return internalAllocateSlot(allocationID, resources, 
locationPreferences);
}
 
@Override
public void returnAllocatedSlot(Slot slot) {
internalReturnAllocatedSlot(slot);
}
 
+   @Override
+   public void cancelSlotAllocation(AllocationID allocationID) {
+   waitingForResourceManager.remove(allocationID);
+   
+   removePendingRequestWithException(allocationID, new 
CancellationException("Allocation " + allocationID + " cancelled"));
+
+   if (allocatedSlots.contains(allocationID)) {
+   Slot slot = allocatedSlots.get(allocationID);
--- End diff --

We could avoid the `contains` call by simply calling `get` and then compare 
against `null`.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236188#comment-16236188
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148599978
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
--- End diff --

This is not needed if you add `throws Exception` to the test method.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236175#comment-16236175
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570569
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = 
SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
+   //verify(resourceManagerGateway, 
times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
--- End diff --

should be removed


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236179#comment-16236179
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570462
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
--- End diff --

Timeout should be lower


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236177#comment-16236177
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148572112
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---
@@ -294,11 +296,11 @@ public void returnAllocatedSlot(Slot slot) {
}
}
 
-   private static ResourceManagerGateway 
createResourceManagerGatewayMock() {
+   static ResourceManagerGateway createResourceManagerGatewayMock() {
ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
--- End diff --

We could think about implementing a `SimpleAckingResourceManagerGateway` 
for testing purposes. That way we avoid mocking too much.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236172#comment-16236172
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570387
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
--- End diff --

Timeouts should be lower.


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6434) There may be allocatedSlots leak in SlotPool

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16236173#comment-16236173
 ] 

ASF GitHub Bot commented on FLINK-6434:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148592528
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -361,9 +374,19 @@ private void 
slotRequestToResourceManagerFailed(AllocationID allocationID, Throw
}
 
private void checkTimeoutSlotAllocation(AllocationID allocationID) {
+   removePendingRequestWithException(allocationID, new 
TimeoutException("Slot allocation request " + allocationID + " timed out"));
+   }
+
+   private void removePendingRequestWithException(AllocationID 
allocationID, Exception e) {
--- End diff --

maybe we could refactor this method into 
`failPendingRequest(PendingRequest, Exception)`, then it could be used by 
`checkTimeoutSlotAllocation` and `checkTimeoutRequestWaitingForResourceManager`


> There may be allocatedSlots leak in SlotPool
> 
>
> Key: FLINK-6434
> URL: https://issues.apache.org/jira/browse/FLINK-6434
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Major
>  Labels: flip-6
>
> If the call allocateSlot() from Execution to Slotpool timeout, the job will 
> begin to failover, but the pending request are still in SlotPool, if then a 
> new slot register to SlotPool, it may be fulfill the outdated pending request 
> and be added to allocatedSlots, but it will never be used and will never be 
> recycled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148601504
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -645,6 +668,21 @@ AllocatedSlots getAllocatedSlots() {
return allocatedSlots;
}
 
+   @VisibleForTesting
+   AvailableSlots getAvailableSlots() {
+   return availableSlots;
+   }
+
+   @VisibleForTesting
+   int getNumOfWaitingForResourceRequests() {
+   return waitingForResourceManager.size();
+   }
+
+   @VisibleForTesting
+   int getNumOfPendingRequests() {
+   return pendingRequests.size();
+   }
--- End diff --

I think we should not make internal state easily accessible because it will 
usually be modified by the main thread. Also when checking a certain 
interleaving you might be falsely entrapped that you can do something like
```
slotPool.asyncAddPendingRequest()
slotPool.getNumOfPendingRequests() // this returns +1 pending requests
```

This is might work but sometimes it also does not work because the 
concurrent operation has not been completed. I would like to make concurrent 
operations explicit by, for example, returning a `CompletableFuture 
getNumberOfPendingRequests` if at all.


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148569315
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -361,9 +374,19 @@ private void 
slotRequestToResourceManagerFailed(AllocationID allocationID, Throw
}
 
private void checkTimeoutSlotAllocation(AllocationID allocationID) {
+   removePendingRequestWithException(allocationID, new 
TimeoutException("Slot allocation request " + allocationID + " timed out"));
+   }
+
+   private void removePendingRequestWithException(AllocationID 
allocationID, Exception e) {
PendingRequest request = pendingRequests.remove(allocationID);
-   if (request != null && !request.getFuture().isDone()) {
-   request.getFuture().completeExceptionally(new 
TimeoutException("Slot allocation request timed out"));
+   if (request != null && (!request.getFuture().isDone() || 
request.getFuture().isCompletedExceptionally())) {
+   //TODO: the following line depends on the pr: 
https://github.com/apache/flink/pull/4887
+   //if (resourceManagerGateway != null) {
+   //  resourceManagerGateway.cancelSlotRequest(jobId, 
jobMasterId, allocationID);
+   //}
--- End diff --

This should be removed and added once #4887 has been merged.


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148596180
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   ScheduledUnit task,
+   public CompletableFuture allocateSlot(AllocationID 
allocationID,
ResourceProfile resources,
Iterable locationPreferences,
Time timeout) {
 
-   return internalAllocateSlot(task, resources, 
locationPreferences);
+   return internalAllocateSlot(allocationID, resources, 
locationPreferences);
}
 
@Override
public void returnAllocatedSlot(Slot slot) {
internalReturnAllocatedSlot(slot);
}
 
+   @Override
+   public void cancelSlotAllocation(AllocationID allocationID) {
+   waitingForResourceManager.remove(allocationID);
+   
+   removePendingRequestWithException(allocationID, new 
CancellationException("Allocation " + allocationID + " cancelled"));
+
+   if (allocatedSlots.contains(allocationID)) {
+   Slot slot = allocatedSlots.get(allocationID);
--- End diff --

We could avoid the `contains` call by simply calling `get` and then compare 
against `null`.


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148592528
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -361,9 +374,19 @@ private void 
slotRequestToResourceManagerFailed(AllocationID allocationID, Throw
}
 
private void checkTimeoutSlotAllocation(AllocationID allocationID) {
+   removePendingRequestWithException(allocationID, new 
TimeoutException("Slot allocation request " + allocationID + " timed out"));
+   }
+
+   private void removePendingRequestWithException(AllocationID 
allocationID, Exception e) {
--- End diff --

maybe we could refactor this method into 
`failPendingRequest(PendingRequest, Exception)`, then it could be used by 
`checkTimeoutSlotAllocation` and `checkTimeoutRequestWaitingForResourceManager`


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148602544
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = 
SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
--- End diff --

It's not guaranteed that `pool.getNumofPendingRequests()` is executed after 
`pool.cancelSlotAllocation` has been executed.


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148572112
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---
@@ -294,11 +296,11 @@ public void returnAllocatedSlot(Slot slot) {
}
}
 
-   private static ResourceManagerGateway 
createResourceManagerGatewayMock() {
+   static ResourceManagerGateway createResourceManagerGatewayMock() {
ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
--- End diff --

We could think about implementing a `SimpleAckingResourceManagerGateway` 
for testing purposes. That way we avoid mocking too much.


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148596041
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---
@@ -262,23 +263,36 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   ScheduledUnit task,
+   public CompletableFuture allocateSlot(AllocationID 
allocationID,
ResourceProfile resources,
Iterable locationPreferences,
Time timeout) {
 
-   return internalAllocateSlot(task, resources, 
locationPreferences);
+   return internalAllocateSlot(allocationID, resources, 
locationPreferences);
}
 
@Override
public void returnAllocatedSlot(Slot slot) {
internalReturnAllocatedSlot(slot);
}
 
+   @Override
+   public void cancelSlotAllocation(AllocationID allocationID) {
+   waitingForResourceManager.remove(allocationID);
--- End diff --

we should fail the pending request properly. E.g. check if the slot is in 
`waitingForResourceManager` or `pendingRequests`. If yes, then remove and call 
`failPendingRequest`.


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148599674
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
 ---
@@ -86,10 +85,16 @@
// 

 
CompletableFuture allocateSlot(
-   ScheduledUnit task,
+   AllocationID allocationID,
ResourceProfile resources,
Iterable locationPreferences,
@RpcTimeout Time timeout);
 
void returnAllocatedSlot(Slot slot);
+
+   /**
+* Cancel a slot allocation.
+* This method should be called when the CompletableFuture returned by 
allocateSlot completed exceptionally.
--- End diff --

Params description is missing.


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148571183
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfWaitingForResourceRequests());
+
+   pool.cancelSlotAllocation(allocationID);
+   assertEquals(0, pool.getNumOfWaitingForResourceRequests());
+
+   // 2. test the pending request is in pendingRequests
+   ResourceManagerGateway resourceManagerGateway = 
SlotPoolTest.createResourceManagerGatewayMock();
+   pool.connectToResourceManager(resourceManagerGateway);
+
+   AllocationID allocationID2 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID2, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   assertEquals(1, pool.getNumOfPendingRequests());
+
+   pool.cancelSlotAllocation(allocationID2);
+   assertEquals(0, pool.getNumOfPendingRequests());
+   //verify(resourceManagerGateway, 
times(1)).cancelSlotRequest(jid, any(JobMasterId.class), allocationID2);
+
+   // 3. test the allocation is timed out in client side but the 
request is fulfilled in slot pool
+   AllocationID allocationID3 = new AllocationID();
+   future = slotPoolGateway.allocateSlot(allocationID3, 
DEFAULT_TESTING_PROFILE, null, Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
+   fail("We expected a AskTimeoutException.");
+   }
+   catch (ExecutionException e) {
+   assertEquals(AskTimeoutException.class, 
e.getCause().getClass());
+   }
+   catch (Exception e) {
+   fail("wrong exception: " + e);
+   }
+
+   ResourceID resourceID = ResourceID.generate();
+   AllocatedSlot allocatedSlot = 
SlotPoolTest.createAllocatedSlot(resourceID, allocationID3, jid, 
DEFAULT_TESTING_PROFILE);
+   slotPoolGateway.registerTaskManager(resourceID);
+   assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get());
+
+   assertEquals(0, pool.getNumOfPendingRequests());
+   assertTrue(pool.getAllocatedSlots().contains(allocationID3));
+
+   pool.cancelSlotAllocation(allocationID3);
+   assertFalse(pool.getAllocatedSlots().contains(allocationID3));
+   assertTrue(pool.getAvailableSlots().contains(allocationID3));
+   }
+
+   @Test
+   public void testProviderAndOwner() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+ 

[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570462
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
--- End diff --

Timeout should be lower


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148570387
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 ---
@@ -99,4 +108,129 @@ public void testSlotAllocationNoResourceManager() 
throws Exception {
fail("wrong exception: " + e);
}
}
+
+   @Test
+   public void testCancelSlotAllocation() throws Exception {
+   final JobID jid = new JobID();
+
+   final SlotPool pool = new SlotPool(
+   rpcService, jid,
+   SystemClock.getInstance(),
+   Time.days(1), Time.days(1),
+   Time.seconds(3) // this is the timeout for the 
request tested here
+   );
+   pool.start(JobMasterId.generate(), "foobar");
+   SlotPoolGateway slotPoolGateway = 
pool.getSelfGateway(SlotPoolGateway.class);
+
+   // 1. test the pending request is in 
waitingResourceManagerRequests
+   AllocationID allocationID = new AllocationID();
+   CompletableFuture future = 
slotPoolGateway.allocateSlot(allocationID, DEFAULT_TESTING_PROFILE, null, 
Time.seconds(1));
+
+   try {
+   future.get(2, TimeUnit.SECONDS);
--- End diff --

Timeouts should be lower.


---


[GitHub] flink pull request #4937: [FLINK-6434] [runtime] cancel slot allocation if r...

2017-11-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4937#discussion_r148571851
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---
@@ -294,11 +296,11 @@ public void returnAllocatedSlot(Slot slot) {
}
}
 
-   private static ResourceManagerGateway 
createResourceManagerGatewayMock() {
+   static ResourceManagerGateway createResourceManagerGatewayMock() {
ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
when(resourceManagerGateway
-   .requestSlot(any(JobMasterId.class), 
any(SlotRequest.class), any(Time.class)))
-   .thenReturn(mock(CompletableFuture.class, 
RETURNS_MOCKS));
+   .requestSlot(any(JobMasterId.class), 
any(SlotRequest.class), any(Time.class)))
+   .thenReturn(mock(CompletableFuture.class, 
RETURNS_MOCKS));
--- End diff --

Why not returning a proper `CompletableFuture` here?


---


  1   2   3   >