[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976366#comment-15976366 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user sgran commented on the issue:

    https://github.com/apache/flink/pull/3586

    I've closed this, opened a JIRA, and rebased the commit onto master. The new PR is #3744

> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management, Mesos
>            Reporter: Robert Metzger
>            Assignee: Eron Wright
>            Priority: Minor
>             Fix For: 1.2.0
>
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent ResourceManager work.
> Update (Oct '16): the core functionality is in the master branch. New sub-tasks track remaining work for a first release.
> Design document: ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976359#comment-15976359 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user sgran closed the pull request at:

    https://github.com/apache/flink/pull/3586
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973298#comment-15973298 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/3586

    Thanks for this contribution, I'm glad to see Flink take advantage of Fenzo's considerable power.

    Please open a new bug for this, since FLINK-1984 is closed, assign it to yourself, and update the PR description accordingly. Mark the bug with 'Mesos' component.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973293#comment-15973293 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3586#discussion_r112036109

    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java ---
    @@ -166,7 +191,36 @@ public static MesosTaskManagerParameters create(Configuration flinkConfig) {
                 cpus,
                 containerType,
                 Option.apply(imageName),
    -            containeredParameters);
    +            containeredParameters,
    +            constraints);
    +    }
    +
    +    private static List parseConstraints(String mesosConstraints) {
    +        List constraints = new ArrayList<>();
    +
    +        if (mesosConstraints != null) {
    +            for (String constraint : Arrays.asList(mesosConstraints.split(","))) {
    +                if (constraint.isEmpty()) {
    +                    continue;
    +                }
    +                final List constraintList = Arrays.asList(constraint.split(":"));
    +                if (constraintList.size() != 2) {
    +                    continue;
    +                }
    +                addConstraint(constraints, constraintList);
    +            }
    +        }
    +
    +        return constraints;
    +    }
    +
    +    private static void addConstraint(List constraints, final List constraintList) {
    --- End diff --

    Would you mind replacing `constraintList` with a pair of arguments for the attr name and value, and rename the `addConstraint` method to `addHostAttrValueConstraint`?
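Note on the review comment above: a minimal sketch of what the suggested refactoring could look like, with the attribute name and value passed as two separate arguments. This is not the code from the PR. `HostAttrValue` is a hypothetical stand-in for Fenzo's `HostAttrValueConstraint` so that the example compiles on its own, and the class name `ConstraintParsingSketch` is likewise made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ConstraintParsingSketch {

    /** Hypothetical stand-in for a host attribute constraint (not the Fenzo class). */
    static final class HostAttrValue {
        final String attrName;
        final String attrValue;

        HostAttrValue(String attrName, String attrValue) {
            this.attrName = attrName;
            this.attrValue = attrValue;
        }
    }

    /** Parses "attr:value,attr:value"; empty or malformed entries are skipped. */
    static List<HostAttrValue> parseConstraints(String mesosConstraints) {
        List<HostAttrValue> constraints = new ArrayList<>();
        if (mesosConstraints == null) {
            return constraints;
        }
        for (String constraint : mesosConstraints.split(",")) {
            if (constraint.isEmpty()) {
                continue;
            }
            String[] parts = constraint.split(":");
            if (parts.length != 2) {
                continue; // not exactly one "name:value" pair
            }
            addHostAttrValueConstraint(constraints, parts[0], parts[1]);
        }
        return constraints;
    }

    /** Takes the attribute name and value explicitly instead of a two-element list. */
    static void addHostAttrValueConstraint(
            List<HostAttrValue> constraints, String attrName, String attrValue) {
        constraints.add(new HostAttrValue(attrName, attrValue));
    }

    public static void main(String[] args) {
        System.out.println(parseConstraints("cluster:foo,az:eu-west-1").size()); // prints 2
        System.out.println(parseConstraints("").size());                         // prints 0
        System.out.println(parseConstraints(",:,").size());                      // prints 0
    }
}
```

With explicit name and value parameters the helper no longer depends on the caller having checked the list length, which appears to be the point of the requested rename.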
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15973292#comment-15973292 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3586#discussion_r112036914

    --- Diff: flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParametersTest.java ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +import static org.hamcrest.Matchers.is;
    +import static org.junit.Assert.assertThat;
    +
    +import org.apache.flink.configuration.Configuration;
    +import org.junit.Test;
    +
    +import com.netflix.fenzo.ConstraintEvaluator;
    +import com.netflix.fenzo.functions.Func1;
    +import com.netflix.fenzo.plugins.HostAttrValueConstraint;
    +
    +public class MesosTaskManagerParametersTest {
    +
    +
    +    @Test
    +    public void givenTwoConstraintsInConfigShouldBeParsed() throws Exception {
    +
    +        MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withConfiguration("cluster:foo,az:eu-west-1"));
    +        assertThat(mesosTaskManagerParameters.constraints().size(), is(2));
    +        ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1() {
    +            @Override
    +            public String call(String s) {
    +                return "foo";
    +            }
    +        });
    +        ConstraintEvaluator secondConstraintEvaluator = new HostAttrValueConstraint("az", new Func1() {
    +            @Override
    +            public String call(String s) {
    +                return "foo";
    +            }
    +        });
    +        assertThat(mesosTaskManagerParameters.constraints().get(0).getName(), is(firstConstraintEvaluator.getName()));
    +        assertThat(mesosTaskManagerParameters.constraints().get(1).getName(), is(secondConstraintEvaluator.getName()));
    +
    +    }
    +
    +    @Test
    +    public void givenOneConstraintInConfigShouldBeParsed() throws Exception {
    +
    +        MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withConfiguration("cluster:foo"));
    +        assertThat(mesosTaskManagerParameters.constraints().size(), is(1));
    +        ConstraintEvaluator firstConstraintEvaluator = new HostAttrValueConstraint("cluster", new Func1() {
    +            @Override
    +            public String call(String s) {
    +                return "foo";
    +            }
    +        });
    +        assertThat(mesosTaskManagerParameters.constraints().get(0).getName(), is(firstConstraintEvaluator.getName()));
    +    }
    +
    +    @Test
    +    public void givenEmptyConstraintInConfigShouldBeParsed() throws Exception {
    +
    +        MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withConfiguration(""));
    +        assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
    +    }
    +
    +    @Test
    +    public void givenInvalidConstraintInConfigShouldBeParsed() throws Exception {
    +
    +        MesosTaskManagerParameters mesosTaskManagerParameters = MesosTaskManagerParameters.create(withConfiguration(",:,"));
    +        assertThat(mesosTaskManagerParameters.constraints().size(), is(0));
    +    }
    +
    +
    +    private static Configuration withConfiguration(final String configuration) {
    --- End diff --

    This method is too specific for its name, given that the class is called `MesosTaskManagerParametersTest`. Maybe it would be better to take a varargs of `Tuple2` to make it more generic.
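Note on the review comment above: a rough sketch of a more generic test helper that accepts any number of key/value pairs. It is only an illustration under assumptions: a plain `Map` stands in for Flink's `Configuration`, `Map.Entry` stands in for `Tuple2`, and the key `example.constraints.key` is a placeholder rather than a real Flink option name.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.Map;

final class TestConfigSketch {

    /** Small factory so that call sites stay readable. */
    static Map.Entry<String, String> entry(String key, String value) {
        return new SimpleImmutableEntry<>(key, value);
    }

    /** Builds a configuration-like map from any number of key/value pairs. */
    @SafeVarargs
    static Map<String, String> withConfiguration(Map.Entry<String, String>... entries) {
        Map<String, String> config = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : entries) {
            config.put(e.getKey(), e.getValue());
        }
        return config;
    }

    public static void main(String[] args) {
        // "example.constraints.key" is a placeholder key used only for this sketch.
        Map<String, String> config =
                withConfiguration(entry("example.constraints.key", "cluster:foo"));
        System.out.println(config);
    }
}
```

A helper of this shape can be reused by tests for other parameters, since the option key is no longer baked into the method.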
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972654#comment-15972654 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user sgran closed the pull request at:

    https://github.com/apache/flink/pull/3586
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15972655#comment-15972655 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

GitHub user sgran reopened a pull request:

    https://github.com/apache/flink/pull/3586

    [FLINK-1984] initial commit of hard constraint evaluator

    This is related to the work in FLINK-1984. In earlier patch sets, mesos constraints were evaluated, but that appears to have been dropped in the fenzo code, and now all constraint evaluators return null.

    This is a start of exposing the fenzo constraint system, and only exposes a minimal subset of that functionality for now, hopefully in a way that allows it to be extended by later authors.

    Signed-off-by: Stephen Gran

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pikselpalette/flink add_constraints

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3586.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3586

commit 67026533afd15c1572ac17d75119656897b03f52
Author: Stephen Gran
Date:   2017-03-21T12:36:03Z

    initial commit of hard constraint evaluator

    Signed-off-by: Stephen Gran

commit 5e9413122bfdbcb6bdec6c62fb0cf99e5450572b
Author: Stephen Gran
Date:   2017-03-22T10:27:56Z

    add license header to unit test

    Signed-off-by: Stephen Gran
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934673#comment-15934673 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

GitHub user sgran opened a pull request:

    https://github.com/apache/flink/pull/3586

    [FLINK-1984] initial commit of hard constraint evaluator

    This is related to the work in FLINK-1984. In earlier patch sets, mesos constraints were evaluated, but that appears to have been dropped in the fenzo code, and now all constraint evaluators return null.

    This is a start of exposing the fenzo constraint system, and only exposes a minimal subset of that functionality for now, hopefully in a way that allows it to be extended by later authors.

    Signed-off-by: Stephen Gran

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pikselpalette/flink add_constraints

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3586.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3586
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15446201#comment-15446201 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2315

    Thank you so much for your contribution @EronWright! Looking forward to see the Mesos integration in Flink grow :)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15446195#comment-15446195 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2315
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15446101#comment-15446101 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2315

    Fixing the errors and trying to rebase your changes on top of the latest master now.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15446098#comment-15446098 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r76621116

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala ---
    @@ -0,0 +1,172 @@
    +/*
    --- End diff --

    This Scala file is in /java.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437131#comment-15437131 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2315

    There are some compilation issues in the build logs:

    ```
    [ERROR] COMPILATION ERROR :
    [INFO] -------------------------------------------------------------
    [ERROR] /home/jenkins/jenkins-slave/workspace/flink-github-ci/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java:[21,33] package com.google.common.collect does not exist
    [ERROR] /home/jenkins/jenkins-slave/workspace/flink-github-ci/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java:[65,24] cannot find symbol
    ```

    Could you rebase to the latest master and also remove any merge commits? Thanks!
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15437032#comment-15437032 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/2315

    @mxm ready
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15434810#comment-15434810 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user mxm commented on the issue:

    https://github.com/apache/flink/pull/2315

    Let us know when you have made the last changes. I think we could then go ahead and merge this PR. I'm getting excited about this to be finally in the master :flushed:
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430382#comment-15430382 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75642882

    --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/ConnectionMonitor.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.scheduler
    +
    +import akka.actor.{Actor, FSM, Props}
    +import grizzled.slf4j.Logger
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.mesos.scheduler.ConnectionMonitor._
    +import org.apache.flink.mesos.scheduler.messages._
    +
    +import scala.concurrent.duration._
    +
    +/**
    + * Actively monitors the Mesos connection.
    + */
    +class ConnectionMonitor() extends Actor with FSM[FsmState, Unit] {
    +
    +  val LOG = Logger(getClass)
    +
    +  startWith(StoppedState, None)
    +
    +  when(StoppedState) {
    +    case Event(msg: Start, _) =>
    +      LOG.info(s"Connecting to Mesos...")
    +      goto(ConnectingState)
    +  }
    +
    +  when(ConnectingState, stateTimeout = CONNECT_RETRY_RATE) {
    +    case Event(msg: Stop, _) =>
    +      goto(StoppedState)
    +
    +    case Event(msg: Registered, _) =>
    +      LOG.info(s"Connected to Mesos as framework ID ${msg.frameworkId.getValue}.")
    +      LOG.debug(s" Master Info: ${msg.masterInfo}")
    +      goto(ConnectedState)
    +
    +    case Event(msg: ReRegistered, _) =>
    +      LOG.info("Reconnected to a new Mesos master.")
    +      LOG.debug(s" Master Info: ${msg.masterInfo}")
    +      goto(ConnectedState)
    +
    +    case Event(StateTimeout, _) =>
    +      LOG.warn("Unable to connect to Mesos; still trying...")
    +      stay()
    +  }
    +
    +  when(ConnectedState) {
    +    case Event(msg: Stop, _) =>
    +      goto(StoppedState)
    +
    +    case Event(msg: Disconnected, _) =>
    +      LOG.warn("Disconnected from the Mesos master. Reconnecting...")
    +      goto(ConnectingState)
    +  }
    +
    --- End diff --

    The default is to just log a warning and drop the message?
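Note on the question above: the transitions in the quoted diff, together with a "log a warning and drop" default for events that no state handles, can be modelled in plain Java roughly as follows. This is a sketch for illustration, not Flink or Akka code. The class, enum, and method names are invented here, and the fallback branch only imitates what the reviewer describes as the default behaviour, which should be checked against the Akka FSM documentation.

```java
import java.util.ArrayList;
import java.util.List;

final class ConnectionMonitorSketch {

    enum State { STOPPED, CONNECTING, CONNECTED }

    enum Event { START, STOP, REGISTERED, REREGISTERED, DISCONNECTED, STATE_TIMEOUT }

    private State state = State.STOPPED;
    private final List<String> warnings = new ArrayList<>();

    State state() {
        return state;
    }

    List<String> warnings() {
        return warnings;
    }

    /** Applies one event; events with no matching transition fall through to the default. */
    void handle(Event event) {
        switch (state) {
            case STOPPED:
                if (event == Event.START) {
                    state = State.CONNECTING;
                    return;
                }
                break;
            case CONNECTING:
                if (event == Event.STOP) {
                    state = State.STOPPED;
                    return;
                }
                if (event == Event.REGISTERED || event == Event.REREGISTERED) {
                    state = State.CONNECTED;
                    return;
                }
                if (event == Event.STATE_TIMEOUT) {
                    warnings.add("Unable to connect to Mesos; still trying...");
                    return; // stay in CONNECTING
                }
                break;
            case CONNECTED:
                if (event == Event.STOP) {
                    state = State.STOPPED;
                    return;
                }
                if (event == Event.DISCONNECTED) {
                    state = State.CONNECTING;
                    return;
                }
                break;
        }
        // Default: record a warning and drop the event without changing state.
        warnings.add("unhandled event " + event + " in state " + state);
    }

    public static void main(String[] args) {
        ConnectionMonitorSketch monitor = new ConnectionMonitorSketch();
        monitor.handle(Event.START);
        monitor.handle(Event.DISCONNECTED); // not handled while CONNECTING
        System.out.println(monitor.state() + " " + monitor.warnings());
    }
}
```

Making the fallback explicit, as in the last statement of `handle`, is one way to answer the review question in code rather than relying on framework defaults.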
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
    [ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430377#comment-15430377 ]

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75642452

    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java ---
    @@ -0,0 +1,205 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +import com.netflix.fenzo.ConstraintEvaluator;
    +import com.netflix.fenzo.TaskAssignmentResult;
    +import com.netflix.fenzo.TaskRequest;
    +import com.netflix.fenzo.VMTaskFitnessCalculator;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
    +import org.apache.flink.mesos.scheduler.LaunchableTask;
    +import org.apache.mesos.Protos;
    +
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.atomic.AtomicReference;
    +
    +import static org.apache.flink.mesos.Utils.variable;
    +import static org.apache.flink.mesos.Utils.range;
    +import static org.apache.flink.mesos.Utils.ranges;
    +import static org.apache.flink.mesos.Utils.scalar;
    +
    +/**
    + * Specifies how to launch a Mesos worker.
    + */
    +public class LaunchableMesosWorker implements LaunchableTask {
    +
    +    /**
    +     * The set of configuration keys to be dynamically configured with a port allocated from Mesos.
    +     */
    +    private static String[] TM_PORT_KEYS = {
    +        "taskmanager.rpc.port",
    +        "taskmanager.data.port" };
    +
    +    private final MesosTaskManagerParameters params;
    +    private final Protos.TaskInfo.Builder template;
    +    private final Protos.TaskID taskID;
    +    private final Request taskRequest;
    +
    +    /**
    +     * Construct a launchable Mesos worker.
    +     * @param params the TM parameters such as memory, cpu to acquire.
    +     * @param template a template for the TaskInfo to be constructed at launch time.
    +     * @param taskID the taskID for this worker.
    +     */
    +    public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) {
    +        this.params = params;
    +        this.template = template;
    +        this.taskID = taskID;
    +        this.taskRequest = new Request();
    +    }
    +
    +    public Protos.TaskID taskID() {
    +        return taskID;
    +    }
    +
    +    @Override
    +    public TaskRequest taskRequest() {
    +        return taskRequest;
    +    }
    +
    +    class Request implements TaskRequest {
    +        private final AtomicReference assignedResources = new AtomicReference<>();
    +
    +        @Override
    +        public String getId() {
    +            return taskID.getValue();
    +        }
    +
    +        @Override
    +        public String taskGroupName() {
    +            return "";
    +        }
    +
    +        @Override
    +        public double getCPUs() {
    +            return params.cpus();
    +        }
    +
    +        @Override
    +        public double getMemory() {
    +            return params.containeredParameters().taskManagerTotalMemoryMB();
    +        }
    +
    +        @Override
    +        public double getNetworkMbps() {
    +            return 0.0;
    +        }
    +
    +        @Override
    +        public double getDisk() {
    +            return 0.0;
    --- End diff --

    So 0.0 means give me whatever? How about a minimum value, e.g. 500MB?
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430372#comment-15430372 ]

ASF GitHub Bot commented on FLINK-1984:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75641703

    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.cli;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.core.type.TypeReference;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import org.apache.flink.configuration.Configuration;
    +
    +import java.io.IOException;
    +import java.util.Map;
    +
    +public class FlinkMesosSessionCli {
    --- End diff --

    Yes, I recognized it :) Sure! No problem.

> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430369#comment-15430369 ]

ASF GitHub Bot commented on FLINK-1984:
---

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75641515

    --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java ---
    @@ -0,0 +1,44 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +/**
    + * The Mesos environment variables used for settings of the containers.
    + */
    +public class MesosConfigKeys {
    --- End diff --

    Yes, a follow-up is fine.

> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15430367#comment-15430367 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75641421 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15429544#comment-15429544 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75587011 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import com.netflix.fenzo.ConstraintEvaluator; +import com.netflix.fenzo.TaskAssignmentResult; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.VMTaskFitnessCalculator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.mesos.Protos; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.mesos.Utils.variable; +import static org.apache.flink.mesos.Utils.range; +import static org.apache.flink.mesos.Utils.ranges; +import static org.apache.flink.mesos.Utils.scalar; + +/** + * Specifies how to launch a Mesos worker. + */ +public class LaunchableMesosWorker implements LaunchableTask { + + /** +* The set of configuration keys to be dynamically configured with a port allocated from Mesos. +*/ + private static String[] TM_PORT_KEYS = { + "taskmanager.rpc.port", + "taskmanager.data.port" }; + + private final MesosTaskManagerParameters params; + private final Protos.TaskInfo.Builder template; + private final Protos.TaskID taskID; + private final Request taskRequest; + + /** +* Construct a launchable Mesos worker. +* @param params the TM parameters such as memory, cpu to acquire. +* @param template a template for the TaskInfo to be constructed at launch time. +* @param taskID the taskID for this worker. 
+ */ + public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + this.params = params; + this.template = template; + this.taskID = taskID; + this.taskRequest = new Request(); + } + + public Protos.TaskID taskID() { + return taskID; + } + + @Override + public TaskRequest taskRequest() { + return taskRequest; + } + + class Request implements TaskRequest { + private final AtomicReference assignedResources = new AtomicReference<>(); + + @Override + public String getId() { + return taskID.getValue(); + } + + @Override + public String taskGroupName() { + return ""; + } + + @Override + public double getCPUs() { + return params.cpus(); + } + + @Override + public double getMemory() { + return params.containeredParameters().taskManagerTotalMemoryMB(); + } + + @Override + public double getNetworkMbps() { + return 0.0; + } + + @Override + public double getDisk() { + return 0.0; --- End diff -- To be clear, this code implements the Fenzo `TaskRequest` interface, and `getDisk` must return a value. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL:
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15427212#comment-15427212 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75395325 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; --- End diff -- Will tackle later in follow-up task, since I am making changes to the artifact server for the dispatcher's purposes. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature >
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426687#comment-15426687 ]

ASF GitHub Bot commented on FLINK-1984:
---

Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/2315

    Thanks for the review @tillrohrmann @mxm and @StephanEwen. I'm addressing the feedback with a follow-up commit to be completed ASAP.

> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426682#comment-15426682 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75335049 --- Diff: flink-mesos/src/test/scala/org/apache/flink/mesos/scheduler/LaunchCoordinatorTest.scala --- @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler + +import java.util.{Collections, UUID} +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.FSM.StateTimeout +import akka.testkit._ +import com.netflix.fenzo.TaskRequest.{AssignedResources, NamedResourceSetRequest} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.{Action1, Action2} +import com.netflix.fenzo.plugins.VMLeaseObject +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.flink.runtime.akka.AkkaUtils +import org.apache.mesos.Protos.{SlaveID, TaskInfo} +import org.apache.mesos.{SchedulerDriver, Protos} +import org.junit.runner.RunWith +import org.mockito.Mockito.{verify, _} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.mockito.{Matchers => MM, Mockito} +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike} + +import scala.collection.JavaConverters._ + +import org.apache.flink.mesos.Utils.range +import org.apache.flink.mesos.Utils.ranges +import org.apache.flink.mesos.Utils.scalar + +@RunWith(classOf[JUnitRunner]) +class LaunchCoordinatorTest + extends TestKitBase +with ImplicitSender +with WordSpecLike +with Matchers +with BeforeAndAfterAll { + + lazy val config = new Configuration() + implicit lazy val system = AkkaUtils.createLocalActorSystem(config) + + override def afterAll(): Unit = { +TestKit.shutdownActorSystem(system) + } + + def randomFramework = { + Protos.FrameworkID.newBuilder().setValue(UUID.randomUUID.toString).build + } + + def randomTask = { +val taskID = Protos.TaskID.newBuilder.setValue(UUID.randomUUID.toString).build + +def generateTaskRequest = { + new TaskRequest() { +private[mesos] val assignedResources = new AtomicReference[TaskRequest.AssignedResources] 
+override def getId: String = taskID.getValue +override def taskGroupName: String = "" +override def getCPUs: Double = 1.0 +override def getMemory: Double = 1024.0 +override def getNetworkMbps: Double = 0.0 +override def getDisk: Double = 0.0 +override def getPorts: Int = 1 +override def getCustomNamedResources: java.util.Map[String, NamedResourceSetRequest] = + Collections.emptyMap[String, NamedResourceSetRequest] +override def getSoftConstraints: java.util.List[_ <: VMTaskFitnessCalculator] = null +override def getHardConstraints: java.util.List[_ <: ConstraintEvaluator] = null +override def getAssignedResources: AssignedResources = assignedResources.get() +override def setAssignedResources(assignedResources: AssignedResources): Unit = { + this.assignedResources.set(assignedResources) +} + } +} + +val task: LaunchableTask = new LaunchableTask() { + override def taskRequest: TaskRequest = generateTaskRequest + override def launch(slaveId: SlaveID, taskAssignment: TaskAssignmentResult): Protos.TaskInfo = { +Protos.TaskInfo.newBuilder +
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426668#comment-15426668 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75333536 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, TaskGoalStateUpdated, TaskTerminated} +import org.apache.flink.mesos.scheduler.Tasks._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.mutable.{Map => MutableMap} + +/** + * Aggregate of monitored tasks. + * + * Routes messages between the scheduler and individual task monitor actors. 
+ */ +class Tasks[M <: TaskMonitor]( + flinkConfig: Configuration, + schedulerDriver: SchedulerDriver, + taskMonitorClass: Class[M]) extends Actor { + + /** +* A map of task monitors by task ID. +*/ + private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap() + + /** +* Cache of current connection state. +*/ + private var registered: Option[Any] = None + + override def preStart(): Unit = { +// TODO subscribe to context.system.deadLetters for messages to nonexistent tasks + } + + override def receive: Receive = { + +case msg: Disconnected => + registered = None + context.actorSelection("*").tell(msg, self) + +case msg : Connected => + registered = Some(msg) + context.actorSelection("*").tell(msg, self) + +case msg: TaskGoalStateUpdated => + val taskID = msg.state.taskID + + // ensure task monitor exists + if(!taskMap.contains(taskID)) { +val actorRef = createTask(msg.state) +registered.foreach(actorRef ! _) + } + + taskMap(taskID) ! msg + +case msg: StatusUpdate => + taskMap(msg.status().getTaskId) ! msg + +case msg: Reconcile => + context.parent.forward(msg) --- End diff -- Actually this code is generic; at best we could say it is the scheduler (which is actually the RM or the dispatcher). I think the argument is stronger for this reference to be explicit than for the TaskMonitor case, because `Tasks` and `TaskMonitor` are coupled by design (one is the aggregate for the other). I'll fix this reference if I have time. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. 
> -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
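The routing in the `Tasks` hunk quoted in this message, together with the explicit-reference alternative raised in the comment, can be sketched without Akka. This is a rough plain-Java illustration under stated assumptions: `TaskRouter`, `Monitor`, and the string messages are all hypothetical stand-ins, and the `scheduler` callback replaces the implicit `context.parent` lookup with a reference passed in at construction time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical, actor-free sketch of the routing in Tasks.scala; all names are illustrative. */
final class TaskRouter {
    /** Stand-in for a task monitor actor: it only records what it receives. */
    static final class Monitor {
        final List<String> inbox = new ArrayList<>();
        void tell(String message) { inbox.add(message); }
    }

    private final Map<String, Monitor> monitors = new HashMap<>();
    /** Explicit reference to the scheduler, instead of an implicit parent lookup. */
    private final Consumer<String> scheduler;
    /** Cached connection message; null while disconnected. */
    private String connected;

    TaskRouter(Consumer<String> scheduler) {
        this.scheduler = scheduler;
    }

    void onConnected(String message) {
        connected = message;
        monitors.values().forEach(m -> m.tell(message));
    }

    void onDisconnected(String message) {
        connected = null;
        monitors.values().forEach(m -> m.tell(message));
    }

    /** Creates the monitor on first sight of a task and replays the cached connection state. */
    void onGoalStateUpdated(String taskId, String message) {
        Monitor monitor = monitors.get(taskId);
        if (monitor == null) {
            monitor = new Monitor();
            monitors.put(taskId, monitor);
            if (connected != null) {
                monitor.tell(connected);
            }
        }
        monitor.tell(message);
    }

    /** Reconciliation requests go to whoever was passed in as the scheduler. */
    void onReconcile(String message) {
        scheduler.accept(message);
    }

    Monitor monitor(String taskId) {
        return monitors.get(taskId);
    }
}
```

With the reference passed in, a unit test can hand the router a list-backed callback and inspect what was forwarded, without building an actor hierarchy around it.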
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426646#comment-15426646 ]

ASF GitHub Bot commented on FLINK-1984:
---

Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75331224

    --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/ConnectionMonitor.scala ---
    @@ -0,0 +1,126 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.scheduler
    +
    +import akka.actor.{Actor, FSM, Props}
    +import grizzled.slf4j.Logger
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.mesos.scheduler.ConnectionMonitor._
    +import org.apache.flink.mesos.scheduler.messages._
    +
    +import scala.concurrent.duration._
    +
    +/**
    + * Actively monitors the Mesos connection.
    + */
    +class ConnectionMonitor() extends Actor with FSM[FsmState, Unit] {
    +
    +  val LOG = Logger(getClass)
    +
    +  startWith(StoppedState, None)
    +
    +  when(StoppedState) {
    +    case Event(msg: Start, _) =>
    +      LOG.info(s"Connecting to Mesos...")
    +      goto(ConnectingState)
    +  }
    +
    +  when(ConnectingState, stateTimeout = CONNECT_RETRY_RATE) {
    +    case Event(msg: Stop, _) =>
    +      goto(StoppedState)
    +
    +    case Event(msg: Registered, _) =>
    +      LOG.info(s"Connected to Mesos as framework ID ${msg.frameworkId.getValue}.")
    +      LOG.debug(s"   Master Info: ${msg.masterInfo}")
    +      goto(ConnectedState)
    +
    +    case Event(msg: ReRegistered, _) =>
    +      LOG.info("Reconnected to a new Mesos master.")
    +      LOG.debug(s"   Master Info: ${msg.masterInfo}")
    +      goto(ConnectedState)
    +
    +    case Event(StateTimeout, _) =>
    +      LOG.warn("Unable to connect to Mesos; still trying...")
    +      stay()
    +  }
    +
    +  when(ConnectedState) {
    +    case Event(msg: Stop, _) =>
    +      goto(StoppedState)
    +
    +    case Event(msg: Disconnected, _) =>
    +      LOG.warn("Disconnected from the Mesos master.  Reconnecting...")
    +      goto(ConnectingState)
    +  }
    +
    --- End diff --

    My rationale has been to let the default handling occur for unhandled events. When I do use the `whenUnhandled` block, it is for common code. Do tell if you see an event I should be handling, or I otherwise misunderstood your comment.

> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-:
> https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent
> ResourceManager work.
> Design document: ([google
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
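The transitions in the `ConnectionMonitor` hunk quoted in this message, and the point about leaving unhandled events to default handling, can be traced with a small plain-Java state machine. This is a sketch under assumptions, not the PR's code and not Akka FSM: `ConnectionFsm` and its enums are hypothetical, the connect-retry timeout is omitted, and the `unhandled` counter merely stands in for whatever the framework does by default with an event that no state matches.

```java
/** Hypothetical plain-Java rendering of the ConnectionMonitor transitions; not Akka FSM. */
final class ConnectionFsm {
    enum State { STOPPED, CONNECTING, CONNECTED }
    enum Event { START, STOP, REGISTERED, REREGISTERED, DISCONNECTED }

    private State state = State.STOPPED;
    private int unhandled = 0;

    State state() { return state; }
    int unhandledCount() { return unhandled; }

    void handle(Event event) {
        switch (state) {
            case STOPPED:
                if (event == Event.START) { state = State.CONNECTING; return; }
                break;
            case CONNECTING:
                if (event == Event.STOP) { state = State.STOPPED; return; }
                if (event == Event.REGISTERED || event == Event.REREGISTERED) {
                    state = State.CONNECTED;
                    return;
                }
                break;
            case CONNECTED:
                if (event == Event.STOP) { state = State.STOPPED; return; }
                if (event == Event.DISCONNECTED) { state = State.CONNECTING; return; }
                break;
        }
        // No transition matched: record the event and leave the state unchanged.
        unhandled++;
    }
}
```

Keeping the fall-through in one place mirrors the rationale given in the comment: per-state blocks list only the events that matter there, and everything else takes a single shared path.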
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426661#comment-15426661 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75332580 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/TaskMonitor.scala --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import grizzled.slf4j.Logger + +import akka.actor.{Actor, FSM, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor._ +import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate} +import org.apache.mesos.Protos.TaskState._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.PartialFunction.empty +import scala.concurrent.duration._ + +/** + * Monitors a Mesos task throughout its lifecycle. + * + * Models a task with a state machine reflecting the perceived state of the task in Mesos. 
The state + * is primarily updated when task status information arrives from Mesos. + * + * The associated state data primarily tracks the task's goal (intended) state, as persisted by the scheduler. + * Keep in mind that goal state is persisted before actions are taken. The goal state strictly transitions + * thru New->Launched->Released. + * + * Unlike most exchanges with Mesos, task status is delivered at-least-once, so status handling should be idempotent. + */ +class TaskMonitor( +flinkConfig: Configuration, +schedulerDriver: SchedulerDriver, +goalState: TaskGoalState) extends Actor with FSM[TaskMonitorState,StateData] { + + val LOG = Logger(getClass) + + startWith(Suspended, StateData(goalState)) + + // + // Suspended State + // + + when(Suspended) { +case Event(update: TaskGoalStateUpdated, _) => + stay() using StateData(update.state) +case Event(msg: StatusUpdate, _) => + stay() +case Event(msg: Connected, StateData(goal: New)) => + goto(New) +case Event(msg: Connected, StateData(goal: Launched)) => + goto(Reconciling) +case Event(msg: Connected, StateData(goal: Released)) => + goto(Killing) + } + + // + // New State + // + + when(New) { +case Event(TaskGoalStateUpdated(goal: Launched), _) => + goto(Staging) using StateData(goal) + } + + // + // Reconciliation State + // + + onTransition { +case _ -> Reconciling => + nextStateData.goal match { +case goal: Launched => + val taskStatus = Protos.TaskStatus.newBuilder() + .setTaskId(goal.taskID).setSlaveId(goal.slaveID).setState(TASK_STAGING).build() + context.parent ! Reconcile(Seq(taskStatus)) --- End diff -- I agree that the alternative to using the implicit parent reference is to always use an explicit reference. It can be unit tested either way. I actually considered the latter but couldn't think of a nice name for the explicit reference. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL:
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426595#comment-15426595 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75325649 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426589#comment-15426589 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75325271 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +/** + * The Mesos environment variables used for settings of the containers. + */ +public class MesosConfigKeys { --- End diff -- I considered that but chose to minimize the impact on the YARN code to avoid merge conflicts for such a large PR. How about we tackle this in a follow-up?
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426570#comment-15426570 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75323135 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler; + +import akka.actor.ActorRef; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.SlaveLost; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +import java.util.List; + +/** + * This class reacts to callbacks from the Mesos scheduler driver. + * + * In order to preserve actor concurrency safety, this class simply sends + * corresponding messages to the Mesos resource master actor. + * + * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html + */ +public class SchedulerProxy implements Scheduler { + + /** The actor to which we report the callbacks */ + private ActorRef mesosActor; --- End diff -- @mxm it is generic because both the RM and the Dispatcher use the code in the `scheduler` package. I'm guessing you were thinking the field's name could be more concrete.
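The `SchedulerProxy` Javadoc quoted in this comment says the class only forwards driver callbacks as messages, so that actor state is never touched from the driver's thread. A minimal sketch of that idea follows, using only the Java standard library. It does not use Akka or the Mesos API; `CallbackProxySketch`, `DriverCallbacks`, `Callback`, and the queue-based mailbox are hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins; none of these names come from Flink, Mesos, or Akka.
final class CallbackProxySketch {

    /** Immutable message describing one driver callback. */
    static final class Callback {
        final String kind;
        final String payload;

        Callback(String kind, String payload) {
            this.kind = kind;
            this.payload = payload;
        }
    }

    /** Stand-in for the driver-facing callback interface. */
    interface DriverCallbacks {
        void registered(String frameworkId);
        void statusUpdate(String taskStatus);
        void disconnected();
    }

    /** The proxy keeps no state of its own; every callback becomes a queued message. */
    static final class Proxy implements DriverCallbacks {
        private final BlockingQueue<Callback> mailbox;

        Proxy(BlockingQueue<Callback> mailbox) {
            this.mailbox = mailbox;
        }

        @Override public void registered(String frameworkId) {
            mailbox.add(new Callback("Registered", frameworkId));
        }

        @Override public void statusUpdate(String taskStatus) {
            mailbox.add(new Callback("StatusUpdate", taskStatus));
        }

        @Override public void disconnected() {
            mailbox.add(new Callback("Disconnected", ""));
        }
    }

    /** Drains the mailbox on the caller's thread, so handler state is touched by one thread only. */
    static List<String> drain(BlockingQueue<Callback> mailbox) {
        List<String> handled = new ArrayList<>();
        Callback next;
        while ((next = mailbox.poll()) != null) {
            handled.add(next.kind + ":" + next.payload);
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Callback> mailbox = new LinkedBlockingQueue<>();
        Proxy proxy = new Proxy(mailbox);

        // Simulates the driver invoking callbacks from its own thread.
        Thread driverThread = new Thread(() -> {
            proxy.registered("framework-1");
            proxy.statusUpdate("TASK_RUNNING");
            proxy.disconnected();
        });
        driverThread.start();
        driverThread.join();

        System.out.println(drain(mailbox));
    }
}
```

In the PR itself the receiving side is an actor mailbox rather than an explicit queue; the sketch only shows why a stateless proxy keeps the handler single-threaded.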
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426550#comment-15426550 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75319776 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426458#comment-15426458 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2315 Thank you for your work @EronWright. I could finally go through the code. All in all, very impressive as the first step of the Mesos integration! I think this PR is in a mergeable state if some minor comments are addressed. I'm not 100% sure about all the additional actors yet. It seems like `ReconciliationCoordinator`, `ConnectionMonitor`, and `TaskMonitor` could also easily be handled inside `MesosFlinkResourceManager`. In terms of modularity, I can see that having these run independently can give us a more flexible setup. Which of the actors do you plan to re-use in the next set of changes? Clearly, in terms of testability, it comes in really handy.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426378#comment-15426378 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75301829 --- Diff: flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework + +import java.util.concurrent.{TimeUnit, ExecutorService} + +import akka.actor.ActorRef + +import org.apache.flink.api.common.JobID +import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.clusterframework.ApplicationStatus +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.clusterframework.messages._ +import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} +import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound} +import org.apache.flink.runtime.messages.Messages.Acknowledge +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} + +import scala.concurrent.duration._ +import scala.language.postfixOps + + +/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages + * to start/administer/stop the session. 
+ * + * @param flinkConfiguration Configuration object for the actor + * @param executorService Execution context which is used to execute concurrent tasks in the + * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] + * @param instanceManager Instance manager to manage the registered + * [[org.apache.flink.runtime.taskmanager.TaskManager]] + * @param scheduler Scheduler to schedule Flink jobs + * @param libraryCacheManager Manager to manage uploaded jar files + * @param archive Archive for finished Flink jobs + * @param restartStrategyFactory Restart strategy to be used in case of a job recovery + * @param timeout Timeout for futures + * @param leaderElectionService LeaderElectionService to participate in the leader election + */ +abstract class ContaineredJobManager( + flinkConfiguration: FlinkConfiguration, + executorService: ExecutorService, + instanceManager: InstanceManager, + scheduler: FlinkScheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory, + savepointStore: SavepointStore, + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[FlinkMetricRegistry]) + extends JobManager( +flinkConfiguration, +executorService, +instanceManager, +
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426377#comment-15426377 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75301383 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, TaskGoalStateUpdated, TaskTerminated} +import org.apache.flink.mesos.scheduler.Tasks._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.mutable.{Map => MutableMap} + +/** + * Aggregate of monitored tasks. + * + * Routes messages between the scheduler and individual task monitor actors. 
+ */ +class Tasks[M <: TaskMonitor]( + flinkConfig: Configuration, + schedulerDriver: SchedulerDriver, + taskMonitorClass: Class[M]) extends Actor { + + /** +* A map of task monitors by task ID. +*/ + private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap() + + /** +* Cache of current connection state. +*/ + private var registered: Option[Any] = None + + override def preStart(): Unit = { +// TODO subscribe to context.system.deadLetters for messages to nonexistent tasks + } + + override def receive: Receive = { + +case msg: Disconnected => + registered = None + context.actorSelection("*").tell(msg, self) + +case msg : Connected => + registered = Some(msg) + context.actorSelection("*").tell(msg, self) + +case msg: TaskGoalStateUpdated => + val taskID = msg.state.taskID + + // ensure task monitor exists + if(!taskMap.contains(taskID)) { +val actorRef = createTask(msg.state) +registered.foreach(actorRef ! _) + } + + taskMap(taskID) ! msg + +case msg: StatusUpdate => + taskMap(msg.status().getTaskId) ! msg + +case msg: Reconcile => + context.parent.forward(msg) --- End diff -- Same as above. The parent is the resource manager. Do we want to make this explicit?
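The review question here is whether the target of `context.parent.forward(msg)` should be passed in explicitly instead of being implied by the actor hierarchy. The sketch below illustrates the explicit variant in plain Java, under the assumption that the target can be modeled as a `Consumer`. `ExplicitTargetSketch`, `TaskRouter`, and `ReconcileRequest` are hypothetical names, not part of the PR, and no Akka types are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch; the class and method names are illustrative, not Flink or Akka APIs.
final class ExplicitTargetSketch {

    /** Stand-in for a reconciliation request about one task. */
    static final class ReconcileRequest {
        final String taskId;

        ReconcileRequest(String taskId) {
            this.taskId = taskId;
        }
    }

    /** A router that forwards reconciliation requests to a target supplied by its creator. */
    static final class TaskRouter {
        private final Consumer<ReconcileRequest> reconciliationTarget;

        TaskRouter(Consumer<ReconcileRequest> reconciliationTarget) {
            this.reconciliationTarget = reconciliationTarget;
        }

        void onReconcile(ReconcileRequest request) {
            // The dependency is named in the constructor rather than implied by the hierarchy.
            reconciliationTarget.accept(request);
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        TaskRouter router = new TaskRouter(request -> received.add(request.taskId));
        router.onReconcile(new ReconcileRequest("task-1"));
        System.out.println(received);
    }
}
```

Either style can be unit tested, as the author notes elsewhere in this thread; the explicit form mainly documents the dependency at the cost of one more constructor argument that needs a good name.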
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426375#comment-15426375 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75300879 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, TaskGoalStateUpdated, TaskTerminated} +import org.apache.flink.mesos.scheduler.Tasks._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.mutable.{Map => MutableMap} + +/** + * Aggregate of monitored tasks. + * + * Routes messages between the scheduler and individual task monitor actors. 
+ */ +class Tasks[M <: TaskMonitor]( + flinkConfig: Configuration, + schedulerDriver: SchedulerDriver, + taskMonitorClass: Class[M]) extends Actor { + + /** +* A map of task monitors by task ID. +*/ + private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap() --- End diff -- Add a space after `Protos.TaskID,`.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426372#comment-15426372 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75300491 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/TaskMonitor.scala --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import grizzled.slf4j.Logger + +import akka.actor.{Actor, FSM, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor._ +import org.apache.flink.mesos.scheduler.messages.{Connected, Disconnected, StatusUpdate} +import org.apache.mesos.Protos.TaskState._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.PartialFunction.empty +import scala.concurrent.duration._ + +/** + * Monitors a Mesos task throughout its lifecycle. + * + * Models a task with a state machine reflecting the perceived state of the task in Mesos. 
The state + * is primarily updated when task status information arrives from Mesos. + * + * The associated state data primarily tracks the task's goal (intended) state, as persisted by the scheduler. + * Keep in mind that goal state is persisted before actions are taken. The goal state strictly transitions + * thru New->Launched->Released. + * + * Unlike most exchanges with Mesos, task status is delivered at-least-once, so status handling should be idempotent. + */ +class TaskMonitor( +flinkConfig: Configuration, +schedulerDriver: SchedulerDriver, +goalState: TaskGoalState) extends Actor with FSM[TaskMonitorState,StateData] { + + val LOG = Logger(getClass) + + startWith(Suspended, StateData(goalState)) + + // + // Suspended State + // + + when(Suspended) { +case Event(update: TaskGoalStateUpdated, _) => + stay() using StateData(update.state) +case Event(msg: StatusUpdate, _) => + stay() +case Event(msg: Connected, StateData(goal: New)) => + goto(New) +case Event(msg: Connected, StateData(goal: Launched)) => + goto(Reconciling) +case Event(msg: Connected, StateData(goal: Released)) => + goto(Killing) + } + + // + // New State + // + + when(New) { +case Event(TaskGoalStateUpdated(goal: Launched), _) => + goto(Staging) using StateData(goal) + } + + // + // Reconciliation State + // + + onTransition { +case _ -> Reconciling => + nextStateData.goal match { +case goal: Launched => + val taskStatus = Protos.TaskStatus.newBuilder() + .setTaskId(goal.taskID).setSlaveId(goal.slaveID).setState(TASK_STAGING).build() + context.parent ! Reconcile(Seq(taskStatus)) --- End diff -- Would it be cleaner to pass the `ActorRef` directly to the TaskMonitor? > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426361#comment-15426361 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75299022 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala --- @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, FSM, Props} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.Action1 +import com.netflix.fenzo.plugins.VMLeaseObject +import grizzled.slf4j.Logger +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.Protos.TaskInfo --- End diff -- Unused import > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426356#comment-15426356 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75298678 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala --- @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, FSM, Props} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.Action1 +import com.netflix.fenzo.plugins.VMLeaseObject +import grizzled.slf4j.Logger +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.Protos.TaskInfo +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{Map => MutableMap} +import scala.concurrent.duration._ + +/** + * The launch coordinator handles offer processing, including + * matching offers to tasks and making reservations. + * + * The coordinator uses Netflix Fenzo to optimize task placement. During the GatheringOffers phase, --- End diff -- Fenzo also has my endorsement. It makes sense to delegate scheduling logic to a dedicated library. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426352#comment-15426352 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75297945 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/ConnectionMonitor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, FSM, Props} +import grizzled.slf4j.Logger +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ConnectionMonitor._ +import org.apache.flink.mesos.scheduler.messages._ + +import scala.concurrent.duration._ + +/** + * Actively monitors the Mesos connection. 
+ */ +class ConnectionMonitor() extends Actor with FSM[FsmState, Unit] { + + val LOG = Logger(getClass) + + startWith(StoppedState, None) + + when(StoppedState) { +case Event(msg: Start, _) => + LOG.info(s"Connecting to Mesos...") + goto(ConnectingState) + } + + when(ConnectingState, stateTimeout = CONNECT_RETRY_RATE) { +case Event(msg: Stop, _) => + goto(StoppedState) + +case Event(msg: Registered, _) => + LOG.info(s"Connected to Mesos as framework ID ${msg.frameworkId.getValue}.") + LOG.debug(s" Master Info: ${msg.masterInfo}") + goto(ConnectedState) + +case Event(msg: ReRegistered, _) => + LOG.info("Reconnected to a new Mesos master.") + LOG.debug(s" Master Info: ${msg.masterInfo}") + goto(ConnectedState) + +case Event(StateTimeout, _) => + LOG.warn("Unable to connect to Mesos; still trying...") + stay() + } + + when(ConnectedState) { +case Event(msg: Stop, _) => + goto(StoppedState) + +case Event(msg: Disconnected, _) => + LOG.warn("Disconnected from the Mesos master. Reconnecting...") + goto(ConnectingState) + } + --- End diff -- Would it make sense to add a `whenUnhandled {...}` handler here? > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
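A minimal sketch of what the `whenUnhandled {...}` suggestion above is getting at: a catch-all that records any event the current state has no handler for, instead of dropping it silently. This is plain Java, not Akka; `ConnectionFsmSketch`, its enums, and its methods are hypothetical stand-ins that only mirror the Stopped/Connecting/Connected transitions quoted in the diff (the state timeout is left out).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the ConnectionMonitor state machine (plain Java, not Akka FSM).
final class ConnectionFsmSketch {

    enum State { STOPPED, CONNECTING, CONNECTED }

    enum Event { START, STOP, REGISTERED, REREGISTERED, DISCONNECTED }

    private State state = State.STOPPED;

    // Events that no state-specific handler accepted, kept for inspection.
    private final List<String> unhandled = new ArrayList<>();

    State state() {
        return state;
    }

    List<String> unhandled() {
        return unhandled;
    }

    void handle(Event event) {
        State next = transition(state, event);
        if (next == null) {
            // Fallback playing the role of whenUnhandled: record the event and stay.
            unhandled.add(event + " in " + state);
            return;
        }
        state = next;
    }

    // Returns the next state, or null when the current state does not handle the event.
    private static State transition(State current, Event event) {
        switch (current) {
            case STOPPED:
                return event == Event.START ? State.CONNECTING : null;
            case CONNECTING:
                if (event == Event.STOP) {
                    return State.STOPPED;
                }
                if (event == Event.REGISTERED || event == Event.REREGISTERED) {
                    return State.CONNECTED;
                }
                return null;
            case CONNECTED:
                if (event == Event.STOP) {
                    return State.STOPPED;
                }
                if (event == Event.DISCONNECTED) {
                    return State.CONNECTING;
                }
                return null;
            default:
                return null;
        }
    }
}
```

With a fallback like this, a `DISCONNECTED` arriving while stopped leaves the state unchanged but shows up in `unhandled()`, which is the kind of visibility a catch-all handler would presumably add to the actor.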
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426339#comment-15426339 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75296757 --- Diff: flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework + +import java.util.concurrent.{TimeUnit, ExecutorService} + +import akka.actor.ActorRef + +import org.apache.flink.api.common.JobID +import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.clusterframework.ApplicationStatus +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.clusterframework.messages._ +import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} +import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound} +import org.apache.flink.runtime.messages.Messages.Acknowledge +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} + +import scala.concurrent.duration._ +import scala.language.postfixOps + + +/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages + * to start/administer/stop the session. --- End diff -- Good idea but this is yet to be integrated in the flink-yarn module. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. 
> -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426336#comment-15426336 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75296348 --- Diff: flink-mesos/src/main/resources/log4j.properties --- @@ -0,0 +1,27 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + + +# Convenience file for local debugging of the JobManager/TaskManager. +log4j.rootLogger=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n + +log4j.logger.org.apache.flink.mesos=DEBUG +log4j.logger.org.apache.flink.mesos.runtime.clusterframework.MesosFlinkResourceManager=INFO --- End diff -- Do we want to uncomment these two rules and keep the INFO default? 
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426329#comment-15426329 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75295536 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; + + public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception { + if (configuredPort < 0 || configuredPort > 0xFFFF) { + throw new IllegalArgumentException("File server port is invalid: " + configuredPort); + } + + router = new Router(); + + ChannelInitializer<SocketChannel> initializer =
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426327#comment-15426327 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75295468 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; + + public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception { + if (configuredPort < 0 || configuredPort > 0xFFFF) { + throw new IllegalArgumentException("File server port is invalid: " + configuredPort); + } + + router = new Router(); + + ChannelInitializer<SocketChannel> initializer =
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426242#comment-15426242 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75285623 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler; + +import akka.actor.ActorRef; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.SlaveLost; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +import java.util.List; + +/** + * This class reacts to callbacks from the Mesos scheduler driver. + * + * In order to preserve actor concurrency safety, this class simply sends + * corresponding messages to the Mesos resource master actor. + * + * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html + */ +public class SchedulerProxy implements Scheduler { + + /** The actor to which we report the callbacks */ + private ActorRef mesosActor; + + public SchedulerProxy(ActorRef mesosActor) { + this.mesosActor = mesosActor; + } + + @Override + public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo) { + mesosActor.tell(new Registered(frameworkId, masterInfo), ActorRef.noSender()); + } + + @Override + public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) { + mesosActor.tell(new ReRegistered(masterInfo), ActorRef.noSender()); + } + + @Override + public void disconnected(SchedulerDriver driver) { + mesosActor.tell(new Disconnected(), ActorRef.noSender()); + } + + + @Override + public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) { + mesosActor.tell(new ResourceOffers(offers), ActorRef.noSender()); + } + + @Override +
public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) { + mesosActor.tell(new OfferRescinded(offerId), ActorRef.noSender()); + } + + @Override + public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) { + mesosActor.tell(new StatusUpdate(status), ActorRef.noSender()); + } + + @Override + public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, byte[] data) { + throw new UnsupportedOperationException("frameworkMessage is unexpected"); --- End diff -- What other messages could the framework send? Is it worth crashing the actor? > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for
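One possible answer to the question above about crashing on `frameworkMessage`, sketched under assumptions: log the unexpected message and drop it rather than throw. The interface and class below are hypothetical and deliberately simplified (string IDs instead of `Protos` types); this is not the real `org.apache.mesos.Scheduler` interface and not what the pull request does.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

// Hypothetical, simplified callback interface; not the real org.apache.mesos.Scheduler.
interface FrameworkMessageCallback {
    void frameworkMessage(String executorId, String slaveId, byte[] data);
}

// Logs and ignores unexpected framework messages instead of throwing.
final class TolerantFrameworkMessageHandler implements FrameworkMessageCallback {

    private static final Logger LOG =
        Logger.getLogger(TolerantFrameworkMessageHandler.class.getName());

    // Counts dropped messages so that operators or tests can observe them.
    private final AtomicInteger ignored = new AtomicInteger();

    @Override
    public void frameworkMessage(String executorId, String slaveId, byte[] data) {
        int length = (data == null) ? 0 : data.length;
        ignored.incrementAndGet();
        LOG.warning("Ignoring unexpected framework message from executor " + executorId
            + " on slave " + slaveId + " (" + length + " bytes)");
    }

    int ignoredCount() {
        return ignored.get();
    }
}
```

Whether dropping is acceptable depends on whether any executor is ever expected to send framework messages; if none is, a warning keeps the callback thread alive while still surfacing the anomaly.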
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426244#comment-15426244 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75285727 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/TaskSchedulerBuilder.java --- @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler; + +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; + +/** + * A builder for the Fenzo task scheduler. + * + * Note that the Fenzo-provided {@link TaskScheduler.Builder} cannot be mocked, which motivates this interface. 
+ */
+public interface TaskSchedulerBuilder {
+    TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action);
--- End diff --

new line would be nice :)

> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent ResourceManager work.
> Design document: ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426241#comment-15426241 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75285550 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler; + +import akka.actor.ActorRef; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.SlaveLost; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +import java.util.List; + +/** + * This class reacts to callbacks from the Mesos scheduler driver. + * + * In order to preserve actor concurrency safety, this class simply sends + * corresponding messages to the Mesos resource master actor. + * + * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html + */ +public class SchedulerProxy implements Scheduler { + + /** The actor to which we report the callbacks */ + private ActorRef mesosActor; + + public SchedulerProxy(ActorRef mesosActor) { + this.mesosActor = mesosActor; + } + + @Override + public void registered(SchedulerDriver driver, Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo) { + mesosActor.tell(new Registered(frameworkId, masterInfo), ActorRef.noSender()); + } + + @Override + public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo) { + mesosActor.tell(new ReRegistered(masterInfo), ActorRef.noSender()); + } + + @Override + public void disconnected(SchedulerDriver driver) { + mesosActor.tell(new Disconnected(), ActorRef.noSender()); + } + + + @Override + public void resourceOffers(SchedulerDriver driver, List offers) { + mesosActor.tell(new ResourceOffers(offers), ActorRef.noSender()); + } + + @Override + 
public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) { + mesosActor.tell(new OfferRescinded(offerId), ActorRef.noSender()); + } + + @Override + public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status) { + mesosActor.tell(new StatusUpdate(status), ActorRef.noSender()); + } + + @Override + public void frameworkMessage(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, byte[] data) { + throw new UnsupportedOperationException("frameworkMessage is unexpected"); + } + + @Override + public void slaveLost(SchedulerDriver driver, Protos.SlaveID slaveId) { + mesosActor.tell(new SlaveLost(slaveId), ActorRef.noSender()); + } + + @Override + public void executorLost(SchedulerDriver driver, Protos.ExecutorID executorId, Protos.SlaveID slaveId, int status) { + throw new UnsupportedOperationException("executorLost is unexpected"); --- End diff -- Why don't we forward this message and crash the actor instead? > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL:
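The question raised on `executorLost` (and earlier on `frameworkMessage`) is whether the proxy should throw on the driver's callback thread or hand the event to the actor. A minimal sketch of the forwarding variant follows. It is dependency-free on purpose: `ExecutorLost`, `ForwardingProxySketch`, and the `Consumer<Object>` mailbox are hypothetical stand-ins for a message class, `SchedulerProxy`, and `mesosActor.tell(..., ActorRef.noSender())`; none of them appear in the quoted diff.

```java
import java.util.function.Consumer;

// Hypothetical message type; the quoted diff defines no ExecutorLost message.
final class ExecutorLost {
    final String executorId;
    final String slaveId;
    final int status;

    ExecutorLost(String executorId, String slaveId, int status) {
        this.executorId = executorId;
        this.slaveId = slaveId;
        this.status = status;
    }
}

// Hypothetical stand-in for SchedulerProxy. The mailbox plays the role of the actor reference.
class ForwardingProxySketch {
    private final Consumer<Object> mailbox;

    ForwardingProxySketch(Consumer<Object> mailbox) {
        this.mailbox = mailbox;
    }

    // Forward instead of throwing: the callback returns normally on the
    // driver's thread, and the receiver decides whether the event is fatal.
    void executorLost(String executorId, String slaveId, int status) {
        mailbox.accept(new ExecutorLost(executorId, slaveId, status));
    }
}
```

With this shape, the decision to fail lives in the message handler, where it runs under the actor's own supervision rather than inside the Mesos driver callback.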
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426238#comment-15426238 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75284993 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler; + +import akka.actor.ActorRef; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.SlaveLost; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +import java.util.List; + +/** + * This class reacts to callbacks from the Mesos scheduler driver. + * + * In order to preserve actor concurrency safety, this class simply sends + * corresponding messages to the Mesos resource master actor. + * + * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html + */ +public class SchedulerProxy implements Scheduler { + + /** The actor to which we report the callbacks */ + private ActorRef mesosActor; --- End diff -- The `MesosActor` is actually the `MesosFlinkResourceManager`, right? > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. 
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426230#comment-15426230 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75283918 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.mesos.Protos; +import scala.Option; + +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A store of Mesos workers and associated framework information. + * + * Generates a framework ID as necessary. 
+ */ +public interface MesosWorkerStore { + + static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0"); + + void start() throws Exception; + + void stop() throws Exception; + + Option getFrameworkID() throws Exception; + + void setFrameworkID(Option frameworkID) throws Exception; + + List recoverWorkers() throws Exception; + + Protos.TaskID newTaskID() throws Exception; + + void putWorker(Worker worker) throws Exception; + + void removeWorker(Protos.TaskID taskID) throws Exception; + + void cleanup() throws Exception; + + /** +* A stored task. +* +* The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed +* by Fenzo for optimization purposes. +*/ + class Worker implements Serializable { + private Protos.TaskID taskID; + + private Option slaveID; + + private Option hostname; + + private TaskState state; + + public Worker(Protos.TaskID taskID, Option slaveID, Option hostname, TaskState state) { + requireNonNull(taskID, "taskID"); + requireNonNull(slaveID, "slaveID"); + requireNonNull(hostname, "hostname"); + requireNonNull(state, "state"); + + this.taskID = taskID; + this.slaveID = slaveID; + this.hostname = hostname; + this.state = state; + } + + public Protos.TaskID taskID() { + return taskID; + } + + public Option slaveID() { + return slaveID; + } + + public Option hostname() { + return hostname; + } + + public TaskState state() { + return state; + } + + // valid transition methods + + public static Worker newTask(Protos.TaskID taskID) { + return new Worker( + taskID, + Option.empty(), Option.empty(), + TaskState.New); + } + + public Worker launchTask(Protos.SlaveID slaveID, String hostname) { + return new Worker(taskID, Option.apply(slaveID), Option.apply(hostname), TaskState.Launched); + } + + public Worker releaseTask() { + return new Worker(taskID, slaveID, hostname, TaskState.Released); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || 
getClass() != o.getClass()) { +
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426227#comment-15426227 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75283689 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.mesos.Protos; +import scala.Option; + +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A store of Mesos workers and associated framework information. + * + * Generates a framework ID as necessary. 
+ */ +public interface MesosWorkerStore { + + static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0"); + + void start() throws Exception; + + void stop() throws Exception; + + Option getFrameworkID() throws Exception; + + void setFrameworkID(Option frameworkID) throws Exception; + + List recoverWorkers() throws Exception; + + Protos.TaskID newTaskID() throws Exception; + + void putWorker(Worker worker) throws Exception; + + void removeWorker(Protos.TaskID taskID) throws Exception; + + void cleanup() throws Exception; + + /** +* A stored task. +* +* The assigned slaveid/hostname is valid in Launched and Released states. The hostname is needed +* by Fenzo for optimization purposes. +*/ + class Worker implements Serializable { + private Protos.TaskID taskID; + + private Option slaveID; + + private Option hostname; + + private TaskState state; + + public Worker(Protos.TaskID taskID, Option slaveID, Option hostname, TaskState state) { + requireNonNull(taskID, "taskID"); + requireNonNull(slaveID, "slaveID"); + requireNonNull(hostname, "hostname"); + requireNonNull(state, "state"); + + this.taskID = taskID; + this.slaveID = slaveID; + this.hostname = hostname; + this.state = state; + } + + public Protos.TaskID taskID() { + return taskID; + } + + public Option slaveID() { + return slaveID; + } + + public Option hostname() { + return hostname; + } + + public TaskState state() { + return state; + } + + // valid transition methods --- End diff -- Could you frame the transition methods with comments? > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. 
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426226#comment-15426226 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75283539 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.SharedValue; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.mesos.Protos; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A ZooKeeper-backed Mesos worker store. + */ +public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class); + + private final Object cacheLock = new Object(); --- End diff -- Seems like the store should only be accessed by the ResourceManager. In this case we could remove the lock. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. 
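The remark about `cacheLock` rests on thread confinement: if only one thread ever touches the state, no lock is needed. A minimal sketch of that idea, assuming all access is funneled through a single owner thread. Here a single-thread executor stands in for the ResourceManager actor, and `ConfinedCounter` and its methods are hypothetical names, not part of the PR.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example of thread confinement; not the ZooKeeperMesosWorkerStore itself.
class ConfinedCounter {
    // Plain field with no lock: safe only because every read and write runs on `owner`.
    private int count = 0;

    // Single owner thread, standing in for the actor that would own the store.
    private final ExecutorService owner = Executors.newSingleThreadExecutor();

    // Callers on any thread enqueue the mutation; it executes on the owner thread.
    void increment() {
        owner.execute(() -> count++);
    }

    // Reads are also routed through the owner thread; Future.get() makes the result visible.
    int get() {
        try {
            return owner.submit(() -> count).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    // Stops the owner thread so the JVM can exit.
    void close() {
        owner.shutdown();
    }
}
```

The guarantee only holds while nothing outside the owner touches the field directly, which is the condition the review comment asks about.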
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426220#comment-15426220 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75282731 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/StandaloneMesosWorkerStore.java --- @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import com.google.common.collect.ImmutableList; +import org.apache.mesos.Protos; +import scala.Option; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * A standalone Mesos worker store. 
+ */
+public class StandaloneMesosWorkerStore implements MesosWorkerStore {
+
+    private Option<Protos.FrameworkID> frameworkID = Option.empty();
+
+    private int taskCount = 0;
+
+    private Map<Protos.TaskID, Worker> storedWorkers = new LinkedHashMap<>();
+
+    public StandaloneMesosWorkerStore() {
+    }
+
+    @Override
+    public void start() throws Exception {
+
+    }
+
+    @Override
+    public void stop() throws Exception {
+
+    }
+
+    @Override
+    public Option<Protos.FrameworkID> getFrameworkID() throws Exception {
+        return frameworkID;
+    }
+
+    @Override
+    public void setFrameworkID(Option<Protos.FrameworkID> frameworkID) throws Exception {
+        this.frameworkID = frameworkID;
+    }
+
+    @Override
+    public List<Worker> recoverWorkers() throws Exception {
+        return ImmutableList.copyOf(storedWorkers.values());
+    }
+
+    @Override
+    public Protos.TaskID newTaskID() throws Exception {
+        Protos.TaskID taskID = Protos.TaskID.newBuilder().setValue(TASKID_FORMAT.format(++taskCount)).build();
+        return taskID;
--- End diff --

Could be simplified: `return Protos.TaskID.newBuilder().setValue(TASKID_FORMAT.format(++taskCount)).build();`.

> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent ResourceManager work.
> Design document: ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
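To see what the simplified `newTaskID()` would produce, here is a minimal, dependency-free sketch. It returns the ID string only, where the reviewed code wraps it in `Protos.TaskID`. The class name `TaskIdSketch` is hypothetical, and pinning the format symbols to `Locale.ROOT` is an assumption made here so the output does not vary by locale; the quoted diff uses the default locale.

```java
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;

// Hypothetical stand-in for the ID generation in StandaloneMesosWorkerStore.
class TaskIdSketch {
    // Same pattern as the quoted TASKID_FORMAT: a literal "taskmanager" prefix,
    // a minus sign, and at least one integer digit.
    // Locale.ROOT symbols are an assumption of this sketch.
    private static final DecimalFormat TASKID_FORMAT =
        new DecimalFormat("taskmanager-0", DecimalFormatSymbols.getInstance(Locale.ROOT));

    private int taskCount = 0;

    // Single-expression form, mirroring the reviewer's suggestion: no temporary variable.
    String newTaskId() {
        return TASKID_FORMAT.format(++taskCount);
    }
}
```

Note that `DecimalFormat` is not thread-safe, so a shared static instance like this is only appropriate when calls come from one thread.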
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426210#comment-15426210 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75282003 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/MesosWorkerStore.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.mesos.Protos; +import scala.Option; + +import java.io.Serializable; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +/** + * A store of Mesos workers and associated framework information. + * + * Generates a framework ID as necessary. 
+ */
+public interface MesosWorkerStore {
+
+    static final DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0");
--- End diff --

Per definition, variables are static and final in interfaces :)

> Integrate Flink with Apache Mesos
> ---------------------------------
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
> Issue Type: New Feature
> Components: Cluster Management
> Reporter: Robert Metzger
> Assignee: Eron Wright
> Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent ResourceManager work.
> Design document: ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
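The point about interface fields can be checked directly. A minimal sketch, assuming a hypothetical `WorkerStoreSketch` interface (not the PR's `MesosWorkerStore`) that declares the constant with no modifiers at all; reflection then reports which modifiers the compiler applied.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.text.DecimalFormat;

// Hypothetical stand-in for the interface under review; the name is illustrative only.
interface WorkerStoreSketch {
    // No modifiers written: interface fields are implicitly public, static and final.
    DecimalFormat TASKID_FORMAT = new DecimalFormat("taskmanager-0");
}

class InterfaceConstantCheck {
    // Returns true if the named field of WorkerStoreSketch is public, static and final.
    static boolean isImplicitConstant(String fieldName) {
        try {
            Field field = WorkerStoreSketch.class.getField(fieldName);
            int mods = field.getModifiers();
            return Modifier.isPublic(mods) && Modifier.isStatic(mods) && Modifier.isFinal(mods);
        } catch (NoSuchFieldException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isImplicitConstant("TASKID_FORMAT"));
    }
}
```

So the explicit `static final` in the diff is redundant rather than wrong; dropping it changes nothing about the field.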
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426207#comment-15426207 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75281657 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/RegisteredMesosWorkerNode.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework + +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore +import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable} + +/** + * A representation of a registered Mesos task managed by the {@link MesosFlinkResourceManager}. + */ +case class RegisteredMesosWorkerNode(task: MesosWorkerStore.Worker) extends ResourceIDRetrievable { --- End diff -- Why is this class written in Scala? It seems like this class is only used from Java code. 
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426196#comment-15426196 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75280315 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426188#comment-15426188 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75279529 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426161#comment-15426161 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75276925 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426153#comment-15426153 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75276016 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426147#comment-15426147 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75275735 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosConfigKeys.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +/** + * The Mesos environment variables used for settings of the containers. + */ +public class MesosConfigKeys { --- End diff -- I wonder, would it make sense to create `ContainerEnvConfigKeys` with the shared environment variables in `YarnConfigKeys` and `MesosConfigKeys`? The overlap is quite significant. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. 
> -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
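The `ContainerEnvConfigKeys` suggestion in the comment above could look roughly like the following. This is only a sketch of one possible layout, not what the pull request does: the constant names and their string values are placeholders chosen for illustration, and the classes are package-private here so the snippet compiles as a single file.

```java
/**
 * Hypothetical shared holder for the environment variable names that both
 * the YARN and the Mesos containers use. Names and values are illustrative.
 */
class ContainerEnvConfigKeys {
    public static final String ENV_TM_MEMORY = "_CLIENT_TM_MEMORY";
    public static final String ENV_TM_COUNT = "_CLIENT_TM_COUNT";
    public static final String ENV_SLOTS = "_SLOTS";
    public static final String ENV_DYNAMIC_PROPERTIES = "_DYNAMIC_PROPERTIES";

    /** Not meant to be instantiated; protected so the key classes can extend it. */
    protected ContainerEnvConfigKeys() {}
}

/** Mesos-only keys; the shared ones are inherited from the common holder. */
final class MesosConfigKeys extends ContainerEnvConfigKeys {
    // Placeholder for a key that only makes sense on Mesos.
    public static final String ENV_FRAMEWORK_NAME = "_FRAMEWORK_NAME";

    private MesosConfigKeys() {}
}

/** YARN-only keys; the shared ones are inherited from the common holder. */
final class YarnConfigKeys extends ContainerEnvConfigKeys {
    // Placeholder for a key that only makes sense on YARN.
    public static final String ENV_APP_ID = "_APP_ID";

    private YarnConfigKeys() {}
}
```

Because static fields are inherited, existing references such as `MesosConfigKeys.ENV_TM_MEMORY` would keep compiling while the definition lives in one place.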
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426140#comment-15426140 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75275043 --- Diff: flink-mesos/pom.xml --- @@ -0,0 +1,294 @@ + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.1-SNAPSHOT</version> --- End diff -- This needs to be bumped to `1.2-SNAPSHOT`. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426135#comment-15426135 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75274022 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426126#comment-15426126 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75272248 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import com.netflix.fenzo.ConstraintEvaluator; +import com.netflix.fenzo.TaskAssignmentResult; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.VMTaskFitnessCalculator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.mesos.Protos; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.mesos.Utils.variable; +import static org.apache.flink.mesos.Utils.range; +import static org.apache.flink.mesos.Utils.ranges; +import static org.apache.flink.mesos.Utils.scalar; + +/** + * Specifies how to launch a Mesos worker. + */ +public class LaunchableMesosWorker implements LaunchableTask { + + /** +* The set of configuration keys to be dynamically configured with a port allocated from Mesos. +*/ + private static String[] TM_PORT_KEYS = { + "taskmanager.rpc.port", + "taskmanager.data.port" }; + + private final MesosTaskManagerParameters params; + private final Protos.TaskInfo.Builder template; + private final Protos.TaskID taskID; + private final Request taskRequest; + + /** +* Construct a launchable Mesos worker. +* @param params the TM parameters such as memory, cpu to acquire. +* @param template a template for the TaskInfo to be constructed at launch time. +* @param taskID the taskID for this worker. 
+ */ + public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + this.params = params; + this.template = template; + this.taskID = taskID; + this.taskRequest = new Request(); + } + + public Protos.TaskID taskID() { + return taskID; + } + + @Override + public TaskRequest taskRequest() { + return taskRequest; + } + + class Request implements TaskRequest { + private final AtomicReference assignedResources = new AtomicReference<>(); + + @Override + public String getId() { + return taskID.getValue(); + } + + @Override + public String taskGroupName() { + return ""; + } + + @Override + public double getCPUs() { + return params.cpus(); + } + + @Override + public double getMemory() { + return params.containeredParameters().taskManagerTotalMemoryMB(); + } + + @Override + public double getNetworkMbps() { + return 0.0; + } + + @Override + public double getDisk() { + return 0.0; --- End diff -- I would rather throw an exception here if the value is not in use. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink >
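The suggestion in the comment above (throw instead of returning `0.0` for a resource that is not in use) might be sketched as below. `ResourceRequest` and `WorkerRequest` are hypothetical stand-ins invented for this illustration; they are not Fenzo's `TaskRequest` or the `Request` class from the diff. One caveat to keep in mind: this only works if the scheduler never queries the unused dimension, because a scheduler that reads every dimension while evaluating offers would hit the exception.

```java
/** Hypothetical, reduced stand-in for a scheduler's task request interface. */
interface ResourceRequest {
    double getCPUs();
    double getDisk();
}

/** Illustrative request that asks for CPUs only and rejects disk queries. */
final class WorkerRequest implements ResourceRequest {
    private final double cpus;

    WorkerRequest(double cpus) {
        this.cpus = cpus;
    }

    @Override
    public double getCPUs() {
        return cpus;
    }

    @Override
    public double getDisk() {
        // Fail loudly rather than silently report 0.0 for an unused dimension.
        throw new UnsupportedOperationException("disk is not requested for this worker");
    }
}
```

Returning `0.0` keeps the request usable with any caller, at the cost of hiding that the value carries no meaning; throwing makes the intent explicit but shifts the burden onto the caller.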
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426125#comment-15426125 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75272075 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import com.netflix.fenzo.ConstraintEvaluator; +import com.netflix.fenzo.TaskAssignmentResult; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.VMTaskFitnessCalculator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.mesos.Protos; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.mesos.Utils.variable; +import static org.apache.flink.mesos.Utils.range; +import static org.apache.flink.mesos.Utils.ranges; +import static org.apache.flink.mesos.Utils.scalar; + +/** + * Specifies how to launch a Mesos worker. + */ +public class LaunchableMesosWorker implements LaunchableTask { + + /** +* The set of configuration keys to be dynamically configured with a port allocated from Mesos. +*/ + private static String[] TM_PORT_KEYS = { + "taskmanager.rpc.port", + "taskmanager.data.port" }; + + private final MesosTaskManagerParameters params; + private final Protos.TaskInfo.Builder template; + private final Protos.TaskID taskID; + private final Request taskRequest; + + /** +* Construct a launchable Mesos worker. +* @param params the TM parameters such as memory, cpu to acquire. +* @param template a template for the TaskInfo to be constructed at launch time. +* @param taskID the taskID for this worker. 
+ */ + public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + this.params = params; + this.template = template; + this.taskID = taskID; + this.taskRequest = new Request(); + } + + public Protos.TaskID taskID() { + return taskID; + } + + @Override + public TaskRequest taskRequest() { + return taskRequest; + } + + class Request implements TaskRequest { + private final AtomicReference assignedResources = new AtomicReference<>(); + + @Override + public String getId() { + return taskID.getValue(); + } + + @Override + public String taskGroupName() { + return ""; + } + + @Override + public double getCPUs() { + return params.cpus(); + } + + @Override + public double getMemory() { + return params.containeredParameters().taskManagerTotalMemoryMB(); + } + + @Override + public double getNetworkMbps() { + return 0.0; + } + + @Override + public double getDisk() { + return 0.0; + } + + @Override + public int getPorts() { + return TM_PORT_KEYS.length; + } + + @Override + public Map<String, NamedResourceSetRequest> getCustomNamedResources() { + return
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426120#comment-15426120 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75271531 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/LaunchableMesosWorker.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import com.netflix.fenzo.ConstraintEvaluator; +import com.netflix.fenzo.TaskAssignmentResult; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.VMTaskFitnessCalculator; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.mesos.Protos; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.mesos.Utils.variable; +import static org.apache.flink.mesos.Utils.range; +import static org.apache.flink.mesos.Utils.ranges; +import static org.apache.flink.mesos.Utils.scalar; + +/** + * Specifies how to launch a Mesos worker. + */ +public class LaunchableMesosWorker implements LaunchableTask { + + /** +* The set of configuration keys to be dynamically configured with a port allocated from Mesos. +*/ + private static String[] TM_PORT_KEYS = { + "taskmanager.rpc.port", + "taskmanager.data.port" }; + + private final MesosTaskManagerParameters params; + private final Protos.TaskInfo.Builder template; + private final Protos.TaskID taskID; + private final Request taskRequest; + + /** +* Construct a launchable Mesos worker. +* @param params the TM parameters such as memory, cpu to acquire. +* @param template a template for the TaskInfo to be constructed at launch time. +* @param taskID the taskID for this worker. 
+ */ + public LaunchableMesosWorker(MesosTaskManagerParameters params, Protos.TaskInfo.Builder template, Protos.TaskID taskID) { + this.params = params; + this.template = template; + this.taskID = taskID; + this.taskRequest = new Request(); + } + + public Protos.TaskID taskID() { + return taskID; + } + + @Override + public TaskRequest taskRequest() { + return taskRequest; + } + + class Request implements TaskRequest { + private final AtomicReference assignedResources = new AtomicReference<>(); + + @Override + public String getId() { + return taskID.getValue(); + } + + @Override + public String taskGroupName() { + return ""; + } + + @Override + public double getCPUs() { + return params.cpus(); + } + + @Override + public double getMemory() { + return params.containeredParameters().taskManagerTotalMemoryMB(); + } + + @Override + public double getNetworkMbps() { + return 0.0; + } + + @Override + public double getDisk() { + return 0.0; --- End diff -- This is always 0.0, which means "give me whatever is free"?
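A note on the `getDisk()` question above. The following is a minimal sketch, not Fenzo or Flink code: `Demand`, `Offer`, and `fits` are hypothetical names invented for illustration. It assumes the scheduler simply checks that each requested amount is no larger than the offered amount, which this thread does not confirm. Under that assumption, a request of 0.0 reads as "no disk requirement" rather than "whatever is free", so every offer passes the disk check.

```java
/** Hypothetical stand-ins for illustration; these are not Fenzo or Mesos types. */
final class ResourceFitSketch {

    /** Resources a task asks for. Under the assumption above, 0.0 means "no requirement". */
    static final class Demand {
        final double cpus;
        final double memoryMB;
        final double diskMB;

        Demand(double cpus, double memoryMB, double diskMB) {
            this.cpus = cpus;
            this.memoryMB = memoryMB;
            this.diskMB = diskMB;
        }
    }

    /** Resources available in a single offer. */
    static final class Offer {
        final double cpus;
        final double memoryMB;
        final double diskMB;

        Offer(double cpus, double memoryMB, double diskMB) {
            this.cpus = cpus;
            this.memoryMB = memoryMB;
            this.diskMB = diskMB;
        }
    }

    /** An offer fits when every requested amount is covered by the offered amount. */
    static boolean fits(Demand demand, Offer offer) {
        return demand.cpus <= offer.cpus
            && demand.memoryMB <= offer.memoryMB
            && demand.diskMB <= offer.diskMB;
    }

    public static void main(String[] args) {
        Demand worker = new Demand(1.0, 1024.0, 0.0);     // disk request fixed at 0.0
        Offer noDisk = new Offer(2.0, 2048.0, 0.0);       // enough CPU and memory, no disk
        Offer tooSmall = new Offer(0.5, 2048.0, 10000.0); // plenty of disk, too little CPU

        System.out.println(fits(worker, noDisk));   // true
        System.out.println(fits(worker, tooSmall)); // false
    }
}
```

If the worker did need scratch space, the sketch suggests the requirement would have to be stated explicitly, since a zero request never excludes an offer.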
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426122#comment-15426122 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75271636 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/Utils.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos; + +import org.apache.mesos.Protos; + +import java.net.URL; +import java.util.Arrays; + +public class Utils { + /** +* Construct a Mesos environment variable. + */ --- End diff -- The indentation is off here.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426118#comment-15426118 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75270753 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/cli/FlinkMesosSessionCli.java --- @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.cli; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.configuration.Configuration; + +import java.io.IOException; +import java.util.Map; + +public class FlinkMesosSessionCli { --- End diff -- This looks just like a dummy/stub class? 
Not a CLI yet :)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426114#comment-15426114 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75269975 --- Diff: flink-mesos/pom.xml --- @@ -0,0 +1,294 @@ + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd;> + 4.0.0 + + + org.apache.flink + flink-parent + 1.1-SNAPSHOT + .. + + + flink-mesos_2.10 + flink-mesos + jar + + +0.27.1 + + + + + org.apache.flink + flink-runtime_2.10 + ${project.version} + + + hadoop-core --- End diff -- Why do you exclude just `hadoop-core` here?
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15426113#comment-15426113 ] ASF GitHub Bot commented on FLINK-1984: --- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75269835 --- Diff: flink-dist/pom.xml --- @@ -113,8 +113,13 @@ under the License. flink-metrics-jmx ${project.version} + + + org.apache.flink + flink-mesos_2.10 + ${project.version} + --- End diff -- We always build YARN. We use the `include-yarn-tests` profile to include/exclude the YARN tests. The `include-yarn` profile, on the other hand, is there to exclude YARN for the Hadoop 1 version of Flink.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424942#comment-15424942 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2315 Great work @EronWright. The code quality, testing and extent of this PR are really impressive :-) I think you've nicely modularized the Mesos resource manager with the additional actors. This makes testing much nicer :-) I only had some minor comments and questions for my own understanding. Tomorrow I want to try it out on a Mesos cluster to see how it works. There is only one thing I'm wondering about. Since we're also currently working on Flip-6, where we try to put a new RPC abstraction in place in order to eventually remove Akka and Scala from flink-runtime, I wanted to ask whether you think that the FSM actors could also be replaced (sometime in the future) by something else? I totally agree that they are a nice abstraction for Mesos and allow the logic to be expressed succinctly.
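To make the question about replacing the FSM actors concrete, here is a minimal plain-Java sketch of a state machine without Akka. It is not Flink code: the class and method names are hypothetical. It models only the transitions visible in the `LaunchCoordinator.scala` diff quoted elsewhere in this thread (Suspended, Idle, GatheringOffers); transitions out of GatheringOffers are not visible in that quote and are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of a launch coordinator state machine; hypothetical, not Flink code. */
final class LaunchStateMachineSketch {

    enum State { SUSPENDED, IDLE, GATHERING_OFFERS }

    private State state = State.SUSPENDED;                        // initial state, as in the quoted diff
    private final List<String> pendingTasks = new ArrayList<>();  // task IDs waiting for offers

    State state() {
        return state;
    }

    /** Connection to Mesos (re-)established: resume gathering only if tasks are outstanding. */
    void onConnected() {
        if (state == State.SUSPENDED) {
            state = pendingTasks.isEmpty() ? State.IDLE : State.GATHERING_OFFERS;
        }
    }

    /** Connection lost while idle: fall back to the suspended state. */
    void onDisconnected() {
        if (state == State.IDLE) {
            state = State.SUSPENDED;
        }
    }

    /** A launch request while idle starts the offer-gathering phase. */
    void onLaunch(String taskId) {
        if (state == State.IDLE) {
            pendingTasks.add(taskId);
            state = State.GATHERING_OFFERS;
        }
    }

    public static void main(String[] args) {
        LaunchStateMachineSketch machine = new LaunchStateMachineSketch();
        machine.onConnected();
        System.out.println(machine.state()); // IDLE
        machine.onLaunch("taskmanager-1");
        System.out.println(machine.state()); // GATHERING_OFFERS
    }
}
```

The trade-off in such a rewrite is that mailbox ordering, timers, and supervision, which the actor model provides, would have to be supplied by whatever RPC abstraction replaces it.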
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424837#comment-15424837 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75156694 --- Diff: flink-mesos/src/test/resources/log4j-test.properties --- @@ -0,0 +1,32 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +log4j.rootLogger=INFO, console --- End diff -- The log level should be `OFF`.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424825#comment-15424825 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75154390 --- Diff: flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework + +import java.util.concurrent.{TimeUnit, ExecutorService} + +import akka.actor.ActorRef + +import org.apache.flink.api.common.JobID +import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.clusterframework.ApplicationStatus +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.clusterframework.messages._ +import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} +import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound} +import org.apache.flink.runtime.messages.Messages.Acknowledge +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} + +import scala.concurrent.duration._ +import scala.language.postfixOps + + +/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages + * to start/administer/stop the session. 
+ * + * @param flinkConfiguration Configuration object for the actor + * @param executorService Execution context which is used to execute concurrent tasks in the + * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] + * @param instanceManager Instance manager to manage the registered + * [[org.apache.flink.runtime.taskmanager.TaskManager]] + * @param scheduler Scheduler to schedule Flink jobs + * @param libraryCacheManager Manager to manage uploaded jar files + * @param archive Archive for finished Flink jobs + * @param restartStrategyFactory Restart strategy to be used in case of a job recovery + * @param timeout Timeout for futures + * @param leaderElectionService LeaderElectionService to participate in the leader election + */ +abstract class ContaineredJobManager( + flinkConfiguration: FlinkConfiguration, + executorService: ExecutorService, + instanceManager: InstanceManager, + scheduler: FlinkScheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory, + savepointStore: SavepointStore, + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[FlinkMetricRegistry]) + extends JobManager( +flinkConfiguration, +executorService, +instanceManager,
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424811#comment-15424811 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75153237 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala --- @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, FSM, Props} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.Action1 +import com.netflix.fenzo.plugins.VMLeaseObject +import grizzled.slf4j.Logger +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.Protos.TaskInfo +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{Map => MutableMap} +import scala.concurrent.duration._ + +/** + * The launch coordinator handles offer processing, including + * matching offers to tasks and making reservations. + * + * The coordinator uses Netflix Fenzo to optimize task placement. During the GatheringOffers phase, + * offers are evaluated by Fenzo for suitability to the planned tasks. Reservations are then placed + * against the best offers, leading to revised offers containing reserved resources with which to launch task(s). + */ +class LaunchCoordinator( +manager: ActorRef, +config: Configuration, +schedulerDriver: SchedulerDriver, +optimizerBuilder: TaskSchedulerBuilder + ) extends Actor with FSM[TaskState, GatherData] { + + val LOG = Logger(getClass) + + /** +* The task placement optimizer. 
+* +* The optimizer contains the following state: +* - unused offers +* - existing task placement (for fitness calculation involving task colocation) +*/ + private[mesos] val optimizer: TaskScheduler = { +optimizerBuilder + .withLeaseRejectAction(new Action1[VirtualMachineLease]() { +def call(lease: VirtualMachineLease) { + LOG.info(s"Declined offer ${lease.getId} from ${lease.hostname()} of ${lease.memoryMB()} MB, ${lease.cpuCores()} cpus.") + schedulerDriver.declineOffer(lease.getOffer.getId) +} + }).build + } + + override def postStop(): Unit = { +optimizer.shutdown() +super.postStop() + } + + /** +* Initial state +*/ + startWith(Suspended, GatherData(tasks = Nil, newLeases = Nil)) + + /** +* State: Suspended +* +* Wait for (re-)connection to Mesos. No offers exist in this state, but outstanding tasks might. +*/ + when(Suspended) { +case Event(msg: Connected, data: GatherData) => + if(data.tasks.nonEmpty) goto(GatheringOffers) + else goto(Idle) + } + + /** +* State: Idle +* +* Wait for a task request to arrive, then transition into gathering offers. +*/ + onTransition { +case _ -> Idle => assert(nextStateData.tasks.isEmpty) + } + + when(Idle) { +case Event(msg: Disconnected, data: GatherData) => + goto(Suspended) + +case Event(offers: ResourceOffers, data: GatherData) => + // decline any offers that come in + schedulerDriver.suppressOffers() + for(offer <- offers.offers().asScala) { schedulerDriver.declineOffer(offer.getId) } + stay() + +case Event(msg: Launch, data: GatherData) => + goto(GatheringOffers) using data.copy(tasks = data.tasks ++
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424783#comment-15424783 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75151735 --- Diff: flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework + +import java.util.concurrent.{TimeUnit, ExecutorService} + +import akka.actor.ActorRef + +import org.apache.flink.api.common.JobID +import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.clusterframework.ApplicationStatus +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.clusterframework.messages._ +import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} +import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound} +import org.apache.flink.runtime.messages.Messages.Acknowledge +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} + +import scala.concurrent.duration._ +import scala.language.postfixOps + + +/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages + * to start/administer/stop the session. 
+ * + * @param flinkConfiguration Configuration object for the actor + * @param executorService Execution context which is used to execute concurrent tasks in the + * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] + * @param instanceManager Instance manager to manage the registered + * [[org.apache.flink.runtime.taskmanager.TaskManager]] + * @param scheduler Scheduler to schedule Flink jobs + * @param libraryCacheManager Manager to manage uploaded jar files + * @param archive Archive for finished Flink jobs + * @param restartStrategyFactory Restart strategy to be used in case of a job recovery + * @param timeout Timeout for futures + * @param leaderElectionService LeaderElectionService to participate in the leader election + */ +abstract class ContaineredJobManager( + flinkConfiguration: FlinkConfiguration, + executorService: ExecutorService, + instanceManager: InstanceManager, + scheduler: FlinkScheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory, + savepointStore: SavepointStore, + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[FlinkMetricRegistry]) + extends JobManager( +flinkConfiguration, +executorService, +
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424766#comment-15424766 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75150474 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, TaskGoalStateUpdated, TaskTerminated} +import org.apache.flink.mesos.scheduler.Tasks._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.mutable.{Map => MutableMap} + +/** + * Aggregate of monitored tasks. + * + * Routes messages between the scheduler and individual task monitor actors. 
+ */ +class Tasks[M <: TaskMonitor]( + flinkConfig: Configuration, + schedulerDriver: SchedulerDriver, + taskMonitorClass: Class[M]) extends Actor { + + /** +* A map of task monitors by task ID. +*/ + private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap() + + /** +* Cache of current connection state. +*/ + private var registered: Option[Any] = None + + override def preStart(): Unit = { +// TODO subscribe to context.system.deadLetters for messages to nonexistent tasks + } + + override def receive: Receive = { + +case msg: Disconnected => + registered = None + context.actorSelection("*").tell(msg, self) + +case msg : Connected => + registered = Some(msg) + context.actorSelection("*").tell(msg, self) + +case msg: TaskGoalStateUpdated => + val taskID = msg.state.taskID + + // ensure task monitor exists + if(!taskMap.contains(taskID)) { +val actorRef = createTask(msg.state) +registered.foreach(actorRef ! _) + } + + taskMap(taskID) ! msg + +case msg: StatusUpdate => + taskMap(msg.status().getTaskId) ! msg + +case msg: Reconcile => + context.parent.forward(msg) + +case msg: TaskTerminated => + context.parent.forward(msg) + } + + private def createTask(task: TaskGoalState): ActorRef = { +val actorProps = TaskMonitor.createActorProps(taskMonitorClass, flinkConfig, schedulerDriver, task) --- End diff -- This line is longer than 100 characters, which should cause a checkstyle violation.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424758#comment-15424758 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75149938 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/Tasks.scala --- @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, Props} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator.Reconcile +import org.apache.flink.mesos.scheduler.TaskMonitor.{TaskGoalState, TaskGoalStateUpdated, TaskTerminated} +import org.apache.flink.mesos.scheduler.Tasks._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.mutable.{Map => MutableMap} + +/** + * Aggregate of monitored tasks. + * + * Routes messages between the scheduler and individual task monitor actors. 
+ */ +class Tasks[M <: TaskMonitor]( + flinkConfig: Configuration, + schedulerDriver: SchedulerDriver, + taskMonitorClass: Class[M]) extends Actor { + + /** +* A map of task monitors by task ID. +*/ + private val taskMap: MutableMap[Protos.TaskID,ActorRef] = MutableMap() + + /** +* Cache of current connection state. +*/ + private var registered: Option[Any] = None + + override def preStart(): Unit = { +// TODO subscribe to context.system.deadLetters for messages to nonexistent tasks --- End diff -- Can we resolve this TODO?
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424744#comment-15424744 ] ASF GitHub Bot commented on FLINK-1984: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75149129 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.SharedValue; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.mesos.Protos; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A ZooKeeper-backed Mesos worker store. + */ +public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class); + + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** Flag indicating whether this instance is running. 
*/ + private boolean isRunning; + + /** A persistent value of the assigned framework ID */ + private final SharedValue frameworkIdInZooKeeper; + + /** A persistent count of all tasks created, for generating unique IDs */ + private final SharedCount totalTaskCountInZooKeeper; + + /** A persistent store of serialized workers */ + private final ZooKeeperStateHandleStore workersInZooKeeper; + + @SuppressWarnings("unchecked") + ZooKeeperMesosWorkerStore( + CuratorFramework client, + String storePath, + StateStorageHelper stateStorage + ) throws Exception { + checkNotNull(storePath, "storePath"); + checkNotNull(stateStorage, "stateStorage"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = checkNotNull(client, "client"); + + // All operations will have the given path as root + client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient()); + CuratorFramework facade = client.usingNamespace(client.getNamespace() + storePath); + + // Track the assignd framework ID. + frameworkIdInZooKeeper = new SharedValue(facade, "/frameworkId", new byte[0]); + + // Keep a count of all tasks created ever, as the basis for a unique ID. + totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 0); + + // Keep track of the workers in state handle storage. + facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient()); + CuratorFramework storeFacade =
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424729#comment-15424729 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75147221 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala --- @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, FSM, Props} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.Action1 +import com.netflix.fenzo.plugins.VMLeaseObject +import grizzled.slf4j.Logger +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.Protos.TaskInfo +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{Map => MutableMap} +import scala.concurrent.duration._ + +/** + * The launch coordinator handles offer processing, including + * matching offers to tasks and making reservations. + * + * The coordinator uses Netflix Fenzo to optimize task placement. During the GatheringOffers phase, + * offers are evaluated by Fenzo for suitability to the planned tasks. Reservations are then placed + * against the best offers, leading to revised offers containing reserved resources with which to launch task(s). + */ +class LaunchCoordinator( +manager: ActorRef, +config: Configuration, +schedulerDriver: SchedulerDriver, +optimizerBuilder: TaskSchedulerBuilder + ) extends Actor with FSM[TaskState, GatherData] { + + val LOG = Logger(getClass) + + /** +* The task placement optimizer. 
+* +* The optimizer contains the following state: +* - unused offers +* - existing task placement (for fitness calculation involving task colocation) +*/ + private[mesos] val optimizer: TaskScheduler = { +optimizerBuilder + .withLeaseRejectAction(new Action1[VirtualMachineLease]() { +def call(lease: VirtualMachineLease) { + LOG.info(s"Declined offer ${lease.getId} from ${lease.hostname()} of ${lease.memoryMB()} MB, ${lease.cpuCores()} cpus.") + schedulerDriver.declineOffer(lease.getOffer.getId) +} + }).build + } + + override def postStop(): Unit = { +optimizer.shutdown() +super.postStop() + } + + /** +* Initial state +*/ + startWith(Suspended, GatherData(tasks = Nil, newLeases = Nil)) + + /** +* State: Suspended +* +* Wait for (re-)connection to Mesos. No offers exist in this state, but outstanding tasks might. +*/ + when(Suspended) { +case Event(msg: Connected, data: GatherData) => + if(data.tasks.nonEmpty) goto(GatheringOffers) + else goto(Idle) + } + + /** +* State: Idle +* +* Wait for a task request to arrive, then transition into gathering offers. +*/ + onTransition { +case _ -> Idle => assert(nextStateData.tasks.isEmpty) + } + + when(Idle) { +case Event(msg: Disconnected, data: GatherData) => + goto(Suspended) + +case Event(offers: ResourceOffers, data: GatherData) => + // decline any offers that come in + schedulerDriver.suppressOffers() + for(offer <- offers.offers().asScala) { schedulerDriver.declineOffer(offer.getId) } + stay() + +case Event(msg: Launch, data: GatherData) => + goto(GatheringOffers) using data.copy(tasks = data.tasks ++
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424670#comment-15424670 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75140675 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala --- @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, FSM, Props} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.Action1 +import com.netflix.fenzo.plugins.VMLeaseObject +import grizzled.slf4j.Logger +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.Protos.TaskInfo +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{Map => MutableMap} +import scala.concurrent.duration._ + +/** + * The launch coordinator handles offer processing, including + * matching offers to tasks and making reservations. + * + * The coordinator uses Netflix Fenzo to optimize task placement. During the GatheringOffers phase, --- End diff -- Because Fenzo solves a hard problem (knapsack packing of Mesos tasks->offers) today and nicely prepares the Mesos RM for future enhancement (task locality, etc). @StephanEwen was supportive.
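As a rough illustration of the "knapsack packing of Mesos tasks->offers" problem mentioned in the comment above, here is a minimal sketch in plain Java. It does not use Fenzo and is not Fenzo's algorithm; it is a simple greedy first-fit-decreasing heuristic. The names `OfferPacking`, `Offer`, `Task`, and `assign` are hypothetical and do not appear in the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; not Fenzo and not part of flink-mesos. */
public class OfferPacking {

    /** A resource offer from one agent (field names are assumptions). */
    static final class Offer {
        final String id;
        final double cpus;
        final double memMB;

        Offer(String id, double cpus, double memMB) {
            this.id = id;
            this.cpus = cpus;
            this.memMB = memMB;
        }
    }

    /** A task request with its resource demand (field names are assumptions). */
    static final class Task {
        final String id;
        final double cpus;
        final double memMB;

        Task(String id, double cpus, double memMB) {
            this.id = id;
            this.cpus = cpus;
            this.memMB = memMB;
        }
    }

    /**
     * Greedy first-fit-decreasing: consider tasks from largest to smallest memory
     * demand and place each into the first offer that still has enough CPU and memory.
     * Returns taskId -> offerId for placed tasks; tasks that fit nowhere are omitted.
     */
    static Map<String, String> assign(List<Task> tasks, List<Offer> offers) {
        // Track remaining capacity separately so the input offers are not mutated.
        double[] cpusLeft = new double[offers.size()];
        double[] memLeft = new double[offers.size()];
        for (int i = 0; i < offers.size(); i++) {
            cpusLeft[i] = offers.get(i).cpus;
            memLeft[i] = offers.get(i).memMB;
        }

        List<Task> sorted = new ArrayList<>(tasks);
        sorted.sort(Comparator.comparingDouble((Task t) -> t.memMB).reversed());

        Map<String, String> placement = new LinkedHashMap<>();
        for (Task task : sorted) {
            for (int i = 0; i < offers.size(); i++) {
                if (cpusLeft[i] >= task.cpus && memLeft[i] >= task.memMB) {
                    cpusLeft[i] -= task.cpus;
                    memLeft[i] -= task.memMB;
                    placement.put(task.id, offers.get(i).id);
                    break; // first fit: stop at the first offer with room
                }
            }
        }
        return placement;
    }

    public static void main(String[] args) {
        List<Offer> offers = Arrays.asList(
            new Offer("o1", 2.0, 2048.0),
            new Offer("o2", 4.0, 4096.0));
        List<Task> tasks = Arrays.asList(
            new Task("t1", 1.0, 1024.0),
            new Task("t2", 2.0, 3072.0));
        System.out.println(assign(tasks, offers));
    }
}
```

A heuristic like this ignores the concerns the comment attributes to Fenzo as future enhancements (task locality and so on), which is part of the argument for delegating placement to a dedicated library.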
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424665#comment-15424665 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75139941 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/ConnectionMonitor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, FSM, Props} +import grizzled.slf4j.Logger +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ConnectionMonitor._ +import org.apache.flink.mesos.scheduler.messages._ + +import scala.concurrent.duration._ + +/** + * Actively monitors the Mesos connection. + */ +class ConnectionMonitor() extends Actor with FSM[FsmState, Unit] { --- End diff -- Code in the `org.apache.flink.mesos.scheduler` package is reusable; the Mesos dispatcher is also a Mesos framework and will use this same monitoring code. 
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424663#comment-15424663 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75139688 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.util; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.Configuration; + +public class ZooKeeperUtils { + + /** +* Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper +* quorum. 
+* +* @param configuration {@link Configuration} object containing the configuration values +* @return {@link CuratorFramework} instance +*/ + @SuppressWarnings("unchecked") + public static CuratorFramework startCuratorFramework(Configuration configuration) { + + // workaround for shaded curator dependency of flink-runtime + Object client = org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(configuration); --- End diff -- (discussed above, see ZooKeeperMesosWorkerStore)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424662#comment-15424662 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75139611 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.util; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.Configuration; + +public class ZooKeeperUtils { + + /** +* Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper +* quorum. 
+* +* @param configuration {@link Configuration} object containing the configuration values +* @return {@link CuratorFramework} instance +*/ + @SuppressWarnings("unchecked") + public static CuratorFramework startCuratorFramework(Configuration configuration) { + + // workaround for shaded curator dependency of flink-runtime --- End diff -- (discussed above, see ZooKeeperMesosWorkerStore)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424652#comment-15424652 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75138892 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.SharedValue; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.mesos.Protos; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A ZooKeeper-backed Mesos worker store. + */ +public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class); + + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** Flag indicating whether this instance is running. 
*/ + private boolean isRunning; + + /** A persistent value of the assigned framework ID */ + private final SharedValue frameworkIdInZooKeeper; + + /** A persistent count of all tasks created, for generating unique IDs */ + private final SharedCount totalTaskCountInZooKeeper; + + /** A persistent store of serialized workers */ + private final ZooKeeperStateHandleStore workersInZooKeeper; + + @SuppressWarnings("unchecked") + ZooKeeperMesosWorkerStore( + CuratorFramework client, + String storePath, + StateStorageHelper stateStorage + ) throws Exception { + checkNotNull(storePath, "storePath"); + checkNotNull(stateStorage, "stateStorage"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = checkNotNull(client, "client"); + + // All operations will have the given path as root + client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient()); + CuratorFramework facade = client.usingNamespace(client.getNamespace() + storePath); + + // Track the assignd framework ID. + frameworkIdInZooKeeper = new SharedValue(facade, "/frameworkId", new byte[0]); + + // Keep a count of all tasks created ever, as the basis for a unique ID. + totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 0); + + // Keep track of the workers in state handle storage. + facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient()); + CuratorFramework storeFacade =
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424591#comment-15424591 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75131308 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework + +import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.instance.InstanceConnectionInfo +import org.apache.flink.runtime.io.disk.iomanager.IOManager +import org.apache.flink.runtime.io.network.NetworkEnvironment +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService +import org.apache.flink.runtime.memory.MemoryManager +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration} + +/** An extension of the TaskManager that listens for additional Mesos-related + * messages. --- End diff -- Which additional messages is the `MesosTaskManager` listening to? 
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424611#comment-15424611 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75133244 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/LaunchCoordinator.scala --- @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, ActorRef, FSM, Props} +import com.netflix.fenzo._ +import com.netflix.fenzo.functions.Action1 +import com.netflix.fenzo.plugins.VMLeaseObject +import grizzled.slf4j.Logger +import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2} +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.LaunchCoordinator._ +import org.apache.flink.mesos.scheduler.messages._ +import org.apache.mesos.Protos.TaskInfo +import org.apache.mesos.{SchedulerDriver, Protos} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{Map => MutableMap} +import scala.concurrent.duration._ + +/** + * The launch coordinator handles offer processing, including + * matching offers to tasks and making reservations. + * + * The coordinator uses Netflix Fenzo to optimize task placement. During the GatheringOffers phase, --- End diff -- Any particular reason why you've chosen Fenzo as scheduler?
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424607#comment-15424607 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75132701 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/ConnectionMonitor.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler + +import akka.actor.{Actor, FSM, Props} +import grizzled.slf4j.Logger +import org.apache.flink.configuration.Configuration +import org.apache.flink.mesos.scheduler.ConnectionMonitor._ +import org.apache.flink.mesos.scheduler.messages._ + +import scala.concurrent.duration._ + +/** + * Actively monitors the Mesos connection. + */ +class ConnectionMonitor() extends Actor with FSM[FsmState, Unit] { --- End diff -- Does this class do anything else than logging the state transitions? Couldn't that also happen in the callbacks defined in `MesosFlinkResourceManager`? 
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424587#comment-15424587 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75130862 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManager.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework + +import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.instance.InstanceConnectionInfo +import org.apache.flink.runtime.io.disk.iomanager.IOManager +import org.apache.flink.runtime.io.network.NetworkEnvironment +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService +import org.apache.flink.runtime.memory.MemoryManager +import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerConfiguration} + +/** An extension of the TaskManager that listens for additional Mesos-related + * messages. 
+ */ +class MesosTaskManager( + config: TaskManagerConfiguration, --- End diff -- The formatting is inconsistent with `MesosJobManager`. I usually try to indent Scala class parameters twice and extended classes once.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424563#comment-15424563 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75128299 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.util; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.Configuration; + +public class ZooKeeperUtils { + + /** +* Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper +* quorum. 
+* +* @param configuration {@link Configuration} object containing the configuration values +* @return {@link CuratorFramework} instance +*/ + @SuppressWarnings("unchecked") + public static CuratorFramework startCuratorFramework(Configuration configuration) { + + // workaround for shaded curator dependency of flink-runtime + Object client = org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(configuration); --- End diff -- Why store the client in an `Object` and then cast it?
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424562#comment-15424562 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75128222 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/ZooKeeperUtils.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.util; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.Configuration; + +public class ZooKeeperUtils { + + /** +* Starts a {@link CuratorFramework} instance and connects it to the given ZooKeeper +* quorum. +* +* @param configuration {@link Configuration} object containing the configuration values +* @return {@link CuratorFramework} instance +*/ + @SuppressWarnings("unchecked") + public static CuratorFramework startCuratorFramework(Configuration configuration) { + + // workaround for shaded curator dependency of flink-runtime --- End diff -- What is exactly the problem here? 
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424547#comment-15424547 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75126311 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; + + public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception { + if (configuredPort < 0 || configuredPort > 0x) { + throw new IllegalArgumentException("File server port is invalid: " + configuredPort); + } + + router = new Router(); + + ChannelInitializer
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424554#comment-15424554 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75126761 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosConfiguration.java --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.util; + +import org.apache.mesos.MesosSchedulerDriver; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.Map; + +/** + * The typed configuration settings associated with a Mesos scheduler. 
+ */ +public class MesosConfiguration { + + private String masterUrl; + + private Protos.FrameworkInfo.Builder frameworkInfo; + + private Option credential = Option.empty(); + + public MesosConfiguration( + String masterUrl, + Protos.FrameworkInfo.Builder frameworkInfo, + Option credential) { + + this.masterUrl = masterUrl; + this.frameworkInfo = frameworkInfo; + this.credential = credential; --- End diff -- Maybe check that these arguments are not null?
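To illustrate the null-check suggestion on the constructor, here is a minimal sketch. It is not the code under review: `SchedulerSettings`, its fields, and the use of `java.util.Optional` in place of `scala.Option` and the Mesos `Protos` types are assumptions made only to keep the example self-contained.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for a typed scheduler configuration (not the class in the PR). */
final class SchedulerSettings {

    private final String masterUrl;
    private final String frameworkName;
    private final Optional<String> principal;

    SchedulerSettings(String masterUrl, String frameworkName, Optional<String> principal) {
        // Fail fast at construction time instead of later, when the value is first used.
        this.masterUrl = Objects.requireNonNull(masterUrl, "masterUrl must not be null");
        this.frameworkName = Objects.requireNonNull(frameworkName, "frameworkName must not be null");
        this.principal = Objects.requireNonNull(principal, "principal must not be null");
    }

    public String getMasterUrl() {
        return masterUrl;
    }

    public String getFrameworkName() {
        return frameworkName;
    }

    public Optional<String> getPrincipal() {
        return principal;
    }

    public static void main(String[] args) {
        // Illustrative values only; the URL is made up for this sketch.
        SchedulerSettings settings =
            new SchedulerSettings("zk://localhost:2181/mesos", "flink", Optional.empty());
        System.out.println(settings.getMasterUrl());
    }
}
```

With checks like these, a misconfigured caller gets a `NullPointerException` that names the offending argument at the point of construction.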
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424539#comment-15424539 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75124909 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; --- End diff -- Can these fields be final? > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424537#comment-15424537 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75124552 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/util/MesosArtifactServer.java --- @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.util; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.DefaultFileRegion; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.DefaultHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.router.Handler; +import io.netty.handler.codec.http.router.Routed; +import io.netty.handler.codec.http.router.Router; +import io.netty.util.CharsetUtil; +import org.jets3t.service.utils.Mimetypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.RandomAccessFile; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URL; + +import static io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; +import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static io.netty.handler.codec.http.HttpMethod.GET; +import static io.netty.handler.codec.http.HttpMethod.HEAD; +import static 
io.netty.handler.codec.http.HttpResponseStatus.GONE; +import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; +import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; + + +/** + * A generic Mesos artifact server, designed specifically for use by the Mesos Fetcher. + * + * More information: + * http://mesos.apache.org/documentation/latest/fetcher/ + * http://mesos.apache.org/documentation/latest/fetcher-cache-internals/ + */ +public class MesosArtifactServer { + + private static final Logger LOG = LoggerFactory.getLogger(MesosArtifactServer.class); + + private final Router router; + + private ServerBootstrap bootstrap; + + private Channel serverChannel; + + private URL baseURL; + + public MesosArtifactServer(String sessionID, String serverHostname, int configuredPort) throws Exception { + if (configuredPort < 0 || configuredPort > 0x) { + throw new IllegalArgumentException("File server port is invalid: " + configuredPort); + } + + router = new Router(); + + ChannelInitializer
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424519#comment-15424519 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75123394 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler.messages; + +import org.apache.mesos.Protos; + +import java.util.Collection; + +/** + * Local message sent by the launch coordinator to the scheduler to accept offers. + */ +public class AcceptOffers { --- End diff -- Can these messages also be serializable? > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. 
> -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424503#comment-15424503 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75121439 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler.messages; + +import org.apache.mesos.Protos; + +import java.util.Collection; + +/** + * Local message sent by the launch coordinator to the scheduler to accept offers. + */ +public class AcceptOffers { --- End diff -- Would be good to make this class immutable by using final fields. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. 
> -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424508#comment-15424508 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75121852 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler.messages; + +import org.apache.mesos.Protos; + +import java.util.Collection; + +/** + * Local message sent by the launch coordinator to the scheduler to accept offers. + */ +public class AcceptOffers { --- End diff -- I think it makes sense to make all message classes immutable. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. 
> -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424504#comment-15424504 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75121525 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/messages/AcceptOffers.java --- @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler.messages; + +import org.apache.mesos.Protos; + +import java.util.Collection; + +/** + * Local message sent by the launch coordinator to the scheduler to accept offers. 
+ */ +public class AcceptOffers { + + private String hostname; + private Collection offerIds; + private Collection operations; + private Protos.Filters filters; + + public AcceptOffers(String hostname, Collection offerIds, Collection operations) { + this.hostname = hostname; + this.offerIds = offerIds; + this.operations = operations; + this.filters = Protos.Filters.newBuilder().build(); + } + + public AcceptOffers(String hostname, Collection offerIds, Collection operations, Protos.Filters filters) { + this.hostname = hostname; + this.offerIds = offerIds; + this.operations = operations; + this.filters = filters; + } + + public String hostname() { --- End diff -- Scala getter style. Would be good to follow the Java style.
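The two review comments on AcceptOffers (make message classes immutable, and use Java-style getters) could be addressed roughly as follows. This is a minimal sketch under stated assumptions, not the code from the pull request: the class name `ImmutableAcceptOffers` is hypothetical, offer IDs are modeled as plain strings instead of Mesos `Protos` types so the example compiles without the Mesos dependency, and only two of the original four fields are shown.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical immutable variant of a local scheduler message.
 * Assumption: offer IDs are plain strings here, not Protos.OfferID.
 */
final class ImmutableAcceptOffers {

    // All fields are final, so they are assigned exactly once in the constructor.
    private final String hostname;
    private final List<String> offerIds;

    ImmutableAcceptOffers(String hostname, Collection<String> offerIds) {
        this.hostname = Objects.requireNonNull(hostname, "hostname");
        Objects.requireNonNull(offerIds, "offerIds");
        // Defensive copy plus read-only view: later changes to the caller's
        // collection cannot leak into the message, and receivers cannot mutate it.
        this.offerIds = Collections.unmodifiableList(new ArrayList<>(offerIds));
    }

    // Java-style getters (getX) instead of Scala-style accessors (x).
    String getHostname() {
        return hostname;
    }

    List<String> getOfferIds() {
        return offerIds;
    }
}

class ImmutableAcceptOffersDemo {
    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        ids.add("offer-1");
        ImmutableAcceptOffers msg = new ImmutableAcceptOffers("host-1", ids);
        ids.add("offer-2"); // modifies only the caller's list, not the message
        System.out.println(msg.getHostname() + " " + msg.getOfferIds());
    }
}
```

Copying the collection in the constructor matters because a `final` field only fixes the reference, not the contents of the object it points to.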
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424501#comment-15424501 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75121105 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.scheduler; + +import akka.actor.ActorRef; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.Error; --- End diff -- Twice the same import.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424499#comment-15424499 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75120933 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/scheduler/SchedulerProxy.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.scheduler; + +import akka.actor.ActorRef; + +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.SlaveLost; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.mesos.Protos; +import org.apache.mesos.Scheduler; +import org.apache.mesos.SchedulerDriver; + +import java.util.List; + +/** + * This class reacts to callbacks from the Mesos scheduler driver. + * + * In order to preserve actor concurrency safety, this class simply sends + * corresponding messages to the Mesos resource master actor. + * + * See https://mesos.apache.org/api/latest/java/org/apache/mesos/Scheduler.html + */ +public class SchedulerProxy implements Scheduler { + + /** The actor to which we report the callbacks */ + private ActorRef mesosActor; --- End diff -- final?
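The "final?" remark on the `mesosActor` field, together with the class comment about forwarding callbacks as messages, can be illustrated with a small sketch. It assumes a hypothetical `MessageSink` interface standing in for `akka.actor.ActorRef` and a hypothetical `DisconnectedMsg` message type, so that it runs without Akka or Mesos on the classpath; it is not the pull request's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for an actor reference (the PR uses akka.actor.ActorRef). */
interface MessageSink {
    void tell(Object message);
}

/** Hypothetical message type, standing in for the PR's Disconnected message. */
final class DisconnectedMsg {
}

/** Forwards driver callbacks as messages instead of touching shared state itself. */
final class CallbackProxySketch {

    // final: the target is set once in the constructor and never reassigned,
    // so every callback thread sees the same, fully initialized reference.
    private final MessageSink target;

    CallbackProxySketch(MessageSink target) {
        this.target = Objects.requireNonNull(target, "target");
    }

    // A callback does no work of its own; it only hands a message to the target.
    void disconnected() {
        target.tell(new DisconnectedMsg());
    }
}

class CallbackProxySketchDemo {
    public static void main(String[] args) {
        List<Object> received = new ArrayList<>();
        CallbackProxySketch proxy = new CallbackProxySketch(m -> received.add(m));
        proxy.disconnected();
        System.out.println(received.size()); // prints 1
    }
}
```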
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424495#comment-15424495 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75120223 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.SharedValue; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.mesos.Protos; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A ZooKeeper-backed Mesos worker store. + */ +public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class); + + private final Object cacheLock = new Object(); --- End diff -- Why do we have to lock? Where does the concurrent access come from?
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424489#comment-15424489 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75119617 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.SharedValue; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.mesos.Protos; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A ZooKeeper-backed Mesos worker store. + */ +public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class); + + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** Flag indicating whether this instance is running. 
*/ + private boolean isRunning; + + /** A persistent value of the assigned framework ID */ + private final SharedValue frameworkIdInZooKeeper; + + /** A persistent count of all tasks created, for generating unique IDs */ + private final SharedCount totalTaskCountInZooKeeper; + + /** A persistent store of serialized workers */ + private final ZooKeeperStateHandleStore workersInZooKeeper; + + @SuppressWarnings("unchecked") + ZooKeeperMesosWorkerStore( + CuratorFramework client, + String storePath, + StateStorageHelper stateStorage + ) throws Exception { + checkNotNull(storePath, "storePath"); + checkNotNull(stateStorage, "stateStorage"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = checkNotNull(client, "client"); + + // All operations will have the given path as root + client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient()); + CuratorFramework facade = client.usingNamespace(client.getNamespace() + storePath); + + // Track the assigned framework ID. + frameworkIdInZooKeeper = new SharedValue(facade, "/frameworkId", new byte[0]); + + // Keep a count of all tasks created ever, as the basis for a unique ID. + totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 0); + + // Keep track of the workers in state handle storage. + facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient()); + CuratorFramework storeFacade =
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15424461#comment-15424461 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75117043 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework.store; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.shared.SharedCount; +import org.apache.curator.framework.recipes.shared.SharedValue; +import org.apache.curator.framework.recipes.shared.VersionedValue; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.runtime.zookeeper.StateStorageHelper; +import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore; +import org.apache.mesos.Protos; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A ZooKeeper-backed Mesos worker store. + */ +public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { + + private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperMesosWorkerStore.class); + + private final Object cacheLock = new Object(); + + /** Client (not a namespace facade) */ + private final CuratorFramework client; + + /** Flag indicating whether this instance is running. 
*/ + private boolean isRunning; + + /** A persistent value of the assigned framework ID */ + private final SharedValue frameworkIdInZooKeeper; + + /** A persistent count of all tasks created, for generating unique IDs */ + private final SharedCount totalTaskCountInZooKeeper; + + /** A persistent store of serialized workers */ + private final ZooKeeperStateHandleStore workersInZooKeeper; + + @SuppressWarnings("unchecked") + ZooKeeperMesosWorkerStore( + CuratorFramework client, + String storePath, + StateStorageHelper stateStorage + ) throws Exception { + checkNotNull(storePath, "storePath"); + checkNotNull(stateStorage, "stateStorage"); + + // Keep a reference to the original client and not the namespace facade. The namespace + // facade cannot be closed. + this.client = checkNotNull(client, "client"); + + // All operations will have the given path as root + client.newNamespaceAwareEnsurePath(storePath).ensure(client.getZookeeperClient()); + CuratorFramework facade = client.usingNamespace(client.getNamespace() + storePath); + + // Track the assigned framework ID. + frameworkIdInZooKeeper = new SharedValue(facade, "/frameworkId", new byte[0]); + + // Keep a count of all tasks created ever, as the basis for a unique ID. + totalTaskCountInZooKeeper = new SharedCount(facade, "/count", 0); + + // Keep track of the workers in state handle storage. + facade.newNamespaceAwareEnsurePath("/workers").ensure(client.getZookeeperClient()); + CuratorFramework storeFacade =