[FLINK-2732] Display TM logs in Dashboard This closes #1790
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d53bbc4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d53bbc4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d53bbc4 Branch: refs/heads/master Commit: 6d53bbc4b92e651786ecc8c2c6dfeb8e450a16a3 Parents: 5f993c6 Author: zentol <ches...@apache.org> Authored: Tue Apr 5 17:29:55 2016 +0200 Committer: zentol <ches...@apache.org> Committed: Mon Apr 18 11:20:08 2016 +0200 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 5 + .../webmonitor/RuntimeMonitorHandler.java | 55 +--- .../webmonitor/RuntimeMonitorHandlerBase.java | 95 +++++++ .../runtime/webmonitor/WebRuntimeMonitor.java | 19 +- .../handlers/TaskManagerLogHandler.java | 281 +++++++++++++++++++ .../app/partials/taskmanager/taskmanager.jade | 8 +- .../partials/taskmanager/taskmanager.log.jade | 37 +++ .../taskmanager/taskmanager.stdout.jade | 37 +++ .../web-dashboard/app/scripts/index.coffee | 16 +- .../modules/taskmanager/taskmanager.ctrl.coffee | 23 ++ .../modules/taskmanager/taskmanager.svc.coffee | 20 +- flink-runtime-web/web-dashboard/web/js/index.js | 68 ++++- .../web/partials/taskmanager/taskmanager.html | 6 +- .../partials/taskmanager/taskmanager.log.html | 39 +++ .../taskmanager/taskmanager.stdout.html | 39 +++ .../apache/flink/runtime/blob/BlobCache.java | 16 ++ .../apache/flink/runtime/blob/BlobServer.java | 6 + .../apache/flink/runtime/blob/BlobService.java | 2 + .../runtime/messages/TaskManagerMessages.scala | 29 ++ .../flink/runtime/taskmanager/TaskManager.scala | 45 ++- .../apache/flink/test/util/TestBaseUtils.java | 2 + .../flink/test/web/WebFrontendITCase.java | 29 ++ 22 files changed, 812 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 53d9a37..6b3974b 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -161,6 +161,11 @@ public final class ConfigConstants { public static final String TASK_MANAGER_TMP_DIR_KEY = "taskmanager.tmp.dirs"; /** + * The config parameter defining the taskmanager log file location + */ + public static final String TASK_MANAGER_LOG_PATH_KEY = "taskmanager.log.path"; + + /** * The config parameter defining the amount of memory to be allocated by the task manager's * memory manager (in megabytes). If not set, a relative fraction will be allocated, as defined * by {@link #TASK_MANAGER_MEMORY_FRACTION_KEY}. http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java index c304abb..36fd83a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java @@ -22,25 +22,19 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.router.KeepAliveWrite; import io.netty.handler.codec.http.router.Routed; import org.apache.flink.runtime.instance.ActorGateway; -import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils; import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; -import scala.Tuple2; -import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -58,7 +52,7 @@ import static com.google.common.base.Preconditions.checkNotNull; * proper codes, like OK, NOT_FOUND, or SERVER_ERROR. */ @ChannelHandler.Sharable -public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> { +public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase { private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitorHandler.class); @@ -66,60 +60,19 @@ public class RuntimeMonitorHandler extends SimpleChannelInboundHandler<Routed> { public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address"; - - private final RequestHandler handler; - - private final JobManagerRetriever retriever; - - private final Future<String> localJobManagerAddressFuture; - - private final FiniteDuration timeout; - - private String localJobManagerAddress; - + private final RequestHandler handler; public RuntimeMonitorHandler( RequestHandler handler, JobManagerRetriever retriever, Future<String> localJobManagerAddressFuture, FiniteDuration timeout) { - + super(retriever, localJobManagerAddressFuture, timeout); this.handler = checkNotNull(handler); - this.retriever = checkNotNull(retriever); - this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture); - this.timeout = checkNotNull(timeout); } @Override - protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception { - if (localJobManagerAddressFuture.isCompleted()) { - if (localJobManagerAddress == null) { - localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout); - } - - Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort(); - - if (jobManager.isDefined()) { - Tuple2<ActorGateway, Integer> gatewayPort = jobManager.get(); - String redirectAddress = HandlerRedirectUtils.getRedirectAddress( - localJobManagerAddress, gatewayPort); - - if (redirectAddress != null) { - HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path()); - KeepAliveWrite.flush(ctx, routed.request(), redirect); - } - else { - respondAsLeader(ctx, routed, gatewayPort._1()); - } - } else { - KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse()); - } - } else { - KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse()); - } - } - - private void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) { + protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) { DefaultFullHttpResponse response; try { http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java new file mode 100644 index 0000000..7a38de4 --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandlerBase.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor; + +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.router.KeepAliveWrite; +import io.netty.handler.codec.http.router.Routed; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils; +import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; +import scala.Option; +import scala.Tuple2; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * The Netty channel handler that processes all HTTP requests. + * This handler takes the path parameters and delegates the work to a {@link RequestHandler}. + * This handler also deals with setting correct response MIME types and returning + * proper codes, like OK, NOT_FOUND, or SERVER_ERROR. + */ +@ChannelHandler.Sharable +public abstract class RuntimeMonitorHandlerBase extends SimpleChannelInboundHandler<Routed> { + + private final JobManagerRetriever retriever; + + protected final Future<String> localJobManagerAddressFuture; + + protected final FiniteDuration timeout; + + protected String localJobManagerAddress; + + public RuntimeMonitorHandlerBase( + JobManagerRetriever retriever, + Future<String> localJobManagerAddressFuture, + FiniteDuration timeout) { + + this.retriever = checkNotNull(retriever); + this.localJobManagerAddressFuture = checkNotNull(localJobManagerAddressFuture); + this.timeout = checkNotNull(timeout); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception { + if (localJobManagerAddressFuture.isCompleted()) { + if (localJobManagerAddress == null) { + localJobManagerAddress = Await.result(localJobManagerAddressFuture, timeout); + } + + Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort(); + + if (jobManager.isDefined()) { + Tuple2<ActorGateway, Integer> gatewayPort = jobManager.get(); + String redirectAddress = HandlerRedirectUtils.getRedirectAddress( + localJobManagerAddress, gatewayPort); + + if (redirectAddress != null) { + HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path()); + KeepAliveWrite.flush(ctx, routed.request(), redirect); + } + else { + respondAsLeader(ctx, routed, gatewayPort._1()); + } + } else { + KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse()); + } + } else { + KeepAliveWrite.flush(ctx, routed.request(), HandlerRedirectUtils.getUnavailableResponse()); + } + } + + protected abstract void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager); +} http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 582004a..b583ca5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -65,9 +65,12 @@ import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccum import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler; import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler; import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler; +import org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler; import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.concurrent.ExecutionContext$; +import scala.concurrent.ExecutionContextExecutor; import scala.concurrent.Promise; import scala.concurrent.duration.FiniteDuration; @@ -75,6 +78,8 @@ import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -126,6 +131,8 @@ public class WebRuntimeMonitor implements WebMonitor { private AtomicBoolean cleanedUp = new AtomicBoolean(); + private ExecutorService executorService; + public WebRuntimeMonitor( Configuration config, LeaderRetrievalService leaderRetrievalService, @@ -193,6 +200,10 @@ public class WebRuntimeMonitor implements WebMonitor { // -------------------------------------------------------------------- + executorService = new ForkJoinPool(); + + ExecutionContextExecutor context = ExecutionContext$.MODULE$.fromExecutor(executorService); + router = new Router() // config how to interact with this web server .GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval()))) @@ -234,7 +245,11 @@ public class WebRuntimeMonitor implements WebMonitor { .GET("/jobs/:jobid/checkpoints", handler(new JobCheckpointsHandler(currentGraphs))) .GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) - .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) + .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/metrics", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))) + .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/log", + new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.LOG, config)) + .GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY + "/stdout", + new TaskManagerLogHandler(retriever, context, jobManagerAddressPromise.future(), timeout, TaskManagerLogHandler.FileMode.STDOUT, config)) // log and stdout .GET("/jobmanager/log", logFiles.logFile == null ? new ConstantTextHandler("(log file unavailable)") : @@ -377,6 +392,8 @@ public class WebRuntimeMonitor implements WebMonitor { backPressureStatsTracker.shutDown(); + executorService.shutdownNow(); + cleanup(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java new file mode 100644 index 0000000..38957dc --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +/***************************************************************************** + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. + * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + *****************************************************************************/ + +import akka.dispatch.Mapper; +import akka.dispatch.OnFailure; +import akka.dispatch.OnSuccess; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Routed; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobCache; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.TaskManagerMessages; +import org.apache.flink.runtime.webmonitor.JobManagerRetriever; +import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase; +import org.apache.flink.util.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; +import scala.concurrent.ExecutionContextExecutor; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.nio.channels.FileChannel; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; + +import static com.google.common.base.Preconditions.checkNotNull; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Request handler that returns the TaskManager log/out files. + * + * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example.</p> + */ +@ChannelHandler.Sharable +public class TaskManagerLogHandler extends RuntimeMonitorHandlerBase { + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerLogHandler.class); + + /** Keep track of last transmitted log, to clean up old ones */ + private final HashMap<String, BlobKey> lastSubmittedLog = new HashMap<>(); + private final HashMap<String, BlobKey> lastSubmittedStdout = new HashMap<>(); + + /** Keep track of request status, prevents multiple log requests for a single TM running concurrently */ + private final ConcurrentHashMap<String, Boolean> lastRequestPending = new ConcurrentHashMap<>(); + private final Configuration config; + + /** */ + private Future<BlobCache> cache; + + /** Indicates which log file should be displayed; true indicates .log, false indicates .out */ + private boolean serveLogFile; + + private final ExecutionContextExecutor executor; + + public enum FileMode { + LOG, + STDOUT + } + + public TaskManagerLogHandler( + JobManagerRetriever retriever, + ExecutionContextExecutor executor, + Future<String> localJobManagerAddressPromise, + FiniteDuration timeout, + FileMode fileMode, + Configuration config) throws IOException { + super(retriever, localJobManagerAddressPromise, timeout); + + this.executor = checkNotNull(executor); + this.config = config; + switch (fileMode) { + case LOG: + serveLogFile = true; + break; + case STDOUT: + serveLogFile = false; + break; + } + } + + /** + * Response when running with leading JobManager. + */ + @Override + protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed routed, final ActorGateway jobManager) { + if (cache == null) { + Future<Object> portFuture = jobManager.ask(JobManagerMessages.getRequestBlobManagerPort(), timeout); + cache = portFuture.map(new Mapper<Object, BlobCache>() { + @Override + public BlobCache apply(Object result) { + Option<String> hostOption = jobManager.actor().path().address().host(); + String host = hostOption.isDefined() ? hostOption.get() : "localhost"; + int port = (int) result; + return new BlobCache(new InetSocketAddress(host, port), config); + } + }, executor); + } + + final String taskManagerID = routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY); + final HttpRequest request = routed.request(); + + //fetch TaskManager logs if no other process is currently doing it + if (lastRequestPending.putIfAbsent(taskManagerID, true) == null) { + try { + InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(taskManagerID)); + Future<Object> taskManagerFuture = jobManager.ask(new JobManagerMessages.RequestTaskManagerInstance(instanceID), timeout); + + Future<Object> blobKeyFuture = taskManagerFuture.flatMap(new Mapper<Object, Future<Object>>() { + @Override + public Future<Object> apply(Object instance) { + Instance taskManager = ((JobManagerMessages.TaskManagerInstance) instance).instance().get(); + return taskManager.getActorGateway().ask(serveLogFile ? TaskManagerMessages.getRequestTaskManagerLog() : TaskManagerMessages.getRequestTaskManagerStdout(), timeout); + } + }, executor); + + Future<Object> logPathFuture = cache.zip(blobKeyFuture).map(new Mapper<Tuple2<BlobCache, Object>, Object>() { + @Override + public Object checkedApply(Tuple2<BlobCache, Object> instance) throws Exception { + BlobCache cache = instance._1(); + if (instance._2() instanceof Exception) { + throw (Exception) instance._2(); + } + BlobKey blobKey = (BlobKey) instance._2(); + + //delete previous log file, if it is different than the current one + HashMap<String, BlobKey> lastSubmittedFile = serveLogFile ? lastSubmittedLog : lastSubmittedStdout; + if (lastSubmittedFile.containsKey(taskManagerID)) { + if (!blobKey.equals(lastSubmittedFile.get(taskManagerID))) { + cache.deleteGlobal(lastSubmittedFile.get(taskManagerID)); + lastSubmittedFile.put(taskManagerID, blobKey); + } + } else { + lastSubmittedFile.put(taskManagerID, blobKey); + } + return cache.getURL(blobKey).getFile(); + } + }, executor); + + logPathFuture.onFailure(new OnFailure() { + @Override + public void onFailure(Throwable failure) throws Throwable { + display(ctx, request, "Fetching TaskManager log failed."); + LOG.error("Fetching TaskManager log failed.", failure); + lastRequestPending.remove(taskManagerID); + } + }, executor); + + logPathFuture.onSuccess(new OnSuccess<Object>() { + @Override + public void onSuccess(Object filePathOption) throws Throwable { + String filePath = (String) filePathOption; + + File file = new File(filePath); + final RandomAccessFile raf; + try { + raf = new RandomAccessFile(file, "r"); + } catch (FileNotFoundException e) { + display(ctx, request, "Displaying TaskManager log failed."); + LOG.error("Displaying TaskManager log failed.", e); + return; + } + long fileLength = raf.length(); + final FileChannel fc = raf.getChannel(); + + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + response.headers().set(CONTENT_TYPE, "text/plain"); + + if (HttpHeaders.isKeepAlive(request)) { + response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } + HttpHeaders.setContentLength(response, fileLength); + + // write the initial line and the header. + ctx.write(response); + + // write the content. + ctx.write(new DefaultFileRegion(fc, 0, fileLength), ctx.newProgressivePromise()) + .addListener(new GenericFutureListener<io.netty.util.concurrent.Future<? super Void>>() { + @Override + public void operationComplete(io.netty.util.concurrent.Future<? super Void> future) throws Exception { + lastRequestPending.remove(taskManagerID); + fc.close(); + raf.close(); + } + }); + ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + + // close the connection, if no keep-alive is needed + if (!HttpHeaders.isKeepAlive(request)) { + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } + } + }, executor); + } catch (Exception e) { + display(ctx, request, "Error: " + e.getMessage()); + LOG.error("Fetching TaskManager log failed.", e); + lastRequestPending.remove(taskManagerID); + } + } else { + display(ctx, request, "loading..."); + } + } + + private void display(ChannelHandlerContext ctx, HttpRequest request, String message) { + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + response.headers().set(CONTENT_TYPE, "text/plain"); + + if (HttpHeaders.isKeepAlive(request)) { + response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } + + byte[] buf = message.getBytes(); + + ByteBuf b = Unpooled.copiedBuffer(buf); + + HttpHeaders.setContentLength(response, buf.length); + + // write the initial line and the header. + ctx.write(response); + + ctx.write(b); + + ChannelFuture lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + + // close the connection, if no keep-alive is needed + if (!HttpHeaders.isKeepAlive(request)) { + lastContentFuture.addListener(ChannelFutureListener.CLOSE); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.jade b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.jade index 23809de..5223d17 100644 --- a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.jade +++ b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.jade @@ -15,7 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. -nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="metrics.id") +nav.navbar.navbar-default.navbar-fixed-top.navbar-main #fold-button.btn.btn-default.navbar-btn.pull-left(ng-click='showSidebar()') i.fa.fa-navicon @@ -26,10 +26,14 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="metrics.id") .navbar-info.last.first.hidden-xs.hidden-sm | {{metrics.path}} -nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional(ng-if="metrics.id") +nav.navbar.navbar-default.navbar-fixed-top.navbar-main-additional ul.nav.nav-tabs li(ui-sref-active='active') a(ui-sref=".metrics") Metrics + li(ui-sref-active='active') + a(ui-sref=".log") Logs + li(ui-sref-active='active') + a(ui-sref=".stdout") Stdout #content-inner.has-navbar-main-additional div(ui-view="details") http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.log.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.log.jade b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.log.jade new file mode 100644 index 0000000..7a253a4 --- /dev/null +++ b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.log.jade @@ -0,0 +1,37 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +table.table.table-properties + thead + tr + th(colspan="2") + .row + .col-xs-10 + | Task Manager Logs + .col-xs-1.text-right + a(ng-click="reloadData()" class="show-pointer") + i.fa.fa-refresh + .col-xs-1.text-left + a(ng-click="downloadData()" class="show-pointer") + i.fa.fa-download + + tbody + tr + td(colspan="2") + pre + | {{log}} + http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.stdout.jade ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.stdout.jade b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.stdout.jade new file mode 100644 index 0000000..438a2e8 --- /dev/null +++ b/flink-runtime-web/web-dashboard/app/partials/taskmanager/taskmanager.stdout.jade @@ -0,0 +1,37 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +table.table.table-properties + thead + tr + th(colspan="2") + .row + .col-xs-10 + | Task Manager Output + .col-xs-1.text-right + a(ng-click="reloadData()" class="show-pointer") + i.fa.fa-refresh + .col-xs-1.text-left + a(ng-click="downloadData()" class="show-pointer") + i.fa.fa-download + + tbody + tr + td(colspan="2") + pre + | {{stdout}} + http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/scripts/index.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/index.coffee b/flink-runtime-web/web-dashboard/app/scripts/index.coffee index e7a4829..aeb77bc 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/index.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/index.coffee @@ -180,13 +180,27 @@ angular.module('flinkApp', ['ui.router', 'angularMoment']) views: main: templateUrl: "partials/taskmanager/taskmanager.html" - controller: 'SingleTaskManagerController' .state "single-manager.metrics", url: "/metrics" views: details: templateUrl: "partials/taskmanager/taskmanager.metrics.html" + controller: 'SingleTaskManagerController' + + .state "single-manager.stdout", + url: "/stdout" + views: + details: + templateUrl: "partials/taskmanager/taskmanager.stdout.html" + controller: 'SingleTaskManagerStdoutController' + + .state "single-manager.log", + url: "/log" + views: + details: + templateUrl: "partials/taskmanager/taskmanager.log.html" + controller: 'SingleTaskManagerLogsController' .state "jobmanager", url: "/jobmanager" http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.ctrl.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.ctrl.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.ctrl.coffee index 16adab6..a83a444 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.ctrl.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.ctrl.coffee @@ -43,3 +43,26 @@ angular.module('flinkApp') $scope.$on '$destroy', -> $interval.cancel(refresh) +.controller 'SingleTaskManagerLogsController', ($scope, $stateParams, SingleTaskManagerService, $interval, flinkConfig) -> + $scope.log = {} + SingleTaskManagerService.loadLogs($stateParams.taskmanagerid).then (data) -> + $scope.log = data + + $scope.reloadData = () -> + SingleTaskManagerService.loadLogs($stateParams.taskmanagerid).then (data) -> + $scope.log = data + + $scope.downloadData = () -> + window.location.href = "/taskmanagers/" + ($stateParams.taskmanagerid) + "/log" + +.controller 'SingleTaskManagerStdoutController', ($scope, $stateParams, SingleTaskManagerService, $interval, flinkConfig) -> + $scope.stdout = {} + SingleTaskManagerService.loadStdout($stateParams.taskmanagerid).then (data) -> + $scope.stdout = data + + $scope.reloadData = () -> + SingleTaskManagerService.loadStdout($stateParams.taskmanagerid).then (data) -> + $scope.stdout = data + + $scope.downloadData = () -> + window.location.href = "/taskmanagers/" + ($stateParams.taskmanagerid) + "/stdout" http://git-wip-us.apache.org/repos/asf/flink/blob/6d53bbc4/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.svc.coffee ---------------------------------------------------------------------- diff --git a/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.svc.coffee b/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.svc.coffee index fe39f57..ec8bcc0 100644 --- a/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.svc.coffee +++ b/flink-runtime-web/web-dashboard/app/scripts/modules/taskmanager/taskmanager.svc.coffee @@ -34,11 +34,29 @@ angular.module('flinkApp') @loadMetrics = (taskmanagerid) -> deferred = $q.defer() - $http.get(flinkConfig.jobServer + "taskmanagers/" + taskmanagerid) + $http.get(flinkConfig.jobServer + "taskmanagers/" + taskmanagerid + "/metrics") .success (data, status, headers, config) -> deferred.resolve(data['taskmanagers']) deferred.promise + @loadLogs = (taskmanagerid) -> + deferred = $q.defer() + + $http.get(flinkConfig.jobServer + "taskmanagers/" + taskmanagerid + "/log") + .success (data, status, headers, config) -> + deferred.resolve(data) + + deferred.promise + + @loadStdout = (taskmanagerid) -> + deferred = $q.defer() + + $http.get(flinkConfig.jobServer + "taskmanagers/" + taskmanagerid + "/stdout") + .success (data, status, headers, config) -> + deferred.resolve(data) + + deferred.promise + @