[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517821#comment-16517821
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Could anybody have a look at this?


> generate the _meta file for checkpoint only when the writing is truly 
> successful
> 
>
> Key: FLINK-9325
> URL: https://issues.apache.org/jira/browse/FLINK-9325
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> We should generate the _meta file for checkpoint only when the writing is 
> totally successful. We should write the metadata file first to a temp file 
> and then atomically rename it (with an equivalent workaround for S3). 
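The temp-file-plus-atomic-rename pattern described above can be sketched with plain `java.nio.file` APIs (a hypothetical illustration with made-up class and file names, not Flink's actual checkpoint code):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of "write to a temp file, then atomically rename" for a checkpoint _metadata file. */
public class AtomicCheckpointMetadata {

    /** Writes metadata to "_metadata.inprogress" and publishes it as "_metadata" only on success. */
    static Path writeMetadata(Path checkpointDir, byte[] metadata) {
        Path tmp = checkpointDir.resolve("_metadata.inprogress");
        Path target = checkpointDir.resolve("_metadata");
        try {
            Files.write(tmp, metadata);  // if this write fails, no _metadata file ever appears
            // Publish in one step; atomic on POSIX filesystems. S3 has no atomic rename,
            // so an equivalent workaround (e.g. uploading the object only once fully written)
            // would be needed there, as the issue notes.
            return Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Helper so callers don't have to handle checked IOException for a temp dir. */
    static Path tempDir() {
        try {
            return Files.createTempDirectory("checkpoint");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path meta = writeMetadata(tempDir(), new byte[] {1, 2, 3});
        System.out.println("published: " + meta.getFileName());
    }
}
```

With this shape, a reader of the checkpoint directory either sees no `_metadata` file at all or a complete one, never a partially written one.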



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-06-19 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
Could anybody have a look at this?


---


[GitHub] flink issue #6185: [FLINK-9619][YARN] Eagerly close the connection with task...

2018-06-19 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6185
  
CC @tillrohrmann 


---


[jira] [Commented] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517794#comment-16517794
 ] 

ASF GitHub Bot commented on FLINK-9619:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6185

[FLINK-9619][YARN] Eagerly close the connection with task manager when the 
container is completed

## What is the purpose of the change

*We should always eagerly close the connection with task manager when the 
container is completed.*


## Brief change log

  - *Eagerly close the connection with task manager when the container is 
completed*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - No


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9619

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6185.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6185


commit a667ea120b0c2519ce45e5919c1377e897897d17
Author: sihuazhou 
Date:   2018-06-20T04:41:05Z

Eagerly close the connection with task manager when the container is 
completed.




> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> ---
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.6.0, 1.5.1
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> We should always eagerly close the connection with task manager when the 
> container is completed.





[jira] [Commented] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517795#comment-16517795
 ] 

ASF GitHub Bot commented on FLINK-9619:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6185
  
CC @tillrohrmann 


> Always close the task manager connection when the container is completed in 
> YarnResourceManager
> ---
>
> Key: FLINK-9619
> URL: https://issues.apache.org/jira/browse/FLINK-9619
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.6.0, 1.5.1
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> We should always eagerly close the connection with task manager when the 
> container is completed.





[GitHub] flink pull request #6185: [FLINK-9619][YARN] Eagerly close the connection wi...

2018-06-19 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6185

[FLINK-9619][YARN] Eagerly close the connection with task manager when the 
container is completed

## What is the purpose of the change

*We should always eagerly close the connection with task manager when the 
container is completed.*


## Brief change log

  - *Eagerly close the connection with task manager when the container is 
completed*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - No


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9619

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6185.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6185


commit a667ea120b0c2519ce45e5919c1377e897897d17
Author: sihuazhou 
Date:   2018-06-20T04:41:05Z

Eagerly close the connection with task manager when the container is 
completed.




---


[jira] [Created] (FLINK-9619) Always close the task manager connection when the container is completed in YarnResourceManager

2018-06-19 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9619:
-

 Summary: Always close the task manager connection when the 
container is completed in YarnResourceManager
 Key: FLINK-9619
 URL: https://issues.apache.org/jira/browse/FLINK-9619
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.6.0, 1.5.1
Reporter: Sihua Zhou
Assignee: Sihua Zhou
 Fix For: 1.6.0, 1.5.1


We should always eagerly close the connection with task manager when the 
container is completed.





[jira] [Commented] (FLINK-9417) Send heartbeat requests from RPC endpoint's main thread

2018-06-19 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517790#comment-16517790
 ] 

Sihua Zhou commented on FLINK-9417:
---

Hi [~till.rohrmann], one thing came to my mind: if we send heartbeat requests 
from the RPC endpoint's main thread, should we also check the 
HEARTBEAT_INTERVAL against a sanity minimum value (currently it only needs to be greater 
than 0)? If the user configures a very small value, e.g. 10, then the resource 
manager and the job master will be kept busy just sending 
heartbeats.
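Such a sanity floor could be sketched roughly as follows (hypothetical; the class, the constant, and its value are illustrative and not actual Flink configuration code):

```java
/**
 * Hypothetical sketch of a sanity floor for the heartbeat interval.
 * The constant and its value are assumptions for illustration only.
 */
public class HeartbeatOptions {

    // Assumed floor: large enough that the ResourceManager/JobMaster main
    // threads are not saturated just by sending heartbeats.
    static final long MIN_HEARTBEAT_INTERVAL_MS = 1_000L;

    /** Returns the configured interval, rejecting values below the sanity minimum. */
    static long validateInterval(long configuredMs) {
        if (configuredMs < MIN_HEARTBEAT_INTERVAL_MS) {
            throw new IllegalArgumentException(
                "heartbeat.interval must be at least " + MIN_HEARTBEAT_INTERVAL_MS
                    + " ms, but was " + configuredMs + " ms");
        }
        return configuredMs;
    }

    public static void main(String[] args) {
        // A sane value passes through unchanged; a tiny value like 10 ms is rejected.
        System.out.println(validateInterval(10_000L));
    }
}
```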

> Send heartbeat requests from RPC endpoint's main thread
> ---
>
> Key: FLINK-9417
> URL: https://issues.apache.org/jira/browse/FLINK-9417
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>
> Currently, we use the {{RpcService#scheduledExecutor}} to send heartbeat 
> requests to remote targets. This has the problem that we still see heartbeats 
> from this endpoint also if its main thread is currently blocked. Due to this, 
> the heartbeat response cannot be processed and the remote target times out. 
> On the remote side, this won't be noticed because it still receives the 
> heartbeat requests.
> A solution to this problem would be to send the heartbeat requests to the 
> remote thread through the RPC endpoint's main thread. That way, also the 
> heartbeats would be blocked if the main thread is blocked/busy.





[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517789#comment-16517789
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6184
  
Please click here for details 
[old-flink-9187](https://github.com/apache/flink/pull/5857)


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make Flink able to send metrics to Prometheus via a pushgateway.





[GitHub] flink issue #6184: [FLINK-9187][METRICS] add prometheus pushgateway reporter

2018-06-19 Thread lamber-ken
Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6184
  
Please click here for details 
[old-flink-9187](https://github.com/apache/flink/pull/5857)


---


[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517788#comment-16517788
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user lamber-ken closed the pull request at:

https://github.com/apache/flink/pull/5857


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make Flink able to send metrics to Prometheus via a pushgateway.





[GitHub] flink pull request #5857: [FLINK-9187][METRICS] add prometheus pushgateway r...

2018-06-19 Thread lamber-ken
Github user lamber-ken closed the pull request at:

https://github.com/apache/flink/pull/5857


---


[GitHub] flink pull request #6184: add prometheus pushgateway reporter

2018-06-19 Thread lamber-ken
GitHub user lamber-ken opened a pull request:

https://github.com/apache/flink/pull/6184

add prometheus pushgateway reporter

## What is the purpose of the change
This pull request enables Flink to send metrics to Prometheus via a 
pushgateway, which may be useful.
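For context, a pushgateway-based reporter would typically be enabled through `flink-conf.yaml`. The snippet below is a hedged illustration of what such a configuration could look like; the reporter name `promgateway` and the exact option keys are assumptions drawn from the eventual shape of this feature, not guaranteed by this PR:

```yaml
# Hypothetical reporter configuration (key names are illustrative)
metrics.reporters: promgateway
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
```

Push-based delivery matters for short-lived or firewalled jobs that the Prometheus server cannot scrape directly.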

## Brief change log

  - Add the Prometheus pushgateway reporter
  - Restructure the code of the Prometheus reporter part

## Verifying this change

This change is already covered by existing tests. [prometheus 
test](https://github.com/apache/flink/tree/master/flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus)

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lamber-ken/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6184


commit 1a8e1f6193823e70b1dc6abc1146299042c25c7d
Author: lamber-ken 
Date:   2018-06-20T04:26:10Z

add prometheus pushgateway reporter




---


[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517760#comment-16517760
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/5857
  
Sorry, I re-forked the `flink` project; do I need to open a new PR?

![image](https://user-images.githubusercontent.com/20113411/41635822-76271486-747d-11e8-9ad3-c6447c1b930c.png)



> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make Flink able to send metrics to Prometheus via a pushgateway.





[GitHub] flink issue #5857: [FLINK-9187][METRICS] add prometheus pushgateway reporter

2018-06-19 Thread lamber-ken
Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/5857
  
Sorry, I re-forked the `flink` project; do I need to open a new PR?

![image](https://user-images.githubusercontent.com/20113411/41635822-76271486-747d-11e8-9ad3-c6447c1b930c.png)



---


[jira] [Commented] (FLINK-9588) Reuse the same conditionContext with in a same computationState

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517732#comment-16517732
 ] 

ASF GitHub Bot commented on FLINK-9588:
---

Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6168
  
Is it OK now? @dawidwys 


> Reuse the same conditionContext with in a same computationState
> ---
>
> Key: FLINK-9588
> URL: https://issues.apache.org/jira/browse/FLINK-9588
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> Currently, CEP calls checkFilterCondition with a newly created ConditionContext for 
> each edge, which results in repeated getEventsForPattern calls because of the 
> different ConditionContext objects.





[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...

2018-06-19 Thread Aitozi
Github user Aitozi commented on the issue:

https://github.com/apache/flink/pull/6168
  
Is it OK now? @dawidwys 


---


[jira] [Commented] (FLINK-9380) Failing end-to-end tests should not clean up logs

2018-06-19 Thread Deepak Sharma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9380?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517718#comment-16517718
 ] 

Deepak Sharma commented on FLINK-9380:
--

Hi [~till.rohrmann], just a friendly reminder of this Jira :) 

> Failing end-to-end tests should not clean up logs
> -
>
> Key: FLINK-9380
> URL: https://issues.apache.org/jira/browse/FLINK-9380
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Deepak Sharma
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.6.0, 1.5.1
>
>
> Some of the end-to-end tests clean up their logs even in the failure case. 
> This makes debugging and understanding the problem extremely difficult. 
> Ideally, the script should say where it stored the respective logs.





[jira] [Commented] (FLINK-9563) Migrate integration tests for CEP

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517717#comment-16517717
 ] 

ASF GitHub Bot commented on FLINK-9563:
---

Github user deepaks4077 commented on the issue:

https://github.com/apache/flink/pull/6170
  
Hi @zentol, just a friendly reminder about this pull request :)


> Migrate integration tests for CEP
> -
>
> Key: FLINK-9563
> URL: https://issues.apache.org/jira/browse/FLINK-9563
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Deepak Sharma
>Assignee: Deepak Sharma
>Priority: Minor
>
> Covers all integration tests under
> apache-flink/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep





[GitHub] flink issue #6170: [FLINK-9563]: Using a custom sink function for tests in C...

2018-06-19 Thread deepaks4077
Github user deepaks4077 commented on the issue:

https://github.com/apache/flink/pull/6170
  
Hi @zentol, just a friendly reminder about this pull request :)


---


[jira] [Commented] (FLINK-9614) Improve the error message for Compiler#compile

2018-06-19 Thread mingleizhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517679#comment-16517679
 ] 

mingleizhang commented on FLINK-9614:
-

I am closing this since we catch a Throwable; a StackOverflowError is handled acceptably.

> Improve the error message for Compiler#compile
> --
>
> Key: FLINK-9614
> URL: https://issues.apache.org/jira/browse/FLINK-9614
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: mingleizhang
>Assignee: mingleizhang
>Priority: Major
>
> When the SQL below is too long, like
> case when  case when .
>  when host in 
> ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247')
>  then 'condition'
> then it causes a {{StackOverflowError}}. The current code is shown below, but I would 
> suggest prompting users to add {{-Xss20m}} to solve this, instead of saying {{This is 
> a bug..}}
> {code:java}
> trait Compiler[T] {
>   @throws(classOf[CompileException])
>   def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
> require(cl != null, "Classloader must not be null.")
> val compiler = new SimpleCompiler()
> compiler.setParentClassLoader(cl)
> try {
>   compiler.cook(code)
> } catch {
>   case t: Throwable =>
> throw new InvalidProgramException("Table program cannot be compiled. 
> " +
>   "This is a bug. Please file an issue.", t)
> }
> compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
>   }
> }
> {code}
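The suggested change — recognizing a StackOverflowError and pointing users at a larger thread stack instead of reporting a bug — could look roughly like this (hypothetical Java sketch; the class and method names are illustrative, not Flink's actual Compiler trait):

```java
/**
 * Hypothetical sketch of the suggested error-message change: distinguish a
 * StackOverflowError from other compiler failures and advise a larger
 * JVM thread stack (-Xss) instead of claiming a bug.
 */
public class CompileErrorReporting {

    static RuntimeException describeCompileFailure(Throwable t) {
        if (t instanceof StackOverflowError) {
            // Deeply nested CASE WHEN expressions can overflow the compiler's stack;
            // this is a resource limit, not a Flink bug.
            return new RuntimeException(
                "Table program is too deeply nested to compile; "
                    + "consider increasing the JVM thread stack size, e.g. -Xss20m.", t);
        }
        return new RuntimeException(
            "Table program cannot be compiled. This is a bug. Please file an issue.", t);
    }

    public static void main(String[] args) {
        System.out.println(describeCompileFailure(new StackOverflowError()).getMessage());
    }
}
```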





[jira] [Closed] (FLINK-9614) Improve the error message for Compiler#compile

2018-06-19 Thread mingleizhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang closed FLINK-9614.
---

> Improve the error message for Compiler#compile
> --
>
> Key: FLINK-9614
> URL: https://issues.apache.org/jira/browse/FLINK-9614
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: mingleizhang
>Assignee: mingleizhang
>Priority: Major
>
> When the SQL below is too long, like
> case when  case when .
>  when host in 
> ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247')
>  then 'condition'
> then it causes a {{StackOverflowError}}. The current code is shown below, but I would 
> suggest prompting users to add {{-Xss20m}} to solve this, instead of saying {{This is 
> a bug..}}
> {code:java}
> trait Compiler[T] {
>   @throws(classOf[CompileException])
>   def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
> require(cl != null, "Classloader must not be null.")
> val compiler = new SimpleCompiler()
> compiler.setParentClassLoader(cl)
> try {
>   compiler.cook(code)
> } catch {
>   case t: Throwable =>
> throw new InvalidProgramException("Table program cannot be compiled. 
> " +
>   "This is a bug. Please file an issue.", t)
> }
> compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
>   }
> }
> {code}





[jira] [Resolved] (FLINK-9614) Improve the error message for Compiler#compile

2018-06-19 Thread mingleizhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang resolved FLINK-9614.
-
Resolution: Not A Problem

> Improve the error message for Compiler#compile
> --
>
> Key: FLINK-9614
> URL: https://issues.apache.org/jira/browse/FLINK-9614
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: mingleizhang
>Assignee: mingleizhang
>Priority: Major
>
> When the SQL below is too long, like
> case when  case when .
>  when host in 
> ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247')
>  then 'condition'
> then it causes a {{StackOverflowError}}. The current code is shown below, but I would 
> suggest prompting users to add {{-Xss20m}} to solve this, instead of saying {{This is 
> a bug..}}
> {code:java}
> trait Compiler[T] {
>   @throws(classOf[CompileException])
>   def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
> require(cl != null, "Classloader must not be null.")
> val compiler = new SimpleCompiler()
> compiler.setParentClassLoader(cl)
> try {
>   compiler.cook(code)
> } catch {
>   case t: Throwable =>
> throw new InvalidProgramException("Table program cannot be compiled. 
> " +
>   "This is a bug. Please file an issue.", t)
> }
> compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
>   }
> }
> {code}





[jira] [Commented] (FLINK-9618) NullPointerException in FlinkKinesisProducer when aws.region is not set and aws.endpoint is set

2018-06-19 Thread Aaron Langford (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517645#comment-16517645
 ] 

Aaron Langford commented on FLINK-9618:
---

I would be happy to take the work to fix this.

> NullPointerException in FlinkKinesisProducer when aws.region is not set and 
> aws.endpoint is set
> ---
>
> Key: FLINK-9618
> URL: https://issues.apache.org/jira/browse/FLINK-9618
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.5.0
> Environment: N/A
>Reporter: Aaron Langford
>Priority: Minor
>   Original Estimate: 3h
>  Remaining Estimate: 3h
>
> This problem arose while trying to write to a local kinesalite instance. 
> Specifying both aws.region and aws.endpoint is not allowed. However, when 
> aws.region is not present, a NullPointerException is thrown.
> Here is some example Scala code:
> {code:java}
> /**
>   *
>   * @param region the AWS region the stream lives in
>   * @param streamName the stream to write records to
>   * @param endpoint if in local dev, this points to a kinesalite instance
>   * @return
>   */
> def getSink(region: String,
> streamName: String,
> endpoint: Option[String]): 
> FlinkKinesisProducer[ProcessedMobilePageView] = {
>   val props = new Properties()
>   props.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")
>   endpoint match {
> case Some(uri) => props.put(AWSConfigConstants.AWS_ENDPOINT, uri)
> case None => props.put(AWSConfigConstants.AWS_REGION, region)
>   }
>   val producer = new FlinkKinesisProducer[ProcessedMobilePageView](
> new JsonSerializer[ProcessedMobilePageView](DefaultSerializationBuilder),
> props
>   )
>   producer.setDefaultStream(streamName)
>   producer
> }
> {code}
> To produce the NullPointerException, pass in `Some("localhost:4567")` for 
> endpoint.
> The source of the error is found at 
> org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.java, on 
> line 194. This line should check whether aws.endpoint is 
> present before grabbing it from the Properties object.
>  





[jira] [Created] (FLINK-9618) NullPointerException in FlinkKinesisProducer when aws.region is not set and aws.endpoint is set

2018-06-19 Thread Aaron Langford (JIRA)
Aaron Langford created FLINK-9618:
-

 Summary: NullPointerException in FlinkKinesisProducer when 
aws.region is not set and aws.endpoint is set
 Key: FLINK-9618
 URL: https://issues.apache.org/jira/browse/FLINK-9618
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Affects Versions: 1.5.0
 Environment: N/A
Reporter: Aaron Langford


This problem arose while trying to write to a local kinesalite instance. 
Specifying both aws.region and aws.endpoint is not allowed. However, when 
aws.region is not present, a NullPointerException is thrown.

Here is some example Scala code:
{code:java}
/**
  *
  * @param region the AWS region the stream lives in
  * @param streamName the stream to write records to
  * @param endpoint if in local dev, this points to a kinesalite instance
  * @return
  */
def getSink(region: String,
streamName: String,
endpoint: Option[String]): 
FlinkKinesisProducer[ProcessedMobilePageView] = {
  val props = new Properties()
  props.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO")

  endpoint match {
case Some(uri) => props.put(AWSConfigConstants.AWS_ENDPOINT, uri)
case None => props.put(AWSConfigConstants.AWS_REGION, region)
  }

  val producer = new FlinkKinesisProducer[ProcessedMobilePageView](
new JsonSerializer[ProcessedMobilePageView](DefaultSerializationBuilder),
props
  )
  producer.setDefaultStream(streamName)

  producer
}
{code}
To produce the NullPointerException, pass in `Some("localhost:4567")` for 
endpoint.

The source of the error is found at 
org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.java, on 
line 194. This line should check whether aws.endpoint is 
present before grabbing it from the Properties object.
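The guard described above can be sketched as follows (a hypothetical Java sketch mirroring the suggested fix; the class and method names are illustrative, not the actual KinesisConfigUtil code):

```java
import java.util.Properties;

/**
 * Hypothetical sketch of the suggested fix: resolve the connection target from
 * either aws.endpoint or aws.region without unconditionally reading aws.region,
 * which is what triggers the NullPointerException described in the issue.
 */
public class KinesisConfigCheck {

    static final String AWS_REGION = "aws.region";
    static final String AWS_ENDPOINT = "aws.endpoint";

    /** Returns a description of where the producer should connect, or throws on bad config. */
    static String resolveTarget(Properties config) {
        String endpoint = config.getProperty(AWS_ENDPOINT);
        String region = config.getProperty(AWS_REGION);
        if (endpoint != null && region != null) {
            throw new IllegalArgumentException(
                AWS_REGION + " and " + AWS_ENDPOINT + " are mutually exclusive.");
        }
        if (endpoint != null) {
            return "endpoint:" + endpoint;  // local dev, e.g. a kinesalite instance
        }
        if (region != null) {
            return "region:" + region;      // normal AWS deployment
        }
        throw new IllegalArgumentException(
            "Either " + AWS_REGION + " or " + AWS_ENDPOINT + " must be set.");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(AWS_ENDPOINT, "https://localhost:4567");
        System.out.println(resolveTarget(props));
    }
}
```

Checking presence before reading turns the NullPointerException into a clear configuration error.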

 





[jira] [Commented] (FLINK-9611) Allow for user-defined artifacts to be specified as part of a mesos overlay

2018-06-19 Thread Eron Wright (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517528#comment-16517528
 ] 

Eron Wright  commented on FLINK-9611:
-

I really like this idea, except for the proposed name because it has a 
connotation of overlaying the 'user' (which is sort of what happens with the 
Kerberos overlay).   Maybe 'MesosCustomOverlay'.

> Allow for user-defined artifacts to be specified as part of a mesos overlay
> ---
>
> Key: FLINK-9611
> URL: https://issues.apache.org/jira/browse/FLINK-9611
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Docker, Mesos
>Affects Versions: 1.5.0
>Reporter: Addison Higham
>Priority: Major
>
> NOTE: this assumes mesos, but this improvement could also be useful for 
> future container deployments.
> Currently, when deploying to mesos, the "Overlay" functionality is used to 
> determine which artifacts are to be downloaded into the container. However, 
> there isn't a way to plug in your own artifacts to be downloaded into the 
> container. This can cause problems with certain deployment models. 
> For example, if you are running flink in docker on mesos, you cannot easily 
> use a private docker image. Typically with mesos and private docker images, 
> you specify credentials as a URI to be downloaded into the container that 
> give permissions to download the private image. Typically, these credentials 
> expire after a few days, so baking them into a Docker host isn't a solution.
> It would make sense to add a `MesosUserOverlay` that would simply take some 
> new configuration parameters and add any custom artifacts (or possibly also 
> environment variables?) 
> Another solution (or longer term solution) might be to allow for dynamically 
> loading an overlay class for even further customization of the container 
> specification.
>  
>  
>  
>  
>  
>  





[jira] [Commented] (FLINK-9612) Add option for minimal artifacts being pulled in Mesos

2018-06-19 Thread Eron Wright (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517525#comment-16517525
 ] 

Eron Wright  commented on FLINK-9612:
-

Yes it makes sense that the overlays would be selective and configurable, and 
especially true that the Flink binaries aren't needed in most scenarios 
involving a docker image.   Specifically on that, I wonder if the Flink conf 
directory should be treated differently from the bin/libs (perhaps as a 
different overlay), since the image might be 'stock'.

> Add option for minimal artifacts being pulled in Mesos
> --
>
> Key: FLINK-9612
> URL: https://issues.apache.org/jira/browse/FLINK-9612
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Docker, Mesos
>Reporter: Addison Higham
>Priority: Major
>
> NOTE: this assumes mesos, but this improvement could also be useful for 
> future container deployments.
> Currently, in mesos, the FlinkDistributionOverlay  copies the entire `conf`, 
> `bin`, and `lib` folders from the running JobManager/ResourceManager. When 
> using docker with a pre-installed flink distribution, this is relatively 
> inefficient as it pulls jars that are already baked into the container image.
> A new option that disables pulling most (if not all?) of the 
> FlinkDistributionOverlay could allow for much faster and more scalable 
> provisions of TaskManagers. As it currently stands, trying to run a few 
> hundred TaskManagers is likely to result in poor performance in pulling all 
> the artifacts from the MesosArtifactServer.





[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517490#comment-16517490
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196559093
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
--- End diff --

Missing `extends TestLogger`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196556101
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests for {@link FileUploads}.
+ */
+public class FileUploadsTest {
--- End diff --

`extends TestLogger` is missing


---
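The reviewers repeatedly ask for `extends TestLogger` on new test classes. Below is a dependency-free sketch of that pattern; Flink's real `org.apache.flink.util.TestLogger` does this with a JUnit `TestWatcher` rule, and `LoggingTestBase`, `FileUploadsTestSketch`, and the test body are hypothetical stand-ins:

```java
// Stand-in for the TestLogger pattern: a shared base class that logs which
// test is running, so a failure in CI output can be traced to a concrete test.
abstract class LoggingTestBase {
    // In Flink's TestLogger this is a @Rule TestWatcher; a wrapper suffices here.
    protected void runLogged(String testName, Runnable testBody) {
        System.out.println("==== Running " + getClass().getSimpleName() + "#" + testName);
        testBody.run();
    }
}

public class FileUploadsTestSketch extends LoggingTestBase {
    void testCleanupRemovesFiles() {
        // hypothetical assertion standing in for the real FileUploads checks
        if (1 + 1 != 2) {
            throw new AssertionError("arithmetic is broken");
        }
    }

    public static void main(String[] args) {
        FileUploadsTestSketch test = new FileUploadsTestSketch();
        test.runLogged("testCleanupRemovesFiles", test::testCleanupRemovesFiles);
        System.out.println("ok");
    }
}
```

The point of the shared base class is that every test inherits the logging behavior for free, which is why the review flags its absence mechanically.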


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453980
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -129,4 +137,9 @@ public R getRequestBody() {
return queryParameter.getValue();
}
}
+
+   @Nonnull
+   public FileUploads getFileUploads() {
+   return uploadedFiles;
+   }
--- End diff --

I would not expose `FileUploads` to the user but rather return a 
`Collection`.


---
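The suggestion above (return a `Collection` rather than exposing `FileUploads`) can be sketched as follows. `HandlerRequestSketch` is a hypothetical stand-in for `HandlerRequest`, showing one way to hand out a read-only view of the uploaded file paths:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class HandlerRequestSketch {
    private final List<Path> uploadedFiles;

    public HandlerRequestSketch(List<Path> uploadedFiles) {
        // defensive copy so later mutation of the argument cannot leak in
        this.uploadedFiles = new ArrayList<>(uploadedFiles);
    }

    // Expose a read-only view rather than the internal container type.
    public Collection<Path> getUploadedFiles() {
        return Collections.unmodifiableCollection(uploadedFiles);
    }

    public static void main(String[] args) {
        HandlerRequestSketch request = new HandlerRequestSketch(
                Collections.singletonList(Paths.get("/tmp/upload/job.jar")));
        Collection<Path> files = request.getUploadedFiles();
        if (files.size() != 1) {
            throw new AssertionError("expected exactly one uploaded file");
        }
        try {
            files.clear();
            throw new AssertionError("view should be unmodifiable");
        } catch (UnsupportedOperationException expected) {
            System.out.println("ok");
        }
    }
}
```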


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517480#comment-16517480
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196556101
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests for {@link FileUploads}.
+ */
+public class FileUploadsTest {
--- End diff --

`extends TestLogger` is missing


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.





[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196559335
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196558949
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
 ---
@@ -63,4 +63,13 @@
 * @return description for the header
 */
String getDescription();
+
+   /**
+* Returns whether this header allows file uploads.
+*
+* @return whether this header allows file uploads
+*/
+   default boolean acceptsFileUploads() {
+   return false;
+   }
--- End diff --

Should this maybe go into `UntypedResponseMessageHeaders`? At the moment 
one can upload files for an `AbstractHandler` implementation (e.g. 
`AbstractTaskManagerFileHandler`) and also has access to them via the 
`HandlerRequest`, without being able to specify whether file upload is 
allowed or not.


---
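The `default boolean acceptsFileUploads()` method discussed above relies on Java 8 default methods, which let an interface grow without breaking existing implementations. A self-contained illustration; the `Headers` interface and both implementing classes are hypothetical stand-ins for `MessageHeaders` and its implementations:

```java
interface Headers {
    String getDescription();

    // New capability with a safe default: existing implementations compile
    // unchanged and report "no uploads"; handlers that accept files override it.
    default boolean acceptsFileUploads() {
        return false;
    }
}

public class DefaultMethodSketch {
    static class LegacyHeaders implements Headers {
        public String getDescription() {
            return "a pre-existing header that never opted in";
        }
    }

    static class UploadHeaders implements Headers {
        public String getDescription() {
            return "a jar-upload header";
        }

        @Override
        public boolean acceptsFileUploads() {
            return true;
        }
    }

    public static void main(String[] args) {
        if (new LegacyHeaders().acceptsFileUploads()) {
            throw new AssertionError("default should be false");
        }
        if (!new UploadHeaders().acceptsFileUploads()) {
            throw new AssertionError("override should be true");
        }
        System.out.println("ok");
    }
}
```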


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517479#comment-16517479
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452418
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
--- End diff --

By sending the JSON payload downstream, we could avoid having this method.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.





[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517492#comment-16517492
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196557260
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -39,15 +41,21 @@
 public class HandlerRequest {
 
private final R requestBody;
+   private final FileUploads uploadedFiles;
--- End diff --

This could also be a `Collection`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.





[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196559093
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
--- End diff --

Missing `extends TestLogger`


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196554025
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+   final ByteBuf msgContent;
+   Optional multipartJsonPayload = 
FileUploadHandler.getMultipartJsonPayload(ctx);
+   if (multipartJsonPayload.isPresent()) {
+   msgContent = 
Unpooled.wrappedBuffer(multipartJsonPayload.get());
--- End diff --

Let's send the JSON payload as a proper `HttpRequest`; then we don't have 
this special casing here.


---
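The suggestion above (forward the JSON part as a proper request instead of stashing it in a channel attribute) can be sketched with Netty's `DefaultFullHttpRequest`. This is a hedged sketch assuming Netty 4.1 on the classpath; `toPlainJsonRequest` is a hypothetical helper, not Flink's actual implementation:

```java
import java.nio.charset.StandardCharsets;

import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;

public class ForwardJsonSketch {

    // Wrap the JSON payload extracted from a multipart request in a plain
    // full request, so downstream handlers see an ordinary JSON request and
    // need no multipart special-casing.
    static FullHttpRequest toPlainJsonRequest(HttpRequest original, byte[] jsonPayload) {
        FullHttpRequest forwarded = new DefaultFullHttpRequest(
                original.protocolVersion(), original.method(), original.uri(),
                Unpooled.wrappedBuffer(jsonPayload));
        forwarded.headers().set(original.headers());
        forwarded.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json");
        forwarded.headers().set(HttpHeaderNames.CONTENT_LENGTH, jsonPayload.length);
        return forwarded;
    }

    public static void main(String[] args) {
        HttpRequest original = new DefaultHttpRequest(
                HttpVersion.HTTP_1_1, HttpMethod.POST, "/jars/upload");
        byte[] json = "{\"key\":\"value\"}".getBytes(StandardCharsets.UTF_8);
        FullHttpRequest forwarded = toPlainJsonRequest(original, json);
        if (!forwarded.content().toString(StandardCharsets.UTF_8).equals("{\"key\":\"value\"}")) {
            throw new AssertionError("payload not preserved");
        }
        System.out.println("ok");
    }
}
```

Note that Flink shades Netty under `org.apache.flink.shaded.netty4`; the plain `io.netty` package names are used here for readability.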


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517481#comment-16517481
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452583
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

I would suggest to simply return the upload directory.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.
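The `Optional.ofNullable(...).orElse(FileUploads.EMPTY)` lookup reviewed above is a null-safe pattern for per-channel state that may never have been set. A dependency-free sketch, with Netty's per-channel `AttributeMap` modeled as a plain `Map` and all names hypothetical:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class AttrDefaultSketch {
    // Shared empty default, analogous to FileUploads.EMPTY in the discussion.
    static final List<String> NO_UPLOADS = Collections.emptyList();

    // Stand-in for Netty's per-channel attribute storage.
    private final Map<String, List<String>> attributes = new HashMap<>();

    // Reads state that may never have been set; callers get a safe empty
    // value instead of null when no multipart upload happened.
    List<String> getUploadedFiles() {
        return Optional.ofNullable(attributes.get("uploadedFiles")).orElse(NO_UPLOADS);
    }

    void setUploadedFiles(List<String> files) {
        attributes.put("uploadedFiles", files);
    }

    public static void main(String[] args) {
        AttrDefaultSketch channelState = new AttrDefaultSketch();
        if (!channelState.getUploadedFiles().isEmpty()) {
            throw new AssertionError("expected empty default");
        }
        channelState.setUploadedFiles(Arrays.asList("job.jar"));
        if (channelState.getUploadedFiles().size() != 1) {
            throw new AssertionError("expected one uploaded file");
        }
        System.out.println("ok");
    }
}
```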





[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517477#comment-16517477
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
--- End diff --

Should we rather fail?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.





[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196557260
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -39,15 +41,21 @@
 public class HandlerRequest {
 
private final R requestBody;
+   private final FileUploads uploadedFiles;
--- End diff --

This could also be a `Collection`


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452418
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
--- End diff --

By sending the JSON payload downstream, we could avoid having this method.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196560163
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196555487
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection<Path> directoriesToClean;
+   private final Collection<Path> uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws 
IOException {
+   final Collection<Path> files = new ArrayList<>(4);
+   final Collection<Path> directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
+   }
+
+   public Collection<Path> getUploadedFiles() {
+   return uploadedFiles;
+   }
+
+   @Override
+   public void close() throws IOException {
+   for (Path file : uploadedFiles) {
+   try {
+   Files.delete(file);
+   } catch (FileNotFoundException ignored) {
+   // file may have been moved by a handler
+   }
+   }
+   for (Path directory : directoriesToClean) {
+   Files.walkFileTree(directory, CleanupFileVisitor.get());
+   }
+   }
+
+   private static final class FileAdderVisitor extends 
SimpleFileVisitor<Path> {
+
+   private final Collection<Path> files = new ArrayList<>(4);
+
+   Collection<Path> get() {
+   return files;
+   }
+
+   FileAdderVisitor() {
+   }
+
+   @Override
+   public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs) throws IOException {
+   FileVisitResult result = super.visitFile(file, attrs);
+   files.add(file);
+   return result;
+   }
+   }
+
+   private static final class CleanupFileVisitor extends 
SimpleFileVisitor<Path> {
--- End diff --

I think it would be better to make this an enum. Then we get all singleton 
properties for free.
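A minimal sketch of the enum-singleton variant suggested above (hypothetical, not the actual Flink class; since Java enums cannot extend `SimpleFileVisitor`, it implements `FileVisitor<Path>` directly and must supply all four callbacks):

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical enum-based singleton: the JVM guarantees exactly one INSTANCE,
// thread-safe and serialization-safe, with no explicit locking or factory code.
enum CleanupFileVisitor implements FileVisitor<Path> {
	INSTANCE;

	@Override
	public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
		return FileVisitResult.CONTINUE;
	}

	@Override
	public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
		Files.delete(file); // remove each file as it is visited
		return FileVisitResult.CONTINUE;
	}

	@Override
	public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
		throw exc;
	}

	@Override
	public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
		if (exc != null) {
			throw exc;
		}
		Files.delete(dir); // remove the directory once its contents are gone
		return FileVisitResult.CONTINUE;
	}
}
```

Callers would then write `Files.walkFileTree(directory, CleanupFileVisitor.INSTANCE)` instead of constructing a visitor per call.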


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196455211
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+   final ByteBuf msgContent;
Optional<byte[]> multipartJsonPayload = 
FileUploadHandler.getMultipartJsonPayload(ctx);
+   if (multipartJsonPayload.isPresent()) {
+   msgContent = 
Unpooled.wrappedBuffer(multipartJsonPayload.get());
+   } else {
+   msgContent = ((FullHttpRequest) 
httpRequest).content();
+   }
 
-   R request;
-   if (isFileUpload()) {
-   final Path path = 
ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-   if (path == null) {
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Client 
did not upload a file."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
-   }
-   //noinspection unchecked
-   request = (R) new FileUpload(path);
-   } else if (msgContent.capacity() == 0) {
-   try {
-   request = MAPPER.readValue("{}", 
untypedResponseMessageHeaders.getRequestClass());
-   } catch (JsonParseException | 
JsonMappingException je) {
-   log.error("Request did not conform to 
expected format.", je);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Bad 
request received."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
+   try (FileUploads uploadedFiles = 
FileUploadHandler.getMultipartFileUploads(ctx)) {
--- End diff --

I would obtain the upload directory from `FileUploadHandler` and simply 
delete this directory after the call has been processed. We could then also 
create `FileUploads` outside of the `FileUploadHandler` to instantiate a 
`HandlerRequest` with it. This would also simplify the `FileUploads` class 
significantly, because it is no longer responsible for deleting the files.
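The wholesale directory cleanup suggested above could be sketched as follows (a hypothetical helper under illustrative names, not Flink code; reverse-order sorting yields children before parents so a plain delete suffices at every step):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public final class UploadDirCleaner {

	// Deletes the per-request upload directory and everything beneath it.
	static void deleteRecursively(Path dir) throws IOException {
		if (!Files.exists(dir)) {
			return; // nothing to clean up
		}
		try (Stream<Path> paths = Files.walk(dir)) {
			paths.sorted(Comparator.reverseOrder()) // children before their parents
				.forEach(p -> {
					try {
						Files.delete(p);
					} catch (IOException e) {
						// streams cannot throw checked exceptions; rewrap
						throw new UncheckedIOException(e);
					}
				});
		}
	}
}
```

With a helper like this, the handler no longer needs to track individual uploaded files: it deletes the whole request-scoped directory after processing.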


---


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517478#comment-16517478
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453235
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
--- End diff --

I would suggest to simply store the upload directory in the `UPLOAD_FILES` 
attribute.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.
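The mixed multi-part request described above (a JSON attribute plus file parts) has a simple wire format. A hand-built sketch of the body such a handler would decode (the part name "request" and the boundary value are illustrative assumptions; a real client such as the okhttp `MultipartBody.Builder` used in the test code quoted in this thread assembles this automatically):

```java
// Hypothetical multipart/form-data body: a JSON attribute followed by a file
// part. A real request carries the header
// Content-Type: multipart/form-data; boundary=<boundary>.
public final class MultipartBodySketch {

	static String build(String boundary, String json, String fileName, String fileContent) {
		return "--" + boundary + "\r\n"
			+ "Content-Disposition: form-data; name=\"request\"\r\n"
			+ "\r\n"
			+ json + "\r\n"
			+ "--" + boundary + "\r\n"
			+ "Content-Disposition: form-data; name=\"file\"; filename=\"" + fileName + "\"\r\n"
			+ "Content-Type: application/octet-stream\r\n"
			+ "\r\n"
			+ fileContent + "\r\n"
			+ "--" + boundary + "--\r\n"; // closing boundary ends the message
	}
}
```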





[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r19670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection<Path> directoriesToClean;
+   private final Collection<Path> uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws 
IOException {
+   final Collection<Path> files = new ArrayList<>(4);
+   final Collection<Path> directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
+   }
+
+   public Collection<Path> getUploadedFiles() {
+   return uploadedFiles;
+   }
+
+   @Override
+   public void close() throws IOException {
+   for (Path file : uploadedFiles) {
+   try {
+   Files.delete(file);
+   } catch (FileNotFoundException ignored) {
+   // file may have been moved by a handler
+   }
+   }
+   for (Path directory : directoriesToClean) {
+   Files.walkFileTree(directory, CleanupFileVisitor.get());
+   }
+   }
+
+   private static final class FileAdderVisitor extends 
SimpleFileVisitor<Path> {
+
+   private final Collection<Path> files = new ArrayList<>(4);
+
+   Collection<Path> get() {
--- End diff --

maybe more descriptive name than `get`.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
--- End diff --

Should we rather fail?


---


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517486#comment-16517486
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196555487
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection<Path> directoriesToClean;
+   private final Collection<Path> uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws 
IOException {
+   final Collection<Path> files = new ArrayList<>(4);
+   final Collection<Path> directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
+   }
+
+   public Collection<Path> getUploadedFiles() {
+   return uploadedFiles;
+   }
+
+   @Override
+   public void close() throws IOException {
+   for (Path file : uploadedFiles) {
+   try {
+   Files.delete(file);
+   } catch (FileNotFoundException ignored) {
+   // file may have been moved by a handler
+   }
+   }
+   for (Path directory : directoriesToClean) {
+   Files.walkFileTree(directory, CleanupFileVisitor.get());
+   }
+   }
+
+   private static final class FileAdderVisitor extends 
SimpleFileVisitor<Path> {
+
+   private final Collection<Path> files = new ArrayList<>(4);
+
+   Collection<Path> get() {
+   return files;
+   }
+
+   FileAdderVisitor() {
+   }
+
+   @Override
+   public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs) throws IOException {
+   FileVisitResult result = super.visitFile(file, attrs);
+   files.add(file);
+   return result;
+   }
+   }

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517483#comment-16517483
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196455211
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+   final ByteBuf msgContent;
+   Optional<byte[]> multipartJsonPayload = 
FileUploadHandler.getMultipartJsonPayload(ctx);
+   if (multipartJsonPayload.isPresent()) {
+   msgContent = 
Unpooled.wrappedBuffer(multipartJsonPayload.get());
+   } else {
+   msgContent = ((FullHttpRequest) 
httpRequest).content();
+   }
 
-   R request;
-   if (isFileUpload()) {
-   final Path path = 
ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-   if (path == null) {
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Client 
did not upload a file."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
-   }
-   //noinspection unchecked
-   request = (R) new FileUpload(path);
-   } else if (msgContent.capacity() == 0) {
-   try {
-   request = MAPPER.readValue("{}", 
untypedResponseMessageHeaders.getRequestClass());
-   } catch (JsonParseException | 
JsonMappingException je) {
-   log.error("Request did not conform to 
expected format.", je);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Bad 
request received."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
+   try (FileUploads uploadedFiles = 
FileUploadHandler.getMultipartFileUploads(ctx)) {
--- End diff --

I would obtain the upload directory from `FileUploadHandler` and simply 
delete this directory after the call has been processed. We could then also 
create `FileUploads` outside of the `FileUploadHandler` to instantiate a 
`HandlerRequest` with it. This would also simplify the `FileUploads` class 
significantly, because it is no longer responsible for deleting the files.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.





[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452583
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional<byte[]> 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

I would suggest to simply return the upload directory.


---


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517487#comment-16517487
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r19670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection<Path> directoriesToClean;
+   private final Collection<Path> uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws 
IOException {
+   final Collection<Path> files = new ArrayList<>(4);
+   final Collection<Path> directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
+   }
+
+   public Collection<Path> getUploadedFiles() {
+   return uploadedFiles;
+   }
+
+   @Override
+   public void close() throws IOException {
+   for (Path file : uploadedFiles) {
+   try {
+   Files.delete(file);
+   } catch (FileNotFoundException ignored) {
+   // file may have been moved by a handler
+   }
+   }
+   for (Path directory : directoriesToClean) {
+   Files.walkFileTree(directory, CleanupFileVisitor.get());
+   }
+   }
+
+   private static final class FileAdderVisitor extends 
SimpleFileVisitor<Path> {
+
+   private final Collection<Path> files = new ArrayList<>(4);
+
+   Collection<Path> get() {
--- End diff --

maybe more descriptive name than `get`.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517491#comment-16517491
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196558949
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
 ---
@@ -63,4 +63,13 @@
 * @return description for the header
 */
String getDescription();
+
+   /**
+* Returns whether this header allows file uploads.
+*
+* @return whether this header allows file uploads
+*/
+   default boolean acceptsFileUploads() {
+   return false;
+   }
--- End diff --

Should this maybe go into `UntypedResponseMessageHeaders`? At the moment 
one can upload files for an `AbstractHandler` (e.g. 
`AbstractTaskManagerFileHandler`) implementation and also has access to it via 
the `HandlerRequest` without being able to specify whether file upload is 
allowed or not. 


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



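As background for the mixed multi-part requests (json + files) described above, the sketch below shows how such a request body is laid out on the wire: one form attribute carrying the JSON payload plus one file part. The part names (`request`, `file`) and the boundary string are illustrative assumptions, not Flink's actual constants.

```java
import java.nio.charset.StandardCharsets;

// Builds a multipart/form-data body with one JSON attribute and one file part,
// roughly the shape a server-side upload handler would have to parse.
class MultipartSketch {
    static String buildBody(String boundary, String json, String fileName, byte[] fileContent) {
        StringBuilder sb = new StringBuilder();
        // JSON payload sent as a plain form attribute (part name "request" is an assumption)
        sb.append("--").append(boundary).append("\r\n")
          .append("Content-Disposition: form-data; name=\"request\"\r\n")
          .append("Content-Type: application/json\r\n\r\n")
          .append(json).append("\r\n");
        // the uploaded file as its own part with a filename
        sb.append("--").append(boundary).append("\r\n")
          .append("Content-Disposition: form-data; name=\"file\"; filename=\"")
          .append(fileName).append("\"\r\n")
          .append("Content-Type: application/octet-stream\r\n\r\n")
          .append(new String(fileContent, StandardCharsets.UTF_8)).append("\r\n");
        // closing boundary terminates the message
        sb.append("--").append(boundary).append("--\r\n");
        return sb.toString();
    }
}
```

A real client (e.g. OkHttp's `MultipartBody`, as used in the tests later in this thread) produces the same structure; the point here is only the two-part layout.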


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517488#comment-16517488
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196559335
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static 

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453584
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection<Path> directoriesToClean;
+   private final Collection<Path> uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
+   final Collection<Path> files = new ArrayList<>(4);
+   final Collection<Path> directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
--- End diff --

Let's move this logic out of `FileUploads` and simply initialize it with a 
`Collection<Path>`.


---
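The directory walk that the `FileAdderVisitor` discussed above performs can be sketched with stdlib NIO alone; the class and method names here are illustrative, not Flink's actual code:

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;

// Collects every regular file under a directory, the way FileUploads flattens
// an uploaded directory into a plain collection of files.
class FileCollector {
    static List<Path> collectFiles(Path root) throws IOException {
        final List<Path> files = new ArrayList<>();
        Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                files.add(file);                 // directories are skipped, only files collected
                return FileVisitResult.CONTINUE;
            }
        });
        return files;
    }

    // Small self-check: a temp directory with one file at the top level and one
    // in a subdirectory yields two collected files.
    static int demo() {
        try {
            Path dir = Files.createTempDirectory("uploads");
            Files.write(dir.resolve("a.bin"), new byte[]{1});
            Path sub = Files.createDirectories(dir.resolve("sub"));
            Files.write(sub.resolve("b.bin"), new byte[]{2});
            return collectFiles(dir).size();
        } catch (IOException e) {
            return -1;
        }
    }
}
```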


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517485#comment-16517485
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196554025
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+   final ByteBuf msgContent;
+   Optional multipartJsonPayload = 
FileUploadHandler.getMultipartJsonPayload(ctx);
+   if (multipartJsonPayload.isPresent()) {
+   msgContent = 
Unpooled.wrappedBuffer(multipartJsonPayload.get());
--- End diff --

Let's send the Json payload as a proper `HttpRequest`, then we don't have 
this special casing here.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.





[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517484#comment-16517484
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453980
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -129,4 +137,9 @@ public R getRequestBody() {
return queryParameter.getValue();
}
}
+
+   @Nonnull
+   public FileUploads getFileUploads() {
+   return uploadedFiles;
+   }
--- End diff --

I would not expose `FileUploads` to the user but rather return a 
`Collection<Path>`.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.





[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517493#comment-16517493
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196560163
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517482#comment-16517482
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453584
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection<Path> directoriesToClean;
+   private final Collection<Path> uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
+   final Collection<Path> files = new ArrayList<>(4);
+   final Collection<Path> directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
--- End diff --

Let's move this logic out of `FileUploads` and simply initialize it with a 
`Collection<Path>`.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.




[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452024
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
+   
ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
--- End diff --

I think it would be better to not store the JSON payload as an `Attribute` 
but instead forward it via 
`httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload))`.


---


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517476#comment-16517476
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452024
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
+   
ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
--- End diff --

I think it would be better to not store the JSON payload as an `Attribute` 
but instead forward it via 
`httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload))`.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.





[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453235
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
--- End diff --

I would suggest simply storing the upload directory in the `UPLOAD_FILES` 
attribute.


---


[jira] [Commented] (FLINK-4542) Add MULTISET operations

2018-06-19 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517283#comment-16517283
 ] 

Timo Walther commented on FLINK-4542:
-

Hi [~Sergey Nuyanzin],

I thought Calcite has support (at least parsing and validation) for the 
operations mentioned in the issue, because they are listed in 
{{org.apache.calcite.sql.fun.SqlStdOperatorTable}}. The multiset type is 
supported in Flink SQL (this has been introduced as part of FLINK-7491). 
However, creating literals using {{SELECT MULTISET(...)}} is not. For this we 
need to implement the value constructor call 
{{SqlStdOperatorTable.MULTISET_VALUE}}. I hope Calcite releases 1.17 soon so 
that we are not blocked on this. Feel free to split this issue into multiple 
subtasks and open PRs for the functions that we can already support and the 
functions that are blocked.

> Add MULTISET operations
> ---
>
> Key: FLINK-4542
> URL: https://issues.apache.org/jira/browse/FLINK-4542
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Sergey Nuyanzin
>Priority: Minor
>
> Umbrella issue for MULTISET operations like:
> MULTISET UNION, MULTISET UNION ALL, MULTISET EXCEPT, MULTISET EXCEPT ALL, 
> MULTISET INTERSECT, MULTISET INTERSECT ALL, CARDINALITY, ELEMENT, MEMBER OF, 
> SUBMULTISET OF, IS A SET, FUSION
> At the moment we only support COLLECT.



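For readers unfamiliar with the multiset operations listed in the issue above, their semantics can be illustrated over count maps (a multiset as element → multiplicity). This is a plain-Java illustration of the SQL-standard semantics, not Flink or Calcite code: UNION ALL adds multiplicities, INTERSECT ALL takes the minimum, and CARDINALITY sums them.

```java
import java.util.HashMap;
import java.util.Map;

// A multiset modeled as element -> multiplicity.
class MultisetOps {
    // MULTISET UNION ALL: multiplicities are added.
    static Map<String, Integer> unionAll(Map<String, Integer> a, Map<String, Integer> b) {
        Map<String, Integer> r = new HashMap<>(a);
        b.forEach((k, v) -> r.merge(k, v, Integer::sum));
        return r;
    }

    // MULTISET INTERSECT ALL: an element's multiplicity is the minimum of the two sides.
    static Map<String, Integer> intersectAll(Map<String, Integer> a, Map<String, Integer> b) {
        Map<String, Integer> r = new HashMap<>();
        a.forEach((k, v) -> {
            int min = Math.min(v, b.getOrDefault(k, 0));
            if (min > 0) {
                r.put(k, min);
            }
        });
        return r;
    }

    // CARDINALITY: total number of elements, counting duplicates.
    static int cardinality(Map<String, Integer> m) {
        return m.values().stream().mapToInt(Integer::intValue).sum();
    }
}
```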


[jira] [Commented] (FLINK-8921) Split code generated call expression

2018-06-19 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517268#comment-16517268
 ] 

Timo Walther commented on FLINK-8921:
-

[~RuidongLi] what is the status of this issue? Will you find time to work on 
it? Users are asking for this feature on the mailing list.

> Split code generated call expression 
> -
>
> Key: FLINK-8921
> URL: https://issues.apache.org/jira/browse/FLINK-8921
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Ruidong Li
>Priority: Major
>
> In FLINK-8274 we introduced the possibility of splitting the generated code 
> into multiple methods in order not to exceed the JVM's maximum method size (see 
> also https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.9).
> At the moment we only split methods by fields; however, this is not enough in 
> all cases. We should also split expressions. I suggest splitting the operands 
> of a {{RexCall}} in 
> {{org.apache.flink.table.codegen.CodeGenerator#visitCall}} if we reach a 
> certain threshold. However, this should happen as lazily as possible to keep 
> the runtime overhead low.



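The splitting idea in the issue above, breaking one huge generated expression into several helper methods once an operand threshold is reached, can be sketched at the string level. The threshold, method naming, and the `+` combiner are all illustrative assumptions, not Flink's actual code generator:

```java
import java.util.ArrayList;
import java.util.List;

// Splits a long list of generated operand expressions into helper-method
// sources so that no single generated method grows past a threshold.
class ExpressionSplitter {
    static List<String> splitIntoMethods(List<String> operands, int maxTermsPerMethod) {
        List<String> methods = new ArrayList<>();
        for (int i = 0; i < operands.size(); i += maxTermsPerMethod) {
            // take the next chunk of at most maxTermsPerMethod operands
            List<String> chunk = operands.subList(i, Math.min(i + maxTermsPerMethod, operands.size()));
            String body = String.join(" + ", chunk);
            methods.add("int helper" + methods.size() + "() { return " + body + "; }");
        }
        return methods;
    }
}
```

The top-level generated method would then sum the `helperN()` calls, keeping every method body below the JVM's 64 KB bytecode limit.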


[GitHub] flink issue #5777: [FLINK-7897] Consider using nio.Files for file deletion i...

2018-06-19 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5777
  
@StefanRRichter , does this PR look good to you?


---


[jira] [Commented] (FLINK-7897) Consider using nio.Files for file deletion in TransientBlobCleanupTask

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517233#comment-16517233
 ] 

ASF GitHub Bot commented on FLINK-7897:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5777
  
@StefanRRichter , does this PR look good to you?


> Consider using nio.Files for file deletion in TransientBlobCleanupTask
> --
>
> Key: FLINK-7897
> URL: https://issues.apache.org/jira/browse/FLINK-7897
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> nio.Files#delete() provides better clue as to why the deletion may fail:
> https://docs.oracle.com/javase/7/docs/api/java/nio/file/Files.html#delete(java.nio.file.Path)
> Depending on the potential exception (FileNotFound), the call to 
> localFile.exists() may be skipped.



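The issue above hinges on a stdlib difference: `java.io.File#delete()` returns an opaque boolean, while `java.nio.file.Files#delete` throws a specific exception, so the preceding `exists()` check can be dropped. A minimal sketch (the class name is illustrative, not the actual cleanup task):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;

class BlobCleanup {
    // Returns a human-readable outcome instead of File#delete()'s opaque boolean.
    static String tryDelete(Path file) {
        try {
            Files.delete(file);            // throws instead of returning false
            return "deleted";
        } catch (NoSuchFileException e) {
            return "already gone";         // no separate exists() check needed
        } catch (IOException e) {
            return "failed: " + e;         // e.g. DirectoryNotEmptyException, permissions
        }
    }

    // Self-check: deleting a fresh temp file succeeds once, then reports it gone.
    static boolean demo() {
        try {
            Path p = Files.createTempFile("blob", ".bin");
            return tryDelete(p).equals("deleted") && tryDelete(p).equals("already gone");
        } catch (IOException e) {
            return false;
        }
    }
}
```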


[jira] [Commented] (FLINK-9352) In Standalone checkpoint recover mode many jobs with same checkpoint interval cause IO pressure

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517230#comment-16517230
 ] 

ASF GitHub Bot commented on FLINK-9352:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6092
  
cc @zentol @kl0u 


> In Standalone checkpoint recover mode many jobs with same checkpoint interval 
> cause IO pressure
> ---
>
> Key: FLINK-9352
> URL: https://issues.apache.org/jira/browse/FLINK-9352
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the periodic checkpoint coordinator's startCheckpointScheduler uses 
> *baseInterval* as the initialDelay parameter; the *baseInterval* is also the 
> checkpoint interval. 
> In standalone checkpoint mode, many jobs are configured with the same checkpoint 
> interval. When all jobs are recovered (after a cluster restart or a JobManager 
> leadership switch), their checkpoint periods tend to align: all jobs' 
> CheckpointCoordinators start and trigger at approximately the same point in time.
> This caused high IO cost within the same time window in our production 
> scenario.
> I suggest exposing scheduleAtFixedRate's initial delay parameter as an API 
> config so that users can scatter checkpoints in this scenario.
>  
> cc [~StephanEwen] [~Zentol]
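A minimal sketch of the suggested change, using plain JDK scheduling rather than the actual CheckpointCoordinator API (the helper name is an assumption): instead of using baseInterval as the fixed initial delay, each job picks a random delay so that recovered jobs do not all trigger checkpoints at the same instant.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class ScatteredCheckpointScheduler {

    /** Random initial delay in [minPause, baseInterval) so recovered jobs don't align. */
    static long randomInitDelay(long minPauseMillis, long baseIntervalMillis) {
        return minPauseMillis
            + (long) (ThreadLocalRandom.current().nextDouble()
                      * (baseIntervalMillis - minPauseMillis));
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        long baseInterval = 1000;
        // Each job picks its own initial delay; the period itself stays baseInterval.
        timer.scheduleAtFixedRate(
            () -> System.out.println("trigger checkpoint"),
            randomInitDelay(0, baseInterval), baseInterval, TimeUnit.MILLISECONDS);
        Thread.sleep(1500);
        timer.shutdownNow();
    }
}
```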





[GitHub] flink issue #6092: [FLINK-9352] In Standalone checkpoint recover mode many j...

2018-06-19 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6092
  
cc @zentol @kl0u 


---


[jira] [Created] (FLINK-9617) Provide alias for whole records in Table API

2018-06-19 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-9617:
-

 Summary: Provide alias for whole records in Table API
 Key: FLINK-9617
 URL: https://issues.apache.org/jira/browse/FLINK-9617
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Affects Versions: 1.5.0
Reporter: Piotr Nowojski


In SQL we can provide an alias for whole table to avoid column name collisions 
between two tables. For example:
{code:java}
SELECT
  SUM(o.amount * r.rate) 
FROM 
  Orders AS o, 
  Rates AS r
WHERE r.currency = o.currency{code}
However, that's not possible in the Table API. There, the user has to provide 
aliases for all of the columns, which can be annoying, especially if a table 
consists of tens or even hundreds of columns.

For example I would expect some feature like this:
{code:java}
val result = orders.as('o)
  .join(rates('o.rowtime).as('r), "o.currency = r.currency")
  .select("SUM(o.amount * r.rate) AS amount")
{code}
where {{rates}} is a TableFunction.

 





[jira] [Commented] (FLINK-9616) DatadogHttpReporter fails to be created due to missing shaded dependency

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517205#comment-16517205
 ] 

ASF GitHub Bot commented on FLINK-9616:
---

GitHub user addisonj opened a pull request:

https://github.com/apache/flink/pull/6183

[FLINK-9616][metrics] Fix datadog to include shaded deps


## What is the purpose of the change

This fixes a broken build that wasn't properly including shaded dependencies in 
the jar it builds. This causes the instantiation of DatadogHttpReporter to fail, 
with no easy way to fix it since the dependencies exist on a shaded import path.

## Brief change log

- Changes 


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

However, it can be validated by:
```
cd flink-metrics/flink-metrics-datadog
mvn package
jar tf target/flink-metrics-datadog-1.6-SNAPSHOT.jar
```

And then seeing the expected okhttp3 and okio dependencies being included 
in the resulting jar.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes, but brings in 
line with documented behavior here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/instructure/flink datadog_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6183.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6183


commit 10fe56a9adbe6f35dc4b5fae0e7478e99028f5f7
Author: Addison Higham 
Date:   2018-06-19T14:49:56Z

[FLINK-9616] Fix datadog to include shaded deps

flink-metrics-datadog wasn't properly including its shaded dependencies
in the jar it builds.

Looking at other places where shaded dependencies are used, it seems like
the build wasn't working as intended.




> DatadogHttpReporter fails to be created due to missing shaded dependency
> 
>
> Key: FLINK-9616
> URL: https://issues.apache.org/jira/browse/FLINK-9616
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0
>Reporter: Addison Higham
>Priority: Major
>
> When using the DatadogHttpReporter, it fails to instantiate with the 
> following exception:
> {code:java}
> 2018-06-19 06:01:19,640 INFO 
> org.apache.flink.runtime.metrics.MetricRegistryImpl - Configuring dghttp with 
> {apikey=, tags=, 
> class=org.apache.flink.metrics.datadog.DatadogHttpReporter}.
> 2018-06-19 06:01:19,642 ERROR 
> org.apache.flink.runtime.metrics.MetricRegistryImpl - Could not instantiate 
> metrics reporter dghttp. Metrics might not be exposed/reported.
> java.lang.NoClassDefFoundError: org/apache/flink/shaded/okhttp3/MediaType
> at 
> org.apache.flink.metrics.datadog.DatadogHttpClient.<init>(DatadogHttpClient.java:45)
> at 
> org.apache.flink.metrics.datadog.DatadogHttpReporter.open(DatadogHttpReporter.java:105)
> at 
> org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:150)
> at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:413)
> at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:274)
> at 
> org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:92)
> at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:225)
> at 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:189)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1889)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at 
> 

[GitHub] flink pull request #6183: [FLINK-9616][metrics] Fix datadog to include shade...

2018-06-19 Thread addisonj
GitHub user addisonj opened a pull request:

https://github.com/apache/flink/pull/6183

[FLINK-9616][metrics] Fix datadog to include shaded deps


## What is the purpose of the change

This fixes a broken build that wasn't properly including shaded dependencies in 
the jar it builds. This causes the instantiation of DatadogHttpReporter to fail, 
with no easy way to fix it since the dependencies exist on a shaded import path.

## Brief change log

- Changes 


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

However, it can be validated by:
```
cd flink-metrics/flink-metrics-datadog
mvn package
jar tf target/flink-metrics-datadog-1.6-SNAPSHOT.jar
```

And then seeing the expected okhttp3 and okio dependencies being included 
in the resulting jar.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): yes, but brings in 
line with documented behavior here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#datadog-orgapacheflinkmetricsdatadogdatadoghttpreporter
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/instructure/flink datadog_fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6183.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6183


commit 10fe56a9adbe6f35dc4b5fae0e7478e99028f5f7
Author: Addison Higham 
Date:   2018-06-19T14:49:56Z

[FLINK-9616] Fix datadog to include shaded deps

flink-metrics-datadog wasn't properly including its shaded dependencies
in the jar it builds.

Looking at other places where shaded dependencies are used, it seems like
the build wasn't working as intended.




---


[jira] [Created] (FLINK-9616) DatadogHttpReporter fails to be created due to missing shaded dependency

2018-06-19 Thread Addison Higham (JIRA)
Addison Higham created FLINK-9616:
-

 Summary: DatadogHttpReporter fails to be created due to missing 
shaded dependency
 Key: FLINK-9616
 URL: https://issues.apache.org/jira/browse/FLINK-9616
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.5.0
Reporter: Addison Higham


When using the DatadogHttpReporter, it fails to instantiate with the following 
exception:


{code:java}
2018-06-19 06:01:19,640 INFO 
org.apache.flink.runtime.metrics.MetricRegistryImpl - Configuring dghttp with 
{apikey=, tags=, 
class=org.apache.flink.metrics.datadog.DatadogHttpReporter}.
2018-06-19 06:01:19,642 ERROR 
org.apache.flink.runtime.metrics.MetricRegistryImpl - Could not instantiate 
metrics reporter dghttp. Metrics might not be exposed/reported.
java.lang.NoClassDefFoundError: org/apache/flink/shaded/okhttp3/MediaType
at 
org.apache.flink.metrics.datadog.DatadogHttpClient.<init>(DatadogHttpClient.java:45)
at 
org.apache.flink.metrics.datadog.DatadogHttpReporter.open(DatadogHttpReporter.java:105)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl.<init>(MetricRegistryImpl.java:150)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:413)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:274)
at 
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:92)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:225)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:189)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1889)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:188)
at 
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:181)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.shaded.okhttp3.MediaType
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 14 more

{code}
Looking at the pom.xml for `flink-metrics-datadog` it looks like that 
dependency is intended to be shaded and included in the jar, however, when we 
build the jar we see the following lines:

 
{noformat}
$ mvn package
[INFO] Scanning for projects...
[INFO] 
[INFO] 
[INFO] Building flink-metrics-datadog 1.5.0
[INFO] 


[INFO] --- maven-shade-plugin:3.0.0:shade (shade-flink) @ flink-metrics-datadog 
---
[INFO] Excluding com.squareup.okhttp3:okhttp:jar:3.7.0 from the shaded jar.
[INFO] Excluding com.squareup.okio:okio:jar:1.12.0 from the shaded jar.
[INFO] Including org.apache.flink:force-shading:jar:1.5.0 in the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
{noformat}
And inspecting the built jar:
{noformat}
$ jar tf flink-metrics-datadog-1.5.0.jar
META-INF/
META-INF/MANIFEST.MF
org/
org/apache/
org/apache/flink/
org/apache/flink/metrics/
org/apache/flink/metrics/datadog/
org/apache/flink/metrics/datadog/DatadogHttpClient$EmptyCallback.class
org/apache/flink/metrics/datadog/DMetric.class
org/apache/flink/metrics/datadog/DSeries.class
org/apache/flink/metrics/datadog/DGauge.class
org/apache/flink/metrics/datadog/DatadogHttpReporter.class
org/apache/flink/metrics/datadog/DatadogHttpClient.class
org/apache/flink/metrics/datadog/MetricType.class
org/apache/flink/metrics/datadog/DatadogHttpReporter$DatadogHttpRequest.class
org/apache/flink/metrics/datadog/DMeter.class
org/apache/flink/metrics/datadog/DCounter.class
META-INF/DEPENDENCIES
META-INF/maven/
META-INF/maven/org.apache.flink/
META-INF/maven/org.apache.flink/flink-metrics-datadog/
META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.xml
META-INF/maven/org.apache.flink/flink-metrics-datadog/pom.properties
META-INF/NOTICE
{noformat}
We don't see the included dependencies.

 





[jira] [Commented] (FLINK-8795) Scala shell broken for Flip6

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517150#comment-16517150
 ] 

ASF GitHub Bot commented on FLINK-8795:
---

GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6182

[FLINK-8795] Fixed local scala shell for Flip6

## What is the purpose of the change

Enables running the scala-shell in local mode with the new runtime.

## Brief change log

- Creating either MiniCluster or StandaloneMiniCluster based on the mode.

## Verifying this change

Changed tests to run both on new and legacy runtime.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no /** don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink FLINK-8795

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6182


commit 6fa4aa18c2d111a846386fed27354bd7a1da2a0b
Author: Dawid Wysakowicz 
Date:   2018-06-19T07:49:31Z

[FLINK-8795] Fixed local scala shell for Flip6




> Scala shell broken for Flip6
> 
>
> Key: FLINK-8795
> URL: https://issues.apache.org/jira/browse/FLINK-8795
> Project: Flink
>  Issue Type: Bug
>Reporter: kant kodali
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> I am trying to run the simple code below after building everything from 
> Flink's github master branch for various reasons. I get the exception below, 
> and I wonder: what runs on port 9065, and how do I fix this exception?
> I followed the instructions from the Flink master branch so I did the 
> following.
> {code:java}
> git clone https://github.com/apache/flink.git 
> cd flink mvn clean package -DskipTests 
> cd build-target
>  ./bin/start-scala-shell.sh local{code}
> {{And Here is the code I ran}}
> {code:java}
> val dataStream = senv.fromElements(1, 2, 3, 4)
> dataStream.countWindowAll(2).sum(0).print()
> senv.execute("My streaming program"){code}
> {{And I finally get this exception}}
> {code:java}
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph. at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$18(RestClusterClient.java:306)
>  at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>  at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$222(RestClient.java:196)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>  at 
> 

[GitHub] flink pull request #6182: [FLINK-8795] Fixed local scala shell for Flip6

2018-06-19 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6182

[FLINK-8795] Fixed local scala shell for Flip6

## What is the purpose of the change

Enables running the scala-shell in local mode with the new runtime.

## Brief change log

- Creating either MiniCluster or StandaloneMiniCluster based on the mode.

## Verifying this change

Changed tests to run both on new and legacy runtime.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no /** don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink FLINK-8795

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6182


commit 6fa4aa18c2d111a846386fed27354bd7a1da2a0b
Author: Dawid Wysakowicz 
Date:   2018-06-19T07:49:31Z

[FLINK-8795] Fixed local scala shell for Flip6




---


[jira] [Commented] (FLINK-9615) Add possibility to use Connectors as Sinks in SQL Client

2018-06-19 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517128#comment-16517128
 ] 

Fabian Hueske commented on FLINK-9615:
--

Thanks for opening this issue!
This is (partially) a duplicate of FLINK-8858.

> Add possibility to use Connectors as Sinks in SQL Client
> 
>
> Key: FLINK-9615
> URL: https://issues.apache.org/jira/browse/FLINK-9615
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dominik Wosiński
>Priority: Major
>
> AFAIK, there is currently no possibility to use Kafka or other connectors as 
> a sink in SQL Client. Such feature would be good for prototyping or quick 
> streams manipulation.





[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517115#comment-16517115
 ] 

ASF GitHub Bot commented on FLINK-9610:
---

Github user nielsbasjes commented on a diff in the pull request:

https://github.com/apache/flink/pull/6181#discussion_r196435400
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * A partitioner that uses the hash of the provided key to distribute
+ * the values over the partitions as evenly as possible.
+ * This partitioner ensures that all records with the same key will be 
sent to
+ * the same Kafka partition.
+ *
+ * Note that this will cause a lot of network connections to be created 
between
+ * all the Flink instances and all the Kafka brokers.
+ */
+@PublicEvolving
+public class FlinkKeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+   private static final long serialVersionUID = -2006468063065010594L;
+
+   @Override
+   public int partition(T record, byte[] key, byte[] value, String 
targetTopic, int[] partitions) {
+   Preconditions.checkArgument(
+   partitions != null && partitions.length > 0,
+   "Partitions of the target topic is empty.");
+
+   return partitions[hash(key) % partitions.length];
--- End diff --

Yes, good catch. I fixed that just now.


> Add Kafka partitioner that uses the key to partition by
> ---
>
> Key: FLINK-9610
> URL: https://issues.apache.org/jira/browse/FLINK-9610
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Major
>
> The kafka connector package only contains the FlinkFixedPartitioner 
> implementation of the FlinkKafkaPartitioner.
> The most common use case I have seen is the need to spread the records across 
> the Kafka partitions while keeping all messages with the same key together.
> I'll put up a pull request with a very simple implementation that should make 
> this a lot easier for others to use and extend.
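The review comment above ("good catch") concerns the modulo on a possibly negative hash. A sketch of a null-safe, sign-safe version of that logic (the `Arrays.hashCode`-based hash helper is an assumption here, not necessarily what the PR uses):

```java
import java.util.Arrays;

public class KeyHashPartitionSketch {

    /** Null-safe hash of the serialized key; Arrays.hashCode is an assumption here. */
    static int hash(byte[] key) {
        return key == null ? 0 : Arrays.hashCode(key);
    }

    /** Masks the sign bit so a negative hash cannot produce a negative index. */
    static int partition(byte[] key, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of the target topic is empty.");
        }
        return partitions[(hash(key) & 0x7FFFFFFF) % partitions.length];
    }

    public static void main(String[] args) {
        int[] parts = {0, 1, 2, 3};
        byte[] key = "user-42".getBytes();
        // Records with the same key always land on the same partition.
        System.out.println(partition(key, parts) == partition(key, parts)); // true
    }
}
```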





[GitHub] flink pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafk...

2018-06-19 Thread nielsbasjes
Github user nielsbasjes commented on a diff in the pull request:

https://github.com/apache/flink/pull/6181#discussion_r196435400
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * A partitioner that uses the hash of the provided key to distribute
+ * the values over the partitions as evenly as possible.
+ * This partitioner ensures that all records with the same key will be 
sent to
+ * the same Kafka partition.
+ *
+ * Note that this will cause a lot of network connections to be created 
between
+ * all the Flink instances and all the Kafka brokers.
+ */
+@PublicEvolving
+public class FlinkKeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+   private static final long serialVersionUID = -2006468063065010594L;
+
+   @Override
+   public int partition(T record, byte[] key, byte[] value, String 
targetTopic, int[] partitions) {
+   Preconditions.checkArgument(
+   partitions != null && partitions.length > 0,
+   "Partitions of the target topic is empty.");
+
+   return partitions[hash(key) % partitions.length];
--- End diff --

Yes, good catch. I fixed that just now.


---


[jira] [Issue Comment Deleted] (FLINK-9615) Add possibility to use Connectors as Sinks in SQL Client

2018-06-19 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-9615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dominik Wosiński updated FLINK-9615:

Comment: was deleted

(was: I can take care of that :) It's not a big improvement to be made.)

> Add possibility to use Connectors as Sinks in SQL Client
> 
>
> Key: FLINK-9615
> URL: https://issues.apache.org/jira/browse/FLINK-9615
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dominik Wosiński
>Priority: Major
>
> AFAIK, there is currently no possibility to use Kafka or other connectors as 
> a sink in SQL Client. Such feature would be good for prototyping or quick 
> streams manipulation.





[jira] [Commented] (FLINK-9615) Add possibility to use Connectors as Sinks in SQL Client

2018-06-19 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-9615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517095#comment-16517095
 ] 

Dominik Wosiński commented on FLINK-9615:
-

I can take care of that :) It's not a big improvement to be made.

> Add possibility to use Connectors as Sinks in SQL Client
> 
>
> Key: FLINK-9615
> URL: https://issues.apache.org/jira/browse/FLINK-9615
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dominik Wosiński
>Priority: Major
>
> AFAIK, there is currently no possibility to use Kafka or other connectors as 
> a sink in SQL Client. Such feature would be good for prototyping or quick 
> streams manipulation.





[jira] [Updated] (FLINK-9614) Improve the error message for Compiler#compile

2018-06-19 Thread mingleizhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-9614:

Description: 
When the SQL below is too long, like

case when  case when .
 when host in 
('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247')
 then 'condition'

This causes a {{StackOverflowError}}. The current code is shown below; I would 
suggest prompting users to add {{-Xss 20m}} to solve this, instead of saying {{This is a 
bug..}}

{code:java}
trait Compiler[T] {

  @throws(classOf[CompileException])
  def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
require(cl != null, "Classloader must not be null.")
val compiler = new SimpleCompiler()
compiler.setParentClassLoader(cl)
try {
  compiler.cook(code)
} catch {
  case t: Throwable =>
throw new InvalidProgramException("Table program cannot be compiled. " +
  "This is a bug. Please file an issue.", t)
}
compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
  }
}
{code}


> Improve the error message for Compiler#compile
> --
>
> Key: FLINK-9614
> URL: https://issues.apache.org/jira/browse/FLINK-9614
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: mingleizhang
>Assignee: mingleizhang
>Priority: Major
>
> When the below sql has too long. Like
> case when  case when .
>  when host in 
> ('114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247','114.67.56.94','114.67.56.102','114.67.56.103','114.67.56.106','114.67.56.107','183.60.220.231','183.60.220.232','183.60.219.247')
>  then 'condition'
> This causes a {{StackOverflowError}}. The current code is shown below; I would 
> suggest prompting users to add {{-Xss 20m}} to solve this, instead of saying {{This is 
> a bug..}}
> {code:java}
> trait Compiler[T] {
>   @throws(classOf[CompileException])
>   def compile(cl: ClassLoader, name: String, code: String): Class[T] = {
>     require(cl != null, "Classloader must not be null.")
>     val compiler = new SimpleCompiler()
>     compiler.setParentClassLoader(cl)
>     try {
>       compiler.cook(code)
>     } catch {
>       case t: Throwable =>
>         throw new InvalidProgramException("Table program cannot be compiled. " +
>           "This is a bug. Please file an issue.", t)
>     }
>     compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]]
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9615) Add possibility to use Connectors as Sinks in SQL Client

2018-06-19 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-9615?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dominik Wosiński updated FLINK-9615:

Summary: Add possibility to use Connectors as Sinks in SQL Client  (was: 
Add)

> Add possibility to use Connectors as Sinks in SQL Client
> 
>
> Key: FLINK-9615
> URL: https://issues.apache.org/jira/browse/FLINK-9615
> Project: Flink
>  Issue Type: Improvement
>Reporter: Dominik Wosiński
>Priority: Major
>
> AFAIK, there is currently no possibility to use Kafka or other connectors as 
> a sink in the SQL Client. Such a feature would be good for prototyping or 
> quick stream manipulation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9615) Add

2018-06-19 Thread JIRA
Dominik Wosiński created FLINK-9615:
---

 Summary: Add
 Key: FLINK-9615
 URL: https://issues.apache.org/jira/browse/FLINK-9615
 Project: Flink
  Issue Type: Improvement
Reporter: Dominik Wosiński


AFAIK, there is currently no possibility to use Kafka or other connectors as a 
sink in the SQL Client. Such a feature would be good for prototyping or quick 
stream manipulation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9614) Improve the error message for Compiler#compile

2018-06-19 Thread mingleizhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-9614:

Component/s: Table API & SQL

> Improve the error message for Compiler#compile
> --
>
> Key: FLINK-9614
> URL: https://issues.apache.org/jira/browse/FLINK-9614
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: mingleizhang
>Assignee: mingleizhang
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9614) Improve the error message for Compiler#compile

2018-06-19 Thread mingleizhang (JIRA)
mingleizhang created FLINK-9614:
---

 Summary: Improve the error message for Compiler#compile
 Key: FLINK-9614
 URL: https://issues.apache.org/jira/browse/FLINK-9614
 Project: Flink
  Issue Type: Improvement
Reporter: mingleizhang
Assignee: mingleizhang






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9367) Truncate() in BucketingSink is only allowed after hadoop2.7

2018-06-19 Thread zhangxinyu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangxinyu closed FLINK-9367.
-
Resolution: Won't Do

BucketingSink is going to be rewritten, and Hadoop versions below 2.7 won't be 
supported. Therefore, this issue is unnecessary.

> Truncate() in BucketingSink is only allowed after hadoop2.7
> ---
>
> Key: FLINK-9367
> URL: https://issues.apache.org/jira/browse/FLINK-9367
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.5.0
>Reporter: zhangxinyu
>Priority: Major
>
> When writing output to HDFS using BucketingSink, truncate() is only available 
> from Hadoop 2.7 on.
> If some tasks fail, the ".valid-length" file is created for lower Hadoop 
> versions.
> The problem is that if other people want to use the data in HDFS, they must 
> know how to deal with the ".valid-length" file; otherwise, the data may not 
> be exactly-once.
> I think it's not convenient for other people to use the data. Why not just 
> read the in-progress file and write a new file when restoring, instead of 
> writing a ".valid-length" file?
> In this way, others who use the data in HDFS don't need to know how to deal 
> with the ".valid-length" file.
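To make the inconvenience concrete, here is a hedged Java sketch of what a downstream consumer has to do with the ".valid-length" marker. The marker file naming and layout here are assumptions for illustration, not BucketingSink's exact convention:

```java
// Illustrative sketch: a downstream reader honouring a ".valid-length" marker.
// Only the first N bytes of the part file are exactly-once data; the marker
// naming below is an assumption, not BucketingSink's exact one.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

public class ValidLengthReader {

    /** Keeps only the first validLength bytes (the exactly-once prefix). */
    public static byte[] truncateToValid(byte[] partFileBytes, long validLength) {
        int keep = (int) Math.min(validLength, partFileBytes.length);
        return Arrays.copyOf(partFileBytes, keep);
    }

    /** Reads a part file, truncated to the length recorded in its marker, if present. */
    public static byte[] readValid(Path partFile) throws IOException {
        // Assumed marker name: "_" + part file name + ".valid-length".
        Path marker = partFile.resolveSibling("_" + partFile.getFileName() + ".valid-length");
        byte[] all = Files.readAllBytes(partFile);
        if (!Files.exists(marker)) {
            return all; // no marker: the whole file is valid
        }
        long validLength = Long.parseLong(
            new String(Files.readAllBytes(marker), StandardCharsets.UTF_8).trim());
        return truncateToValid(all, validLength);
    }
}
```

Every consumer of the bucket output has to carry this logic, which is exactly the inconvenience the issue describes.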



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516986#comment-16516986
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/6147


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an 
> optional list of jar files that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516985#comment-16516985
 ] 

ASF GitHub Bot commented on FLINK-9280:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6147
  
I will split this PR to address the various issues separately.


> Extend JobSubmitHandler to accept jar files
> ---
>
> Key: FLINK-9280
> URL: https://issues.apache.org/jira/browse/FLINK-9280
> Project: Flink
>  Issue Type: New Feature
>  Components: Job-Submission, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob 
> server, sets the blob keys in the jobgraph, and then uploads this graph to 
> the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the 
> blobserver before submitting the job graph, which does not happen via REST.
> I propose an extension to the {{JobSubmitHandler}} to also accept an 
> optional list of jar files that were previously uploaded through the 
> {{JarUploadHandler}}. If present, the handler would upload these jars to the 
> blobserver and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6147: [FLINK-9280][rest] Rework JobSubmitHandler to acce...

2018-06-19 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/6147


---


[GitHub] flink issue #6147: [FLINK-9280][rest] Rework JobSubmitHandler to accept jar/...

2018-06-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6147
  
I will split this PR to address the various issues separately.


---


[jira] [Commented] (FLINK-9567) Flink does not release resource in Yarn Cluster mode

2018-06-19 Thread Shimin Yang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516972#comment-16516972
 ] 

Shimin Yang commented on FLINK-9567:


Thanks for assigning the issue to me, I will do my best to fix it.

> Flink does not release resource in Yarn Cluster mode
> 
>
> Key: FLINK-9567
> URL: https://issues.apache.org/jira/browse/FLINK-9567
> Project: Flink
>  Issue Type: Bug
>  Components: Cluster Management, YARN
>Affects Versions: 1.5.0
>Reporter: Shimin Yang
>Assignee: Shimin Yang
>Priority: Critical
> Fix For: 1.6.0
>
> Attachments: FlinkYarnProblem, fulllog.txt
>
>
> After restarting the Job Manager in Yarn Cluster mode, Flink sometimes does 
> not release task manager containers in some specific cases. In the worst 
> case, I had a job configured with 5 task managers, but it possessed more than 
> 100 containers in the end. Although the job didn't fail, it affected other 
> jobs in the Yarn Cluster.
> In the first log I posted, the container with id 24 is the reason why Yarn 
> did not release resources. The container was killed before the restart, but 
> Flink had not received the callback of *onContainerComplete* in 
> *YarnResourceManager*, which should be called by *AMRMAsyncClient* of Yarn. 
> After the restart, as we can see in line 347 of the FlinkYarnProblem log, 
> 2018-06-14 22:50:47,846 WARN akka.remote.ReliableDeliverySupervisor - 
> Association with remote system [akka.tcp://flink@bd-r1hdp69:30609] has 
> failed, address is now gated for [50] ms. Reason: [Disassociated]
> Flink lost the connection to container 24, which is on the bd-r1hdp69 
> machine. When it tries to call *closeTaskManagerConnection* in 
> *onContainerComplete*, it does not have a connection to the TaskManager on 
> container 24, so it just ignores the close of the TaskManager.
> 2018-06-14 22:50:51,812 DEBUG org.apache.flink.yarn.YarnResourceManager - No 
> open TaskExecutor connection container_1528707394163_29461_02_24. 
> Ignoring close TaskExecutor connection.
>  However, before calling *closeTaskManagerConnection*, it has already called 
> *requestYarnContainer*, which leads to the *numPendingContainerRequests* 
> variable in *YarnResourceManager* being increased by 1.
> As the return of excess containers is determined by the 
> *numPendingContainerRequests* variable in *YarnResourceManager*, Flink cannot 
> return this container although it is not required. Meanwhile, the restart 
> logic has already allocated enough containers for the Task Managers, so Flink 
> will hold the extra container for a long time for nothing. 
> In the full log, the job ended with 7 containers while only 3 were running 
> TaskManagers.
> ps: Another strange thing I found is that sometimes, when requesting a yarn 
> container, it returns many more containers than requested. Is this a normal 
> scenario for AMRMAsyncClient?
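The accounting problem described above can be illustrated with a toy model (plain Java, not Flink's YarnResourceManager): a stale completion callback that unconditionally requests a replacement bumps the pending counter, so a later surplus allocation is kept instead of being returned to YARN.

```java
// Toy model of the container accounting described in the issue. All names
// are illustrative; this is not Flink code.
import java.util.ArrayList;
import java.util.List;

public class PendingRequestLeakDemo {

    public int numPendingContainerRequests = 0;
    public final List<String> heldContainers = new ArrayList<>();

    /** A completed container unconditionally requests a replacement. */
    public void onContainerComplete(String containerId) {
        heldContainers.remove(containerId);
        numPendingContainerRequests++;
    }

    /** A new allocation is kept whenever the pending counter is positive. */
    public void onContainerAllocated(String containerId) {
        if (numPendingContainerRequests > 0) {
            numPendingContainerRequests--;
            heldContainers.add(containerId); // kept even if no TaskManager needs it
        }
        // else: the surplus container would be returned to YARN immediately
    }

    public static void main(String[] args) {
        PendingRequestLeakDemo rm = new PendingRequestLeakDemo();
        rm.numPendingContainerRequests = 1;      // the job needs one replacement TM
        rm.onContainerAllocated("container_25"); // restart logic satisfies it
        rm.onContainerComplete("container_24");  // stale completion still bumps the counter
        rm.onContainerAllocated("container_26"); // surplus allocation, yet it is kept
        System.out.println(rm.heldContainers);   // two containers held, one needed
    }
}
```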



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9367) Truncate() in BucketingSink is only allowed after hadoop2.7

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516961#comment-16516961
 ] 

ASF GitHub Bot commented on FLINK-9367:
---

Github user zhangxinyu1 closed the pull request at:

https://github.com/apache/flink/pull/6108


> Truncate() in BucketingSink is only allowed after hadoop2.7
> ---
>
> Key: FLINK-9367
> URL: https://issues.apache.org/jira/browse/FLINK-9367
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.5.0
>Reporter: zhangxinyu
>Priority: Major
>
> When writing output to HDFS using BucketingSink, truncate() is only available 
> from Hadoop 2.7 on.
> If some tasks fail, the ".valid-length" file is created for lower Hadoop 
> versions.
> The problem is that if other people want to use the data in HDFS, they must 
> know how to deal with the ".valid-length" file; otherwise, the data may not 
> be exactly-once.
> I think it's not convenient for other people to use the data. Why not just 
> read the in-progress file and write a new file when restoring, instead of 
> writing a ".valid-length" file?
> In this way, others who use the data in HDFS don't need to know how to deal 
> with the ".valid-length" file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6108: [FLINK-9367] [Streaming Connectors] Allow to do tr...

2018-06-19 Thread zhangxinyu1
Github user zhangxinyu1 closed the pull request at:

https://github.com/apache/flink/pull/6108


---


[jira] [Commented] (FLINK-9610) Add Kafka partitioner that uses the key to partition by

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516914#comment-16516914
 ] 

ASF GitHub Bot commented on FLINK-9610:
---

Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/6181#discussion_r196368052
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * A partitioner that uses the hash of the provided key to distribute
+ * the values over the partitions as evenly as possible.
+ * This partitioner ensures that all records with the same key will be 
sent to
+ * the same Kafka partition.
+ *
+ * Note that this will cause a lot of network connections to be created 
between
+ * all the Flink instances and all the Kafka brokers.
+ */
+@PublicEvolving
+public class FlinkKeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+   private static final long serialVersionUID = -2006468063065010594L;
+
+   @Override
+   public int partition(T record, byte[] key, byte[] value, String 
targetTopic, int[] partitions) {
+   Preconditions.checkArgument(
+   partitions != null && partitions.length > 0,
+   "Partitions of the target topic is empty.");
+
+   return partitions[hash(key) % partitions.length];
--- End diff --

Should we guard against hash(key) % partitions.length < 0 (in case someone 
overrides hash()) ?
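The guard asked about in the review can be expressed with `Math.floorMod`, which stays non-negative for a positive divisor even when the hash is negative. A sketch, not the PR's code; the class and method names are illustrative:

```java
// Sketch of the suggested guard: Math.floorMod never yields a negative
// index for a positive partitions.length, unlike the % operator.
public class SafePartitionIndex {

    public static int partitionFor(int hash, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of the target topic is empty.");
        }
        return partitions[Math.floorMod(hash, partitions.length)];
    }

    public static void main(String[] args) {
        int[] partitions = {10, 11, 12};
        // A negative hash (e.g. from an overridden hash()) still maps safely:
        System.out.println(partitionFor(-7, partitions)); // prints 12
    }
}
```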


> Add Kafka partitioner that uses the key to partition by
> ---
>
> Key: FLINK-9610
> URL: https://issues.apache.org/jira/browse/FLINK-9610
> Project: Flink
>  Issue Type: New Feature
>  Components: Kafka Connector
>Reporter: Niels Basjes
>Assignee: Niels Basjes
>Priority: Major
>
> The kafka connector package only contains the FlinkFixedPartitioner 
> implementation of the FlinkKafkaPartitioner.
> The most common usecase I have seen is the need to spread the records across 
> the Kafka partitions while keeping all messages with the same key together.
> I'll put up a pull request with a very simple implementation that should make 
> this a lot easier for others to use and extend.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6181: [FLINK-9610] [flink-connector-kafka-base] Add Kafk...

2018-06-19 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/6181#discussion_r196368052
  
--- Diff: 
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkKeyHashPartitioner.java
 ---
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * A partitioner that uses the hash of the provided key to distribute
+ * the values over the partitions as evenly as possible.
+ * This partitioner ensures that all records with the same key will be 
sent to
+ * the same Kafka partition.
+ *
+ * Note that this will cause a lot of network connections to be created 
between
+ * all the Flink instances and all the Kafka brokers.
+ */
+@PublicEvolving
+public class FlinkKeyHashPartitioner<T> extends FlinkKafkaPartitioner<T> {
+
+   private static final long serialVersionUID = -2006468063065010594L;
+
+   @Override
+   public int partition(T record, byte[] key, byte[] value, String 
targetTopic, int[] partitions) {
+   Preconditions.checkArgument(
+   partitions != null && partitions.length > 0,
+   "Partitions of the target topic is empty.");
+
+   return partitions[hash(key) % partitions.length];
--- End diff --

Should we guard against hash(key) % partitions.length < 0 (in case someone 
overrides hash()) ?


---


[jira] [Created] (FLINK-9613) YARNSessionCapacitySchedulerITCase failed because YarnTestBase.checkClusterEmpty()

2018-06-19 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9613:
-

 Summary: YARNSessionCapacitySchedulerITCase failed because 
YarnTestBase.checkClusterEmpty()
 Key: FLINK-9613
 URL: https://issues.apache.org/jira/browse/FLINK-9613
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.6.0
Reporter: Sihua Zhou


The test YARNSessionCapacitySchedulerITCase failed on Travis because of 
YarnTestBase.checkClusterEmpty().

https://api.travis-ci.org/v3/job/394017104/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516911#comment-16516911
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5857
  
I see the value of a `cluster_id`, but you're mixing concerns here. It is 
not the responsibility of a `reporter` to introduce a `cluster_id` tag. 
Reporters are to faithfully report the metrics and their associated variables, 
not add more. Instead we may want to think about adding a configurable 
`cluster_id` value.

With that out of the way we have arrived at my previous question, random ID 
vs actual ID. That the `job_name` will be equal to `tm/jm_id` is precisely why 
I prefer this approach; it doesn't introduce additional noise in the tags.

I've already started working on `FLINK-9543` so we can proceed with this 
PR; the issue must be extended to also expose an ID for Dispatchers and 
register the JVM metrics for each JM. As to your question, `ResourceID`s are 
exactly what we're looking for; this is also what we're using for 
`TaskExecutor`.


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make the Flink system able to send metrics to Prometheus via a pushgateway.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9569) Confusing construction of AvroSerializers for generic records

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516912#comment-16516912
 ] 

ASF GitHub Bot commented on FLINK-9569:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6151#discussion_r196366033
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -105,41 +108,54 @@
/** The currently accessing thread, set and checked on debug level 
only. */
private transient volatile Thread currentThread;
 
-   // 

+   // --- instantiation methods 
--
 
/**
 * Creates a new AvroSerializer for the type indicated by the given 
class.
-* This constructor is intended to be used with {@link SpecificRecord} 
or reflection serializer.
-* For serializing {@link GenericData.Record} use {@link 
AvroSerializer#AvroSerializer(Class, Schema)}
+*
+* This constructor is expected to be used only with {@link 
GenericRecord}.
+* For {@link SpecificRecord} or reflection serializer use {@link 
AvroSerializer#forNonGeneric(Class)}.
+*
+* @param schema the explicit schema to use for generic records.
 */
-   public AvroSerializer(Class<T> type) {
-   checkArgument(!isGenericRecord(type),
-   "For GenericData.Record use constructor with explicit 
schema.");
-   this.type = checkNotNull(type);
-   this.schemaString = null;
+   public static AvroSerializer<GenericRecord> forGeneric(Schema schema) {
+   return new AvroSerializer<>(GenericRecord.class, schema);
--- End diff --

Should we check the schema to make sure it is not null here?
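The suggested null check would typically be a fail-fast precondition in the factory method. A minimal sketch with a stand-in type, not the PR's actual signature:

```java
// Sketch of the suggested precondition: fail fast with a clear message
// instead of a later NullPointerException deep inside serialization.
// forGeneric() here is a stand-in for AvroSerializer.forGeneric(Schema).
public class FactoryNullCheckDemo {

    public static String forGeneric(String schema) {
        if (schema == null) {
            throw new NullPointerException("Schema must not be null for generic records.");
        }
        return "AvroSerializer(" + schema + ")";
    }
}
```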


> Confusing construction of AvroSerializers for generic records
> -
>
> Key: FLINK-9569
> URL: https://issues.apache.org/jira/browse/FLINK-9569
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>
> The {{AvroSerializer}} currently has a {{AvroSerializer(Class type, Schema 
> schema)}} public constructor when used for generic records.
> This is a bit confusing, because when using the \{{AvroSerializer}}, the type 
> to be serialized should always be a {{GenericData.Record}} type.
> We should either:
> - have a separate subclass of {{AvroSerializer}}, say 
> {{GenericRecordAvroSerializer}} that is a {{AvroSerializer}}, 
> or
> - follow a similar approach to the instantiation methods in the 
> {{AvroDeserialiationSchema}}. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6151: [FLINK-9569] [avro] Fix confusing construction of ...

2018-06-19 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6151#discussion_r196366033
  
--- Diff: 
flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
 ---
@@ -105,41 +108,54 @@
/** The currently accessing thread, set and checked on debug level 
only. */
private transient volatile Thread currentThread;
 
-   // 

+   // --- instantiation methods 
--
 
/**
 * Creates a new AvroSerializer for the type indicated by the given 
class.
-* This constructor is intended to be used with {@link SpecificRecord} 
or reflection serializer.
-* For serializing {@link GenericData.Record} use {@link 
AvroSerializer#AvroSerializer(Class, Schema)}
+*
+* This constructor is expected to be used only with {@link 
GenericRecord}.
+* For {@link SpecificRecord} or reflection serializer use {@link 
AvroSerializer#forNonGeneric(Class)}.
+*
+* @param schema the explicit schema to use for generic records.
 */
-   public AvroSerializer(Class<T> type) {
-   checkArgument(!isGenericRecord(type),
-   "For GenericData.Record use constructor with explicit 
schema.");
-   this.type = checkNotNull(type);
-   this.schemaString = null;
+   public static AvroSerializer<GenericRecord> forGeneric(Schema schema) {
+   return new AvroSerializer<>(GenericRecord.class, schema);
--- End diff --

Should we check the schema to make sure it is not null here?


---


[GitHub] flink issue #5857: [FLINK-9187][METRICS] add prometheus pushgateway reporter

2018-06-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5857
  
I see the value of a `cluster_id`, but you're mixing concerns here. It is 
not the responsibility of a `reporter` to introduce a `cluster_id` tag. 
Reporters are to faithfully report the metrics and their associated variables, 
not add more. Instead we may want to think about adding a configurable 
`cluster_id` value.

With that out of the way we have arrived at my previous question, random ID 
vs actual ID. That the `job_name` will be equal to `tm/jm_id` is precisely why 
I prefer this approach; it doesn't introduce additional noise in the tags.

I've already started working on `FLINK-9543` so we can proceed with this 
PR; the issue must be extended to also expose an ID for Dispatchers and 
register the JVM metrics for each JM. As to your question, `ResourceID`s are 
exactly what we're looking for; this is also what we're using for 
`TaskExecutor`.


---


[GitHub] flink issue #5857: [FLINK-9187][METRICS] add prometheus pushgateway reporter

2018-06-19 Thread lamber-ken
Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/5857
  
@zentol , thanks for review.
First, there is a small point: the IDs may be duplicated when using the JM/TM's 
actual unique ID to compose the pushgateway job name, as in the picture below.

![image](https://user-images.githubusercontent.com/20113411/41587417-c7367d86-73e1-11e8-95fc-ae54dbd63809.png)

Second, when configuring the Grafana dashboard, the prefix of the pushgateway 
job name is useful; the unique ID may not be, because our metric data already 
contains (`jm_id`, `tm_id`).

So, for the pushgateway, the jobName is used to distinguish different services.
For Grafana, the prefix of the jobName can be used to distinguish different 
clusters; we can also just use the JM/TM ID and ignore the jobName.

`FLINK-9543` is useful and important. I tried to finish it. 
Is it ok to use `resourceId` to represent the JM's ID?
```java
public abstract class ResourceManager ...{
public static final String RESOURCE_MANAGER_NAME = "resourcemanager";

/** Unique id of the resource manager. */
private final ResourceID resourceId;

```


---


[jira] [Commented] (FLINK-9187) add prometheus pushgateway reporter

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16516863#comment-16516863
 ] 

ASF GitHub Bot commented on FLINK-9187:
---

Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/5857
  
@zentol , thanks for review.
First, there is a small point: the IDs may be duplicated when using the JM/TM's 
actual unique ID to compose the pushgateway job name, as in the picture below.

![image](https://user-images.githubusercontent.com/20113411/41587417-c7367d86-73e1-11e8-95fc-ae54dbd63809.png)

Second, when configuring the Grafana dashboard, the prefix of the pushgateway 
job name is useful; the unique ID may not be, because our metric data already 
contains (`jm_id`, `tm_id`).

So, for the pushgateway, the jobName is used to distinguish different services.
For Grafana, the prefix of the jobName can be used to distinguish different 
clusters; we can also just use the JM/TM ID and ignore the jobName.

`FLINK-9543` is useful and important. I tried to finish it. 
Is it ok to use `resourceId` to represent the JM's ID?
```java
public abstract class ResourceManager ...{
public static final String RESOURCE_MANAGER_NAME = "resourcemanager";

/** Unique id of the resource manager. */
private final ResourceID resourceId;

```


> add prometheus pushgateway reporter
> ---
>
> Key: FLINK-9187
> URL: https://issues.apache.org/jira/browse/FLINK-9187
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Make the Flink system able to send metrics to Prometheus via a pushgateway.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

