[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6178


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197086869
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -151,18 +143,26 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
ctx.fireChannelRead(msg);
}
} catch (Exception e) {
-   HttpRequest tmpRequest = currentHttpRequest;
-   deleteUploadedFiles();
-   reset();
-   LOG.warn("Internal server error. File upload failed.", 
e);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   tmpRequest,
-   new ErrorResponseBody("File upload failed."),
-   HttpResponseStatus.INTERNAL_SERVER_ERROR,
-   Collections.emptyMap()
-   );
+   handleError(ctx, "File upload failed.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+   }
+   }
+
+   private void handleError(ChannelHandlerContext ctx, String 
errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
+   HttpRequest tmpRequest = currentHttpRequest;
+   deleteUploadedFiles();
+   reset();
+   if (e == null) {
+   LOG.warn(errorMessage);
+   } else {
+   LOG.warn(errorMessage, e);
--- End diff --

I don't think so.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197085237
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -151,18 +143,26 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
ctx.fireChannelRead(msg);
}
} catch (Exception e) {
-   HttpRequest tmpRequest = currentHttpRequest;
-   deleteUploadedFiles();
-   reset();
-   LOG.warn("Internal server error. File upload failed.", 
e);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   tmpRequest,
-   new ErrorResponseBody("File upload failed."),
-   HttpResponseStatus.INTERNAL_SERVER_ERROR,
-   Collections.emptyMap()
-   );
+   handleError(ctx, "File upload failed.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+   }
+   }
+
+   private void handleError(ChannelHandlerContext ctx, String 
errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
+   HttpRequest tmpRequest = currentHttpRequest;
+   deleteUploadedFiles();
+   reset();
+   if (e == null) {
+   LOG.warn(errorMessage);
+   } else {
+   LOG.warn(errorMessage, e);
--- End diff --

But this will also print "null", won't it? That's what I was trying to 
avoid here.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197078502
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -151,18 +143,26 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
ctx.fireChannelRead(msg);
}
} catch (Exception e) {
-   HttpRequest tmpRequest = currentHttpRequest;
-   deleteUploadedFiles();
-   reset();
-   LOG.warn("Internal server error. File upload failed.", 
e);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   tmpRequest,
-   new ErrorResponseBody("File upload failed."),
-   HttpResponseStatus.INTERNAL_SERVER_ERROR,
-   Collections.emptyMap()
-   );
+   handleError(ctx, "File upload failed.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR, e);
+   }
+   }
+
+   private void handleError(ChannelHandlerContext ctx, String 
errorMessage, HttpResponseStatus responseStatus, @Nullable Throwable e) {
+   HttpRequest tmpRequest = currentHttpRequest;
+   deleteUploadedFiles();
+   reset();
+   if (e == null) {
+   LOG.warn(errorMessage);
+   } else {
+   LOG.warn(errorMessage, e);
--- End diff --

I think we don't have to make this distinction here. 
`LOG.warn(errorMessage, null)` should also work.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197058127
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
}
}
 
+   @Test
+   public void testUploadCleanupOnFailure() throws IOException {
+   OkHttpClient client = new OkHttpClient();
+
+   Request request = 
buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+   try (Response response = client.newCall(request).execute()) {
+   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
+   }
+   assertUploadDirectoryIsEmpty();
+   }
+
+   private static void assertUploadDirectoryIsEmpty() throws IOException {
+   Preconditions.checkArgument(
+   1 == Files.list(configuredUploadDir).count(),
+   "Directory structure in rest upload directory has 
changed. Test must be adjusted");
+   Optional actualUploadDir = 
Files.list(configuredUploadDir).findAny();
+   Preconditions.checkArgument(
+   actualUploadDir.isPresent(),
+   "Expected upload directory does not exist.");
+   
System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
+   assertEquals("Not all files were cleaned up.", 0, 
Files.list(actualUploadDir.get()).count());
--- End diff --

Alright, but then the order in which the methods are called in the catch 
block is incorrect. Maybe that should be factored out into a method to avoid 
code duplication.
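
To make the ordering concern concrete, here is a minimal sketch of a shared 
error path. It assumes the intent is "capture the request, clean up, then 
respond". UploadHandlerSketch, startRequest, rejectUnknownAttribute and 
failWith are hypothetical names, and a list of strings stands in for Netty 
and the logger, so this is not the code from the pull request.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an upload handler; it records steps instead of using Netty. */
final class UploadHandlerSketch {
    final List<String> steps = new ArrayList<>();
    private String currentRequest; // stand-in for currentHttpRequest

    void startRequest(String request) {
        currentRequest = request;
    }

    // Both failure paths share one method, so the call order cannot diverge.
    void rejectUnknownAttribute() {
        handleError("Unknown attribute.", 400, null);
    }

    void failWith(Exception e) {
        handleError("File upload failed.", 500, e);
    }

    private void handleError(String message, int status, Throwable cause) {
        String tmpRequest = currentRequest; // capture before reset() clears the field
        deleteUploadedFiles();
        reset();
        // The cause is optional; it is only appended to the log entry when present.
        steps.add(cause == null ? "log: " + message : "log: " + message + " (" + cause.getMessage() + ")");
        // The response goes out last, after the cleanup has already happened.
        steps.add("respond " + status + " to " + tmpRequest);
    }

    private void deleteUploadedFiles() {
        steps.add("delete files");
    }

    private void reset() {
        currentRequest = null;
        steps.add("reset");
    }

    public static void main(String[] args) {
        UploadHandlerSketch handler = new UploadHandlerSketch();
        handler.startRequest("POST /upload");
        handler.failWith(new IllegalStateException("disk full"));
        handler.steps.forEach(System.out::println);
    }
}
```

Because the response is produced only after the files are deleted, a client 
that has received the response can check the upload directory without racing 
against the cleanup, at least in this simplified single-threaded model.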


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197055449
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   public static FileUploads forDirectory(Path directory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   Preconditions.checkArgument(directory.isAbsolute(), "Path must 
be absolute.");
+   Preconditions.checkArgument(Files.isDirectory(directory), "Path 
must be a directory.");
+
+   FileAdderVisitor visitor = new FileAdderVisitor();
+   Files.walkFileTree(directory, visitor);
+   files.addAll(visitor.getContainedFiles());
+   
+   return new FileUploads(Collections.singleton(directory), files);
+   }
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection directoriesToClean, 
Collection uploadedFiles) {
+   this.directoriesToClean = 
Preconditions.checkNotNull(directoriesToClean);
+   this.uploadedFiles = Preconditions.checkNotNull(uploadedFiles);
--- End diff --

I know this is nitpicking, but I think it would be slightly better to let 
FileUploads represent only the upload directories and to add a method 
FileUploads#getFiles that returns a Collection of all files found in the 
upload directories. The difference is that we would not initialize 
FileUploads with the files. That would effectively enforce that all files 
reside in the given upload directories. As it stands, we could initialize 
this class with the directories /web/upload/a and /web/upload/b and the file 
/web/different/path/file, which is located somewhere else. Because of this, 
we need to delete not only the directories but also all files.
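
A minimal sketch of the directory-only variant described above, assuming a 
single upload directory for brevity. DirectoryOnlyUploads is a hypothetical 
name and only java.nio.file calls are used; it is an illustration of the 
idea, not the FileUploads class from this pull request.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical container that knows only its upload directory, never individual files. */
final class DirectoryOnlyUploads implements AutoCloseable {
    private final Path uploadDirectory;

    DirectoryOnlyUploads(Path uploadDirectory) {
        if (!uploadDirectory.isAbsolute()) {
            throw new IllegalArgumentException("Path must be absolute.");
        }
        if (!Files.isDirectory(uploadDirectory)) {
            throw new IllegalArgumentException("Path must be a directory.");
        }
        this.uploadDirectory = uploadDirectory;
    }

    /** Files are discovered on demand, so every returned path lies below the upload directory. */
    Collection<Path> getFiles() throws IOException {
        try (Stream<Path> paths = Files.walk(uploadDirectory)) {
            return paths.filter(Files::isRegularFile).collect(Collectors.toList());
        }
    }

    /** Deleting the directory tree is sufficient because no file can live elsewhere. */
    @Override
    public void close() throws IOException {
        if (!Files.exists(uploadDirectory)) {
            return;
        }
        List<Path> deepestFirst;
        try (Stream<Path> paths = Files.walk(uploadDirectory)) {
            // Reverse order visits children before their parent directories.
            deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : deepestFirst) {
            Files.deleteIfExists(path);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("upload-sketch").toAbsolutePath();
        Files.createFile(dir.resolve("job.jar"));
        try (DirectoryOnlyUploads uploads = new DirectoryOnlyUploads(dir)) {
            System.out.println("files found: " + uploads.getFiles().size());
        }
        System.out.println("directory still exists: " + Files.exists(dir));
    }
}
```

With this shape the cleanup has a single responsibility: removing the 
directory tree. The trade-off is that tests can no longer hand in arbitrary 
files directly and would have to place them in a temporary directory first.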


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197054770
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
}
}
 
+   @Test
+   public void testUploadCleanupOnFailure() throws IOException {
+   OkHttpClient client = new OkHttpClient();
+
+   Request request = 
buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+   try (Response response = client.newCall(request).execute()) {
+   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
+   }
+   assertUploadDirectoryIsEmpty();
+   }
+
+   private static void assertUploadDirectoryIsEmpty() throws IOException {
+   Preconditions.checkArgument(
+   1 == Files.list(configuredUploadDir).count(),
+   "Directory structure in rest upload directory has 
changed. Test must be adjusted");
+   Optional actualUploadDir = 
Files.list(configuredUploadDir).findAny();
+   Preconditions.checkArgument(
+   actualUploadDir.isPresent(),
+   "Expected upload directory does not exist.");
+   
System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
+   assertEquals("Not all files were cleaned up.", 0, 
Files.list(actualUploadDir.get()).count());
--- End diff --

This test wasn't covering the case of exceptions but the rejection of 
unknown attributes. Will try to find a way to crash the handler.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197054545
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AbstractHandler}.
+ */
+public class AbstractHandlerTest {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @Test
+   public void testFileCleanup() throws Exception {
+   final Path file = temporaryFolder.newFile().toPath();
+
+   final String restAddress = "http://localhost:1234";
+   RestfulGateway mockRestfulGateway = 
TestingRestfulGateway.newBuilder()
+   .setRestAddress(restAddress)
+   .build();
+
+   final GatewayRetriever mockGatewayRetriever = 
() ->
+   CompletableFuture.completedFuture(mockRestfulGateway);
+
+   TestHandler handler = new 
TestHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
+
+   RouteResult routeResult = new RouteResult<>("", "", 
Collections.emptyMap(), Collections.emptyMap(), "");
+   HttpRequest request = new DefaultFullHttpRequest(
+   HttpVersion.HTTP_1_1,
+   HttpMethod.GET,
+   
TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(),
+   Unpooled.wrappedBuffer(new byte[0]));
+   RoutedRequest routerRequest = new 
RoutedRequest<>(routeResult, request);
+
+   Attribute attribute = new SimpleAttribute();
+   attribute.set(new FileUploads(Collections.emptyList(), 
Collections.singleton(file)));
+   Channel channel = mock(Channel.class);
+   

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197051395
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
}
}
 
+   @Test
+   public void testUploadCleanupOnFailure() throws IOException {
+   OkHttpClient client = new OkHttpClient();
+
+   Request request = 
buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+   try (Response response = client.newCall(request).execute()) {
+   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
+   }
+   assertUploadDirectoryIsEmpty();
+   }
+
+   private static void assertUploadDirectoryIsEmpty() throws IOException {
+   Preconditions.checkArgument(
+   1 == Files.list(configuredUploadDir).count(),
+   "Directory structure in rest upload directory has 
changed. Test must be adjusted");
+   Optional actualUploadDir = 
Files.list(configuredUploadDir).findAny();
+   Preconditions.checkArgument(
+   actualUploadDir.isPresent(),
+   "Expected upload directory does not exist.");
+   
System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
--- End diff --

Remove system out


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197053303
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -259,6 +271,29 @@ public void testFileMultipart() throws Exception {
}
}
 
+   @Test
+   public void testUploadCleanupOnFailure() throws IOException {
+   OkHttpClient client = new OkHttpClient();
+
+   Request request = 
buildMixedRequestWithUnknownAttribute(mixedHandler.getMessageHeaders().getTargetRestEndpointURL());
+   try (Response response = client.newCall(request).execute()) {
+   assertEquals(HttpResponseStatus.BAD_REQUEST.code(), 
response.code());
+   }
+   assertUploadDirectoryIsEmpty();
+   }
+
+   private static void assertUploadDirectoryIsEmpty() throws IOException {
+   Preconditions.checkArgument(
+   1 == Files.list(configuredUploadDir).count(),
+   "Directory structure in rest upload directory has 
changed. Test must be adjusted");
+   Optional actualUploadDir = 
Files.list(configuredUploadDir).findAny();
+   Preconditions.checkArgument(
+   actualUploadDir.isPresent(),
+   "Expected upload directory does not exist.");
+   
System.out.println(Files.list(actualUploadDir.get()).collect(Collectors.toList()));
+   assertEquals("Not all files were cleaned up.", 0, 
Files.list(actualUploadDir.get()).count());
--- End diff --

Why don't we run into a race condition with the catch block of 
`FileUploadHandler`?


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197053400
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AbstractHandler}.
+ */
+public class AbstractHandlerTest {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @Test
+   public void testFileCleanup() throws Exception {
+   final Path file = temporaryFolder.newFile().toPath();
+
+   final String restAddress = "http://localhost:1234";
+   RestfulGateway mockRestfulGateway = 
TestingRestfulGateway.newBuilder()
+   .setRestAddress(restAddress)
+   .build();
+
+   final GatewayRetriever mockGatewayRetriever = 
() ->
+   CompletableFuture.completedFuture(mockRestfulGateway);
+
+   TestHandler handler = new 
TestHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
+
+   RouteResult routeResult = new RouteResult<>("", "", 
Collections.emptyMap(), Collections.emptyMap(), "");
+   HttpRequest request = new DefaultFullHttpRequest(
+   HttpVersion.HTTP_1_1,
+   HttpMethod.GET,
+   
TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(),
+   Unpooled.wrappedBuffer(new byte[0]));
+   RoutedRequest routerRequest = new 
RoutedRequest<>(routeResult, request);
+
+   Attribute attribute = new SimpleAttribute();
+   attribute.set(new FileUploads(Collections.emptyList(), 
Collections.singleton(file)));
+   Channel channel = mock(Channel.class);
+   

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197050066
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AbstractHandler}.
+ */
+public class AbstractHandlerTest {
--- End diff --

`TestLogger` missing


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197050662
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/AbstractHandlerTest.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.router.RouteResult;
+import org.apache.flink.runtime.rest.handler.router.RoutedRequest;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AbstractHandler}.
+ */
+public class AbstractHandlerTest {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @Test
+   public void testFileCleanup() throws Exception {
+   final Path file = temporaryFolder.newFile().toPath();
+
+   final String restAddress = "http://localhost:1234";
+   RestfulGateway mockRestfulGateway = 
TestingRestfulGateway.newBuilder()
+   .setRestAddress(restAddress)
+   .build();
+
+   final GatewayRetriever mockGatewayRetriever = 
() ->
+   CompletableFuture.completedFuture(mockRestfulGateway);
+
+   TestHandler handler = new 
TestHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
+
+   RouteResult routeResult = new RouteResult<>("", "", 
Collections.emptyMap(), Collections.emptyMap(), "");
+   HttpRequest request = new DefaultFullHttpRequest(
+   HttpVersion.HTTP_1_1,
+   HttpMethod.GET,
+   
TestHandler.TestHeaders.INSTANCE.getTargetRestEndpointURL(),
+   Unpooled.wrappedBuffer(new byte[0]));
+   RoutedRequest routerRequest = new 
RoutedRequest<>(routeResult, request);
+
+   Attribute attribute = new SimpleAttribute();
+   attribute.set(new FileUploads(Collections.emptyList(), 
Collections.singleton(file)));
+   Channel channel = mock(Channel.class);
+   

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r197049400
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -45,27 +45,26 @@
@SuppressWarnings("resource")
public static final FileUploads EMPTY = new FileUploads();
 
+   public static FileUploads forDirectory(Path directory) throws IOException {
+   final Collection<Path> files = new ArrayList<>(4);
+   Preconditions.checkArgument(directory.isAbsolute(), "Path must be absolute.");
+   Preconditions.checkArgument(Files.isDirectory(directory), "Path must be a directory.");
+
+   FileAdderVisitor visitor = new FileAdderVisitor();
+   Files.walkFileTree(directory, visitor);
+   files.addAll(visitor.getContainedFiles());
+   
+   return new FileUploads(Collections.singleton(directory), files);
+   }
+
private FileUploads() {
this.directoriesToClean = Collections.emptyList();
this.uploadedFiles = Collections.emptyList();
}
 
-   public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
-   final Collection<Path> files = new ArrayList<>(4);
-   final Collection<Path> directories = new ArrayList<>(1);
-   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
-   Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
-   if (Files.isDirectory(fileOrDirectory)) {
-   directories.add(fileOrDirectory);
-   FileAdderVisitor visitor = new FileAdderVisitor();
-   Files.walkFileTree(fileOrDirectory, visitor);
-   files.addAll(visitor.getContainedFiles());
-   } else {
-   files.add(fileOrDirectory);
-   }
-   }
-   directoriesToClean = Collections.unmodifiableCollection(directories);
-   uploadedFiles = Collections.unmodifiableCollection(files);
+   public FileUploads(Collection<Path> directoriesToClean, Collection<Path> uploadedFiles) {
+   this.directoriesToClean = Preconditions.checkNotNull(directoriesToClean);
+   this.uploadedFiles = Preconditions.checkNotNull(uploadedFiles);
--- End diff --

I know this is nitpicking, but I think it would be slightly better to let 
`FileUploads` represent only the upload directories and to add a method 
`FileUploads#getFiles` that returns a `Collection` of all files found in those 
directories. The difference is that we would no longer initialize `FileUploads` 
with the files. That would effectively enforce that all files reside in the 
given upload directories. As it stands, we could initialize this class with 
directories `/web/upload/a, /web/upload/b` and a file `/web/different/path/file` 
that is located somewhere else. Because of this, we need to delete not only 
the directories but also all files.
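
A minimal sketch of the suggestion above, under stated assumptions: the class 
name `UploadDirectories` and its methods are hypothetical and not part of the 
PR. The container is initialized with directories only and derives the file 
list by walking them on demand, so every returned file necessarily lives inside 
a directory that cleanup removes.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical illustration; name and methods are assumptions, not the PR's code. */
final class UploadDirectories implements AutoCloseable {
    private final Collection<Path> directories;

    UploadDirectories(Collection<Path> directories) {
        for (Path dir : directories) {
            if (!dir.isAbsolute()) {
                throw new IllegalArgumentException("Path must be absolute.");
            }
            if (!Files.isDirectory(dir)) {
                throw new IllegalArgumentException("Path must be a directory.");
            }
        }
        // Defensive copy so later changes to the argument do not affect this object.
        this.directories = Collections.unmodifiableList(new ArrayList<>(directories));
    }

    /** Walks the upload directories and returns every non-directory entry found. */
    Collection<Path> getFiles() throws IOException {
        final List<Path> files = new ArrayList<>();
        for (Path dir : directories) {
            Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                    files.add(file);
                    return FileVisitResult.CONTINUE;
                }
            });
        }
        return files;
    }

    /** Deletes the directories recursively; no separate per-file bookkeeping is needed. */
    @Override
    public void close() throws IOException {
        for (Path dir : directories) {
            if (!Files.exists(dir)) {
                continue;
            }
            Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    Files.delete(file);
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult postVisitDirectory(Path visited, IOException exc) throws IOException {
                    if (exc != null) {
                        throw exc;
                    }
                    Files.delete(visited);
                    return FileVisitResult.CONTINUE;
                }
            });
        }
    }
}
```

The trade-off in this sketch is that `getFiles` touches the file system on every 
call, whereas a constructor taking both collections can be fed arbitrary paths 
in tests.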


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196868266
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(RestOptions.PORT, 0);
+   config.setString(RestOptions.ADDRESS, "localhost");

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196849558
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(RestOptions.PORT, 0);
+   config.setString(RestOptions.ADDRESS, "localhost");

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196836698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection<Path> directoriesToClean;
+   private final Collection<Path> uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws IOException {
+   final Collection<Path> files = new ArrayList<>(4);
+   final Collection<Path> directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.getContainedFiles());
+   } else {
+   files.add(fileOrDirectory);
+   }
--- End diff --

We don't have to; it's for testing convenience, as noted in the class 
Javadoc.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196836575
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) {
 
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
-   if (msg instanceof HttpRequest) {
-   final HttpRequest httpRequest = (HttpRequest) msg;
-   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
-   if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
-   currentHttpPostRequestDecoder = new 
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
-   currentHttpRequest = httpRequest;
+   try {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) 
msg;
+   LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
+   if 
(httpRequest.getMethod().equals(HttpMethod.POST)) {
+   if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
+   currentHttpPostRequestDecoder = 
new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+   currentHttpRequest = 
httpRequest;
+   currentUploadDir = 
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
+   } else {
+   ctx.fireChannelRead(msg);
+   }
} else {
ctx.fireChannelRead(msg);
}
+   } else if (msg instanceof HttpContent && 
currentHttpPostRequestDecoder != null) {
+   // make sure that we still have a upload dir in 
case that it got deleted in the meanwhile
+   RestServerEndpoint.createUploadDir(uploadDir, 
LOG);
+
+   final HttpContent httpContent = (HttpContent) 
msg;
+   
currentHttpPostRequestDecoder.offer(httpContent);
+
+   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   final InterfaceHttpData data = 
currentHttpPostRequestDecoder.next();
+   if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
+   final DiskFileUpload fileUpload 
= (DiskFileUpload) data;
+   
checkState(fileUpload.isCompleted());
+
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
+   
fileUpload.renameTo(dest.toFile());
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = 
(Attribute) data;
+   // this could also be 
implemented by using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received 
unknown attribute {}.", data.getName());
+   
HandlerUtils.sendErrorResponse(
+   ctx,
+   
currentHttpRequest,
+   new 
ErrorResponseBody("Received unknown attribute " + data.getName() + '.'),
+   
HttpResponseStatus.BAD_REQUEST,
+   
Collections.emptyMap()
+   );
+   deleteUploadedFiles();
+   reset();
+   return;
+   }
+   }
+   }
+
+

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196740578
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -39,15 +41,21 @@
 public class HandlerRequest {
 
private final R requestBody;
+   private final FileUploads uploadedFiles;
--- End diff --

This comment has not been addressed. I think the `HandlerRequest` should 
not know about the `FileUploads`, because it could be used to delete the files.
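
One way to read the concern above, sketched under assumptions: the request 
object exposes only an unmodifiable view of the uploaded paths, while the 
closeable cleanup handle stays with whoever created the upload. `ReadOnlyRequest` 
is a hypothetical stand-in and not the PR's `HandlerRequest`. A handler could 
still delete an individual file through the file system, so this only keeps the 
cleanup responsibility out of the handler's hands.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/** Hypothetical stand-in for a request object; not the PR's HandlerRequest. */
final class ReadOnlyRequest {
    private final Collection<Path> uploadedFiles;

    ReadOnlyRequest(Collection<Path> uploadedFiles) {
        // Copy, then wrap: the handler can neither mutate the collection
        // nor reach the object that owns the cleanup logic.
        this.uploadedFiles = Collections.unmodifiableCollection(new ArrayList<>(uploadedFiles));
    }

    /** Returns the uploaded files; the collection rejects all modifications. */
    Collection<Path> getUploadedFiles() {
        return uploadedFiles;
    }
}
```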


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196741065
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(RestOptions.PORT, 0);
+   config.setString(RestOptions.ADDRESS, 

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196739603
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 ---
@@ -70,9 +70,14 @@ protected AbstractRestHandler(
}
 
@Override
-   protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest 
httpRequest, HandlerRequest handlerRequest, T gateway) throws 
RestHandlerException {
+   protected void respondToRequest(ChannelHandlerContext ctx, HttpRequest 
httpRequest, HandlerRequest handlerRequest, T gateway) {
CompletableFuture response;
 
+   if (!messageHeaders.acceptsFileUploads() && 
!handlerRequest.getUploadedFiles().isEmpty()) {
+   processRestHandlerException(ctx, httpRequest, new 
RestHandlerException("File uploads not allowed.", 
HttpResponseStatus.BAD_REQUEST));
+   return;
+   }
--- End diff --

Shouldn't this be moved to the `AbstractHandler`?


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196741255
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest extends TestLogger {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   Configuration config = new Configuration();
+   config.setInteger(RestOptions.PORT, 0);
+   config.setString(RestOptions.ADDRESS, 

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196740018
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -70,51 +84,103 @@ public FileUploadHandler(final Path uploadDir) {
 
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
-   if (msg instanceof HttpRequest) {
-   final HttpRequest httpRequest = (HttpRequest) msg;
-   if (httpRequest.getMethod().equals(HttpMethod.POST)) {
-   if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
-   currentHttpPostRequestDecoder = new 
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
-   currentHttpRequest = httpRequest;
+   try {
+   if (msg instanceof HttpRequest) {
+   final HttpRequest httpRequest = (HttpRequest) 
msg;
+   LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
+   if 
(httpRequest.getMethod().equals(HttpMethod.POST)) {
+   if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
+   currentHttpPostRequestDecoder = 
new HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
+   currentHttpRequest = 
httpRequest;
+   currentUploadDir = 
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
+   } else {
+   ctx.fireChannelRead(msg);
+   }
} else {
ctx.fireChannelRead(msg);
}
+   } else if (msg instanceof HttpContent && 
currentHttpPostRequestDecoder != null) {
+   // make sure that we still have a upload dir in 
case that it got deleted in the meanwhile
+   RestServerEndpoint.createUploadDir(uploadDir, 
LOG);
+
+   final HttpContent httpContent = (HttpContent) 
msg;
+   
currentHttpPostRequestDecoder.offer(httpContent);
+
+   while (currentHttpPostRequestDecoder.hasNext()) 
{
+   final InterfaceHttpData data = 
currentHttpPostRequestDecoder.next();
+   if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
+   final DiskFileUpload fileUpload 
= (DiskFileUpload) data;
+   
checkState(fileUpload.isCompleted());
+
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
+   
fileUpload.renameTo(dest.toFile());
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = 
(Attribute) data;
+   // this could also be 
implemented by using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received 
unknown attribute {}.", data.getName());
+   
HandlerUtils.sendErrorResponse(
+   ctx,
+   
currentHttpRequest,
+   new 
ErrorResponseBody("Received unknown attribute " + data.getName() + '.'),
+   
HttpResponseStatus.BAD_REQUEST,
+   
Collections.emptyMap()
+   );
+   deleteUploadedFiles();
+   reset();
+   return;
+   }
+   }
+   }
+
+  

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196740407
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection<Path> directoriesToClean;
+   private final Collection<Path> uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws 
IOException {
+   final Collection<Path> files = new ArrayList<>(4);
+   final Collection<Path> directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.getContainedFiles());
+   } else {
+   files.add(fileOrDirectory);
+   }
--- End diff --

Do we have to allow files and directories to be specified alike? Why not 
require that the caller provides an upload directory which contains all 
uploaded files? That would make the whole cleanup logic easier.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196739865
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,108 @@
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection<Path> directoriesToClean;
+   private final Collection<Path> uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection<Path> uploadedFilesOrDirectory) throws 
IOException {
+   final Collection<Path> files = new ArrayList<>(4);
+   final Collection<Path> directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.getContainedFiles());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
--- End diff --

I think it is better to move this logic out of the constructor. Logic in a 
constructor always makes testing difficult.
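
A minimal sketch of what the suggestion above could look like, assuming a 
static factory that does the directory scan while the constructor only 
assigns fields. `UploadedFileSet` and `fromPaths` are hypothetical names used 
for illustration, not the class from this PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical stand-in for FileUploads; not the class from the PR. */
final class UploadedFileSet {
    private final Collection<Path> directoriesToClean;
    private final Collection<Path> uploadedFiles;

    // The constructor only copies its arguments, so a test can build an
    // instance from arbitrary paths without touching the file system.
    UploadedFileSet(Collection<Path> directoriesToClean, Collection<Path> uploadedFiles) {
        this.directoriesToClean = Collections.unmodifiableCollection(new ArrayList<>(directoriesToClean));
        this.uploadedFiles = Collections.unmodifiableCollection(new ArrayList<>(uploadedFiles));
    }

    // All file-system access lives in this factory method.
    static UploadedFileSet fromPaths(Collection<Path> filesOrDirectories) throws IOException {
        final List<Path> files = new ArrayList<>();
        final List<Path> directories = new ArrayList<>();
        for (Path fileOrDirectory : filesOrDirectories) {
            if (!fileOrDirectory.isAbsolute()) {
                throw new IllegalArgumentException("Path must be absolute.");
            }
            if (Files.isDirectory(fileOrDirectory)) {
                directories.add(fileOrDirectory);
                // Collect every regular file below the directory.
                try (Stream<Path> walk = Files.walk(fileOrDirectory)) {
                    walk.filter(Files::isRegularFile).forEach(files::add);
                }
            } else {
                files.add(fileOrDirectory);
            }
        }
        return new UploadedFileSet(directories, files);
    }

    Collection<Path> getUploadedFiles() {
        return uploadedFiles;
    }

    Collection<Path> getDirectoriesToClean() {
        return directoriesToClean;
    }
}
```

With this split, unit tests for consumers can pass plain collections to the 
constructor, and only the tests for `fromPaths` need a temporary directory.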


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196712002
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
+   
ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
--- End diff --

well look at that, it _actually works_


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196701452
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

I think not much. My gut feeling is just that `FileUploads` can be 
simplified. Instead of having our own FileVisitor, we could simply call 
`FileUtils.deleteDirectory(uploadDirectory)`. And I think this class 
actually has two responsibilities: listing all files to make them accessible, 
and storing the directories in which they reside so they can be deleted 
afterwards.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196699736
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) {
protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
final HttpRequest httpRequest = (HttpRequest) msg;
+   LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
if (httpRequest.getMethod().equals(HttpMethod.POST)) {
if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
currentHttpPostRequestDecoder = new 
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
currentHttpRequest = httpRequest;
+   currentUploadDir = 
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
--- End diff --

I think we should add this.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196698434
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) {
protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
final HttpRequest httpRequest = (HttpRequest) msg;
+   LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
if (httpRequest.getMethod().equals(HttpMethod.POST)) {
if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
currentHttpPostRequestDecoder = new 
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
currentHttpRequest = httpRequest;
+   currentUploadDir = 
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
--- End diff --

If the `FileUploadHandler` itself fails, the directory isn't cleaned up, but 
that was already the case in the existing code. The handler is generally 
rather _light_ when it comes to failure handling (i.e. it doesn't do anything 
in that regard).
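
One way to close the gap described above, as a rough sketch only: create the 
per-request directory, and delete it again if anything fails while the 
request is being processed. It uses plain `java.nio` rather than the Netty 
handler, and `UploadDirs`, `UploadAction` and `withRequestDir` are 
hypothetical names, not part of the PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper illustrating cleanup of a per-request upload directory. */
final class UploadDirs {

    /** Work that stores uploaded data in the given per-request directory. */
    interface UploadAction {
        void run(Path requestDir) throws Exception;
    }

    /**
     * Creates a uniquely named directory below uploadDir and runs the action.
     * If the action throws, the directory is deleted before the exception is rethrown.
     */
    static Path withRequestDir(Path uploadDir, UploadAction action) throws Exception {
        final Path requestDir = Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
        try {
            action.run(requestDir);
            return requestDir;
        } catch (Exception e) {
            try {
                deleteRecursively(requestDir);
            } catch (IOException cleanupFailure) {
                // Keep the original failure as the primary exception.
                e.addSuppressed(cleanupFailure);
            }
            throw e;
        }
    }

    /** Deletes a directory tree; reverse path order removes children before parents. */
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        final List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            Files.delete(path);
        }
    }
}
```

On success the directory is left in place, so a later handler stays 
responsible for removing it once the request has been fully handled.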


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196696248
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

How does this differ from the current implementation? Are you suggesting 
that we simplify `FileUploads` to only consider the case of 1 directory 
containing N files?


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196695325
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -72,10 +82,12 @@ public FileUploadHandler(final Path uploadDir) {
protected void channelRead0(final ChannelHandlerContext ctx, final 
HttpObject msg) throws Exception {
if (msg instanceof HttpRequest) {
final HttpRequest httpRequest = (HttpRequest) msg;
+   LOG.trace("Received request. URL:{} Method:{}", 
httpRequest.getUri(), httpRequest.getMethod());
if (httpRequest.getMethod().equals(HttpMethod.POST)) {
if 
(HttpPostRequestDecoder.isMultipart(httpRequest)) {
currentHttpPostRequestDecoder = new 
HttpPostRequestDecoder(DATA_FACTORY, httpRequest);
currentHttpRequest = httpRequest;
+   currentUploadDir = 
Files.createDirectory(uploadDir.resolve(UUID.randomUUID().toString()));
--- End diff --

How do we clean up the `currentUploadDir` in case of a failure?


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196689843
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

True, then I would suggest only storing the directory in the `FileUploads` 
and adding a method to retrieve all uploaded files.
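
A possible shape for this, sketched under the assumption that one request 
maps to exactly one upload directory. `DirectoryBackedUploads` is a 
hypothetical name, and the recursive delete uses plain `java.nio` here 
instead of a Flink utility.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical variant of FileUploads that stores only the upload directory. */
final class DirectoryBackedUploads implements AutoCloseable {
    private final Path uploadDirectory;

    DirectoryBackedUploads(Path uploadDirectory) {
        if (!uploadDirectory.isAbsolute()) {
            throw new IllegalArgumentException("Path must be absolute.");
        }
        this.uploadDirectory = uploadDirectory;
    }

    /** Lists the regular files below the upload directory on demand. */
    Collection<Path> getUploadedFiles() throws IOException {
        try (Stream<Path> walk = Files.walk(uploadDirectory)) {
            return walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
    }

    /** Deletes the directory tree; a second call is a no-op. */
    @Override
    public void close() throws IOException {
        if (!Files.exists(uploadDirectory)) {
            return;
        }
        final List<Path> paths;
        try (Stream<Path> walk = Files.walk(uploadDirectory)) {
            // Reverse path order puts children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            Files.delete(path);
        }
    }
}
```

Callers only see a `Collection<Path>`, so how the files are laid out on disk 
stays an implementation detail.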


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196681937
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196667391
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
--- End diff --

yes.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196667307
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

How files are stored is an implementation detail of the 
`FileUploadHandler`, why would we expose this to subsequent handlers?


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196667205
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
+   
ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
--- End diff --

I can try this (it would be neat to remove the special case in 
`AbstractHandler`), but I'm wondering whether we can "simply" replace the 
payload of the multipart request (as identified by the headers that we also 
forward) with plain json.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196556101
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/FileUploadsTest.java
 ---
@@ -0,0 +1,124 @@
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Tests for {@link FileUploads}.
+ */
+public class FileUploadsTest {
--- End diff --

`extends TestLogger` is missing


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453980
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -129,4 +137,9 @@ public R getRequestBody() {
return queryParameter.getValue();
}
}
+
+   @Nonnull
+   public FileUploads getFileUploads() {
+   return uploadedFiles;
+   }
--- End diff --

I would not expose `FileUploads` to the user but rather return a 
`Collection`.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196559335
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196558949
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
 ---
@@ -63,4 +63,13 @@
 * @return description for the header
 */
String getDescription();
+
+   /**
+* Returns whether this header allows file uploads.
+*
+* @return whether this header allows file uploads
+*/
+   default boolean acceptsFileUploads() {
+   return false;
+   }
--- End diff --

Should this maybe go into `UntypedResponseMessageHeaders`? At the moment 
one can upload files for an `AbstractHandler` implementation (e.g. 
`AbstractTaskManagerFileHandler`), which also has access to them via the 
`HandlerRequest`, without being able to specify whether file upload is 
allowed or not.
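
To illustrate the placement suggested above, here is a small sketch in which 
the flag lives on the base interface so that typed and untyped headers both 
inherit it. All types below (`UntypedHeaders`, `TypedHeaders`, 
`JarUploadHeaders`, `LogFileHeaders`, `UploadGate`) are simplified, 
hypothetical stand-ins and not the real Flink interfaces.

```java
/** Hypothetical base interface, standing in for the untyped headers. */
interface UntypedHeaders {
    String getTargetUrl();

    /** Uploads are rejected unless an implementation opts in. */
    default boolean acceptsFileUploads() {
        return false;
    }
}

/** Hypothetical typed headers; the flag is inherited from the base interface. */
interface TypedHeaders extends UntypedHeaders {
    String getDescription();
}

/** A typed endpoint that opts in to file uploads. */
final class JarUploadHeaders implements TypedHeaders {
    @Override
    public String getTargetUrl() {
        return "/jars/upload";
    }

    @Override
    public String getDescription() {
        return "Uploads a jar.";
    }

    @Override
    public boolean acceptsFileUploads() {
        return true;
    }
}

/** An untyped endpoint that keeps the default and therefore rejects uploads. */
final class LogFileHeaders implements UntypedHeaders {
    @Override
    public String getTargetUrl() {
        return "/log";
    }
}

/** Check that a common base handler could run before dispatching a request. */
final class UploadGate {
    static void checkUploadsAllowed(UntypedHeaders headers, int uploadedFileCount) {
        if (uploadedFileCount > 0 && !headers.acceptsFileUploads()) {
            throw new IllegalStateException("File uploads not allowed for " + headers.getTargetUrl());
        }
    }
}
```

Because the check only needs the base interface, it can sit in the common 
handler and cover both kinds of endpoints.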


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196559093
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
--- End diff --

Missing `extends TestLogger`


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196554025
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+   final ByteBuf msgContent;
+   Optional multipartJsonPayload = 
FileUploadHandler.getMultipartJsonPayload(ctx);
+   if (multipartJsonPayload.isPresent()) {
+   msgContent = 
Unpooled.wrappedBuffer(multipartJsonPayload.get());
--- End diff --

Let's send the JSON payload as a proper `HttpRequest`; then we don't need 
this special casing here.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196557260
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/HandlerRequest.java
 ---
@@ -39,15 +41,21 @@
 public class HandlerRequest {
 
private final R requestBody;
+   private final FileUploads uploadedFiles;
--- End diff --

This could also be a `Collection`


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452418
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
--- End diff --

By sending the JSON payload downstream, we could avoid having this method.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196560163
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
+
+   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
+   private static final Random RANDOM = new Random();
+
+   @ClassRule
+   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+   private static RestServerEndpoint serverEndpoint;
+   private static String serverAddress;
+
+   private static MultipartMixedHandler mixedHandler;
+   private static MultipartJsonHandler jsonHandler;
+   private static MultipartFileHandler fileHandler;
+   private static File file1;
+   private static File file2;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   

[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196555487
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   final Collection directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
+   }
+
+   public Collection getUploadedFiles() {
+   return uploadedFiles;
+   }
+
+   @Override
+   public void close() throws IOException {
+   for (Path file : uploadedFiles) {
+   try {
+   Files.delete(file);
+   } catch (FileNotFoundException ignored) {
+   // file may have been moved by a handler
+   }
+   }
+   for (Path directory : directoriesToClean) {
+   Files.walkFileTree(directory, CleanupFileVisitor.get());
+   }
+   }
+
+   private static final class FileAdderVisitor extends 
SimpleFileVisitor {
+
+   private final Collection files = new ArrayList<>(4);
+
+   Collection get() {
+   return files;
+   }
+
+   FileAdderVisitor() {
+   }
+
+   @Override
+   public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs) throws IOException {
+   FileVisitResult result = super.visitFile(file, attrs);
+   files.add(file);
+   return result;
+   }
+   }
+
+   private static final class CleanupFileVisitor extends 
SimpleFileVisitor {
--- End diff --

I think it would be better to make this an enum. Then we get all singleton 
properties for free.
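
A minimal sketch of the enum variant, under the assumption that the visitor 
deletes every file and then each directory once it is empty. Since a Java enum 
cannot extend `SimpleFileVisitor`, this version implements `FileVisitor<Path>` 
directly; the constant name `INSTANCE` is illustrative.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;

/** Hypothetical enum-based singleton that deletes a directory tree bottom-up. */
enum CleanupFileVisitor implements FileVisitor<Path> {
    INSTANCE;

    @Override
    public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
        // Nothing to do before descending into a directory.
        return FileVisitResult.CONTINUE;
    }

    @Override
    public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
        Files.delete(file);
        return FileVisitResult.CONTINUE;
    }

    @Override
    public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
        // Propagate the failure instead of silently skipping the file.
        throw exc;
    }

    @Override
    public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
        if (exc != null) {
            throw exc;
        }
        // All entries have been removed at this point, so the directory is empty.
        Files.delete(dir);
        return FileVisitResult.CONTINUE;
    }
}
```

Usage would then be `Files.walkFileTree(directory, CleanupFileVisitor.INSTANCE)`, 
with no hand-written accessor for the singleton.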


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196455211
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/AbstractHandler.java 
---
@@ -103,77 +104,74 @@ protected void respondAsLeader(ChannelHandlerContext 
ctx, RoutedRequest routedRe
return;
}
 
-   ByteBuf msgContent = ((FullHttpRequest) 
httpRequest).content();
+   final ByteBuf msgContent;
+   Optional multipartJsonPayload = 
FileUploadHandler.getMultipartJsonPayload(ctx);
+   if (multipartJsonPayload.isPresent()) {
+   msgContent = 
Unpooled.wrappedBuffer(multipartJsonPayload.get());
+   } else {
+   msgContent = ((FullHttpRequest) 
httpRequest).content();
+   }
 
-   R request;
-   if (isFileUpload()) {
-   final Path path = 
ctx.channel().attr(FileUploadHandler.UPLOADED_FILE).get();
-   if (path == null) {
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Client 
did not upload a file."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
-   }
-   //noinspection unchecked
-   request = (R) new FileUpload(path);
-   } else if (msgContent.capacity() == 0) {
-   try {
-   request = MAPPER.readValue("{}", 
untypedResponseMessageHeaders.getRequestClass());
-   } catch (JsonParseException | 
JsonMappingException je) {
-   log.error("Request did not conform to 
expected format.", je);
-   HandlerUtils.sendErrorResponse(
-   ctx,
-   httpRequest,
-   new ErrorResponseBody("Bad 
request received."),
-   HttpResponseStatus.BAD_REQUEST,
-   responseHeaders);
-   return;
+   try (FileUploads uploadedFiles = 
FileUploadHandler.getMultipartFileUploads(ctx)) {
--- End diff --

I would obtain the upload directory from `FileUploadHandler` and simply 
delete this directory after the call has been processed. We could then also 
create `FileUploads` outside of the `FileUploadHandler` to instantiate a 
`HandlerRequest` with it. This would also simplify the `FileUploads` class 
significantly, because it would no longer be responsible for deleting the files.
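
A rough sketch of that lifecycle, not code from the PR: the caller receives the 
upload directory, runs the handler, and removes the directory in a `finally` 
block. The class and method names (`UploadDirectoryCleanup`, 
`processAndCleanUp`) are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper that ties the upload directory's lifetime to one handler call. */
final class UploadDirectoryCleanup {

    private UploadDirectoryCleanup() {
    }

    /** Deletes the directory and everything below it; a missing directory is not an error. */
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            Files.deleteIfExists(path);
        }
    }

    /** Runs the handler against the upload directory and always cleans up afterwards. */
    static <T> T processAndCleanUp(Path uploadDir, Function<Path, T> handler) throws IOException {
        try {
            return handler.apply(uploadDir);
        } finally {
            deleteRecursively(uploadDir);
        }
    }
}
```

Because cleanup happens in one place, the container handed to the handler can 
stay a plain holder of paths.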


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r19670
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   final Collection directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
+   }
+
+   public Collection getUploadedFiles() {
+   return uploadedFiles;
+   }
+
+   @Override
+   public void close() throws IOException {
+   for (Path file : uploadedFiles) {
+   try {
+   Files.delete(file);
+   } catch (FileNotFoundException ignored) {
+   // file may have been moved by a handler
+   }
+   }
+   for (Path directory : directoriesToClean) {
+   Files.walkFileTree(directory, CleanupFileVisitor.get());
+   }
+   }
+
+   private static final class FileAdderVisitor extends 
SimpleFileVisitor {
+
+   private final Collection files = new ArrayList<>(4);
+
+   Collection get() {
--- End diff --

Maybe use a more descriptive name than `get`.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
--- End diff --

Should we rather fail?


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452583
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -116,5 +136,16 @@ private void reset() {
currentHttpPostRequestDecoder.destroy();
currentHttpPostRequestDecoder = null;
currentHttpRequest = null;
+   currentUploadDir = null;
+   currentJsonPayload = null;
+   }
+
+   public static Optional 
getMultipartJsonPayload(ChannelHandlerContext ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_JSON).get());
+   }
+
+   public static FileUploads getMultipartFileUploads(ChannelHandlerContext 
ctx) {
+   return 
Optional.ofNullable(ctx.channel().attr(UPLOADED_FILES).get())
+   .orElse(FileUploads.EMPTY);
--- End diff --

I would suggest simply returning the upload directory.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453584
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   final Collection directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
+   Files.walkFileTree(fileOrDirectory, visitor);
+   files.addAll(visitor.get());
+   } else {
+   files.add(fileOrDirectory);
+   }
+   }
+   directoriesToClean = 
Collections.unmodifiableCollection(directories);
+   uploadedFiles = Collections.unmodifiableCollection(files);
--- End diff --

Let's move this logic out of `FileUploads` and simply initialize it with a 
`Collection`.
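
A minimal sketch of such a container, assuming the element type is `Path` (the 
generic parameter is not visible in the quoted diff). `SimpleFileUploads` is a 
hypothetical name; directory traversal and cleanup are assumed to happen 
elsewhere.

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/** Hypothetical simplified container that only holds the uploaded file paths. */
final class SimpleFileUploads {

    static final SimpleFileUploads EMPTY = new SimpleFileUploads(Collections.emptyList());

    private final Collection<Path> uploadedFiles;

    SimpleFileUploads(Collection<Path> uploadedFiles) {
        for (Path file : uploadedFiles) {
            if (!file.isAbsolute()) {
                throw new IllegalArgumentException("Path must be absolute: " + file);
            }
        }
        // Defensive copy so later changes to the caller's collection are not visible here.
        this.uploadedFiles = Collections.unmodifiableCollection(new ArrayList<>(uploadedFiles));
    }

    Collection<Path> getUploadedFiles() {
        return uploadedFiles;
    }
}
```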


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196452024
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
+   
ctx.channel().attr(UPLOADED_JSON).set(currentJsonPayload);
--- End diff --

I think it would be better to not store the JSON payload as an `Attribute` 
but instead forward it via 
`httpContent.replace(Unpooled.wrappedBuffer(currentJsonPayload))`.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196453235
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -95,14 +107,22 @@ protected void channelRead0(final 
ChannelHandlerContext ctx, final HttpObject ms
final DiskFileUpload fileUpload = 
(DiskFileUpload) data;
checkState(fileUpload.isCompleted());
 
-   final Path dest = 
uploadDir.resolve(Paths.get(UUID.randomUUID() +
-   "_" + 
fileUpload.getFilename()));
+   final Path dest = 
currentUploadDir.resolve(fileUpload.getFilename());
fileUpload.renameTo(dest.toFile());
-   
ctx.channel().attr(UPLOADED_FILE).set(dest);
+   } else if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.Attribute) {
+   final Attribute request = (Attribute) 
data;
+   // this could also be implemented by 
using the first found Attribute as the payload
+   if 
(data.getName().equals(HTTP_ATTRIBUTE_REQUEST)) {
+   currentJsonPayload = 
request.get();
+   } else {
+   LOG.warn("Received unknown 
attribute {}, will be ignored.", data.getName());
+   }
}
}
 
if (httpContent instanceof LastHttpContent) {
+   ctx.channel().attr(UPLOADED_FILES).set(new 
FileUploads(Collections.singleton(currentUploadDir)));
--- End diff --

I would suggest simply storing the upload directory in the `UPLOADED_FILES` 
attribute.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196129193
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/FileUploads.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A container for uploaded files.
+ *
+ * Implementation note: The constructor also accepts directories to 
ensure that the upload directories are cleaned up.
+ * For convenience during testing it also accepts files directly.
+ */
+public final class FileUploads implements AutoCloseable {
+   private final Collection directoriesToClean;
+   private final Collection uploadedFiles;
+
+   @SuppressWarnings("resource")
+   public static final FileUploads EMPTY = new FileUploads();
+
+   private FileUploads() {
+   this.directoriesToClean = Collections.emptyList();
+   this.uploadedFiles = Collections.emptyList();
+   }
+
+   public FileUploads(Collection uploadedFilesOrDirectory) throws 
IOException {
+   final Collection files = new ArrayList<>(4);
+   final Collection directories = new ArrayList<>(1);
+   for (Path fileOrDirectory : uploadedFilesOrDirectory) {
+   
Preconditions.checkArgument(fileOrDirectory.isAbsolute(), "Path must be 
absolute.");
+   if (Files.isDirectory(fileOrDirectory)) {
+   directories.add(fileOrDirectory);
+   FileAdderVisitor visitor = new 
FileAdderVisitor();
--- End diff --

This is probably more complex than really necessary; there is no use-case 
for a _nested_ directory structure.
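
For illustration only, a flat listing could look roughly like this, assuming 
all uploads of a request land directly in one directory. `FlatUploadListing` 
and `listFiles` are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;

/** Hypothetical helper that collects only the files directly inside the upload directory. */
final class FlatUploadListing {

    private FlatUploadListing() {
    }

    static Collection<Path> listFiles(Path uploadDir) throws IOException {
        final Collection<Path> files = new ArrayList<>(4);
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(uploadDir)) {
            for (Path entry : entries) {
                // Sub-directories are skipped on purpose; no recursion is needed.
                if (Files.isRegularFile(entry)) {
                    files.add(entry);
                }
            }
        }
        return files;
    }
}
```

This replaces the visitor with a single loop, at the cost of ignoring anything 
placed in sub-directories.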


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196090706
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
 ---
@@ -52,7 +57,10 @@
 
--- End diff --

We should maybe rename the class to `MultipartRequestHandler`.


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6178#discussion_r196081080
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.FileUploads;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import okhttp3.MediaType;
+import okhttp3.MultipartBody;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly
+ * handled.
+ */
+public class FileUploadHandlerTest {
--- End diff --

needs a test for
* multiple files
* name of uploaded file should be what is specified in the request


---


[GitHub] flink pull request #6178: [FLINK-9599][rest] Implement generic mechanism to ...

2018-06-18 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6178

[FLINK-9599][rest] Implement generic mechanism to access uploaded files

## What is the purpose of the change

This PR extends the existing multipart handling to also support mixed 
multipart messages (i.e. requests containing both JSON and files), and 
generalizes the `FileUpload` handling to provide access to all handlers 
extending `AbstractHandler`.

Handlers may access uploaded files via `HandlerRequest#getFileUploads`. 
File uploads must be explicitly allowed by returning true in 
`MessageHeaders#acceptsFileUploads`.
The files and JSON payload are forwarded to the handler by the 
`FileUploadHandler` via channel attributes. If a JSON payload is forwarded this 
way, the handler ignores the content of the received `HttpRequest`.
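
The opt-in described above can be sketched roughly as follows. This is a simplified, self-contained illustration and not the code of this PR: `Headers` and `Request` are hypothetical stand-ins for `MessageHeaders` and `HandlerRequest`, and rejecting a request with an `IllegalArgumentException` when uploads are not allowed is an assumption made for the example.

```java
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;

/** Simplified sketch of the upload opt-in; all types are hypothetical stand-ins. */
public final class UploadOptInSketch {

    private UploadOptInSketch() {
    }

    /** Stand-in for MessageHeaders: file uploads are disabled unless overridden. */
    public interface Headers {
        default boolean acceptsFileUploads() {
            return false;
        }
    }

    /** Stand-in for HandlerRequest: carries the uploaded files to the handler. */
    public static final class Request {
        private final Collection<Path> uploadedFiles;

        public Request(Collection<Path> uploadedFiles) {
            this.uploadedFiles = Collections.unmodifiableCollection(uploadedFiles);
        }

        public Collection<Path> getFileUploads() {
            return uploadedFiles;
        }
    }

    /** Builds the request, refusing uploaded files if the headers did not opt in (assumed behavior). */
    public static Request createRequest(Headers headers, Collection<Path> uploadedFiles) {
        if (!uploadedFiles.isEmpty() && !headers.acceptsFileUploads()) {
            throw new IllegalArgumentException("File uploads are not allowed for this handler.");
        }
        return new Request(uploadedFiles);
    }
}
```

A handler that wants files would override `acceptsFileUploads` to return true in its headers and then read the files from the request it receives.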

This PR only covers the server-side; the `RestClient` remains unchanged. 
Client-side support will be added in a follow-up.

## Brief change log
* add `MessageHeaders#acceptsFileUploads` to signal that a handler accepts 
file uploads (default=false)
* add `FileUploads` class as a container for uploaded files
* extend `FileUploadHandler` to
  * accept multiple files in one request
  * also accept a JSON payload
  * forward both files and JSON via channel attributes
* extend `AbstractHandler` to retrieve files/JSON from channel attributes
  * remove special case for `JarRunHandler` in `AbstractHandler`
* extend `HandlerRequest` to accept a `FileUploads` object
* add `OkHttp` dependency to `flink-runtime` for testing purposes
* update `JarUploadHandler/Headers`

## Verifying this change

added tests:
* FileUploadsTest
* FileUploadHandlerTest
* manually verified via WebUI job submission

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9280_beta

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6178.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6178


commit c2adb3880180d8426697c33035d2b31d57d93952
Author: zentol 
Date:   2018-06-18T08:54:42Z

[FLINK-9599][rest] Implement generic mechanism to access uploaded files




---