This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 9d20eae Make amount of parallel docker runs configurable. (#3698) 9d20eae is described below commit 9d20eae1b51c1599273506c85d7fe45fa4d6e0bc Author: Christian Bickel <git...@cbickel.de> AuthorDate: Fri Aug 10 14:14:22 2018 +0200 Make amount of parallel docker runs configurable. (#3698) --- ansible/roles/invoker/tasks/deploy.yml | 3 ++- .../src/main/scala/whisk/core/WhiskConfig.scala | 2 +- core/invoker/src/main/resources/application.conf | 28 ++++++++++++++------- .../core/containerpool/docker/DockerClient.scala | 29 +++++++++++++--------- 4 files changed, 39 insertions(+), 23 deletions(-) diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index 98b5c14..8fd93bf 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -206,6 +206,7 @@ "INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}" "CONFIG_whisk_containerPool_numCore": "{{ invoker.numcore }}" "CONFIG_whisk_containerPool_coreShare": "{{ invoker.coreshare }}" + "CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}" "CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc }}" "WHISK_LOGS_DIR": "{{ whisk_logs_dir }}" "METRICS_KAMON": "{{ metrics.kamon.enabled }}" @@ -271,7 +272,7 @@ volumes: "{{ volumes|default('') }},{{ coverage_logs_dir }}/invoker:/coverage" when: coverage_enabled -- name: start invoker using docker cli +- name: start invoker docker_container: userns_mode: "host" pid_mode: "host" diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index 7fdb10b..5f7a8db 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -213,7 +213,7 @@ object ConfigKeys { val db = "whisk.db" val docker = "whisk.docker" - val dockerTimeouts = 
s"$docker.timeouts" + val dockerClient = s"$docker.client" val dockerContainerFactory = s"${docker}.container-factory" val runc = "whisk.runc" val runcTimeouts = s"$runc.timeouts" diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index 8edb386..c471d1b 100644 --- a/core/invoker/src/main/resources/application.conf +++ b/core/invoker/src/main/resources/application.conf @@ -10,15 +10,25 @@ whisk { poll-interval: 5 minutes } - # Timeouts for docker commands. Set to "Inf" to disable timeout. - docker.timeouts { - run: 1 minute - rm: 1 minute - pull: 10 minutes - ps: 1 minute - inspect: 1 minute - pause: 10 seconds - unpause: 10 seconds + docker.client { + # Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run) + # concurrently, there is a good chance that some of them will fail. + # See https://github.com/moby/moby/issues/29369 + # Use a semaphore to make sure that at most `parallel-runs` `docker run` commands are active + # at the same time. + # 0 disables the limit (unlimited parallel runs). + parallel-runs: 10 + + # Timeouts for docker commands. Set to "Inf" to disable timeout. + timeouts { + run: 1 minute + rm: 1 minute + pull: 10 minutes + ps: 1 minute + inspect: 1 minute + pause: 10 seconds + unpause: 10 seconds + } } docker.container-factory { diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala index 44102b8..8a87e37 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala @@ -66,6 +66,11 @@ case class DockerClientTimeoutConfig(run: Duration, inspect: Duration) /** + * Configuration for docker client + */ +case class DockerClientConfig(parallelRuns: Int, timeouts: DockerClientTimeoutConfig) + +/** * Serves as interface to the docker CLI tool.
* * Be cautious with the ExecutionContext passed to this, as the @@ -74,8 +79,7 @@ case class DockerClientTimeoutConfig(run: Duration, * You only need one instance (and you shouldn't get more). */ class DockerClient(dockerHost: Option[String] = None, - timeouts: DockerClientTimeoutConfig = - loadConfigOrThrow[DockerClientTimeoutConfig](ConfigKeys.dockerTimeouts))( + config: DockerClientConfig = loadConfigOrThrow[DockerClientConfig](ConfigKeys.dockerClient))( executionContext: ExecutionContext)(implicit log: Logging, as: ActorSystem) extends DockerApi with ProcessRunner { @@ -96,8 +100,9 @@ class DockerClient(dockerHost: Option[String] = None, Seq(dockerBin) ++ host } - protected val maxParallelRuns = 10 - protected val runSemaphore = new Semaphore( /* permits= */ maxParallelRuns, /* fair= */ true) + protected val maxParallelRuns = config.parallelRuns + protected val runSemaphore = + new Semaphore( /* permits= */ if (maxParallelRuns > 0) maxParallelRuns else Int.MaxValue, /* fair= */ true) // Docker < 1.13.1 has a known problem: if more than 10 containers are created (docker run) // concurrently, there is a good chance that some of them will fail. 
@@ -114,7 +119,7 @@ class DockerClient(dockerHost: Option[String] = None, } }.flatMap { _ => // Iff the semaphore was acquired successfully - runCmd(Seq("run", "-d") ++ args ++ Seq(image), timeouts.run) + runCmd(Seq("run", "-d") ++ args ++ Seq(image), config.timeouts.run) .andThen { // Release the semaphore as quick as possible regardless of the runCmd() result case _ => runSemaphore.release() @@ -139,26 +144,26 @@ class DockerClient(dockerHost: Option[String] = None, def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress] = runCmd( Seq("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString), - timeouts.inspect).flatMap { + config.timeouts.inspect).flatMap { case "<no value>" => Future.failed(new NoSuchElementException) case stdout => Future.successful(ContainerAddress(stdout)) } def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = - runCmd(Seq("pause", id.asString), timeouts.pause).map(_ => ()) + runCmd(Seq("pause", id.asString), config.timeouts.pause).map(_ => ()) def unpause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = - runCmd(Seq("unpause", id.asString), timeouts.unpause).map(_ => ()) + runCmd(Seq("unpause", id.asString), config.timeouts.unpause).map(_ => ()) def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = - runCmd(Seq("rm", "-f", id.asString), timeouts.rm).map(_ => ()) + runCmd(Seq("rm", "-f", id.asString), config.timeouts.rm).map(_ => ()) def ps(filters: Seq[(String, String)] = Seq.empty, all: Boolean = false)( implicit transid: TransactionId): Future[Seq[ContainerId]] = { val filterArgs = filters.flatMap { case (attr, value) => Seq("--filter", s"$attr=$value") } val allArg = if (all) Seq("--all") else Seq.empty[String] val cmd = Seq("ps", "--quiet", "--no-trunc") ++ allArg ++ filterArgs - runCmd(cmd, timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply)) + runCmd(cmd, 
config.timeouts.ps).map(_.lines.toSeq.map(ContainerId.apply)) } /** @@ -169,11 +174,11 @@ class DockerClient(dockerHost: Option[String] = None, private val pullsInFlight = TrieMap[String, Future[Unit]]() def pull(image: String)(implicit transid: TransactionId): Future[Unit] = pullsInFlight.getOrElseUpdate(image, { - runCmd(Seq("pull", image), timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) } + runCmd(Seq("pull", image), config.timeouts.pull).map(_ => ()).andThen { case _ => pullsInFlight.remove(image) } }) def isOomKilled(id: ContainerId)(implicit transid: TransactionId): Future[Boolean] = - runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), timeouts.inspect).map(_.toBoolean) + runCmd(Seq("inspect", id.asString, "--format", "{{.State.OOMKilled}}"), config.timeouts.inspect).map(_.toBoolean) private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = { val cmd = dockerCmd ++ args