[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5353 ---
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r166235797

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalNotification;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Base class for serving files from the {@link T
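The diff above is cut off before the handler logic. Going by the PR description (trigger the upload via the ResourceManager, download the returned TransientBlobKey via the TransientBlobService, then serve the file), the request path is a two-stage asynchronous chain. The following is a minimal sketch of that chain with `CompletableFuture`; `FileUploadTrigger`, `BlobFileResolver`, and `TaskManagerFileRetriever` are hypothetical stand-ins rather than Flink's API, and blob keys are plain strings.

```java
import java.io.File;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

// Hypothetical stand-in for the ResourceManager: forwards an upload request to the
// TaskExecutor with the given id and returns the key of the uploaded blob.
interface FileUploadTrigger {
	CompletableFuture<String> requestFileUpload(String taskManagerId, String fileType);
}

// Hypothetical stand-in for the TransientBlobService: turns a blob key into a local file.
interface BlobFileResolver {
	File getFile(String blobKey) throws Exception;
}

final class TaskManagerFileRetriever {
	private final FileUploadTrigger uploadTrigger;
	private final BlobFileResolver blobResolver;
	private final Executor ioExecutor;

	TaskManagerFileRetriever(FileUploadTrigger uploadTrigger, BlobFileResolver blobResolver, Executor ioExecutor) {
		this.uploadTrigger = uploadTrigger;
		this.blobResolver = blobResolver;
		this.ioExecutor = ioExecutor;
	}

	/** Stage 1: upload on the TaskExecutor. Stage 2: download the blob. The caller streams the file. */
	CompletableFuture<File> retrieve(String taskManagerId, String fileType) {
		return uploadTrigger
			.requestFileUpload(taskManagerId, fileType)
			// The download may block, so it runs on a separate executor.
			.thenApplyAsync(blobKey -> {
				try {
					return blobResolver.getFile(blobKey);
				} catch (Exception e) {
					// Surface checked exceptions through the returned future.
					throw new CompletionException(e);
				}
			}, ioExecutor);
	}
}
```

Running the download with `thenApplyAsync` on a dedicated executor keeps a potentially blocking blob fetch off the thread that completes the RPC future; that choice is an assumption of this sketch.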
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r166232567

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java ---
@@ -222,7 +221,13 @@ public File getStorageLocation(@Nullable JobID jobId, BlobKey key) throws IOExce
 	}
 
 	private BlobClient createClient() throws IOException {
-		return new BlobClient(serverAddress, blobClientConfig);
+		final InetSocketAddress currentServerAddress = serverAddress;
+
+		if (currentServerAddress != null) {
+			return new BlobClient(serverAddress, blobClientConfig);
--- End diff --

Definitely. Good catch!

---
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r166231866

--- Diff: flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java ---
@@ -285,11 +287,22 @@ private static String createMessageHtmlEntry(Class messageClass, Class emp
 		dispatcherGatewayRetriever = () -> null;
 		resourceManagerGatewayRetriever = () -> null;
+		transientBlobService = NoOpTransientBlobService.INSTANCE;
 		metricQueryServiceRetriever = path -> null;
 	}
 
 	private DocumentingDispatcherRestEndpoint() {
-		super(restConfig, dispatcherGatewayRetriever, config, handlerConfig, resourceManagerGatewayRetriever, executor, metricQueryServiceRetriever, NoOpElectionService.INSTANCE, NoOpFatalErrorHandler.INSTANCE);
+		super(
+			restConfig,
+			dispatcherGatewayRetriever,
+			config,
+			handlerConfig,
+			resourceManagerGatewayRetriever,
+			transientBlobService,
--- End diff --

True, will change it.

---
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r165972923

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.RemovalNotification;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Base class for serving files from the {@link TaskExecut
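The imports in this diff (`DefaultFileRegion`, `ChunkedFile`, `HttpChunkedInput`, `SslHandler`, `RandomAccessFile`, `FileChannel`) hint at the common Netty static-file pattern: send the file as a zero-copy region when the pipeline has no `SslHandler`, and fall back to chunked reads when TLS has to encrypt the bytes in user space. Because the diff is truncated, this is an inference. The sketch below shows the same decision with plain JDK NIO instead of Netty; `FileSender`, the `needsUserSpaceCopy` flag, and the 8 KiB chunk size are assumptions.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class FileSender {
	// Chunk size for the copying path (assumed; a typical 8 KiB chunk).
	private static final int CHUNK_SIZE = 8192;

	private FileSender() {}

	/**
	 * Writes the whole file to the target and returns the number of bytes sent.
	 *
	 * @param needsUserSpaceCopy true when the bytes must pass through user space,
	 *                           e.g. because a TLS layer has to encrypt them
	 */
	static long sendFile(Path file, WritableByteChannel target, boolean needsUserSpaceCopy) throws IOException {
		try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
			final long size = source.size();
			long sent = 0;
			if (!needsUserSpaceCopy) {
				// Zero-copy path: let the OS move the bytes, as a file region would.
				while (sent < size) {
					long transferred = source.transferTo(sent, size - sent, target);
					if (transferred <= 0) {
						break; // file shrank concurrently; stop instead of spinning
					}
					sent += transferred;
				}
			} else {
				// Chunked path: read fixed-size chunks into a heap buffer and write them out.
				ByteBuffer buffer = ByteBuffer.allocate(CHUNK_SIZE);
				while (source.read(buffer) != -1) {
					buffer.flip();
					while (buffer.hasRemaining()) {
						sent += target.write(buffer);
					}
					buffer.clear();
				}
			}
			return sent;
		}
	}
}
```

Both paths produce the same bytes. The zero-copy path avoids copying file contents into the JVM heap, which is only possible when nothing between the file and the socket needs to transform the data.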
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r165950588

--- Diff: flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java ---
@@ -285,11 +287,22 @@ private static String createMessageHtmlEntry(Class messageClass, Class emp
 		dispatcherGatewayRetriever = () -> null;
 		resourceManagerGatewayRetriever = () -> null;
+		transientBlobService = NoOpTransientBlobService.INSTANCE;
 		metricQueryServiceRetriever = path -> null;
 	}
 
 	private DocumentingDispatcherRestEndpoint() {
-		super(restConfig, dispatcherGatewayRetriever, config, handlerConfig, resourceManagerGatewayRetriever, executor, metricQueryServiceRetriever, NoOpElectionService.INSTANCE, NoOpFatalErrorHandler.INSTANCE);
+		super(
+			restConfig,
+			dispatcherGatewayRetriever,
+			config,
+			handlerConfig,
+			resourceManagerGatewayRetriever,
+			transientBlobService,
--- End diff --

I don't think we need these `static final` variables. We can just pass `NoOpTransientBlobService.INSTANCE` directly as a constructor argument.

---
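GJL's suggestion can be illustrated with a minimal sketch. A `super(...)` call cannot read instance fields, which is presumably why the generator staged its arguments in static fields; a singleton constant makes that staging unnecessary. `BlobFileService`, `NoOpBlobFileService`, `RestEndpointBase`, and `DocumentingEndpoint` are hypothetical and far smaller than Flink's classes, and modelling the no-op service as an enum singleton is an assumption.

```java
import java.io.File;
import java.io.IOException;

// Hypothetical, trimmed-down stand-in for TransientBlobService.
interface BlobFileService {
	File getFile(String key) throws IOException;
}

// Null-object singleton: an enum guarantees exactly one instance.
enum NoOpBlobFileService implements BlobFileService {
	INSTANCE;

	@Override
	public File getFile(String key) throws IOException {
		throw new IOException("NoOpBlobFileService cannot serve " + key);
	}
}

// Hypothetical base endpoint that needs the service at construction time.
class RestEndpointBase {
	final String address;
	final BlobFileService blobFileService;

	RestEndpointBase(String address, BlobFileService blobFileService) {
		this.address = address;
		this.blobFileService = blobFileService;
	}
}

// Documentation-only endpoint: the singleton goes straight into the super
// constructor, so no extra static field is needed to hold it.
final class DocumentingEndpoint extends RestEndpointBase {
	DocumentingEndpoint() {
		super("localhost", NoOpBlobFileService.INSTANCE);
	}
}
```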
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r165962518

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/TransientBlobCache.java ---
@@ -222,7 +221,13 @@ public File getStorageLocation(@Nullable JobID jobId, BlobKey key) throws IOExce
 	}
 
 	private BlobClient createClient() throws IOException {
-		return new BlobClient(serverAddress, blobClientConfig);
+		final InetSocketAddress currentServerAddress = serverAddress;
+
+		if (currentServerAddress != null) {
+			return new BlobClient(serverAddress, blobClientConfig);
--- End diff --

I think you should use `currentServerAddress`, or the creation can fail with an NPE.

---
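GJL's point is the standard pattern for a field that another thread may change: read the (presumably `volatile`) field once into a local variable, null-check the local, and use only the local afterwards. A minimal sketch follows; `BlobClientFactory`, `SketchBlobClient`, and the `IOException` for an unknown address are hypothetical, while `setServerAddress` mirrors the updatable BlobServer address described in the FLINK-8501 commit message.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Objects;

// Hypothetical client whose constructor requires a non-null address.
final class SketchBlobClient {
	final InetSocketAddress address;

	SketchBlobClient(InetSocketAddress address) {
		this.address = Objects.requireNonNull(address, "address");
	}
}

final class BlobClientFactory {
	// May be updated by another thread (e.g. when the server address changes), hence volatile.
	private volatile InetSocketAddress serverAddress;

	void setServerAddress(InetSocketAddress newAddress) {
		serverAddress = newAddress;
	}

	SketchBlobClient createClient() throws IOException {
		// Read the volatile field exactly once. Checking the local but then passing the
		// field (as in the reviewed diff) could still hand a different value to the client.
		final InetSocketAddress currentServerAddress = serverAddress;

		if (currentServerAddress != null) {
			return new SketchBlobClient(currentServerAddress);
		} else {
			throw new IOException("Cannot create a client: the server address is not known yet.");
		}
	}
}
```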
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r165018462

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---
@@ -175,4 +177,13 @@
 		final AllocationID allocationId,
 		final Throwable cause,
 		@RpcTimeout final Time timeout);
+
+	/**
+	 * Requests the file upload of the specified type to the cluster's {@link BlobServer}.
+	 *
+	 * @param fileType to upload
+	 * @param timeout for the asynchronous operation
+	 * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file.
+	 */
+	CompletableFuture requestFileUpload(FileType fileType, @RpcTimeout Time timeout);
--- End diff --

Yes, we should adapt `RpcTaskManagerGateway` accordingly.

---
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r165017798

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---
@@ -175,4 +177,13 @@
 		final AllocationID allocationId,
 		final Throwable cause,
 		@RpcTimeout final Time timeout);
+
+	/**
+	 * Requests the file upload of the specified type to the cluster's {@link BlobServer}.
+	 *
+	 * @param fileType to upload
+	 * @param timeout for the asynchronous operation
+	 * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file.
+	 */
+	CompletableFuture requestFileUpload(FileType fileType, @RpcTimeout Time timeout);
--- End diff --

Yes, we do. Good catch. Will apply the change.

---
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r165017616

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---
@@ -175,4 +177,13 @@
 		final AllocationID allocationId,
 		final Throwable cause,
 		@RpcTimeout final Time timeout);
+
+	/**
+	 * Requests the file upload of the specified type to the cluster's {@link BlobServer}.
+	 *
+	 * @param fileType to upload
+	 * @param timeout for the asynchronous operation
+	 * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file.
+	 */
+	CompletableFuture requestFileUpload(FileType fileType, @RpcTimeout Time timeout);
--- End diff --

Not strictly, but I thought that we could combine both methods into `requestFileUpload`, which gives us a leaner interface.

---
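Folding the two per-file requests into one method keyed by a file type might look like the minimal sketch below, with the TaskExecutor side resolving the type to a configured file. `FileUploadGateway`, `LocalFileUploadGateway`, the `LOG`/`STDOUT` constants, and the string result standing in for a `TransientBlobKey` are assumptions; the `@RpcTimeout` parameter is left out.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical file kinds, mirroring the FileType parameter in the diff.
enum FileType { LOG, STDOUT }

interface FileUploadGateway {
	// Single entry point instead of one method per file kind.
	CompletableFuture<String> requestFileUpload(FileType fileType);
}

final class LocalFileUploadGateway implements FileUploadGateway {
	private final Map<FileType, String> filePaths = new EnumMap<>(FileType.class);

	LocalFileUploadGateway(String logFile, String stdoutFile) {
		// Either path may be null when that file is not configured.
		if (logFile != null) {
			filePaths.put(FileType.LOG, logFile);
		}
		if (stdoutFile != null) {
			filePaths.put(FileType.STDOUT, stdoutFile);
		}
	}

	@Override
	public CompletableFuture<String> requestFileUpload(FileType fileType) {
		final String path = filePaths.get(fileType);
		if (path == null) {
			CompletableFuture<String> failed = new CompletableFuture<>();
			failed.completeExceptionally(new IllegalStateException("No file configured for " + fileType));
			return failed;
		}
		// Stand-in for "upload the file to the BlobServer and return its key".
		return CompletableFuture.completedFuture("blob-key:" + path);
	}
}
```

Adding another file kind later then means adding an enum constant rather than a new gateway method.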
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5353#discussion_r164992599

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---
@@ -175,4 +177,13 @@
 		final AllocationID allocationId,
 		final Throwable cause,
 		@RpcTimeout final Time timeout);
+
+	/**
+	 * Requests the file upload of the specified type to the cluster's {@link BlobServer}.
+	 *
+	 * @param fileType to upload
+	 * @param timeout for the asynchronous operation
+	 * @return Future which is completed with the {@link TransientBlobKey} of the uploaded file.
+	 */
+	CompletableFuture requestFileUpload(FileType fileType, @RpcTimeout Time timeout);
--- End diff --

Don't we need to clean up these methods?

```
@Override
public CompletableFuture requestTaskManagerLog(Time timeout) {
	// return taskExecutorGateway.requestTaskManagerLog(timeout);
	throw new UnsupportedOperationException("Operation is not yet supported.");
}

@Override
public CompletableFuture requestTaskManagerStdout(Time timeout) {
	// return taskExecutorGateway.requestTaskManagerStdout(timeout);
	throw new UnsupportedOperationException("Operation is not yet supported.");
}
```

---
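One option for these stubs, and one reading of "adapt `RpcTaskManagerGateway` accordingly", is to forward both calls to the single upload method instead of throwing `UnsupportedOperationException`. Removing the stubs altogether is the other option. A minimal sketch of the forwarding variant; `TaskExecutorFileGateway` and `TaskManagerGatewayAdapter` are hypothetical, the result is a string standing in for the blob key, and timeouts are omitted.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical file kinds, mirroring the FileType parameter in the diff.
enum FileType { LOG, STDOUT }

// Hypothetical stand-in for the TaskExecutor gateway after the change.
interface TaskExecutorFileGateway {
	CompletableFuture<String> requestFileUpload(FileType fileType);
}

// Hypothetical stand-in for the wrapper quoted above: the per-file methods remain
// for existing callers but delegate to the combined call.
final class TaskManagerGatewayAdapter {
	private final TaskExecutorFileGateway taskExecutorGateway;

	TaskManagerGatewayAdapter(TaskExecutorFileGateway taskExecutorGateway) {
		this.taskExecutorGateway = taskExecutorGateway;
	}

	CompletableFuture<String> requestTaskManagerLog() {
		return taskExecutorGateway.requestFileUpload(FileType.LOG);
	}

	CompletableFuture<String> requestTaskManagerStdout() {
		return taskExecutorGateway.requestFileUpload(FileType.STDOUT);
	}
}
```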
[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/5353

[FLINK-8503] [flip6] Display TaskExecutor logs and stdout files in web ui

## What is the purpose of the change

Introduce the AbstractHandler, which takes a typed request and returns an untyped response. The AbstractRestHandler extends the AbstractHandler to add typed responses.

Introduce AbstractTaskManagerFileHandler, which encapsulates the file loading logic. When a TaskManager file is requested, the handler triggers the file upload via the ResourceManager. The returned TransientBlobKey is then downloaded via the TransientBlobService. Once downloaded, the file is served to the client. Each transient blob key is cached for a maximum duration, after which it is purged and the file has to be re-uploaded by the TaskExecutor.

This PR is based on #5341

## Brief change log

- Introduced untyped response handler `AbstractHandler`
- Added `AbstractTaskManagerFileHandler`, which is responsible for serving files from the `TaskExecutor`
- The `AbstractTaskManagerFileHandler` triggers the file upload via the `ResourceManager`, which knows the `TaskExecutors`; additionally, it caches the `TransientBlobKeys` so that not every request triggers a file upload
- Added `TaskManagerLogFileHandler` to serve the log file
- Added `TaskManagerStdoutFileHandler` to serve the stdout file

## Verifying this change

- Added `AbstractTaskManagerFileHandlerTest`
- Tested functionality manually

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink enableTaskManagerLogRetrieval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5353.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #5353

commit b9db2ab22c346ec64363b446ed23692af1365239
Author: Till Rohrmann
Date: 2018-01-23T18:12:27Z

[FLINK-8501] [flip6] Use single BlobCacheService per TaskExecutor

Instead of creating a dedicated BlobCacheService for each new JobManagerConnection, the TaskExecutor uses a single BlobCacheService which it shares between the different JobManagerConnections. The initial BlobServer address is passed by the ResourceManager when the TaskExecutor registers with it. In order to avoid the re-creation of BlobCacheServices, this commit changes the behaviour such that one can update the BlobServer address.

commit 5bd4db619ff0e984477ead323345f5f7fa740626
Author: Till Rohrmann
Date: 2018-01-24T12:41:53Z

[hotfix] [tests] Remove JobManagerRunnerMockTest

The JobManagerRunnerMockTest is completely ignored. Moreover, it relies heavily on Mockito, which makes it hard to maintain.

commit c38adb65dd49150a03a2f5f7ea8d421b2ef34616
Author: Till Rohrmann
Date: 2018-01-24T12:42:41Z

[FLINK-8502] [flip6] Remove LibraryCacheManager from JobMaster

This commit removes the LibraryCacheManager from the JobMaster since it is no longer needed. The JobMaster is started with the correct user code class loader and, thus, does not need the LibraryCacheManager. This commit also corrects that the BlobServer is not closed by the JobManagerServices#shutdown method.
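The PR description states that each TransientBlobKey is cached for a maximum duration, after which it is purged and the TaskExecutor has to upload the file again. The handler imports shaded Guava (`CacheBuilder`, `LoadingCache`); the sketch below swaps that for a plain `ConcurrentHashMap` with a manual expiry check so it runs on the JDK alone. It caches the *future* so that concurrent requests share one upload, and it retries a failed upload on the next request. `ExpiringKeyCache`, the injectable clock, and the string keys are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical: caches the (future) blob key per TaskManager for a fixed time-to-live.
final class ExpiringKeyCache<K, V> {

	// One cached upload: the pending or completed key plus its expiry timestamp.
	private static final class Entry<T> {
		final CompletableFuture<T> future;
		final long expiresAtMillis;

		Entry(CompletableFuture<T> future, long expiresAtMillis) {
			this.future = future;
			this.expiresAtMillis = expiresAtMillis;
		}
	}

	private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
	private final Function<K, CompletableFuture<V>> loader;
	private final long ttlMillis;
	private final LongSupplier clock;

	ExpiringKeyCache(Function<K, CompletableFuture<V>> loader, long ttlMillis, LongSupplier clock) {
		this.loader = loader;
		this.ttlMillis = ttlMillis;
		this.clock = clock;
	}

	CompletableFuture<V> get(K key) {
		final long now = clock.getAsLong();
		// compute() is atomic per key, so concurrent callers share one pending upload.
		Entry<V> entry = entries.compute(key, (k, existing) -> {
			if (existing != null
					&& now < existing.expiresAtMillis
					&& !existing.future.isCompletedExceptionally()) {
				return existing; // still fresh: reuse the cached key
			}
			// Missing, expired, or failed: trigger a new upload.
			return new Entry<>(loader.apply(k), now + ttlMillis);
		});
		return entry.future;
	}
}
```

Unlike a Guava cache with `expireAfterWrite`, this sketch only replaces expired entries when they are requested again and never evicts them proactively. The `RemovalNotification` import suggests the real handler also reacts to evictions, which is not modelled here.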
commit eb3fed4ce96b4ce27f01d72f2dc76b151dbdd1ae
Author: Till Rohrmann
Date: 2018-01-23T14:17:16Z

[FLINK-8495] [flip6] Enable main cluster component's log and stdout file retrieval

This commit enables the retrieval of the cluster's main component log and stdout files via the web ui. The files are served by the StaticFileServerHandler.

commit 23496241962c69ceb3ada9bd0d67c0344554cf99
Author: Till Rohrmann
Date: 2018-01-23T17:27:28Z

[FLINK-8503] [flip6] Display TaskExecutor logs and stdout files in web ui

Introduce the AbstractHandler, which takes a typed request and returns an untyped response. The AbstractRestHandler extends the AbstractHandler to add typed responses.

Introduce AbstractTaskManagerFileHandler, which encapsulates the file loading logic.
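The handler split described in this commit can be sketched as a small class hierarchy. One base class has a typed request but an untyped (raw bytes) response, so a file handler can write whatever body it likes. A subclass adds a typed response and owns serialization. Everything below is hypothetical and much simpler than Flink's handlers (no Netty context, gateways, or message headers), and `String.valueOf` stands in for JSON serialization.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Typed request R, untyped response: subclasses produce the raw body themselves.
abstract class UntypedResponseHandler<R> {
	abstract CompletableFuture<byte[]> respond(R request);
}

// Adds a typed response P; the base class serializes it, so subclasses only compute a value.
abstract class TypedResponseHandler<R, P> extends UntypedResponseHandler<R> {
	@Override
	final CompletableFuture<byte[]> respond(R request) {
		return handle(request).thenApply(response ->
			serialize(response).getBytes(StandardCharsets.UTF_8));
	}

	abstract CompletableFuture<P> handle(R request);

	// Stand-in for JSON serialization.
	String serialize(P response) {
		return String.valueOf(response);
	}
}

// Untyped example: returns stored file contents verbatim, like a file-serving handler.
final class FileContentHandler extends UntypedResponseHandler<String> {
	private final Map<String, byte[]> files;

	FileContentHandler(Map<String, byte[]> files) {
		this.files = files;
	}

	@Override
	CompletableFuture<byte[]> respond(String fileName) {
		byte[] content = files.get(fileName);
		if (content == null) {
			CompletableFuture<byte[]> failed = new CompletableFuture<>();
			failed.completeExceptionally(new IllegalArgumentException("Unknown file " + fileName));
			return failed;
		}
		return CompletableFuture.completedFuture(content);
	}
}

// Typed example: returns a number that the base class turns into a text body.
final class FileSizeHandler extends TypedResponseHandler<String, Integer> {
	private final Map<String, byte[]> files;

	FileSizeHandler(Map<String, byte[]> files) {
		this.files = files;
	}

	@Override
	CompletableFuture<Integer> handle(String fileName) {
		byte[] content = files.get(fileName);
		return CompletableFuture.completedFuture(content == null ? -1 : content.length);
	}
}
```

Making `respond` final in the typed subclass ensures that handlers with typed responses cannot bypass the shared serialization step.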