[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-02-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5353


---


[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-02-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r166235797
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalNotification;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Base class for serving files from the {@link 

[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-02-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r166232567
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -222,7 +221,13 @@ public File getStorageLocation(@Nullable JobID jobId, BlobKey key) throws IOExce
}
 
private BlobClient createClient() throws IOException {
-   return new BlobClient(serverAddress, blobClientConfig);
+   final InetSocketAddress currentServerAddress = serverAddress;
+
+   if (currentServerAddress != null) {
+   return new BlobClient(serverAddress, blobClientConfig);
--- End diff --

Definitely. Good catch!


---


[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-02-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r166231866
  
--- Diff: 
flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java ---
@@ -285,11 +287,22 @@ private static String createMessageHtmlEntry(Class messageClass, Class emp
 
dispatcherGatewayRetriever = () -> null;
resourceManagerGatewayRetriever = () -> null;
+   transientBlobService = NoOpTransientBlobService.INSTANCE;
metricQueryServiceRetriever = path -> null;
}
 
private DocumentingDispatcherRestEndpoint() {
-   super(restConfig, dispatcherGatewayRetriever, config, handlerConfig, resourceManagerGatewayRetriever, executor, metricQueryServiceRetriever, NoOpElectionService.INSTANCE, NoOpFatalErrorHandler.INSTANCE);
+   super(
+   restConfig,
+   dispatcherGatewayRetriever,
+   config,
+   handlerConfig,
+   resourceManagerGatewayRetriever,
+   transientBlobService,
--- End diff --

True, will change it.


---


[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-02-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r165972923
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
 ---
@@ -0,0 +1,236 @@
+/**
+ * Base class for serving files from the {@link 

[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-02-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r165950588
  
--- Diff: 
flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java ---
@@ -285,11 +287,22 @@ private static String createMessageHtmlEntry(Class messageClass, Class emp
 
dispatcherGatewayRetriever = () -> null;
resourceManagerGatewayRetriever = () -> null;
+   transientBlobService = NoOpTransientBlobService.INSTANCE;
metricQueryServiceRetriever = path -> null;
}
 
private DocumentingDispatcherRestEndpoint() {
-   super(restConfig, dispatcherGatewayRetriever, config, handlerConfig, resourceManagerGatewayRetriever, executor, metricQueryServiceRetriever, NoOpElectionService.INSTANCE, NoOpFatalErrorHandler.INSTANCE);
+   super(
+   restConfig,
+   dispatcherGatewayRetriever,
+   config,
+   handlerConfig,
+   resourceManagerGatewayRetriever,
+   transientBlobService,
--- End diff --

I don't think we need these `static final` variables. We can just pass `NoOpTransientBlobService.INSTANCE` as a constructor argument.
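
A minimal sketch of this suggestion, assuming the no-op service is an enum singleton (the diff only shows `NoOpTransientBlobService.INSTANCE`). `BlobService`, `NoOpBlobService`, `Endpoint` and `DocumentingEndpoint` are hypothetical stand-ins. A constructor cannot reference instance fields inside its `super(...)` call, which is why static fields are sometimes introduced; but `INSTANCE` is already a static constant, so it can be passed to `super(...)` directly.

```java
import java.util.Objects;

public class NoOpArgumentSketch {

    /** Hypothetical stand-in for a service interface such as TransientBlobService. */
    interface BlobService {
        String getFile(String key);
    }

    /** Hypothetical enum singleton that does nothing useful, in the spirit of a no-op service. */
    enum NoOpBlobService implements BlobService {
        INSTANCE;

        @Override
        public String getFile(String key) {
            throw new UnsupportedOperationException("The no-op service cannot serve files.");
        }
    }

    /** Hypothetical base endpoint that requires the service in its constructor. */
    static class Endpoint {
        final BlobService blobService;

        Endpoint(BlobService blobService) {
            this.blobService = Objects.requireNonNull(blobService);
        }
    }

    /** Documentation-only endpoint: passes the singleton directly, without an extra static field. */
    static class DocumentingEndpoint extends Endpoint {
        DocumentingEndpoint() {
            super(NoOpBlobService.INSTANCE);
        }
    }

    public static void main(String[] args) {
        DocumentingEndpoint endpoint = new DocumentingEndpoint();
        System.out.println(endpoint.blobService == NoOpBlobService.INSTANCE); // prints "true"
    }
}
```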


---


[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-02-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r165962518
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java
 ---
@@ -222,7 +221,13 @@ public File getStorageLocation(@Nullable JobID jobId, BlobKey key) throws IOExce
}
 
private BlobClient createClient() throws IOException {
-   return new BlobClient(serverAddress, blobClientConfig);
+   final InetSocketAddress currentServerAddress = serverAddress;
+
+   if (currentServerAddress != null) {
+   return new BlobClient(serverAddress, blobClientConfig);
--- End diff --

I think you should use `currentServerAddress`; otherwise the creation can fail with an NPE.
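
The underlying pattern is to read a concurrently updated field exactly once into a local variable, then both check and use that local copy. A minimal sketch, assuming `serverAddress` is a `volatile` field that another thread may reset, and that a missing address is reported with an `IOException` (the diff is cut off before any `else` branch). `ClientFactorySketch` and `FakeClient` are hypothetical stand-ins for `TransientBlobCache` and `BlobClient`.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Objects;

public class ClientFactorySketch {

    /** Hypothetical client that, like a blob client, cannot work without an address. */
    static final class FakeClient {
        final InetSocketAddress address;

        FakeClient(InetSocketAddress address) {
            // Throws a NullPointerException if called with a null address.
            this.address = Objects.requireNonNull(address, "address");
        }
    }

    // Assumption: may be updated (or reset to null) by another thread at any time.
    private volatile InetSocketAddress serverAddress;

    void setServerAddress(InetSocketAddress newAddress) {
        serverAddress = newAddress;
    }

    FakeClient createClient() throws IOException {
        // Read the volatile field exactly once ...
        final InetSocketAddress currentServerAddress = serverAddress;

        if (currentServerAddress != null) {
            // ... and pass on the checked local copy. Passing serverAddress here would
            // re-read the field, which may have become null in the meantime.
            return new FakeClient(currentServerAddress);
        } else {
            throw new IOException("Cannot create a client: the server address is not known yet.");
        }
    }
}
```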


---


[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-01-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r165018462
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 ---
@@ -175,4 +177,13 @@
final AllocationID allocationId,
final Throwable cause,
@RpcTimeout final Time timeout);
+
+   /**
+    * Requests the file upload of the specified type to the cluster's {@link BlobServer}.
+    *
+    * @param fileType to upload
+    * @param timeout for the asynchronous operation
+    * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file.
+    */
+   CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, @RpcTimeout Time timeout);
--- End diff --

Yes, we should adapt `RpcTaskManagerGateway` accordingly.


---


[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-01-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r165017798
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 ---
@@ -175,4 +177,13 @@
final AllocationID allocationId,
final Throwable cause,
@RpcTimeout final Time timeout);
+
+   /**
+    * Requests the file upload of the specified type to the cluster's {@link BlobServer}.
+    *
+    * @param fileType to upload
+    * @param timeout for the asynchronous operation
+    * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file.
+    */
+   CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, @RpcTimeout Time timeout);
--- End diff --

Yes we do. Good catch. Will apply the change.


---


[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-01-31 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r165017616
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 ---
@@ -175,4 +177,13 @@
final AllocationID allocationId,
final Throwable cause,
@RpcTimeout final Time timeout);
+
+   /**
+    * Requests the file upload of the specified type to the cluster's {@link BlobServer}.
+    *
+    * @param fileType to upload
+    * @param timeout for the asynchronous operation
+    * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file.
+    */
+   CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, @RpcTimeout Time timeout);
--- End diff --

Not strictly, but I thought that we could combine both methods into `requestFileUpload`, which gives us a leaner interface.
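
A minimal sketch of the leaner interface, assuming `FileType` is an enum with one constant per file (the diff only shows the `FileType` parameter; `LOG` and `STDOUT` are assumed names). `FileUploadGateway` and `InMemoryGateway` are hypothetical; `String` stands in for `TransientBlobKey`, and the `@RpcTimeout Time` parameter is omitted. The former per-file methods reduce to one-line wrappers, or can be dropped entirely.

```java
import java.util.EnumMap;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class FileUploadGatewaySketch {

    /** Assumed file types; mirrors the FileType parameter in the diff. */
    enum FileType {
        LOG,
        STDOUT
    }

    /** Hypothetical, simplified gateway: one method replaces the per-file request methods. */
    interface FileUploadGateway {
        CompletableFuture<String> requestFileUpload(FileType fileType);
    }

    /** Hypothetical in-memory implementation returning a fake blob key per file type. */
    static final class InMemoryGateway implements FileUploadGateway {
        private final Map<FileType, String> files = new EnumMap<>(FileType.class);

        InMemoryGateway(String log, String stdout) {
            files.put(FileType.LOG, log);
            files.put(FileType.STDOUT, stdout);
        }

        @Override
        public CompletableFuture<String> requestFileUpload(FileType fileType) {
            if (files.get(fileType) == null) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("No file of type " + fileType));
                return failed;
            }
            // Stand-in for the TransientBlobKey returned after a successful upload.
            return CompletableFuture.completedFuture("blob-" + fileType.name().toLowerCase(Locale.ROOT));
        }
    }

    // The old specialized requests become thin wrappers around the single method.
    static CompletableFuture<String> requestLog(FileUploadGateway gateway) {
        return gateway.requestFileUpload(FileType.LOG);
    }

    static CompletableFuture<String> requestStdout(FileUploadGateway gateway) {
        return gateway.requestFileUpload(FileType.STDOUT);
    }
}
```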


---


[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-01-31 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r164992599
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 ---
@@ -175,4 +177,13 @@
final AllocationID allocationId,
final Throwable cause,
@RpcTimeout final Time timeout);
+
+   /**
+    * Requests the file upload of the specified type to the cluster's {@link BlobServer}.
+    *
+    * @param fileType to upload
+    * @param timeout for the asynchronous operation
+    * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file.
+    */
+   CompletableFuture<TransientBlobKey> requestFileUpload(FileType fileType, @RpcTimeout Time timeout);
--- End diff --

Don't we need to clean up these methods?
```
@Override
public CompletableFuture requestTaskManagerLog(Time timeout) {
    //  return taskExecutorGateway.requestTaskManagerLog(timeout);
    throw new UnsupportedOperationException("Operation is not yet supported.");
}

@Override
public CompletableFuture requestTaskManagerStdout(Time timeout) {
    //  return taskExecutorGateway.requestTaskManagerStdout(timeout);
    throw new UnsupportedOperationException("Operation is not yet supported.");
}
```


---


[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-01-24 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5353

[FLINK-8503] [flip6] Display TaskExecutor logs and stdout files in web ui

## What is the purpose of the change

Introduce the AbstractHandler, which takes a typed request and returns an untyped response. The AbstractRestHandler extends the AbstractHandler to add typed responses.
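
One way to picture the split between untyped and typed handlers, as a simplified sketch rather than Flink's actual class signatures: `UntypedHandler` and `TypedHandler` are hypothetical stand-ins for `AbstractHandler` and `AbstractRestHandler`, and returning a `CompletableFuture<byte[]>` replaces writing to a Netty channel. The typed subclass serializes its response in one place, while an untyped subclass (such as a file-serving handler) produces raw bytes directly.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

public class HandlerHierarchySketch {

    /** Hypothetical base: typed request in, untyped (raw bytes) response out. */
    abstract static class UntypedHandler<R> {
        abstract CompletableFuture<byte[]> respondToRequest(R request);
    }

    /** Hypothetical typed layer: subclasses return a typed response that is serialized here. */
    abstract static class TypedHandler<R, P> extends UntypedHandler<R> {

        abstract CompletableFuture<P> handleRequest(R request);

        /** Hypothetical serializer; a real REST handler would typically produce JSON. */
        abstract String serialize(P response);

        @Override
        final CompletableFuture<byte[]> respondToRequest(R request) {
            return handleRequest(request)
                .thenApply(response -> serialize(response).getBytes(StandardCharsets.UTF_8));
        }
    }

    /** Example typed handler: reports the request length as a JSON-like string. */
    static final class LengthHandler extends TypedHandler<String, Integer> {
        @Override
        CompletableFuture<Integer> handleRequest(String request) {
            return CompletableFuture.completedFuture(request.length());
        }

        @Override
        String serialize(Integer response) {
            return "{\"length\":" + response + "}";
        }
    }

    /** Example untyped handler: returns raw bytes, standing in for file contents. */
    static final class RawBytesHandler extends UntypedHandler<String> {
        @Override
        CompletableFuture<byte[]> respondToRequest(String text) {
            return CompletableFuture.completedFuture(text.getBytes(StandardCharsets.UTF_8));
        }
    }
}
```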

Introduce AbstractTaskManagerFileHandler, which encapsulates the file loading logic. Upon request of a TaskManager file, the handler triggers the file upload via the ResourceManager. The returned TransientBlobKey is then downloaded via the TransientBlobService. Once downloaded, the file is served to the client. Each transient blob key is cached for a maximum duration, after which it is purged and the file has to be re-uploaded by the TaskExecutor.
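
The caching behaviour described above can be sketched as a map from TaskManager ID to a future of the blob key, where each entry expires after a maximum age. The Guava imports in the diff (`CacheBuilder`, `LoadingCache`, `RemovalNotification`) suggest the real handler builds on Guava's cache; this dependency-free sketch swaps in a `ConcurrentHashMap` instead. `ExpiringBlobKeyCache`, `uploadTrigger` and `onPurge` are hypothetical names, the clock is injected so the example is testable, and expired entries are purged lazily on the next access.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical cache: TaskManager ID (K) to a future of the uploaded blob key (V), with a maximum age. */
public class ExpiringBlobKeyCache<K, V> {

    private static final class Entry<V> {
        final CompletableFuture<V> future;
        final long createdAtMillis;

        Entry(CompletableFuture<V> future, long createdAtMillis) {
            this.future = future;
            this.createdAtMillis = createdAtMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final Function<K, CompletableFuture<V>> uploadTrigger; // e.g. ask for a new upload
    private final Consumer<V> onPurge; // e.g. delete the expired blob; must not touch this cache
    private final long maxAgeMillis;
    private final LongSupplier clock;

    public ExpiringBlobKeyCache(
            Function<K, CompletableFuture<V>> uploadTrigger,
            Consumer<V> onPurge,
            long maxAgeMillis,
            LongSupplier clock) {
        this.uploadTrigger = Objects.requireNonNull(uploadTrigger);
        this.onPurge = Objects.requireNonNull(onPurge);
        this.maxAgeMillis = maxAgeMillis;
        this.clock = Objects.requireNonNull(clock);
    }

    /** Returns the cached blob key future, triggering a new upload if none is usable. */
    public CompletableFuture<V> get(K key) {
        final long now = clock.getAsLong();
        Entry<V> entry = entries.compute(key, (k, existing) -> {
            if (existing != null
                    && now - existing.createdAtMillis < maxAgeMillis
                    && !existing.future.isCompletedExceptionally()) {
                // Fresh and not failed: reuse the pending or completed upload.
                return existing;
            }
            if (existing != null) {
                // Expired or failed: purge the old blob once (and if) its key is known.
                existing.future.thenAccept(onPurge);
            }
            return new Entry<>(uploadTrigger.apply(k), now);
        });
        return entry.future;
    }
}
```

Caching the future rather than the completed key means concurrent requests for the same TaskManager share one pending upload instead of each triggering their own.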

This PR is based on #5341 

## Brief change log

- Introduced untyped response handler `AbstractHandler`
- Added `AbstractTaskManagerFileHandler`, which is responsible for serving files from the `TaskExecutor`
- The `AbstractTaskManagerFileHandler` triggers the file upload via the `ResourceManager`, which knows the `TaskExecutors`. Additionally, it caches the `TransientBlobKeys` so that not every request triggers a new file upload
- Added `TaskManagerLogFileHandler` to serve the log file
- Added `TaskManagerStdoutFileHandler` to serve the stdout file

## Verifying this change

- Added `AbstractTaskManagerFileHandlerTest`
- Tested functionality manually

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink enableTaskManagerLogRetrieval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5353.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5353


commit b9db2ab22c346ec64363b446ed23692af1365239
Author: Till Rohrmann 
Date:   2018-01-23T18:12:27Z

[FLINK-8501] [flip6] Use single BlobCacheService per TaskExecutor

Instead of creating a dedicated BlobCacheService for each new JobManagerConnection, the TaskExecutor uses a single BlobCacheService which it shares between the different JobManagerConnections. The initial BlobServer address is passed by the ResourceManager when the TaskExecutor registers at it. In order to avoid re-creating BlobCacheServices, this commit changes the behaviour such that the BlobServer address can be updated.

commit 5bd4db619ff0e984477ead323345f5f7fa740626
Author: Till Rohrmann 
Date:   2018-01-24T12:41:53Z

[hotfix] [tests] Remove JobManagerRunnerMockTest

The JobManagerRunnerMockTest is completely ignored. Moreover, it relies heavily on Mockito, which makes it hard to maintain.

commit c38adb65dd49150a03a2f5f7ea8d421b2ef34616
Author: Till Rohrmann 
Date:   2018-01-24T12:42:41Z

[FLINK-8502] [flip6] Remove LibraryCacheManager from JobMaster

This commit removes the LibraryCacheManager from the JobMaster since it is no longer needed. The JobMaster is started with the correct user code class loader and, thus, does not need the LibraryCacheManager.

This commit also ensures that the BlobServer is not closed by the JobManagerServices#shutdown method.

commit eb3fed4ce96b4ce27f01d72f2dc76b151dbdd1ae
Author: Till Rohrmann 
Date:   2018-01-23T14:17:16Z

[FLINK-8495] [flip6] Enable main cluster component's log and stdout file retrieval

This commit enables retrieving the log and stdout files of the cluster's main component via the web UI. The files are served by the StaticFileServerHandler.

commit 23496241962c69ceb3ada9bd0d67c0344554cf99
Author: Till Rohrmann 
Date:   2018-01-23T17:27:28Z

[FLINK-8503] [flip6] Display TaskExecutor logs and stdout files in web ui

Introduce the AbstractHandler, which takes a typed request and returns an untyped response. The AbstractRestHandler extends the AbstractHandler to add typed responses.

Introduce