[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner
[ https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134141#comment-16134141 ]

ASF GitHub Bot commented on FLINK-6630:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4555

> Implement FLIP-6 MesosAppMasterRunner
> -
>
> Key: FLINK-6630
> URL: https://issues.apache.org/jira/browse/FLINK-6630
> Project: Flink
> Issue Type: Sub-task
> Components: Mesos
> Reporter: Eron Wright
> Assignee: Eron Wright
> Fix For: 1.4.0
>
> A new runner must be developed for the FLIP-6 RM. Target the "single job" scenario.
> Take some time to consider a general solution or a base implementation that is shared with the old implementation.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner
[ https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16134126#comment-16134126 ]

ASF GitHub Bot commented on FLINK-6630:
---

Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/4555

    Merging this PR.

> Implement FLIP-6 MesosAppMasterRunner
> -
>
> Key: FLINK-6630
> URL: https://issues.apache.org/jira/browse/FLINK-6630
> Project: Flink
> Issue Type: Sub-task
> Components: Mesos
> Reporter: Eron Wright
> Assignee: Eron Wright
>
> A new runner must be developed for the FLIP-6 RM. Target the "single job" scenario.
> Take some time to consider a general solution or a base implementation that is shared with the old implementation.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner
[ https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133058#comment-16133058 ] ASF GitHub Bot commented on FLINK-6630: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133975100 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.ObjectInputStream; +import 
java.util.Map; + +/** + * Entry point for Mesos per-job clusters. + */ +public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { + + public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path"; + + // + // Command-line options + // + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + private MesosConfiguration schedulerConfiguration; + + private MesosServices mesosServices; + + private MesosTaskManagerParameters taskManagerParameters; + + private ContainerSpecification taskManagerContainerSpec; + + public MesosJobClusterEntrypoint(Configuration config) { + super(config); + } + + @Override + protected void initializeServices(Configuration config) throws Exception { + super.initializeServices(config); + + final String hostname = config.getString(JobManagerOptions.ADDRESS); + + // Mesos configuration + schedulerConfiguration =
[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner
[ https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133057#comment-16133057 ] ASF GitHub Bot commented on FLINK-6630: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133975661 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosTaskExecutorRunner.java --- @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.AkkaOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.mesos.runtime.clusterframework.MesosConfigKeys; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.Preconditions; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.Callable; + +/** + * The entry point for running a TaskManager in a Mesos container. + */ +public class MesosTaskExecutorRunner { + + private static final Logger LOG = LoggerFactory.getLogger(MesosTaskExecutorRunner.class); + + private static final int INIT_ERROR_EXIT_CODE = 31; + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + /** The process environment variables. 
*/ + private static final Map<String, String> ENV = System.getenv(); + + public static void main(String[] args) throws Exception { + EnvironmentInformation.logEnvironmentInfo(LOG, MesosTaskExecutorRunner.class.getSimpleName(), args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + // try to parse the command line arguments + CommandLineParser parser = new PosixParser(); + CommandLine cmd = parser.parse(ALL_OPTIONS, args); + + final Configuration configuration; + try { + final Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + GlobalConfiguration.setDynamicProperties(dynamicProperties); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + + configuration = GlobalConfiguration.loadConfiguration(); --- End diff -- Here I think we could use `MesosEntrypointUtils#loadConfiguration(cmd)`. > Implement FLIP-6 MesosAppMasterRunner > - > > Key: FLINK-6630 > URL: https://issues.apache.org/jira/browse/FLINK-6630 > Project: Flink > Issue Type: Sub-task > Components: Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > A new runner must be developed for the FLIP-6 RM. Target the "single job" > scenario. > Take some time to consider a general solution or a base implementation that > is shared with the old
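
The review suggestion in this message (call `MesosEntrypointUtils#loadConfiguration(cmd)` instead of repeating the parse/set/load sequence in each entrypoint) can be illustrated with a small stand-alone sketch. Everything here is hypothetical: `EntrypointConfigSketch` and its method signature are not Flink APIs, plain maps stand in for Flink's `Configuration`, and the assumption that command-line (dynamic) properties override values from the configuration file is mine and is not stated in the diff.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for a shared entrypoint helper.
 * Plain maps replace Flink's Configuration type so the sketch compiles on its own.
 */
final class EntrypointConfigSketch {

    private EntrypointConfigSketch() {
    }

    /**
     * Overlays the dynamic (command-line) properties on the file-based configuration.
     * Assumption: dynamic properties win when both define the same key.
     */
    static Map<String, String> loadConfiguration(
            Map<String, String> fileConfig,
            Map<String, String> dynamicProperties) {
        // Copy first so the caller's file configuration is left untouched.
        Map<String, String> merged = new HashMap<>(fileConfig);
        merged.putAll(dynamicProperties);
        return merged;
    }
}
```

With one helper of this shape, each entrypoint's `main` only parses the command line and passes the result along, so the merge order is defined in a single place.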
[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner
[ https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133056#comment-16133056 ] ASF GitHub Bot commented on FLINK-6630: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133974899 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosJobClusterEntrypoint.java --- @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServices; +import org.apache.flink.mesos.runtime.clusterframework.services.MesosServicesUtils; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.JobClusterEntrypoint; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.util.FlinkException; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.ObjectInputStream; +import 
java.util.Map; + +/** + * Entry point for Mesos per-job clusters. + */ +public class MesosJobClusterEntrypoint extends JobClusterEntrypoint { + + public static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path"; + + // + // Command-line options + // + + private static final Options ALL_OPTIONS; + + static { + ALL_OPTIONS = + new Options() + .addOption(BootstrapTools.newDynamicPropertiesOption()); + } + + private MesosConfiguration schedulerConfiguration; + + private MesosServices mesosServices; + + private MesosTaskManagerParameters taskManagerParameters; + + private ContainerSpecification taskManagerContainerSpec; + + public MesosJobClusterEntrypoint(Configuration config) { + super(config); + } + + @Override + protected void initializeServices(Configuration config) throws Exception { + super.initializeServices(config); + + final String hostname = config.getString(JobManagerOptions.ADDRESS); + + // Mesos configuration + schedulerConfiguration =
[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner
[ https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16133055#comment-16133055 ] ASF GitHub Bot commented on FLINK-6630: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4555#discussion_r133974223 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosEntrypointUtils.java --- @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.configuration.MesosOptions; +import org.apache.flink.mesos.runtime.clusterframework.MesosTaskManagerParameters; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.clusterframework.overlays.CompositeContainerOverlay; +import org.apache.flink.runtime.clusterframework.overlays.FlinkDistributionOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.HadoopUserOverlay; +import org.apache.flink.runtime.clusterframework.overlays.KeytabOverlay; +import org.apache.flink.runtime.clusterframework.overlays.Krb5ConfOverlay; +import org.apache.flink.runtime.clusterframework.overlays.SSLStoreOverlay; + +import org.apache.commons.cli.CommandLine; +import org.apache.mesos.Protos; +import org.slf4j.Logger; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +/** + * Utils for Mesos entrypoints. + */ +public class MesosEntrypointUtils { + + @Deprecated + public static Configuration loadConfiguration(CommandLine cmd) { + + // merge the dynamic properties from the command-line + Configuration dynamicProperties = BootstrapTools.parseDynamicProperties(cmd); + GlobalConfiguration.setDynamicProperties(dynamicProperties); + Configuration config = GlobalConfiguration.loadConfiguration(); + + return config; + } + + /** +* Loads and validates the Mesos scheduler configuration. +* @param flinkConfig the global configuration. 
+* @param hostname the hostname to advertise to the Mesos master. +*/ + public static MesosConfiguration createMesosSchedulerConfiguration(Configuration flinkConfig, String hostname) { + + Protos.FrameworkInfo.Builder frameworkInfo = Protos.FrameworkInfo.newBuilder() + .setHostname(hostname); + Protos.Credential.Builder credential = null; + + if (!flinkConfig.contains(MesosOptions.MASTER_URL)) { + throw new IllegalConfigurationException(MesosOptions.MASTER_URL.key() + " must be configured."); + } + String masterUrl = flinkConfig.getString(MesosOptions.MASTER_URL); + + Duration failoverTimeout = FiniteDuration.apply( + flinkConfig.getInteger( + MesosOptions.FAILOVER_TIMEOUT_SECONDS), + TimeUnit.SECONDS); + frameworkInfo.setFailoverTimeout(failoverTimeout.toSeconds()); + + frameworkInfo.setName(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_NAME)); + + frameworkInfo.setRole(flinkConfig.getString( + MesosOptions.RESOURCEMANAGER_FRAMEWORK_ROLE)); + + frameworkInfo.setUser(flinkConfig.getString(
[jira] [Commented] (FLINK-6630) Implement FLIP-6 MesosAppMasterRunner
[ https://issues.apache.org/jira/browse/FLINK-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16129532#comment-16129532 ]

ASF GitHub Bot commented on FLINK-6630:
---

GitHub user EronWright opened a pull request:

    https://github.com/apache/flink/pull/4555

    [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMasterRunner

    ## What is the purpose of the change

    Implement Mesos runners for FLIP-6.

    [FLINK-6630] [Mesos] Implement FLIP-6 MesosAppMasterRunner
    [FLINK-6631] [Mesos] Implement FLIP-6 MesosTaskExecutorRunner

    ## Brief change log

    - bin: new entrypoints scripts for flip-6
    - ClusterEntrypoint: Refactor the shutdown method
    - ClusterEntrypoint: Install default FileSystem (for parity with legacy entrypoints)
    - ClusterEntrypoint: new MesosJobClusterEntrypoint, MesosSessionClusterEntrypoint, MesosEntrypointUtils, MesosTaskExecutorRunner
    - MesosServices: enhanced with artifactServer, localActorSystem
    - MesosResourceManager: Fallback to old TM params when UNKNOWN resource profile is provided
    - MesosResourceManager: config setting for taskmanager startup script (mesos.resourcemanager.tasks.taskmanager-cmd)
    - test: added a 'noop' job graph for testing purposes

    ## Testing

    This change involves manual testing and is verified as follows:

    1. Configure Flink to use the FLIP-6 TM runner.
    ```
    mesos.resourcemanager.tasks.taskmanager-cmd: $FLINK_HOME/bin/mesos-taskmanager-flip6.sh
    ```
    2. Configure other Mesos options as normal.
    3. Launch a job-specific cluster using `mesos-appmaster-flip6-job.sh`:
    ```
    $ bin/mesos-appmaster-flip6-job.sh -Dflink.jobgraph.path=/flink-tests/src/test/resources/jobgraphs/streaming-noop-3.graph
    ```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/EronWright/flink FLINK-6630

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4555.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4555

commit 7d257db84f4b9bf2a02d1375a04ff64516266186
Author: Wright, Eron
Date:   2017-08-16T21:30:24Z

    [FLINK-6630] Implement FLIP-6 MesosAppMasterRunner
    [FLINK-6631] Implement FLIP-6 MesosTaskExecutorRunner

    - bin: new entrypoints scripts for flip-6
    - ClusterEntrypoint: Refactor the shutdown method
    - ClusterEntrypoint: Install default FileSystem (for parity with legacy entrypoints)
    - ClusterEntrypoint: new MesosJobClusterEntrypoint, MesosSessionClusterEntrypoint, MesosEntrypointUtils, MesosTaskExecutorRunner
    - MesosServices: enhanced with artifactServer, localActorSystem
    - MesosResourceManager: Fallback to old TM params when UNKNOWN resource profile is provided
    - MesosResourceManager: config setting for taskmanager startup script (mesos.resourcemanager.tasks.taskmanager-cmd)
    - test: added a 'noop' job graph for testing purposes

> Implement FLIP-6 MesosAppMasterRunner
> -
>
> Key: FLINK-6630
> URL: https://issues.apache.org/jira/browse/FLINK-6630
> Project: Flink
> Issue Type: Sub-task
> Components: Mesos
> Reporter: Eron Wright
> Assignee: Eron Wright
>
> A new runner must be developed for the FLIP-6 RM. Target the "single job" scenario.
> Take some time to consider a general solution or a base implementation that is shared with the old implementation.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
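
For readers unfamiliar with the `-Dflink.jobgraph.path=...` argument used in the testing steps of the pull request: it is a dynamic property, which the entrypoints in this PR read from the command line via `BootstrapTools.parseDynamicProperties(cmd)`. The sketch below only illustrates the idea of collecting `-Dkey=value` arguments into a map. It is not Flink's implementation (which goes through Apache Commons CLI); the class name `DynamicPropertiesSketch` and its behaviour on malformed input are assumptions made for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical illustration of dynamic-property parsing.
 * Not Flink's BootstrapTools; a plain-JDK stand-in.
 */
final class DynamicPropertiesSketch {

    private DynamicPropertiesSketch() {
    }

    /** Collects every argument of the form -Dkey=value into an insertion-ordered map. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String arg : args) {
            if (!arg.startsWith("-D")) {
                continue; // not a dynamic property, ignored by this sketch
            }
            String body = arg.substring(2);
            int eq = body.indexOf('=');
            if (eq <= 0) {
                // Assumed behaviour: reject entries with a missing key or missing '='.
                throw new IllegalArgumentException("Expected -Dkey=value but got: " + arg);
            }
            props.put(body.substring(0, eq), body.substring(eq + 1));
        }
        return props;
    }

    public static void main(String[] args) {
        // Prints each parsed property, one per line.
        parse(args).forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Given the command from step 3, such a parser would yield a single entry whose key is `flink.jobgraph.path`, matching the `JOB_GRAPH_FILE_PATH` constant declared in `MesosJobClusterEntrypoint`.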