[FLINK-2732] Display TM logs in Dashboard

This closes #1790


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d53bbc4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d53bbc4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d53bbc4

Branch: refs/heads/master
Commit: 6d53bbc4b92e651786ecc8c2c6dfeb8e450a16a3
Parents: 5f993c6
Author: zentol <ches...@apache.org>
Authored: Tue Apr 5 17:29:55 2016 +0200
Committer: zentol <ches...@apache.org>
Committed: Mon Apr 18 11:20:08 2016 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |   5 +
 .../webmonitor/RuntimeMonitorHandler.java       |  55 +---
 .../webmonitor/RuntimeMonitorHandlerBase.java   |  95 +++++++
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  19 +-
 .../handlers/TaskManagerLogHandler.java         | 281 +++++++++++++++++++
 .../app/partials/taskmanager/taskmanager.jade   |   8 +-
 .../partials/taskmanager/taskmanager.log.jade   |  37 +++
 .../taskmanager/taskmanager.stdout.jade         |  37 +++
 .../web-dashboard/app/scripts/index.coffee      |  16 +-
 .../modules/taskmanager/taskmanager.ctrl.coffee |  23 ++
 .../modules/taskmanager/taskmanager.svc.coffee  |  20 +-
 flink-runtime-web/web-dashboard/web/js/index.js |  68 ++++-
 .../web/partials/taskmanager/taskmanager.html   |   6 +-
 .../partials/taskmanager/taskmanager.log.html   |  39 +++
 .../taskmanager/taskmanager.stdout.html         |  39 +++
 .../apache/flink/runtime/blob/BlobCache.java    |  16 ++
 .../apache/flink/runtime/blob/BlobServer.java   |   6 +
 .../apache/flink/runtime/blob/BlobService.java  |   2 +
 .../runtime/messages/TaskManagerMessages.scala  |  29 ++
 .../flink/runtime/taskmanager/TaskManager.scala |  45 ++-
 .../apache/flink/test/util/TestBaseUtils.java   |   2 +
 .../flink/test/web/WebFrontendITCase.java       |  29 ++
 22 files changed, 812 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 53d9a37..6b3974b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -161,6 +161,11 @@ public final class ConfigConstants {
        public static final String TASK_MANAGER_TMP_DIR_KEY = 
"taskmanager.tmp.dirs";
 
        /**
+        * The config parameter defining the taskmanager log file location
+        */
+       public static final String TASK_MANAGER_LOG_PATH_KEY = 
"taskmanager.log.path";
+
+       /**
         * The config parameter defining the amount of memory to be allocated 
by the task manager's
         * memory manager (in megabytes). If not set, a relative fraction will 
be allocated, as defined
         * by {@link #TASK_MANAGER_MEMORY_FRACTION_KEY}.

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index c304abb..36fd83a 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -22,25 +22,19 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpResponse;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.router.KeepAliveWrite;
 import io.netty.handler.codec.http.router.Routed;
 
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Tuple2;
-import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -58,7 +52,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
  * proper codes, like OK, NOT_FOUND, or SERVER_ERROR.
  */
 @ChannelHandler.Sharable
-public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> 
{
+public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RuntimeMonitorHandler.class);
 
@@ -66,60 +60,19 @@ public class RuntimeMonitorHandler extends 
SimpleChannelInboundHandler<Routed> {
 
        public static final String WEB_MONITOR_ADDRESS_KEY = 
"web.monitor.address";
        
-
-       private final RequestHandler handler;
-
-       private final JobManagerRetriever retriever;
-
-       private final Future<String> localJobManagerAddressFuture;
-
-       private final FiniteDuration timeout;
-
-       private String localJobManagerAddress;
-       
+       private final RequestHandler handler;   
 
        public RuntimeMonitorHandler(
                        RequestHandler handler,
                        JobManagerRetriever retriever,
                        Future<String> localJobManagerAddressFuture,
                        FiniteDuration timeout) {
-
+               super(retriever, localJobManagerAddressFuture, timeout);
                this.handler = checkNotNull(handler);
-               this.retriever = checkNotNull(retriever);
-               this.localJobManagerAddressFuture = 
checkNotNull(localJobManagerAddressFuture);
-               this.timeout = checkNotNull(timeout);
        }
 
        @Override
-       protected void channelRead0(ChannelHandlerContext ctx, Routed routed) 
throws Exception {
-               if (localJobManagerAddressFuture.isCompleted()) {
-                       if (localJobManagerAddress == null) {
-                               localJobManagerAddress = 
Await.result(localJobManagerAddressFuture, timeout);
-                       }
-
-                       Option<Tuple2<ActorGateway, Integer>> jobManager = 
retriever.getJobManagerGatewayAndWebPort();
-
-                       if (jobManager.isDefined()) {
-                               Tuple2<ActorGateway, Integer> gatewayPort = 
jobManager.get();
-                               String redirectAddress = 
HandlerRedirectUtils.getRedirectAddress(
-                                       localJobManagerAddress, gatewayPort);
-
-                               if (redirectAddress != null) {
-                                       HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path());
-                                       KeepAliveWrite.flush(ctx, 
routed.request(), redirect);
-                               }
-                               else {
-                                       respondAsLeader(ctx, routed, 
gatewayPort._1());
-                               }
-                       } else {
-                               KeepAliveWrite.flush(ctx, routed.request(), 
HandlerRedirectUtils.getUnavailableResponse());
-                       }
-               } else {
-                       KeepAliveWrite.flush(ctx, routed.request(), 
HandlerRedirectUtils.getUnavailableResponse());
-               }
-       }
-
-       private void respondAsLeader(ChannelHandlerContext ctx, Routed routed, 
ActorGateway jobManager) {
+       protected void respondAsLeader(ChannelHandlerContext ctx, Routed 
routed, ActorGateway jobManager) {
                DefaultFullHttpResponse response;
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
new file mode 100644
index 0000000..7a38de4
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.router.KeepAliveWrite;
+import io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
+import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Base class of the Netty channel handlers used by the web runtime monitor.
+ * It replies with a redirect or an "unavailable" response where required and
+ * otherwise hands the request to {@link #respondAsLeader}, which subclasses
+ * implement to produce the actual response.
+ */
+@ChannelHandler.Sharable
+public abstract class RuntimeMonitorHandlerBase extends 
SimpleChannelInboundHandler<Routed> {
+
+       private final JobManagerRetriever retriever;
+
+       protected final Future<String> localJobManagerAddressFuture;
+
+       protected final FiniteDuration timeout;
+
+       protected String localJobManagerAddress;
+       
+       public RuntimeMonitorHandlerBase(
+               JobManagerRetriever retriever,
+               Future<String> localJobManagerAddressFuture,
+               FiniteDuration timeout) {
+
+               this.retriever = checkNotNull(retriever);
+               this.localJobManagerAddressFuture = 
checkNotNull(localJobManagerAddressFuture);
+               this.timeout = checkNotNull(timeout);
+       }
+
+       @Override
+       protected void channelRead0(ChannelHandlerContext ctx, Routed routed) 
throws Exception {
+               if (localJobManagerAddressFuture.isCompleted()) {
+                       if (localJobManagerAddress == null) {
+                               localJobManagerAddress = 
Await.result(localJobManagerAddressFuture, timeout);
+                       }
+
+                       Option<Tuple2<ActorGateway, Integer>> jobManager = 
retriever.getJobManagerGatewayAndWebPort();
+
+                       if (jobManager.isDefined()) {
+                               Tuple2<ActorGateway, Integer> gatewayPort = 
jobManager.get();
+                               String redirectAddress = 
HandlerRedirectUtils.getRedirectAddress(
+                                       localJobManagerAddress, gatewayPort);
+
+                               if (redirectAddress != null) {
+                                       HttpResponse redirect = 
HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path());
+                                       KeepAliveWrite.flush(ctx, 
routed.request(), redirect);
+                               }
+                               else {
+                                       respondAsLeader(ctx, routed, 
gatewayPort._1());
+                               }
+                       } else {
+                               KeepAliveWrite.flush(ctx, routed.request(), 
HandlerRedirectUtils.getUnavailableResponse());
+                       }
+               } else {
+                       KeepAliveWrite.flush(ctx, routed.request(), 
HandlerRedirectUtils.getUnavailableResponse());
+               }
+       }
+
+       protected abstract void respondAsLeader(ChannelHandlerContext ctx, 
Routed routed, ActorGateway jobManager);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 582004a..b583ca5 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -65,9 +65,12 @@ import 
org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccum
 import 
org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
 import 
org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
+import org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler;
 import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.ExecutionContext$;
+import scala.concurrent.ExecutionContextExecutor;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -75,6 +78,8 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -126,6 +131,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 
        private AtomicBoolean cleanedUp = new AtomicBoolean();
 
+       private ExecutorService executorService;
+
        public WebRuntimeMonitor(
                        Configuration config,
                        LeaderRetrievalService leaderRetrievalService,
@@ -193,6 +200,10 @@ public class WebRuntimeMonitor implements WebMonitor {
 
                // 
--------------------------------------------------------------------
 
+               executorService = new ForkJoinPool();
+
+               ExecutionContextExecutor context = 
ExecutionContext$.MODULE$.fromExecutor(executorService);
+
                router = new Router()
                        // config how to interact with this web server
                        .GET("/config", handler(new 
DashboardConfigHandler(cfg.getRefreshInterval())))
@@ -234,7 +245,11 @@ public class WebRuntimeMonitor implements WebMonitor {
                        .GET("/jobs/:jobid/checkpoints", handler(new 
JobCheckpointsHandler(currentGraphs)))
 
                        .GET("/taskmanagers", handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
-                       .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
+                       .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new 
TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
+                       .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log", 
+                               new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.LOG, 
config))
+                       .GET("/taskmanagers/:" + 
TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout", 
+                               new TaskManagerLogHandler(retriever, context, 
jobManagerAddressPromise.future(), timeout, 
TaskManagerLogHandler.FileMode.STDOUT, config))
 
                        // log and stdout
                        .GET("/jobmanager/log", logFiles.logFile == null ? new 
ConstantTextHandler("(log file unavailable)") :
@@ -377,6 +392,8 @@ public class WebRuntimeMonitor implements WebMonitor {
 
                        backPressureStatsTracker.shutDown();
 
+                       executorService.shutdownNow();
+
                        cleanup();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
new file mode 100644
index 0000000..38957dc
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+/*****************************************************************************
+ * This code is based on the "HttpStaticFileServerHandler" from the
+ * Netty project's HTTP server example.
+ *
+ * See http://netty.io and
+ * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
+ *****************************************************************************/
+
+import akka.dispatch.Mapper;
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.router.Routed;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.ExecutionContextExecutor;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Request handler that returns the TaskManager log/out files.
+ *
+ * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty 
project's HTTP server
+ * example.</p>
+ */
+@ChannelHandler.Sharable
+public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase {
+       private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerLogHandler.class);
+
+       /** Keep track of last transmitted log, to clean up old ones */
+       private final HashMap<String, BlobKey> lastSubmittedLog = new 
HashMap<>();
+       private final HashMap<String, BlobKey> lastSubmittedStdout = new 
HashMap<>();
+
+       /** Keep track of request status, prevents multiple log requests for a 
single TM running concurrently */
+       private final ConcurrentHashMap<String, Boolean> lastRequestPending = 
new ConcurrentHashMap<>();
+       private final Configuration config;
+
+       /** Future of the BlobCache; created on the first request answered as leader */
+       private Future<BlobCache> cache;
+
+       /** Indicates which log file should be displayed; true indicates .log, 
false indicates .out */
+       private boolean serveLogFile;
+
+       private final ExecutionContextExecutor executor;
+
+       public enum FileMode {
+               LOG,
+               STDOUT
+       }
+
+       public TaskManagerLogHandler(
+               JobManagerRetriever retriever,
+               ExecutionContextExecutor executor,
+               Future<String> localJobManagerAddressPromise,
+               FiniteDuration timeout,
+               FileMode fileMode,
+               Configuration config) throws IOException {
+               super(retriever, localJobManagerAddressPromise, timeout);
+
+               this.executor = checkNotNull(executor);
+               this.config = config;
+               switch (fileMode) {
+                       case LOG:
+                               serveLogFile = true;
+                               break;
+                       case STDOUT:
+                               serveLogFile = false;
+                               break;
+               }
+       }
+
+       /**
+        * Response when running with leading JobManager.
+        */
+       @Override
+       protected void respondAsLeader(final ChannelHandlerContext ctx, final 
Routed routed, final ActorGateway jobManager) {
+               if (cache == null) {
+                       Future<Object> portFuture = 
jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout);
+                       cache = portFuture.map(new Mapper<Object, BlobCache>() {
+                               @Override
+                               public BlobCache apply(Object result) {
+                                       Option<String> hostOption = 
jobManager.actor().path().address().host();
+                                       String host = hostOption.isDefined() ? 
hostOption.get() : "localhost";
+                                       int port = (int) result;
+                                       return new BlobCache(new 
InetSocketAddress(host, port), config);
+                               }
+                       }, executor);
+               }
+
+               final String taskManagerID = 
routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
+               final HttpRequest request = routed.request();
+
+               //fetch TaskManager logs if no other process is currently doing 
it
+               if (lastRequestPending.putIfAbsent(taskManagerID, true) == 
null) {
+                       try {
+                               InstanceID instanceID = new 
InstanceID(StringUtils.hexStringToByte(taskManagerID));
+                               Future<Object> taskManagerFuture = 
jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), 
timeout);
+
+                               Future<Object> blobKeyFuture = 
taskManagerFuture.flatMap(new Mapper<Object, Future<Object>>() {
+                                       @Override
+                                       public Future<Object> apply(Object 
instance) {
+                                               Instance taskManager = 
((JobManagerMessages.TaskManagerInstance) instance).instance().get();
+                                               return 
taskManager.getActorGateway().ask(serveLogFile ? 
TaskManagerMessages.getRequestTaskManagerLog() : 
TaskManagerMessages.getRequestTaskManagerStdout(), timeout);
+                                       }
+                               }, executor);
+
+                               Future<Object> logPathFuture = 
cache.zip(blobKeyFuture).map(new Mapper<Tuple2<BlobCache, Object>, Object>() {
+                                       @Override
+                                       public Object 
checkedApply(Tuple2<BlobCache, Object> instance) throws Exception {
+                                               BlobCache cache = instance._1();
+                                               if (instance._2() instanceof 
Exception) {
+                                                       throw (Exception) 
instance._2();
+                                               }
+                                               BlobKey blobKey = (BlobKey) 
instance._2();
+
+                                               //delete previous log file, if 
it is different than the current one
+                                               HashMap<String, BlobKey> 
lastSubmittedFile = serveLogFile ? lastSubmittedLog : lastSubmittedStdout;
+                                               if 
(lastSubmittedFile.containsKey(taskManagerID)) {
+                                                       if 
(!blobKey.equals(lastSubmittedFile.get(taskManagerID))) {
+                                                               
cache.deleteGlobal(lastSubmittedFile.get(taskManagerID));
+                                                               
lastSubmittedFile.put(taskManagerID, blobKey);
+                                                       }
+                                               } else {
+                                                       
lastSubmittedFile.put(taskManagerID, blobKey);
+                                               }
+                                               return 
cache.getURL(blobKey).getFile();
+                                       }
+                               }, executor);
+
+                               logPathFuture.onFailure(new OnFailure() {
+                                       @Override
+                                       public void onFailure(Throwable 
failure) throws Throwable {
+                                               display(ctx, request, "Fetching 
TaskManager log failed.");
+                                               LOG.error("Fetching TaskManager 
log failed.", failure);
+                                               
lastRequestPending.remove(taskManagerID);
+                                       }
+                               }, executor);
+
+                               logPathFuture.onSuccess(new OnSuccess<Object>() 
{
+                                       @Override
+                                       public void onSuccess(Object 
filePathOption) throws Throwable {
+                                               String filePath = (String) 
filePathOption;
+
+                                               File file = new File(filePath);
+                                               final RandomAccessFile raf;
+                                               try {
+                                                       raf = new 
RandomAccessFile(file, "r");
+                                               } catch (FileNotFoundException 
e) {
+                                                       display(ctx, request, 
"Displaying TaskManager log failed.");
+                                                       LOG.error("Displaying 
TaskManager log failed.", e);
+                                                       return;
+                                               }
+                                               long fileLength = raf.length();
+                                               final FileChannel fc = 
raf.getChannel();
+
+                                               HttpResponse response = new 
DefaultHttpResponse(HTTP_1_1, OK);
+                                               
response.headers().set(CONTENT_TYPE, "text/plain");
+
+                                               if 
(HttpHeaders.isKeepAlive(request)) {
+                                                       
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+                                               }
+                                               
HttpHeaders.setContentLength(response, fileLength);
+
+                                               // write the initial line and 
the header.
+                                               ctx.write(response);
+
+                                               // write the content.
+                                               ctx.write(new 
DefaultFileRegion(fc, 0, fileLength), ctx.newProgressivePromise())
+                                                       .addListener(new 
GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() {
+                                                               @Override
+                                                               public void 
operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws 
Exception {
+                                                                       
lastRequestPending.remove(taskManagerID);
+                                                                       
fc.close();
+                                                                       
raf.close();
+                                                               }
+                                                       });
+                                               ChannelFuture lastContentFuture 
= ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+
+                                               // close the connection, if no 
keep-alive is needed
+                                               if 
(!HttpHeaders.isKeepAlive(request)) {
+                                                       
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+                                               }
+                                       }
+                               }, executor);
+                       } catch (Exception e) {
+                               display(ctx, request, "Error: " + 
e.getMessage());
+                               LOG.error("Fetching TaskManager log failed.", 
e);
+                               lastRequestPending.remove(taskManagerID);
+                       }
+               } else {
+                       display(ctx, request, "loading...");
+               }
+       }
+
+       private void display(ChannelHandlerContext ctx, HttpRequest request, 
String message) {
+               HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+               response.headers().set(CONTENT_TYPE, "text/plain");
+
+               if (HttpHeaders.isKeepAlive(request)) {
+                       response.headers().set(CONNECTION, 
HttpHeaders.Values.KEEP_ALIVE);
+               }
+
+               byte[] buf = message.getBytes();
+
+               ByteBuf b = Unpooled.copiedBuffer(buf);
+
+               HttpHeaders.setContentLength(response, buf.length);
+
+               // write the initial line and the header.
+               ctx.write(response);
+
+               ctx.write(b);
+
+               ChannelFuture lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+
+               // close the connection, if no keep-alive is needed
+               if (!HttpHeaders.isKeepAlive(request)) {
+                       
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.jade
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.jade 
b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.jade
index 23809de..5223d17 100644
--- a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.jade
+++ b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.jade
@@ -15,7 +15,7 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 
-nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="metrics.id")
+nav.navbar.navbar-default.navbar-fixed-top.navbar-main
   #fold-button.btn.btn-default.navbar-btn.pull-left(ng-click='showSidebar()')
     i.fa.fa-navicon
 
@@ -26,10 +26,14 @@ 
nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="metrics.id")
   .navbar-info.last.first.hidden-xs.hidden-sm
     | {{metrics.path}}
 
-nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="metrics.id")
+nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional
   ul.nav.nav-tabs
     li(ui-sref-active='active')
       a(ui-sref=".metrics") Metrics
+    li(ui-sref-active='active')
+      a(ui-sref=".log") Logs
+    li(ui-sref-active='active')
+      a(ui-sref=".stdout") Stdout
 
 #content-inner.has-navbar-main-additional
   div(ui-view="details")

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.log.jade
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.log.jade 
b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.log.jade
new file mode 100644
index 0000000..7a253a4
--- /dev/null
+++ 
b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.log.jade
@@ -0,0 +1,37 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+      http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+//- Task manager "Logs" tab: a header row with refresh and download icons,
+//- followed by the log text bound to the scope variable `log`.
+//- The icons invoke reloadData() and downloadData() on the current scope.
+table.table.table-properties
+  thead
+    tr
+      th(colspan="2")
+        .row
+          .col-xs-10
+            | Task Manager Logs
+          .col-xs-1.text-right
+            a(ng-click="reloadData()" class="show-pointer")
+              i.fa.fa-refresh
+          .col-xs-1.text-left
+            a(ng-click="downloadData()" class="show-pointer")
+              i.fa.fa-download
+
+  tbody
+    tr
+      td(colspan="2")
+        pre
+          | {{log}}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.stdout.jade
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.stdout.jade
 
b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.stdout.jade
new file mode 100644
index 0000000..438a2e8
--- /dev/null
+++ 
b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.stdout.jade
@@ -0,0 +1,37 @@
+//
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+      http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+//- Task manager "Stdout" tab: a header row with refresh and download icons,
+//- followed by the output text bound to the scope variable `stdout`.
+//- The icons invoke reloadData() and downloadData() on the current scope.
+table.table.table-properties
+  thead
+    tr
+      th(colspan="2")
+        .row
+          .col-xs-10
+            | Task Manager Output
+          .col-xs-1.text-right
+            a(ng-click="reloadData()" class="show-pointer")
+              i.fa.fa-refresh
+          .col-xs-1.text-left
+            a(ng-click="downloadData()" class="show-pointer")
+              i.fa.fa-download
+
+  tbody
+    tr
+      td(colspan="2")
+        pre
+          | {{stdout}}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/scripts/index.coffee
----------------------------------------------------------------------
diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee 
b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
index e7a4829..aeb77bc 100644
--- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee
+++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee
@@ -180,13 +180,27 @@ angular.module('flinkApp', ['ui.router', 'angularMoment'])
       views:
         main:
           templateUrl: "partials/taskmanager/taskmanager.html"
-          controller: 'SingleTaskManagerController'
 
   .state "single-manager.metrics",
     url: "/metrics"
     views:
       details:
         templateUrl: "partials/taskmanager/taskmanager.metrics.html"
+        controller: 'SingleTaskManagerController'
+
+  .state "single-manager.stdout",
+    url: "/stdout"
+    views:
+      details:
+        templateUrl: "partials/taskmanager/taskmanager.stdout.html"
+        controller: 'SingleTaskManagerStdoutController'
+
+  .state "single-manager.log",
+    url: "/log"
+    views:
+      details:
+        templateUrl: "partials/taskmanager/taskmanager.log.html"
+        controller: 'SingleTaskManagerLogsController'
 
   .state "jobmanager",
       url: "/jobmanager"

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.ctrl.coffee
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.ctrl.coffee
 
b/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.ctrl.coffee
index 16adab6..a83a444 100644
--- 
a/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.ctrl.coffee
+++ 
b/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.ctrl.coffee
@@ -43,3 +43,26 @@ angular.module('flinkApp')
     $scope.$on '$destroy', ->
       $interval.cancel(refresh)
 
+.controller 'SingleTaskManagerLogsController', ($scope, $stateParams, SingleTaskManagerService, $interval, flinkConfig) ->
+  # Backs the task manager "Logs" view: exposes the fetched log as $scope.log,
+  # plus handlers for the refresh and download actions.
+  $scope.log = {}
+
+  # Fetches the log of the task manager named in the route and publishes it on the scope.
+  $scope.reloadData = ->
+    SingleTaskManagerService.loadLogs($stateParams.taskmanagerid).then (logText) ->
+      $scope.log = logText
+
+  # Initial load when the view is entered.
+  $scope.reloadData()
+
+  # Points the browser at the log endpoint of this task manager.
+  $scope.downloadData = ->
+    window.location.href = "/taskmanagers/#{$stateParams.taskmanagerid}/log"
+
+.controller 'SingleTaskManagerStdoutController', ($scope, $stateParams, SingleTaskManagerService, $interval, flinkConfig) ->
+  # Backs the task manager "Stdout" view: exposes the fetched output as $scope.stdout,
+  # plus handlers for the refresh and download actions.
+  $scope.stdout = {}
+
+  # Fetches the stdout of the task manager named in the route and publishes it on the scope.
+  $scope.reloadData = ->
+    SingleTaskManagerService.loadStdout($stateParams.taskmanagerid).then (stdoutText) ->
+      $scope.stdout = stdoutText
+
+  # Initial load when the view is entered.
+  $scope.reloadData()
+
+  # Points the browser at the stdout endpoint of this task manager.
+  $scope.downloadData = ->
+    window.location.href = "/taskmanagers/#{$stateParams.taskmanagerid}/stdout"

http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.svc.coffee
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.svc.coffee
 
b/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.svc.coffee
index fe39f57..ec8bcc0 100644
--- 
a/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.svc.coffee
+++ 
b/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.svc.coffee
@@ -34,11 +34,29 @@ angular.module('flinkApp')
   @loadMetrics = (taskmanagerid) ->
     deferred = $q.defer()
 
-    $http.get(flinkConfig.jobServer + "taskmanagers/" + taskmanagerid)
+    $http.get(flinkConfig.jobServer + "taskmanagers/" + taskmanagerid + 
"/metrics")
     .success (data, status, headers, config) ->
       deferred.resolve(data['taskmanagers'])
 
     deferred.promise
 
+  # Requests the log of the given task manager from the job server.
+  # Returns a promise that resolves with the response body, or is rejected
+  # with the error body if the HTTP request fails.
+  @loadLogs = (taskmanagerid) ->
+    deferred = $q.defer()
+
+    $http.get(flinkConfig.jobServer + "taskmanagers/" + taskmanagerid + "/log")
+    .success (data, status, headers, config) ->
+      deferred.resolve(data)
+    .error (data, status, headers, config) ->
+      # Without this the promise would stay pending forever on a failed request.
+      deferred.reject(data)
+
+    deferred.promise
+
+  # Requests the stdout of the given task manager from the job server.
+  # Returns a promise that resolves with the response body, or is rejected
+  # with the error body if the HTTP request fails.
+  @loadStdout = (taskmanagerid) ->
+    deferred = $q.defer()
+
+    $http.get(flinkConfig.jobServer + "taskmanagers/" + taskmanagerid + "/stdout")
+    .success (data, status, headers, config) ->
+      deferred.resolve(data)
+    .error (data, status, headers, config) ->
+      # Without this the promise would stay pending forever on a failed request.
+      deferred.reject(data)
+
+    deferred.promise
+
   @
 

Reply via email to