[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520233#comment-16520233 ]

Chesnay Schepler commented on FLINK-9599:
-----------------------------------------

client:
master: 181559d5bcad50f919e95c7602057b929553f76b
1.5: c78e99007cb9dda3296b10e712e9de5f7d116cff

> Implement generic mechanism to receive files via rest
> -----------------------------------------------------
>
>                 Key: FLINK-9599
>                 URL: https://issues.apache.org/jira/browse/FLINK-9599
>             Project: Flink
>          Issue Type: New Feature
>          Components: REST
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0, 1.5.1
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
> * extend the RestClient to allow the upload of Files
> * extend FileUploadHandler to accept mixed multi-part requests (json + files)
> * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute,
> similar to the existing special case for the {{JarUploadHandler}}. The JSON
> body can be forwarded by replacing the incoming http requests with a simple
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
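For readers unfamiliar with what a "mixed multi-part request (json + files)" looks like on the wire, here is a minimal sketch that assembles a multipart/form-data body with one JSON part followed by file parts. It is illustrative only and is not the RestClient implementation: the class name, the part names `request` and `file_N`, and the use of text content for files are all assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MixedMultipartSketch {

    // Builds a multipart/form-data body: one JSON part, then one part per file.
    // Part names ("request", "file_N") are hypothetical, chosen for this sketch.
    static String buildBody(String boundary, String json, Map<String, String> files) {
        StringBuilder sb = new StringBuilder();
        sb.append("--").append(boundary).append("\r\n")
            .append("Content-Disposition: form-data; name=\"request\"\r\n")
            .append("Content-Type: application/json\r\n\r\n")
            .append(json).append("\r\n");
        int index = 0;
        for (Map.Entry<String, String> file : files.entrySet()) {
            sb.append("--").append(boundary).append("\r\n")
                .append("Content-Disposition: form-data; name=\"file_").append(index++)
                .append("\"; filename=\"").append(file.getKey()).append("\"\r\n")
                .append("Content-Type: application/octet-stream\r\n\r\n")
                .append(file.getValue()).append("\r\n");
        }
        // The closing delimiter carries two trailing dashes.
        sb.append("--").append(boundary).append("--\r\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> files = new LinkedHashMap<>();
        files.put("job.txt", "example file content");
        System.out.print(buildBody("sketch-boundary", "{\"key\":\"value\"}", files));
    }
}
```

A server-side handler that accepts such a request has to separate the JSON part from the file parts, which is the generalization the issue asks for.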
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520140#comment-16520140 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6189
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16520054#comment-16520054 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6189

    Merging.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519614#comment-16519614 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6189#discussion_r197217367

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -184,7 +206,64 @@ public void shutdown(Time timeout) {
             return submitRequest(targetAddress, targetPort, httpRequest, responseType);
         }

    -    private CompletableFuture submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, JavaType responseType) {
    +    private static Request createRequest(String targetAddress, int targetPort, String targetUrl, HttpMethodWrapper httpMethod, ByteBuf jsonPayload, Collection fileUploads) throws IOException {
    --- End diff --

    Could we combine `targetAddress` and `targetPort` into `targetAddressPort = targetAddress + ':' + targetPort`?
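The suggestion in this comment amounts to plain string concatenation. A minimal sketch, assuming a hypothetical helper `toAddressPort` that does not exist in `RestClient`:

```java
final class AddressPortSketch {

    // Hypothetical helper for illustration; not part of RestClient.
    // String + char + int concatenates left to right into a single String.
    static String toAddressPort(String targetAddress, int targetPort) {
        return targetAddress + ':' + targetPort;
    }

    public static void main(String[] args) {
        System.out.println(toAddressPort("localhost", 8081)); // prints "localhost:8081"
    }
}
```

Passing one combined value would shorten the `createRequest` parameter list at the cost of losing the separately typed port.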
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519616#comment-16519616 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6189#discussion_r197217460

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -184,7 +206,64 @@ public void shutdown(Time timeout) {
             return submitRequest(targetAddress, targetPort, httpRequest, responseType);
         }

    -    private CompletableFuture submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, JavaType responseType) {
    +    private static Request createRequest(String targetAddress, int targetPort, String targetUrl, HttpMethodWrapper httpMethod, ByteBuf jsonPayload, Collection fileUploads) throws IOException {
    --- End diff --

    We could directly pass in `httpMethod.getNettyHttpMethod()` instead of the wrapper.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519617#comment-16519617 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6189#discussion_r197217551

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
    @@ -239,6 +322,45 @@ public void shutdown(Time timeout) {
             return responseFuture;
         }

    +    private interface Request {
    +        void writeTo(Channel channel) throws IOException;
    +    }
    +
    +    private static final class SimpleRequest implements Request {
    +        private final HttpRequest httpRequest;
    +
    +        SimpleRequest(HttpRequest httpRequest) {
    +            this.httpRequest = httpRequest;
    +        }
    +
    +        @Override
    +        public void writeTo(Channel channel) {
    +            channel.writeAndFlush(httpRequest);
    +        }
    +    }
    +
    +    private static final class MultipartRequest implements Request {
    +        private final HttpRequest httpRequest;
    +        private final HttpPostRequestEncoder bodyRequestEncoder;
    +
    +        MultipartRequest(HttpRequest httpRequest, HttpPostRequestEncoder bodyRequestEncoder) {
    +            this.httpRequest = httpRequest;
    +            this.bodyRequestEncoder = bodyRequestEncoder;
    +        }
    +
    +        @Override
    +        public void writeTo(Channel channel) {
    +            channel.writeAndFlush(httpRequest);
    +            // this should never be false as we explicitly set the encoder to use multipart messages
    +            if (bodyRequestEncoder.isChunked()) {
    +                channel.writeAndFlush(bodyRequestEncoder);
    +            }
    +
    +            // release data and remove temporary files if they were created
    +            bodyRequestEncoder.cleanFiles();
    +        }
    +    }
    --- End diff --

    Nice, this now looks really sleek.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519550#comment-16519550 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6189

    I've rebased the PR.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519507#comment-16519507 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/6178
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519393#comment-16519393 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6189#discussion_r197142993

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
                     final HttpContent httpContent = (HttpContent) msg;
                     currentHttpPostRequestDecoder.offer(httpContent);

    -                while (currentHttpPostRequestDecoder.hasNext()) {
    +                while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
    --- End diff --

    As far as I can tell we are using the encoder correctly. The decoder usage, however, wasn't written against the encoder but (I guess) only against `curl` or the web UI, which never send an `EMPTY_LAST_CONTENT` but a `DefaultLastHttpContent` instead.

    Interestingly enough, if you use anything but the netty encoder, it isn't possible to know whether the decoder is done or not without an `instanceof LastHttpContent` check.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519380#comment-16519380 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6189#discussion_r197138450

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
                     final HttpContent httpContent = (HttpContent) msg;
                     currentHttpPostRequestDecoder.offer(httpContent);

    -                while (currentHttpPostRequestDecoder.hasNext()) {
    +                while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
    --- End diff --

    But we are using the `HttpPostRequestEncoder`, right? So are we maybe using it incorrectly?
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519305#comment-16519305 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/6178

    Will change the logging and merge afterwards.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519270#comment-16519270 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6189#discussion_r197106285

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
                     final HttpContent httpContent = (HttpContent) msg;
                     currentHttpPostRequestDecoder.offer(httpContent);

    -                while (currentHttpPostRequestDecoder.hasNext()) {
    +                while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
    --- End diff --

    The `HttpPostRequestEncoder` always sends an `EMPTY_LAST_CONTENT` to mark the end of the request, and the decoder handles it as described. It could be that the decoder is not actually a general-purpose decoder but was explicitly written as a counterpart for the encoder.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519239#comment-16519239 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6189#discussion_r197098273

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
                     final HttpContent httpContent = (HttpContent) msg;
                     currentHttpPostRequestDecoder.offer(httpContent);

    -                while (currentHttpPostRequestDecoder.hasNext()) {
    +                while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
    --- End diff --

    Why is an `EMPTY_LAST_CONTENT` sent after the processing of the HTTP content has been completed? Are we doing something wrong on the client side? Is it a bug in Netty? Before adding this kind of fix, I would really like to completely understand why this is happening.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519232#comment-16519232 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6189#discussion_r197095206

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
                     final HttpContent httpContent = (HttpContent) msg;
                     currentHttpPostRequestDecoder.offer(httpContent);

    -                while (currentHttpPostRequestDecoder.hasNext()) {
    +                while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
    --- End diff --

    By the time an `EMPTY_LAST_CONTENT` is passed to the decoder, the entire content of the message has already been processed. Calling `hasNext()` on a decoder that has already processed the entire content throws an exception. This was done to differentiate between "I don't have more data _right now_." and "I will never have more data.". We manually do this by checking `instanceof LastHttpContent`.
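The behaviour described in this comment can be illustrated with a toy model. The sketch below is not Netty code: `ToyDecoder`, the `EMPTY_LAST` sentinel and `drain` are hypothetical stand-ins that only mimic the described contract, namely that `hasNext()` throws once the end marker has been offered and nothing is left, and that the loop guard skips the call for the empty last chunk.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class EndOfDataSketch {

    // Hypothetical sentinel standing in for an empty "last content" message.
    // A fresh String instance so it can be recognised by reference identity.
    static final String EMPTY_LAST = new String("");

    // Toy decoder: throws from hasNext() once the end marker was seen and nothing remains.
    static final class ToyDecoder {
        private final Deque<String> pending = new ArrayDeque<>();
        private boolean finished;

        void offer(String chunk) {
            if (chunk == EMPTY_LAST) {
                finished = true;
            } else {
                pending.add(chunk);
            }
        }

        boolean hasNext() {
            if (finished && pending.isEmpty()) {
                throw new IllegalStateException("end of data");
            }
            return !pending.isEmpty();
        }

        String next() {
            return pending.remove();
        }
    }

    // Mirrors the guarded loop from the diff: never ask the decoder after the empty last chunk.
    static List<String> drain(ToyDecoder decoder, String chunk) {
        decoder.offer(chunk);
        List<String> parts = new ArrayList<>();
        while (chunk != EMPTY_LAST && decoder.hasNext()) {
            parts.add(decoder.next());
        }
        return parts;
    }

    public static void main(String[] args) {
        ToyDecoder decoder = new ToyDecoder();
        System.out.println(drain(decoder, "part-1"));   // prints "[part-1]"
        System.out.println(drain(decoder, EMPTY_LAST)); // prints "[]"
    }
}
```

Without the identity check in the loop condition, the second `drain` call would hit the exception path, which is the situation the one-line change in the diff avoids.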
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519211#comment-16519211 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197086869

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -151,18 +143,26 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
                     ctx.fireChannelRead(msg);
                 }
             } catch (Exception e) {
    -            HttpRequest tmpRequest = currentHttpRequest;
    -            deleteUploadedFiles();
    -            reset();
    -            LOG.warn("Internal server error. File upload failed.", e);
    -            HandlerUtils.sendErrorResponse(
    -                ctx,
    -                tmpRequest,
    -                new ErrorResponseBody("File upload failed."),
    -                HttpResponseStatus.INTERNAL_SERVER_ERROR,
    -                Collections.emptyMap()
    -            );
    +            handleError(ctx, "File upload failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
    +        }
    +    }
    +
    +    private void handleError(ChannelHandlerContext ctx, String errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
    +        HttpRequest tmpRequest = currentHttpRequest;
    +        deleteUploadedFiles();
    +        reset();
    +        if (e == null) {
    +            LOG.warn(errorMessage);
    +        } else {
    +            LOG.warn(errorMessage, e);
    --- End diff --

    I don't think so.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519206#comment-16519206 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r197085237

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -151,18 +143,26 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
                     ctx.fireChannelRead(msg);
                 }
             } catch (Exception e) {
    -            HttpRequest tmpRequest = currentHttpRequest;
    -            deleteUploadedFiles();
    -            reset();
    -            LOG.warn("Internal server error. File upload failed.", e);
    -            HandlerUtils.sendErrorResponse(
    -                ctx,
    -                tmpRequest,
    -                new ErrorResponseBody("File upload failed."),
    -                HttpResponseStatus.INTERNAL_SERVER_ERROR,
    -                Collections.emptyMap()
    -            );
    +            handleError(ctx, "File upload failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
    +        }
    +    }
    +
    +    private void handleError(ChannelHandlerContext ctx, String errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
    +        HttpRequest tmpRequest = currentHttpRequest;
    +        deleteUploadedFiles();
    +        reset();
    +        if (e == null) {
    +            LOG.warn(errorMessage);
    +        } else {
    +            LOG.warn(errorMessage, e);
    --- End diff --

    But this will also print "null", won't it? That's what I was trying to avoid here.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519182#comment-16519182 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197078502

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
@@ -151,18 +143,26 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
 ctx.fireChannelRead(msg);
 }
 } catch (Exception e) {
- HttpRequest tmpRequest = currentHttpRequest;
- deleteUploadedFiles();
- reset();
- LOG.warn("Internal server error. File upload failed.", e);
- HandlerUtils.sendErrorResponse(
- ctx,
- tmpRequest,
- new ErrorResponseBody("File upload failed."),
- HttpResponseStatus.INTERNAL_SERVER_ERROR,
- Collections.emptyMap()
- );
+ handleError(ctx, "File upload failed.", HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
+ private void handleError(ChannelHandlerContext ctx, String errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
+ HttpRequest tmpRequest = currentHttpRequest;
+ deleteUploadedFiles();
+ reset();
+ if (e == null) {
+ LOG.warn(errorMessage);
+ } else {
+ LOG.warn(errorMessage, e);
--- End diff --

I think we don't have to make this distinction here. `LOG.warn(errorMessage, null)` should also work.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519177#comment-16519177 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197077293

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
 final HttpContent httpContent = (HttpContent) msg;
 currentHttpPostRequestDecoder.offer(httpContent);
- while (currentHttpPostRequestDecoder.hasNext()) {
+ while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

Why does it fail otherwise?
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519175#comment-16519175 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197076585

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
 final HttpContent httpContent = (HttpContent) msg;
 currentHttpPostRequestDecoder.offer(httpContent);
- while (currentHttpPostRequestDecoder.hasNext()) {
+ while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

because it fails otherwise, see https://issues.apache.org/jira/browse/FLINK-9500. See PR description on how to reproduce it.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519165#comment-16519165 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197073186

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/MultipartUploadTestBase.java ---
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test base for verifying support of multipart uploads via REST.
+ */
+public abstract class MultipartUploadTestBase extends TestLogger {
--- End diff --

I'm not a huge fan of using inheritance in order to share code since it makes understanding tests quite hard. Instead one could create a `RestServerEndpointResource` or a `MultipartUploadResource` which extends `ExternalResource` and can be added via a `@Rule` or `@ClassRule`. See `MiniClusterResource` for an example.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519156#comment-16519156 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197062416

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -145,12 +157,43 @@ public void shutdown(Time timeout) {
 }
 }
- public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request) throws IOException {
+ return internalSendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, false, new DefaultProcessor());
+ }
+
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request,
+ Collection fileUploads) throws IOException {
--- End diff --

Wrong indentation
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519152#comment-16519152 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197062458

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -145,12 +157,43 @@ public void shutdown(Time timeout) {
 }
 }
- public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request) throws IOException {
+ return internalSendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, false, new DefaultProcessor());
+ }
+
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request,
+ Collection fileUploads) throws IOException {
+ if (fileUploads.isEmpty()) {
+ return sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request);
+ } else {
+ return internalSendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, true, new MultipartProcessor(fileUploads));
+ }
+ }
+
+ private <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody, T> CompletableFuture<P> internalSendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request,
+ boolean multipart,
+ RequestProcessor<T> requestProcessor) throws IOException {
--- End diff --

Wrong indentation
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519157#comment-16519157 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197062491

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -145,12 +157,43 @@ public void shutdown(Time timeout) {
 }
 }
- public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request) throws IOException {
--- End diff --

Wrong indentation
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519163#comment-16519163 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197069574

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -212,6 +271,86 @@ public void shutdown(Time timeout) {
 executor);
 }
+ private interface RequestProcessor<T> {
+ T createRequest(HttpRequest request, ByteBuf jsonPayload) throws IOException;
--- End diff --

Remove this method
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519159#comment-16519159 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197070028

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -212,6 +271,86 @@ public void shutdown(Time timeout) {
 executor);
 }
+ private interface RequestProcessor<T> {
+ T createRequest(HttpRequest request, ByteBuf jsonPayload) throws IOException;
+
+ void writeRequest(T body, Channel channel) throws IOException;
+ }
+
+ private static final class DefaultProcessor implements RequestProcessor<HttpRequest> {
+
+ @Override
+ public HttpRequest createRequest(HttpRequest request, ByteBuf jsonPayload) throws IOException {
+ return request;
+ }
+
+ @Override
+ public void writeRequest(HttpRequest body, Channel channel) throws IOException {
+ channel.writeAndFlush(body);
+ }
+ }
+
+ private static final class MultipartProcessor implements RequestProcessor {
--- End diff --

Rename to `MultipartRequest` which is initialized with a `HttpPostRequestEncoder` which it uses to write out the multi part request in the `writeRequest` implementation.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519158#comment-16519158 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197069640

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -212,6 +271,86 @@ public void shutdown(Time timeout) {
 executor);
 }
+ private interface RequestProcessor<T> {
+ T createRequest(HttpRequest request, ByteBuf jsonPayload) throws IOException;
+
+ void writeRequest(T body, Channel channel) throws IOException;
--- End diff --

Change this method to only take a `Channel`.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519153#comment-16519153 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197069542

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -212,6 +271,86 @@ public void shutdown(Time timeout) {
 executor);
 }
+ private interface RequestProcessor<T> {
--- End diff --

Following my suggestion I would rename this interface into `Request`
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519154#comment-16519154 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197062277

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -145,12 +157,43 @@ public void shutdown(Time timeout) {
 }
 }
- public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException {
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request) throws IOException {
+ return internalSendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, false, new DefaultProcessor());
+ }
+
+ public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(
+ String targetAddress,
+ int targetPort,
+ M messageHeaders,
+ U messageParameters,
+ R request,
+ Collection fileUploads) throws IOException {
+ if (fileUploads.isEmpty()) {
+ return sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request);
--- End diff --

here we should call `internalSendRequest(..., false, new DefaultProcessor())`
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519162#comment-16519162 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197069842

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -212,6 +271,86 @@ public void shutdown(Time timeout) {
 executor);
 }
+ private interface RequestProcessor<T> {
+ T createRequest(HttpRequest request, ByteBuf jsonPayload) throws IOException;
+
+ void writeRequest(T body, Channel channel) throws IOException;
+ }
+
+ private static final class DefaultProcessor implements RequestProcessor<HttpRequest> {
--- End diff --

Rename to `SimpleRequest` which is initialized with a `HttpRequest` which it writes out in the writeRequest method.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519151#comment-16519151 ]

ASF GitHub Bot commented on FLINK-9599:
---
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6189#discussion_r197061001

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
@@ -106,7 +106,7 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
 final HttpContent httpContent = (HttpContent) msg;
 currentHttpPostRequestDecoder.offer(httpContent);
- while (currentHttpPostRequestDecoder.hasNext()) {
+ while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT && currentHttpPostRequestDecoder.hasNext()) {
--- End diff --

Why do we have to add this filter condition?
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519164#comment-16519164 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r197068513 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -162,13 +205,25 @@ public void shutdown(Time timeout) { ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); // create request and set headers - FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload); + final FullHttpRequest httpRequest; + if (multipart) { --- End diff -- I think it would be better to have our own `Request` base class, a method `createRequest(targetUrl, payload, files, etc)` and `MultiPartRequest` and `SimpleRequest` subclasses. A method `Request#send(Channel)` could encapsulate the logic to write out the request. Then `internalSendRequest` has the responsibility to construct the `Request` and trigger the submission. I'm actually not sure whether we would need `internalSendRequest` at all. Doing this in `sendRequest` where we have a `Collection` should also work. `createRequest` would then check whether to create a `MultiPartRequest` or a `SimpleRequest` based on the `files` parameter.
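The `Request` hierarchy suggested in the review comment above could look roughly like the following. This is only a minimal sketch under stated assumptions, not the code that was merged: `SketchChannel` is a hypothetical stand-in for Netty's `Channel`, payloads and files are plain strings, and the "multipart" body is simulated by writing one message per part.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for Netty's Channel (assumption, not the real API). */
interface SketchChannel {
    void writeAndFlush(Object msg);
}

/** Base class: a request carries its data and knows how to write itself out. */
abstract class Request {
    abstract void send(SketchChannel channel);

    /** Picks the subclass based on the files parameter, as proposed in the review. */
    static Request createRequest(String targetUrl, String jsonPayload, Collection<String> files) {
        return files.isEmpty()
            ? new SimpleRequest(targetUrl, jsonPayload)
            : new MultiPartRequest(targetUrl, jsonPayload, files);
    }
}

/** Plain JSON request: written to the channel as a single message. */
final class SimpleRequest extends Request {
    private final String targetUrl;
    private final String jsonPayload;

    SimpleRequest(String targetUrl, String jsonPayload) {
        this.targetUrl = targetUrl;
        this.jsonPayload = jsonPayload;
    }

    @Override
    void send(SketchChannel channel) {
        channel.writeAndFlush(targetUrl + " " + jsonPayload);
    }
}

/** Mixed request: the JSON part first, then one (simulated) part per file. */
final class MultiPartRequest extends Request {
    private final String targetUrl;
    private final String jsonPayload;
    private final List<String> files;

    MultiPartRequest(String targetUrl, String jsonPayload, Collection<String> files) {
        this.targetUrl = targetUrl;
        this.jsonPayload = jsonPayload;
        this.files = new ArrayList<>(files);
    }

    @Override
    void send(SketchChannel channel) {
        channel.writeAndFlush(targetUrl + " " + jsonPayload);
        for (String file : files) {
            channel.writeAndFlush("file:" + file);
        }
    }
}
```

With this shape the caller only constructs a `Request` and calls `send`, so a separate processor object for the write logic would no longer be needed.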
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519155#comment-16519155 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r197062109 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -145,12 +157,43 @@ public void shutdown(Time timeout) { } } - public , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, R request) throws IOException { + public , U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture sendRequest( + String targetAddress, + int targetPort, + M messageHeaders, + U messageParameters, + R request) throws IOException { + return internalSendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, false, new DefaultProcessor()); --- End diff -- I think here we should call `sendRequest(targetAddress, targetPort, messageHeaders, messageParameters, request, Collections.emptyList())` > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. 
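The delegation proposed in the comment above (the file-less `sendRequest` forwarding to the overload that takes a collection of files, with `Collections.emptyList()`) can be sketched as follows. `SketchRestClient`, the string return value and the simplified parameter list are assumptions made for illustration; the real `RestClient` works with message headers, parameters and a `CompletableFuture`.

```java
import java.util.Collection;
import java.util.Collections;

/** Hypothetical, heavily simplified client illustrating overload delegation only. */
final class SketchRestClient {

    /** File-less variant: forwards to the general overload with an empty collection. */
    String sendRequest(String targetAddress, int targetPort, String request) {
        return sendRequest(targetAddress, targetPort, request, Collections.emptyList());
    }

    /** General variant: the single place where the kind of request is decided. */
    String sendRequest(String targetAddress, int targetPort, String request, Collection<String> files) {
        String kind = files.isEmpty() ? "simple" : "multipart";
        return kind + " request to " + targetAddress + ":" + targetPort
            + " with " + files.size() + " file(s)";
    }
}
```

Keeping a single general code path means no boolean `multipart` flag or processor instance has to be passed around by the callers.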
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519161#comment-16519161 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r197070198 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -212,6 +271,86 @@ public void shutdown(Time timeout) { executor); } + private interface RequestProcessor { + T createRequest(HttpRequest request, ByteBuf jsonPayload) throws IOException; + + void writeRequest(T body, Channel channel) throws IOException; + } + + private static final class DefaultProcessor implements RequestProcessor { + + @Override + public HttpRequest createRequest(HttpRequest request, ByteBuf jsonPayload) throws IOException { + return request; + } + + @Override + public void writeRequest(HttpRequest body, Channel channel) throws IOException { + channel.writeAndFlush(body); + } + } + + private static final class MultipartProcessor implements RequestProcessor { + + private final Collection fileUploads; + + MultipartProcessor(Collection fileUploads) { + this.fileUploads = fileUploads; + } + + @Override + public HttpPostRequestEncoder createRequest(HttpRequest httpRequest, ByteBuf jsonPayload) throws IOException { --- End diff -- Move this method out to `RestClient` to generate the `HttpPostRequestEncoder` with which we then create a `MultipartRequest` instance. 
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519160#comment-16519160 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r197069173 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -181,10 +236,10 @@ public void shutdown(Time timeout) { typeParameters.toArray(new Class[typeParameters.size()])); } - return submitRequest(targetAddress, targetPort, httpRequest, responseType); + return submitRequest(targetAddress, targetPort, finalRequest, requestProcessor, responseType); } - private CompletableFuture submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, JavaType responseType) { + private CompletableFuture submitRequest(String targetAddress, int targetPort, T request, RequestProcessor processor, JavaType responseType) { --- End diff -- I think we don't have to split the request from the request write logic by splitting `RequestProcessor` and `request` up. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. 
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519116#comment-16519116 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r197065693 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java --- @@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe return; } - ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); - - R request; - if (isFileUpload()) { - final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get(); - if (path == null) { - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Client did not upload a file."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; - } - //noinspection unchecked - request = (R) new FileUpload(path); - } else if (msgContent.capacity() == 0) { - try { - request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); - } catch (JsonParseException | JsonMappingException je) { - log.error("Request did not conform to expected format.", je); - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Bad request received."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; + final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); + + try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) { + + R request; + if (msgContent.capacity() == 0) { + try { + request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); + } catch (JsonParseException | JsonMappingException je) { + log.error("Request did not conform to expected format.", je); + HandlerUtils.sendErrorResponse( + ctx, + httpRequest, + new ErrorResponseBody("Bad request received."), + HttpResponseStatus.BAD_REQUEST, + responseHeaders); + 
return; + } + } else { + try { + ByteBufInputStream in = new ByteBufInputStream(msgContent); --- End diff -- btw, this change is also part of the previous commit that isn't part of this PR. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519115#comment-16519115 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6178 @tillrohrmann Ready for another review. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519093#comment-16519093 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r197058127 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception { } } + @Test + public void testUploadCleanupOnFailure() throws IOException { + OkHttpClient client = new OkHttpClient(); + + Request request = buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL()); + try (Response response = client.newCall(request).execute()) { + assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); + } + assertUploadDirectoryIsEmpty(); + } + + private static void assertUploadDirectoryIsEmpty() throws IOException { + Preconditions.checkArgument( + 1 == Files.list(configuredUploadDir).count(), + "Directory structure in rest upload directory has changed. Test must be adjusted"); + Optional actualUploadDir = Files.list(configuredUploadDir).findAny(); + Preconditions.checkArgument( + actualUploadDir.isPresent(), + "Expected upload directory does not exist."); + System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList())); + assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count()); --- End diff -- Alright, but then the order in which the methods are called in the catch block is incorrect. Maybe that should be factored out into a method to avoid code duplication.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519096#comment-16519096 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/6178 Checkstyle errors: ``` 18:31:10.780 [INFO] There are 2 errors reported by Checkstyle 8.4 with /tools/maven/checkstyle.xml ruleset. 18:31:10.788 [ERROR] src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java:[56] (regexp) RegexpSingleline: Trailing whitespace 18:31:10.804 [ERROR] src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java:[99] (regexp) RegexpSingleline: Trailing whitespace ``` > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519090#comment-16519090 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r197055449 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. 
+ */ +public final class FileUploads implements AutoCloseable { + private final Collection directoriesToClean; + private final Collection uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + public static FileUploads forDirectory(Path directory) throws IOException { + final Collection files = new ArrayList<>(4); + Preconditions.checkArgument(directory.isAbsolute(), "Path must be absolute."); + Preconditions.checkArgument(Files.isDirectory(directory), "Path must be a directory."); + + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(directory, visitor); + files.addAll(visitor.getContainedFiles()); + + return new FileUploads(Collections.singleton(directory), files); + } + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection directoriesToClean, Collection uploadedFiles) { + this.directoriesToClean = Preconditions.checkNotNull(directoriesToClean); + this.uploadedFiles = Preconditions.checkNotNull(uploadedFiles); --- End diff -- I know this is really nitpicking, but I think it would be slightly better to let FileUploads represent only the upload directories and add a method FileUploads#getFiles which returns a Collection of all files found in the upload directories. The difference is that we would not initialize FileUploads with the files. That would effectively enforce that all files reside in the given upload directories. As it stands, we could initialize this class with directories /web/upload/a, /web/upload/b and a file /web/different/path/file that is located somewhere else. Because of this, we need to delete not only the directories but also all files.
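A directory-only container along the lines of the review comment above might look like this. It is a hedged sketch, not the `FileUploads` class from the PR: the name `DirectoryBackedUploads` is hypothetical, it tracks a single directory instead of a collection, and it uses only `java.nio.file` rather than Flink's `FileUtils` and `Preconditions`.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical container that is initialized with the upload directory only. */
final class DirectoryBackedUploads implements AutoCloseable {
    private final Path uploadDirectory;

    DirectoryBackedUploads(Path uploadDirectory) {
        if (!uploadDirectory.isAbsolute()) {
            throw new IllegalArgumentException("Path must be absolute.");
        }
        this.uploadDirectory = uploadDirectory;
    }

    /** Files are discovered on demand, so they always reside below the upload directory. */
    Collection<Path> getFiles() throws IOException {
        final List<Path> files = new ArrayList<>();
        if (!Files.isDirectory(uploadDirectory)) {
            return files;
        }
        Files.walkFileTree(uploadDirectory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                files.add(file);
                return FileVisitResult.CONTINUE;
            }
        });
        return files;
    }

    /** Deleting the directory tree is sufficient; no separate file list has to be cleaned. */
    @Override
    public void close() throws IOException {
        if (!Files.exists(uploadDirectory)) {
            return;
        }
        Files.walkFileTree(uploadDirectory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                Files.delete(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
```

Because the files are derived from the directory rather than passed in, a file outside the upload directory can never be registered, which is the invariant the comment asks for.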
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519087#comment-16519087 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r197054770 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception { } } + @Test + public void testUploadCleanupOnFailure() throws IOException { + OkHttpClient client = new OkHttpClient(); + + Request request = buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL()); + try (Response response = client.newCall(request).execute()) { + assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); + } + assertUploadDirectoryIsEmpty(); + } + + private static void assertUploadDirectoryIsEmpty() throws IOException { + Preconditions.checkArgument( + 1 == Files.list(configuredUploadDir).count(), + "Directory structure in rest upload directory has changed. Test must be adjusted"); + Optional actualUploadDir = Files.list(configuredUploadDir).findAny(); + Preconditions.checkArgument( + actualUploadDir.isPresent(), + "Expected upload directory does not exist."); + System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList())); + assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count()); --- End diff -- This test wasn't covering the case of exceptions but the rejection of unknown attributes. Will try to find a way to crash the handler. 
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519086#comment-16519086 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r197054545 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.router.RouteResult; +import org.apache.flink.runtime.rest.handler.router.RoutedRequest; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.util.Attribute; +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for 
{@link AbstractHandler}. + */ +public class AbstractHandlerTest { + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testFileCleanup() throws Exception { + final Path file = temporaryFolder.newFile().toPath(); + + final String restAddress = "http://localhost:1234"; + RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder() + .setRestAddress(restAddress) + .build(); + + final GatewayRetriever mockGatewayRetriever = () -> + CompletableFuture.completedFuture(mockRestfulGateway); + + TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever); + + RouteResult routeResult = new RouteResult<>("", "", Collections.emptyMap(), Collections.emptyMap(), ""); + HttpRequest request = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, + HttpMethod.GET, + TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(), + Unpooled.wrappedBuffer(new byte[0])); + RoutedRequest routerRequest = new RoutedRequest<>(routeResult, request); + + Attribute attribute = new
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519081#comment-16519081 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r197051395 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception { } } + @Test + public void testUploadCleanupOnFailure() throws IOException { + OkHttpClient client = new OkHttpClient(); + + Request request = buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL()); + try (Response response = client.newCall(request).execute()) { + assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); + } + assertUploadDirectoryIsEmpty(); + } + + private static void assertUploadDirectoryIsEmpty() throws IOException { + Preconditions.checkArgument( + 1 == Files.list(configuredUploadDir).count(), + "Directory structure in rest upload directory has changed. 
Test must be adjusted"); + Optional actualUploadDir = Files.list(configuredUploadDir).findAny(); + Preconditions.checkArgument( + actualUploadDir.isPresent(), + "Expected upload directory does not exist."); + System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList())); --- End diff -- Remove system out > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
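Besides dropping the debug print, the test helper in the diff above calls `Files.list` several times without closing the returned streams; the JDK documentation recommends try-with-resources for `Files.list` because the stream holds an open directory handle. A small helper such as the following could be used for the emptiness check. The class and method names are hypothetical and not taken from the PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

/** Hypothetical test utility; names are illustrative only. */
final class UploadDirectoryChecks {

    /** Counts directory entries and closes the directory stream afterwards. */
    static long countEntries(Path directory) throws IOException {
        try (Stream<Path> entries = Files.list(directory)) {
            return entries.count();
        }
    }

    /** True if the directory contains no entries at all. */
    static boolean isEmpty(Path directory) throws IOException {
        return countEntries(directory) == 0;
    }
}
```

An assertion on `countEntries` also reports the number of leftover entries on failure without printing to standard out.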
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519082#comment-16519082 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r197053303 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception { } } + @Test + public void testUploadCleanupOnFailure() throws IOException { + OkHttpClient client = new OkHttpClient(); + + Request request = buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL()); + try (Response response = client.newCall(request).execute()) { + assertEquals(HttpResponseStatus.BAD_REQUEST.code(), response.code()); + } + assertUploadDirectoryIsEmpty(); + } + + private static void assertUploadDirectoryIsEmpty() throws IOException { + Preconditions.checkArgument( + 1 == Files.list(configuredUploadDir).count(), + "Directory structure in rest upload directory has changed. Test must be adjusted"); + Optional actualUploadDir = Files.list(configuredUploadDir).findAny(); + Preconditions.checkArgument( + actualUploadDir.isPresent(), + "Expected upload directory does not exist."); + System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList())); + assertEquals("Not all files were cleaned up.", 0, Files.list(actualUploadDir.get()).count()); --- End diff -- Why don't we run into a race condition with the catch block of `FileUploadHandler`? 
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519080#comment-16519080 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r197053400 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.router.RouteResult; +import org.apache.flink.runtime.rest.handler.router.RoutedRequest; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.util.Attribute; +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for 
{@link AbstractHandler}. + */ +public class AbstractHandlerTest { + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testFileCleanup() throws Exception { + final Path file = temporaryFolder.newFile().toPath(); + + final String restAddress = "http://localhost:1234"; + RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder() + .setRestAddress(restAddress) + .build(); + + final GatewayRetriever mockGatewayRetriever = () -> + CompletableFuture.completedFuture(mockRestfulGateway); + + TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever); + + RouteResult routeResult = new RouteResult<>("", "", Collections.emptyMap(), Collections.emptyMap(), ""); + HttpRequest request = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, + HttpMethod.GET, + TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(), + Unpooled.wrappedBuffer(new byte[0])); + RoutedRequest routerRequest = new RoutedRequest<>(routeResult, request); + + Attribute attribute = new
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519071#comment-16519071 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r197050662 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.router.RouteResult; +import org.apache.flink.runtime.rest.handler.router.RoutedRequest; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.util.Attribute; +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for 
{@link AbstractHandler}. + */ +public class AbstractHandlerTest { + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testFileCleanup() throws Exception { + final Path file = temporaryFolder.newFile().toPath(); + + final String restAddress = "http://localhost:1234"; + RestfulGateway mockRestfulGateway = TestingRestfulGateway.newBuilder() + .setRestAddress(restAddress) + .build(); + + final GatewayRetriever mockGatewayRetriever = () -> + CompletableFuture.completedFuture(mockRestfulGateway); + + TestHandler handler = new TestHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever); + + RouteResult routeResult = new RouteResult<>("", "", Collections.emptyMap(), Collections.emptyMap(), ""); + HttpRequest request = new DefaultFullHttpRequest( + HttpVersion.HTTP_1_1, + HttpMethod.GET, + TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(), + Unpooled.wrappedBuffer(new byte[0])); + RoutedRequest routerRequest = new RoutedRequest<>(routeResult, request); + + Attribute attribute = new
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519070#comment-16519070 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r197050066 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.router.RouteResult; +import org.apache.flink.runtime.rest.handler.router.RoutedRequest; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.shaded.netty4.io.netty.util.Attribute; +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for 
{@link AbstractHandler}. + */ +public class AbstractHandlerTest { --- End diff -- `TestLogger` missing
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519067#comment-16519067 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r197049400 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -45,27 +45,26 @@ @SuppressWarnings("resource") public static final FileUploads EMPTY = new FileUploads(); + public static FileUploads forDirectory(Path directory) throws IOException { + final Collection files = new ArrayList<>(4); + Preconditions.checkArgument(directory.isAbsolute(), "Path must be absolute."); + Preconditions.checkArgument(Files.isDirectory(directory), "Path must be a directory."); + + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(directory, visitor); + files.addAll(visitor.getContainedFiles()); + + return new FileUploads(Collections.singleton(directory), files); + } + private FileUploads() { this.directoriesToClean = Collections.emptyList(); this.uploadedFiles = Collections.emptyList(); } - public FileUploads(Collection uploadedFilesOrDirectory) throws IOException { - final Collection files = new ArrayList<>(4); - final Collection directories = new ArrayList<>(1); - for (Path fileOrDirectory : uploadedFilesOrDirectory) { - Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); - if (Files.isDirectory(fileOrDirectory)) { - directories.add(fileOrDirectory); - FileAdderVisitor visitor = new FileAdderVisitor(); - Files.walkFileTree(fileOrDirectory, visitor); - files.addAll(visitor.getContainedFiles()); - } else { - files.add(fileOrDirectory); - } - } - directoriesToClean = Collections.unmodifiableCollection(directories); - uploadedFiles = Collections.unmodifiableCollection(files); + public FileUploads(Collection directoriesToClean, Collection uploadedFiles) { + 
this.directoriesToClean = Preconditions.checkNotNull(directoriesToClean); + this.uploadedFiles = Preconditions.checkNotNull(uploadedFiles); --- End diff -- I know I'm really nitpicking here, but I think it would be slightly better to let `FileUploads` represent only the upload directories and add a method `FileUploads#getFiles` that returns a `Collection` of all files found in the upload directories. The difference is that we would no longer initialize `FileUploads` with the files. That would effectively enforce that all files reside in the given upload directories. As it stands, we could initialize this class with directories `/web/upload/a, /web/upload/b` and files `/web/different/path/file`, where the files are located somewhere else. Because of this, we need to delete not only the directories but also all files.
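The suggestion above could be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: `DirectoryBackedUploads` is a hypothetical class name, it tracks a single upload directory for brevity, and it uses plain JDK checks in place of Flink's `Preconditions` and `FileUtils`. Only `getFiles` is taken from the comment.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical variant of FileUploads that is initialized with a directory only. */
final class DirectoryBackedUploads implements AutoCloseable {

    private final Path uploadDirectory;

    DirectoryBackedUploads(Path uploadDirectory) {
        if (!uploadDirectory.isAbsolute()) {
            throw new IllegalArgumentException("Path must be absolute.");
        }
        if (!Files.isDirectory(uploadDirectory)) {
            throw new IllegalArgumentException("Path must be a directory.");
        }
        this.uploadDirectory = uploadDirectory;
    }

    /** Walks the upload directory on demand, so every returned file lives below it. */
    Collection<Path> getFiles() throws IOException {
        final List<Path> files = new ArrayList<>();
        Files.walkFileTree(uploadDirectory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                files.add(file);
                return FileVisitResult.CONTINUE;
            }
        });
        return files;
    }

    /** Deleting the directory tree suffices because no tracked file can live elsewhere. */
    @Override
    public void close() throws IOException {
        if (!Files.exists(uploadDirectory)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(uploadDirectory)) {
            // Reverse order puts children before their parent directories.
            List<Path> deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path path : deepestFirst) {
                Files.deleteIfExists(path);
            }
        }
    }
}
```

The trade-off is that `getFiles` touches the file system on every call, whereas the constructor in the diff receives the file collection up front.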
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16519006#comment-16519006 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r197029723 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java --- @@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe return; } - ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); - - R request; - if (isFileUpload()) { - final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get(); - if (path == null) { - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Client did not upload a file."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; - } - //noinspection unchecked - request = (R) new FileUpload(path); - } else if (msgContent.capacity() == 0) { - try { - request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); - } catch (JsonParseException | JsonMappingException je) { - log.error("Request did not conform to expected format.", je); - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Bad request received."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; + final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); + + try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) { + + R request; + if (msgContent.capacity() == 0) { + try { + request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); + } catch (JsonParseException | JsonMappingException je) { + log.error("Request did not conform to expected format.", je); + HandlerUtils.sendErrorResponse( + ctx, + httpRequest, + new ErrorResponseBody("Bad request received."), + HttpResponseStatus.BAD_REQUEST, + responseHeaders); + 
return; + } + } else { + try { + ByteBufInputStream in = new ByteBufInputStream(msgContent); --- End diff -- The change is only indentation; afaik there's already a PR for fixing this.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518905#comment-16518905 ] ASF GitHub Bot commented on FLINK-9599: --- Github user sihuazhou commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r197008798 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java --- @@ -103,77 +102,68 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe return; } - ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); - - R request; - if (isFileUpload()) { - final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get(); - if (path == null) { - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Client did not upload a file."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; - } - //noinspection unchecked - request = (R) new FileUpload(path); - } else if (msgContent.capacity() == 0) { - try { - request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); - } catch (JsonParseException | JsonMappingException je) { - log.error("Request did not conform to expected format.", je); - HandlerUtils.sendErrorResponse( - ctx, - httpRequest, - new ErrorResponseBody("Bad request received."), - HttpResponseStatus.BAD_REQUEST, - responseHeaders); - return; + final ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); + + try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) { + + R request; + if (msgContent.capacity() == 0) { + try { + request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass()); + } catch (JsonParseException | JsonMappingException je) { + log.error("Request did not conform to expected format.", je); + HandlerUtils.sendErrorResponse( + ctx, + httpRequest, + new ErrorResponseBody("Bad request received."), + HttpResponseStatus.BAD_REQUEST, + responseHeaders); + 
return; + } + } else { + try { + ByteBufInputStream in = new ByteBufInputStream(msgContent); --- End diff -- I would suggest using try-with-resources to make sure `in` gets closed.
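To illustrate the try-with-resources suggestion, here is a small sketch. It makes several assumptions: a JDK `ByteArrayInputStream` subclass stands in for Netty's `ByteBufInputStream`, a trivial `parse` method stands in for the Jackson mapping step, and the names `RequestBodyReader`, `TrackingStream` and `readBody` are hypothetical. The point is only that the stream is closed on both the success path and the failure path.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the request parsing step in the handler. */
final class RequestBodyReader {

    /** Stand-in stream that records whether close() was called. */
    static final class TrackingStream extends ByteArrayInputStream {
        boolean closed;

        TrackingStream(byte[] data) {
            super(data);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Stand-in for the JSON mapping step; rejects empty bodies with an exception. */
    static String parse(InputStream in) throws IOException {
        byte[] buffer = new byte[1024];
        int read = in.read(buffer);
        if (read <= 0) {
            throw new IOException("Empty request body.");
        }
        return new String(buffer, 0, read, StandardCharsets.UTF_8);
    }

    /** The stream is closed when the block exits, whether parse() returns or throws. */
    static String readBody(TrackingStream stream) throws IOException {
        try (InputStream in = stream) {
            return parse(in);
        }
    }
}
```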
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518360#comment-16518360 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196868266 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import 
javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest extends TestLogger { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler jsonHandler; + private static MultipartFileHandler fileHandler; + private static File file1; + private static File file2; + +
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518333#comment-16518333 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196849558 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import 
javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest extends TestLogger { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler jsonHandler; + private static MultipartFileHandler fileHandler; + private static File file1; + private static File file2; + +
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518311#comment-16518311 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196836698 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. 
+ */ +public final class FileUploads implements AutoCloseable { + private final Collection directoriesToClean; + private final Collection uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection uploadedFilesOrDirectory) throws IOException { + final Collection files = new ArrayList<>(4); + final Collection directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(fileOrDirectory, visitor); + files.addAll(visitor.getContainedFiles()); + } else { + files.add(fileOrDirectory); + } --- End diff -- we don't have to, it's for testing convenience as noted in the class javadocs. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. 
> Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
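The class javadoc quoted above says the constructor accepts directories so that the upload directories get cleaned up. A minimal sketch of that clean-up idea follows. It assumes plain JDK calls instead of Flink's `FileUtils`, and `UploadDirectoryCleaner` is a hypothetical name that does not appear in the pull request.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Collection;

// Hypothetical stand-in for the clean-up half of FileUploads; not Flink code.
final class UploadDirectoryCleaner implements AutoCloseable {
    private final Collection<Path> directoriesToClean;

    UploadDirectoryCleaner(Collection<Path> directoriesToClean) {
        this.directoriesToClean = directoriesToClean;
    }

    @Override
    public void close() throws IOException {
        for (Path directory : directoriesToClean) {
            if (!Files.exists(directory)) {
                continue; // already removed, nothing to do
            }
            Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    Files.delete(file); // delete files first
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                    if (exc != null) {
                        throw exc;
                    }
                    Files.delete(dir); // then the directory, which is empty by now
                    return FileVisitResult.CONTINUE;
                }
            });
        }
    }
}
```

Used in a try-with-resources block, the upload directory is removed once the handler is done, whether or not the handler threw.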
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518310#comment-16518310 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196836575 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) { @Override protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception { - if (msg instanceof HttpRequest) { - final HttpRequest httpRequest = (HttpRequest) msg; - if (httpRequest.getMethod().equals(HttpMethod.POST)) { - if (HttpPostRequestDecoder.isMultipart(httpRequest)) { - currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); - currentHttpRequest = httpRequest; + try { + if (msg instanceof HttpRequest) { + final HttpRequest httpRequest = (HttpRequest) msg; + LOG.trace("Received request. 
URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod()); + if (httpRequest.getMethod().equals(HttpMethod.POST)) { + if (HttpPostRequestDecoder.isMultipart(httpRequest)) { + currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); + currentHttpRequest = httpRequest; + currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString())); + } else { + ctx.fireChannelRead(msg); + } } else { ctx.fireChannelRead(msg); } + } else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) { + // make sure that we still have a upload dir in case that it got deleted in the meanwhile + RestServerEndpoint.createUploadDir(uploadDir, LOG); + + final HttpContent httpContent = (HttpContent) msg; + currentHttpPostRequestDecoder.offer(httpContent); + + while (currentHttpPostRequestDecoder.hasNext()) { + final InterfaceHttpData data = currentHttpPostRequestDecoder.next(); + if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) { + final DiskFileUpload fileUpload = (DiskFileUpload) data; + checkState(fileUpload.isCompleted()); + + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); + fileUpload.renameTo(dest.toFile()); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}.", data.getName()); + HandlerUtils.sendErrorResponse( + ctx, + currentHttpRequest, + new ErrorResponseBody("Received unknown attribute " + data.getName() + '.'), + HttpResponseStatus.BAD_REQUEST, + Collections.emptyMap() + ); + deleteUploadedFiles(); +
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518183#comment-16518183 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r196793648 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -162,13 +205,25 @@ public void shutdown(Time timeout) { ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); // create request and set headers - FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload); + final FullHttpRequest httpRequest; + if (multipart) { + httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl); + } else { + httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload); + } + httpRequest.headers() - .add(HttpHeaders.Names.CONTENT_LENGTH, payload.capacity()) - .add(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE) .set(HttpHeaders.Names.HOST, targetAddress + ':' + targetPort) .set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + if (!multipart) { + httpRequest.headers() --- End diff -- wasn't sure whether this should also be done by the `RequestProcessor` > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + 
files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518182#comment-16518182 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6189#discussion_r196793605 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -162,13 +205,25 @@ public void shutdown(Time timeout) { ByteBuf payload = Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET)); // create request and set headers - FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, messageHeaders.getHttpMethod().getNettyHttpMethod(), targetUrl, payload); + final FullHttpRequest httpRequest; + if (multipart) { --- End diff -- wasn't sure whether this should also be done by the `RequestProcessor` > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518180#comment-16518180 ] ASF GitHub Bot commented on FLINK-9599: --- GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6189 [FLINK-9599][rest] RestClient supports FileUploads

This PR is based on a squashed #6178.

## What is the purpose of the change

This PR extends the `RestClient` to allow sending multipart messages containing files and an optional json payload.

@tillrohrmann Regarding the previously discussed issue about `EMPTY_LAST_HTTP_CONTENT`, you can reproduce the issue by reverting the change to the `FileUploadHandler` and running the `RestClientMultipartTest`.

## Brief change log

* rework `FileUploadHandlerTest` into an abstract base class, to re-use classes for the `RestClient`

## Verifying this change

* add `RequestProcessor` interface for hiding differences between non-/multipart messages
* refactor `RestClient#sendRequest` into `internalSendRequest` that allows passing a `RequestProcessor`
* see `RestClientMultipartTest`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9280_gamma Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6189.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6189 commit 97d7eaf2adbc8174025fafd32b3424f05bd27da1 Author: zentol Date: 2018-06-18T08:54:42Z [FLINK-9599][rest] Implement generic mechanism to access uploaded files commit 0ea85f1658c3ceca09a481e27392ed39b955d8bb Author: zentol Date: 2018-06-19T07:45:09Z [FLINK-9599][rest] RestClient supports FileUploads > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518054#comment-16518054 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196739865 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. 
+ */ +public final class FileUploads implements AutoCloseable { + private final Collection directoriesToClean; + private final Collection uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection uploadedFilesOrDirectory) throws IOException { + final Collection files = new ArrayList<>(4); + final Collection directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(fileOrDirectory, visitor); + files.addAll(visitor.getContainedFiles()); + } else { + files.add(fileOrDirectory); + } + } + directoriesToClean = Collections.unmodifiableCollection(directories); + uploadedFiles = Collections.unmodifiableCollection(files); --- End diff -- I think it is better to move this logic out of the constructor. Adding logic to a constructor always makes testing difficult. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. 
The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we
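The suggestion in the review comment above, moving the path-sorting logic out of the constructor, could look roughly like the sketch below. It is only an illustration under assumptions: `SortedUploads` and `fromPaths` are hypothetical names, and the directory walk from the quoted diff is left out to keep it short.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

// Hypothetical variant for illustration; not the class from the pull request.
final class SortedUploads {
    private final Collection<Path> directories;
    private final Collection<Path> files;

    // The constructor only assigns fields, so a test can build an instance
    // without touching the file system.
    SortedUploads(Collection<Path> directories, Collection<Path> files) {
        this.directories = Collections.unmodifiableCollection(directories);
        this.files = Collections.unmodifiableCollection(files);
    }

    // The file-system inspection lives in a static factory method instead.
    static SortedUploads fromPaths(Collection<Path> filesOrDirectories) {
        final Collection<Path> directories = new ArrayList<>();
        final Collection<Path> files = new ArrayList<>();
        for (Path path : filesOrDirectories) {
            if (!path.isAbsolute()) {
                throw new IllegalArgumentException("Path must be absolute.");
            }
            if (Files.isDirectory(path)) {
                directories.add(path);
            } else {
                files.add(path);
            }
        }
        return new SortedUploads(directories, files);
    }

    Collection<Path> getDirectories() {
        return directories;
    }

    Collection<Path> getFiles() {
        return files;
    }
}
```

With this split, production code calls `fromPaths`, while a unit test can pass hand-built collections straight to the constructor.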
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518055#comment-16518055 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196740578 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java --- @@ -39,15 +41,21 @@ public class HandlerRequest { private final R requestBody; + private final FileUploads uploadedFiles; --- End diff -- This comment has not been addressed. I think the `HandlerRequest` should not know about the `FileUploads` because it can be used to delete the files. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
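One way to read the concern in the comment above is that the request object should hand out the uploaded paths without the clean-up handle. A minimal sketch, assuming a hypothetical `ReadOnlyUploadRequest` class that is not part of the pull request:

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

// Hypothetical request type for illustration; not Flink's HandlerRequest.
final class ReadOnlyUploadRequest {
    private final Collection<Path> uploadedFiles;

    ReadOnlyUploadRequest(Collection<Path> uploadedFiles) {
        // Defensive copy wrapped as unmodifiable: the request holds no
        // closeable container, only a fixed list of paths.
        this.uploadedFiles = Collections.unmodifiableCollection(new ArrayList<>(uploadedFiles));
    }

    Collection<Path> getUploadedFiles() {
        return uploadedFiles;
    }
}
```

This only removes the `close()` style clean-up from the handler's reach. A handler that holds a `Path` can still delete the file through `java.nio.file.Files`, so the sketch narrows the API but is not a security boundary.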
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518056#comment-16518056 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196740018 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) { @Override protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception { - if (msg instanceof HttpRequest) { - final HttpRequest httpRequest = (HttpRequest) msg; - if (httpRequest.getMethod().equals(HttpMethod.POST)) { - if (HttpPostRequestDecoder.isMultipart(httpRequest)) { - currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); - currentHttpRequest = httpRequest; + try { + if (msg instanceof HttpRequest) { + final HttpRequest httpRequest = (HttpRequest) msg; + LOG.trace("Received request. 
URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod()); + if (httpRequest.getMethod().equals(HttpMethod.POST)) { + if (HttpPostRequestDecoder.isMultipart(httpRequest)) { + currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); + currentHttpRequest = httpRequest; + currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString())); + } else { + ctx.fireChannelRead(msg); + } } else { ctx.fireChannelRead(msg); } + } else if (msg instanceof HttpContent && currentHttpPostRequestDecoder != null) { + // make sure that we still have a upload dir in case that it got deleted in the meanwhile + RestServerEndpoint.createUploadDir(uploadDir, LOG); + + final HttpContent httpContent = (HttpContent) msg; + currentHttpPostRequestDecoder.offer(httpContent); + + while (currentHttpPostRequestDecoder.hasNext()) { + final InterfaceHttpData data = currentHttpPostRequestDecoder.next(); + if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) { + final DiskFileUpload fileUpload = (DiskFileUpload) data; + checkState(fileUpload.isCompleted()); + + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); + fileUpload.renameTo(dest.toFile()); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}.", data.getName()); + HandlerUtils.sendErrorResponse( + ctx, + currentHttpRequest, + new ErrorResponseBody("Received unknown attribute " + data.getName() + '.'), + HttpResponseStatus.BAD_REQUEST, + Collections.emptyMap() + ); + deleteUploadedFiles(); +
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518053#comment-16518053 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196739603 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java --- @@ -70,9 +70,14 @@ protected AbstractRestHandler( } @Override - protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest handlerRequest, T gateway) throws RestHandlerException { + protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest httpRequest, HandlerRequest handlerRequest, T gateway) { CompletableFuture response; + if (!messageHeaders.acceptsFileUploads() && !handlerRequest.getUploadedFiles().isEmpty()) { + processRestHandlerException(ctx, httpRequest, new RestHandlerException("File uploads not allowed.", HttpResponseStatus.BAD_REQUEST)); + return; + } --- End diff -- Shouldn't this be moved to the `AbstractHandler`? > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. 
> Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
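The question above, whether the upload check belongs in `AbstractHandler`, amounts to placing the guard in the common base class so every subclass inherits it. The sketch below illustrates that shape with plain strings instead of Netty types. `UploadAwareHandler` and its methods are hypothetical, and the real handlers have a different signature.

```java
import java.nio.file.Path;
import java.util.Collection;

// Hypothetical base class for illustration; not Flink's AbstractHandler.
abstract class UploadAwareHandler {
    private final boolean acceptsFileUploads;

    UploadAwareHandler(boolean acceptsFileUploads) {
        this.acceptsFileUploads = acceptsFileUploads;
    }

    // Final template method: the guard runs before any subclass logic.
    final String handle(Collection<Path> uploadedFiles) {
        if (!acceptsFileUploads && !uploadedFiles.isEmpty()) {
            return "400 File uploads not allowed.";
        }
        return respondToRequest(uploadedFiles);
    }

    // Subclasses implement only the request-specific part.
    protected abstract String respondToRequest(Collection<Path> uploadedFiles);
}
```

Keeping the check in one final method means a new handler cannot forget it, which is the usual argument for pushing such validation up the hierarchy.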
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518058#comment-16518058 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196741255 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import 
javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest extends TestLogger { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler jsonHandler; + private static MultipartFileHandler fileHandler; + private static File file1; + private static File file2; + +
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518059#comment-16518059 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196741065 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,483 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.TestingRestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import 
javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest extends TestLogger { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler jsonHandler; + private static MultipartFileHandler fileHandler; + private static File file1; + private static File file2; + +
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16518057#comment-16518057 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196740407 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. 
+ */ +public final class FileUploads implements AutoCloseable { + private final Collection<Path> directoriesToClean; + private final Collection<Path> uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException { + final Collection<Path> files = new ArrayList<>(4); + final Collection<Path> directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(fileOrDirectory, visitor); + files.addAll(visitor.getContainedFiles()); + } else { + files.add(fileOrDirectory); + } --- End diff -- Do we have to allow files and directories to be specified alike? Why not require an upload directory that contains all uploaded files? This would make the whole clean-up logic easier.
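To make the suggestion above concrete, here is a minimal sketch of a container that is backed by exactly one upload directory. It is not the code from the pull request: the class name `SingleDirectoryUploads` and the method `getUploadedFiles` are hypothetical, and it uses only JDK calls rather than Flink's `FileUtils`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch: a container backed by exactly one upload directory. */
final class SingleDirectoryUploads implements AutoCloseable {
    private final Path uploadDirectory;

    SingleDirectoryUploads(Path uploadDirectory) {
        if (!uploadDirectory.isAbsolute()) {
            throw new IllegalArgumentException("Path must be absolute.");
        }
        this.uploadDirectory = uploadDirectory;
    }

    /** Lists all regular files below the upload directory, computed on demand. */
    Collection<Path> getUploadedFiles() throws IOException {
        try (Stream<Path> paths = Files.walk(uploadDirectory)) {
            return paths.filter(Files::isRegularFile).collect(Collectors.toList());
        }
    }

    /** Deletes the directory and its contents; children are removed before their parents. */
    @Override
    public void close() throws IOException {
        if (!Files.exists(uploadDirectory)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(uploadDirectory)) {
            List<Path> deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path path : deepestFirst) {
                Files.delete(path);
            }
        }
    }
}
```

With a single directory there is no need to track files and directories separately: listing and clean-up both start from the same root.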
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517983#comment-16517983 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/6178 @tillrohrmann I believe I've addressed all your comments.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517971#comment-16517971 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196712002 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms final DiskFileUpload fileUpload = (DiskFileUpload) data; checkState(fileUpload.isCompleted()); - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}, will be ignored.", data.getName()); + } } } if (httpContent instanceof LastHttpContent) { + ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir))); + ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload); --- End diff -- well look at that, it _actually works_
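The dispatch logic in the diff above (file parts are stored, the attribute named by `HTTP_ATTRIBUTE_REQUEST` becomes the JSON payload, any other attribute is ignored) can be sketched without Netty. Everything below is a simplified model, not the handler itself: `Part`, `PartType`, `Result` and `collect` are hypothetical names, and the value `"request"` for the constant is an assumption, since the quoted diff shows only the constant's name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical, Netty-free model of how a mixed multipart request could be split up. */
final class MultipartCollector {
    // Assumed value; the quoted diff only shows the constant's name.
    static final String HTTP_ATTRIBUTE_REQUEST = "request";

    enum PartType { FILE_UPLOAD, ATTRIBUTE }

    static final class Part {
        final PartType type;
        final String name;
        final byte[] content;

        Part(PartType type, String name, byte[] content) {
            this.type = type;
            this.name = name;
            this.content = content;
        }
    }

    static final class Result {
        final List<String> fileNames = new ArrayList<>();
        final List<String> ignoredAttributes = new ArrayList<>();
        private byte[] jsonPayload; // stays null if the request carried no JSON part

        Optional<byte[]> getJsonPayload() {
            return Optional.ofNullable(jsonPayload);
        }
    }

    /** Files are collected, the well-known attribute becomes the payload, the rest is ignored. */
    static Result collect(List<Part> parts) {
        Result result = new Result();
        for (Part part : parts) {
            if (part.type == PartType.FILE_UPLOAD) {
                result.fileNames.add(part.name);
            } else if (HTTP_ATTRIBUTE_REQUEST.equals(part.name)) {
                result.jsonPayload = part.content;
            } else {
                // The real handler logs a warning here instead of recording the name.
                result.ignoredAttributes.add(part.name);
            }
        }
        return result;
    }
}
```

Returning an `Optional` for the payload mirrors the `getMultipartJsonPayload` accessor quoted elsewhere in this thread: a request without a JSON part is a normal case, not an error.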
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517952#comment-16517952 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196701452 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -116,5 +136,16 @@ private void reset() { currentHttpPostRequestDecoder.destroy(); currentHttpPostRequestDecoder = null; currentHttpRequest = null; + currentUploadDir = null; + currentJsonPayload = null; + } + + public static Optional getMultipartJsonPayload(ChannelHandlerContext ctx) { + return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get()); + } + + public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) { + return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get()) + .orElse(FileUploads.EMPTY); --- End diff -- Not much, I think. My gut feeling is just that `FileUploads` can be simplified. Instead of having our own FileVisitor, we could simply call `FileUtils.deleteDirectory(uploadDirectory)`. And I think this class actually has two responsibilities: listing all files to make them accessible, and storing the directories in which they reside so that they can be deleted afterwards.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517950#comment-16517950 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196699736 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) { protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception { if (msg instanceof HttpRequest) { final HttpRequest httpRequest = (HttpRequest) msg; + LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod()); if (httpRequest.getMethod().equals(HttpMethod.POST)) { if (HttpPostRequestDecoder.isMultipart(httpRequest)) { currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); currentHttpRequest = httpRequest; + currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString())); --- End diff -- I think we should add this.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517946#comment-16517946 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196698434 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) { protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception { if (msg instanceof HttpRequest) { final HttpRequest httpRequest = (HttpRequest) msg; + LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod()); if (httpRequest.getMethod().equals(HttpMethod.POST)) { if (HttpPostRequestDecoder.isMultipart(httpRequest)) { currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); currentHttpRequest = httpRequest; + currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString())); --- End diff -- If the `FileUploadHandler` itself fails, the directory isn't cleaned up, but that was already the case in the existing code. The handler is generally rather _light_ when it comes to failure handling (i.e. it doesn't do anything in that regard).
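One way the missing failure handling could look is sketched below, outside of Netty. This is an illustration under assumptions, not what the pull request does: `UploadDirectories`, `UploadAction` and `receiveInto` are hypothetical names, and the "upload" is reduced to a callback that writes into the per-request directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.UUID;
import java.util.stream.Stream;

/** Hypothetical sketch: a per-request upload directory that is removed if receiving fails. */
final class UploadDirectories {

    /** Stand-in for whatever writes the uploaded parts into the request directory. */
    interface UploadAction {
        void run(Path requestDir) throws IOException;
    }

    static Path receiveInto(Path uploadDir, UploadAction action) throws IOException {
        // Same idea as in the diff: one UUID-named sub-directory per request.
        Path requestDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
        try {
            action.run(requestDir);
            return requestDir;
        } catch (IOException | RuntimeException e) {
            try {
                deleteRecursively(requestDir);
            } catch (IOException cleanupFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }

    static void deleteRecursively(Path dir) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order visits children before their parent directories.
            Path[] deepestFirst = paths.sorted(Comparator.reverseOrder()).toArray(Path[]::new);
            for (Path path : deepestFirst) {
                Files.delete(path);
            }
        }
    }
}
```

On success the caller owns the returned directory and is responsible for deleting it later; only the failure path cleans up eagerly.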
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517940#comment-16517940 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196696248 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -116,5 +136,16 @@ private void reset() { currentHttpPostRequestDecoder.destroy(); currentHttpPostRequestDecoder = null; currentHttpRequest = null; + currentUploadDir = null; + currentJsonPayload = null; + } + + public static Optional getMultipartJsonPayload(ChannelHandlerContext ctx) { + return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get()); + } + + public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) { + return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get()) + .orElse(FileUploads.EMPTY); --- End diff -- How does this differ from the current implementation? Are you suggesting that we simplify `FileUploads` to only consider the case of 1 directory containing N files?
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517935#comment-16517935 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196695325 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) { protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception { if (msg instanceof HttpRequest) { final HttpRequest httpRequest = (HttpRequest) msg; + LOG.trace("Received request. URL:{} Method:{}", httpRequest.getUri(), httpRequest.getMethod()); if (httpRequest.getMethod().equals(HttpMethod.POST)) { if (HttpPostRequestDecoder.isMultipart(httpRequest)) { currentHttpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); currentHttpRequest = httpRequest; + currentUploadDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString())); --- End diff -- How do we clean up the `currentUploadDir` in case of a failure?
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517916#comment-16517916 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196689843 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -116,5 +136,16 @@ private void reset() { currentHttpPostRequestDecoder.destroy(); currentHttpPostRequestDecoder = null; currentHttpRequest = null; + currentUploadDir = null; + currentJsonPayload = null; + } + + public static Optional getMultipartJsonPayload(ChannelHandlerContext ctx) { + return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get()); + } + + public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) { + return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get()) + .orElse(FileUploads.EMPTY); --- End diff -- True, then I would suggest storing only the directory in `FileUploads` and adding a method to retrieve all uploaded files.
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517896#comment-16517896 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196681937 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517861#comment-16517861 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196667391 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms final DiskFileUpload fileUpload = (DiskFileUpload) data; checkState(fileUpload.isCompleted()); - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}, will be ignored.", data.getName()); --- End diff -- yes. 
> Implement generic mechanism to receive files via rest
> -----------------------------------------------------
>
>                 Key: FLINK-9599
>                 URL: https://issues.apache.org/jira/browse/FLINK-9599
>             Project: Flink
>          Issue Type: New Feature
>          Components: REST
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Major
>             Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
> * extend the RestClient to allow the upload of Files
> * extend FileUploadHandler to accept mixed multi-part requests (json + files)
> * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute,
> similar to the existing special case for the {{JarUploadHandler}}. The JSON
> body can be forwarded by replacing the incoming http requests with a simple
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517860#comment-16517860 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196667307

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -116,5 +136,16 @@ private void reset() {
             currentHttpPostRequestDecoder.destroy();
             currentHttpPostRequestDecoder = null;
             currentHttpRequest = null;
    +        currentUploadDir = null;
    +        currentJsonPayload = null;
    +    }
    +
    +    public static Optional<byte[]> getMultipartJsonPayload(ChannelHandlerContext ctx) {
    +        return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
    +    }
    +
    +    public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) {
    +        return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
    +            .orElse(FileUploads.EMPTY);
    --- End diff --

    How files are stored is an implementation detail of the `FileUploadHandler`, why would we expose this to subsequent handlers?

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517858#comment-16517858 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196667205

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
             final DiskFileUpload fileUpload = (DiskFileUpload) data;
             checkState(fileUpload.isCompleted());
    -        final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -            "_" + fileUpload.getFilename()));
    +        final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
             fileUpload.renameTo(dest.toFile());
    -        ctx.channel().attr(UPLOADED_FILE).set(dest);
    +    } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +        final Attribute request = (Attribute) data;
    +        // this could also be implemented by using the first found Attribute as the payload
    +        if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +            currentJsonPayload = request.get();
    +        } else {
    +            LOG.warn("Received unknown attribute {}, will be ignored.", data.getName());
    +        }
         }
     }

     if (httpContent instanceof LastHttpContent) {
    +    ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir)));
    +    ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
    --- End diff --

    I can try this (it would be neat to remove the special case in `AbstractHandler`), but I'm wondering whether we can "simply" replace the payload of the multipart request (as identified by the headers that we also forward) with plain json.

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517490#comment-16517490 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196559093 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest { --- End diff -- Missing `extends TestLogger` > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major >
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517480#comment-16517480 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196556101

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * Tests for {@link FileUploads}.
    + */
    +public class FileUploadsTest {
    --- End diff --

    `extends TestLogger` is missing

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517479#comment-16517479 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196452418

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -116,5 +136,16 @@ private void reset() {
             currentHttpPostRequestDecoder.destroy();
             currentHttpPostRequestDecoder = null;
             currentHttpRequest = null;
    +        currentUploadDir = null;
    +        currentJsonPayload = null;
    +    }
    +
    +    public static Optional<byte[]> getMultipartJsonPayload(ChannelHandlerContext ctx) {
    +        return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
    +    }
    --- End diff --

    By sending the JSON payload downstream, we could avoid having this method.

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517492#comment-16517492 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196557260

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java ---
    @@ -39,15 +41,21 @@ public class HandlerRequest {

         private final R requestBody;
    +    private final FileUploads uploadedFiles;
    --- End diff --

    This could also be a `Collection`

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517481#comment-16517481 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196452583

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -116,5 +136,16 @@ private void reset() {
             currentHttpPostRequestDecoder.destroy();
             currentHttpPostRequestDecoder = null;
             currentHttpRequest = null;
    +        currentUploadDir = null;
    +        currentJsonPayload = null;
    +    }
    +
    +    public static Optional<byte[]> getMultipartJsonPayload(ChannelHandlerContext ctx) {
    +        return Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
    +    }
    +
    +    public static FileUploads getMultipartFileUploads(ChannelHandlerContext ctx) {
    +        return Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
    +            .orElse(FileUploads.EMPTY);
    --- End diff --

    I would suggest simply returning the upload directory.

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517477#comment-16517477 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196452755

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
             final DiskFileUpload fileUpload = (DiskFileUpload) data;
             checkState(fileUpload.isCompleted());
    -        final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -            "_" + fileUpload.getFilename()));
    +        final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
             fileUpload.renameTo(dest.toFile());
    -        ctx.channel().attr(UPLOADED_FILE).set(dest);
    +    } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +        final Attribute request = (Attribute) data;
    +        // this could also be implemented by using the first found Attribute as the payload
    +        if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +            currentJsonPayload = request.get();
    +        } else {
    +            LOG.warn("Received unknown attribute {}, will be ignored.", data.getName());
    --- End diff --

    Should we rather fail?

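As a rough sketch of the stricter alternative raised in the comment above, the dispatch over multipart parts could reject unknown attributes instead of logging and ignoring them. The example is deliberately independent of Netty: `Part`, `Kind`, `Result`, and `dispatch` are hypothetical stand-ins for the multipart data types, and the attribute name "request" is only assumed as the value of `HTTP_ATTRIBUTE_REQUEST`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Netty-free sketch of dispatching multipart parts strictly. */
final class MultipartDispatchSketch {

    // Assumed value; the diff only shows the constant's name.
    static final String HTTP_ATTRIBUTE_REQUEST = "request";

    enum Kind { FILE_UPLOAD, ATTRIBUTE }

    /** Stand-in for one decoded multipart part. */
    static final class Part {
        final Kind kind;
        final String name;
        final byte[] content;

        Part(Kind kind, String name, byte[] content) {
            this.kind = kind;
            this.name = name;
            this.content = content;
        }
    }

    /** Collected outcome of one multipart request. */
    static final class Result {
        final List<String> fileNames = new ArrayList<>();
        byte[] jsonPayload; // stays null if no JSON attribute was sent
    }

    static Result dispatch(List<Part> parts) {
        final Result result = new Result();
        for (Part part : parts) {
            if (part.kind == Kind.FILE_UPLOAD) {
                result.fileNames.add(part.name);
            } else if (HTTP_ATTRIBUTE_REQUEST.equals(part.name)) {
                result.jsonPayload = part.content;
            } else {
                // Fail fast instead of warning, so malformed requests are visible to the client.
                throw new IllegalArgumentException("Received unknown attribute " + part.name + ".");
            }
        }
        return result;
    }
}
```

In a real handler the exception would presumably be translated into an HTTP error response and the upload directory cleaned up; how that is wired is not shown in the thread.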
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517478#comment-16517478 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196453235

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms
             final DiskFileUpload fileUpload = (DiskFileUpload) data;
             checkState(fileUpload.isCompleted());
    -        final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() +
    -            "_" + fileUpload.getFilename()));
    +        final Path dest = currentUploadDir.resolve(fileUpload.getFilename());
             fileUpload.renameTo(dest.toFile());
    -        ctx.channel().attr(UPLOADED_FILE).set(dest);
    +    } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    +        final Attribute request = (Attribute) data;
    +        // this could also be implemented by using the first found Attribute as the payload
    +        if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
    +            currentJsonPayload = request.get();
    +        } else {
    +            LOG.warn("Received unknown attribute {}, will be ignored.", data.getName());
    +        }
         }
     }

     if (httpContent instanceof LastHttpContent) {
    +    ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir)));
    --- End diff --

    I would suggest simply storing the upload directory in the `UPLOADED_FILES` attribute.

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517486#comment-16517486 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196555487 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. 
+ */ +public final class FileUploads implements AutoCloseable { + private final Collection directoriesToClean; + private final Collection uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection uploadedFilesOrDirectory) throws IOException { + final Collection files = new ArrayList<>(4); + final Collection directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(fileOrDirectory, visitor); + files.addAll(visitor.get()); + } else { + files.add(fileOrDirectory); + } + } + directoriesToClean = Collections.unmodifiableCollection(directories); + uploadedFiles = Collections.unmodifiableCollection(files); + } + + public Collection getUploadedFiles() { + return uploadedFiles; + } + + @Override + public void close() throws IOException { + for (Path file : uploadedFiles) { + try { + Files.delete(file); + } catch (FileNotFoundException ignored) { + // file may have been moved by a handler + } + } + for (Path directory : directoriesToClean) { + Files.walkFileTree(directory, CleanupFileVisitor.get()); + } + } + + private static final class FileAdderVisitor extends SimpleFileVisitor { + + private final Collection files = new ArrayList<>(4); + + Collection get() { + return files; + } + + FileAdderVisitor() { + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + FileVisitResult result = super.visitFile(file, attrs); + files.add(file); + return result; + } + }
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517483#comment-16517483 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196455211

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java ---
    @@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe
                 return;
             }

    -        ByteBuf msgContent = ((FullHttpRequest) httpRequest).content();
    +        final ByteBuf msgContent;
    +        Optional<byte[]> multipartJsonPayload = FileUploadHandler.getMultipartJsonPayload(ctx);
    +        if (multipartJsonPayload.isPresent()) {
    +            msgContent = Unpooled.wrappedBuffer(multipartJsonPayload.get());
    +        } else {
    +            msgContent = ((FullHttpRequest) httpRequest).content();
    +        }

    -        R request;
    -        if (isFileUpload()) {
    -            final Path path = ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
    -            if (path == null) {
    -                HandlerUtils.sendErrorResponse(
    -                    ctx,
    -                    httpRequest,
    -                    new ErrorResponseBody("Client did not upload a file."),
    -                    HttpResponseStatus.BAD_REQUEST,
    -                    responseHeaders);
    -                return;
    -            }
    -            //noinspection unchecked
    -            request = (R) new FileUpload(path);
    -        } else if (msgContent.capacity() == 0) {
    -            try {
    -                request = MAPPER.readValue("{}", untypedResponseMessageHeaders.getRequestClass());
    -            } catch (JsonParseException | JsonMappingException je) {
    -                log.error("Request did not conform to expected format.", je);
    -                HandlerUtils.sendErrorResponse(
    -                    ctx,
    -                    httpRequest,
    -                    new ErrorResponseBody("Bad request received."),
    -                    HttpResponseStatus.BAD_REQUEST,
    -                    responseHeaders);
    -                return;
    +        try (FileUploads uploadedFiles = FileUploadHandler.getMultipartFileUploads(ctx)) {
    --- End diff --

    I would obtain the upload directory from `FileUploadHandler` and simply delete this directory after the call has been processed.

    We could then also create `FileUploads` outside of the `FileUploadHandler` to instantiate a `HandlerRequest` with it. This would also simplify the `FileUploads` class significantly, because it is no longer responsible for deleting the files.

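The suggestion above can be sketched with plain `java.nio`, as a minimal illustration rather than the code that ended up in the pull request. `UploadCleanupSketch`, `listFiles`, and `deleteRecursively` are hypothetical names; the sketch assumes each request gets its own upload directory, so the calling handler can list the files, process the request, and then delete the directory in a `finally` block, which leaves the file container free of cleanup logic.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper: the handler owns the upload directory and its cleanup. */
final class UploadCleanupSketch {

    private UploadCleanupSketch() {
    }

    /** Lists all regular files below the upload directory; empty if it does not exist. */
    static List<Path> listFiles(Path uploadDirectory) throws IOException {
        if (uploadDirectory == null || !Files.isDirectory(uploadDirectory)) {
            return Collections.emptyList();
        }
        final List<Path> files = new ArrayList<>();
        Files.walkFileTree(uploadDirectory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                files.add(file);
                return FileVisitResult.CONTINUE;
            }
        });
        return files;
    }

    /** Deletes the directory tree; files a handler already moved away are simply not visited. */
    static void deleteRecursively(Path uploadDirectory) throws IOException {
        if (uploadDirectory == null || !Files.exists(uploadDirectory)) {
            return;
        }
        Files.walkFileTree(uploadDirectory, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.delete(file);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path visited, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                // Children are gone at this point, so the directory itself can be removed.
                Files.delete(visited);
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
```

A caller would wrap request processing in `try { ... } finally { UploadCleanupSketch.deleteRecursively(dir); }`, so cleanup happens whether or not the handler succeeds.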
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517487#comment-16517487 ]

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r19670

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler;
    +
    +import org.apache.flink.util.Preconditions;
    +
    +import java.io.FileNotFoundException;
    +import java.io.IOException;
    +import java.nio.file.FileVisitResult;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.SimpleFileVisitor;
    +import java.nio.file.attribute.BasicFileAttributes;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.Collections;
    +
    +/**
    + * A container for uploaded files.
    + *
    + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up.
    + * For convenience during testing it also accepts files directly.
    + */
    +public final class FileUploads implements AutoCloseable {
    +    private final Collection<Path> directoriesToClean;
    +    private final Collection<Path> uploadedFiles;
    +
    +    @SuppressWarnings("resource")
    +    public static final FileUploads EMPTY = new FileUploads();
    +
    +    private FileUploads() {
    +        this.directoriesToClean = Collections.emptyList();
    +        this.uploadedFiles = Collections.emptyList();
    +    }
    +
    +    public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
    +        final Collection<Path> files = new ArrayList<>(4);
    +        final Collection<Path> directories = new ArrayList<>(1);
    +        for (Path fileOrDirectory : uploadedFilesOrDirectory) {
    +            Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
    +            if (Files.isDirectory(fileOrDirectory)) {
    +                directories.add(fileOrDirectory);
    +                FileAdderVisitor visitor = new FileAdderVisitor();
    +                Files.walkFileTree(fileOrDirectory, visitor);
    +                files.addAll(visitor.get());
    +            } else {
    +                files.add(fileOrDirectory);
    +            }
    +        }
    +        directoriesToClean = Collections.unmodifiableCollection(directories);
    +        uploadedFiles = Collections.unmodifiableCollection(files);
    +    }
    +
    +    public Collection<Path> getUploadedFiles() {
    +        return uploadedFiles;
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        for (Path file : uploadedFiles) {
    +            try {
    +                Files.delete(file);
    +            } catch (FileNotFoundException ignored) {
    +                // file may have been moved by a handler
    +            }
    +        }
    +        for (Path directory : directoriesToClean) {
    +            Files.walkFileTree(directory, CleanupFileVisitor.get());
    +        }
    +    }
    +
    +    private static final class FileAdderVisitor extends SimpleFileVisitor<Path> {
    +
    +        private final Collection<Path> files = new ArrayList<>(4);
    +
    +        Collection<Path> get() {
    --- End diff --

    Maybe use a more descriptive name than `get`.

[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517491#comment-16517491 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196558949 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java --- @@ -63,4 +63,13 @@ * @return description for the header */ String getDescription(); + + /** +* Returns whether this header allows file uploads. +* +* @return whether this header allows file uploads +*/ + default boolean acceptsFileUploads() { + return false; + } --- End diff -- Should this maybe go into `UntypedResponseMessageHeaders`? At the moment one can upload files to an `AbstractHandler` implementation (e.g. `AbstractTaskManagerFileHandler`) and access them via the `HandlerRequest`, without being able to specify whether file uploads are allowed. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. 
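The opt-in flag discussed in the comment above could look roughly like the following. This is a minimal sketch using only the JDK, not the code from the pull request: `UploadAwareHeaders`, `JarLikeHeaders`, `PlainHeaders` and `UploadGuard` are hypothetical names, and only the `acceptsFileUploads()` default method mirrors the quoted diff.

```java
import java.util.Collection;

/** Hypothetical stand-in for the headers interface; only the new default method is modeled. */
interface UploadAwareHeaders {
    /** Returns whether this header allows file uploads; rejecting uploads is the default. */
    default boolean acceptsFileUploads() {
        return false;
    }
}

/** Hypothetical headers for an endpoint that explicitly opts in to file uploads. */
final class JarLikeHeaders implements UploadAwareHeaders {
    @Override
    public boolean acceptsFileUploads() {
        return true;
    }
}

/** Hypothetical headers that keep the default and therefore reject uploads. */
final class PlainHeaders implements UploadAwareHeaders {}

/** Hypothetical guard that a base handler could run before dispatching a request. */
final class UploadGuard {
    private UploadGuard() {}

    /** Throws if files were uploaded to an endpoint whose headers did not opt in. */
    static void check(UploadAwareHeaders headers, Collection<String> uploadedFileNames) {
        if (!uploadedFileNames.isEmpty() && !headers.acceptsFileUploads()) {
            throw new IllegalArgumentException("File uploads are not allowed for this endpoint.");
        }
    }
}
```

If the flag lives on the most general headers type, as the comment suggests, a single guard like this in the base handler would cover every handler, including those that do not use typed request and response bodies.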
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517488#comment-16517488 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196559335 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517485#comment-16517485 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196554025 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java --- @@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext ctx, RoutedRequest routedRe return; } - ByteBuf msgContent = ((FullHttpRequest) httpRequest).content(); + final ByteBuf msgContent; + Optional multipartJsonPayload = FileUploadHandler.getMultipartJsonPayload(ctx); + if (multipartJsonPayload.isPresent()) { + msgContent = Unpooled.wrappedBuffer(multipartJsonPayload.get()); --- End diff -- Let's send the JSON payload as a proper `HttpRequest`; then we don't need this special casing here. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517484#comment-16517484 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196453980 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java --- @@ -129,4 +137,9 @@ public R getRequestBody() { return queryParameter.getValue(); } } + + @Nonnull + public FileUploads getFileUploads() { + return uploadedFiles; + } --- End diff -- I would not expose `FileUploads` to the user but rather return a `Collection`. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
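The suggestion above, to hand handlers a plain collection instead of the `FileUploads` container, might be sketched as follows. This is an illustration only: `UploadContainer` and `RequestView` are hypothetical stand-ins for `FileUploads` and `HandlerRequest`, and the element type `Path` is an assumption, since the archive shows the type without its generic parameter.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/** Hypothetical internal container; it owns the uploaded files and, in a real system, their cleanup. */
final class UploadContainer {
    private final Collection<Path> files;

    UploadContainer(Collection<Path> files) {
        // Defensive copy, wrapped so that callers cannot modify the contents.
        this.files = Collections.unmodifiableCollection(new ArrayList<>(files));
    }

    Collection<Path> getUploadedFiles() {
        return files;
    }
}

/** Hypothetical request wrapper: handlers see only the paths, never the container itself. */
final class RequestView {
    private final UploadContainer uploads;

    RequestView(UploadContainer uploads) {
        this.uploads = uploads;
    }

    /** Exposes the uploaded files as a read-only collection. */
    Collection<Path> getUploadedFiles() {
        return uploads.getUploadedFiles();
    }
}
```

Keeping the container private to the request means a handler cannot call `close()` on it, so cleanup stays the responsibility of whichever component created the uploads.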
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517493#comment-16517493 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196560163 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517482#comment-16517482 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196453584 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. 
+ */ +public final class FileUploads implements AutoCloseable { + private final Collection directoriesToClean; + private final Collection uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection uploadedFilesOrDirectory) throws IOException { + final Collection files = new ArrayList<>(4); + final Collection directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); + Files.walkFileTree(fileOrDirectory, visitor); + files.addAll(visitor.get()); + } else { + files.add(fileOrDirectory); + } + } + directoriesToClean = Collections.unmodifiableCollection(directories); + uploadedFiles = Collections.unmodifiableCollection(files); --- End diff -- Let's move this logic out of `FileUploads` and simply initialize it with a `Collection`. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. 
The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16517476#comment-16517476 ] ASF GitHub Bot commented on FLINK-9599: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196452024 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -95,14 +107,22 @@ protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject ms final DiskFileUpload fileUpload = (DiskFileUpload) data; checkState(fileUpload.isCompleted()); - final Path dest = uploadDir.resolve(Paths.get(UUID.randomUUID() + - "_" + fileUpload.getFilename())); + final Path dest = currentUploadDir.resolve(fileUpload.getFilename()); fileUpload.renameTo(dest.toFile()); - ctx.channel().attr(UPLOADED_FILE).set(dest); + } else if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) { + final Attribute request = (Attribute) data; + // this could also be implemented by using the first found Attribute as the payload + if (data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) { + currentJsonPayload = request.get(); + } else { + LOG.warn("Received unknown attribute {}, will be ignored.", data.getName()); + } } } if (httpContent instanceof LastHttpContent) { + ctx.channel().attr(UPLOADED_FILES).set(new FileUploads(Collections.singleton(currentUploadDir))); + ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload); --- End diff -- I think it would be better to not store the JSON payload as an `Attribute` but instead forward it via `httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload))`. 
> Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515915#comment-16515915 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196129193 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.util.Preconditions; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * A container for uploaded files. + * + * Implementation note: The constructor also accepts directories to ensure that the upload directories are cleaned up. + * For convenience during testing it also accepts files directly. 
+ */ +public final class FileUploads implements AutoCloseable { + private final Collection directoriesToClean; + private final Collection uploadedFiles; + + @SuppressWarnings("resource") + public static final FileUploads EMPTY = new FileUploads(); + + private FileUploads() { + this.directoriesToClean = Collections.emptyList(); + this.uploadedFiles = Collections.emptyList(); + } + + public FileUploads(Collection uploadedFilesOrDirectory) throws IOException { + final Collection files = new ArrayList<>(4); + final Collection directories = new ArrayList<>(1); + for (Path fileOrDirectory : uploadedFilesOrDirectory) { + Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute."); + if (Files.isDirectory(fileOrDirectory)) { + directories.add(fileOrDirectory); + FileAdderVisitor visitor = new FileAdderVisitor(); --- End diff -- This is probably more complex than really necessary; there is no use-case for a _nested_ directory structure. > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
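If nested directories are indeed never needed, as the comment above says, the container could be reduced to a flat listing of one upload directory. The sketch below is one possible shape under that assumption, not the code that was merged: `FlatUploads` and `fromDirectory` are hypothetical names. It also uses `Files.deleteIfExists`, because `Files.delete` reports a missing file with `NoSuchFileException`, which a `catch (FileNotFoundException ...)` clause does not catch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical simplified container: one flat upload directory, no recursive walk. */
final class FlatUploads implements AutoCloseable {
    private final Path uploadDir;
    private final Collection<Path> files;

    private FlatUploads(Path uploadDir, Collection<Path> files) {
        this.uploadDir = uploadDir;
        this.files = files;
    }

    /** Lists the regular files directly inside the given directory. */
    static FlatUploads fromDirectory(Path uploadDir) {
        try (Stream<Path> entries = Files.list(uploadDir)) {
            List<Path> found = entries.filter(Files::isRegularFile).collect(Collectors.toList());
            return new FlatUploads(uploadDir, Collections.unmodifiableList(found));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    Collection<Path> getUploadedFiles() {
        return files;
    }

    /** Deletes the listed files and then the directory; files that are already gone are skipped. */
    @Override
    public void close() {
        try {
            for (Path file : files) {
                // A handler may have moved the file elsewhere, so a missing file is not an error.
                Files.deleteIfExists(file);
            }
            Files.deleteIfExists(uploadDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The directory deletion only succeeds when nothing else is left inside it, which holds under the flat-layout assumption; anything written there after the listing would need separate handling.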
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515744#comment-16515744 ] ASF GitHub Bot commented on FLINK-9599: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196090706 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -52,7 +57,10 @@ --- End diff -- should maybe rename the class to `MultipartRequestHandler` > Implement generic mechanism to receive files via rest > - > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515713#comment-16515713 ]

ASF GitHub Bot commented on FLINK-9599:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196081080

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
--- End diff --

needs a test for
* multiple files
* name of uploaded file should be what is specified in the request

> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
> Issue Type: New Feature
> Components: REST
> Reporter: Chesnay Schepler
>
[jira] [Commented] (FLINK-9599) Implement generic mechanism to receive files via rest
[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16515689#comment-16515689 ]

ASF GitHub Bot commented on FLINK-9599:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6178

[FLINK-9599][rest] Implement generic mechanism to access uploaded files

## What is the purpose of the change

This PR extends the existing multipart handling to also support mixed multipart messages (i.e. requests containing both JSON and files), and generalizes the `FileUpload` handling so that all handlers extending `AbstractHandler` can access uploaded files.

Handlers may access uploaded files via `HandlerRequest#getFileUploads`. File uploads must be explicitly allowed by returning true in `MessageHeaders#acceptsFileUploads`.

The files and JSON payload are forwarded to the handler by the `FileUploadHandler` via channel attributes. If a JSON payload is forwarded this way, a handler will ignore the content of the received `HttpRequest`.

This PR only covers the server side; the `RestClient` remains unchanged. Client-side support will be added in a follow-up.

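The opt-in rule described above (uploads are only handed to a handler whose headers return true from `acceptsFileUploads`) can be illustrated with a minimal sketch. Everything in it is a hypothetical, simplified stand-in: `Headers`, `Uploads`, `filesFor` and the choice to reject with an `IllegalStateException` are assumptions made for illustration and are not taken from the actual Flink classes in this PR.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified stand-ins; NOT Flink's actual classes. */
public class UploadOptInSketch {

    /** Stand-in for endpoint headers; uploads are disabled unless overridden. */
    public interface Headers {
        default boolean acceptsFileUploads() {
            return false;
        }
    }

    /** Stand-in for an immutable container of uploaded files. */
    public static final class Uploads {
        private final List<Path> files;

        public Uploads(List<Path> files) {
            // Defensive copy so later changes to the caller's list are not visible.
            this.files = Collections.unmodifiableList(new ArrayList<>(files));
        }

        public Collection<Path> getUploadedFiles() {
            return files;
        }
    }

    /**
     * Returns the uploaded files if the endpoint opted in.
     * Rejecting via exception is an assumption of this sketch.
     */
    public static Collection<Path> filesFor(Headers headers, Uploads uploads) {
        if (!uploads.getUploadedFiles().isEmpty() && !headers.acceptsFileUploads()) {
            throw new IllegalStateException("File uploads not allowed for this endpoint.");
        }
        return uploads.getUploadedFiles();
    }
}
```

Keeping the default at `false` means existing endpoints need no changes and cannot receive files by accident; only endpoints that override the method take part in upload handling.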
## Brief change log

* add `MessageHeaders#acceptsFileUploads` to signal that a handler accepts file uploads (default=false)
* add `FileUploads` class as a container for uploaded files
* extend `FileUploadHandler` to
  * accept multiple files in one request
  * also accept a JSON payload
  * forward both files and json via channel attributes
* extend `AbstractHandler` to retrieve files/json from channel attributes
* remove special case for `JarRunHandler` in `AbstractHandler`
* extend `HandlerRequest` to accept a `FileUploads` object
* add `OkHttp` dependency to `flink-runtime` for testing purposes
* update `JarUploadHandler/Headers`

## Verifying this change

added tests:
* FileUploadsTest
* FileUploadHandlerTest
* manually verified via WebUI job submission

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9280_beta

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6178.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #6178

commit c2adb3880180d8426697c33035d2b31d57d93952
Author: zentol
Date: 2018-06-18T08:54:42Z

[FLINK-9599][rest] Implement generic mechanism to access uploaded files

> Implement generic mechanism to receive files via rest
> -
>
> Key: FLINK-9599
> URL: https://issues.apache.org/jira/browse/FLINK-9599
> Project: Flink
> Issue Type: New Feature
> Components: REST
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Major
> Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
> * extend the RestClient to allow the upload of Files
> * extend FileUploadHandler to accept mixed multi-part requests (json + files)
> * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute,
> similar to the existing special case for the {{JarUploadHandler}}. The JSON
> body can be forwarded by replacing the incoming http requests with a simple
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
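For readers unfamiliar with the "mixed multipart" requests mentioned in the change log (one JSON payload plus several files in a single request), the following is a rough sketch of what such a `multipart/form-data` body looks like on the wire. It uses only the JDK and is not how the PR's tests build requests (those use OkHttp). The class name, the part names `request` and `file`, and the use of strings for file content are assumptions chosen for illustration.

```java
import java.util.Map;

/** Illustrative only: builds a multipart/form-data body with one JSON part and N file parts. */
public class MixedMultipartSketch {

    private static final String CRLF = "\r\n";

    /**
     * @param boundary multipart boundary, must not occur in any part content
     * @param json     JSON payload sent as its own part
     * @param files    file name -> file content (text, to keep the sketch short)
     */
    public static String build(String boundary, String json, Map<String, String> files) {
        StringBuilder body = new StringBuilder();

        // JSON part; the part name "request" is a hypothetical choice.
        body.append("--").append(boundary).append(CRLF)
            .append("Content-Disposition: form-data; name=\"request\"").append(CRLF)
            .append("Content-Type: application/json").append(CRLF)
            .append(CRLF)
            .append(json).append(CRLF);

        // One part per file; the filename parameter carries the name given by the client.
        for (Map.Entry<String, String> file : files.entrySet()) {
            body.append("--").append(boundary).append(CRLF)
                .append("Content-Disposition: form-data; name=\"file\"; filename=\"")
                .append(file.getKey()).append("\"").append(CRLF)
                .append("Content-Type: application/octet-stream").append(CRLF)
                .append(CRLF)
                .append(file.getValue()).append(CRLF);
        }

        // Closing delimiter terminates the multipart body.
        body.append("--").append(boundary).append("--").append(CRLF);
        return body.toString();
    }
}
```

The request itself would carry the header `Content-Type: multipart/form-data; boundary=<boundary>`. The `filename` parameter of each file part is what a server-side test could compare against the name of the stored file.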