[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-22 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520233#comment-16520233
 ] 

Chesnay Schepler commented on FLINK-9599:
-

client:

master: 181559d5bcad50f919e95c7602057b929553f76b

1.5: c78e99007cb9dda3296b10e712e9de5f7d116cff

> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520140#comment-16520140
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6189


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520054#comment-16520054
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6189
  
merging.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519614#comment-16519614
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197217367
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -184,7 +206,64 @@ public void shutdown(Time timeout) {
return submitRequest(targetAddress, targetPort, httpRequest, 
responseType);
}
 
-   private  CompletableFuture 
submitRequest(String targetAddress, int targetPort, FullHttpRequest 
httpRequest, JavaType responseType) {
+   private static Request createRequest(String targetAddress, int 
targetPort, String targetUrl, HttpMethodWrapper httpMethod, ByteBuf 
jsonPayload, Collection fileUploads) throws IOException {
--- End diff --

Could we combine `targetAddress` and `targetPort` into `targetAddressPort = 
targetAddress + ':' + targetPort`?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519616#comment-16519616
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197217460
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -184,7 +206,64 @@ public void shutdown(Time timeout) {
return submitRequest(targetAddress, targetPort, httpRequest, 
responseType);
}
 
-   private  CompletableFuture 
submitRequest(String targetAddress, int targetPort, FullHttpRequest 
httpRequest, JavaType responseType) {
+   private static Request createRequest(String targetAddress, int 
targetPort, String targetUrl, HttpMethodWrapper httpMethod, ByteBuf 
jsonPayload, Collection fileUploads) throws IOException {
--- End diff --

We could directly pass in `httpMethod.getNettyHttpMethod()` instead of the 
wrapper.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519617#comment-16519617
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197217551
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -239,6 +322,45 @@ public void shutdown(Time timeout) {
return responseFuture;
}
 
+   private interface Request {
+   void writeTo(Channel channel) throws IOException;
+   }
+
+   private static final class SimpleRequest implements Request {
+   private final HttpRequest httpRequest;
+
+   SimpleRequest(HttpRequest httpRequest) {
+   this.httpRequest = httpRequest;
+   }
+
+   @Override
+   public void writeTo(Channel channel) {
+   channel.writeAndFlush(httpRequest);
+   }
+   }
+
+   private static final class MultipartRequest implements Request {
+   private final HttpRequest httpRequest;
+   private final HttpPostRequestEncoder bodyRequestEncoder;
+
+   MultipartRequest(HttpRequest httpRequest, 
HttpPostRequestEncoder bodyRequestEncoder) {
+   this.httpRequest = httpRequest;
+   this.bodyRequestEncoder = bodyRequestEncoder;
+   }
+
+   @Override
+   public void writeTo(Channel channel) {
+   channel.writeAndFlush(httpRequest);
+   // this should never be false as we explicitly set the 
encoder to use multipart messages
+   if (bodyRequestEncoder.isChunked()) {
+   channel.writeAndFlush(bodyRequestEncoder);
+   }
+
+   // release data and remove temporary files if they were 
created
+   bodyRequestEncoder.cleanFiles();
+   }
+   }
--- End diff --

Nice, this looks now really sleek  


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519550#comment-16519550
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6189
  
I've rebased the PR.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519507#comment-16519507
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6178


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519393#comment-16519393
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197142993
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) 
msg;

currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

As far as i can tell we are using the encoder correctly, but the decoder 
usage wasn't written against the encoder, but (i guess) only against `curl` or 
the web UI that never send an empty LAST_HTTP_CONTENT, but a 
`DefaultLastHttpContent` instead.

Interestingly enough, if you use anything but the netty encoder, without an 
`instanceof LastHttpContent` check it isn't possible to know whether the 
decoder is done or not.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519380#comment-16519380
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197138450
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) 
msg;

currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

But we are using the `HttpPostRequestEncoder`, right? So do we use it maybe 
wrongly?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519305#comment-16519305
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6178
  
will change the logging and merge afterwards.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519270#comment-16519270
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197106285
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) 
msg;

currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

The `HttpPostRequestEncoder` always sends an `EMPTY_LAST_HTTP_CONTENT` to 
mark the end of the requests, and the decoder handles it as described.

It could be that the decoder is not actually a general-purpose decoder but 
explicitly written as a counterpart for the encoder.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519239#comment-16519239
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197098273
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) 
msg;

currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

Why is an `EMPTY_LAST_HTTP_CONTENT` sent after the processing of the HTTP 
content has been completed? Are we doing something wrong on the client side? Is 
it a bug in Netty? Before adding these kind of fixes, I would really like to 
completely understand why this is happening.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519232#comment-16519232
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197095206
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) 
msg;

currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

When you have passed an `EMPTY_LAST_HTTP_CONTENT` to the decoder the entire 
content of the message was already processsed. Calling `hasNext()` on a decoder 
that has already processed the entire content throws the exception.

This was done to differentiate between "I don't have more data _right 
now_." and "I will never have more date.". We manually do this by checking 
`instanceof LastHttpContent`.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519211#comment-16519211
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197086869
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -151,18 +143,26 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
ctx.fireChannelRead(msg);
}
} catch (Exception e) {
-   HttpRequest tmpRequest = currentHttpRequest;
-   deleteUploadedFiles();
-   reset();
-   LOG.warn("Internal server error. File upload failed.", 
e);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   tmpRequest,
-   new ErrorResponseBody("File upload failed."),
-   HttpResponseStatus.INTERNAL_SERVER_ERROR,
-   Collections.emptyMap()
-   );
+   handleError(ctx, "File upload failed.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+   }
+   }
+
+   private void handleError(ChannelHandlerContext ctx, String 
errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
+   HttpRequest tmpRequest = currentHttpRequest;
+   deleteUploadedFiles();
+   reset();
+   if (e == null) {
+   LOG.warn(errorMessage);
+   } else {
+   LOG.warn(errorMessage, e);
--- End diff --

I don't think so.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519206#comment-16519206
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197085237
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -151,18 +143,26 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
ctx.fireChannelRead(msg);
}
} catch (Exception e) {
-   HttpRequest tmpRequest = currentHttpRequest;
-   deleteUploadedFiles();
-   reset();
-   LOG.warn("Internal server error. File upload failed.", 
e);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   tmpRequest,
-   new ErrorResponseBody("File upload failed."),
-   HttpResponseStatus.INTERNAL_SERVER_ERROR,
-   Collections.emptyMap()
-   );
+   handleError(ctx, "File upload failed.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+   }
+   }
+
+   private void handleError(ChannelHandlerContext ctx, String 
errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
+   HttpRequest tmpRequest = currentHttpRequest;
+   deleteUploadedFiles();
+   reset();
+   if (e == null) {
+   LOG.warn(errorMessage);
+   } else {
+   LOG.warn(errorMessage, e);
--- End diff --

but this will also print "null", won't it? That's what i was trying to 
avoid here.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519182#comment-16519182
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197078502
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -151,18 +143,26 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
ctx.fireChannelRead(msg);
}
} catch (Exception e) {
-   HttpRequest tmpRequest = currentHttpRequest;
-   deleteUploadedFiles();
-   reset();
-   LOG.warn("Internal server error. File upload failed.", 
e);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   tmpRequest,
-   new ErrorResponseBody("File upload failed."),
-   HttpResponseStatus.INTERNAL_SERVER_ERROR,
-   Collections.emptyMap()
-   );
+   handleError(ctx, "File upload failed.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+   }
+   }
+
+   private void handleError(ChannelHandlerContext ctx, String 
errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
+   HttpRequest tmpRequest = currentHttpRequest;
+   deleteUploadedFiles();
+   reset();
+   if (e == null) {
+   LOG.warn(errorMessage);
+   } else {
+   LOG.warn(errorMessage, e);
--- End diff --

I think we don't have to make this distinction here. 
`LOG.warn(errorMessage, null)` should also work.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519177#comment-16519177
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197077293
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) 
msg;

currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

Why does it fail otherwise?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519175#comment-16519175
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197076585
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) 
msg;

currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

because it fails otherwise, see 
https://issues.apache.org/jira/browse/FLINK-9500. See PR description on how to 
reproduce it.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519165#comment-16519165
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197073186
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadTestBase.java
 ---
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test base for verifying support of multipart uploads via REST.
+ */
+public abstract class MultipartUploadTestBase extends TestLogger {
--- End diff --

I'm not a huge fan of using inheritance in order to share code since it 
makes understanding tests quite hard. Instead one could create a 
`RestServerEndpointResource` or a `MultipartUploadResource` which extends 
`ExternalResource` and can be added via a `@Rule` or `@ClassRule`. See 
`MiniClusterResource` for an example.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519156#comment-16519156
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197062416
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -145,12 +157,43 @@ public void shutdown(Time timeout) {
}
}
 
-   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture 
sendRequest(String targetAddress, int targetPort, M messageHeaders, U 
messageParameters, R request) throws IOException {
+   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+   String targetAddress,
+   int targetPort,
+   M messageHeaders,
+   U messageParameters,
+   R request) throws IOException {
+   return internalSendRequest(targetAddress, targetPort, 
messageHeaders, messageParameters, request, false, new DefaultProcessor());
+   }
+
+   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+   String targetAddress,
+   int targetPort,
+   M messageHeaders,
+   U messageParameters,
+   R request,
+   Collection fileUploads) throws IOException {
--- End diff --

Wrong indentation


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519152#comment-16519152
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197062458
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -145,12 +157,43 @@ public void shutdown(Time timeout) {
}
}
 
-   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture 
sendRequest(String targetAddress, int targetPort, M messageHeaders, U 
messageParameters, R request) throws IOException {
+   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+   String targetAddress,
+   int targetPort,
+   M messageHeaders,
+   U messageParameters,
+   R request) throws IOException {
+   return internalSendRequest(targetAddress, targetPort, 
messageHeaders, messageParameters, request, false, new DefaultProcessor());
+   }
+
+   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+   String targetAddress,
+   int targetPort,
+   M messageHeaders,
+   U messageParameters,
+   R request,
+   Collection fileUploads) throws IOException {
+   if (fileUploads.isEmpty()) {
+   return sendRequest(targetAddress, targetPort, 
messageHeaders, messageParameters, request);
+   } else {
+   return internalSendRequest(targetAddress, targetPort, 
messageHeaders, messageParameters, request, true, new 
MultipartProcessor(fileUploads));
+   }
+   }
+
+   private , U extends 
MessageParameters, R extends RequestBody, P extends ResponseBody, T> 
CompletableFuture internalSendRequest(
+   String targetAddress,
+   int targetPort,
+   M messageHeaders,
+   U messageParameters,
+   R request,
+   boolean multipart,
+   RequestProcessor requestProcessor) throws IOException {
--- End diff --

Wrong indentation


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519157#comment-16519157
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197062491
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -145,12 +157,43 @@ public void shutdown(Time timeout) {
}
}
 
-   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture 
sendRequest(String targetAddress, int targetPort, M messageHeaders, U 
messageParameters, R request) throws IOException {
+   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+   String targetAddress,
+   int targetPort,
+   M messageHeaders,
+   U messageParameters,
+   R request) throws IOException {
--- End diff --

Wrong indentation


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519163#comment-16519163
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197069574
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -212,6 +271,86 @@ public void shutdown(Time timeout) {
executor);
}
 
+   private interface RequestProcessor {
+   T createRequest(HttpRequest request, ByteBuf jsonPayload) 
throws IOException;
--- End diff --

Remove this method


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519159#comment-16519159
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197070028
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -212,6 +271,86 @@ public void shutdown(Time timeout) {
executor);
}
 
+   private interface RequestProcessor {
+   T createRequest(HttpRequest request, ByteBuf jsonPayload) 
throws IOException;
+
+   void writeRequest(T body, Channel channel) throws IOException;
+   }
+
+   private static final class DefaultProcessor implements 
RequestProcessor {
+
+   @Override
+   public HttpRequest createRequest(HttpRequest request, ByteBuf 
jsonPayload) throws IOException {
+   return request;
+   }
+
+   @Override
+   public void writeRequest(HttpRequest body, Channel channel) 
throws IOException {
+   channel.writeAndFlush(body);
+   }
+   }
+
+   private static final class MultipartProcessor implements 
RequestProcessor {
--- End diff --

Rename to `MultipartRequest` which is initialized with a 
`HttpPostRequestEncoder` which it uses to write out the multi part request in 
the `writeRequest` implementation.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519158#comment-16519158
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197069640
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -212,6 +271,86 @@ public void shutdown(Time timeout) {
executor);
}
 
+   private interface RequestProcessor {
+   T createRequest(HttpRequest request, ByteBuf jsonPayload) 
throws IOException;
+
+   void writeRequest(T body, Channel channel) throws IOException;
--- End diff --

Change this method to only take a `Channel`.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519153#comment-16519153
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197069542
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -212,6 +271,86 @@ public void shutdown(Time timeout) {
executor);
}
 
+   private interface RequestProcessor {
--- End diff --

Following my suggestion I would rename this interface into `Request`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519154#comment-16519154
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197062277
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -145,12 +157,43 @@ public void shutdown(Time timeout) {
}
}
 
