[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r170763841 ## File path: common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala ## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.mesos + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.pattern.ask +import com.adobe.api.platform.runtime.mesos.MesosClient +import com.adobe.api.platform.runtime.mesos.Subscribe +import com.adobe.api.platform.runtime.mesos.SubscribeComplete +import com.adobe.api.platform.runtime.mesos.Teardown +import java.time.Instant +import pureconfig.loadConfigOrThrow +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ +import scala.util.Try +import whisk.common.Counter +import whisk.common.Logging +import whisk.common.TransactionId +import whisk.core.ConfigKeys +import whisk.core.WhiskConfig +import whisk.core.containerpool.Container +import whisk.core.containerpool.ContainerArgsConfig +import whisk.core.containerpool.ContainerFactory +import whisk.core.containerpool.ContainerFactoryProvider +import whisk.core.entity.ByteSize +import whisk.core.entity.ExecManifest +import whisk.core.entity.InstanceId +import whisk.core.entity.UUID + +/** + * Configuration for MesosClient + * @param masterUrl The mesos url e.g. http://leader.mesos:5050. + * @param masterPublicUrl A public facing mesos url (which may be different that the internal facing url) e.g. http://mymesos:5050. + * @param role The role used by this framework (see http://mesos.apache.org/documentation/latest/roles/#associating-frameworks-with-roles). + * @param failoverTimeoutSeconds Timeout allowed for framework to reconnect after disconnection. + * @param mesosLinkLogMessage If true, display a link to mesos in the static log message, otherwise do not include a link to mesos. 
+ */ +case class MesosConfig(masterUrl: String, + masterPublicUrl: Option[String] = None, + role: String = "*", + failoverTimeoutSeconds: FiniteDuration = 0.seconds, + mesosLinkLogMessage: Boolean = true) + +class MesosContainerFactory(config: WhiskConfig, +actorSystem: ActorSystem, +logging: Logging, +parameters: Map[String, Set[String]], +containerArgs: ContainerArgsConfig = + loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs), +mesosConfig: MesosConfig = loadConfigOrThrow[MesosConfig](ConfigKeys.mesos), +clientFactory: (ActorSystem, MesosConfig) => ActorRef = MesosContainerFactory.createClient, +taskIdGenerator: () => String = MesosContainerFactory.taskIdGenerator) +extends ContainerFactory { + + val subscribeTimeout = 10.seconds + val teardownTimeout = 30.seconds + + //init mesos framework: + implicit val as: ActorSystem = actorSystem + implicit val ec: ExecutionContext = actorSystem.dispatcher + + //mesos master url to subscribe the framework to + val mesosMaster = mesosConfig.masterUrl + //public mesos url where developers can browse logs (till there is way to delegate log retrieval to an external system) + val mesosMasterPublic = mesosConfig.masterPublicUrl + + val mesosClientActor = clientFactory(as, mesosConfig) + + subscribe() Review comment: Mesos tasks can be submitted to the client even if the client is not yet subscribed. Regardless of subscription status, submitting a task to Mesos is an async process anyway (the framework must wait for offers that have the required resources before any launch can be issued). The offer handling may be tuned in the mesos-actor config in the future, e.g. to allow "holding" offers instead of immediately accepting/declining them, to optimize for cases where no task is queued at offer receipt time but a task arrives shortly
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r170762841 ## File path: common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala ## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.mesos + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.pattern.ask +import com.adobe.api.platform.runtime.mesos.MesosClient +import com.adobe.api.platform.runtime.mesos.Subscribe +import com.adobe.api.platform.runtime.mesos.SubscribeComplete +import com.adobe.api.platform.runtime.mesos.Teardown +import java.time.Instant +import pureconfig.loadConfigOrThrow +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ +import scala.util.Try +import whisk.common.Counter +import whisk.common.Logging +import whisk.common.TransactionId +import whisk.core.ConfigKeys +import whisk.core.WhiskConfig +import whisk.core.containerpool.Container +import whisk.core.containerpool.ContainerArgsConfig +import whisk.core.containerpool.ContainerFactory +import whisk.core.containerpool.ContainerFactoryProvider +import whisk.core.entity.ByteSize +import whisk.core.entity.ExecManifest +import whisk.core.entity.InstanceId +import whisk.core.entity.UUID + +/** + * Configuration for MesosClient + * @param masterUrl The mesos url e.g. http://leader.mesos:5050. + * @param masterPublicUrl A public facing mesos url (which may be different that the internal facing url) e.g. http://mymesos:5050. + * @param role The role used by this framework (see http://mesos.apache.org/documentation/latest/roles/#associating-frameworks-with-roles). + * @param failoverTimeoutSeconds Timeout allowed for framework to reconnect after disconnection. + * @param mesosLinkLogMessage If true, display a link to mesos in the static log message, otherwise do not include a link to mesos. 
+ */ +case class MesosConfig(masterUrl: String, + masterPublicUrl: Option[String] = None, + role: String = "*", + failoverTimeoutSeconds: FiniteDuration = 0.seconds, + mesosLinkLogMessage: Boolean = true) Review comment: It seems weird to have a bunch of unused configs there - e.g. default configs that are not used by the default SPI implementation. But I see that kube has started doing that, so if other implementations are going to have actual defaults (even when they are not in use), I can put the mesos ones there as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r168255460 ## File path: common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala ## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.mesos + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.pattern.ask +import com.adobe.api.platform.runtime.mesos.MesosClient +import com.adobe.api.platform.runtime.mesos.Subscribe +import com.adobe.api.platform.runtime.mesos.SubscribeComplete +import com.adobe.api.platform.runtime.mesos.Teardown +import java.time.Instant +import pureconfig.loadConfigOrThrow +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import whisk.common.Counter +import whisk.common.Logging +import whisk.common.TransactionId +import whisk.core.ConfigKeys +import whisk.core.WhiskConfig +import whisk.core.containerpool.Container +import whisk.core.containerpool.ContainerArgsConfig +import whisk.core.containerpool.ContainerFactory +import whisk.core.containerpool.ContainerFactoryProvider +import whisk.core.entity.ByteSize +import whisk.core.entity.ExecManifest +import whisk.core.entity.InstanceId +import whisk.core.entity.UUID + +/** + * Configuration for MesosClient + * @param masterUrl The mesos url e.g. http://leader.mesos:5050. + * @param masterPublicUrl A public facing mesos url (which may be different that the internal facing url) e.g. http://mymesos:5050. + * @param role The role used by this framework (see http://mesos.apache.org/documentation/latest/roles/#associating-frameworks-with-roles). + * @param failoverTimeoutSeconds Timeout allowed for framework to reconnect after disconnection. + * @param mesosLinkLogMessage If true, display a link to mesos in the static log message, otherwise do not include a link to mesos. 
+ */ +case class MesosConfig(masterUrl: String, + masterPublicUrl: Option[String] = None, + role: String = "*", + failoverTimeoutSeconds: FiniteDuration = 0.seconds, + mesosLinkLogMessage: Boolean = true) + +class MesosContainerFactory(config: WhiskConfig, +actorSystem: ActorSystem, +logging: Logging, +parameters: Map[String, Set[String]], +containerArgs: ContainerArgsConfig = + loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs), +mesosConfig: MesosConfig = loadConfigOrThrow[MesosConfig](ConfigKeys.mesos), +clientFactory: (ActorSystem, MesosConfig) => ActorRef = MesosContainerFactory.createClient, +taskIdGenerator: () => String = MesosContainerFactory.taskIdGenerator) +extends ContainerFactory { + + val subscribeTimeout = 30.seconds + val teardownTimeout = 30.seconds + + //init mesos framework: + implicit val as: ActorSystem = actorSystem + implicit val ec: ExecutionContext = actorSystem.dispatcher + + //mesos master url to subscribe the framework to + val mesosMaster = mesosConfig.masterUrl + //public mesos url where developers can browse logs (till there is way to delegate log retrieval to an external system) + val mesosMasterPublic = mesosConfig.masterPublicUrl + + var isSubscribed = false; + + val mesosClientActor = clientFactory(as, mesosConfig) + + //periodically retry if subscribing did not suceed + as.scheduler.schedule(0.seconds, (subscribeTimeout.toSeconds + 10).seconds) { +if (!isSubscribed) { + subscribe() +} + } Review comment: agreed! done This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r168235692 ## File path: common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.mesos + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.pattern.ask +import akka.stream.scaladsl.Source +import akka.util.ByteString +import akka.util.Timeout +import com.adobe.api.platform.runtime.mesos.Bridge +import com.adobe.api.platform.runtime.mesos.DeleteTask +import com.adobe.api.platform.runtime.mesos.Host +import com.adobe.api.platform.runtime.mesos.Running +import com.adobe.api.platform.runtime.mesos.SubmitTask +import com.adobe.api.platform.runtime.mesos.TaskDef +import com.adobe.api.platform.runtime.mesos.User +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter +import org.apache.mesos.v1.Protos.TaskState +import org.apache.mesos.v1.Protos.TaskStatus +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.postfixOps +import spray.json._ +import whisk.common.Logging +import whisk.common.TransactionId +import whisk.core.containerpool.Container +import whisk.core.containerpool.ContainerAddress +import whisk.core.containerpool.ContainerId +import whisk.core.entity.ByteSize +import whisk.core.entity.size._ + +/** + * MesosTask implementation of Container. 
+ * Differences from DockerContainer include: + * - does not launch container using docker cli, but rather a Mesos framework + * - does not support pause/resume + * - does not support log collection (currently), but does provide a message indicating logs can be viewed via Mesos UI + * (external log collection and retrieval must be enabled via LogStore SPI to expose logs to wsk cli) + */ +case object Environment +case class CreateContainer(image: String, memory: String, cpuShare: String) + +object MesosTask { + val taskLaunchTimeout = Timeout(45 seconds) + val taskDeleteTimeout = Timeout(30 seconds) + + def create(mesosClientActor: ActorRef, + mesosConfig: MesosConfig, + taskIdGenerator: () => String, + transid: TransactionId, + image: String, + userProvidedImage: Boolean = false, + memory: ByteSize = 256.MB, + cpuShares: Int = 0, + environment: Map[String, String] = Map(), + network: String = "bridge", + dnsServers: Seq[String] = Seq(), + name: Option[String] = None, + parameters: Map[String, Set[String]] = Map())(implicit ec: ExecutionContext, + log: Logging, + as: ActorSystem): Future[Container] = { +implicit val tid = transid + +log.info(this, s"creating task for image ${image}...") + +val mesosCpuShares = cpuShares / 1024.0 //convert openwhisk (docker based) shares to mesos (cpu percentage) +val mesosRam = memory.toMB.toInt + +val taskId = taskIdGenerator() +val lowerNetwork = network.toLowerCase //match bridge+host without case, but retain case for user specified network +val taskNetwork = lowerNetwork match { + case "bridge" => Bridge + case "host" => Host + case _=> User(network) +} +val dnsOrEmpty = if (dnsServers.nonEmpty) Map("dns" -> dnsServers.toSet) else Map() + +val task = new TaskDef( + taskId, + name.getOrElse(image), //task name either the indicated name, or else the image name + image, + mesosCpuShares, + mesosRam, + List(8080), //all action containers listen on 8080 + Some(0), //port at index 0 used for health + false, + taskNetwork, + dnsOrEmpty 
++ parameters, + environment) + +val launched: Future[Running] = + mesosClientActor.ask(SubmitTask(task))(taskLaunchTimeout).mapTo[Running] + +launched.map(taskDetails => { + val taskHost = taskDetails.hostname + val taskPort = taskDetails.hostports(0) +
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r167350754 ## File path: common/scala/build.gradle ## @@ -40,6 +40,8 @@ dependencies { compile 'com.google.code.findbugs:jsr305:3.0.2' compile 'io.kamon:kamon-core_2.11:0.6.7' compile 'io.kamon:kamon-statsd_2.11:0.6.7' +//for mesos +compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.3' Review comment: I'm not sure which parts or concerns you are referring to specifically - could you clarify? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r164245116 ## File path: docs/mesos.md ## @@ -0,0 +1,47 @@ +# Mesos Support + +The MesosContainerFactory enables launching action containers within a Mesos cluster. It does not affect the deployment of OpenWhisk components (invoker, controller). + +## Enable + +To enable MesosContainerFactory, use the following TypeSafe Config properties + +| property | required | details | example | +| --- | --- | --- | --- | +| whisk.spi.ContainerFactoryProvider | required | enable the MesosContainerFactory | whisk.core.mesos.MesosContainerFactoryProvider | +| whisk.mesos.master-url | required | mesos master http endpoint to be accessed from the invoker for framework subscription | http://192.168.99.100:5050 | +| whisk.mesos.master-url-public | optional (default to whisk.mesos.master-url) | public facing mesos master http endpoint for exposing logs to cli users | http://192.168.99.100:5050 | +| whisk.mesos.role | optional (default *) | mesos framework role| any string e.g. `openwhisk` | +| whisk.mesos.failover-timeout-seconds | optional (default 0) | how long to wait for the framework to reconnect with the same id before tasks are terminated | see http://mesos.apache.org/documentation/latest/high-availability-framework-guide/ | +| whisk.mesos.mesos-link-log-message | optional (default true) | display a log message with a link to mesos when using the default LogStore (or no log message) | Since logs are not available for invoker to collect from mesos in general, you can either use an alternate LogStore or direct users to the mesos ui | + +To set these properties for your invoker, set the corresponding environment variables e.g. 
+```properties +CONFIG_whisk_spi_ContainerFactoryProvider=whisk.core.mesos.MesosContainerFactoryProvider +CONFIG_Dwhisk_mesos_masterUrl=http://192.168.99.100:5050 +``` + +## Known Issues + +* logs are not collected from action containers + + For now, the mesos public url will be included in the logs retrieved via the wsk CLI. Once log retrieval from external sources is enabled, logs from mesos containers would have to be routed to the external source, and then retrieved from that source. + +* no HA or failover support (single invoker per cluster) + + Currently the version of mesos-actor in use does not support HA or failover. Failover support is planned to be provided by: + + * multiple invokers running in an Akka cluster + * the mesos framework actor is a singleton within the cluster + * the mesos framework actor is available from the other invoker nodes + * if the node that contains the mesos framework actor fails : + * the actor will be recreated on a separate invoker node + * the actor will resubscribe to mesos scheduler API with the same ID + * the tasks that were previously launched by the actor will be reconciled + * normal operation resumes + Review comment: The invoker-relevant pieces will be part of mesos-actor (the client). There is a separate issue with HA for controllers, where seed node discovery needs to be based on Mesos for distributed bookkeeping data (see #2992). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r164244571 ## File path: common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala ## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.mesos + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.pattern.ask +import akka.stream.scaladsl.Source +import akka.util.ByteString +import akka.util.Timeout +import com.adobe.api.platform.runtime.mesos.Bridge +import com.adobe.api.platform.runtime.mesos.DeleteTask +import com.adobe.api.platform.runtime.mesos.Running +import com.adobe.api.platform.runtime.mesos.SubmitTask +import com.adobe.api.platform.runtime.mesos.TaskDef +import java.time.LocalDateTime +import java.time.format.DateTimeFormatter +import org.apache.mesos.v1.Protos.TaskState +import org.apache.mesos.v1.Protos.TaskStatus +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.postfixOps +import spray.json._ +import whisk.common.Logging +import whisk.common.TransactionId +import whisk.core.containerpool.Container +import whisk.core.containerpool.ContainerAddress +import whisk.core.containerpool.ContainerId +import whisk.core.entity.ByteSize +import whisk.core.entity.size._ + +/** + * MesosTask implementation of Container. 
+ * Differences from DockerContainer include: + * - does not launch container using docker cli, but rather a Mesos framework + * - does not support pause/resume + * - does not support log collection (currently), but does provide a message indicating logs can be viewed via Mesos UI + * (external log collection and retrieval must be enabled via LogStore SPI to expose logs to wsk cli) + */ +case object Environment +case class CreateContainer(image: String, memory: String, cpuShare: String) + +object MesosTask { + val taskLaunchTimeout = Timeout(45 seconds) + val taskDeleteTimeout = Timeout(30 seconds) + + def create(mesosClientActor: ActorRef, + mesosConfig: MesosConfig, + taskIdGenerator: () => String, + transid: TransactionId, + image: String, + userProvidedImage: Boolean = false, + memory: ByteSize = 256.MB, + cpuShares: Int = 0, + environment: Map[String, String] = Map(), + network: String = "bridge", + dnsServers: Seq[String] = Seq(), + name: Option[String] = None, + parameters: Map[String, Set[String]] = Map())(implicit ec: ExecutionContext, + log: Logging, + as: ActorSystem): Future[Container] = { +implicit val tid = transid + +log.info(this, s"creating task for image ${image}...") + +val mesosCpuShares = cpuShares / 1024.0 //convert openwhisk (docker based) shares to mesos (cpu percentage) +val mesosRam = memory.toMB.toInt + +//TODO: update mesos-actor to support multiple param values for the same key via Map[String, Set[String]] +val flatParams = parameters.filter(_._2.nonEmpty).map(e => (e._1 -> e._2.head)) +val taskId = taskIdGenerator() +val task = new TaskDef( + taskId, + name.getOrElse(image), //task name either the indicated name, or else the image name + image, + mesosCpuShares, + mesosRam, + List(8080), //all action containers listen on 8080 + Some(0), //port at index 0 used for health + false, + Bridge, + flatParams, + environment) + +val launched: Future[Running] = + mesosClientActor.ask(SubmitTask(task))(taskLaunchTimeout).mapTo[Running] + 
+launched.map(taskDetails => { + val taskHost = taskDetails.hostname + val taskPort = taskDetails.hostports(0) + log.info(this, s"launched task with state ${taskDetails.taskStatus.getState} at ${taskHost}:${taskPort}") + val containerIp = new ContainerAddress(taskHost, taskPort) + val containerId = new ContainerId(taskId); + new MesosTask(containerId, containerIp, ec, log,
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r164244356 ## File path: common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.mesos + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.pattern.ask +import com.adobe.api.platform.runtime.mesos.MesosClient +import com.adobe.api.platform.runtime.mesos.Subscribe +import com.adobe.api.platform.runtime.mesos.SubscribeComplete +import com.adobe.api.platform.runtime.mesos.Teardown +import java.time.Instant +import pureconfig.loadConfigOrThrow +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success +import whisk.common.Counter +import whisk.common.Logging +import whisk.common.TransactionId +import whisk.core.ConfigKeys +import whisk.core.WhiskConfig +import whisk.core.containerpool.Container +import whisk.core.containerpool.ContainerFactory +import whisk.core.containerpool.ContainerFactoryProvider +import whisk.core.entity.ByteSize +import whisk.core.entity.ExecManifest +import whisk.core.entity.InstanceId +import whisk.core.entity.UUID + +case class MesosConfig(masterUrl: String, + masterPublicUrl: Option[String] = None, + role: String = "*", + failoverTimeoutSeconds: FiniteDuration = 0.seconds, + mesosLinkLogMessage: Boolean = true) + +class MesosContainerFactory(config: WhiskConfig, +actorSystem: ActorSystem, +logging: Logging, +parameters: Map[String, Set[String]], +mesosConfig: MesosConfig = loadConfigOrThrow[MesosConfig](ConfigKeys.mesos), +clientFactory: (ActorSystem, MesosConfig) => ActorRef = MesosContainerFactory.createClient, +taskIdGenerator: () => String = MesosContainerFactory.taskIdGenerator) +extends ContainerFactory { + + val subscribeTimeout = 30.seconds + + //init mesos framework: + implicit val as: ActorSystem = actorSystem + implicit val ec: ExecutionContext = actorSystem.dispatcher + + //mesos master url to subscribe the framework to + val mesosMaster = mesosConfig.masterUrl + //public mesos url where developers can browse logs (till there is 
way to delegate log retrieval to an external system) + val mesosMasterPublic = mesosConfig.masterPublicUrl + + var isSubscribed = false; + + val mesosClientActor = clientFactory(as, mesosConfig) + + //periodically retry if subscribing did not suceed + as.scheduler.schedule(0.seconds, (subscribeTimeout.toSeconds + 10).seconds) { +if (!isSubscribed) { + subscribe() +} + } + + private def subscribe(): Unit = { +logging.info(this, s"subscribing to mesos master at ${mesosMaster}") +//subscribe mesos actor to mesos event stream +//TODO: subscribe failure should make invoker "unhealthy" +mesosClientActor + .ask(Subscribe)(subscribeTimeout) + .mapTo[SubscribeComplete] + .onComplete({ +case Success(complete) => + isSubscribed = true + logging.info(this, s"subscribe completed successfully...${complete}") +case Failure(e) => logging.error(this, s"subscribe failed...${e}") + }) + } + + override def createContainer(tid: TransactionId, + name: String, + actionImage: ExecManifest.ImageName, + userProvidedImage: Boolean, + memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { +implicit val transid = tid +val image = if (userProvidedImage) { + actionImage.publicImageName +} else { + actionImage.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag)) +} + +logging.info(this, s"using Mesos to create
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r164244282 ## File path: common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.mesos + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.pattern.ask +import com.adobe.api.platform.runtime.mesos.MesosClient +import com.adobe.api.platform.runtime.mesos.Subscribe +import com.adobe.api.platform.runtime.mesos.SubscribeComplete +import com.adobe.api.platform.runtime.mesos.Teardown +import java.time.Instant +import pureconfig.loadConfigOrThrow +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success +import whisk.common.Counter +import whisk.common.Logging +import whisk.common.TransactionId +import whisk.core.ConfigKeys +import whisk.core.WhiskConfig +import whisk.core.containerpool.Container +import whisk.core.containerpool.ContainerFactory +import whisk.core.containerpool.ContainerFactoryProvider +import whisk.core.entity.ByteSize +import whisk.core.entity.ExecManifest +import whisk.core.entity.InstanceId +import whisk.core.entity.UUID + +case class MesosConfig(masterUrl: String, + masterPublicUrl: Option[String] = None, + role: String = "*", + failoverTimeoutSeconds: FiniteDuration = 0.seconds, + mesosLinkLogMessage: Boolean = true) + +class MesosContainerFactory(config: WhiskConfig, +actorSystem: ActorSystem, +logging: Logging, +parameters: Map[String, Set[String]], +mesosConfig: MesosConfig = loadConfigOrThrow[MesosConfig](ConfigKeys.mesos), +clientFactory: (ActorSystem, MesosConfig) => ActorRef = MesosContainerFactory.createClient, +taskIdGenerator: () => String = MesosContainerFactory.taskIdGenerator) +extends ContainerFactory { + + val subscribeTimeout = 30.seconds + + //init mesos framework: + implicit val as: ActorSystem = actorSystem + implicit val ec: ExecutionContext = actorSystem.dispatcher + + //mesos master url to subscribe the framework to + val mesosMaster = mesosConfig.masterUrl + //public mesos url where developers can browse logs (till there is 
way to delegate log retrieval to an external system) + val mesosMasterPublic = mesosConfig.masterPublicUrl + + var isSubscribed = false; + + val mesosClientActor = clientFactory(as, mesosConfig) + + //periodically retry if subscribing did not suceed + as.scheduler.schedule(0.seconds, (subscribeTimeout.toSeconds + 10).seconds) { +if (!isSubscribed) { + subscribe() +} + } + + private def subscribe(): Unit = { +logging.info(this, s"subscribing to mesos master at ${mesosMaster}") +//subscribe mesos actor to mesos event stream +//TODO: subscribe failure should make invoker "unhealthy" +mesosClientActor + .ask(Subscribe)(subscribeTimeout) + .mapTo[SubscribeComplete] + .onComplete({ +case Success(complete) => + isSubscribed = true + logging.info(this, s"subscribe completed successfully...${complete}") +case Failure(e) => logging.error(this, s"subscribe failed...${e}") + }) + } + + override def createContainer(tid: TransactionId, + name: String, + actionImage: ExecManifest.ImageName, + userProvidedImage: Boolean, + memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { +implicit val transid = tid +val image = if (userProvidedImage) { + actionImage.publicImageName +} else { + actionImage.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag)) +} + +logging.info(this, s"using Mesos to create
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r143053113 ## File path: common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala ## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.mesos + +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.pattern.ask +import akka.util.Timeout +import com.adobe.api.platform.runtime.mesos.Bridge +import com.adobe.api.platform.runtime.mesos.DeleteTask +import com.adobe.api.platform.runtime.mesos.Running +import com.adobe.api.platform.runtime.mesos.SubmitTask +import com.adobe.api.platform.runtime.mesos.TaskDef +import java.time.Instant +import org.apache.mesos.v1.Protos.TaskStatus +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.postfixOps +import spray.json._ +import whisk.common.Counter +import whisk.common.Logging +import whisk.common.TransactionId +import whisk.core.containerpool.Container +import whisk.core.containerpool.ContainerAddress +import whisk.core.containerpool.ContainerId +import whisk.core.entity.ByteSize +import whisk.core.entity.size._ + +/** + * MesosTask implementation of Container. + * Differences from DockerContainer include: + * - does not launch container using docker cli, but rather a Mesos framework + * - does not support pause/resume + * - does not support log collection (currently), but does provide a message indicating logs can be viewed via Mesos UI + * (once an extensible logging approach is supported, external log collection will be allowed to expose logs to wsk cli) + */ +case class Environment() +case class CreateContainer(image: String, memory: String, cpuShare: String) + +object MesosTask { + val taskLaunchTimeout = Timeout(45 seconds) + val taskDeleteTimeout = Timeout(30 seconds) + val counter = new Counter() + val startTime = Instant.now.getEpochSecond + + def create(mesosClientActor: ActorRef, + mesosUri: String, + transid: TransactionId, + image: String, + userProvidedImage: Boolean = false, + memory: ByteSize = 256.MB, + cpuShares: Int = 0, + environment: Map[String, String] = Map(), + network: String = "bridge", + dnsServers: Seq[String] = 
Seq(), + name: Option[String] = None, + parameters: Map[String, Set[String]])(implicit ec: ExecutionContext, + log: Logging, + as: ActorSystem): Future[Container] = { +implicit val tid = transid + +log.info(this, s"creating task for image ${image}...") + +val taskId = s"whisk-${counter.next()}-${startTime}" + +val mesosCpuShares = cpuShares / 1024.0 //convert openwhisk (docker based) shares to mesos (cpu percentage) +val mesosRam = memory.toMB.toInt + +//TODO: update mesos-actor to support multiple param values for the same key via Map[String, Set[String]] +val flatParams = parameters.filter(_._2.nonEmpty).map(e => (e._1 -> e._2.head)) +val task = new TaskDef( + taskId, + name.getOrElse(image), //task name either the indicated name, or else the image name + image, + mesosCpuShares, + mesosRam, + List(8080), //all action containers listen on 8080 + Some(0), //port at index 0 used for health + false, + Bridge, + flatParams, + environment) + +val launched: Future[Running] = + mesosClientActor.ask(SubmitTask(task))(taskLaunchTimeout).mapTo[Running] + +launched.map(taskDetails => { + val taskHost = taskDetails.hostname + val taskPort = taskDetails.hostports(0) + log.info(this, s"launched task with state ${taskDetails.taskStatus.getState} at ${taskHost}:${taskPort}") + val containerIp = new ContainerAddress(taskHost, taskPort) + val containerId = new ContainerId(taskId); + new MesosTask(containerId, containerIp, ec, log, taskId, mesosClientActor, mesosUri) +}) recover { + case t => throw new
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r143052621 ## File path: common/scala/src/main/resources/reference.conf ## @@ -1,5 +1,6 @@ -whisk.spi{ +whisk.spi { ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider + #ContainerFactoryProvider = whisk.core.mesos.MesosContainerFactoryProvider Review comment: yes, an example; testing can be done to the extent of making sure the mesos-actor receives messages that are expected, but I wouldn't expect to run a mesos cluster; we need to trust that the mesos-actor maintains support for the mesos API; additional tests will be added to mesos-actor as well. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r143051866 ## File path: common/scala/src/main/resources/application.conf ## @@ -12,3 +12,7 @@ akka.http { max-open-requests = 128 } } + +#configs used by MesosContainerFactory +#whisk.mesos.master-url = "http://192.168.99.100:5050; Review comment: yes, currently there is no way to have an SPI impl advertise required properties to the whisk config; and we are already using typesafe config via akka This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r143049817 ## File path: common/scala/build.gradle ## @@ -30,6 +30,8 @@ dependencies { compile 'com.fasterxml.uuid:java-uuid-generator:3.1.3' compile 'com.github.ben-manes.caffeine:caffeine:2.4.0' compile 'com.google.code.findbugs:jsr305:3.0.2' +//for mesos +compile 'com.adobe.api.platform.runtime:mesos-actor:0.0.3' Review comment: yes, apache This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tysonnorris commented on a change in pull request #2833: MesosContainerFactory
tysonnorris commented on a change in pull request #2833: MesosContainerFactory URL: https://github.com/apache/incubator-openwhisk/pull/2833#discussion_r142812631 ## File path: common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.mesos + +import akka.actor.ActorSystem +import akka.pattern.ask +import com.adobe.api.platform.runtime.mesos.MesosClient +import com.adobe.api.platform.runtime.mesos.Subscribe +import com.adobe.api.platform.runtime.mesos.SubscribeComplete +import com.adobe.api.platform.runtime.mesos.Teardown +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration._ +import whisk.common.Logging +import whisk.common.TransactionId +import whisk.core.WhiskConfig +import whisk.core.containerpool.Container +import whisk.core.containerpool.ContainerFactory +import whisk.core.containerpool.ContainerFactoryProvider +import whisk.core.entity.ByteSize +import whisk.core.entity.ExecManifest +import whisk.core.entity.InstanceId +import whisk.core.entity.UUID + +class MesosContainerFactory(config: WhiskConfig, +actorSystem: ActorSystem, +logging: Logging, +parameters: Map[String, Set[String]]) +extends ContainerFactory { + + val subscribeTimeout = 30.seconds + + //init mesos framework: + implicit val as: ActorSystem = actorSystem + implicit val ec: ExecutionContext = actorSystem.dispatcher + + val akkaConfig = actorSystem.settings.config + //mesos master url to subscribe the framework to + val mesosMaster = akkaConfig.getString("whisk.mesos.master-url") + //public mesos url where developers can browse logs (till there is way to delegate log retrieval to an external system) + val mesosMasterPublic = akkaConfig.getString("whisk.mesos.master-url-public") Review comment: Yes, since there is not yet any external log store possible, it shows them a link to the mesos ui. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services