[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365367#comment-16365367
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5442


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364294#comment-16364294
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r168212886
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpointConfiguration.java
 ---
@@ -103,6 +126,9 @@ public static RestServerEndpointConfiguration fromConfiguration(Configuration co
}
}
 
-   return new RestServerEndpointConfiguration(address, port, sslEngine);
+   final Path uploadDir = Paths.get(config.getString(WebOptions.UPLOAD_DIR,
+   config.getString(WebOptions.TMP_DIR)));
--- End diff --

I think we should create a unique sub folder in order to avoid clashes 
between concurrent processes on the same machine. Will do this when merging 
this PR.
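
A minimal sketch of what such a unique sub folder could look like, assuming the
base directory comes from the configuration as in the diff above. The class
name, the method name and the "flink-web-upload-" prefix are made up for
illustration and are not taken from the PR:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

final class UploadDirSketch {

    private UploadDirSketch() {}

    /**
     * Creates a uniquely named sub directory below the given base directory,
     * so that two processes sharing the same base directory do not write
     * uploads into the same folder.
     */
    static Path createUniqueUploadDir(final Path baseDir) throws IOException {
        // Hypothetical naming scheme: fixed prefix plus a random UUID.
        final Path uploadDir = baseDir.resolve("flink-web-upload-" + UUID.randomUUID());
        // Also creates missing parent directories.
        return Files.createDirectories(uploadDir);
    }

    public static void main(final String[] args) throws IOException {
        final Path base = Files.createTempDirectory("upload-base");
        System.out.println(createUniqueUploadDir(base));
    }
}
```

Each process would call this once at startup and hand the returned path to the
upload handler instead of the shared base directory.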


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362650#comment-16362650
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167930373
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
--- End diff --

fixed


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362648#comment-16362648
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167930317
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  List> tryLoadJarUploadHandler(
+   GatewayRetriever leaderRetriever,
+   CompletableFuture restAddressFuture,
+   Time timeout,
+   java.nio.file.Path uploadDir,
+   Executor executor) {
+
+   // 1. Check if flink-runtime-web is in the classpath
+   try {
+   final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
--- End diff --

fixed


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362646#comment-16362646
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167930279
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  List> tryLoadJarUploadHandler(
--- End diff --

method is tested now


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362651#comment-16362651
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167930401
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  List> tryLoadJarUploadHandler(
+   GatewayRetriever leaderRetriever,
+   CompletableFuture restAddressFuture,
+   Time timeout,
+   java.nio.file.Path uploadDir,
+   Executor executor) {
+
+   // 1. Check if flink-runtime-web is in the classpath
+   try {
+   final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+   Class.forName(classname).asSubclass(WebMonitor.class);
+   } catch (ClassNotFoundException e) {
+   // class not found means that there is no flink-runtime-web in the classpath
+   return Collections.emptyList();
+   }
+
+   try {
+   final String classname = "org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadHandler";
+   final Class clazz = Class.forName(classname);
+   final Constructor constructor = clazz.getConstructor(
+   CompletableFuture.class,
+   GatewayRetriever.class,
+   Time.class,
+   Map.class,
+   MessageHeaders.class,
+   java.nio.file.Path.class,
+   Executor.class);
+
+   final MessageHeaders jarUploadMessageHeaders =
+   (MessageHeaders) Class
+   .forName("org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadMessageHeaders")
+   .newInstance();
+
+   return Arrays.asList(Tuple2.of(jarUploadMessageHeaders, (ChannelInboundHandler) constructor.newInstance(
+   restAddressFuture,
+   leaderRetriever,
+   timeout,
+   Collections.emptyMap(),
--- End diff --

fixed


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362654#comment-16362654
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167930470
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Handles .jar file uploads.
+ */
+public class JarUploadHandler extends
--- End diff --

added test


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362652#comment-16362652
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167930421
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  List> tryLoadJarUploadHandler(
--- End diff --

fixed


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362653#comment-16362653
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167930454
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadResponseBody.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ResponseBody} for {@link JarUploadHandler}.
+ */
+public class JarUploadResponseBody implements ResponseBody {
--- End diff --

added test


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362649#comment-16362649
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167930349
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 ---
@@ -100,6 +107,16 @@ public DispatcherRestEndpoint(
timeout,
responseHeaders);
 
+   final List> optJarHandlers =
--- End diff --

fixed


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362642#comment-16362642
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167929949
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString();
+   }
+
+   @Override
+   protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) msg;
+   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+   final HttpPostRequestDecoder httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
--- End diff --

changed it



[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362644#comment-16362644
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167930099
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  List> tryLoadJarUploadHandler(
+   GatewayRetriever leaderRetriever,
+   CompletableFuture restAddressFuture,
+   Time timeout,
+   java.nio.file.Path uploadDir,
+   Executor executor) {
+
+   // 1. Check if flink-runtime-web is in the classpath
+   try {
+   final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+   Class.forName(classname).asSubclass(WebMonitor.class);
+   } catch (ClassNotFoundException e) {
+   // class not found means that there is no flink-runtime-web in the classpath
+   return Collections.emptyList();
+   }
--- End diff --

extracted into a method
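
For readers without the updated PR at hand, such an extracted check might look
roughly like the sketch below. The class and method names are hypothetical, and
unlike the diff above the sketch only looks the class up and omits the
asSubclass(WebMonitor.class) step, so that it compiles without Flink on the
classpath:

```java
final class ClasspathCheckSketch {

    private ClasspathCheckSketch() {}

    /**
     * Returns true if a class with the given fully qualified name can be
     * loaded by the class loader of this class.
     */
    static boolean isClassOnClasspath(final String className) {
        try {
            // initialize = false: only look the class up, do not run its static initializers
            Class.forName(className, false, ClasspathCheckSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            // class not found means that the containing module is not in the classpath
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println(isClassOnClasspath("java.util.ArrayList"));
        System.out.println(isClassOnClasspath("org.example.DoesNotExist"));
    }
}
```

The caller can then return Collections.emptyList() early when the check for
"org.apache.flink.runtime.webmonitor.WebRuntimeMonitor" fails, as in the diff.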


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362643#comment-16362643
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167930021
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString();
+   }
+
+   @Override
+   protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) msg;
+   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+   final HttpPostRequestDecoder httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+   if (httpPostRequestDecoder.isMultipart()) {
+   currentHttpPostRequestDecoder = httpPostRequestDecoder;
+   currentHttpRequest = httpRequest;
+   } else {
+   ctx.fireChannelRead(msg);
--- End 

[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362645#comment-16362645
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167930174
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  List> tryLoadJarUploadHandler(
--- End diff --

both comments fixed


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362640#comment-16362640
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167929912
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString();
--- End diff --

yes


> Port JarUploadHandler to new REST endpoint
> --
>
> Key: FLINK-7713
> URL: https://issues.apache.org/jira/browse/FLINK-7713
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarUploadHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16362639#comment-16362639
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167929864
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Handles .jar file uploads.
+ */
+public class JarUploadHandler extends
+   AbstractRestHandler {
+
+   private final Path jarDir;
+
+   private final Executor executor;
+
+   public JarUploadHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever leaderRetriever,
+   final Time timeout,
+   final Map responseHeaders,
+   final MessageHeaders messageHeaders,
+   final Path jarDir,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture handleRequest(
+   @Nonnull final HandlerRequest request,
+   @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+
+   final FileUpload fileUpload = request.getRequestBody();
+   return CompletableFuture.supplyAsync(() -> {
+   if (!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
+   deleteUploadedFile(fileUpload);
+   throw new CompletionException(new RestHandlerException(
+   "Only Jar files are allowed.",
+   HttpResponseStatus.BAD_REQUEST));
+   } else {
+   try {
+   Files.move(fileUpload.getPath(), jarDir.resolve(fileUpload.getPath().getFileName()));
+   } catch (IOException e) {
--- End diff --

yes



[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360691#comment-16360691
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167542881
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Handles .jar file uploads.
+ */
+public class JarUploadHandler extends
+   AbstractRestHandler {
+
+   private final Path jarDir;
+
+   private final Executor executor;
+
+   public JarUploadHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever leaderRetriever,
+   final Time timeout,
+   final Map responseHeaders,
+   final MessageHeaders messageHeaders,
+   final Path jarDir,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture handleRequest(
+   @Nonnull final HandlerRequest request,
+   @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+
+   final FileUpload fileUpload = request.getRequestBody();
+   return CompletableFuture.supplyAsync(() -> {
+   if (!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
+   deleteUploadedFile(fileUpload);
+   throw new CompletionException(new RestHandlerException(
+   "Only Jar files are allowed.",
+   HttpResponseStatus.BAD_REQUEST));
+   } else {
+   try {
+   Files.move(fileUpload.getPath(), jarDir.resolve(fileUpload.getPath().getFileName()));
--- End diff --

Please guard the `jarDir` access as done in 
8fdea6093a55c33732ae869b82552371b8142c2a. I suppose you'll have to create new 
utility methods outside the `WebRuntimeMonitor`.
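
For illustration only, here is one shape such a guard could take. The contents of the referenced commit are not reproduced in this thread, so this sketch assumes the concern is that the jar directory may disappear at runtime (for example, removed by a tmp cleaner) and has to be recreated before it is used. The class and method names are hypothetical and are not part of Flink:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical utility; name and placement are assumptions, not Flink API. */
public final class UploadDirGuard {

    private UploadDirGuard() {}

    /**
     * Makes sure the given directory exists and is writable, recreating it
     * if it has been deleted in the meantime.
     */
    public static synchronized Path checkAndCreate(final Path dir) throws IOException {
        // No-op if the directory already exists; creates missing parents otherwise.
        Files.createDirectories(dir);
        if (!Files.isWritable(dir)) {
            throw new IOException("Upload directory " + dir + " is not writable.");
        }
        return dir;
    }
}
```

A handler could then call `UploadDirGuard.checkAndCreate(jarDir)` right before resolving the target path, so the guard lives outside `WebRuntimeMonitor` and can be shared.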



[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360614#comment-16360614
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167525118
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

This could return a `Collection` instead of a `List`.




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360612#comment-16360612
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167522151
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString();
--- End diff --

It's a bit ugly how Netty handles this: the upload directory has to be set 
through the static `DiskFileUpload.baseDirectory` field.




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360615#comment-16360615
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167523546
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString();
+   }
+
+   @Override
+   protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) msg;
+   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+   final HttpPostRequestDecoder httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+   if (httpPostRequestDecoder.isMultipart()) {
+   currentHttpPostRequestDecoder = httpPostRequestDecoder;
+   currentHttpRequest = httpRequest;
+   } else {
+   ctx.fireChannelRead(msg);
   

[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360613#comment-16360613
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167525322
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
+   GatewayRetriever leaderRetriever,
+   CompletableFuture restAddressFuture,
+   Time timeout,
+   java.nio.file.Path uploadDir,
+   Executor executor) {
+
+   // 1. Check if flink-runtime-web is in the classpath
+   try {
+   final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+   Class.forName(classname).asSubclass(WebMonitor.class);
+   } catch (ClassNotFoundException e) {
+   // class not found means that there is no flink-runtime-web in the classpath
+   return Collections.emptyList();
+   }
--- End diff --

This could be a separate method since we use this in `tryLoadWebContent` as 
well.
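
A minimal sketch of what the extracted check could look like, so that both call sites share it. The class and method names here are hypothetical and not taken from the PR. Unlike the snippet above, it does not call `asSubclass`; it only answers whether the class can be loaded:

```java
/** Hypothetical helper; the name is an assumption made for this example. */
public final class ClasspathCheck {

    private ClasspathCheck() {}

    /**
     * Returns true if a class with the given fully qualified name can be
     * loaded. The class is looked up without being initialized.
     */
    public static boolean isOnClasspath(final String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            // The class (and therefore presumably its module) is not available.
            return false;
        }
    }
}
```

With such a helper, `tryLoadJarUploadHandler` and `tryLoadWebContent` could each start with a single `if (!isOnClasspath(...)) { return ...; }` instead of repeating the try/catch block.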




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360616#comment-16360616
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167520629
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Handles .jar file uploads.
+ */
+public class JarUploadHandler extends
+   AbstractRestHandler {
+
+   private final Path jarDir;
+
+   private final Executor executor;
+
+   public JarUploadHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever leaderRetriever,
+   final Time timeout,
+   final Map responseHeaders,
+   final MessageHeaders messageHeaders,
+   final Path jarDir,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+   this.jarDir = requireNonNull(jarDir);
+   this.executor = requireNonNull(executor);
+   }
+
+   @Override
+   protected CompletableFuture handleRequest(
+   @Nonnull final HandlerRequest request,
+   @Nonnull final RestfulGateway gateway) throws RestHandlerException {
+
+   final FileUpload fileUpload = request.getRequestBody();
+   return CompletableFuture.supplyAsync(() -> {
+   if (!fileUpload.getPath().getFileName().toString().endsWith(".jar")) {
+   deleteUploadedFile(fileUpload);
+   throw new CompletionException(new RestHandlerException(
+   "Only Jar files are allowed.",
+   HttpResponseStatus.BAD_REQUEST));
+   } else {
+   try {
+   Files.move(fileUpload.getPath(), jarDir.resolve(fileUpload.getPath().getFileName()));
+   } catch (IOException e) {
--- End diff --

Should we try to delete the uploaded file in case of this error?
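
To make the question concrete, here is a rough sketch of how the cleanup could be done, using only `java.nio.file`. The class and method names are hypothetical, and wrapping the failure in an `UncheckedIOException` is a choice made for this example; the handler in the diff throws a `CompletionException` instead:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper; names are assumptions made for this example. */
public final class MoveOrCleanUp {

    private MoveOrCleanUp() {}

    /**
     * Moves the uploaded file into targetDir. If the move fails, the uploaded
     * file is deleted on a best-effort basis before the failure is rethrown.
     */
    public static Path moveOrDelete(final Path uploaded, final Path targetDir) {
        final Path target = targetDir.resolve(uploaded.getFileName());
        try {
            return Files.move(uploaded, target);
        } catch (IOException moveFailure) {
            try {
                // Do not leave the temporary upload behind.
                Files.deleteIfExists(uploaded);
            } catch (IOException deleteFailure) {
                // Keep the original cause; record the cleanup problem alongside it.
                moveFailure.addSuppressed(deleteFailure);
            }
            throw new UncheckedIOException(moveFailure);
        }
    }
}
```

Attaching the delete failure as a suppressed exception keeps the move failure as the primary error reported to the client.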



[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16360617#comment-16360617
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167523095
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObject;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.DiskFileUpload;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpDataFactory;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.InterfaceHttpData;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writes multipart/form-data to disk. Delegates all other requests to the next
+ * {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
+ */
+public class FileUploadHandler extends SimpleChannelInboundHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(FileUploadHandler.class);
+
+   static final AttributeKey UPLOADED_FILE = AttributeKey.valueOf("UPLOADED_FILE");
+
+   private static final HttpDataFactory DATA_FACTORY = new DefaultHttpDataFactory(true);
+
+   private final Path uploadDir;
+
+   private HttpPostRequestDecoder currentHttpPostRequestDecoder;
+
+   private HttpRequest currentHttpRequest;
+
+   public FileUploadHandler(final Path uploadDir) {
+   super(false);
+
+   this.uploadDir = requireNonNull(uploadDir);
+   DiskFileUpload.baseDirectory = uploadDir.toAbsolutePath().toString();
+   }
+
+   @Override
+   protected void channelRead0(final ChannelHandlerContext ctx, final HttpObject msg) throws Exception {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) msg;
+   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
+   final HttpPostRequestDecoder httpPostRequestDecoder = new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
--- End diff --

What happens with this decoder if the request is not a multipart request? 
Wouldn't it be better to use `HttpPostRequestDecoder#isMultipart(HttpRequest 
request)`?
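
The idea is to decide from the request headers alone whether a decoder is needed, and only then construct one. The sketch below is a dependency-free stand-in for that kind of check and is not Netty's implementation: it only looks at the `Content-Type` value, whereas Netty's own check also deals with the boundary parameter. The class and method names are hypothetical:

```java
import java.util.Locale;

/** Hypothetical stand-in for a header-only multipart check; not Netty code. */
public final class MultipartCheck {

    private MultipartCheck() {}

    /**
     * Returns true if the given Content-Type header value denotes
     * multipart/form-data. A null header is treated as "not multipart".
     */
    public static boolean isMultipartFormData(final String contentType) {
        if (contentType == null) {
            return false;
        }
        // Media types are case-insensitive; parameters such as the boundary follow after ';'.
        return contentType.trim().toLowerCase(Locale.ROOT).startsWith("multipart/form-data");
    }
}
```

In the handler, the equivalent would be to call the static `HttpPostRequestDecoder#isMultipart(HttpRequest)` first and create the `HttpPostRequestDecoder` only inside that branch, so that no decoder is left unused for non-multipart POST requests.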



[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359894#comment-16359894
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167430276
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

Should be `T extends DispatcherGateway`




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359888#comment-16359888
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167429820
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

Tests are missing.




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359886#comment-16359886
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167429671
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
+           GatewayRetriever leaderRetriever,
+           CompletableFuture restAddressFuture,
+           Time timeout,
+           java.nio.file.Path uploadDir,
+           Executor executor) {
+
+       // 1. Check if flink-runtime-web is in the classpath
+       try {
+           final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
--- End diff --

Should be a constant.
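
A minimal sketch of what this could look like, kept free of Flink types so it runs on its own. `ClasspathProbe`, `WEB_RUNTIME_MONITOR_CLASS` and `isOnClasspath` are hypothetical names, not identifiers from the PR; only the class name string is taken from the diff above.

```java
final class ClasspathProbe {

    // Fully qualified name copied from the diff; holding it in one constant
    // avoids repeating the string literal at each call site.
    static final String WEB_RUNTIME_MONITOR_CLASS =
        "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";

    private ClasspathProbe() {}

    /** Returns true if the named class can be found by this class's class loader. */
    public static boolean isOnClasspath(String className) {
        try {
            // initialize=false: we only want to know whether the class exists.
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

The check in the diff would then read `ClasspathProbe.isOnClasspath(ClasspathProbe.WEB_RUNTIME_MONITOR_CLASS)`, under the naming assumptions above.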




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359885#comment-16359885
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167429608
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 ---
@@ -100,6 +107,16 @@ public DispatcherRestEndpoint(
timeout,
responseHeaders);
 
+   final List> optJarHandlers =
--- End diff --

Should also check `WebOptions.SUBMIT_ENABLE`.
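
Roughly, the optional handlers would only be loaded when submission is enabled. The sketch below is standalone and illustrative: a plain `Map` stands in for Flink's `Configuration`, `OptionalHandlers` and `loadIfSubmitEnabled` are hypothetical names, and the key `web.submit.enable` with default `true` is an assumption about `WebOptions.SUBMIT_ENABLE` that should be checked against the actual option.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

final class OptionalHandlers {

    // Assumed key and default for WebOptions.SUBMIT_ENABLE (not verified here).
    static final String SUBMIT_ENABLE_KEY = "web.submit.enable";
    static final boolean SUBMIT_ENABLE_DEFAULT = true;

    private OptionalHandlers() {}

    /** Invokes the loader only if job submission is enabled in the given config. */
    public static <H> List<H> loadIfSubmitEnabled(
            Map<String, String> config, Supplier<List<H>> loader) {
        String raw = config.get(SUBMIT_ENABLE_KEY);
        boolean enabled = raw == null ? SUBMIT_ENABLE_DEFAULT : Boolean.parseBoolean(raw);
        if (!enabled) {
            // Submission disabled: register no jar handlers at all.
            return Collections.emptyList();
        }
        return loader.get();
    }
}
```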




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358506#comment-16358506
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167250379
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
--- End diff --

The legacy handlers could be moved to a `legacy` package.




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358504#comment-16358504
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249950
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
+           GatewayRetriever leaderRetriever,
+           CompletableFuture restAddressFuture,
+           Time timeout,
+           java.nio.file.Path uploadDir,
+           Executor executor) {
+
+       // 1. Check if flink-runtime-web is in the classpath
+       try {
+           final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+           Class.forName(classname).asSubclass(WebMonitor.class);
+       } catch (ClassNotFoundException e) {
+           // class not found means that there is no flink-runtime-web in the classpath
+           return Collections.emptyList();
+       }
+
+       try {
+           final String classname = "org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadHandler";
+           final Class clazz = Class.forName(classname);
+           final Constructor constructor = clazz.getConstructor(
+               CompletableFuture.class,
+               GatewayRetriever.class,
+               Time.class,
+               Map.class,
+               MessageHeaders.class,
+               java.nio.file.Path.class,
+               Executor.class);
+
+           final MessageHeaders jarUploadMessageHeaders =
+               (MessageHeaders) Class
+                   .forName("org.apache.flink.runtime.webmonitor.handlers.ng.JarUploadMessageHeaders")
+                   .newInstance();
+
+           return Arrays.asList(Tuple2.of(jarUploadMessageHeaders, (ChannelInboundHandler) constructor.newInstance(
+               restAddressFuture,
+               leaderRetriever,
+               timeout,
+               Collections.emptyMap(),
--- End diff --

Should use the headers defined by `restConfiguration.getResponseHeaders()` instead of an empty map.
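
A minimal sketch of threading a header map through a reflective constructor call, reduced so that it runs without Flink on the classpath. `ReflectiveLoader` and `tryInstantiate` are hypothetical names, and `java.util.HashMap` stands in for the handler class in the usage note; the real constructor takes more arguments, as the diff shows.

```java
import java.lang.reflect.Constructor;
import java.util.Optional;

final class ReflectiveLoader {

    private ReflectiveLoader() {}

    /**
     * Instantiates the named class through its public constructor with the given
     * parameter types. Returns an empty Optional if the class is not on the
     * classpath; any other reflection failure is treated as a programming error.
     */
    public static Optional<Object> tryInstantiate(
            String className, Class<?>[] parameterTypes, Object... args) {
        try {
            Class<?> clazz = Class.forName(className);
            Constructor<?> constructor = clazz.getConstructor(parameterTypes);
            return Optional.of(constructor.newInstance(args));
        } catch (ClassNotFoundException e) {
            // Optional module is absent: the caller simply registers nothing.
            return Optional.empty();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }
}
```

For example, `ReflectiveLoader.tryInstantiate("java.util.HashMap", new Class<?>[] {Map.class}, responseHeaders)` passes the configured map to the constructor, which is the role `restConfiguration.getResponseHeaders()` would play in place of `Collections.emptyMap()`.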




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358503#comment-16358503
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249729
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 ---
@@ -207,6 +215,53 @@ public static WebMonitor startWebRuntimeMonitor(
}
}
 
+   @SuppressWarnings({"unchecked", "rawtypes"})
+   public static  
List> 
tryLoadJarUploadHandler(
--- End diff --

Method name is not accurate. There will be more handlers.




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358499#comment-16358499
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249285
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadHandler.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.FileUpload;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Handles .jar file uploads.
+ */
+public class JarUploadHandler extends
--- End diff --

Tests missing.




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358500#comment-16358500
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5442#discussion_r167249371
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ng/JarUploadResponseBody.java
 ---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers.ng;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ResponseBody} for {@link JarUploadHandler}.
+ */
+public class JarUploadResponseBody implements ResponseBody {
--- End diff --

Marshalling test missing.




[jira] [Commented] (FLINK-7713) Port JarUploadHandler to new REST endpoint

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358498#comment-16358498
 ] 

ASF GitHub Bot commented on FLINK-7713:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5442

[FLINK-7713][flip6] Implement JarUploadHandler

## What is the purpose of the change

*Allow uploading jars through HTTP to enable job submissions from the web UI.*

cc: @tillrohrmann  

## Brief change log

  - *Allow uploading jars through HTTP.*
  - *Implement and register JarUploadHandler.*

## Verifying this change

This change added tests and can be verified as follows:
  - *Added integration tests for uploading files.*
  - *Started cluster locally and uploaded jars using*
```
curl -v -X POST -H "Expect:" -F "jarfile=@examples/streaming/Kafka010Example.jar" http://127.0.0.1:9065/jars/upload
```
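
For readers who want to script the same upload from Java, here is a rough sketch of the multipart body that the `curl -F` call produces. Only the field name `jarfile` and the `/jars/upload` path come from the command above; `JarUploadRequest`, `multipartBody`, the boundary value and the `application/x-java-archive` part type are illustrative assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class JarUploadRequest {

    private JarUploadRequest() {}

    /** Builds a multipart/form-data body with a single file part named "jarfile". */
    public static byte[] multipartBody(String boundary, String fileName, byte[] content) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        String partHeader = "--" + boundary + "\r\n"
            + "Content-Disposition: form-data; name=\"jarfile\"; filename=\"" + fileName + "\"\r\n"
            + "Content-Type: application/x-java-archive\r\n"
            + "\r\n";
        append(out, partHeader.getBytes(StandardCharsets.UTF_8));
        append(out, content);
        // Closing delimiter: CRLF, two dashes, boundary, two dashes, CRLF.
        append(out, ("\r\n--" + boundary + "--\r\n").getBytes(StandardCharsets.UTF_8));
        return out.toByteArray();
    }

    private static void append(ByteArrayOutputStream out, byte[] bytes) {
        // The three-argument write on ByteArrayOutputStream throws no checked exception.
        out.write(bytes, 0, bytes.length);
    }
}
```

The resulting bytes would be sent as the body of a `POST` to `/jars/upload` with the request header `Content-Type: multipart/form-data; boundary=<boundary>`, using whichever HTTP client is at hand.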

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-7713

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5442.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5442


commit caf74f63db6047a79393e53e9434eb1cf078ee48
Author: gyao 
Date:   2018-02-09T14:46:22Z

[FLINK-7713][flip6] Implement JarUploadHandler



