[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos
Github user Astralidea commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r56613503

--- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/FlinkScheduler.scala ---

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.mesos.scheduler

import java.io.File
import java.util.{List => JList}

import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
import org.apache.mesos.Protos.TaskState._
import org.apache.mesos.Protos._
import org.apache.mesos.{Scheduler, SchedulerDriver}
import org.slf4j.LoggerFactory
import scopt.OptionParser

import scala.collection.JavaConversions._

object FlinkScheduler extends Scheduler with SchedulerUtils {

  val LOG = LoggerFactory.getLogger(FlinkScheduler.getClass)
  var jobManager: Option[Thread] = None
  var currentConfiguration: Option[Configuration] = None
  var taskManagers: Set[RunningTaskManager] = Set()
  var taskManagerCount = 0
  // http port where http server is hosting the configuration files
  var httpConfigServerAddress: Option[String] = None

  override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): Unit = { }

  override def disconnected(driver: SchedulerDriver): Unit = { }

  override def reregistered(driver: SchedulerDriver, masterInfo: MasterInfo): Unit = { }

  override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {
    LOG.warn(s"Slave lost: ${slaveId.getValue}, removing all task managers matching slaveId")
    taskManagers = taskManagers.filter(_.slaveId != slaveId)
  }

  override def error(driver: SchedulerDriver, message: String): Unit = { }

  override def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID,
      slaveId: SlaveID, data: Array[Byte]): Unit = { }

  override def registered(driver: SchedulerDriver, frameworkId: FrameworkID,
      masterInfo: MasterInfo): Unit = { }

  override def executorLost(driver: SchedulerDriver, executorId: ExecutorID,
      slaveId: SlaveID, status: Int): Unit = {
    LOG.warn(s"Executor ${executorId.getValue} lost with status $status on slave $slaveId")
  }

  override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
    val taskId = status.getTaskId.getValue
    val slaveId = status.getSlaveId.getValue
    LOG.info(
      s"statusUpdate received from taskId: $taskId slaveId: $slaveId [${status.getState.name()}]")

    status.getState match {
      case TASK_FAILED | TASK_FINISHED | TASK_KILLED | TASK_LOST | TASK_ERROR =>
        LOG.info(s"Lost taskManager with TaskId: $taskId on slave: $slaveId")
        taskManagers = taskManagers.filter(_.taskId != status.getTaskId)
      case _ =>
        LOG.debug(s"No action to take for statusUpdate ${status.getState.name()}")
    }
  }

  override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
    // we will combine all resources from the same slave and then launch a single task rather
    // than one per offer; this way we have better utilization and less memory wasted on overhead.
    for ((slaveId, offers) <- offers.groupBy(_.getSlaveId)) {
      val tasks = constructTaskInfoFromOffers(slaveId, offers.toList)
      driver.launchTasks(offers.map(_.getId), tasks)
    }
  }

  def constructTaskInfoFromOffers(slaveId: SlaveID, offers: List[Offer]): Seq[TaskInfo] = {
    val maxTaskManagers = GlobalConfiguration.getInteger(
      TASK_MANAGER_COUNT_KEY, DEFAULT_TASK_MANAGER_COUNT)
    val requiredMem = GlobalConfiguration.getFloat(
      TASK_MANAGER_MEM_KEY, DEFAULT_TASK_MANAGER_MEM)
    val requiredCPU = GlobalConfiguration.getFloat(
```
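The `resourceOffers` callback above groups offers by slave and launches tasks against the combined resources of each slave, rather than one task per offer. A minimal, dependency-free sketch of that combining step (the `Offer` and `Combined` case classes here are hypothetical stand-ins for the Mesos protobuf types, not the PR's code):

```scala
// Hypothetical stand-ins for the Mesos protobuf Offer/Resource types.
case class Offer(id: String, slaveId: String, mem: Double, cpus: Double)
case class Combined(mem: Double, cpus: Double)

// Pool all offers from the same slave into one resource bundle,
// mirroring the groupBy(_.getSlaveId) in resourceOffers.
def combineBySlave(offers: List[Offer]): Map[String, Combined] =
  offers.groupBy(_.slaveId).map { case (slave, os) =>
    slave -> Combined(os.map(_.mem).sum, os.map(_.cpus).sum)
  }

val offers = List(
  Offer("o1", "slave-1", 1024.0, 1.0),
  Offer("o2", "slave-1", 512.0, 0.5),
  Offer("o3", "slave-2", 2048.0, 2.0))

// slave-1's two offers collapse into a single 1536 MB / 1.5 cpu pool.
val combined = combineBySlave(offers)
```

The scheduler can then size a single TaskManager against `combined(slave)` and answer all of that slave's offer IDs in one `launchTasks` call.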
Github user ankurcha closed the pull request at: https://github.com/apache/flink/pull/948

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
Github user uce commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-195398349

With the recently introduced changes to the resource management, which bring it better in line with Mesos' model, I think we can close this PR. Sorry!
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-148403603

I tried running the code from this pull request again, this time using the `mesos-playa` vagrant image, and it does not work for me. I was following your instructions. When did you last test the changes? My motivation to test this pull request goes down every time I'm testing it. I've spun up a Mesos cluster on GCE two times, plus the VM now. Maybe I'm doing it wrong; please let me know what I can do to get it to run.

CLI output:
```
vagrant@mesos:~/flink/build-target$ java -Dlog4j.configuration=file://`pwd`/conf/log4j.properties -Dlog.file=logs.log -cp lib/flink-dist-0.10-SNAPSHOT.jar org.apache.flink.mesos.scheduler.FlinkScheduler --confDir conf/
I1015 14:05:01.591161  9992 sched.cpp:157] Version: 0.22.1
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@712: Client environment:zookeeper.version=zookeeper C client 3.4.5
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@716: Client environment:host.name=mesos
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@723: Client environment:os.name=Linux
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@724: Client environment:os.arch=3.16.0-30-generic
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@725: Client environment:os.version=#40~14.04.1-Ubuntu SMP Thu Jan 15 17:43:14 UTC 2015
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@733: Client environment:user.name=vagrant
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@741: Client environment:user.home=/home/vagrant
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@753: Client environment:user.dir=/home/vagrant/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=127.0.0.1:2181 sessionTimeout=1 watcher=0x7f67dac33a60 sessionId=0 sessionPasswd= context=0x7f67f0004470 flags=0
2015-10-15 14:05:01,592:9991(0x7f67c6ffd700):ZOO_INFO@check_events@1703: initiated connection to server [127.0.0.1:2181]
Embedded server listening at http://127.0.0.1:40815
Press any key to stop.
2015-10-15 14:05:04,959:9991(0x7f67c6ffd700):ZOO_INFO@check_events@1750: session establishment complete on server [127.0.0.1:2181], sessionId=0x1506b6312fa000b, negotiated timeout=1
I1015 14:05:04.959841 10024 group.cpp:313] Group process (group(1)@127.0.1.1:57437) connected to ZooKeeper
I1015 14:05:04.959899 10024 group.cpp:790] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0)
I1015 14:05:04.959928 10024 group.cpp:385] Trying to create path '/mesos' in ZooKeeper
I1015 14:05:05.204282 10024 detector.cpp:138] Detected a new leader: (id='2')
I1015 14:05:05.204489 10024 group.cpp:659] Trying to get '/mesos/info_02' in ZooKeeper
I1015 14:05:05.303072 10024 detector.cpp:452] A new leading master (UPID=master@127.0.1.1:5050) is detected
I1015 14:05:05.303467 10024 sched.cpp:254] New master detected at master@127.0.1.1:5050
I1015 14:05:05.303890 10024 sched.cpp:264] No credentials provided. Attempting to register without authentication
I1015 14:05:05.851562 10024 sched.cpp:448] Framework registered with 20151015-120419-16842879-5050-1244-
```

log file content:
```
14:04:54,564 WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ -
14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ - Starting JobManager (Version: 0.10-SNAPSHOT, Rev:d905af0, Date:06.10.2015 @ 19:37:22 UTC)
14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ - Current user: vagrant
14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.7/24.79-b02
14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ - Maximum heap size: 592 MiBytes
14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ - JAVA_HOME: (not set)
14:04:55,823 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ - Hadoop version: 2.3.0
14:04:55,824 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ - JVM Options:
14:04:55,824 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ - -Dlog4j.configuration=file:///home/vagrant/flink/build-target/conf/log4j.properties
14:04:55,824 INFO
```
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-147937031

Thx again for updating the patch, @ankurcha. Apologies for the delay in the review.
Github user ankurcha commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r41721162

--- Diff: flink-mesos/pom.xml ---

End diff --

The dependency is for the simple HTTP server used to host the configuration for the executors. The unfiltered library is under the MIT license.

Sent from my iPhone

> On Oct 11, 2015, at 20:28, Henry Saputra wrote:
>
> In flink-mesos/pom.xml:
>
> > net.databinder
> > unfiltered-jetty_${scala.binary.version}
>
> What is this dependency for? And also what is the license for it?
>
> Reply to this email directly or view it on GitHub.
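The PR's actual config server is built on unfiltered-jetty; purely as an illustration of the idea (serving configuration over HTTP so the executors can fetch it at launch), here is an unrelated minimal sketch using the JDK's built-in `com.sun.net.httpserver` instead. Everything in it (the path, payload, and function name) is an assumption for the sketch, not the PR's code:

```scala
import java.net.InetSocketAddress
import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}

// Minimal stand-in for the PR's unfiltered-jetty config server:
// answer GET /flink-conf.yaml with a fixed configuration payload.
def serveConfig(port: Int, payload: String): HttpServer = {
  val server = HttpServer.create(new InetSocketAddress(port), 0)
  server.createContext("/flink-conf.yaml", new HttpHandler {
    override def handle(ex: HttpExchange): Unit = {
      val bytes = payload.getBytes("UTF-8")
      ex.sendResponseHeaders(200, bytes.length)
      ex.getResponseBody.write(bytes)
      ex.close()
    }
  })
  server.start()
  server
}

val server = serveConfig(0, "jobmanager.rpc.port: 6123\n") // port 0 = pick any free port
val boundPort = server.getAddress.getPort
// Executors would fetch http://<scheduler-host>:<boundPort>/flink-conf.yaml
server.stop(0)
```

Shipping the config this way avoids baking it into the executor artifacts: each TaskManager can pull a fresh copy from the scheduler host when it starts.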
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r41721026

--- Diff: flink-mesos/pom.xml ---

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <parent>
    <artifactId>flink-parent</artifactId>
    <groupId>org.apache.flink</groupId>
    <version>0.10-SNAPSHOT</version>
    <relativePath>..</relativePath>
  </parent>
  <modelVersion>4.0.0</modelVersion>

  <artifactId>flink-mesos</artifactId>
  <name>flink-mesos</name>
  <packaging>jar</packaging>

  <properties>
    <mesos.version>0.23.0</mesos.version>
    <unfiltererd.version>0.8.4</unfiltererd.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.github.scopt</groupId>
      <artifactId>scopt_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>net.databinder</groupId>
      <artifactId>unfiltered-jetty_${scala.binary.version}</artifactId>
```

End diff --

What is this dependency for? And also what is the license for it?
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r41721041

--- Diff: flink-mesos/pom.xml ---

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <parent>
    <artifactId>flink-parent</artifactId>
    <groupId>org.apache.flink</groupId>
    <version>0.10-SNAPSHOT</version>
    <relativePath>..</relativePath>
  </parent>
  <modelVersion>4.0.0</modelVersion>

  <artifactId>flink-mesos</artifactId>
  <name>flink-mesos</name>
  <packaging>jar</packaging>

  <properties>
    <mesos.version>0.23.0</mesos.version>
    <unfiltererd.version>0.8.4</unfiltererd.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>com.github.scopt</groupId>
      <artifactId>scopt_${scala.binary.version}</artifactId>
    </dependency>
    <dependency>
      <groupId>net.databinder</groupId>
      <artifactId>unfiltered-jetty_${scala.binary.version}</artifactId>
      <version>${unfiltererd.version}</version>
    </dependency>
    <dependency>
      <groupId>net.databinder</groupId>
      <artifactId>unfiltered-filter_${scala.binary.version}</artifactId>
```

End diff --

Same question: What is this dependency for? And also what is the license for it?
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-144858877

Quick comment on the PR: could you add Javadoc header information giving a short description of why each new class/trait is created and what role it plays in supporting the Mesos integration?
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r40970091

--- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/executor/FlinkExecutor.scala ---

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.mesos.executor

import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
import org.apache.flink.mesos._
import org.apache.flink.mesos.scheduler._
import org.apache.flink.runtime.StreamingMode
import org.apache.mesos.Protos._
import org.apache.mesos.{Executor, ExecutorDriver}

import scala.util.{Failure, Success, Try}

trait FlinkExecutor extends Executor {
```

End diff --

Could you add a Javadoc header to explain why this trait is created, with a summary of when and where it is used in the execution?
Github user hsaputra commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r40970304

--- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/executor/TaskManagerExecutor.scala ---

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.mesos.executor

import scala.util.Try

import org.apache.flink.configuration.GlobalConfiguration
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver}
import org.apache.mesos.Protos.Status
import org.slf4j.{Logger, LoggerFactory}

class TaskManagerExecutor extends FlinkExecutor {
```

End diff --

Could you add a Javadoc header to describe the purpose of this class?
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-144074252

Thank you for the update @ankurcha. I'm trying to review your PR in the next few days. Sorry for the delay.
Github user ankurcha commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-142825340

Hi @rmetzger,

This completely got buried in my schedule, but I think I have finally worked out the kinks. Thanks for spending the time to try it out. I think we are closer to having this out of the door, so have a go at it and let me know if you see something off.

```bash
vagrant@mesos:/vagrant$ java -Dlog4j.configuration=file:/vagrant/log4j.properties -cp *.jar org.apache.flink.mesos.scheduler.FlinkScheduler --confDir /vagrant --port 10080
INFO  2015-09-24 06:22:04,584 [main] FlinkScheduler$:
INFO  2015-09-24 06:22:04,585 [main] FlinkScheduler$: Starting JobManager (Version: 0.10-SNAPSHOT, Rev:bc21de2, Date:23.09.2015 @ 20:06:33 UTC)
INFO  2015-09-24 06:22:04,585 [main] FlinkScheduler$: Current user: vagrant
INFO  2015-09-24 06:22:04,585 [main] FlinkScheduler$: JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.7/24.79-b02
INFO  2015-09-24 06:22:04,585 [main] FlinkScheduler$: Maximum heap size: 592 MiBytes
INFO  2015-09-24 06:22:04,585 [main] FlinkScheduler$: JAVA_HOME: (not set)
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$: Hadoop version: 2.3.0
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$: JVM Options:
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$: -Dlog4j.configuration=file:/vagrant/log4j.properties
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$: Program Arguments:
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$: --confDir
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$: /vagrant
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$: --port
INFO  2015-09-24 06:22:04,589 [main] FlinkScheduler$: 10080
INFO  2015-09-24 06:22:04,589 [main] FlinkScheduler$:
INFO  2015-09-24 06:22:04,595 [main] FlinkScheduler$: Maximum number of open file descriptors is 4096
INFO  2015-09-24 06:22:04,596 [main] FlinkScheduler$: Loading configuration from /vagrant
DEBUG 2015-09-24 06:22:04,665 [main] FlinkScheduler$: Serving configuration via: Some(http://127.0.0.1:10080)
INFO  2015-09-24 06:22:04,725 [Thread-3] JobManager: Starting JobManager
INFO  2015-09-24 06:22:04,726 [Thread-3] JobManager: Starting JobManager actor system at :6123.
I0924 06:22:04.812088  7312 sched.cpp:157] Version: 0.22.1
2015-09-24 06:22:04,815:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@712: Client environment:zookeeper.version=zookeeper C client 3.4.5
2015-09-24 06:22:04,816:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@716: Client environment:host.name=mesos
2015-09-24 06:22:04,818:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@723: Client environment:os.name=Linux
2015-09-24 06:22:04,821:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@724: Client environment:os.arch=3.16.0-30-generic
2015-09-24 06:22:04,822:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@725: Client environment:os.version=#40~14.04.1-Ubuntu SMP Thu Jan 15 17:43:14 UTC 2015
2015-09-24 06:22:04,824:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@733: Client environment:user.name=vagrant
2015-09-24 06:22:04,827:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@741: Client environment:user.home=/home/vagrant
2015-09-24 06:22:04,829:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@753: Client environment:user.dir=/vagrant
2015-09-24 06:22:04,830:7311(0x7fcaf8bbb700):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=127.0.0.1:2181 sessionTimeout=1 watcher=0x7fcb29b99a60 sessionId=0 sessionPasswd= context=0x1c0f350 flags=0
2015-09-24 06:22:04,837:7311(0x7fcaf2ffd700):ZOO_INFO@check_events@1703: initiated connection to server [127.0.0.1:2181]
2015-09-24 06:22:04,839:7311(0x7fcaf2ffd700):ZOO_INFO@check_events@1750: session establishment complete on server [127.0.0.1:2181], sessionId=0x14ffdcf98d00015, negotiated timeout=1
I0924 06:22:04.840190  7333 group.cpp:313] Group process (group(1)@127.0.1.1:36068) connected to ZooKeeper
I0924 06:22:04.840239  7333 group.cpp:790] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0)
I0924 06:22:04.840265  7333 group.cpp:385] Trying to create path '/mesos' in ZooKeeper
I0924 06:22:04.843736  7333 detector.cpp:138] Detected a new leader: (id='0')
I0924 06:22:04.843868  7331 group.cpp:659] Trying to get '/mesos/info_00' in ZooKeeper
I0924 06:22:04.844832  7331 detector.cpp:452] A new leading master (UPID=master@127.0.1.1:5050) is detected
I0924 06:22:04.844923  7331 sched.cpp:254] New master detected at master@127.0.1.1:5050
I0924 06:22:04.845263  7331 sched.cpp:264] No credentials provided. Attempting to register without authentication
I0924 06:22:04.847537  7331 sched.cpp:448] Framework registered
```
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r38957167

--- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala ---

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.mesos.scheduler

import java.util.{List => JList}

import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import com.google.common.base.Splitter
import com.google.protobuf.{ByteString, GeneratedMessage}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.configuration.ConfigConstants._
import org.apache.flink.mesos._
import org.apache.flink.mesos.scheduler.FlinkScheduler._
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.CommandInfo.URI
import org.apache.mesos.Protos.Value.Ranges
import org.apache.mesos.Protos.Value.Type._

/**
 * This code is borrowed and inspired from the Apache Spark project:
 * core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 */
trait SchedulerUtils {

  /**
   * Converts the attributes from the resource offer into a Map of name -> attribute value.
   * The attribute values are the Mesos attribute types.
   * @param offerAttributes List of attributes sent with an offer
   * @return
   */
  def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
    offerAttributes.map(attr => {
      val attrValue = attr.getType match {
        case SCALAR => attr.getScalar
        case Value.Type.RANGES => attr.getRanges
        case Value.Type.SET => attr.getSet
        case Value.Type.TEXT => attr.getText
      }
      (attr.getName, attrValue)
    }).toMap
  }

  def createJavaExecCommand(jvmArgs: String = "", classPath: String = "flink-*.jar",
      classToExecute: String, args: String = ""): String = {
    s"env; java $jvmArgs -cp $classPath $classToExecute $args"
  }

  def createExecutorInfo(id: String, role: String, artifactURIs: Set[String], command: String,
      nativeLibPath: String): ExecutorInfo = {
    val uris = artifactURIs.map(uri => URI.newBuilder().setValue(uri).build())
    ExecutorInfo.newBuilder()
      .setExecutorId(ExecutorID
        .newBuilder()
        .setValue(s"executor_$id"))
      .setName(s"Apache Flink Mesos Executor - $id")
      .setCommand(CommandInfo.newBuilder()
        .setValue(s"env; $command")
        .addAllUris(uris)
        .setEnvironment(Environment.newBuilder()
          .addVariables(Environment.Variable.newBuilder()
            .setName("MESOS_NATIVE_JAVA_LIBRARY").setValue(nativeLibPath)))
        .setValue(command))
      .build()
  }

  def createTaskInfo(taskName: String, taskId: TaskID, slaveID: SlaveID, role: String, mem: Double,
      cpus: Double, disk: Double, ports: Set[Int], executorInfo: ExecutorInfo,
      conf: Configuration): TaskInfo = {

    val portRanges = Ranges.newBuilder().addAllRange(
      ports.map(port => Value.Range.newBuilder().setBegin(port).setEnd(port).build())).build()

    val taskConf = conf.clone()
    val portsSeq = ports.toSeq
    // set task manager ports
    taskConf.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, portsSeq.get(0))
    taskConf.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
```
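`createTaskInfo` above turns each chosen port into a single-port `Value.Range` and pins the first offered ports to fixed Flink settings (the IPC and data ports). A dependency-free sketch of that mapping, with a plain case class standing in for the protobuf range builder (the config key strings below are assumptions mirroring `ConfigConstants`, not pulled from the PR):

```scala
// Hypothetical stand-in for Mesos Value.Range (begin/end inclusive).
case class PortRange(begin: Int, end: Int)

// One single-port range per chosen port, as in createTaskInfo's Ranges builder.
def toRanges(ports: Set[Int]): List[PortRange] =
  ports.toList.sorted.map(p => PortRange(p, p))

// Assign the first offered ports to fixed TaskManager settings
// (mirroring TASK_MANAGER_IPC_PORT_KEY / TASK_MANAGER_DATA_PORT_KEY).
def assignPorts(ports: Seq[Int]): Map[String, Int] = Map(
  "taskmanager.rpc.port" -> ports(0),
  "taskmanager.data.port" -> ports(1))

val ranges = toRanges(Set(31001, 31000))
val taskConf = assignPorts(Seq(31000, 31001))
```

Claiming each port as its own one-element range keeps the resource accounting simple: the offered port resource is a list of ranges, and the task's claim must be a sub-range of what was offered.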
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-138640533

I'm trying out the code again in GCE ;)
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r38958292

--- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala ---
@@ -0,0 +1,358 @@

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.mesos.scheduler

import java.util.{List => JList}

import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import com.google.common.base.Splitter
import com.google.protobuf.{ByteString, GeneratedMessage}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.configuration.ConfigConstants._
import org.apache.flink.mesos._
import org.apache.flink.mesos.scheduler.FlinkScheduler._
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.CommandInfo.URI
import org.apache.mesos.Protos.Value.Ranges
import org.apache.mesos.Protos.Value.Type._

/**
 * This code is borrowed and inspired from Apache Spark Project:
 * core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 */
trait SchedulerUtils {

  /**
   * Converts the attributes from the resource offer into a Map of name -> Attribute Value
   * The attribute values are the mesos attribute types and they are
   * @param offerAttributes List of attributes sent with an offer
   * @return
   */
  def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
    offerAttributes.map(attr => {
      val attrValue = attr.getType match {
        case SCALAR => attr.getScalar
        case Value.Type.RANGES => attr.getRanges
        case Value.Type.SET => attr.getSet
        case Value.Type.TEXT => attr.getText
      }
      (attr.getName, attrValue)
    }).toMap
  }

  def createJavaExecCommand(jvmArgs: String = "", classPath: String = "flink-*.jar",
      classToExecute: String, args: String = ""): String = {
    s"env; java $jvmArgs -cp $classPath $classToExecute $args"
  }

  def createExecutorInfo(id: String, role: String, artifactURIs: Set[String], command: String,
      nativeLibPath: String): ExecutorInfo = {
    val uris = artifactURIs.map(uri => URI.newBuilder().setValue(uri).build())
    ExecutorInfo.newBuilder()
      .setExecutorId(ExecutorID
        .newBuilder()
        .setValue(s"executor_$id"))
      .setName(s"Apache Flink Mesos Executor - $id")
      .setCommand(CommandInfo.newBuilder()
        .setValue(s"env; $command")
        .addAllUris(uris)
        .setEnvironment(Environment.newBuilder()
          .addVariables(Environment.Variable.newBuilder()
            .setName("MESOS_NATIVE_JAVA_LIBRARY").setValue(nativeLibPath)))
        .setValue(command))
      .build()
  }

  def createTaskInfo(taskName: String, taskId: TaskID, slaveID: SlaveID, role: String, mem: Double,
      cpus: Double, disk: Double, ports: Set[Int], executorInfo: ExecutorInfo,
      conf: Configuration): TaskInfo = {

    val portRanges = Ranges.newBuilder().addAllRange(
      ports.map(port => Value.Range.newBuilder().setBegin(port).setEnd(port).build())).build()

    val taskConf = conf.clone()
    val portsSeq = ports.toSeq
    // set task manager ports
    taskConf.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, portsSeq.get(0))
    taskConf.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
```
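The tail of `createTaskInfo` above takes the first two offered ports for the TaskManager IPC and data ports. As a self-contained sketch of that idea (the `PortPicking` helper and its names are illustrative, not part of this PR), picking n concrete ports out of Mesos-style `(begin, end)` ranges could look like:

```scala
// Illustrative helper, not part of the PR: Mesos offers ports as ranges,
// e.g. (31000, 31005); a task picks concrete ports out of those ranges
// and writes them into the TaskManager configuration.
object PortPicking {
  // Expand offered (begin, end) ranges into individual candidate ports.
  def expand(ranges: Seq[(Int, Int)]): Seq[Int] =
    ranges.flatMap { case (begin, end) => begin to end }

  // Take the first n distinct ports, or None if the offer has too few.
  def pick(ranges: Seq[(Int, Int)], n: Int): Option[Seq[Int]] = {
    val candidates = expand(ranges).distinct
    if (candidates.size >= n) Some(candidates.take(n)) else None
  }
}
```

For example, `PortPicking.pick(Seq((31000, 31005)), 2)` would yield candidates for the IPC and data ports, while an offer containing fewer than two ports is rejected outright instead of failing later in the builder chain.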
[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-138649089 it seems that the taskmanagers are failing and the scheduler keeps scheduling new ones: http://i.imgur.com/iZiYa4u.png --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
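One common way to keep the crash loop observed above from turning into an unbounded scheduling loop is to delay replacement launches with exponential backoff. The sketch below is an assumption on my part about a possible mitigation, not code from this PR:

```scala
// Hypothetical mitigation sketch (not part of the PR): track consecutive
// TaskManager failures and grow the delay before the next relaunch.
case class Backoff(initialMs: Long, maxMs: Long, failures: Int = 0) {
  // Delay doubles with each failure, capped at maxMs; the shift amount is
  // also capped so large failure counts cannot overflow a Long.
  def delayMs: Long = math.min(maxMs, initialMs << math.min(failures, 20))
  def recordFailure: Backoff = copy(failures = failures + 1)
  def reset: Backoff = copy(failures = 0)
}
```

The scheduler would consult `delayMs` before accepting a new offer for a slave whose TaskManagers keep exiting, and call `reset` once a replacement stays up.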
Github user ankurcha commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-138417721

@rmetzger This PR should be good to test. I have embedded a simple http server that can serve the `log4j.configuration=...` file or a default file to the task managers. I recommend running `mvn -P include-mesos clean package` in flink-dist, followed by something similar to:

```bash
java -Dlog4j.configuration=/Users/achauhan/Projects/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/conf/log4j.properties -cp flink-dist-0.10-SNAPSHOT.jar org.apache.flink.mesos.scheduler.FlinkScheduler --confDir /Users/achauhan/Projects/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/conf
```

to start the job manager. The `--confDir` path must contain correct configuration etc. Please let me know if you face any problems. I have tried it using [playa-mesos](https://github.com/mesosphere/playa-mesos).
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-136377793

Thank you for the response. I was asking because I think it's a requirement that the JobManager runs as a mesos task in the cluster as well. But as far as I understood your answer (I'm really not a Mesos expert), that is the case. Did you also address the issues I had while deploying Flink on Mesos? Let me know when the PR is ready for a test drive again.
Github user ankurcha commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r38281174

--- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala ---
@@ -0,0 +1,348 @@

```scala
package org.apache.flink.mesos.scheduler

import java.util.{List => JList}

import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import com.google.common.base.Splitter
import com.google.protobuf.{ByteString, GeneratedMessage}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.configuration.ConfigConstants._
import org.apache.flink.mesos._
import org.apache.flink.mesos.scheduler.FlinkScheduler._
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.CommandInfo.URI
import org.apache.mesos.Protos.Value.Ranges
import org.apache.mesos.Protos.Value.Type._

trait SchedulerUtils {
```

--- End diff --

I have addressed this in the latest set of changes.
Github user ankurcha commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-136236719

@rmetzger I have finally got some time to work on this again. Let me address your questions one by one:

*Why did you decide to start the JobManager alongside the Scheduler?* This is basically an easy first-step way of getting things running, the way it was done in a whole bunch of projects. The easiest way to run a single master + multiple worker application is to make the scheduler run the master process and have another meta-framework such as marathon submit the whole framework as a task to the mesos cluster. In the absence of marathon or aurora etc., mesos-submit (an app that ships with mesos) can be used to submit the scheduler as a task. This means the job manager + scheduler would be running in the mesos cluster, submitted as an app (just like in YARN). My eventual goal is to make the scheduler support a completely standalone mode of operation, but that requires coordination in order to assure that only one scheduler instance exists at a time - this may have some hooks that can be a part of the HA job manager initiative.

*Tests:* I am working on some docker and vagrant based scripts that can make the setup part of the tests more palatable.
Github user ankurcha commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-132529165

Thanks all for the comments. I am out for MesosCon and work this week. I'll try to address the feedback and push some changes later this weekend.
Github user uce commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-132503290

Sorry for not getting back on time @ankurcha. Robert did a good review in the meantime. Thanks!
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-131796852

Hi @ankurcha, I've started a Mesos cluster on Google Compute Engine to try out your pull request. I've used this configuration:

```
flink.mesos.master: zk://127.0.0.1:2181/mesos
flink.uberjar.location: hdfs:///user/jclouds/flink-dist-0.10-SNAPSHOT.jar
flink.mesos.taskmanagers.mem: 512
flink.mesos.taskmanagers.cpu: 0.5
taskmanager.logging.level: INFO
streamingMode: streaming
jobmanager.web.port: 8081
webclient.port: 8080
```

But I'm getting this error:

```
Exception in thread "main" java.lang.NullPointerException
	at org.apache.flink.mesos.scheduler.SchedulerUtils$class.createFrameworkInfoAndCredentials(SchedulerUtils.scala:255)
	at org.apache.flink.mesos.scheduler.FlinkScheduler$.createFrameworkInfoAndCredentials(FlinkScheduler.scala:31)
	at org.apache.flink.mesos.scheduler.FlinkScheduler$.main(FlinkScheduler.scala:183)
	at org.apache.flink.mesos.scheduler.FlinkScheduler.main(FlinkScheduler.scala)
```

I'll further investigate the issue.

*Why did you decide to start the JobManager alongside the Scheduler?* For Flink on YARN, we are starting the JobManager in a separate container. There is a lot of communication going on between the JobManager and TaskManagers; also, we need to ensure that the TaskManagers are able to reach the JM. I think we can safely assume that containers can always communicate among each other ... I'm not so sure about Mesos clients and cluster containers.

*The mesos scheduler is not HA and should be used with marathon or similar service to ensure that there is always one instance running. This may be addressed in future patches.* Would you start the mesos scheduler on the client machine or inside the cluster, using a container? What's the typical deployment model for Mesos?
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-131812796

It seems that you are ignoring methods such as `error(driver: SchedulerDriver, message: String)` or `frameworkMessage()`. Are they application specific (e.g. sent by our scheduler) or are they events delivered by Mesos? I think we should not ignore these events.
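They are events delivered by Mesos itself: `error` fires on unrecoverable failures (after which the driver aborts), and `frameworkMessage` carries payloads sent by the framework's own executors. A minimal "log instead of ignore" sketch, with plain `String`/`Array[Byte]` signatures standing in for the real `SchedulerDriver`/Protos types (so this is an illustration, not the PR's code):

```scala
// Self-contained sketch: record events instead of dropping them. The real
// callbacks receive a SchedulerDriver and Mesos Protos types; simplified
// here so the idea stands alone.
class LoggingCallbacks {
  val events = scala.collection.mutable.Buffer[String]()

  // error() is delivered by Mesos on unrecoverable failures, and the
  // driver aborts afterwards, so this is the place to log and trigger
  // cleanup or failover.
  def error(message: String): Unit =
    events += s"ERROR: $message"

  // frameworkMessage() carries custom payloads sent by our own executors;
  // dropping it silently loses application-level signals.
  def frameworkMessage(executorId: String, data: Array[Byte]): Unit =
    events += s"MSG from $executorId: ${new String(data, "UTF-8")}"
}
```

In the actual scheduler, the `events` buffer would be an SLF4J logger plus whatever shutdown hook the JobManager side needs.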
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r37184472

--- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/executor/FlinkExecutor.scala ---
@@ -0,0 +1,172 @@

```scala
package org.apache.flink.mesos.executor

import scala.util.{Failure, Success, Try}

import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
import org.apache.flink.mesos._
import org.apache.flink.mesos.scheduler._
import org.apache.flink.runtime.StreamingMode
import org.apache.log4j.{ConsoleAppender, Level, Logger => ApacheLogger, PatternLayout}
import org.apache.mesos.{Executor, ExecutorDriver}
import org.apache.mesos.Protos._

trait FlinkExecutor extends Executor {
  // logger to use
  def LOG: org.slf4j.Logger

  var currentRunningTaskId: Option[TaskID] = None
  val TASK_MANAGER_LOGGING_LEVEL_KEY = "taskmanager.logging.level"
  val DEFAULT_TASK_MANAGER_LOGGING_LEVEL = "INFO"


  // method that defines how the task is started when a launchTask is sent
  def startTask(streamingMode: StreamingMode): Try[Unit]

  var thread: Option[Thread] = None
  var slaveId: Option[SlaveID] = None

  override def shutdown(driver: ExecutorDriver): Unit = {
    LOG.info("Killing taskManager thread")
    // kill task manager thread
    for (t <- thread) {
      t.stop()
    }

    // exit
    sys.exit(0)
  }

  override def disconnected(driver: ExecutorDriver): Unit = {}

  override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit = {
    for (t <- thread) {
      LOG.info(s"Killing task : ${taskId.getValue}")
      thread = None
      currentRunningTaskId = None

      // stop running thread
      t.stop()

      // Send the TASK_FINISHED status
      driver.sendStatusUpdate(TaskStatus.newBuilder()
        .setTaskId(taskId)
        .setState(TaskState.TASK_FINISHED)
        .build())
    }
  }


  override def error(driver: ExecutorDriver, message: String): Unit = {}

  override def frameworkMessage(driver: ExecutorDriver, data: Array[Byte]): Unit = {}

  override def registered(driver: ExecutorDriver, executorInfo: ExecutorInfo,
      frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo): Unit = {
    LOG.info(s"${executorInfo.getName} was registered on slave: ${slaveInfo.getHostname}")
    slaveId = Some(slaveInfo.getId)
    // get the configuration passed to it
    if (executorInfo.hasData) {
      val newConfig: Configuration = Utils.deserialize(executorInfo.getData.toByteArray)
      GlobalConfiguration.includeConfiguration(newConfig)
    }
    LOG.debug("Loaded configuration: {}", GlobalConfiguration.getConfiguration)
  }


  override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo): Unit = {
    slaveId = Some(slaveInfo.getId)
  }


  override def launchTask(driver: ExecutorDriver, task: TaskInfo): Unit = {
    // overlay the new config over this one
    val taskConf: Configuration = Utils.deserialize(task.getData.toByteArray)
    GlobalConfiguration.includeConfiguration(taskConf)

    // reconfigure log4j
    val logLevel = GlobalConfiguration.getString(
      TASK_MANAGER_LOGGING_LEVEL_KEY, DEFAULT_TASK_MANAGER_LOGGING_LEVEL)

    initializeLog4j(Level.toLevel(logLevel, Level.DEBUG))

    // get streaming mode
    val streamingMode = getStreamingMode()

    // create the thread
    val t = createThread(driver, task.getTaskId, streamingMode)
    thread = Some(t)
    t.start()

    // send message
    driver.sendStatusUpdate(TaskStatus.newBuilder()
      .setTaskId(task.getTaskId)
      .setState(TaskState.TASK_RUNNING)
```
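A side note on the diff above: it stops the TaskManager thread with `Thread.stop()`, which is deprecated and can leave shared state corrupted because it releases all monitors the thread holds mid-operation. A cooperative alternative (an illustrative sketch, not code from this PR) interrupts the thread and waits for it to exit:

```scala
// Illustrative sketch: interrupt-based shutdown instead of Thread.stop().
object ThreadShutdown {
  // Returns true if the thread exited within timeoutMs after being
  // interrupted. The target thread must treat interruption as a shutdown
  // request, e.g. by catching InterruptedException and returning.
  def stopCooperatively(t: Thread, timeoutMs: Long = 5000): Boolean = {
    t.interrupt()      // request shutdown
    t.join(timeoutMs)  // wait for the thread to exit
    !t.isAlive         // true if it stopped in time
  }
}
```

For the executor this would mean the TaskManager runnable checks its interrupt status (or catches `InterruptedException`) and shuts the TaskManager down cleanly before the thread returns.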
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r37185622

--- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala ---
@@ -0,0 +1,348 @@

```scala
package org.apache.flink.mesos.scheduler

import java.util.{List => JList}

import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import com.google.common.base.Splitter
import com.google.protobuf.{ByteString, GeneratedMessage}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.configuration.ConfigConstants._
import org.apache.flink.mesos._
import org.apache.flink.mesos.scheduler.FlinkScheduler._
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.CommandInfo.URI
import org.apache.mesos.Protos.Value.Ranges
import org.apache.mesos.Protos.Value.Type._

trait SchedulerUtils {
```

--- End diff --

Some code in this trait has been copied from the Spark sources, right? We should state this at least in the scaladocs of the trait.
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r37183905

--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,188 @@

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<parent>
		<artifactId>flink-parent</artifactId>
		<groupId>org.apache.flink</groupId>
		<version>0.10-SNAPSHOT</version>
		<relativePath>..</relativePath>
	</parent>
	<modelVersion>4.0.0</modelVersion>

	<artifactId>flink-mesos</artifactId>
	<name>flink-mesos</name>
	<packaging>jar</packaging>

	<properties>
		<mesos.version>0.22.1</mesos.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.rogach</groupId>
			<artifactId>scallop_${scala.binary.version}</artifactId>
			<version>0.9.5</version>
```

--- End diff --

Looks like you are introducing a new CLI parsing library dependency to Flink. There is actually a JIRA in Flink to reduce the number of those libraries: https://issues.apache.org/jira/browse/FLINK-1347 Since it's probably just a few lines of code, can you try to use the same parsing library we are already using? I think for other Scala parts in Flink, we are using

```xml
<dependency>
	<groupId>com.github.scopt</groupId>
	<artifactId>scopt_${scala.binary.version}</artifactId>
	<exclusions>
		<exclusion>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-library</artifactId>
		</exclusion>
	</exclusions>
</dependency>
```

(see flink-runtime/pom.xml)
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-131803783

For the tests: for HDFS, Kafka and YARN, we are using MiniClusters which start all needed services in one JVM. I don't think we can do something similar with Mesos because it's written in C++ and there doesn't seem to be an implementation for Java. Other JVM-based projects integrating with Mesos (https://github.com/mesosphere/cassandra-mesos, Spark, ...) also don't have these kinds of end-to-end integration tests. I suspect this project requires at least a local docker setup: https://github.com/ContainerSolutions/mini-mesos But we could use something like this to provide a set of tests we can start manually locally, for example before releasing or when somebody is contributing a change to mesos.
Github user uce commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-129785986

Thanks for the reply at the mailing list. I will try out your PR this week and have a look at the code. Sorry for the delay. I needed to clear some more time, because it is a big addition. :) Thanks for all your effort.
Github user ankurcha commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-129765378

@StephanEwen Thanks for the pointer, I replied on the mailing list thread. Any code-review comments for this pull request?
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-127048248

Here is the thread where the discussion started: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/add-some-new-api-to-the-scheduler-in-the-job-manager-td7153.html#a7166 I would personally very much like to see a properly abstracted ResourceManager backend. It would make it much easier to add functionality like dynamic allocation and release across backends.
Github user ankurcha commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-126522087

Clearly I don't know how to add stuff correctly to the .travis file. If I could get some help with that, it would be great. @uce @StephanEwen - I haven't seen a resource manager thread, but yes, while developing this feature I did look a lot at the spark SchedulerBackend and may have some thoughts if you could point me to the relevant document / thread.
Github user uce commented on the pull request: https://github.com/apache/flink/pull/948#issuecomment-125782171

Hey Ankur! Welcome to the Flink community. I didn't have a proper look at your PR yet, but regarding the question you've asked in the issue: @StephanEwen kicked off a discussion recently about abstracting the resource manager behind an interface to get overlap between YARN and Mesos. I don't know if you had a look at the YARN integration, but maybe you would like to be part of that discussion as well. In any case, your PR will probably help the discussion. ;)
GitHub user ankurcha opened a pull request:

https://github.com/apache/flink/pull/948

[FLINK-1984] Integrate Flink with Apache Mesos

This pull request adds a mesos scheduler and an executor (inspired by the work done in the now-abandoned PR #251). The highlights are as follows:

* The mesos scheduler starts a JobManager in a parallel thread based on the configuration provided using the `--configDir` argument.
* The mesos scheduler is not HA and should be used with marathon or a similar service to ensure that there is always one instance running. This may be addressed in future patches.
* The mesos scheduler uses some new properties which can be set using `conf/flink-conf.yaml`. The configuration directory can be specified using the `--confDir` command line argument and a list of configuration values is present in `flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/package.scala`.

Example usage:

```bash
mvn clean package -DskipTests -Pinclude-mesos
java -Dlog4j.configuration=file:/vagrant/log4j.properties -cp *.jar org.apache.flink.mesos.scheduler.FlinkScheduler --conf-dir /vagrant
```

```yaml
# flink-conf.yaml
flink.mesos.master: zk://127.0.0.1:2181/mesos
flink.uberjar.location: file:///vagrant/flink-dist-0.10-SNAPSHOT.jar
flink.mesos.taskmanagers.mem: 512
flink.mesos.taskmanagers.cpu: 0.5
taskmanager.logging.level: INFO
streamingMode: streaming
jobmanager.web.port: 8081
webclient.port: 8080
```

```
# log4j.properties
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-5p %d{ISO8601} [%t] %c{1}: %m%n
# suppress the warning that hadoop native libraries are not loaded (irrelevant for the client)
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
log4j.logger.org.apache.flink.mesos=DEBUG
# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
```

An easy way to test this is to use https://github.com/mesosphere/playa-mesos to start a mesos cluster in virtualbox (using vagrant), copy the uberjar to /vagrant and run the above command. The job manager web UI url is set as the mesos framework url.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ankurcha/flink flink-mesos

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/948.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #948

commit 8a9e3791a2c3946f1adefe64845f16f89e092c77
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-20T08:45:41Z

    wip

commit a937ff65d72a938814162de1e9d97640c566b3f1
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-25T19:18:42Z

    WIP - flink mesos integration initial commit

commit 3f0237adb7b1f45a1fd80e025aa5b511f92a209c
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-25T19:33:57Z

    Add todo for config changes/issues

commit d9c29e2a3174cb90626c9deccc1604e0e010d965
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-27T00:03:12Z

    Fix some stuff

commit 9910f50c11c81631ff8bb351cdf394cda753ff98
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-28T01:41:20Z

    Add flink-mesos to dist, update scheduler and executor code

commit 4f2d8fed5b2bf8f49eb4da7cd148e83016206330
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-28T03:14:16Z

    Clean up code + codestyle changes

commit b1613a965a59f89aa2e9c96dcec52fc40847423a
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-28T07:12:55Z

    Clean up executor, adjust resource defaults

commit 5d39b1524b9ec565c5be28abcd4f80fab928042b
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-28T20:56:24Z

    Remove status pinging and fix logging level in TaskManagerExecutor

commit 3331596c957ad774ffca31bb2c959b22422560d2
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-28T22:33:54Z

    Code cleanup and refactoring of the conf classes

commit 33f3597f1c2aa0c7582025d9c242d4aafd9cfa63
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-28T22:36:04Z

    Remove unused dependencies and plugins

commit 36846a95a64b9700d37954a2d0176b222b6adbe0
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-28T22:40:50Z

    Use tab based indents in pom files

commit 52904c8593377c349bccf599b863f2a40df0f562
Author: Ankur Chauhan achau...@brightcove.com
Date: 2015-07-28T23:06:30Z

    Add comments for each of the properties