[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2016-03-19 Thread Astralidea
Github user Astralidea commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r56613503
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/FlinkScheduler.scala
 ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.io.File
+import java.util.{List => JList}
+
+import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
+import org.apache.mesos.Protos.TaskState._
+import org.apache.mesos.Protos._
+import org.apache.mesos.{Scheduler, SchedulerDriver}
+import org.slf4j.LoggerFactory
+import scopt.OptionParser
+
+import scala.collection.JavaConversions._
+
+object FlinkScheduler extends Scheduler with SchedulerUtils {
+
+  val LOG = LoggerFactory.getLogger(FlinkScheduler.getClass)
+  var jobManager: Option[Thread] = None
+  var currentConfiguration: Option[Configuration] = None
+  var taskManagers: Set[RunningTaskManager] = Set()
+  var taskManagerCount = 0
+  // http port where http server is hosting the configuration files
+  var httpConfigServerAddress: Option[String] = None
+
+  override def offerRescinded(driver: SchedulerDriver, offerId: OfferID): 
Unit = { }
+
+  override def disconnected(driver: SchedulerDriver): Unit = { }
+
+  override def reregistered(driver: SchedulerDriver, masterInfo: 
MasterInfo): Unit = { }
+
+  override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit 
= {
+LOG.warn(s"Slave lost: ${slaveId.getValue}, removing all task managers 
matching slaveId")
+taskManagers = taskManagers.filter(_.slaveId != slaveId)
+  }
+
+  override def error(driver: SchedulerDriver, message: String): Unit = { }
+
+  override def frameworkMessage(driver: SchedulerDriver, executorId: 
ExecutorID,
+slaveId: SlaveID, data: Array[Byte]): Unit 
= { }
+
+  override def registered(driver: SchedulerDriver, frameworkId: 
FrameworkID,
+  masterInfo: MasterInfo): Unit = { }
+
+  override def executorLost(driver: SchedulerDriver, executorId: 
ExecutorID,
+slaveId: SlaveID, status: Int): Unit = {
+LOG.warn(s"Executor ${executorId.getValue} lost with status $status on 
slave $slaveId")
+  }
+
+  override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): 
Unit = {
+val taskId = status.getTaskId.getValue
+val slaveId = status.getSlaveId.getValue
+LOG.info(
+  s"statusUpdate received from taskId: $taskId slaveId: $slaveId 
[${status.getState.name()}]")
+
+status.getState match {
+  case TASK_FAILED | TASK_FINISHED | TASK_KILLED | TASK_LOST | 
TASK_ERROR =>
+LOG.info(s"Lost taskManager with TaskId: $taskId on slave: 
$slaveId")
+taskManagers = taskManagers.filter(_.taskId != status.getTaskId)
+  case _ =>
+LOG.debug(s"No action to take for statusUpdate 
${status.getState.name()}")
+}
+  }
+
+  override def resourceOffers(driver: SchedulerDriver, offers: 
JList[Offer]): Unit = {
+// we will combine all resources from the same slave and then launch a single task rather
+// than one per offer; this way we have better utilization and less memory wasted on overhead.
+for((slaveId, offers) <- offers.groupBy(_.getSlaveId)) {
+  val tasks = constructTaskInfoFromOffers(slaveId, offers.toList)
+  driver.launchTasks(offers.map(_.getId), tasks)
+}
+  }
+
+  def constructTaskInfoFromOffers(slaveId: SlaveID, offers: List[Offer]): 
Seq[TaskInfo] = {
+val maxTaskManagers = GlobalConfiguration.getInteger(
+  TASK_MANAGER_COUNT_KEY, DEFAULT_TASK_MANAGER_COUNT)
+val requiredMem = GlobalConfiguration.getFloat(
+  TASK_MANAGER_MEM_KEY, DEFAULT_TASK_MANAGER_MEM)
+val requiredCPU = GlobalConfiguration.getFloat(
+  

[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2016-03-18 Thread ankurcha
Github user ankurcha closed the pull request at:

https://github.com/apache/flink/pull/948




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2016-03-11 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-195398349
  
With the recently introduced changes to the resource management, which bring it better in line with Mesos' model, I think we can close this PR. Sorry!




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-10-15 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-148403603
  
I tried running the code from this pull request again, this time using the 
`playa-mesos` Vagrant image, and it does not work for me.
I was following your instructions.

When did you last test the changes?
My motivation to test this pull request goes down every time I'm testing 
it. I've spun up a Mesos cluster on GCE two times, plus the VM now.
Maybe I'm doing it wrong; please let me know what I can do to get it to run.

CLI output:
```
vagrant@mesos:~/flink/build-target$ java 
-Dlog4j.configuration=file://`pwd`/conf/log4j.properties -Dlog.file=logs.log 
-cp lib/flink-dist-0.10-SNAPSHOT.jar 
org.apache.flink.mesos.scheduler.FlinkScheduler --confDir conf/
I1015 14:05:01.591161  9992 sched.cpp:157] Version: 0.22.1
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@712: Client 
environment:zookeeper.version=zookeeper C client 3.4.5
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@716: Client 
environment:host.name=mesos
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@723: Client 
environment:os.name=Linux
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@724: Client 
environment:os.arch=3.16.0-30-generic
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@725: Client 
environment:os.version=#40~14.04.1-Ubuntu SMP Thu Jan 15 17:43:14 UTC 2015
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@733: Client 
environment:user.name=vagrant
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@741: Client 
environment:user.home=/home/vagrant
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@log_env@753: Client 
environment:user.dir=/home/vagrant/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT
2015-10-15 14:05:01,592:9991(0x7f67c700):ZOO_INFO@zookeeper_init@786: 
Initiating client connection, host=127.0.0.1:2181 sessionTimeout=1 
watcher=0x7f67dac33a60 sessionId=0 sessionPasswd= context=0x7f67f0004470 
flags=0
2015-10-15 14:05:01,592:9991(0x7f67c6ffd700):ZOO_INFO@check_events@1703: 
initiated connection to server [127.0.0.1:2181]
Embedded server listening at
  http://127.0.0.1:40815
Press any key to stop.
2015-10-15 14:05:04,959:9991(0x7f67c6ffd700):ZOO_INFO@check_events@1750: 
session establishment complete on server [127.0.0.1:2181], 
sessionId=0x1506b6312fa000b, negotiated timeout=1
I1015 14:05:04.959841 10024 group.cpp:313] Group process 
(group(1)@127.0.1.1:57437) connected to ZooKeeper
I1015 14:05:04.959899 10024 group.cpp:790] Syncing group operations: queue 
size (joins, cancels, datas) = (0, 0, 0)
I1015 14:05:04.959928 10024 group.cpp:385] Trying to create path '/mesos' 
in ZooKeeper
I1015 14:05:05.204282 10024 detector.cpp:138] Detected a new leader: 
(id='2')
I1015 14:05:05.204489 10024 group.cpp:659] Trying to get 
'/mesos/info_02' in ZooKeeper
I1015 14:05:05.303072 10024 detector.cpp:452] A new leading master 
(UPID=master@127.0.1.1:5050) is detected
I1015 14:05:05.303467 10024 sched.cpp:254] New master detected at 
master@127.0.1.1:5050
I1015 14:05:05.303890 10024 sched.cpp:264] No credentials provided. 
Attempting to register without authentication
I1015 14:05:05.851562 10024 sched.cpp:448] Framework registered with 
20151015-120419-16842879-5050-1244-
```

log file content
```
14:04:54,564 WARN  org.apache.hadoop.util.NativeCodeLoader  
 - Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ 
 - 

14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ 
 -  Starting JobManager (Version: 0.10-SNAPSHOT, Rev:d905af0, 
Date:06.10.2015 @ 19:37:22 UTC)
14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ 
 -  Current user: vagrant
14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ 
 -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.7/24.79-b02
14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ 
 -  Maximum heap size: 592 MiBytes
14:04:55,763 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ 
 -  JAVA_HOME: (not set)
14:04:55,823 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ 
 -  Hadoop version: 2.3.0
14:04:55,824 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ 
 -  JVM Options:
14:04:55,824 INFO  org.apache.flink.mesos.scheduler.FlinkScheduler$ 
 - 
-Dlog4j.configuration=file:///home/vagrant/flink/build-target/conf/log4j.properties
14:04:55,824 INFO  

[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-10-13 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-147937031
  
Thanks again for updating the patch, @ankurcha. Apologies for the delay in 
the review.




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-10-11 Thread ankurcha
Github user ankurcha commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r41721162
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,203 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- ASF license header -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.10-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-mesos</artifactId>
+    <name>flink-mesos</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <mesos.version>0.23.0</mesos.version>
+        <unfiltererd.version>0.8.4</unfiltererd.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.github.scopt</groupId>
+            <artifactId>scopt_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>net.databinder</groupId>
+            <artifactId>unfiltered-jetty_${scala.binary.version}</artifactId>
--- End diff --

The dependency is for the simple HTTP server that hosts the configuration 
for the executors.

The unfiltered library is under the MIT license. 

Sent from my iPhone
> On Oct 11, 2015, at 20:28, Henry Saputra  wrote:
> 
> In flink-mesos/pom.xml:
> 
> > +    <name>flink-mesos</name>
> > +    <packaging>jar</packaging>
> > +
> > +    <properties>
> > +        <mesos.version>0.23.0</mesos.version>
> > +        <unfiltererd.version>0.8.4</unfiltererd.version>
> > +    </properties>
> > +
> > +    <dependencies>
> > +        <dependency>
> > +            <groupId>com.github.scopt</groupId>
> > +            <artifactId>scopt_${scala.binary.version}</artifactId>
> > +        </dependency>
> > +        <dependency>
> > +            <groupId>net.databinder</groupId>
> > +            <artifactId>unfiltered-jetty_${scala.binary.version}</artifactId>
> What is this dependency for? And also what is the license for it?
> 
> —
> Reply to this email directly or view it on GitHub.
> 





[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-10-11 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r41721026
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,203 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- ASF license header -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.10-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-mesos</artifactId>
+    <name>flink-mesos</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <mesos.version>0.23.0</mesos.version>
+        <unfiltererd.version>0.8.4</unfiltererd.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.github.scopt</groupId>
+            <artifactId>scopt_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>net.databinder</groupId>
+            <artifactId>unfiltered-jetty_${scala.binary.version}</artifactId>
--- End diff --

What is this dependency for? And also what is the license for it?




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-10-11 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r41721041
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,203 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- ASF license header -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.10-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-mesos</artifactId>
+    <name>flink-mesos</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <mesos.version>0.23.0</mesos.version>
+        <unfiltererd.version>0.8.4</unfiltererd.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.github.scopt</groupId>
+            <artifactId>scopt_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>net.databinder</groupId>
+            <artifactId>unfiltered-jetty_${scala.binary.version}</artifactId>
+            <version>${unfiltererd.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>net.databinder</groupId>
+            <artifactId>unfiltered-filter_${scala.binary.version}</artifactId>
--- End diff --

Same question:
What is this dependency for? And also what is the license for it?




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-10-01 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-144858877
  
Quick comment on the PR: could you add Javadoc header information to give a 
short description of why each new class/trait is created and what role it plays 
in supporting the Mesos integration?
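
For example, something along these lines on each new type would already help (the wording below is purely illustrative, not taken from the PR):

```scala
/**
 * Illustrative Scaladoc header only — the actual description is up to the PR author.
 *
 * Mesos Scheduler implementation for Flink: starts the JobManager alongside the
 * scheduler and launches TaskManager executors in response to Mesos resource offers.
 */
object FlinkScheduler extends Scheduler with SchedulerUtils {
  // ... existing implementation ...
}
```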




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-10-01 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r40970091
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/executor/FlinkExecutor.scala 
---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.executor
+
+import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.mesos.Protos._
+import org.apache.mesos.{Executor, ExecutorDriver}
+
+import scala.util.{Failure, Success, Try}
+
+trait FlinkExecutor extends Executor {
--- End diff --

Could you add a Javadoc header to explain why this trait is created and 
summarize when and where it would be used during execution?




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-10-01 Thread hsaputra
Github user hsaputra commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r40970304
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/executor/TaskManagerExecutor.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.executor
+
+import scala.util.Try
+
+import org.apache.flink.configuration.GlobalConfiguration
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.taskmanager.TaskManager
+import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.mesos.{Executor, ExecutorDriver, MesosExecutorDriver}
+import org.apache.mesos.Protos.Status
+import org.slf4j.{Logger, LoggerFactory}
+
+class TaskManagerExecutor extends FlinkExecutor {
--- End diff --

Could you add a Javadoc header to describe the purpose of this class?




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-09-29 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-144074252
  
Thank you for the update, @ankurcha. I'll try to review your PR in the 
next few days.
Sorry for the delay.




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-09-24 Thread ankurcha
Github user ankurcha commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-142825340
  
Hi @rmetzger,

This completely got buried in my schedule, but I think I have finally worked 
out the kinks. Thanks for spending the time to try it out. 

I think we are close to having this out of the door, so have a go at it and 
let me know if you see something off.

```bash
vagrant@mesos:/vagrant$ java 
-Dlog4j.configuration=file:/vagrant/log4j.properties -cp *.jar 
org.apache.flink.mesos.scheduler.FlinkScheduler --confDir /vagrant --port 10080
INFO  2015-09-24 06:22:04,584 [main] FlinkScheduler$: 

INFO  2015-09-24 06:22:04,585 [main] FlinkScheduler$:  Starting JobManager 
(Version: 0.10-SNAPSHOT, Rev:bc21de2, Date:23.09.2015 @ 20:06:33 UTC)
INFO  2015-09-24 06:22:04,585 [main] FlinkScheduler$:  Current user: vagrant
INFO  2015-09-24 06:22:04,585 [main] FlinkScheduler$:  JVM: OpenJDK 64-Bit 
Server VM - Oracle Corporation - 1.7/24.79-b02
INFO  2015-09-24 06:22:04,585 [main] FlinkScheduler$:  Maximum heap size: 
592 MiBytes
INFO  2015-09-24 06:22:04,585 [main] FlinkScheduler$:  JAVA_HOME: (not set)
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$:  Hadoop version: 2.3.0
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$:  JVM Options:
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$: 
-Dlog4j.configuration=file:/vagrant/log4j.properties
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$:  Program Arguments:
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$: --confDir
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$: /vagrant
INFO  2015-09-24 06:22:04,588 [main] FlinkScheduler$: --port
INFO  2015-09-24 06:22:04,589 [main] FlinkScheduler$: 10080
INFO  2015-09-24 06:22:04,589 [main] FlinkScheduler$: 

INFO  2015-09-24 06:22:04,595 [main] FlinkScheduler$: Maximum number of 
open file descriptors is 4096
INFO  2015-09-24 06:22:04,596 [main] FlinkScheduler$: Loading configuration 
from /vagrant
DEBUG 2015-09-24 06:22:04,665 [main] FlinkScheduler$: Serving configuration 
via: Some(http://127.0.0.1:10080)
INFO  2015-09-24 06:22:04,725 [Thread-3] JobManager: Starting JobManager
INFO  2015-09-24 06:22:04,726 [Thread-3] JobManager: Starting JobManager 
actor system at :6123.
I0924 06:22:04.812088  7312 sched.cpp:157] Version: 0.22.1
2015-09-24 06:22:04,815:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@712: Client 
environment:zookeeper.version=zookeeper C client 3.4.5
2015-09-24 06:22:04,816:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@716: Client 
environment:host.name=mesos
2015-09-24 06:22:04,818:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@723: Client 
environment:os.name=Linux
2015-09-24 06:22:04,821:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@724: Client 
environment:os.arch=3.16.0-30-generic
2015-09-24 06:22:04,822:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@725: Client 
environment:os.version=#40~14.04.1-Ubuntu SMP Thu Jan 15 17:43:14 UTC 2015
2015-09-24 06:22:04,824:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@733: Client 
environment:user.name=vagrant
2015-09-24 06:22:04,827:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@741: Client 
environment:user.home=/home/vagrant
2015-09-24 06:22:04,829:7311(0x7fcaf8bbb700):ZOO_INFO@log_env@753: Client 
environment:user.dir=/vagrant
2015-09-24 06:22:04,830:7311(0x7fcaf8bbb700):ZOO_INFO@zookeeper_init@786: 
Initiating client connection, host=127.0.0.1:2181 sessionTimeout=1 
watcher=0x7fcb29b99a60 sessionId=0 sessionPasswd= context=0x1c0f350 
flags=0
2015-09-24 06:22:04,837:7311(0x7fcaf2ffd700):ZOO_INFO@check_events@1703: 
initiated connection to server [127.0.0.1:2181]
2015-09-24 06:22:04,839:7311(0x7fcaf2ffd700):ZOO_INFO@check_events@1750: 
session establishment complete on server [127.0.0.1:2181], 
sessionId=0x14ffdcf98d00015, negotiated timeout=1
I0924 06:22:04.840190  7333 group.cpp:313] Group process 
(group(1)@127.0.1.1:36068) connected to ZooKeeper
I0924 06:22:04.840239  7333 group.cpp:790] Syncing group operations: queue 
size (joins, cancels, datas) = (0, 0, 0)
I0924 06:22:04.840265  7333 group.cpp:385] Trying to create path '/mesos' 
in ZooKeeper
I0924 06:22:04.843736  7333 detector.cpp:138] Detected a new leader: 
(id='0')
I0924 06:22:04.843868  7331 group.cpp:659] Trying to get 
'/mesos/info_00' in ZooKeeper
I0924 06:22:04.844832  7331 detector.cpp:452] A new leading master 
(UPID=master@127.0.1.1:5050) is detected
I0924 06:22:04.844923  7331 sched.cpp:254] New master detected at 
master@127.0.1.1:5050
I0924 06:22:04.845263  7331 sched.cpp:264] No credentials provided. 
Attempting to register without authentication
I0924 06:22:04.847537  7331 sched.cpp:448] Framework registered 

[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-09-08 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r38957167
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala
 ---
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import com.google.common.base.Splitter
+import com.google.protobuf.{ByteString, GeneratedMessage}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
+import org.apache.flink.configuration.ConfigConstants._
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler.FlinkScheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
+import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.CommandInfo.URI
+import org.apache.mesos.Protos.Value.Ranges
+import org.apache.mesos.Protos.Value.Type._
+
+/**
+ * This code is borrowed and inspired from Apache Spark Project:
+ *   
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+ */
+trait SchedulerUtils {
+
+  /**
+   * Converts the attributes from the resource offer into a Map of name -> 
Attribute Value
+   * The attribute values are the mesos attribute types and they are
+   * @param offerAttributes List of attributes sent with an offer
+   * @return
+   */
+  def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, 
GeneratedMessage] = {
+offerAttributes.map(attr => {
+  val attrValue = attr.getType match {
+case SCALAR => attr.getScalar
+case Value.Type.RANGES => attr.getRanges
+case Value.Type.SET => attr.getSet
+case Value.Type.TEXT => attr.getText
+  }
+  (attr.getName, attrValue)
+}).toMap
+  }
+
+  def createJavaExecCommand(jvmArgs: String = "", classPath: String = 
"flink-*.jar",
+classToExecute: String, args: String = ""): 
String = {
+s"env; java $jvmArgs -cp $classPath $classToExecute $args"
+  }
+
+  def createExecutorInfo(id: String, role: String, artifactURIs: 
Set[String], command: String,
+ nativeLibPath: String): ExecutorInfo = {
+val uris = artifactURIs.map(uri => 
URI.newBuilder().setValue(uri).build())
+ExecutorInfo.newBuilder()
+  .setExecutorId(ExecutorID
+.newBuilder()
+.setValue(s"executor_$id"))
+  .setName(s"Apache Flink Mesos Executor - $id")
+  .setCommand(CommandInfo.newBuilder()
+.setValue(s"env; $command")
+.addAllUris(uris)
+.setEnvironment(Environment.newBuilder()
+  .addVariables(Environment.Variable.newBuilder()
+.setName("MESOS_NATIVE_JAVA_LIBRARY").setValue(nativeLibPath)))
+.setValue(command))
+  .build()
+  }
+
+  def createTaskInfo(taskName: String, taskId: TaskID, slaveID: SlaveID, 
role: String, mem: Double,
+ cpus: Double, disk: Double, ports: Set[Int], 
executorInfo: ExecutorInfo,
+ conf: Configuration): TaskInfo = {
+
+val portRanges = Ranges.newBuilder().addAllRange(
+  ports.map(port => 
Value.Range.newBuilder().setBegin(port).setEnd(port).build())).build()
+
+val taskConf = conf.clone()
+val portsSeq = ports.toSeq
+// set task manager ports
+taskConf.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 
portsSeq.get(0))
+taskConf.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 

[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-09-08 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-138640533
  
I'm trying out the code again in GCE ;)




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-09-08 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r38958292
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala
 ---
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import com.google.common.base.Splitter
+import com.google.protobuf.{ByteString, GeneratedMessage}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
+import org.apache.flink.configuration.ConfigConstants._
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler.FlinkScheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
+import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.CommandInfo.URI
+import org.apache.mesos.Protos.Value.Ranges
+import org.apache.mesos.Protos.Value.Type._
+
+/**
+ * This code is borrowed and inspired from Apache Spark Project:
+ *   
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+ */
+trait SchedulerUtils {
+
+  /**
+   * Converts the attributes from the resource offer into a Map of name -> 
Attribute Value
+   * The attribute values are the mesos attribute types and they are
+   * @param offerAttributes List of attributes sent with an offer
+   * @return
+   */
+  def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, 
GeneratedMessage] = {
+offerAttributes.map(attr => {
+  val attrValue = attr.getType match {
+case SCALAR => attr.getScalar
+case Value.Type.RANGES => attr.getRanges
+case Value.Type.SET => attr.getSet
+case Value.Type.TEXT => attr.getText
+  }
+  (attr.getName, attrValue)
+}).toMap
+  }
+
+  def createJavaExecCommand(jvmArgs: String = "", classPath: String = 
"flink-*.jar",
+classToExecute: String, args: String = ""): 
String = {
+s"env; java $jvmArgs -cp $classPath $classToExecute $args"
+  }
+
+  def createExecutorInfo(id: String, role: String, artifactURIs: 
Set[String], command: String,
+ nativeLibPath: String): ExecutorInfo = {
+val uris = artifactURIs.map(uri => 
URI.newBuilder().setValue(uri).build())
+ExecutorInfo.newBuilder()
+  .setExecutorId(ExecutorID
+.newBuilder()
+.setValue(s"executor_$id"))
+  .setName(s"Apache Flink Mesos Executor - $id")
+  .setCommand(CommandInfo.newBuilder()
+.setValue(s"env; $command")
+.addAllUris(uris)
+.setEnvironment(Environment.newBuilder()
+  .addVariables(Environment.Variable.newBuilder()
+.setName("MESOS_NATIVE_JAVA_LIBRARY").setValue(nativeLibPath)))
+.setValue(command))
+  .build()
+  }
+
+  def createTaskInfo(taskName: String, taskId: TaskID, slaveID: SlaveID, 
role: String, mem: Double,
+ cpus: Double, disk: Double, ports: Set[Int], 
executorInfo: ExecutorInfo,
+ conf: Configuration): TaskInfo = {
+
+val portRanges = Ranges.newBuilder().addAllRange(
+  ports.map(port => 
Value.Range.newBuilder().setBegin(port).setEnd(port).build())).build()
+
+val taskConf = conf.clone()
+val portsSeq = ports.toSeq
+// set task manager ports
+taskConf.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 
portsSeq.get(0))
+taskConf.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, 

[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-09-08 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-138649089
  
It seems that the TaskManagers are failing and the scheduler keeps 
scheduling new ones: http://i.imgur.com/iZiYa4u.png






[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-09-07 Thread ankurcha
Github user ankurcha commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-138417721
  
@rmetzger This PR should be good to test. I have embedded a simple HTTP 
server that can serve the `log4j.configuration=...` file or a default file 
to the task managers. I recommend using `mvn -P include-mesos clean package` in 
flink-dist followed by something similar to:

```bash
java 
-Dlog4j.configuration=/Users/achauhan/Projects/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/conf/log4j.properties
 -cp flink-dist-0.10-SNAPSHOT.jar 
org.apache.flink.mesos.scheduler.FlinkScheduler --confDir 
/Users/achauhan/Projects/flink/flink-dist/target/flink-0.10-SNAPSHOT-bin/flink-0.10-SNAPSHOT/conf
```

to start the job manager. The `--confDir` path must contain the correct 
configuration, etc. Please let me know if you face any problems. I have tried it 
using [playa-mesos](https://github.com/mesosphere/playa-mesos).
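
Conceptually, the embedded config server does little more than the rough sketch below. This is illustrative only: it uses the JDK's built-in `com.sun.net.httpserver` instead of the unfiltered/Jetty library the PR actually uses, and the object name, method name and parameters are made up.

```scala
import java.net.InetSocketAddress
import java.nio.file.{Files, Paths}

import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}

// Rough sketch only: serve every file in confDir at http://<host>:<port>/<fileName>,
// so executors can fetch flink-conf.yaml / log4j.properties through a plain URI.
object ConfigServerSketch {
  def serveConfDir(confDir: String, port: Int): HttpServer = {
    val server = HttpServer.create(new InetSocketAddress(port), 0)
    server.createContext("/", new HttpHandler {
      override def handle(exchange: HttpExchange): Unit = {
        val name = exchange.getRequestURI.getPath.stripPrefix("/")
        val file = Paths.get(confDir, name)
        if (name.nonEmpty && Files.isRegularFile(file)) {
          val bytes = Files.readAllBytes(file)
          exchange.sendResponseHeaders(200, bytes.length)
          exchange.getResponseBody.write(bytes)
        } else {
          exchange.sendResponseHeaders(404, -1) // -1 means no response body
        }
        exchange.close()
      }
    })
    server.start()
    server
  }
}
```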




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-31 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-136377793
  
Thank you for the response. I was asking because I think it's a requirement 
that the JobManager runs as a Mesos task in the cluster as well.
But as far as I understood your answer (I'm really not a Mesos expert), that 
is the case.

Did you also address the issues I had while deploying Flink on Mesos?

Let me know when the PR is ready for a test drive again.




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-30 Thread ankurcha
Github user ankurcha commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r38281174
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala
 ---
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import com.google.common.base.Splitter
+import com.google.protobuf.{ByteString, GeneratedMessage}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
+import org.apache.flink.configuration.ConfigConstants._
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler.FlinkScheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
+import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.CommandInfo.URI
+import org.apache.mesos.Protos.Value.Ranges
+import org.apache.mesos.Protos.Value.Type._
+
+trait SchedulerUtils {
--- End diff --

I have addressed this in the latest set of changes.




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-30 Thread ankurcha
Github user ankurcha commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-136236719
  
@rmetzger I have finally got some time to work on this again. Let me 
address your questions one by one:

> Why did you decide to start the JobManager alongside the Scheduler?

This is basically an easy first step to getting things running, the way 
it was done in a whole bunch of projects. The easiest way to run a single-master 
+ multiple-worker application is to make the scheduler run the master 
process and have another meta-framework such as Marathon submit the whole 
framework as a task to the Mesos cluster. In the absence of Marathon, Aurora, etc., 
mesos-submit (an app that ships with Mesos) can be used to submit the 
scheduler as a task. This means the JobManager + scheduler would be running in 
the Mesos cluster, submitted as an app (just like in YARN).

My eventual goal is to make the scheduler support a completely standalone 
mode of operation, but that requires coordination to ensure that only 
one scheduler instance exists at a time; this may have some hooks that can be 
part of the HA JobManager initiative.

> Tests

I am working on some Docker- and Vagrant-based scripts that can make the 
setup part of the tests more palatable.




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-19 Thread ankurcha
Github user ankurcha commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-132529165
  
Thanks all for the comments. I am out for MesosCon and work this week. I'll 
try to address the feedback and push some changes later this weekend.




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-19 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-132503290
  
Sorry for not getting back to you in time, @ankurcha. Robert did a good review in 
the meantime. Thanks!




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-17 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-131796852
  
Hi @ankurcha,
I've started a Mesos cluster on Google Compute Engine to try out your pull 
request.

I've used this configuration:
```
flink.mesos.master: zk://127.0.0.1:2181/mesos
flink.uberjar.location: hdfs:///user/jclouds/flink-dist-0.10-SNAPSHOT.jar
flink.mesos.taskmanagers.mem: 512
flink.mesos.taskmanagers.cpu: 0.5
taskmanager.logging.level: INFO
streamingMode: streaming

jobmanager.web.port: 8081
webclient.port: 8080
```

But I'm getting this error
```
Exception in thread "main" java.lang.NullPointerException
at 
org.apache.flink.mesos.scheduler.SchedulerUtils$class.createFrameworkInfoAndCredentials(SchedulerUtils.scala:255)
at 
org.apache.flink.mesos.scheduler.FlinkScheduler$.createFrameworkInfoAndCredentials(FlinkScheduler.scala:31)
at 
org.apache.flink.mesos.scheduler.FlinkScheduler$.main(FlinkScheduler.scala:183)
at 
org.apache.flink.mesos.scheduler.FlinkScheduler.main(FlinkScheduler.scala)
```

I'll further investigate the issue.

*Why did you decide to start the JobManager alongside the Scheduler?*
For Flink on YARN, we are starting the JobManager in a separate container. 
There is a lot of communication going on between the JobManager and 
TaskManagers; also, we need to ensure that the TaskManagers are able to reach 
the JM.
I think we can safely assume that containers can always communicate with 
each other ... I'm not so sure about Mesos clients and cluster containers.

> The Mesos scheduler is not HA and should be used with Marathon or a similar 
service to ensure that there is always one instance running. This may be 
addressed in future patches.

Would you start the Mesos scheduler on the client machine or inside the 
cluster, using a container?
What's the typical deployment model for Mesos?





[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-17 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-131812796
  
It seems that you are ignoring methods such as `error(driver: 
SchedulerDriver, message: String)` or `frameworkMessage()`.
Are they application-specific (e.g. sent by our scheduler), or do they 
receive events from Mesos?
I think we should not ignore these events.
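
Even something as minimal as the sketch below inside `FlinkScheduler` would be better than empty bodies (purely illustrative handling, not code from this PR):

```scala
// Illustrative sketch only — how the currently empty callbacks could react.
override def error(driver: SchedulerDriver, message: String): Unit = {
  LOG.error(s"Fatal error from Mesos: $message")
  driver.abort() // give up the framework registration; a real implementation might retry instead
}

override def frameworkMessage(driver: SchedulerDriver, executorId: ExecutorID,
                              slaveId: SlaveID, data: Array[Byte]): Unit = {
  // Framework messages are free-form bytes sent by our own executors.
  LOG.info(s"Message from executor ${executorId.getValue} on slave ${slaveId.getValue}: " +
    new String(data, "UTF-8"))
}
```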




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-17 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r37184472
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/executor/FlinkExecutor.scala 
---
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.executor
+
+import scala.util.{Failure, Success, Try}
+
+import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.log4j.{ConsoleAppender, Level, Logger => ApacheLogger, PatternLayout}
+import org.apache.mesos.{Executor, ExecutorDriver}
+import org.apache.mesos.Protos._
+
+trait FlinkExecutor extends Executor {
+  // logger to use
+  def LOG: org.slf4j.Logger
+
+  var currentRunningTaskId: Option[TaskID] = None
+  val TASK_MANAGER_LOGGING_LEVEL_KEY = "taskmanager.logging.level"
+  val DEFAULT_TASK_MANAGER_LOGGING_LEVEL = "INFO"
+
+
+  // methods that defines how the task is started when a launchTask is sent
+  def startTask(streamingMode: StreamingMode): Try[Unit]
+
+  var thread: Option[Thread] = None
+  var slaveId: Option[SlaveID] = None
+
+  override def shutdown(driver: ExecutorDriver): Unit = {
+LOG.info("Killing taskManager thread")
+// kill task manager thread
+for (t <- thread) {
+  t.stop()
+}
+
+// exit
+sys.exit(0)
+  }
+
+  override def disconnected(driver: ExecutorDriver): Unit = {}
+
+  override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit = {
+for (t <- thread) {
+  LOG.info(s"Killing task : ${taskId.getValue}")
+  thread = None
+  currentRunningTaskId = None
+
+  // stop running thread
+  t.stop()
+
+  // Send the TASK_FINISHED status
+  driver.sendStatusUpdate(TaskStatus.newBuilder()
+.setTaskId(taskId)
+.setState(TaskState.TASK_FINISHED)
+.build())
+}
+  }
+
+
+  override def error(driver: ExecutorDriver, message: String): Unit = {}
+
+  override def frameworkMessage(driver: ExecutorDriver, data: 
Array[Byte]): Unit = {}
+
+  override def registered(driver: ExecutorDriver, executorInfo: 
ExecutorInfo,
+  frameworkInfo: FrameworkInfo, slaveInfo: 
SlaveInfo): Unit = {
+LOG.info(s"${executorInfo.getName} was registered on slave: ${slaveInfo.getHostname}")
+slaveId = Some(slaveInfo.getId)
+// get the configuration passed to it
+if (executorInfo.hasData) {
+  val newConfig: Configuration = 
Utils.deserialize(executorInfo.getData.toByteArray)
+  GlobalConfiguration.includeConfiguration(newConfig)
+}
+LOG.debug("Loaded configuration: {}", GlobalConfiguration.getConfiguration)
+  }
+
+
+  override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo): 
Unit = {
+slaveId = Some(slaveInfo.getId)
+  }
+
+
+  override def launchTask(driver: ExecutorDriver, task: TaskInfo): Unit = {
+// overlay the new config over this one
+val taskConf: Configuration = 
Utils.deserialize(task.getData.toByteArray)
+GlobalConfiguration.includeConfiguration(taskConf)
+
+// reconfigure log4j
+val logLevel = GlobalConfiguration.getString(
+  TASK_MANAGER_LOGGING_LEVEL_KEY, DEFAULT_TASK_MANAGER_LOGGING_LEVEL)
+
+initializeLog4j(Level.toLevel(logLevel, Level.DEBUG))
+
+// get streaming mode
+val streamingMode = getStreamingMode()
+
+// create the thread
+val t = createThread(driver, task.getTaskId, streamingMode)
+thread = Some(t)
+t.start()
+
+// send message
+driver.sendStatusUpdate(TaskStatus.newBuilder()
+  .setTaskId(task.getTaskId)
+  .setState(TaskState.TASK_RUNNING)
+  

[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-17 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r37185622
  
--- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/SchedulerUtils.scala
 ---
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.scheduler
+
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
+import com.google.common.base.Splitter
+import com.google.protobuf.{ByteString, GeneratedMessage}
+import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration}
+import org.apache.flink.configuration.ConfigConstants._
+import org.apache.flink.mesos._
+import org.apache.flink.mesos.scheduler.FlinkScheduler._
+import org.apache.flink.runtime.StreamingMode
+import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode}
+import org.apache.flink.runtime.util.EnvironmentInformation
+import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.CommandInfo.URI
+import org.apache.mesos.Protos.Value.Ranges
+import org.apache.mesos.Protos.Value.Type._
+
+trait SchedulerUtils {
--- End diff --

Some code in this trait has been copied from the Spark sources, right?
We should state this at least in the Scaladocs of the trait.




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-17 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/948#discussion_r37183905
  
--- Diff: flink-mesos/pom.xml ---
@@ -0,0 +1,188 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.10-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-mesos</artifactId>
+    <name>flink-mesos</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <mesos.version>0.22.1</mesos.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.rogach</groupId>
+            <artifactId>scallop_${scala.binary.version}</artifactId>
+            <version>0.9.5</version>
--- End diff --

Looks like you are introducing a new CLI parsing library dependency to 
Flink.
There is actually a JIRA in Flink to reduce the number of those libraries: 
https://issues.apache.org/jira/browse/FLINK-1347
Since it's probably just a few lines of code, can you try to use the same 
parsing library we are already using?
I think for other Scala parts in Flink, we are using 
```
<dependency>
    <groupId>com.github.scopt</groupId>
    <artifactId>scopt_${scala.binary.version}</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```
(see flink-runtime/pom.xml)
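
The switch should indeed only be a few lines; here is a rough scopt sketch. The option names match the `--confDir`/`--port` flags used elsewhere in this thread, but everything else is made up and not the PR's actual parser:

```scala
import scopt.OptionParser

// Hypothetical sketch — not the parser in this PR.
object ArgParsingSketch {
  case class SchedulerArgs(confDir: String = "", port: Int = 8080)

  def main(cliArgs: Array[String]): Unit = {
    val parser = new OptionParser[SchedulerArgs]("FlinkScheduler") {
      opt[String]("confDir").required()
        .action((dir, args) => args.copy(confDir = dir))
        .text("directory containing flink-conf.yaml")
      opt[Int]("port")
        .action((p, args) => args.copy(port = p))
        .text("port for the embedded configuration HTTP server")
    }

    parser.parse(cliArgs, SchedulerArgs()) match {
      case Some(parsed) => println(s"would start the scheduler with $parsed")
      case None         => sys.exit(1) // scopt has already printed usage
    }
  }
}
```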




[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-17 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-131803783
  
For the tests: For HDFS, Kafka and YARN, we are using MiniClusters which 
start all needed services in one JVM.
I don't think we can do something similar with Mesos because it's written in 
C++ and there doesn't seem to be a Java implementation.

Other JVM-based projects integrating with Mesos 
(https://github.com/mesosphere/cassandra-mesos, Spark, ...) also don't have 
this kind of end-to-end integration test.

I suspect this project requires at least a local Docker setup: 
https://github.com/ContainerSolutions/mini-mesos
But we could use something like this to provide a set of tests we can start 
manually and locally, for example before releasing or when somebody is 
contributing a change to Mesos.
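
As an illustration of the "manually started" idea only (the class name, the 
`MESOS_MASTER_URL` variable, and the opt-in mechanism are assumptions, not 
part of this PR), such a test could be guarded by a JUnit assumption so it is 
skipped unless a local Mesos cluster is available:

```scala
import org.junit.Assume.assumeTrue
import org.junit.Test

// Sketch of a manually triggered integration test: it is skipped unless the
// developer has a local Mesos master running and opts in via an env variable.
class ManualMesosITCase {

  @Test
  def schedulerRegistersWithLocalMesos(): Unit = {
    // Opt-in guard, e.g. before a release or when reviewing a Mesos change.
    assumeTrue(sys.env.contains("MESOS_MASTER_URL"))
    val mesosMaster = sys.env("MESOS_MASTER_URL")
    // Here the test would start FlinkScheduler against `mesosMaster` and
    // assert that the expected TaskManagers register with the JobManager.
    assert(mesosMaster.nonEmpty)
  }
}
```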


---


[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-11 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-129785986
  
Thanks for the reply on the mailing list. I will try out your PR this week 
and have a look at the code. Sorry for the delay. I needed to clear some more 
time, because it is a big addition. :) Thanks for all your effort.


---


[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-11 Thread ankurcha
Github user ankurcha commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-129765378
  
@StephanEwen Thanks for the pointer, I replied on the mailing list thread.

Any code-review comments for this pull request?


---


[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-08-02 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-127048248
  
Here is the thread where the discussion started: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/add-some-new-api-to-the-scheduler-in-the-job-manager-td7153.html#a7166

I would personally very much like to see a properly abstracted 
ResourceManager backend. It would make it much easier to add functionality like 
dynamic allocation and release across backends.
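
To make the idea concrete, such an abstraction might amount to a small trait 
that both the YARN and Mesos backends could implement; this is purely a 
hypothetical sketch with invented names, not an interface that exists in 
Flink:

```scala
// Hypothetical sketch of a resource manager abstraction; all names are invented.
trait ResourceManagerBackend {
  /** Ask the cluster manager (YARN, Mesos, ...) for additional TaskManager containers. */
  def allocateTaskManagers(count: Int, memoryMb: Int, cpuCores: Double): Unit

  /** Release a TaskManager container that is no longer needed. */
  def releaseTaskManager(containerId: String): Unit

  /** TaskManagers currently registered with the JobManager. */
  def registeredTaskManagers: Int
}
```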


---


[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-07-30 Thread ankurcha
Github user ankurcha commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-126522087
  
Clearly I don't know how to add stuff correctly to the .travis file. If I 
could get some help with that, it would be great.

@uce @StephanEwen - I haven't seen a resource manager thread, but yes, while 
developing this feature I did look a lot at the Spark SchedulerBackend and 
may have some thoughts if you could point me to the relevant document / thread.


---


[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-07-28 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/948#issuecomment-125782171
  
Hey Ankur! Welcome to the Flink community. I didn't have a proper look at 
your PR yet, but regarding the question you've asked in the issue: @StephanEwen 
recently kicked off a discussion about abstracting the resource manager behind 
an interface to share the overlap between YARN and Mesos. I don't know if you had a 
look at the YARN integration, but maybe you would like to be part of that 
discussion as well. In any case, your PR will probably help the discussion. ;)


---


[GitHub] flink pull request: [FLINK-1984] Integrate Flink with Apache Mesos

2015-07-28 Thread ankurcha
GitHub user ankurcha opened a pull request:

https://github.com/apache/flink/pull/948

[FLINK-1984] Integrate Flink with Apache Mesos

This pull request adds a mesos scheduler and an executor (inspired by the 
work done in the now-abandoned PR #251). The highlights are as follows:

* The mesos scheduler starts a jobManager in a parallel thread based on 
the configuration provided via the `--configDir` argument.
* The mesos scheduler is not HA and should be used with Marathon or a similar 
service to ensure that there is always one instance running. This may be 
addressed in future patches.
* The mesos scheduler uses some new properties which can be set in 
`conf/flink-conf.yaml`. The configuration directory can be specified using 
the `--confDir` command line argument and a list of configuration values is 
present in 
`flink-mesos/src/main/scala/org/apache/flink/mesos/scheduler/package.scala`.

Example usage:

```bash
mvn clean package -DskipTests -Pinclude-mesos
java -Dlog4j.configuration=file:/vagrant/log4j.properties -cp *.jar \
    org.apache.flink.mesos.scheduler.FlinkScheduler --conf-dir /vagrant
```

```yaml
# flink-conf.yaml

flink.mesos.master: zk://127.0.0.1:2181/mesos
flink.uberjar.location: file:///vagrant/flink-dist-0.10-SNAPSHOT.jar
flink.mesos.taskmanagers.mem: 512
flink.mesos.taskmanagers.cpu: 0.5
taskmanager.logging.level: INFO
streamingMode: streaming

jobmanager.web.port: 8081
webclient.port: 8080
```

```
# log4j.properties

log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%-5p %d{ISO8601} [%t] %c{1}: %m%n

# suppress the warning that hadoop native libraries are not loaded (irrelevant for the client)
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF
log4j.logger.org.apache.flink.mesos=DEBUG

# suppress the irrelevant (wrong) warnings from the netty channel handler
log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

```

An easy way to test this is to use https://github.com/mesosphere/playa-mesos 
to start a mesos cluster in VirtualBox (using Vagrant), copy the uberjar 
to /vagrant, and run the above command. The job manager web UI URL is set as 
the mesos framework URL.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ankurcha/flink flink-mesos

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/948.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #948


commit 8a9e3791a2c3946f1adefe64845f16f89e092c77
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-20T08:45:41Z

wip

commit a937ff65d72a938814162de1e9d97640c566b3f1
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-25T19:18:42Z

WIP - flink mesos integration initial commit

commit 3f0237adb7b1f45a1fd80e025aa5b511f92a209c
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-25T19:33:57Z

Add todo for config changes/issues

commit d9c29e2a3174cb90626c9deccc1604e0e010d965
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-27T00:03:12Z

Fix some stuff

commit 9910f50c11c81631ff8bb351cdf394cda753ff98
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-28T01:41:20Z

Add flink-mesos to dist, update scheduler and executor code

commit 4f2d8fed5b2bf8f49eb4da7cd148e83016206330
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-28T03:14:16Z

Clean up code + codestyle changes

commit b1613a965a59f89aa2e9c96dcec52fc40847423a
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-28T07:12:55Z

Clean up executor, adjust resource defaults

commit 5d39b1524b9ec565c5be28abcd4f80fab928042b
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-28T20:56:24Z

Remove status pinging and fix logging level in TaskManagerExecutor

commit 3331596c957ad774ffca31bb2c959b22422560d2
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-28T22:33:54Z

Code cleanup and refactoring of the conf classes

commit 33f3597f1c2aa0c7582025d9c242d4aafd9cfa63
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-28T22:36:04Z

Remove unused dependencies and plugins

commit 36846a95a64b9700d37954a2d0176b222b6adbe0
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-28T22:40:50Z

Use tab based indents in pom files

commit 52904c8593377c349bccf599b863f2a40df0f562
Author: Ankur Chauhan achau...@brightcove.com
Date:   2015-07-28T23:06:30Z

Add comments for each of the properties




---