[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16365367#comment-16365367 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5442

> Port JarUploadHandler to new REST endpoint
> ------------------------------------------
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, REST, Webfrontend
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Gary Yao
> Priority: Major
> Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364294#comment-16364294 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r168212886

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java ---
    @@ -103,6 +126,9 @@ public static RestServerEndpointConfiguration fromConfiguration(Configuration co
                 }
             }
    -        return new RestServerEndpointConfiguration(address, port, sslEngine);
    +        final Path uploadDir = Paths.get(config.getString(WebOptions.UPLOAD_DIR,
    +            config.getString(WebOptions.TMP_DIR)));
    --- End diff --

    I think we should create a unique sub folder in order to avoid clashes between concurrent processes on the same machine. Will do this when merging this PR.
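To illustrate the review comment above, here is a minimal sketch of how a unique sub folder below a configured upload directory could be created with the JDK alone. The class name `UniqueUploadDir`, the method `createUniqueUploadDir`, and the `flink-web-upload-` prefix are hypothetical choices for this example and are not taken from the pull request; the change that was actually merged may look different.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Hypothetical helper, not part of the Flink code base.
class UniqueUploadDir {

    /**
     * Creates a fresh, uniquely named sub directory below the given base
     * directory, so that two processes sharing the same base directory do
     * not write uploads into the same folder.
     */
    public static Path createUniqueUploadDir(Path baseDir) throws IOException {
        // The prefix is an assumption for illustration; the random UUID
        // provides the uniqueness.
        Path uniqueDir = baseDir.resolve("flink-web-upload-" + UUID.randomUUID());
        // createDirectories also creates missing parent directories and
        // returns the created directory.
        return Files.createDirectories(uniqueDir);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("upload-base");
        System.out.println(createUniqueUploadDir(base));
        System.out.println(createUniqueUploadDir(base));
    }
}
```

Calling the helper twice with the same base directory yields two different directories, which is the property the comment asks for.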
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362650#comment-16362650 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167930373

    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers.ng;
    --- End diff --

    fixed
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362648#comment-16362648 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167930317

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
    @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
             }
         }
    +    @SuppressWarnings({"unchecked", "rawtypes"})
    +    public static List> tryLoadJarUploadHandler(
    +            GatewayRetriever leaderRetriever,
    +            CompletableFuture restAddressFuture,
    +            Time timeout,
    +            java.nio.file.Path uploadDir,
    +            Executor executor) {
    +
    +        // 1. Check if flink-runtime-web is in the classpath
    +        try {
    +            final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
    --- End diff --

    fixed
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362646#comment-16362646 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167930279

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
    @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
             }
         }
    +    @SuppressWarnings({"unchecked", "rawtypes"})
    +    public static List> tryLoadJarUploadHandler(
    --- End diff --

    method is tested now
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362651#comment-16362651 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167930401

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
    @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
             }
         }
    +    @SuppressWarnings({"unchecked", "rawtypes"})
    +    public static List> tryLoadJarUploadHandler(
    +            GatewayRetriever leaderRetriever,
    +            CompletableFuture restAddressFuture,
    +            Time timeout,
    +            java.nio.file.Path uploadDir,
    +            Executor executor) {
    +
    +        // 1. Check if flink-runtime-web is in the classpath
    +        try {
    +            final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
    +            Class.forName(classname).asSubclass(WebMonitor.class);
    +        } catch (ClassNotFoundException e) {
    +            // class not found means that there is no flink-runtime-web in the classpath
    +            return Collections.emptyList();
    +        }
    +
    +        try {
    +            final String classname = "org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadHandler";
    +            final Class clazz = Class.forName(classname);
    +            final Constructor constructor = clazz.getConstructor(
    +                CompletableFuture.class,
    +                GatewayRetriever.class,
    +                Time.class,
    +                Map.class,
    +                MessageHeaders.class,
    +                java.nio.file.Path.class,
    +                Executor.class);
    +
    +            final MessageHeaders jarUploadMessageHeaders =
    +                (MessageHeaders) Class
    +                    .forName("org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadMessageHeaders")
    +                    .newInstance();
    +
    +            return Arrays.asList(Tuple2.of(jarUploadMessageHeaders, (ChannelInboundHandler) constructor.newInstance(
    +                restAddressFuture,
    +                leaderRetriever,
    +                timeout,
    +                Collections.emptyMap(),
    --- End diff --

    fixed
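The diff quoted above instantiates a handler through reflection so that the calling module needs no compile-time dependency on the optional one. The following is a simplified, self-contained sketch of that pattern using only JDK classes. `OptionalHandlerLoader` and `tryLoad` are hypothetical names, and `java.lang.StringBuilder` merely stands in for an optional class; none of this is the code from the pull request.

```java
import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.List;

// Hypothetical helper illustrating reflective loading of an optional class.
class OptionalHandlerLoader {

    /**
     * Looks up className, calls its public constructor taking a single String,
     * and returns the instance in a one-element list. Returns an empty list if
     * the class is not on the classpath.
     */
    public static List<Object> tryLoad(String className, String constructorArg) {
        try {
            final Class<?> clazz = Class.forName(className);
            final Constructor<?> constructor = clazz.getConstructor(String.class);
            return Collections.singletonList(constructor.newInstance(constructorArg));
        } catch (ClassNotFoundException e) {
            // The optional module is absent: report "nothing to register".
            return Collections.emptyList();
        } catch (ReflectiveOperationException e) {
            // The class exists but could not be instantiated: a real error.
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(tryLoad("java.lang.StringBuilder", "uploaded.jar"));
        System.out.println(tryLoad("org.example.missing.NoSuchHandler", "uploaded.jar"));
    }
}
```

Separating the "class is absent" case from other reflective failures keeps a missing optional dependency silent while still surfacing genuine instantiation problems.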
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362654#comment-16362654 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167930470

    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers.ng;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.FileUpload;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Handles .jar file uploads.
    + */
    +public class JarUploadHandler extends
    --- End diff --

    added test
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362652#comment-16362652 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167930421

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
    @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
             }
         }
    +    @SuppressWarnings({"unchecked", "rawtypes"})
    +    public static List> tryLoadJarUploadHandler(
    --- End diff --

    fixed
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362653#comment-16362653 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167930454

    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadResponseBody.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers.ng;
    +
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * {@link ResponseBody} for {@link JarUploadHandler}.
    + */
    +public class JarUploadResponseBody implements ResponseBody {
    --- End diff --

    added test
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362649#comment-16362649 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167930349

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java ---
    @@ -100,6 +107,16 @@ public DispatcherRestEndpoint(
                 timeout,
                 responseHeaders);
    +        final List> optJarHandlers =
    --- End diff --

    fixed
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362642#comment-16362642 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167929949

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
    +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
    +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.UUID;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * Writes multipart/form-data to disk. Delegates all other requests to the next
    + * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
    + */
    +public class FileUploadHandler extends SimpleChannelInboundHandler {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
    +
    +    static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
    +
    +    private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
    +
    +    private final Path uploadDir;
    +
    +    private HttpPostRequestDecoder currentHttpPostRequestDecoder;
    +
    +    private HttpRequest currentHttpRequest;
    +
    +    public FileUploadHandler(final Path uploadDir) {
    +        super(false);
    +
    +        this.uploadDir = requireNonNull(uploadDir);
    +        DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString();
    +    }
    +
    +    @Override
    +    protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
    +        if (msg instanceof HttpRequest) {
    +            final HttpRequest httpRequest = (HttpRequest) msg;
    +            if (httpRequest.getMethod().equals(HttpMethod.POST)) {
    +                final HttpPostRequestDecoder httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
    --- End diff --

    changed it
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362644#comment-16362644 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167930099

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
    @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
             }
         }
    +    @SuppressWarnings({"unchecked", "rawtypes"})
    +    public static List> tryLoadJarUploadHandler(
    +            GatewayRetriever leaderRetriever,
    +            CompletableFuture restAddressFuture,
    +            Time timeout,
    +            java.nio.file.Path uploadDir,
    +            Executor executor) {
    +
    +        // 1. Check if flink-runtime-web is in the classpath
    +        try {
    +            final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
    +            Class.forName(classname).asSubclass(WebMonitor.class);
    +        } catch (ClassNotFoundException e) {
    +            // class not found means that there is no flink-runtime-web in the classpath
    +            return Collections.emptyList();
    +        }
    --- End diff --

    extracted into a method
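The reply above says the classpath check was extracted into a method, but the extracted method itself is not shown in this thread. As a rough sketch of what such a helper might look like (the names `ClasspathProbe` and `isOnClasspath` are invented for this example, and unlike the quoted diff it skips the `asSubclass` check):

```java
// Hypothetical helper; the method actually added in the pull request is not
// quoted in this thread and may differ.
class ClasspathProbe {

    /** Returns true if a class with the given fully qualified name can be resolved. */
    public static boolean isOnClasspath(String className) {
        try {
            // initialize = false: only resolve the class, do not run its
            // static initializers just to answer an availability question.
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isOnClasspath("java.lang.String"));
        System.out.println(isOnClasspath("org.example.missing.NoSuchClass"));
    }
}
```

A caller can then guard the reflective handler lookup with a single `if (!isOnClasspath(...)) { return Collections.emptyList(); }` instead of an inline try/catch.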
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362643#comment-16362643 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167930021

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
    +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
    +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.UUID;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * Writes multipart/form-data to disk. Delegates all other requests to the next
    + * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
    + */
    +public class FileUploadHandler extends SimpleChannelInboundHandler {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
    +
    +    static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
    +
    +    private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
    +
    +    private final Path uploadDir;
    +
    +    private HttpPostRequestDecoder currentHttpPostRequestDecoder;
    +
    +    private HttpRequest currentHttpRequest;
    +
    +    public FileUploadHandler(final Path uploadDir) {
    +        super(false);
    +
    +        this.uploadDir = requireNonNull(uploadDir);
    +        DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString();
    +    }
    +
    +    @Override
    +    protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
    +        if (msg instanceof HttpRequest) {
    +            final HttpRequest httpRequest = (HttpRequest) msg;
    +            if (httpRequest.getMethod().equals(HttpMethod.POST)) {
    +                final HttpPostRequestDecoder httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
    +                if (httpPostRequestDecoder.isMultipart()) {
    +                    currentHttpPostRequestDecoder = httpPostRequestDecoder;
    +                    currentHttpRequest = httpRequest;
    +                } else {
    +                    ctx.fireChannelRead(m
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362640#comment-16362640 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167929912

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java ---
    @@ -0,0 +1,147 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
    +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
    +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.nio.file.Paths;
    +import java.util.UUID;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.apache.flink.util.Preconditions.checkState;
    +
    +/**
    + * Writes multipart/form-data to disk. Delegates all other requests to the next
    + * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
    + */
    +public class FileUploadHandler extends SimpleChannelInboundHandler {
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
    +
    +    static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
    +
    +    private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
    +
    +    private final Path uploadDir;
    +
    +    private HttpPostRequestDecoder currentHttpPostRequestDecoder;
    +
    +    private HttpRequest currentHttpRequest;
    +
    +    public FileUploadHandler(final Path uploadDir) {
    +        super(false);
    +
    +        this.uploadDir = requireNonNull(uploadDir);
    +        DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString();
    --- End diff --

    yes
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
    [ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362645#comment-16362645 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167930174

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
    @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
             }
         }
    +    @SuppressWarnings({"unchecked", "rawtypes"})
    +    public static List> tryLoadJarUploadHandler(
    --- End diff --

    both comments fixed
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362639#comment-16362639 ] ASF GitHub Bot commented on FLINK-7713: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167929864 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.handlers.ng; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.FileUpload; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +import static java.util.Objects.requireNonNull; + +/** + * Handles .jar file uploads. 
+ */
+public class JarUploadHandler extends
+        AbstractRestHandler {
+
+    private final Path jarDir;
+
+    private final Executor executor;
+
+    public JarUploadHandler(
+            final CompletableFuture localRestAddress,
+            final GatewayRetriever leaderRetriever,
+            final Time timeout,
+            final Map responseHeaders,
+            final MessageHeaders messageHeaders,
+            final Path jarDir,
+            final Executor executor) {
+        super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+        this.jarDir = requireNonNull(jarDir);
+        this.executor = requireNonNull(executor);
+    }
+
+    @Override
+    protected CompletableFuture handleRequest(
+            @Nonnull final HandlerRequest request,
+            @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+
+        final FileUpload fileUpload = request.getRequestBody();
+        return CompletableFuture.supplyAsync(() -> {
+            if (!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
+                deleteUploadedFile(fileUpload);
+                throw new CompletionException(new RestHandlerException(
+                    "Only Jar files are allowed.",
+                    HttpResponseStatus.BAD_REQUEST));
+            } else {
+                try {
+                    Files.move(fileUpload.getPath(), jarDir.resolve(fileUpload.getPath().getFileName()));
+                } catch (IOException e) {
--- End diff --

yes
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360691#comment-16360691 ] ASF GitHub Bot commented on FLINK-7713: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167542881 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.handlers.ng; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.FileUpload; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +import static java.util.Objects.requireNonNull; + +/** + * Handles .jar file uploads. 
+ */
+public class JarUploadHandler extends
+        AbstractRestHandler {
+
+    private final Path jarDir;
+
+    private final Executor executor;
+
+    public JarUploadHandler(
+            final CompletableFuture localRestAddress,
+            final GatewayRetriever leaderRetriever,
+            final Time timeout,
+            final Map responseHeaders,
+            final MessageHeaders messageHeaders,
+            final Path jarDir,
+            final Executor executor) {
+        super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+        this.jarDir = requireNonNull(jarDir);
+        this.executor = requireNonNull(executor);
+    }
+
+    @Override
+    protected CompletableFuture handleRequest(
+            @Nonnull final HandlerRequest request,
+            @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+
+        final FileUpload fileUpload = request.getRequestBody();
+        return CompletableFuture.supplyAsync(() -> {
+            if (!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
+                deleteUploadedFile(fileUpload);
+                throw new CompletionException(new RestHandlerException(
+                    "Only Jar files are allowed.",
+                    HttpResponseStatus.BAD_REQUEST));
+            } else {
+                try {
+                    Files.move(fileUpload.getPath(), jarDir.resolve(fileUpload.getPath().getFileName()));
--- End diff --

please guard the `jarDir` access as done in 8fdea6093a55c33732ae869b82552371b8142c2a. I suppose you'll have to create new utility methods outside the `WebRuntimeMonitor`.
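The guard requested here might look roughly like the sketch below. The referenced commit is not quoted in this thread, so the sketch assumes that "guarding" means checking that the jar directory still exists (it may have been removed, for example by a temp-file cleaner) and recreating it before the move. `JarDirGuardSketch` and `ensureUsable` are hypothetical names, not Flink API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper for illustration; not part of the Flink code base. */
final class JarDirGuardSketch {

    private JarDirGuardSketch() {}

    /**
     * Makes sure {@code jarDir} exists and is writable, recreating it if it
     * was removed in the meantime.
     */
    static Path ensureUsable(final Path jarDir) throws IOException {
        // No-op if the directory already exists; throws
        // FileAlreadyExistsException if the path is a regular file.
        Files.createDirectories(jarDir);
        if (!Files.isWritable(jarDir)) {
            throw new IOException("Jar directory is not writable: " + jarDir);
        }
        return jarDir;
    }
}
```

A handler could call such a helper right before `Files.move(...)`, so that a vanished directory turns into a recreated directory instead of a failed upload.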
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360614#comment-16360614 ]

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5442#discussion_r167525118

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
         }
     }
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static List> tryLoadJarUploadHandler(
--- End diff --

Could return a `Collection` instead of `List`
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360612#comment-16360612 ] ASF GitHub Bot commented on FLINK-7713: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167522151 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData; +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Writes multipart/form-data to disk. Delegates all other requests to the next + * {@link ChannelInboundHandler} in the {@link ChannelPipeline}. 
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
+
+    static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
+
+    private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
+
+    private final Path uploadDir;
+
+    private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+    private HttpRequest currentHttpRequest;
+
+    public FileUploadHandler(final Path uploadDir) {
+        super(false);
+
+        this.uploadDir = requireNonNull(uploadDir);
+        DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString();
--- End diff --

This is a bit ugly how Netty did it here...
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360615#comment-16360615 ] ASF GitHub Bot commented on FLINK-7713: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167523546 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData; +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Writes multipart/form-data to disk. Delegates all other requests to the next + * {@link ChannelInboundHandler} in the {@link ChannelPipeline}. 
+ */ +public class FileUploadHandler extends SimpleChannelInboundHandler { + + private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class); + + static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE"); + + private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true); + + private final Path uploadDir; + + private HttpPostRequestDecoder currentHttpPostRequestDecoder; + + private HttpRequest currentHttpRequest; + + public FileUploadHandler(final Path uploadDir) { + super(false); + + this.uploadDir = requireNonNull(uploadDir); + DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString(); + } + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception { + if (msg instanceof HttpRequest) { + final HttpRequest httpRequest = (HttpRequest) msg; + if (httpRequest.getMethod().equals(HttpMethod.POST)) { + final HttpPostRequestDecoder httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest); + if (httpPostRequestDecoder.isMultipart()) { + currentHttpPostRequestDecoder = httpPostRequestDecoder; + currentHttpRequest = httpRequest; + } else { + ctx.fireChan
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360613#comment-16360613 ]

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5442#discussion_r167525322

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
         }
     }
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static List> tryLoadJarUploadHandler(
+            GatewayRetriever leaderRetriever,
+            CompletableFuture restAddressFuture,
+            Time timeout,
+            java.nio.file.Path uploadDir,
+            Executor executor) {
+
+        // 1. Check if flink-runtime-web is in the classpath
+        try {
+            final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+            Class.forName(classname).asSubclass(WebMonitor.class);
+        } catch (ClassNotFoundException e) {
+            // class not found means that there is no flink-runtime-web in the classpath
+            return Collections.emptyList();
+        }
--- End diff --

This could be a separate method since we use this in `tryLoadWebContent` as well.
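One way the extraction suggested here could look is sketched below, with the class name from the diff hoisted into a constant. `ClasspathCheckSketch`, `isClassAvailable` and `isWebRuntimeMonitorAvailable` are hypothetical names chosen for illustration; the sketch also drops the `asSubclass(WebMonitor.class)` check from the diff so that it stays self-contained.

```java
/** Hypothetical utility for illustration; names are not taken from Flink. */
final class ClasspathCheckSketch {

    // Class name as it appears in the diff, hoisted into a constant.
    static final String WEB_RUNTIME_MONITOR_FQN =
        "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";

    private ClasspathCheckSketch() {}

    /** Returns true if the named class can be loaded by this class's class loader. */
    static boolean isClassAvailable(final String className) {
        try {
            // initialize=false: only probe for presence, do not run static initializers.
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Shared probe that both loader methods could call. */
    static boolean isWebRuntimeMonitorAvailable() {
        return isClassAvailable(WEB_RUNTIME_MONITOR_FQN);
    }
}
```

Both `tryLoadJarUploadHandler` and `tryLoadWebContent` could then start with a single call to the shared probe and return early when it yields `false`.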
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360616#comment-16360616 ] ASF GitHub Bot commented on FLINK-7713: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167520629 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.handlers.ng; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.FileUpload; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; + +import static java.util.Objects.requireNonNull; + +/** + * Handles .jar file uploads. 
+ */
+public class JarUploadHandler extends
+        AbstractRestHandler {
+
+    private final Path jarDir;
+
+    private final Executor executor;
+
+    public JarUploadHandler(
+            final CompletableFuture localRestAddress,
+            final GatewayRetriever leaderRetriever,
+            final Time timeout,
+            final Map responseHeaders,
+            final MessageHeaders messageHeaders,
+            final Path jarDir,
+            final Executor executor) {
+        super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+        this.jarDir = requireNonNull(jarDir);
+        this.executor = requireNonNull(executor);
+    }
+
+    @Override
+    protected CompletableFuture handleRequest(
+            @Nonnull final HandlerRequest request,
+            @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+
+        final FileUpload fileUpload = request.getRequestBody();
+        return CompletableFuture.supplyAsync(() -> {
+            if (!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
+                deleteUploadedFile(fileUpload);
+                throw new CompletionException(new RestHandlerException(
+                    "Only Jar files are allowed.",
+                    HttpResponseStatus.BAD_REQUEST));
+            } else {
+                try {
+                    Files.move(fileUpload.getPath(), jarDir.resolve(fileUpload.getPath().getFileName()));
+                } catch (IOException e) {
--- End diff --

Should we try to delete the uploaded file in case of this error?
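A minimal sketch of the cleanup asked about here, assuming the intent is a best-effort delete of the temporary upload when the move into the jar directory fails. `UploadMoveSketch` and `moveOrCleanUp` are hypothetical names; this is not the handler code from the pull request.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical sketch for illustration; not the code from the pull request. */
final class UploadMoveSketch {

    private UploadMoveSketch() {}

    /**
     * Moves {@code uploaded} into {@code jarDir}. If the move fails, the uploaded
     * file is deleted on a best-effort basis before the error is rethrown.
     */
    static Path moveOrCleanUp(final Path uploaded, final Path jarDir) throws IOException {
        final Path target = jarDir.resolve(uploaded.getFileName());
        try {
            return Files.move(uploaded, target);
        } catch (IOException moveFailure) {
            try {
                Files.deleteIfExists(uploaded);
            } catch (IOException cleanupFailure) {
                // Keep the move failure as the primary error.
                moveFailure.addSuppressed(cleanupFailure);
            }
            throw moveFailure;
        }
    }
}
```

Attaching the cleanup failure as a suppressed exception keeps the original cause visible to whoever maps the error to an HTTP response.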
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360617#comment-16360617 ] ASF GitHub Bot commented on FLINK-7713: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5442#discussion_r167523095 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; +import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData; +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * Writes multipart/form-data to disk. Delegates all other requests to the next + * {@link ChannelInboundHandler} in the {@link ChannelPipeline}. 
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
+
+    static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
+
+    private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
+
+    private final Path uploadDir;
+
+    private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+    private HttpRequest currentHttpRequest;
+
+    public FileUploadHandler(final Path uploadDir) {
+        super(false);
+
+        this.uploadDir = requireNonNull(uploadDir);
+        DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString();
+    }
+
+    @Override
+    protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
+        if (msg instanceof HttpRequest) {
+            final HttpRequest httpRequest = (HttpRequest) msg;
+            if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+                final HttpPostRequestDecoder httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
--- End diff --

What happens with this decoder, if the request is not a multi part request? Wouldn't it be better to use `HttpPostRequestDecoder#isMultipart(HttpRequest request)`?
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359894#comment-16359894 ]

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5442#discussion_r167430276

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
         }
     }
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static List> tryLoadJarUploadHandler(
--- End diff --

Should be `T extends DispatcherGateway`
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359888#comment-16359888 ]

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5442#discussion_r167429820

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
         }
     }
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static List> tryLoadJarUploadHandler(
--- End diff --

tests are missing
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359886#comment-16359886 ]

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5442#discussion_r167429671

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
         }
     }
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public static List> tryLoadJarUploadHandler(
+            GatewayRetriever leaderRetriever,
+            CompletableFuture restAddressFuture,
+            Time timeout,
+            java.nio.file.Path uploadDir,
+            Executor executor) {
+
+        // 1. Check if flink-runtime-web is in the classpath
+        try {
+            final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
--- End diff --

Should be a constant.
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359885#comment-16359885 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167429608

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java ---
    @@ -100,6 +107,16 @@ public DispatcherRestEndpoint(
                 timeout, responseHeaders);
    +        final List> optJarHandlers =
    --- End diff --

    Should also check `WebOptions.SUBMIT_ENABLE`.
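The check requested here amounts to gating the optional handlers behind a boolean option. A minimal sketch of that pattern follows. It uses a plain `Map` as a stand-in for Flink's `Configuration`, and the key `web.submit.enable` with a default of `true` is an assumption about `WebOptions.SUBMIT_ENABLE` that should be verified against the Flink version in use. `OptionalHandlers` and `loadIfSubmitEnabled` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Illustrative gate for optional handlers; not Flink code. */
final class OptionalHandlers {

    // Assumed key and default for WebOptions.SUBMIT_ENABLE; verify before relying on them.
    static final String SUBMIT_ENABLE_KEY = "web.submit.enable";
    static final boolean SUBMIT_ENABLE_DEFAULT = true;

    private OptionalHandlers() {}

    /** Invokes the loader only if submission is enabled; otherwise returns an empty list. */
    static <H> List<H> loadIfSubmitEnabled(Map<String, String> config, Supplier<List<H>> loader) {
        final String raw = config.get(SUBMIT_ENABLE_KEY);
        final boolean enabled = raw == null ? SUBMIT_ENABLE_DEFAULT : Boolean.parseBoolean(raw);
        if (!enabled) {
            return Collections.<H>emptyList();
        }
        return loader.get();
    }

    public static void main(String[] args) {
        Map<String, String> config = Collections.singletonMap(SUBMIT_ENABLE_KEY, "false");
        List<String> handlers =
            OptionalHandlers.<String>loadIfSubmitEnabled(config, () -> Arrays.asList("jar-upload"));
        System.out.println("registered handlers: " + handlers);
    }
}
```

Passing the loader as a `Supplier` keeps the reflective lookup from running at all when submission is disabled.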
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358506#comment-16358506 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167250379

    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers.ng;
    --- End diff --

    The legacy handlers could be moved to a `legacy` package.
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358504#comment-16358504 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167249950

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
    @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
             }
         }
    +    @SuppressWarnings({"unchecked", "rawtypes"})
    +    public static List> tryLoadJarUploadHandler(
    +            GatewayRetriever leaderRetriever,
    +            CompletableFuture restAddressFuture,
    +            Time timeout,
    +            java.nio.file.Path uploadDir,
    +            Executor executor) {
    +
    +        // 1. Check if flink-runtime-web is in the classpath
    +        try {
    +            final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
    +            Class.forName(classname).asSubclass(WebMonitor.class);
    +        } catch (ClassNotFoundException e) {
    +            // class not found means that there is no flink-runtime-web in the classpath
    +            return Collections.emptyList();
    +        }
    +
    +        try {
    +            final String classname = "org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadHandler";
    +            final Class clazz = Class.forName(classname);
    +            final Constructor constructor = clazz.getConstructor(
    +                CompletableFuture.class,
    +                GatewayRetriever.class,
    +                Time.class,
    +                Map.class,
    +                MessageHeaders.class,
    +                java.nio.file.Path.class,
    +                Executor.class);
    +
    +            final MessageHeaders jarUploadMessageHeaders =
    +                (MessageHeaders) Class
    +                    .forName("org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadMessageHeaders")
    +                    .newInstance();
    +
    +            return Arrays.asList(Tuple2.of(jarUploadMessageHeaders, (ChannelInboundHandler) constructor.newInstance(
    +                restAddressFuture,
    +                leaderRetriever,
    +                timeout,
    +                Collections.emptyMap(),
    --- End diff --

    should use headers defined by `restConfiguration.getResponseHeaders()`
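The pattern under review, loading an optional handler by name and handing it the configured response headers instead of an empty map, can be sketched in isolation as follows. This is an illustration only: `StandInHandler` and `ReflectiveLoader.tryInstantiate` are hypothetical stand-ins, the single-argument constructor is a simplification of the seven-argument one in the diff, and the header value in `main` is made up.

```java
import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

/** Stand-in for an optional handler; hypothetical, not Flink's JarUploadHandler. */
class StandInHandler {
    final Map<String, String> responseHeaders;

    public StandInHandler(Map<String, String> responseHeaders) {
        this.responseHeaders = responseHeaders;
    }
}

/** Illustrative reflective loader; not Flink code. */
final class ReflectiveLoader {

    private ReflectiveLoader() {}

    /**
     * Instantiates the named class with the given response headers.
     * Returns empty if the class is absent; other reflection failures are rethrown.
     */
    static Optional<Object> tryInstantiate(String className, Map<String, String> responseHeaders) {
        try {
            final Class<?> clazz = Class.forName(className);
            final Constructor<?> constructor = clazz.getConstructor(Map.class);
            return Optional.of(constructor.newInstance(responseHeaders));
        } catch (ClassNotFoundException e) {
            // The optional module is not on the classpath.
            return Optional.empty();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        // Made-up header purely for demonstration.
        Map<String, String> headers = Collections.singletonMap("X-Example", "value");
        System.out.println(tryInstantiate(StandInHandler.class.getName(), headers).isPresent());
        System.out.println(tryInstantiate("no.such.Handler", headers).isPresent());
    }
}
```

Treating only `ClassNotFoundException` as "module absent" keeps a genuine wiring error, such as a changed constructor signature, from being silently swallowed.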
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358503#comment-16358503 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167249729

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---
    @@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
             }
         }
    +    @SuppressWarnings({"unchecked", "rawtypes"})
    +    public static List> tryLoadJarUploadHandler(
    --- End diff --

    Method name is not accurate. There will be more handlers.
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358499#comment-16358499 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167249285

    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers.ng;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.FileUpload;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.IOException;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Map;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.CompletionException;
    +import java.util.concurrent.Executor;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * Handles .jar file uploads.
    + */
    +public class JarUploadHandler extends
    --- End diff --

    Tests missing.
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358500#comment-16358500 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5442#discussion_r167249371

    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadResponseBody.java ---
    @@ -0,0 +1,58 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.webmonitor.handlers.ng;
    +
    +import org.apache.flink.runtime.rest.messages.ResponseBody;
    +
    +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * {@link ResponseBody} for {@link JarUploadHandler}.
    + */
    +public class JarUploadResponseBody implements ResponseBody {
    --- End diff --

    Marshalling test missing.
[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358498#comment-16358498 ]

ASF GitHub Bot commented on FLINK-7713:
---------------------------------------

GitHub user GJL opened a pull request:

    https://github.com/apache/flink/pull/5442

    [FLINK-7713][flip6] Implement JarUploadHandler

    ## What is the purpose of the change

    *Allow uploading jars through HTTP to enable job submissions from the web ui.*

    cc: @tillrohrmann

    ## Brief change log

      - *Allow uploading jars through HTTP.*
      - *Implement and register JarUploadHandler.*

    ## Verifying this change

    This change added tests and can be verified as follows:

      - *Added integration tests for uploading files.*
      - *Started cluster locally and uploaded jars using*
      ```
      curl -v -X POST -H "Expect:" -F "jarfile=@examples/streaming/Kafka010Example.jar" http://127.0.0.1:9065/jars/upload
      ```

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)

    ## Documentation

      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/GJL/flink FLINK-7713

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5442.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5442

----
commit caf74f63db6047a79393e53e9434eb1cf078ee48
Author: gyao
Date:   2018-02-09T14:46:22Z

    [FLINK-7713][flip6] Implement JarUploadHandler
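For readers who want to see what the `curl -F` invocation in the pull request description puts on the wire, the sketch below assembles a comparable `multipart/form-data` body by hand. It is a rough illustration, not the test code from the pull request: `MultipartBody.build` is a hypothetical helper, the boundary string is arbitrary, and the `application/octet-stream` part type is an assumption about what a client would send for a `.jar` file. Only the field name `jarfile` and the file name are taken from the command above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Illustrative multipart/form-data builder for a single file part; not Flink code. */
final class MultipartBody {

    private MultipartBody() {}

    /** Builds a one-part body; the request's Content-Type header must carry the same boundary. */
    static byte[] build(String boundary, String fieldName, String fileName, byte[] content) {
        final byte[] head = ("--" + boundary + "\r\n"
            + "Content-Disposition: form-data; name=\"" + fieldName
            + "\"; filename=\"" + fileName + "\"\r\n"
            + "Content-Type: application/octet-stream\r\n"
            + "\r\n").getBytes(StandardCharsets.UTF_8);
        final byte[] tail = ("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.UTF_8);

        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(head, 0, head.length);
        out.write(content, 0, content.length);
        out.write(tail, 0, tail.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Placeholder bytes stand in for the jar contents.
        byte[] body = build("example-boundary", "jarfile", "Kafka010Example.jar", new byte[] {1, 2, 3});
        System.out.println("body length: " + body.length);
    }
}
```

A client would send these bytes in a POST to the `/jars/upload` path with the header `Content-Type: multipart/form-data; boundary=example-boundary`.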