-   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture 
sendRequest(String targetAddress, int targetPort, M messageHeaders, U 
messageParameters, R request) throws IOException {
+   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+   String targetAddress,
+   int targetPort,
+   M messageHeaders,
+   U messageParameters,
+   R request) throws IOException {
+   return internalSendRequest(targetAddress, targetPort, 
messageHeaders, messageParameters, request, false, new DefaultProcessor());
+   }
+
+   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+   String targetAddress,
+   int targetPort,
+   M messageHeaders,
+   U messageParameters,
+   R request,
+   Collection fileUploads) throws IOException {
+   if (fileUploads.isEmpty()) {
+   return sendRequest(targetAddress, targetPort, 
messageHeaders, messageParameters, request);
--- End diff --

here we should call `internalSendRequest(..., false, new 
DefaultProcessor())`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519162#comment-16519162
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197069842
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -212,6 +271,86 @@ public void shutdown(Time timeout) {
executor);
}
 
+   private interface RequestProcessor {
+   T createRequest(HttpRequest request, ByteBuf jsonPayload) 
throws IOException;
+
+   void writeRequest(T body, Channel channel) throws IOException;
+   }
+
+   private static final class DefaultProcessor implements 
RequestProcessor {
--- End diff --

Rename to `SimpleRequest` which is initialized with a `HttpRequest` which 
it writes out in the writeRequest method.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519151#comment-16519151
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197061001
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext 
ctx, final HttpObject ms
final HttpContent httpContent = (HttpContent) 
msg;

currentHttpPostRequestDecoder.offer(httpContent);
 
-   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   while (httpContent != 
LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

Why do we have to add this filter condition?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519164#comment-16519164
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197068513
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -162,13 +205,25 @@ public void shutdown(Time timeout) {
ByteBuf payload = 
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
 
// create request and set headers
-   FullHttpRequest httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+   final FullHttpRequest httpRequest;
+   if (multipart) {
--- End diff --

I think it would be better to have our own `Request` base class, a method 
`createRequest(targetUrl, payload, files, etc)` and `MultiPartRequest` and 
`SimpleRequest` sub classes. A method `Request#send(Channel)` could encapsulate 
the logic to write out the request. Then `internalSendRequest` has the 
responsibility to construct the `Request` and triggering the submission. I'm 
actually not sure whether we would need `internalSendRequest` at all. Doing 
this in `sendRequest` where we have a `Collection` should also 
work. `createRequest` would then check whether to create a `MultiPartRequest` 
or a `SimpleRequest` based on the `files` parameter.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519155#comment-16519155
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197062109
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -145,12 +157,43 @@ public void shutdown(Time timeout) {
}
}
 
-   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture 
sendRequest(String targetAddress, int targetPort, M messageHeaders, U 
messageParameters, R request) throws IOException {
+   public , U extends MessageParameters, 
R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(
+   String targetAddress,
+   int targetPort,
+   M messageHeaders,
+   U messageParameters,
+   R request) throws IOException {
+   return internalSendRequest(targetAddress, targetPort, 
messageHeaders, messageParameters, request, false, new DefaultProcessor());
--- End diff --

I think here we should call `sendRequest(targetAddress, targetPort, 
messageHeaders, messageParameters, request, Collections.emptyList())`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519161#comment-16519161
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197070198
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -212,6 +271,86 @@ public void shutdown(Time timeout) {
executor);
}
 
+   private interface RequestProcessor {
+   T createRequest(HttpRequest request, ByteBuf jsonPayload) 
throws IOException;
+
+   void writeRequest(T body, Channel channel) throws IOException;
+   }
+
+   private static final class DefaultProcessor implements 
RequestProcessor {
+
+   @Override
+   public HttpRequest createRequest(HttpRequest request, ByteBuf 
jsonPayload) throws IOException {
+   return request;
+   }
+
+   @Override
+   public void writeRequest(HttpRequest body, Channel channel) 
throws IOException {
+   channel.writeAndFlush(body);
+   }
+   }
+
+   private static final class MultipartProcessor implements 
RequestProcessor {
+
+   private final Collection fileUploads;
+
+   MultipartProcessor(Collection fileUploads) {
+   this.fileUploads = fileUploads;
+   }
+
+   @Override
+   public HttpPostRequestEncoder createRequest(HttpRequest 
httpRequest, ByteBuf jsonPayload) throws IOException {
--- End diff --

Move this method out to `RestClient` to generate the 
`HttpPostRequestEncoder` with which we then create a `MultipartRequest` 
instance.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519160#comment-16519160
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197069173
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -181,10 +236,10 @@ public void shutdown(Time timeout) {
typeParameters.toArray(new 
Class[typeParameters.size()]));
}
 
-   return submitRequest(targetAddress, targetPort, httpRequest, 
responseType);
+   return submitRequest(targetAddress, targetPort, finalRequest, 
requestProcessor, responseType);
}
 
-   private  CompletableFuture 
submitRequest(String targetAddress, int targetPort, FullHttpRequest 
httpRequest, JavaType responseType) {
+   private  CompletableFuture 
submitRequest(String targetAddress, int targetPort, T request, 
RequestProcessor processor, JavaType responseType) {
--- End diff --

I think we don't have to split the request from the request write logic by 
splitting `RequestProcessor` and `request` up.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519116#comment-16519116
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197065693
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
-
-   R request;
-   if (isFileUpload()) {
-   final Path path = 
ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-   if (path == null) {
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Client 
did not upload a file."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
-   }
-   //noinspection unchecked
-   request = (R) new FileUpload(path);
-   } else if (msgContent.capacity() == 0) {
-   try {
-   request = MAPPER.readValue("{}", 
untypedResponseMessageHeaders.getRequestClass());
-   } catch (JsonParseException | 
JsonMappingException je) {
-   log.error("Request did not conform to 
expected format.", je);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Bad 
request received."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
+   final ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+
+   try (FileUploads uploadedFiles = 
FileUploadHandler.getMultipartFileUploads(ctx)) {
+
+   R request;
+   if (msgContent.capacity() == 0) {
+   try {
+   request = 
MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+   } catch (JsonParseException | 
JsonMappingException je) {
+   log.error("Request did not 
conform to expected format.", je);
+   HandlerUtils.sendErrorResponse(
+   ctx,
+   httpRequest,
+   new 
ErrorResponseBody("Bad request received."),
+   
HttpResponseStatus.BAD_REQUEST,
+   responseHeaders);
+   return;
+   }
+   } else {
+   try {
+   ByteBufInputStream in = new 
ByteBufInputStream(msgContent);
--- End diff --

btw, this change is also part of the previosu commit that isn't part of 
this PR.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519115#comment-16519115
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6178
  
@tillrohrmann Ready for another review.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519093#comment-16519093
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197058127
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
}
}
 
+   @Test
+   public void testUploadCleanupOnFailure() throws IOException {
+   OkHttpClient client = new OkHttpClient();
+
+   Request request = 
buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+   try (Response response = client.newCall(request).execute()) {
+   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
+   }
+   assertUploadDirectoryIsEmpty();
+   }
+
+   private static void assertUploadDirectoryIsEmpty() throws IOException {
+   Preconditions.checkArgument(
+   1 == Files.list(configuredUploadDir).count(),
+   "Directory structure in rest upload directory has 
changed. Test must be adjusted");
+   Optional actualUploadDir = 
Files.list(configuredUploadDir).findAny();
+   Preconditions.checkArgument(
+   actualUploadDir.isPresent(),
+   "Expected upload directory does not exist.");
+   
System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
+   assertEquals("Not all files were cleaned up.", 0, 
Files.list(actualUploadDir.get()).count());
--- End diff --

Alright, but then the order how methods are called in the catch block is 
incorrect. Maybe that should be factored out into a method to avoid code 
duplication.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519096#comment-16519096
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/6178
  
Checkstyle errors:
```
18:31:10.780 [INFO] There are 2 errors reported by Checkstyle 8.4 with 
/tools/maven/checkstyle.xml ruleset.
18:31:10.788 [ERROR] 
src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java:[56] 
(regexp) RegexpSingleline: Trailing whitespace
18:31:10.804 [ERROR] 
src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java:[99] 
(regexp) RegexpSingleline: Trailing whitespace
```


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519090#comment-16519090
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197055449
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   public static FileUploads forDirectory(Path directory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   Preconditions.checkArgument(directory.isAbsolute(), "Path must 
be absolute.");
+   Preconditions.checkArgument(Files.isDirectory(directory), "Path 
must be a directory.");
+
+   FileAdderVisitor visitor = new FileAdderVisitor();
+   Files.walkFileTree(directory, visitor);
+   files.addAll(visitor.getContainedFiles());
+   
+   return new FileUploads(Collections.singleton(directory), files);
+   }
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection directoriesToClean, 
Collection uploadedFiles) {
+   this.directoriesToClean = 
Preconditions.checkNotNull(directoriesToClean);
+   this.uploadedFiles = Preconditions.checkNotNull(uploadedFiles);
--- End diff --

I know it's really nitpicking what I'm doing here, but I think it would be 
slightly better to let FileUploads only represent the upload directories and 
add a method FileUploads#getFiles which returns a Collection which are 
all files being found in the upload directory. The difference is that we don't 
initialize FileUploads with it. That would effectively enforce that all files 
reside in the given upload directories. What we could do now is to initialize 
this class with directories /web/upload/a, /web/upload/b and files 
/web/different/path/file where the files are somewhere else located. Due to 
this, we not only need to delete the directories but also all files.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519087#comment-16519087
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197054770
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
}
}
 
+   @Test
+   public void testUploadCleanupOnFailure() throws IOException {
+   OkHttpClient client = new OkHttpClient();
+
+   Request request = 
buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+   try (Response response = client.newCall(request).execute()) {
+   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
+   }
+   assertUploadDirectoryIsEmpty();
+   }
+
+   private static void assertUploadDirectoryIsEmpty() throws IOException {
+   Preconditions.checkArgument(
+   1 == Files.list(configuredUploadDir).count(),
+   "Directory structure in rest upload directory has 
changed. Test must be adjusted");
+   Optional actualUploadDir = 
Files.list(configuredUploadDir).findAny();
+   Preconditions.checkArgument(
+   actualUploadDir.isPresent(),
+   "Expected upload directory does not exist.");
+   
System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
+   assertEquals("Not all files were cleaned up.", 0, 
Files.list(actualUploadDir.get()).count());
--- End diff --

This test wasn't covering the case of exceptions but the rejection of 
unknown attributes. Will try to find a way to crash the handler.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519086#comment-16519086
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197054545
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AbstractHandler}.
+ */
+public class AbstractHandlerTest {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @Test
+   public void testFileCleanup() throws Exception {
+   final Path file = temporaryFolder.newFile().toPath();
+
+   final String restAddress = "http://localhost:1234;;
+   RestfulGateway mockRestfulGateway = 
TestingRestfulGateway.newBuilder()
+   .setRestAddress(restAddress)
+   .build();
+
+   final GatewayRetriever mockGatewayRetriever = 
() ->
+   CompletableFuture.completedFuture(mockRestfulGateway);
+
+   TestHandler handler = new 
TestHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
+
+   RouteResult routeResult = new RouteResult<>("", "", 
Collections.emptyMap(), Collections.emptyMap(), "");
+   HttpRequest request = new DefaultFullHttpRequest(
+   HttpVersion.HTTP_1_1,
+   HttpMethod.GET,
+   
TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(),
+   Unpooled.wrappedBuffer(new byte[0]));
+   RoutedRequest routerRequest = new 
RoutedRequest<>(routeResult, request);
+
+   Attribute attribute = new 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519081#comment-16519081
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197051395
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
}
}
 
+   @Test
+   public void testUploadCleanupOnFailure() throws IOException {
+   OkHttpClient client = new OkHttpClient();
+
+   Request request = 
buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+   try (Response response = client.newCall(request).execute()) {
+   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
+   }
+   assertUploadDirectoryIsEmpty();
+   }
+
+   private static void assertUploadDirectoryIsEmpty() throws IOException {
+   Preconditions.checkArgument(
+   1 == Files.list(configuredUploadDir).count(),
+   "Directory structure in rest upload directory has 
changed. Test must be adjusted");
+   Optional actualUploadDir = 
Files.list(configuredUploadDir).findAny();
+   Preconditions.checkArgument(
+   actualUploadDir.isPresent(),
+   "Expected upload directory does not exist.");
+   
System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
--- End diff --

Remove system out


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519082#comment-16519082
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197053303
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
}
}
 
+   @Test
+   public void testUploadCleanupOnFailure() throws IOException {
+   OkHttpClient client = new OkHttpClient();
+
+   Request request = 
buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+   try (Response response = client.newCall(request).execute()) {
+   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
+   }
+   assertUploadDirectoryIsEmpty();
+   }
+
+   private static void assertUploadDirectoryIsEmpty() throws IOException {
+   Preconditions.checkArgument(
+   1 == Files.list(configuredUploadDir).count(),
+   "Directory structure in rest upload directory has 
changed. Test must be adjusted");
+   Optional actualUploadDir = 
Files.list(configuredUploadDir).findAny();
+   Preconditions.checkArgument(
+   actualUploadDir.isPresent(),
+   "Expected upload directory does not exist.");
+   
System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
+   assertEquals("Not all files were cleaned up.", 0, 
Files.list(actualUploadDir.get()).count());
--- End diff --

Why don't we run into a race condition with the catch block of 
`FileUploadHandler`?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519080#comment-16519080
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197053400
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AbstractHandler}.
+ */
+public class AbstractHandlerTest {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @Test
+   public void testFileCleanup() throws Exception {
+   final Path file = temporaryFolder.newFile().toPath();
+
+   final String restAddress = "http://localhost:1234;;
+   RestfulGateway mockRestfulGateway = 
TestingRestfulGateway.newBuilder()
+   .setRestAddress(restAddress)
+   .build();
+
+   final GatewayRetriever mockGatewayRetriever = 
() ->
+   CompletableFuture.completedFuture(mockRestfulGateway);
+
+   TestHandler handler = new 
TestHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
+
+   RouteResult routeResult = new RouteResult<>("", "", 
Collections.emptyMap(), Collections.emptyMap(), "");
+   HttpRequest request = new DefaultFullHttpRequest(
+   HttpVersion.HTTP_1_1,
+   HttpMethod.GET,
+   
TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(),
+   Unpooled.wrappedBuffer(new byte[0]));
+   RoutedRequest routerRequest = new 
RoutedRequest<>(routeResult, request);
+
+   Attribute attribute = new 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519071#comment-16519071
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197050662
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AbstractHandler}.
+ */
+public class AbstractHandlerTest {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @Test
+   public void testFileCleanup() throws Exception {
+   final Path file = temporaryFolder.newFile().toPath();
+
+   final String restAddress = "http://localhost:1234;;
+   RestfulGateway mockRestfulGateway = 
TestingRestfulGateway.newBuilder()
+   .setRestAddress(restAddress)
+   .build();
+
+   final GatewayRetriever mockGatewayRetriever = 
() ->
+   CompletableFuture.completedFuture(mockRestfulGateway);
+
+   TestHandler handler = new 
TestHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
+
+   RouteResult routeResult = new RouteResult<>("", "", 
Collections.emptyMap(), Collections.emptyMap(), "");
+   HttpRequest request = new DefaultFullHttpRequest(
+   HttpVersion.HTTP_1_1,
+   HttpMethod.GET,
+   
TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(),
+   Unpooled.wrappedBuffer(new byte[0]));
+   RoutedRequest routerRequest = new 
RoutedRequest<>(routeResult, request);
+
+   Attribute attribute = new 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519070#comment-16519070
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197050066
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AbstractHandler}.
+ */
+public class AbstractHandlerTest {
--- End diff --

`TestLogger` missing


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519067#comment-16519067
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197049400
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -45,27 +45,26 @@
@SuppressWarnings("resource")
public static final FileUploads EMPTY = new FileUploads();
 
+   public static FileUploads forDirectory(Path directory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   Preconditions.checkArgument(directory.isAbsolute(), "Path must 
be absolute.");
+   Preconditions.checkArgument(Files.isDirectory(directory), "Path 
must be a directory.");
+
+   FileAdderVisitor visitor = new FileAdderVisitor();
+   Files.walkFileTree(directory, visitor);
+   files.addAll(visitor.getContainedFiles());
+   
+   return new FileUploads(Collections.singleton(directory), files);
+   }
+
private FileUploads() {
this.directoriesToClean = Collections.emptyList();
this.uploadedFiles = Collections.emptyList();
}
 
-   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
-   final Collection files = new ArrayList<>(4);
-   final Collection directories = new ArrayList<>(1);
-   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
-   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
-   if (Files.isDirectory(fileOrDirectory)) {
-   directories.add(fileOrDirectory);
-   FileAdderVisitor visitor = new 
FileAdderVisitor();
-   Files.walkFileTree(fileOrDirectory, visitor);
-   files.addAll(visitor.getContainedFiles());
-   } else {
-   files.add(fileOrDirectory);
-   }
-   }
-   directoriesToClean = 
Collections.unmodifiableCollection(directories);
-   uploadedFiles = Collections.unmodifiableCollection(files);
+   public FileUploads(Collection directoriesToClean, 
Collection uploadedFiles) {
+   this.directoriesToClean = 
Preconditions.checkNotNull(directoriesToClean);
+   this.uploadedFiles = Preconditions.checkNotNull(uploadedFiles);
--- End diff --

I know it's really nitpicking what I'm doing here, but I think it would be 
slightly better to let `FileUploads` only represent the upload directories and 
add a method `FileUploads#getFiles` which returns a `Collection` which 
are all files being found in the upload directory. The difference is that we 
don't initialize `FileUploads` with it. That would effectively enforce that all 
files reside in the given upload directories. What we could do now is to 
initialize this class with directories `/web/upload/a, /web/upload/b` and files 
`/web/different/path/file` where the files are somewhere else located. Due to 
this, we not only need to delete the directories but also all files.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519006#comment-16519006
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197029723
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
-
-   R request;
-   if (isFileUpload()) {
-   final Path path = 
ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-   if (path == null) {
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Client 
did not upload a file."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
-   }
-   //noinspection unchecked
-   request = (R) new FileUpload(path);
-   } else if (msgContent.capacity() == 0) {
-   try {
-   request = MAPPER.readValue("{}", 
untypedResponseMessageHeaders.getRequestClass());
-   } catch (JsonParseException | 
JsonMappingException je) {
-   log.error("Request did not conform to 
expected format.", je);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Bad 
request received."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
+   final ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+
+   try (FileUploads uploadedFiles = 
FileUploadHandler.getMultipartFileUploads(ctx)) {
+
+   R request;
+   if (msgContent.capacity() == 0) {
+   try {
+   request = 
MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+   } catch (JsonParseException | 
JsonMappingException je) {
+   log.error("Request did not 
conform to expected format.", je);
+   HandlerUtils.sendErrorResponse(
+   ctx,
+   httpRequest,
+   new 
ErrorResponseBody("Bad request received."),
+   
HttpResponseStatus.BAD_REQUEST,
+   responseHeaders);
+   return;
+   }
+   } else {
+   try {
+   ByteBufInputStream in = new 
ByteBufInputStream(msgContent);
--- End diff --

The change is is only indendation, afaik there's already a PR for fixing 
this.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518905#comment-16518905
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197008798
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
-
-   R request;
-   if (isFileUpload()) {
-   final Path path = 
ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-   if (path == null) {
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Client 
did not upload a file."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
-   }
-   //noinspection unchecked
-   request = (R) new FileUpload(path);
-   } else if (msgContent.capacity() == 0) {
-   try {
-   request = MAPPER.readValue("{}", 
untypedResponseMessageHeaders.getRequestClass());
-   } catch (JsonParseException | 
JsonMappingException je) {
-   log.error("Request did not conform to 
expected format.", je);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Bad 
request received."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
+   final ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+
+   try (FileUploads uploadedFiles = 
FileUploadHandler.getMultipartFileUploads(ctx)) {
+
+   R request;
+   if (msgContent.capacity() == 0) {
+   try {
+   request = 
MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
+   } catch (JsonParseException | 
JsonMappingException je) {
+   log.error("Request did not 
conform to expected format.", je);
+   HandlerUtils.sendErrorResponse(
+   ctx,
+   httpRequest,
+   new 
ErrorResponseBody("Bad request received."),
+   
HttpResponseStatus.BAD_REQUEST,
+   responseHeaders);
+   return;
+   }
+   } else {
+   try {
+   ByteBufInputStream in = new 
ByteBufInputStream(msgContent);
--- End diff --

I would suggest to use try-with-resource to make sure to close `in`.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518360#comment-16518360
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196868266
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518333#comment-16518333
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196849558
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518311#comment-16518311
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196836698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   final Collection directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.getContainedFiles());
+   } else {
+   files.add(fileOrDirectory);
+   }
--- End diff --

we don't have to, it's for testing convenience as noted in the class 
javadocs.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518310#comment-16518310
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196836575
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) {
 
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
-   if (msg instanceof HttpRequest) {
-   final HttpRequest httpRequest = (HttpRequest) msg;
-   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
-   if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
-   currentHttpPostRequestDecoder = new 
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
-   currentHttpRequest = httpRequest;
+   try {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) 
msg;
+   LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
+   if 
(httpRequest.getMethod().equals(HttpMethod.POST)) {
+   if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
+   currentHttpPostRequestDecoder = 
new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+   currentHttpRequest = 
httpRequest;
+   currentUploadDir = 
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
+   } else {
+   ctx.fireChannelRead(msg);
+   }
} else {
ctx.fireChannelRead(msg);
}
+   } else if (msg instanceof HttpContent && 
currentHttpPostRequestDecoder != null) {
+   // make sure that we still have a upload dir in 
case that it got deleted in the meanwhile
+   RestServerEndpoint.createUploadDir(uploadDir, 
LOG);
+
+   final HttpContent httpContent = (HttpContent) 
msg;
+   
currentHttpPostRequestDecoder.offer(httpContent);
+
+   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   final InterfaceHttpData data = 
currentHttpPostRequestDecoder.next();
+   if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
+   final DiskFileUpload fileUpload 
= (DiskFileUpload) data;
+   
checkState(fileUpload.isCompleted());
+
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
+   
fileUpload.renameTo(dest.toFile());
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = 
(Attribute) data;
+   // this could also be 
implemented by using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received 
unknown attribute {}.", data.getName());
+   
HandlerUtils.sendErrorResponse(
+   ctx,
+   
currentHttpRequest,
+   new 
ErrorResponseBody("Received unknown attribute " + data.getName() + '.'),
+   
HttpResponseStatus.BAD_REQUEST,
+   
Collections.emptyMap()
+   );
+   deleteUploadedFiles();
+ 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518183#comment-16518183
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r196793648
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -162,13 +205,25 @@ public void shutdown(Time timeout) {
ByteBuf payload = 
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
 
// create request and set headers
-   FullHttpRequest httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+   final FullHttpRequest httpRequest;
+   if (multipart) {
+   httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl);
+   } else {
+   httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+   }
+
httpRequest.headers()
-   .add(HttpHeaders.Names.CONTENT_LENGTH, 
payload.capacity())
-   .add(HttpHeaders.Names.CONTENT_TYPE, 
RestConstants.REST_CONTENT_TYPE)
.set(HttpHeaders.Names.HOST, targetAddress + ':' + 
targetPort)
.set(HttpHeaders.Names.CONNECTION, 
HttpHeaders.Values.CLOSE);
 
+   if (!multipart) {
+   httpRequest.headers()
--- End diff --

wasn't sure whether this should also be done by the `RequestProcessor`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518182#comment-16518182
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r196793605
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -162,13 +205,25 @@ public void shutdown(Time timeout) {
ByteBuf payload = 
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
 
// create request and set headers
-   FullHttpRequest httpRequest = new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, 
messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
+   final FullHttpRequest httpRequest;
+   if (multipart) {
--- End diff --

wasn't sure whether this should also be done by the `RequestProcessor`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518180#comment-16518180
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6189

 [FLINK-9599][rest] RestClient supports FileUploads 

This PR is based on a squashed #6178.

## What is the purpose of the change

This PR extends the `RestClient` to allow sending multipart messages 
containing files and an optional json payload.

@tillrohrmann Regarding the previously discussed issue about 
´EMPTY_LAST_HTTP_CONTENT`, you can reproduce the issue by reverting the change 
to the `FileUploadHandler` and running the `RestClientMultipartTest`.

## Brief change log

* rework `FileUploadHandlerTest` into an abstract base class, to re-use 
classes for the `RestClient`


## Verifying this change

* add `RequestProcess` interface for hiding differences between 
non-/multipart messages
* refactor `RestClient#sendRequest` into `internalSendRequest` that allows 
passing a `RequestProcessor`
* see `RestClientMultipartTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9280_gamma

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6189


commit 97d7eaf2adbc8174025fafd32b3424f05bd27da1
Author: zentol 
Date:   2018-06-18T08:54:42Z

[FLINK-9599][rest] Implement generic mechanism to access uploaded files

commit 0ea85f1658c3ceca09a481e27392ed39b955d8bb
Author: zentol 
Date:   2018-06-19T07:45:09Z

[FLINK-9599][rest] RestClient supports FileUploads




> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518054#comment-16518054
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196739865
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   final Collection directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.getContainedFiles());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
--- End diff --

I think it is better to move this logic out of the constructor. Adding 
logic to a constructor makes testing always difficult.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518055#comment-16518055
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196740578
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -39,15 +41,21 @@
 public class HandlerRequest {
 
private final R requestBody;
+   private final FileUploads uploadedFiles;
--- End diff --

This comment has not been addressed. I think the `HandlerRequest` should 
not know about the `FileUploads` because it can use to delete the files.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518056#comment-16518056
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196740018
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) {
 
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
-   if (msg instanceof HttpRequest) {
-   final HttpRequest httpRequest = (HttpRequest) msg;
-   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
-   if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
-   currentHttpPostRequestDecoder = new 
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
-   currentHttpRequest = httpRequest;
+   try {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) 
msg;
+   LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
+   if 
(httpRequest.getMethod().equals(HttpMethod.POST)) {
+   if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
+   currentHttpPostRequestDecoder = 
new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+   currentHttpRequest = 
httpRequest;
+   currentUploadDir = 
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
+   } else {
+   ctx.fireChannelRead(msg);
+   }
} else {
ctx.fireChannelRead(msg);
}
+   } else if (msg instanceof HttpContent && 
currentHttpPostRequestDecoder != null) {
+   // make sure that we still have a upload dir in 
case that it got deleted in the meanwhile
+   RestServerEndpoint.createUploadDir(uploadDir, 
LOG);
+
+   final HttpContent httpContent = (HttpContent) 
msg;
+   
currentHttpPostRequestDecoder.offer(httpContent);
+
+   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   final InterfaceHttpData data = 
currentHttpPostRequestDecoder.next();
+   if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
+   final DiskFileUpload fileUpload 
= (DiskFileUpload) data;
+   
checkState(fileUpload.isCompleted());
+
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
+   
fileUpload.renameTo(dest.toFile());
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = 
(Attribute) data;
+   // this could also be 
implemented by using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received 
unknown attribute {}.", data.getName());
+   
HandlerUtils.sendErrorResponse(
+   ctx,
+   
currentHttpRequest,
+   new 
ErrorResponseBody("Received unknown attribute " + data.getName() + '.'),
+   
HttpResponseStatus.BAD_REQUEST,
+   
Collections.emptyMap()
+   );
+   deleteUploadedFiles();
+   

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518053#comment-16518053
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196739603
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -70,9 +70,14 @@ protected AbstractRestHandler(
}
 
@Override
-   protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest 
httpRequest, HandlerRequest handlerRequest, T gateway) throws 
RestHandlerException {
+   protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest 
httpRequest, HandlerRequest handlerRequest, T gateway) {
CompletableFuture response;
 
+   if (!messageHeaders.acceptsFileUploads() && 
!handlerRequest.getUploadedFiles().isEmpty()) {
+   processRestHandlerException(ctx, httpRequest, new 
RestHandlerException("File uploads not allowed.", 
HttpResponseStatus.BAD_REQUEST));
+   return;
+   }
--- End diff --

Shouldn't this be moved to the `AbstractHandler`?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518058#comment-16518058
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196741255
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518059#comment-16518059
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196741065
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518057#comment-16518057
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196740407
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   final Collection directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.getContainedFiles());
+   } else {
+   files.add(fileOrDirectory);
+   }
--- End diff --

Do we have to allow that we can specify files and directories alike? Why 
not requiring that you have to provide a upload directory which contains all 
uploaded files. This makes the whole clean up logic easier.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517983#comment-16517983
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6178
  
@tillrohrmann I believe I've addressed all your comments.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517971#comment-16517971
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196712002
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
+   
ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
--- End diff --

well look at that, it _actually works_


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517952#comment-16517952
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196701452
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

I think not much. My gut feeling is just that `FileUploads` can be 
simplified. Instead of having our own FileVisitor, we could simply call 
`FileUtils.deleteDirectory(uploadDirectory)`. And I think  this class has 
actually two responsibilities: Listing all files to make them accessible and 
storing the directories in which they reside to delete them afterwards.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517950#comment-16517950
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196699736
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) {
protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
final HttpRequest httpRequest = (HttpRequest) msg;
+   LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
if (httpRequest.getMethod().equals(HttpMethod.POST)) {
if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
currentHttpPostRequestDecoder = new 
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
currentHttpRequest = httpRequest;
+   currentUploadDir = 
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
--- End diff --

I think we should add this.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517946#comment-16517946
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196698434
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) {
protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
final HttpRequest httpRequest = (HttpRequest) msg;
+   LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
if (httpRequest.getMethod().equals(HttpMethod.POST)) {
if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
currentHttpPostRequestDecoder = new 
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
currentHttpRequest = httpRequest;
+   currentUploadDir = 
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
--- End diff --

If the `FileUploadHandler` itself fail it isn't cleaned up, but that was 
already the case in the existing code. The handler is generally rather _light_ 
when it comes to failure handling (i.e. it doesn't do anything in that regard).


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517940#comment-16517940
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196696248
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

How does this differ to the current implementation? Are you suggesting to 
simplify `FileUploads` to only consider the case of 1 directory containing N 
files?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517935#comment-16517935
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196695325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) {
protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
final HttpRequest httpRequest = (HttpRequest) msg;
+   LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
if (httpRequest.getMethod().equals(HttpMethod.POST)) {
if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
currentHttpPostRequestDecoder = new 
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
currentHttpRequest = httpRequest;
+   currentUploadDir = 
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
--- End diff --

How do we clean up the `currentUploadDir` in case of a failure?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517916#comment-16517916
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196689843
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

True, then I would suggest to only store the directory in the `FileUploads` 
and adding a method to retrieve all uploaded files.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517896#comment-16517896
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196681937
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517861#comment-16517861
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196667391
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
--- End diff --

yes.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517860#comment-16517860
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196667307
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

How files are stored is an implementation detail of the 
`FileUploadHandler`, why would we expose this to subsequent handlers?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517858#comment-16517858
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196667205
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
+   
ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
--- End diff --

I can try this (it would be neat to remove the special case in 
`AbstractHandler`, but I'm wondering whether we can "simply" replace the 
payload of the multipart request (as identified by the headers that we also 
forward) with plain json.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517490#comment-16517490
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196559093
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
--- End diff --

Missing `extends TestLogger`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517480#comment-16517480
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196556101
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
 ---
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests for {@link FileUploads}.
+ */
+public class FileUploadsTest {
--- End diff --

`extends TestLogger` is missing


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517479#comment-16517479
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452418
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
--- End diff --

By sending the json payload down stream, we could avoid having this method.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517492#comment-16517492
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196557260
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -39,15 +41,21 @@
 public class HandlerRequest {
 
private final R requestBody;
+   private final FileUploads uploadedFiles;
--- End diff --

This could also be a `Collection`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517481#comment-16517481
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452583
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

I would suggest to simply return the upload directory.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517477#comment-16517477
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
--- End diff --

Should we rather fail?


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517478#comment-16517478
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453235
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
--- End diff --

I would suggest to simply store the upload directory in the `UPLOAD_FILES` 
attribute.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517486#comment-16517486
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196555487
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   final Collection directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
+   }
+
+   public Collection getUploadedFiles() {
+   return uploadedFiles;
+   }
+
+   @Override
+   public void close() throws IOException {
+   for (Path file : uploadedFiles) {
+   try {
+   Files.delete(file);
+   } catch (FileNotFoundException ignored) {
+   // file may have been moved by a handler
+   }
+   }
+   for (Path directory : directoriesToClean) {
+   Files.walkFileTree(directory, CleanupFileVisitor.get());
+   }
+   }
+
+   private static final class FileAdderVisitor extends 
SimpleFileVisitor {
+
+   private final Collection files = new ArrayList<>(4);
+
+   Collection get() {
+   return files;
+   }
+
+   FileAdderVisitor() {
+   }
+
+   @Override
+   public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs) throws IOException {
+   FileVisitResult result = super.visitFile(file, attrs);
+   files.add(file);
+   return result;
+   }
+   }

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517483#comment-16517483
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196455211
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+   final ByteBuf msgContent;
+   Optional multipartJsonPayload = 
FileUploadHandler.getMultipartJsonPayload(ctx);
+   if (multipartJsonPayload.isPresent()) {
+   msgContent = 
Unpooled.wrappedBuffer(multipartJsonPayload.get());
+   } else {
+   msgContent = ((FullHttpRequest) 
httpRequest).content();
+   }
 
-   R request;
-   if (isFileUpload()) {
-   final Path path = 
ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-   if (path == null) {
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Client 
did not upload a file."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
-   }
-   //noinspection unchecked
-   request = (R) new FileUpload(path);
-   } else if (msgContent.capacity() == 0) {
-   try {
-   request = MAPPER.readValue("{}", 
untypedResponseMessageHeaders.getRequestClass());
-   } catch (JsonParseException | 
JsonMappingException je) {
-   log.error("Request did not conform to 
expected format.", je);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Bad 
request received."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
+   try (FileUploads uploadedFiles = 
FileUploadHandler.getMultipartFileUploads(ctx)) {
--- End diff --

I would obtain the upload directory from `FileUploadHandler` and simply 
delete this directory after the call has been processed. We could, then also 
create `FileUploads` outside of the `FileUploadHandler` to instantiate a 
`HandlerRequest` with it. This would also simplify the `FileUploads` class 
significantly, because it is no longer responsible for deleting the files.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517487#comment-16517487
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r19670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   final Collection directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
+   }
+
+   public Collection getUploadedFiles() {
+   return uploadedFiles;
+   }
+
+   @Override
+   public void close() throws IOException {
+   for (Path file : uploadedFiles) {
+   try {
+   Files.delete(file);
+   } catch (FileNotFoundException ignored) {
+   // file may have been moved by a handler
+   }
+   }
+   for (Path directory : directoriesToClean) {
+   Files.walkFileTree(directory, CleanupFileVisitor.get());
+   }
+   }
+
+   private static final class FileAdderVisitor extends 
SimpleFileVisitor {
+
+   private final Collection files = new ArrayList<>(4);
+
+   Collection get() {
--- End diff --

maybe more descriptive name than `get`.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517491#comment-16517491
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196558949
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
 ---
@@ -63,4 +63,13 @@
 * @return description for the header
 */
String getDescription();
+
+   /**
+* Returns whether this header allows file uploads.
+*
+* @return whether this header allows file uploads
+*/
+   default boolean acceptsFileUploads() {
+   return false;
+   }
--- End diff --

Should this maybe go into `UntypedResponseMessageHeaders`? At the moment 
one can upload files for a `AbstractHandler` (e.g. 
`AbstractTaskManagerFileHandler`) implementation and also has access to it via 
the `HandlerRequest` without being able to specify whether file upload is 
allowed or not. 


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517488#comment-16517488
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196559335
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517485#comment-16517485
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196554025
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+   final ByteBuf msgContent;
+   Optional multipartJsonPayload = 
FileUploadHandler.getMultipartJsonPayload(ctx);
+   if (multipartJsonPayload.isPresent()) {
+   msgContent = 
Unpooled.wrappedBuffer(multipartJsonPayload.get());
--- End diff --

Let's send the Json payload as a proper `HttpRequest`, then we don't have 
this special casing here.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517484#comment-16517484
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453980
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -129,4 +137,9 @@ public R getRequestBody() {
return queryParameter.getValue();
}
}
+
+   @Nonnull
+   public FileUploads getFileUploads() {
+   return uploadedFiles;
+   }
--- End diff --

I would not expose `FileUploads` to the user but rather return a 
`Collection`.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517493#comment-16517493
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196560163
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517482#comment-16517482
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453584
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   final Collection directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
--- End diff --

Let's move this logic out of `FileUploads` and simply initialize it with a 
`Collection`.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517476#comment-16517476
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452024
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
+   
ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
--- End diff --

I think it would be better to not store the JSON payload as an `Attribute` 
but instead forward it via 
`httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload))`.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515915#comment-16515915
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196129193
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   final Collection directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
--- End diff --

This is probably more complex than really necessary; there is no use-case 
for a _nested_ directory structure.


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515744#comment-16515744
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196090706
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -52,7 +57,10 @@
 
--- End diff --

should maybe rename the class to `MultipartRequestHandler`


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515713#comment-16515713
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196081080
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
--- End diff --

needs a test for
* multiple files
* name of uploaded file should be what is specified in the request


> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
> 

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest

2018-06-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515689#comment-16515689
 ] 

ASF GitHub Bot commented on FLINK-9599:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6178

[FLINK-9599][rest] Implement generic mechanism to access uploaded files

## What is the purpose of the change

This PR extends the existing multipart handling to also support mixed 
multipart message (i.e. requests  containing both JSON and files), and 
generalizes the `FileUpload` handling to provide access to all handlers 
extending `AbstractHandler`

Handlers may access uploaded files via `HandlerRequest#getFileUploads`. 
File uploads must be explicitly allowed by returning true in 
`MessageHeaders#acceptsFileUploads`.
The files and JSON payload are forwarded to the handler by the 
`FileUploadHandler` via channel attributes. If a JSON payload is forwarded this 
way a handler will ignore the content of the received `HttpRequest`.

This PR only covers the server-side; the `RestClient` remains unchanged. 
This will be done in a follow-up.

## Brief change log
* add `MessageHeaders#acceptsFileUploads` to signal that a handler accepts 
file uploads (default=false)
* add `FileUploads` class as a container for uploaded files
* extend `FileUploadHandler` to
  * accept multiple files in one request
  * also accept a JSON payload
  * forward both files and json via channel attributes
* extend `AbstractHandler` to retrieve files/json from channel attributes
  * remove special case for `JarRunHandler` in `AbstractHandler`
* extend `HandlerRequest` to accept a `FileUploads` object
* add `OkHttp` dependency to `flink-runtime` for testing purposes
* update `JarUploadHandler/Headers`

## Verifying this change

added tests:
* FileUploadsTest
* FileUploadHandlerTest
* manually verified via WebUI job submission

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9280_beta

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6178.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6178


commit c2adb3880180d8426697c33035d2b31d57d93952
Author: zentol 
Date:   2018-06-18T08:54:42Z

[FLINK-9599][rest] Implement generic mechanism to access uploaded files




> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)