[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3460 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106949177 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * This class is used by the {@link HistoryServer} to fetch the job archives that are located at + * {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are polled in regular intervals, defined + * by {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}. + * + * The archives are downloaded and expanded into a file structure analog to the REST API defined in the WebRuntimeMonitor. + */ +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106948899 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.history; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobmanager.JobManagerOptions; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; + +/** + * Utility class for writing an archive file to a {@link FileSystem} and reading it back. + */ +public class FsJobArchivist { + + private static final Logger LOG = LoggerFactory.getLogger(FsJobArchivist.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String ARCHIVE = "archive"; + private static final String PATH = "path"; + private static final String JSON = "json"; + + private FsJobArchivist() { + } + + /** +* Writes the given {@link AccessExecutionGraph} to the {@link FileSystem} pointed to by {@link JobManagerOptions#ARCHIVE_DIR}. +* +* @param rootPath directory to which the archive should be written to +* @param graph graph to archive +* @return path to where the archive was written, or null if no archive was created +* @throws IOException +*/ + public static Path archiveJob(Path rootPath, AccessExecutionGraph graph) throws IOException { + try { + FileSystem fs = rootPath.getFileSystem(); + Path path = new Path(rootPath, graph.getJobID().toString()); + OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE); + + try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) { + gen.writeStartObject(); + gen.writeArrayFieldStart(ARCHIVE); + for (JsonArchivist archiver : WebMonitorUtils.getJsonArchivists()) { + for (ArchivedJson archive : archiver.archiveJsonWithPath(graph)) { + gen.writeStartObject(); + gen.writeStringField(PATH, archive.getPath()); + gen.writeStringField(JSON, archive.getJson()); + gen.writeEndObject(); + } + } + gen.writeEndArray(); + gen.writeEndObject(); + } catch (Exception e) { + fs.delete(path, false); + throw e; + } + LOG.info("Job {} has been archived at {}.", graph.getJobID(), path); + return path; + } catch (IOException e) { +
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106948356 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * This class is used by the {@link HistoryServer} to fetch the job archives that are located at + * {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS}. The directories are polled in regular intervals, defined + * by {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL}. + * + * The archives are downloaded and expanded into a file structure analog to the REST API defined in the WebRuntimeMonitor. + */ +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_ARCHIVE_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106920467 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.history; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobmanager.JobManagerOptions; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.OperatingSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; + +/** + * Utility class for writing an archive file to a {@link FileSystem} and reading it back. + */ +public class FsJobArchivist { + + private static final Logger LOG = LoggerFactory.getLogger(FsJobArchivist.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String ARCHIVE = "archive"; + private static final String PATH = "path"; + private static final String JSON = "json"; + + private FsJobArchivist() { + } + + /** +* Writes the given {@link AccessExecutionGraph} to the {@link FileSystem} pointed to by {@link JobManagerOptions#ARCHIVE_DIR}. +* +* @param rootPath directory to which the archive should be written to +* @param graph graph to archive +* @return path to where the archive was written, or null if no archive was created +* @throws IOException +*/ + public static Path archiveJob(Path rootPath, AccessExecutionGraph graph) throws IOException { + try { + FileSystem fs = rootPath.getFileSystem(); + Path path = new Path(rootPath, graph.getJobID().toString()); + OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE); + + try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) { + gen.writeStartObject(); + gen.writeArrayFieldStart(ARCHIVE); + for (JsonArchivist archiver : WebMonitorUtils.getJsonArchivists()) { + for (ArchivedJson archive : archiver.archiveJsonWithPath(graph)) { + gen.writeStartObject(); + gen.writeStringField(PATH, archive.getPath()); + gen.writeStringField(JSON, archive.getJson()); + gen.writeEndObject(); + } + } + gen.writeEndArray(); + gen.writeEndObject(); + } catch (Exception e) { + fs.delete(path, false); + throw e; + } + LOG.info("Job {} has been archived at {}.", graph.getJobID(), path); + return path; + } catch (IOException e) { + LOG.error("Failed to archive job.", e); +
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106907731 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** +* The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.archive.fs.refresh-interval") + .defaultValue(1L); + + /** +* Comma-separated list of directories which the HistoryServer polls for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_DIRS = + key("historyserver.archive.fs.dirs") --- End diff -- I would keep the default as dirs to make it clear that we accept multiple directories. To avoid the misconfiguration we could add a deprecated key "historyserver.archive.fs.dir". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106902557 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** +* The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.archive.fs.refresh-interval") + .defaultValue(1L); + + /** +* Comma-separated list of directories which the HistoryServer polls for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_DIRS = + key("historyserver.archive.fs.dirs") --- End diff -- One last thought here: What do you think about having the same suffix `dirs` or `dir` for both `jobmanager.archive.fs` and `historyserver.archive.fs` for the sake of consistency? I know that the HS accepts multiple dirs and the JM only one, but it might help prevent typos etc. when configuring the history server. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106900975 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * This class is used by the {@link HistoryServer} to fetch the job archives that are located at + * {@link HistoryServerOptions#HISTORY_SERVER_DIRS}. The directories are polled in regular intervals, defined + * by {@link HistoryServerOptions#HISTORY_SERVER_REFRESH_INTERVAL}. + * + * The archives are first copied into a temporary directory in {@link HistoryServerOptions#HISTORY_SERVER_WEB_DIR} and + * then expanded. The resulting file structure is analog to the REST API defined in the WebRuntimeMonitor. + */ +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +*
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106900609 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The HistoryServer provides a WebInterface and REST API to retrieve information about finished jobs for which + * the JobManager may have already shut down. + * + * The HistoryServer regularly checks a set of directories for job archives created by the {@link FsJobArchivist} and + * caches these in a local directory. See {@link HistoryServerArchiveFetcher}. + * + * All configuration options are defined in{@link HistoryServerOptions}. + * + * The WebInterface only displays the "Completed Jobs" page. + * + * The REST API is limited to + * + * /config + * /joboverview + * /jobs/:jobid/* + * + * and relies on static files that are served by the {@link HistoryServerStaticFileServerHandler}. + */ +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final HistoryServerArchiveFetcher archiveFetcher; + + private final SSLContext serverSSLContext; + private WebFrontendBootstrap netty; + + private final Object startupShutdownLock = new Object(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final Thread shutdownHook; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + // run the history server + SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfig)); + + try { + SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() throws Exception { + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + return 0;
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106900349 --- Diff: docs/monitoring/rest_api.md --- @@ -22,36 +22,69 @@ specific language governing permissions and limitations under the License. --> -Flink has a monitoring API that can be used to query status and statistics of running jobs, as well as recent completed jobs. +Flink has a monitoring API that can be used to query the status and statistics of running jobs, as well as recent completed jobs. This monitoring API is used by Flink's own dashboard, but is designed to be used also by custom monitoring tools. The monitoring API is a REST-ful API that accepts HTTP GET requests and responds with JSON data. * This will be replaced by the TOC {:toc} - ## Overview -The monitoring API is backed by a web server that runs as part of the *JobManager*. By default, this server listens at post `8081`, which can be configured in `flink-conf.yaml` via `jobmanager.web.port`. Note that the monitoring API web server and the web dashboard web server are currently the same and thus run together at the same port. They respond to different HTTP URLs, though. + + + +The jobManager monitoring API allows you to query the status and statistics of running jobs, as well as recent completed jobs. + +By default, this server binds to `localhost`` and listens at post `8081`, which can be configured in `flink-conf.yaml` via `jobmanager.web.address` and `jobmanager.web.port`. In the case of multiple JobManagers (for high availability), each JobManager will run its own instance of the monitoring API, which offers information about completed and running job while that JobManager was elected the cluster leader. + + + --- End diff -- I think it's OK to keep it here as more detailed documentation, but I would vote to add a custom page which lists the relevant configuration options on a single page. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106900686 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The HistoryServer provides a WebInterface and REST API to retrieve information about finished jobs for which + * the JobManager may have already shut down. + * + * The HistoryServer regularly checks a set of directories for job archives created by the {@link FsJobArchivist} and + * caches these in a local directory. See {@link HistoryServerArchiveFetcher}. + * + * All configuration options are defined in{@link HistoryServerOptions}. + * + * The WebInterface only displays the "Completed Jobs" page. + * + * The REST API is limited to + * + * /config + * /joboverview + * /jobs/:jobid/* + * + * and relies on static files that are served by the {@link HistoryServerStaticFileServerHandler}. + */ +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final HistoryServerArchiveFetcher archiveFetcher; + + private final SSLContext serverSSLContext; + private WebFrontendBootstrap netty; + + private final Object startupShutdownLock = new Object(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final Thread shutdownHook; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + // run the history server + SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfig)); + + try { + SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() throws Exception { + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + return 0;
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106901748 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/history/FsJobArchivist.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.history; + +import com.fasterxml.jackson.core.JsonEncoding; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobmanager.JobManagerOptions; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.util.OperatingSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; + +/** + * Utility class for writing an archive file to a {@link FileSystem} and reading it back. + */ +public class FsJobArchivist { + + private static final Logger LOG = LoggerFactory.getLogger(FsJobArchivist.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String ARCHIVE = "archive"; + private static final String PATH = "path"; + private static final String JSON = "json"; + + private FsJobArchivist() { + } + + /** +* Writes the given {@link AccessExecutionGraph} to the {@link FileSystem} pointed to by {@link JobManagerOptions#ARCHIVE_DIR}. +* +* @param rootPath directory to which the archive should be written to +* @param graph graph to archive +* @return path to where the archive was written, or null if no archive was created +* @throws IOException +*/ + public static Path archiveJob(Path rootPath, AccessExecutionGraph graph) throws IOException { + try { + FileSystem fs = rootPath.getFileSystem(); + Path path = new Path(rootPath, graph.getJobID().toString()); + OutputStream out = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE); + + try (JsonGenerator gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8)) { + gen.writeStartObject(); + gen.writeArrayFieldStart(ARCHIVE); + for (JsonArchivist archiver : WebMonitorUtils.getJsonArchivists()) { + for (ArchivedJson archive : archiver.archiveJsonWithPath(graph)) { + gen.writeStartObject(); + gen.writeStringField(PATH, archive.getPath()); + gen.writeStringField(JSON, archive.getJson()); + gen.writeEndObject(); + } + } + gen.writeEndArray(); + gen.writeEndObject(); + } catch (Exception e) { + fs.delete(path, false); + throw e; + } + LOG.info("Job {} has been archived at {}.", graph.getJobID(), path); + return path; + } catch (IOException e) { + LOG.error("Failed to archive job.", e); +
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r106903661 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.history.FsJobArchivist; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * This class is used by the {@link HistoryServer} to fetch the job archives that are located at + * {@link HistoryServerOptions#HISTORY_SERVER_DIRS}. The directories are polled in regular intervals, defined + * by {@link HistoryServerOptions#HISTORY_SERVER_REFRESH_INTERVAL}. + * + * The archives are first copied into a temporary directory in {@link HistoryServerOptions#HISTORY_SERVER_WEB_DIR} and + * then expanded. The resulting file structure is analog to the REST API defined in the WebRuntimeMonitor. + */ +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +*
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105943415 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105937756 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.channel.ChannelHandler; +import org.apache.flink.runtime.webmonitor.files.AbstractStaticFileServerHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +@ChannelHandler.Sharable +public class HistoryServerStaticFileServerHandler extends AbstractStaticFileServerHandler { --- End diff -- all right. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105934771 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/AbstractStaticFileServerHandler.java --- @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.files; + +/* + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. + * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + */ + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.netty.util.CharsetUtil; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.net.URL; +import java.nio.file.Files; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Locale; +import java.util.TimeZone; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaders.Names.DATE; +import static io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES; +import static io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE; +import static io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Simple file server handler that serves requests to web frontend's static files, such as + * HTML, CSS, or JS files. + * + * This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example. + * + * For every incoming requests the {@link Routed#path()} is pre-processed in + * {@link AbstractStaticFileServerHandler#preProcessRequestPath(String)}. + * + * This path is then interpreted as a relative file path, with the configured rootDir being the parent. + * + * If no file exists for this path, another (optional) pre-processing step is executed in + * {@link
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105931282 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105929511 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105929296 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105928423 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** +* The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.refresh-interval") + .defaultValue(3000L); --- End diff -- I was under the impression that the `historyserver.web.refresh-interval` does not affect the actual web refresh interval specified in index.coffee, right? What I thought originally was that the fs refresh interval being smaller than the web refresh interval does not make sense if we only update the frontend every 10s. But I forgot that people also manually browse the pages ;-) I'm still in favour to increase as you say. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105927418 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** +* The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.refresh-interval") + .defaultValue(3000L); + + /** +* Comma-separated list of directories which the HistoryServer polls for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_DIRS = + key("historyserver.archive.dirs") + .noDefaultValue(); + + /** +* The local directory used by the HistoryServer web-frontend. +*/ + public static final ConfigOption HISTORY_SERVER_WEB_DIR = + key("historyserver.web.dir") + .noDefaultValue(); + + /** +* The address under which the HistoryServer web-frontend is accessible. +*/ + public static final ConfigOption HISTORY_SERVER_WEB_ADDRESS = + key("historyserver.web.address") + .noDefaultValue(); + + /** +* The port under which the HistoryServer web-frontend is accessible. +*/ + public static final ConfigOption HISTORY_SERVER_WEB_PORT = + key("historyserver.web.port") + .defaultValue(8082); + + /** +* The refresh interval for the HistoryServer web-frontend in milliseconds. +*/ + public static final ConfigOption HISTORY_SERVER_WEB_REFRESH_INTERVAL = --- End diff -- I think so. I don't know whether the configured value is respected though. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105924461 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105926750 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** +* The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.refresh-interval") + .defaultValue(3000L); --- End diff -- I will also rename the key to "historyserver.archive.fs.refresh-interval". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105926484 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** +* The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.refresh-interval") + .defaultValue(3000L); + + /** +* Comma-separated list of directories which the HistoryServer polls for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_DIRS = + key("historyserver.archive.dirs") + .noDefaultValue(); + + /** +* The local directory used by the HistoryServer web-frontend. +*/ + public static final ConfigOption HISTORY_SERVER_WEB_DIR = + key("historyserver.web.dir") --- End diff -- yes, it's also more consistent with the jobmanager options. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105923977 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105926155 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** +* The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.refresh-interval") + .defaultValue(3000L); --- End diff -- the corresponding key to the one in index.coffee is historyserver.**web**.refresh-interval, which is set to 10 seconds. Nevertheless i wanted to increase the default fs.refresh-interval anyway. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105922775 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105906846 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java --- @@ -32,6 +32,13 @@ public static final ConfigOption MAX_ATTEMPTS_HISTORY_SIZE = key("job-manager.max-attempts-history-size").defaultValue(16); + /** +* The location where the {@link JobManager} stores the archives for finished jobs. +*/ + public static final ConfigOption ARCHIVE_DIR = + key("jobmanager.archive.dir") --- End diff -- Should we rename this to be in line with my other proposal to allow future changes? E.g. `jobmanager.archive.fs.dir`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105918471 --- Diff: flink-runtime-web/web-dashboard/app/index_hs.jade --- @@ -0,0 +1,60 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +doctype html +html(lang='en') + head +meta(charset='utf-8') +meta(http-equiv='X-UA-Compatible', content='IE=edge') +meta(name='viewport', content='width=device-width, initial-scale=1') + +title Apache Flink Web Dashboard + +link(rel="apple-touch-icon", sizes="180x180", href="images/apple-touch-icon.png") +link(rel="icon", type="image/png", href="images/favicon-32x32.png", sizes="32x32") +link(rel="icon", type="image/png", href="images/favicon-16x16.png", sizes="16x16") +link(rel="manifest", href="images/manifest.json") +link(rel="mask-icon", href="images/safari-pinned-tab.svg", color="#aa1919") +link(rel="shortcut icon", href="images/favicon.ico") +meta(name="msapplication-config", content="images/browserconfig.xml") +meta(name="theme-color", content="#ff") + +link(rel='stylesheet', href='css/vendor.css', type='text/css') +link(rel='stylesheet', href='css/index.css', type='text/css') + +script(src="js/vendor.js") +script(src="js/hs/index.js") + + body(ng-app="flinkApp" ng-strict-di) +#sidebar(ng-class="{ 'sidebar-visible': sidebarVisible }") + nav.navbar.navbar-inverse.navbar-static-top +.navbar-header + a.navbar-brand(ui-sref="completed-jobs") +img.logo(alt="Apache Flink Dashboard" src="images/flink-logo.png") + a.navbar-brand.navbar-brand-text(ui-sref="completed-jobs") +| Apache Flink Dashboard --- End diff -- Also rename to `Apache Flink History Server`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105898166 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105894483 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105876698 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final HistoryServerArchiveFetcher archiveFetcher; + + private final SSLContext serverSSLContext; + private WebFrontendBootstrap netty; + + private final Object startupShutdownLock = new Object(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final Thread shutdownHook; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + // run the job manager + SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfig)); + + try { + SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() throws Exception { + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + return 0; + } + }); + System.exit(0); + } catch (UndeclaredThrowableException ute) { + Throwable cause = ute. getUndeclaredThrowable(); + LOG.error("Failed to run HistoryServer.", cause); + cause.printStackTrace(); + System.exit(1); + } catch (Exception e) { + LOG.error("Failed to run HistoryServer.", e); + e.printStackTrace(); + System.exit(1); + } + } + + public HistoryServer(Configuration config) throws IOException, FlinkException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { +
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105894735 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105877694 --- Diff: flink-runtime-web/web-dashboard/app/index_hs.jade --- @@ -0,0 +1,60 @@ +// + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +doctype html +html(lang='en') + head +meta(charset='utf-8') +meta(http-equiv='X-UA-Compatible', content='IE=edge') +meta(name='viewport', content='width=device-width, initial-scale=1') + +title Apache Flink Web Dashboard --- End diff -- Should we adjust the title to `Apache Flink History Server`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105870188 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HistoryServer { --- End diff -- Please add a high level comment describing what the history server does. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105898278 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105875866 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final HistoryServerArchiveFetcher archiveFetcher; + + private final SSLContext serverSSLContext; + private WebFrontendBootstrap netty; + + private final Object startupShutdownLock = new Object(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final Thread shutdownHook; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + // run the job manager + SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfig)); + + try { + SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() throws Exception { + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + return 0; + } + }); + System.exit(0); + } catch (UndeclaredThrowableException ute) { + Throwable cause = ute. getUndeclaredThrowable(); + LOG.error("Failed to run HistoryServer.", cause); + cause.printStackTrace(); + System.exit(1); + } catch (Exception e) { + LOG.error("Failed to run HistoryServer.", e); + e.printStackTrace(); + System.exit(1); + } + } + + public HistoryServer(Configuration config) throws IOException, FlinkException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { +
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105893756 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105918260 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.channel.ChannelHandler; +import org.apache.flink.runtime.webmonitor.files.AbstractStaticFileServerHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +@ChannelHandler.Sharable +public class HistoryServerStaticFileServerHandler extends AbstractStaticFileServerHandler { --- End diff -- You are right that this was not good in the first place, but refactoring an untested class makes this even more pressing. Let's handle this in a follow up issue. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105874859 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final HistoryServerArchiveFetcher archiveFetcher; + + private final SSLContext serverSSLContext; + private WebFrontendBootstrap netty; + + private final Object startupShutdownLock = new Object(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final Thread shutdownHook; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + // run the job manager + SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfig)); + + try { + SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() throws Exception { + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + return 0; + } + }); + System.exit(0); + } catch (UndeclaredThrowableException ute) { + Throwable cause = ute. getUndeclaredThrowable(); + LOG.error("Failed to run HistoryServer.", cause); + cause.printStackTrace(); + System.exit(1); + } catch (Exception e) { + LOG.error("Failed to run HistoryServer.", e); + e.printStackTrace(); + System.exit(1); + } + } + + public HistoryServer(Configuration config) throws IOException, FlinkException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { +
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105876054 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final HistoryServerArchiveFetcher archiveFetcher; + + private final SSLContext serverSSLContext; + private WebFrontendBootstrap netty; + + private final Object startupShutdownLock = new Object(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final Thread shutdownHook; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + // run the job manager + SecurityUtils.install(new SecurityUtils.SecurityConfiguration(flinkConfig)); + + try { + SecurityUtils.getInstalledContext().runSecured(new Callable() { + @Override + public Integer call() throws Exception { + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + return 0; + } + }); + System.exit(0); + } catch (UndeclaredThrowableException ute) { + Throwable cause = ute. getUndeclaredThrowable(); + LOG.error("Failed to run HistoryServer.", cause); + cause.printStackTrace(); + System.exit(1); + } catch (Exception e) { + LOG.error("Failed to run HistoryServer.", e); + e.printStackTrace(); + System.exit(1); + } + } + + public HistoryServer(Configuration config) throws IOException, FlinkException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { +
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105898216 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105893041 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; --- End diff -- In the long run, we should make the size of this map bounded with a high default to bound the memory consumption. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105898348 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105878628 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala --- @@ -247,8 +248,11 @@ class LocalFlinkMiniCluster( // Props for the distributed components // - def getArchiveProps(archiveClass: Class[_ <: MemoryArchivist], archiveCount: Int): Props = { -JobManager.getArchiveProps(archiveClass, archiveCount) + def getArchiveProps( +archiveClass: Class[_ <: MemoryArchivist], --- End diff -- I think the arguments should have one more level of indentation --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105899831 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105868993 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** +* The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.refresh-interval") + .defaultValue(3000L); + + /** +* Comma-separated list of directories which the HistoryServer polls for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_DIRS = + key("historyserver.archive.dirs") + .noDefaultValue(); + + /** +* The local directory used by the HistoryServer web-frontend. +*/ + public static final ConfigOption HISTORY_SERVER_WEB_DIR = + key("historyserver.web.dir") --- End diff -- Should we call this `historyserver.web.tmpDir` to better distinguish against the other `fs` config options? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105898701 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { --- End diff -- Please add a high level comment describing what the fetcher does mentioning that we copy files from the archive dirs to the local dirs etc. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105904946 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/AbstractStaticFileServerHandler.java --- @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.files; + +/* + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. + * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + */ + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.netty.util.CharsetUtil; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.net.URL; +import java.nio.file.Files; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Locale; +import java.util.TimeZone; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaders.Names.DATE; +import static io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES; +import static io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE; +import static io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Simple file server handler that serves requests to web frontend's static files, such as + * HTML, CSS, or JS files. + * + * This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example. + * + * For every incoming requests the {@link Routed#path()} is pre-processed in + * {@link AbstractStaticFileServerHandler#preProcessRequestPath(String)}. + * + * This path is then interpreted as a relative file path, with the configured rootDir being the parent. + * + * If no file exists for this path, another (optional) pre-processing step is executed in + * {@link AbstractStaticFileServerHandler#preProcessFilePath(String)}.
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105905806 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala --- @@ -183,6 +198,45 @@ class MemoryArchivist(private val max_entries: Int) } } + private def archiveJsonFiles(graph: ArchivedExecutionGraph) { +future { + try { +val rootPath = new Path(flinkConfiguration.getString( --- End diff -- Since this has no default value, we need to check whether the path is null. You should check this before submitting the future to safe work when no directory is configured. I get the following Exception in the logs: ``` 2017-03-14 14:24:41,953 ERROR org.apache.flink.runtime.jobmanager.MemoryArchivist - Failed to archive job. java.lang.IllegalArgumentException: Can not create a Path from a null string at org.apache.flink.core.fs.Path.checkAndTrimPathArg(Path.java:159) at org.apache.flink.core.fs.Path.(Path.java:176) at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles$1.apply$mcV$sp(MemoryArchivist.scala:204) at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles$1.apply(MemoryArchivist.scala:203) at org.apache.flink.runtime.jobmanager.MemoryArchivist$$anonfun$org$apache$flink$runtime$jobmanager$MemoryArchivist$$archiveJsonFiles$1.apply(MemoryArchivist.scala:203) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105868354 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { --- End diff -- Should we rename the following keys to make them filesystem-specific, leaving the option to implement other ways of data exchange between the JM and history server. ``` historyserver.refresh-interval -> historyserver.archive.fs.refresh-interval historyserver.archive.dirs -> historyserver.archive.fs.dirs ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105906412 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java --- @@ -27,6 +27,11 @@ * resembling the REST API. */ public class ArchivedJson { + + public static final String ARCHIVE = "archive"; --- End diff -- Instead of having these constants here that are accessed in different placed, we should encapsulate writing out and reading in stuff in a single class. What do you think? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105869478 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** +* The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.refresh-interval") + .defaultValue(3000L); + + /** +* Comma-separated list of directories which the HistoryServer polls for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_DIRS = + key("historyserver.archive.dirs") + .noDefaultValue(); + + /** +* The local directory used by the HistoryServer web-frontend. +*/ + public static final ConfigOption HISTORY_SERVER_WEB_DIR = + key("historyserver.web.dir") + .noDefaultValue(); + + /** +* The address under which the HistoryServer web-frontend is accessible. +*/ + public static final ConfigOption HISTORY_SERVER_WEB_ADDRESS = + key("historyserver.web.address") + .noDefaultValue(); + + /** +* The port under which the HistoryServer web-frontend is accessible. +*/ + public static final ConfigOption HISTORY_SERVER_WEB_PORT = + key("historyserver.web.port") + .defaultValue(8082); + + /** +* The refresh interval for the HistoryServer web-frontend in milliseconds. +*/ + public static final ConfigOption HISTORY_SERVER_WEB_REFRESH_INTERVAL = --- End diff -- I've seen this in the web frontend as well, but I'm wondering what it is exactly. Is it the same interval that is configured in the `index.coffee` file? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105870285 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final HistoryServerArchiveFetcher archiveFetcher; + + private final SSLContext serverSSLContext; + private WebFrontendBootstrap netty; + + private final Object startupShutdownLock = new Object(); + private final AtomicBoolean shutdownRequested = new AtomicBoolean(false); + private final Thread shutdownHook; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + final Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + // run the job manager --- End diff -- Copy paste comment, please adjust --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105907455 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala --- @@ -183,6 +198,45 @@ class MemoryArchivist(private val max_entries: Int) } } + private def archiveJsonFiles(graph: ArchivedExecutionGraph) { +future { + try { +val rootPath = new Path(flinkConfiguration.getString( + JobManagerOptions.ARCHIVE_DIR)) +val fs = rootPath.getFileSystem +val path = new Path(rootPath, s"${graph.getJobID.toString}") +val out = fs.create(path, WriteMode.NO_OVERWRITE) + +try { + val gen = jacksonFactory.createGenerator(out, JsonEncoding.UTF8) + try { +gen.writeStartObject() +gen.writeArrayFieldStart(ArchivedJson.ARCHIVE) +for (archiver <- WebMonitorUtils.getJsonArchivists) { + for (archive <- archiver.archiveJsonWithPath(graph).asScala) { +gen.writeStartObject() +gen.writeStringField(ArchivedJson.PATH, archive.getPath) +gen.writeStringField(ArchivedJson.JSON, archive.getJson) +gen.writeEndObject() + } +} +gen.writeEndArray() +gen.writeEndObject() --- End diff -- Should we add a log message that the job has been archived in the path XYZ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105869850 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** + * The set of configuration options relating to the HistoryServer. + */ +@PublicEvolving +public class HistoryServerOptions { + + /** +* The interval at which the HistoryServer polls {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for new archives. +*/ + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.refresh-interval") + .defaultValue(3000L); --- End diff -- We wanted to increase this to 10secs (same as in `index.coffee`) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r105897968 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServerArchiveFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerArchiveFetcher.class); + + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-ArchiveFetcher")); + private final JobArchiveFetcherTask fetcherTask; + private final long refreshIntervalMillis; + + HistoryServerArchiveFetcher(long refreshIntervalMillis, List refreshDirs, File webDir) { + this.refreshIntervalMillis = refreshIntervalMillis; + this.fetcherTask = new JobArchiveFetcherTask(refreshDirs, webDir); + if (LOG.isInfoEnabled()) { + for (HistoryServer.RefreshLocation refreshDir : refreshDirs) { + LOG.info("Monitoring directory {} for archived jobs.", refreshDir.getPath()); + } + } + } + + void start() { + executor.scheduleWithFixedDelay(fetcherTask, 0, refreshIntervalMillis, TimeUnit.MILLISECONDS); + } + + void stop() { + executor.shutdown(); + + try { + if (!executor.awaitTermination(1, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException ignored) { + executor.shutdownNow(); + } + } + + /** +* {@link TimerTask} that polls the directories configured as {@link HistoryServerOptions#HISTORY_SERVER_DIRS} for +* new job archives. +*/ + static class JobArchiveFetcherTask extends TimerTask { + private final List refreshDirs; + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives; + private final File webDir; + private final File webTmpDir; + private final File webJobDir; + private final File webOverviewDir; + + private static final String JSON_FILE_ENDING = ".json"; + + JobArchiveFetcherTask(List refreshDirs, File webDir) {
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104643908 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104640647 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala --- @@ -183,6 +193,33 @@ class MemoryArchivist(private val max_entries: Int) } } + private def archiveJsonFiles(graph: ArchivedExecutionGraph) { +future { + val rootPath = new Path(flinkConfiguration.getString( +JobManagerOptions.ARCHIVE_DIR)) + val fs = rootPath.getFileSystem + val tmpArchivePath = new Path(rootPath, s"tmp_${graph.getJobID.toString}") + for (archiver <- WebMonitorUtils.getArchivers) { +try { + for (archive <- archiver.archiveJsonWithPath(graph).asScala) { +val targetPath = + new Path(tmpArchivePath, s"${archive.getPath}.json") +val out = fs.create(targetPath, false) +out.write(archive.getJson.getBytes(StandardCharsets.UTF_8)) +out.close() + } +} catch { + case ioe: IOException => { +log.error("Failed to archive job details.", ioe) + } +} + } + if (!fs.rename(tmpArchivePath, new Path(rootPath, s"${graph.getJobID.toString}"))) { --- End diff -- yes. Once we write out 1 file this should no longer be an issue. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104640447 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104640380 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { --- End diff -- that's ood, will look into it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104635328 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104634695 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104634061 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r10463 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.channel.ChannelHandler; +import org.apache.flink.runtime.webmonitor.files.AbstractStaticFileServerHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +@ChannelHandler.Sharable +public class HistoryServerStaticFileServerHandler extends AbstractStaticFileServerHandler { --- End diff -- All StaticFileServer classes are currently 100% untested. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104436518 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerOptions.java --- @@ -32,6 +32,10 @@ public static final ConfigOption MAX_ATTEMPTS_HISTORY_SIZE = key("job-manager.max-attempts-history-size").defaultValue(16); + public static final ConfigOption ARCHIVE_DIR = + key("job-manager.archive.dir") --- End diff -- I know that you are following this classes example, but we have `jobmanager` as the regular config option spelling in the shipped default config. We need to change this and create a issue to rename the max attempts key accordingly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104432442 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104433910 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104432749 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104434129 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104433701 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104409593 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.channel.ChannelHandler; +import org.apache.flink.runtime.webmonitor.files.AbstractStaticFileServerHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +@ChannelHandler.Sharable +public class HistoryServerStaticFileServerHandler extends AbstractStaticFileServerHandler { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServerStaticFileServerHandler.class); + + // + + private final HistoryServer.JobFileFetcher fileFetcher; + + public HistoryServerStaticFileServerHandler(File rootPath, HistoryServer.JobFileFetcher fileFetcher) throws IOException { --- End diff -- Can be package private --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104431499 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java --- @@ -205,215 +125,18 @@ public void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Except } } - /** -* Response when running with leading JobManager. -*/ - private void respondAsLeader(ChannelHandlerContext ctx, HttpRequest request, String requestPath) - throws IOException, ParseException, URISyntaxException { - - // convert to absolute path - final File file = new File(rootPath, requestPath); - - if (!file.exists()) { - // file does not exist. Try to load it with the classloader - ClassLoader cl = StaticFileServerHandler.class.getClassLoader(); - - try(InputStream resourceStream = cl.getResourceAsStream("web" + requestPath)) { - boolean success = false; - try { - if (resourceStream != null) { - URL root = cl.getResource("web"); - URL requested = cl.getResource("web" + requestPath); - - if (root != null && requested != null) { - URI rootURI = new URI(root.getPath()).normalize(); - URI requestedURI = new URI(requested.getPath()).normalize(); - - // Check that we don't load anything from outside of the - // expected scope. - if (!rootURI.relativize(requestedURI).equals(requestedURI)) { - logger.debug("Loading missing file from classloader: {}", requestPath); - // ensure that directory to file exists. - file.getParentFile().mkdirs(); - Files.copy(resourceStream, file.toPath()); - - success = true; - } - } - } - } catch (Throwable t) { - logger.error("error while responding", t); - } finally { - if (!success) { - logger.debug("Unable to load requested file {} from classloader", requestPath); - sendError(ctx, NOT_FOUND); - return; - } - } - } - } - - if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) { - sendError(ctx, NOT_FOUND); - return; - } - - if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) { - sendError(ctx, NOT_FOUND); - return; - } - - // cache validation - final String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE); - if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) { - SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); - Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince); - - // Only compare up to the second because the datetime format we send to the client - // does not have milliseconds - long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000; - long fileLastModifiedSeconds = file.lastModified() / 1000; - if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) { - if (logger.isDebugEnabled()) { - logger.debug("Responding 'NOT MODIFIED' for file '" + file.getAbsolutePath() + '\''); - } - - sendNotModified(ctx); - return; - } -
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104436179 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala --- @@ -183,6 +193,33 @@ class MemoryArchivist(private val max_entries: Int) } } + private def archiveJsonFiles(graph: ArchivedExecutionGraph) { +future { + val rootPath = new Path(flinkConfiguration.getString( +JobManagerOptions.ARCHIVE_DIR)) + val fs = rootPath.getFileSystem + val tmpArchivePath = new Path(rootPath, s"tmp_${graph.getJobID.toString}") + for (archiver <- WebMonitorUtils.getArchivers) { +try { + for (archive <- archiver.archiveJsonWithPath(graph).asScala) { +val targetPath = + new Path(tmpArchivePath, s"${archive.getPath}.json") +val out = fs.create(targetPath, false) +out.write(archive.getJson.getBytes(StandardCharsets.UTF_8)) +out.close() + } +} catch { + case ioe: IOException => { +log.error("Failed to archive job details.", ioe) + } +} + } + if (!fs.rename(tmpArchivePath, new Path(rootPath, s"${graph.getJobID.toString}"))) { --- End diff -- Do we have lingering files if the rename does not work as expected? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104436017 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala --- @@ -183,6 +193,33 @@ class MemoryArchivist(private val max_entries: Int) } } + private def archiveJsonFiles(graph: ArchivedExecutionGraph) { +future { + val rootPath = new Path(flinkConfiguration.getString( --- End diff -- Since this is executed async, we should add a safety try-catch around it that logs any exceptions --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104401428 --- Diff: flink-dist/src/main/flink-bin/bin/historyserver.sh --- @@ -0,0 +1,37 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Start/stop a Flink HistoryServer +USAGE="Usage: historyserver.sh (start|stop)" + +STARTSTOP=$1 + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +if [[ $STARTSTOP == "start" ]]; then + # export HS specific env opts + --- End diff -- Empty line here and in line 34 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104433142 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104436093 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala --- @@ -183,6 +193,33 @@ class MemoryArchivist(private val max_entries: Int) } } + private def archiveJsonFiles(graph: ArchivedExecutionGraph) { +future { + val rootPath = new Path(flinkConfiguration.getString( +JobManagerOptions.ARCHIVE_DIR)) + val fs = rootPath.getFileSystem + val tmpArchivePath = new Path(rootPath, s"tmp_${graph.getJobID.toString}") + for (archiver <- WebMonitorUtils.getArchivers) { +try { + for (archive <- archiver.archiveJsonWithPath(graph).asScala) { +val targetPath = + new Path(tmpArchivePath, s"${archive.getPath}.json") +val out = fs.create(targetPath, false) --- End diff -- Can we use try with resources here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104401910 --- Diff: flink-dist/src/main/flink-bin/bin/historyserver.sh --- @@ -0,0 +1,37 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Start/stop a Flink HistoryServer +USAGE="Usage: historyserver.sh (start|stop)" + +STARTSTOP=$1 + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` + +. "$bin"/config.sh + +if [[ $STARTSTOP == "start" ]]; then + # export HS specific env opts --- End diff -- We are not exporting anything here. Is this a left over comment? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104408644 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/AbstractStaticFileServerHandler.java --- @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.files; + +/* + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. + * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + */ + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.netty.util.CharsetUtil; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.net.URL; +import java.nio.file.Files; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Locale; +import java.util.TimeZone; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaders.Names.DATE; +import static io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES; +import static io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE; +import static io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Simple file server handler that serves requests to web frontend's static files, such as + * HTML, CSS, or JS files. + * + * This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example. + */ +public abstract class AbstractStaticFileServerHandler extends SimpleChannelInboundHandler { + + /** Timezone in which this server answers its "if-modified" requests */ + private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT"); + + /** Date format for HTTP */ + public static final String HTTP_DATE_FORMAT = "EEE, dd MMM HH:mm:ss zzz"; + + /** Be default, we allow files to be cached for 5 minutes */ + private static
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104435301 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/NettySetup.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Router; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import org.apache.flink.runtime.webmonitor.HttpRequestHandler; +import org.apache.flink.runtime.webmonitor.PipelineErrorHandler; +import org.slf4j.Logger; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import java.io.File; +import java.net.InetSocketAddress; + +public class NettySetup { + private final Router router; + private final Logger LOG; + private final File uploadDir; + private final SSLContext serverSSLContext; + private final ServerBootstrap bootstrap; + private final Channel serverChannel; + + public NettySetup(Router router, Logger log, File directory, SSLContext sslContext, String configuredAddress, int configuredPort) throws InterruptedException { + this.router = router; --- End diff -- Add checks for not null? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104401759 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import static org.apache.flink.configuration.ConfigOptions.key; + +public class HistoryServerOptions { --- End diff -- Missing stability annotation, I think this should be `@PublicEvolving` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104401703 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import static org.apache.flink.configuration.ConfigOptions.key; + +public class HistoryServerOptions { + + public static final ConfigOption HISTORY_SERVER_REFRESH_INTERVAL = + key("historyserver.refresh-interval") + .defaultValue(3000L); + + public static final ConfigOption HISTORY_SERVER_DIR = + key("historyserver.archive.dirs") + .noDefaultValue(); + + public static final ConfigOption HISTORY_SERVER_WEB_DIR = + key("historyserver.web.dir") + .noDefaultValue(); + + public static final ConfigOption HISTRY_SERVER_WEB_ADDRESS = --- End diff -- typo `HISTRY` => `HISTORY` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104407909 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/AbstractStaticFileServerHandler.java --- @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.files; + +/* + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. + * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + */ + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.netty.util.CharsetUtil; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.net.URL; +import java.nio.file.Files; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Locale; +import java.util.TimeZone; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaders.Names.DATE; +import static io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES; +import static io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE; +import static io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Simple file server handler that serves requests to web frontend's static files, such as + * HTML, CSS, or JS files. + * + * This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example. + */ +public abstract class AbstractStaticFileServerHandler extends SimpleChannelInboundHandler { + + /** Timezone in which this server answers its "if-modified" requests */ + private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT"); + + /** Date format for HTTP */ + public static final String HTTP_DATE_FORMAT = "EEE, dd MMM HH:mm:ss zzz"; + + /** Be default, we allow files to be cached for 5 minutes */ + private static
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104401622 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/HistoryServerOptions.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.configuration; + +import static org.apache.flink.configuration.ConfigOptions.key; + +public class HistoryServerOptions { --- End diff -- Can you please add a comment to this class and each config option. I know, some are obvious, but other are not. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104431925 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104435195 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/NettySetup.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Router; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import org.apache.flink.runtime.webmonitor.HttpRequestHandler; +import org.apache.flink.runtime.webmonitor.PipelineErrorHandler; +import org.slf4j.Logger; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import java.io.File; +import java.net.InetSocketAddress; + +public class NettySetup { + private final Router router; + private final Logger LOG; --- End diff -- This is against common Java conventions. Let's go with the usual `log` or `logger`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104435068 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/NettySetup.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Router; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import org.apache.flink.runtime.webmonitor.HttpRequestHandler; +import org.apache.flink.runtime.webmonitor.PipelineErrorHandler; +import org.slf4j.Logger; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import java.io.File; +import java.net.InetSocketAddress; + +public class NettySetup { --- End diff -- Again missing comments. Also, the name is too generic in my opinion. I know the package gives the context, but still. What do you think about something like "WebFrontendBootstrap"? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104409758 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import io.netty.channel.ChannelHandler; +import org.apache.flink.runtime.webmonitor.files.AbstractStaticFileServerHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +@ChannelHandler.Sharable +public class HistoryServerStaticFileServerHandler extends AbstractStaticFileServerHandler { --- End diff -- Do you have a unit test for this? With the new refactoring, it should be possible to guard the behaviour with a very specific unit test. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104431734 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { --- End diff -- I've noticed that the startup fails silently with output in *.out only when required configuration options are missing. I think we should make sure that they appear in
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104432833 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104408311 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java --- @@ -120,15 +59,9 @@ private final FiniteDuration timeout; - /** The path in which the static documents are */ - private final File rootPath; - /** Whether the web service has https enabled */ private final boolean httpsEnabled; - /** The log for all error reporting */ - private final Logger logger; - private String localJobManagerAddress; public StaticFileServerHandler( --- End diff -- Could you remove this constructor and only use the other one? This one is only used via the other one and the logger is never overwritten. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104407277 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/AbstractStaticFileServerHandler.java --- @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.files; + +/* + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. + * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + */ + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.netty.util.CharsetUtil; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.net.URL; +import java.nio.file.Files; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Locale; +import java.util.TimeZone; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaders.Names.DATE; +import static io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES; +import static io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE; +import static io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Simple file server handler that serves requests to web frontend's static files, such as + * HTML, CSS, or JS files. + * + * This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example. + */ +public abstract class AbstractStaticFileServerHandler extends SimpleChannelInboundHandler { --- End diff -- Could you please update the comments to reflect your refactorings for the "request life cycle"? BTW You can remove the closing `` tag, too (I know it's copied from `master`). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104435595 --- Diff: flink-runtime-web/web-dashboard/app/scripts/index_hs.coffee --- @@ -0,0 +1,193 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +angular.module('flinkApp', ['ui.router', 'angularMoment', 'dndLists']) + +# -- + +.run ($rootScope) -> + $rootScope.sidebarVisible = false + $rootScope.showSidebar = -> +$rootScope.sidebarVisible = !$rootScope.sidebarVisible +$rootScope.sidebarClass = 'force-show' + +# -- + +.value 'flinkConfig', { + jobServer: '' +# jobServer: 'http://localhost:8081/' + "refresh-interval": 1 --- End diff -- The default UI refresh interval is larger than the history server refresh interval. I think both should be the same. I would go with 10s for both. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104433287 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java --- @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.history; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.handler.codec.http.router.Router; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HistoryServerOptions; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.net.SSLUtils; +import org.apache.flink.runtime.util.ExecutorThreadFactory; +import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.webmonitor.utils.NettySetup; +import org.apache.flink.util.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.net.ssl.SSLContext; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class HistoryServer { + + private static final Logger LOG = LoggerFactory.getLogger(HistoryServer.class); + private static final JsonFactory jacksonFactory = new JsonFactory(); + private static final ObjectMapper mapper = new ObjectMapper(); + + private static final String OVERVIEWS_FOLDER_NAME = "overviews"; + + private final String webAddress; + private final int webPort; + private final long webRefreshIntervalMillis; + private final File webDir; + + private final long refreshIntervalMillis; + private final List refreshDirs = new ArrayList<>(); + + /** Map containing the JobID of all fetched jobs and the refreshDir from with they originate. */ + private final MapcachedArchives = new ConcurrentHashMap<>(); + + private final SSLContext serverSSLContext; + private NettySetup netty; + private final Object startupShutdownLock = new Object(); + + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("Flink-HistoryServer-FileFetcher")); + private final JobArchiveFetcherTask fetcherTask; + + public static void main(String[] args) throws Exception { + ParameterTool pt = ParameterTool.fromArgs(args); + String configDir = pt.getRequired("configDir"); + + LOG.info("Loading configuration from {}", configDir); + Configuration flinkConfig = GlobalConfiguration.loadConfiguration(configDir); + + HistoryServer hs = new HistoryServer(flinkConfig); + hs.run(); + } + + public HistoryServer(Configuration config) throws IOException { + if (config.getBoolean(HistoryServerOptions.HISTORY_SERVER_WEB_SSL_ENABLED) && SSLUtils.getSSLEnabled(config)) { + LOG.info("Enabling ssl for the history
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/3460#discussion_r104407696 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/AbstractStaticFileServerHandler.java --- @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.files; + +/* + * This code is based on the "HttpStaticFileServerHandler" from the + * Netty project's HTTP server example. + * + * See http://netty.io and + * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java + */ + +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpChunkedInput; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.netty.util.CharsetUtil; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; +import java.net.URI; +import java.net.URL; +import java.nio.file.Files; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.GregorianCalendar; +import java.util.Locale; +import java.util.TimeZone; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpHeaders.Names.DATE; +import static io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES; +import static io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE; +import static io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +/** + * Simple file server handler that serves requests to web frontend's static files, such as + * HTML, CSS, or JS files. + * + * This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server + * example. + */ +public abstract class AbstractStaticFileServerHandler extends SimpleChannelInboundHandler { + + /** Timezone in which this server answers its "if-modified" requests */ + private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT"); + + /** Date format for HTTP */ + public static final String HTTP_DATE_FORMAT = "EEE, dd MMM HH:mm:ss zzz"; + + /** Be default, we allow files to be cached for 5 minutes */ + private static
[GitHub] flink pull request #3460: [FLINK-1579] Implement History Server
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/3460 [FLINK-1579] Implement History Server This PR adds a slightly unpolished HistoryServer implementation. It is missing tests and some documentation, but is working. This PR builds on top of #3377. The basic idea is as follows: The ```MemoryArchivist```, upon receiving an ```ExecutionGraph```, writes a set of json files into a directory structure resembling the REST API using the features introduced in FLINK-5870, FLINK-5852 and FLINK-5941. The target location is configurable using ```job-manager.archive.dir```. Each job resides in it's own directory, using the job ID as the directory name. As such, each archive is consistent on it's own and multiple jobmanagers may use the same archive dir. The ```HistoryServer``` polls certain directories, configured via ```historyserver.archive.dirs```, in regular intervals, configured via ```historyserver.refresh-interval```, for new job archives. If a new archive is found it is downloaded and integrated into a cache of job archives in the local file system, configurable using ```historyserver.web.dir```. These files are served to a slightly modified WebFrontend using the ```HistoryServerStaticFileServerHandler```. In the end the HistoryServer is little more than an aggregator and archive viewer. None of the directory configuration options have defaults; as it stands the entire feature is opt-in. Should a file that the WebFrontend requests be missing a separate fetch routine kicks in which attempts to fetch the missing file. This is primarily aimed at eventually-consistent file-systems. The HistoryServer is started using the new historyserver.sh script, which works similarly to job- or taskmanager scripts: ```./bin/historyserver.sh [start|stop]``` 2 bigger refactorings were made to existing code to increase the amount of shared code: * the netty setup in the WebRuntimeMonitor was moved into a separate NettySetup class which the HistoryServer can use as well * an AbstractStaticFileServerHandler was added which the (HistoryServer)StaticFileServerHandler extend You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 1579_history_server_pr Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3460.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3460 commit 61a07456f151ac8f5418ac66629751e1a83ada3a Author: zentolDate: 2017-01-24T09:13:24Z [FLINK-1579] Implement History Server - Frontend commit e6316e544fea160f7d050dd1b087301a83345d31 Author: zentol Date: 2017-02-21T11:36:17Z [FLINK-5645] Store accumulators/metrics for canceled/failed tasks commit 84fd2746b09ce41c2d9bd5be7f6e8a8cc1a3291d Author: zentol Date: 2017-03-02T12:31:56Z Refactor netty setup into separate class commit 81d7e6b92fe69326d6edf6b63f3f9c95f5ebd0ef Author: zentol Date: 2017-02-22T14:47:07Z [FLINK-1579] Implement History Server - Backend commit 8d1e8c59690ea97be4bbaf1a011c8ec4a68f5892 Author: zentol Date: 2017-03-02T11:09:36Z Rebuild frontend --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---