This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new fdbf073 Add ability to deploy a "hot-standby" controller (#2205) fdbf073 is described below commit fdbf073a386c33aed1b06ed93eaea39ee4382c7b Author: Christian Bickel <git...@cbickel.de> AuthorDate: Thu Jun 22 17:38:59 2017 +0200 Add ability to deploy a "hot-standby" controller (#2205) It is now possible to deploy a hot-standby controller. Each controller needs its own instance. This instance is a consecutive numbering, starting with 0. The state and cache of each controller is not shared to the other controllers. If the base controller crashes, the hot-standby controller will be used. After the base controller is up again, it will be used again. Because of the empty cache after restart, there are no problems with inconsistency. The only problem that could occur is, [...] --- ansible/group_vars/all | 4 +- ansible/roles/controller/tasks/clean.yml | 16 +++- ansible/roles/controller/tasks/deploy.yml | 16 ++-- ansible/roles/kafka/tasks/deploy.yml | 10 +- ansible/roles/nginx/templates/nginx.conf.j2 | 20 +++- ansible/templates/whisk.properties.j2 | 9 +- .../main/scala/whisk/common/TransactionId.scala | 10 +- .../connector/kafka/KafkaConsumerConnector.scala | 4 +- .../src/main/scala/whisk/core/WhiskConfig.scala | 12 +-- .../main/scala/whisk/core/connector/Message.scala | 4 +- .../main/scala/whisk/core/entity/InstanceId.scala | 27 ++++++ .../main/scala/whisk/http/BasicRasService.scala | 28 ------ core/controller/Dockerfile | 3 +- .../scala/whisk/core/controller/Controller.scala | 33 +++++-- .../scala/whisk/core/controller/RestAPIs.scala | 7 +- .../core/controller/actions/PrimitiveActions.scala | 9 +- .../core/entitlement/ActivationThrottler.scala | 32 +++---- .../scala/whisk/core/entitlement/Entitlement.scala | 3 - .../core/loadBalancer/InvokerSupervision.scala | 6 +- .../core/loadBalancer/LoadBalancerService.scala | 11 ++- .../scala/whisk/core/container/ContainerPool.scala | 4 +- 
.../whisk/core/container/WhiskContainer.scala | 1 + .../whisk/core/containerpool/ContainerProxy.scala | 8 +- .../main/scala/whisk/core/invoker/Invoker.scala | 35 ++++--- .../scala/whisk/core/invoker/InvokerReactive.scala | 14 +-- .../scala/whisk/core/invoker/InvokerServer.scala | 7 +- tests/src/test/scala/common/LoggedFunction.scala | 10 ++ tests/src/test/scala/common/WhiskProperties.java | 16 ++-- tests/src/test/scala/services/HeadersTests.scala | 2 +- tests/src/test/scala/services/PingTests.scala | 102 +++++++++++++-------- .../whisk/core/cli/test/WskBasicUsageTests.scala | 2 +- .../core/container/test/ContainerPoolTests.scala | 3 +- .../containerpool/test/ContainerPoolTests.scala | 1 + .../containerpool/test/ContainerProxyTests.scala | 3 +- .../controller/test/ControllerTestCommon.scala | 6 +- .../scala/whisk/core/database/test/DbUtils.scala | 5 +- .../core/dispatcher/test/DispatcherTests.scala | 2 +- .../test/InvokerSupervisionTests.scala | 10 +- tools/admin/wskadmin | 2 +- tools/build/checkLogs.py | 2 +- 40 files changed, 297 insertions(+), 202 deletions(-) diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 4e42368..c4f438b 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -89,10 +89,11 @@ defaultLimits: # port means outer port controller: - port: 10001 + basePort: 10001 heap: "{{ controller_heap | default('2g') }}" arguments: "{{ controller_arguments | default('') }}" blackboxFraction: 0.10 + instances: "{{ groups['controllers'] | length }}" consul: confdir: "{{ config_root_dir }}/consul" @@ -129,6 +130,7 @@ invoker: serializeDockerPull: true useRunc: false useReactivePool: "{{ invoker_use_reactive_pool | default(false) }}" + instances: "{{ groups['invokers'] | length }}" nginx: confdir: "{{ config_root_dir }}/nginx" diff --git a/ansible/roles/controller/tasks/clean.yml b/ansible/roles/controller/tasks/clean.yml index ba173b1..24910f6 100644 --- a/ansible/roles/controller/tasks/clean.yml +++ 
b/ansible/roles/controller/tasks/clean.yml @@ -3,6 +3,20 @@ - name: remove controller docker_container: + name: "controller{{ groups['controllers'].index(inventory_hostname) }}" + image: "{{ docker_registry }}{{ docker_image_prefix }}/controller:{{ docker_image_tag }}" + state: absent + ignore_errors: True + +- name: remove controller log directory + file: + path: "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}" + state: absent + become: true + +# Remove controller without prefix +- name: remove controller + docker_container: name: controller image: "{{ docker_registry }}{{ docker_image_prefix }}/controller:{{ docker_image_tag }}" state: absent @@ -12,4 +26,4 @@ file: path: "{{ whisk_logs_dir }}/controller" state: absent - become: true \ No newline at end of file + become: true diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 9dfbae4..e9b91ca 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -10,41 +10,41 @@ - name: ensure controller log directory is created with permissions file: - path: "{{ whisk_logs_dir }}/controller" + path: "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}" state: directory mode: 0777 become: true - name: (re)start controller docker_container: - name: controller + name: controller{{ groups['controllers'].index(inventory_hostname) }} image: "{{ docker_registry }}{{ docker_image_prefix }}/controller:{{ docker_image_tag }}" state: started recreate: true restart_policy: "{{ docker.restart.policy }}" - hostname: controller + hostname: "controller{{ groups['controllers'].index(inventory_hostname) }}" env: - "COMPONENT_NAME": "controller" + "COMPONENT_NAME": "controller{{ groups['controllers'].index(inventory_hostname) }}" "CONSULSERVER_HOST": "{{ groups['consul_servers'] | first }}" "CONSUL_HOST_PORT4": "{{ consul.port.http }}" "PORT": 8080 "WHISK_VERSION_NAME": "{{ 
whisk_version_name }}" "WHISK_VERSION_DATE": "{{ whisk.version.date }}" "WHISK_VERSION_BUILDNO": "{{ docker_image_tag }}" - "KAFKA_NUMPARTITIONS": 2 "SERVICE_CHECK_HTTP": "/ping" "SERVICE_CHECK_TIMEOUT": "2s" "SERVICE_CHECK_INTERVAL": "15s" "JAVA_OPTS": "-Xmx{{ controller.heap }}" "CONTROLLER_OPTS": "{{ controller.arguments }}" volumes: - - "{{ whisk_logs_dir }}/controller:/logs" + - "{{ whisk_logs_dir }}/controller{{ groups['controllers'].index(inventory_hostname) }}:/logs" ports: - - "{{ controller.port }}:8080" + - "{{ controller.basePort + groups['controllers'].index(inventory_hostname) }}:8080" + command: /bin/sh -c "controller/bin/controller {{ groups['controllers'].index(inventory_hostname) }} >> /logs/controller{{ groups['controllers'].index(inventory_hostname) }}_logs.log 2>&1" - name: wait until the Controller in this host is up and running uri: - url: "http://{{ inventory_hostname }}:{{ controller.port }}/ping" + url: "http://{{ inventory_hostname }}:{{ controller.basePort + groups['controllers'].index(inventory_hostname) }}/ping" register: result until: result.status == 200 retries: 12 diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml index ecccd59..d438d33 100644 --- a/ansible/roles/kafka/tasks/deploy.yml +++ b/ansible/roles/kafka/tasks/deploy.yml @@ -47,15 +47,21 @@ retries: 10 delay: 5 -- name: create the active-ack and health topic +- name: create the health topic shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item }} --replication-factor 1 --partitions 1 --zookeeper {{ inventory_hostname }}:{{ zookeeper.port }}'" with_items: - - completed - health register: command_result failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)" changed_when: "'Created topic' in command_result.stdout" +- name: create the active-ack topics + shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic completed{{ item.0 
}} --replication-factor 1 --partitions 1 --zookeeper {{ inventory_hostname }}:{{ zookeeper.port }}'" + with_indexed_items: "{{ groups['controllers'] }}" + register: command_result + failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)" + changed_when: "'Created topic' in command_result.stdout" + - name: create the invoker topics shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic invoker{{ item.0 }} --replication-factor 1 --partitions 1 --zookeeper {{ inventory_hostname }}:{{ zookeeper.port }}'" with_indexed_items: "{{ groups['invokers'] }}" diff --git a/ansible/roles/nginx/templates/nginx.conf.j2 b/ansible/roles/nginx/templates/nginx.conf.j2 index d1c05dc..4795746 100644 --- a/ansible/roles/nginx/templates/nginx.conf.j2 +++ b/ansible/roles/nginx/templates/nginx.conf.j2 @@ -2,7 +2,7 @@ events { {# default: 1024 #} - worker_connections 4096; + worker_connections 4096; } http { @@ -16,6 +16,20 @@ http { '$http_referer $http_user_agent $upstream_addr'; access_log /logs/nginx_access.log combined-upstream; + upstream controllers { + # fail_timeout: period of time the server will be considered unavailable + # Mark the controller as unavailable for at least 60 seconds, to not get any requests during restart. + # Otherwise, nginx would dispatch requests when the container is up, but the backend in the container not. 
+ # From the docs: + # "normally, requests with a non-idempotent method (POST, LOCK, PATCH) are not passed to the next server if a request has been sent to an upstream server" + server {{ groups['controllers'] | first }}:{{ controller.basePort }} fail_timeout=60s; +{% for ip in groups['controllers'] %} + {% if groups['controllers'].index(ip) > 0 %} + server {{ ip }}:{{ controller.basePort + groups['controllers'].index(ip) }} backup; + {% endif %} +{% endfor %} + } + server { listen 443 default ssl; @@ -42,7 +56,7 @@ http { if ($namespace) { rewrite /(.*) /api/v1/web/${namespace}/$1 break; } - proxy_pass http://{{ groups['controllers']|first }}:{{ controller.port }}; + proxy_pass http://controllers; proxy_read_timeout 70s; # 60+10 additional seconds to allow controller to terminate request } @@ -51,7 +65,7 @@ http { if ($namespace) { rewrite ^ /api/v1/web/${namespace}/public/index.html break; } - proxy_pass http://{{ groups['controllers']|first }}:{{ controller.port }}; + proxy_pass http://controllers; proxy_read_timeout 70s; # 60+10 additional seconds to allow controller to terminate request } diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2 index 5ee84f7..af444f3 100644 --- a/ansible/templates/whisk.properties.j2 +++ b/ansible/templates/whisk.properties.j2 @@ -47,10 +47,8 @@ limits.triggers.fires.perMinute={{ limits.triggers.fires.perMinute }} {% endif %} consulserver.host={{ groups["consul_servers"]|first }} -controller.host={{ groups["controllers"]|first }} edge.host={{ groups["edge"]|first }} kafka.host={{ groups["kafka"]|first }} -loadbalancer.host={{ groups["controllers"]|first }} router.host={{ groups["edge"]|first }} zookeeper.host={{ groups["kafka"]|first }} invoker.hosts={{ groups["invokers"] | join(",") }} @@ -59,12 +57,14 @@ edge.host.apiport=443 zookeeper.host.port={{ zookeeper.port }} kafka.host.port={{ kafka.port }} kafkaras.host.port={{ kafka.ras.port }} -controller.host.port={{ controller.port }} 
-loadbalancer.host.port={{ controller.port }} consul.host.port4={{ consul.port.http }} consul.host.port5={{ consul.port.server }} invoker.hosts.baseport={{ invoker.port }} +controller.hosts={{ groups["controllers"] | join(",") }} +controller.host.basePort={{ controller.basePort }} +controller.instances={{ controller.instances }} + invoker.container.network=bridge invoker.container.policy={{ invoker_container_policy_name | default()}} invoker.container.dns={{ invoker_container_network_dns_servers | default()}} @@ -74,6 +74,7 @@ invoker.serializeDockerOp={{ invoker.serializeDockerOp }} invoker.serializeDockerPull={{ invoker.serializeDockerPull }} invoker.useRunc={{ invoker_use_runc | default(invoker.useRunc) }} invoker.useReactivePool={{ invoker.useReactivePool }} +invoker.instances={{ invoker.instances }} consulserver.docker.endpoint={{ groups["consul_servers"]|first }}:{{ docker.port }} edge.docker.endpoint={{ groups["edge"]|first }}:{{ docker.port }} diff --git a/common/scala/src/main/scala/whisk/common/TransactionId.scala b/common/scala/src/main/scala/whisk/common/TransactionId.scala index 0c4fc8e..c05034b 100644 --- a/common/scala/src/main/scala/whisk/common/TransactionId.scala +++ b/common/scala/src/main/scala/whisk/common/TransactionId.scala @@ -31,6 +31,7 @@ import spray.json.JsArray import spray.json.JsNumber import spray.json.JsValue import spray.json.RootJsonFormat +import whisk.core.entity.InstanceId /** * A transaction id for tracking operations in the system that are specific to a request. @@ -175,9 +176,12 @@ object TransactionId { * A thread-safe transaction counter. 
*/ trait TransactionCounter { + val numberOfInstances: Int + val instance: InstanceId + + private lazy val cnt = new AtomicInteger(numberOfInstances + instance.toInt) + def transid(): TransactionId = { - TransactionId(cnt.incrementAndGet()) + TransactionId(cnt.addAndGet(numberOfInstances)) } - - private val cnt = new AtomicInteger(1) } diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala index 9ec2cd8..ea1ff11 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala @@ -33,7 +33,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.common.serialization.ByteArrayDeserializer import whisk.common.Logging -import whisk.common.TransactionCounter import whisk.core.connector.MessageConsumer class KafkaConsumerConnector( @@ -45,8 +44,7 @@ class KafkaConsumerConnector( sessionTimeout: FiniteDuration = 30 seconds, autoCommitInterval: FiniteDuration = 10 seconds)( implicit logging: Logging) - extends MessageConsumer - with TransactionCounter { + extends MessageConsumer { /** * Long poll for messages. 
Method returns once message are available but no later than given diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index 267ad38..6af2fcb 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -79,15 +79,15 @@ class WhiskConfig( val invokerSerializeDockerPull = this(WhiskConfig.invokerSerializeDockerPull) val invokerUseRunc = this(WhiskConfig.invokerUseRunc) val invokerUseReactivePool = this(WhiskConfig.invokerUseReactivePool) + val invokerInstances = this(WhiskConfig.invokerInstances) val wskApiHost = this(WhiskConfig.wskApiProtocol) + "://" + this(WhiskConfig.wskApiHostname) + ":" + this(WhiskConfig.wskApiPort) - val controllerHost = this(WhiskConfig.controllerHostName) + ":" + this(WhiskConfig.controllerHostPort) val controllerBlackboxFraction = this.getAsDouble(WhiskConfig.controllerBlackboxFraction, 0.10) val loadbalancerInvokerBusyThreshold = this.getAsInt(WhiskConfig.loadbalancerInvokerBusyThreshold, 16) + val controllerInstances = this(WhiskConfig.controllerInstances) val edgeHost = this(WhiskConfig.edgeHostName) + ":" + this(WhiskConfig.edgeHostApiPort) val kafkaHost = this(WhiskConfig.kafkaHostName) + ":" + this(WhiskConfig.kafkaHostPort) - val loadbalancerHost = this(WhiskConfig.loadbalancerHostName) + ":" + this(WhiskConfig.loadbalancerHostPort) val edgeHostName = this(WhiskConfig.edgeHostName) @@ -237,6 +237,7 @@ object WhiskConfig { val invokerSerializeDockerPull = "invoker.serializeDockerPull" val invokerUseRunc = "invoker.useRunc" val invokerUseReactivePool = "invoker.useReactivePool" + val invokerInstances = "invoker.instances" val wskApiProtocol = "whisk.api.host.proto" val wskApiPort = "whisk.api.host.port" @@ -247,19 +248,16 @@ object WhiskConfig { val kafkaDockerEndpoint = "kafka.docker.endpoint" val mainDockerEndpoint = "main.docker.endpoint" - private val controllerHostName = 
"controller.host" - private val controllerHostPort = "controller.host.port" private val controllerBlackboxFraction = "controller.blackboxFraction" + val controllerInstances = "controller.instances" val loadbalancerInvokerBusyThreshold = "loadbalancer.invokerBusyThreshold" val kafkaHostName = "kafka.host" - val loadbalancerHostName = "loadbalancer.host" private val zookeeperHostName = "zookeeper.host" private val edgeHostApiPort = "edge.host.apiport" val kafkaHostPort = "kafka.host.port" - private val loadbalancerHostPort = "loadbalancer.host.port" private val zookeeperHostPort = "zookeeper.host.port" val consulServerHost = "consulserver.host" @@ -270,8 +268,6 @@ object WhiskConfig { val consulServer = Map(consulServerHost -> null, consulPort -> null) val invokerHosts = Map(invokerHostsList -> null) val kafkaHost = Map(kafkaHostName -> null, kafkaHostPort -> null) - val controllerHost = Map(controllerHostName -> null, controllerHostPort -> null) - val loadbalancerHost = Map(loadbalancerHostName -> null, loadbalancerHostPort -> null) val runtimesManifest = "runtimes.manifest" diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala index 5ed4ed6..c895690 100644 --- a/common/scala/src/main/scala/whisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala @@ -26,6 +26,7 @@ import whisk.core.entity.DocRevision import whisk.core.entity.EntityPath import whisk.core.entity.FullyQualifiedEntityName import whisk.core.entity.Identity +import whisk.core.entity.InstanceId import whisk.core.entity.WhiskActivation /** Basic trait for messages that are sent on a message bus connector. 
*/ @@ -53,6 +54,7 @@ case class ActivationMessage( user: Identity, activationId: ActivationId, activationNamespace: EntityPath, + rootControllerIndex: InstanceId, content: Option[JsObject], cause: Option[ActivationId] = None) extends Message { @@ -80,7 +82,7 @@ object ActivationMessage extends DefaultJsonProtocol { def parse(msg: String) = Try(serdes.read(msg.parseJson)) private implicit val fqnSerdes = FullyQualifiedEntityName.serdes - implicit val serdes = jsonFormat8(ActivationMessage.apply) + implicit val serdes = jsonFormat9(ActivationMessage.apply) } /** diff --git a/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala new file mode 100644 index 0000000..f573b04 --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/entity/InstanceId.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2015-2016 IBM Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.entity + +import spray.json.DefaultJsonProtocol + +case class InstanceId(val instance: Int) { + def toInt: Int = instance +} + +object InstanceId extends DefaultJsonProtocol { + implicit val serdes = jsonFormat1(InstanceId.apply) +} diff --git a/common/scala/src/main/scala/whisk/http/BasicRasService.scala b/common/scala/src/main/scala/whisk/http/BasicRasService.scala index 4926e4c..f7d0034 100644 --- a/common/scala/src/main/scala/whisk/http/BasicRasService.scala +++ b/common/scala/src/main/scala/whisk/http/BasicRasService.scala @@ -17,10 +17,7 @@ package whisk.http -import akka.actor.Actor -import akka.actor.ActorSystem import akka.event.Logging -import akka.japi.Creator import spray.httpx.SprayJsonSupport._ import whisk.common.Logging import whisk.common.TransactionId @@ -45,28 +42,3 @@ trait BasicRasService extends BasicHttpService { get { complete("pong") } } } - -/** - * Singleton which provides a factory for instances of the BasicRasService. - */ -object BasicRasService { - - def startService(system: ActorSystem, name: String, interface: String, port: Integer)(implicit logging: Logging) = { - BasicHttpService.startService(system, name, interface, port, new ServiceBuilder) - } - - /** - * In spray, we send messages to an Akka Actor. A RasService represents an Actor - * which extends the BasicRasService trait. - */ - private class RasService(implicit val logging: Logging) extends BasicRasService with Actor { - override def actorRefFactory = context - } - - /** - * Akka-style factory for RasService. 
- */ - private class ServiceBuilder(implicit logging: Logging) extends Creator[RasService] { - def create = new RasService - } -} diff --git a/core/controller/Dockerfile b/core/controller/Dockerfile index 1b24625..ecfc0ec 100644 --- a/core/controller/Dockerfile +++ b/core/controller/Dockerfile @@ -5,7 +5,7 @@ ENV DEBIAN_FRONTEND noninteractive # Install swagger-ui RUN wget --no-verbose https://github.com/swagger-api/swagger-ui/archive/v2.1.4.tar.gz && \ mkdir swagger-ui && \ - tar zxf v2.1.4.tar.gz -C /swagger-ui --strip-components=2 swagger-ui-2.1.4/dist && \ + tar zxf v2.1.4.tar.gz -C /swagger-ui --strip-components=2 swagger-ui-2.1.4/dist && \ rm v2.1.4.tar.gz && \ perl -pi -w -e 's{http://petstore.swagger.io/v2/swagger.json}{/api/v1/api-docs}g;' /swagger-ui/index.html @@ -13,6 +13,5 @@ RUN wget --no-verbose https://github.com/swagger-api/swagger-ui/archive/v2.1.4.t # Copy app jars COPY build/distributions/controller.tar ./ RUN tar xf controller.tar -CMD controller/bin/controller >> /logs/${COMPONENT_NAME}_logs.log 2>&1 EXPOSE 8080 diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala index f383cd0..2740190 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala @@ -32,19 +32,19 @@ import spray.routing.Directive.pimpApply import spray.routing.Route import whisk.common.AkkaLogging import whisk.common.Logging +import whisk.common.LoggingMarkers import whisk.common.TransactionId import whisk.core.WhiskConfig import whisk.core.entitlement._ import whisk.core.entitlement.EntitlementProvider import whisk.core.entity._ -import whisk.core.entity.ExecManifest.Runtimes import whisk.core.entity.ActivationId.ActivationIdGenerator +import whisk.core.entity.ExecManifest.Runtimes import whisk.core.loadBalancer.LoadBalancerService import whisk.http.BasicHttpService import 
whisk.http.BasicRasService -import whisk.common.LoggingMarkers -import scala.util.{Failure, Success} +import scala.util.{ Failure, Success } /** * The Controller is the service that provides the REST API for OpenWhisk. @@ -53,6 +53,15 @@ import scala.util.{Failure, Success} * * Spray sends messages to akka Actors -- the Controller is an Actor, ready to receive messages. * + * It is possible to deploy a hot-standby controller. Each controller needs its own instance. This instance is a + * consecutive numbering, starting with 0. + * The state and cache of each controller is not shared to the other controllers. + * If the base controller crashes, the hot-standby controller will be used. After the base controller is up again, + * it will be used again. Because of the empty cache after restart, there are no problems with inconsistency. + * The only problem that could occur is, that the base controller is not reachable, but does not restart. After switching + * back to the base controller, there could be an inconsistency in the cache (e.g. if a user has updated an action). This + * inconsistency will be resolved by its own after removing the cached item, 5 minutes after it has been generated. + * * @Idioglossia uses the spray-routing DSL * http://spray.io/documentation/1.1.3/spray-routing/advanced-topics/understanding-dsl-structure/ * @@ -62,7 +71,7 @@ import scala.util.{Failure, Success} * @param executionContext Scala runtime support for concurrent operations */ class Controller( - instance: Int, + override val instance: InstanceId, runtimes: Runtimes, implicit val whiskConfig: WhiskConfig, implicit val logging: Logging) @@ -72,6 +81,8 @@ class Controller( // each akka Actor has an implicit context override def actorRefFactory: ActorContext = context + override val numberOfInstances = whiskConfig.controllerInstances.toInt + /** * A Route in spray is technically a function taking a RequestContext as a parameter. 
* @@ -96,7 +107,7 @@ class Controller( } } - TransactionId.controller.mark(this, LoggingMarkers.CONTROLLER_STARTUP(instance), s"starting controller instance ${instance}") + TransactionId.controller.mark(this, LoggingMarkers.CONTROLLER_STARTUP(instance.toInt), s"starting controller instance ${instance.toInt}") // initialize datastores private implicit val actorSystem = context.system @@ -106,7 +117,7 @@ class Controller( private implicit val activationStore = WhiskActivationStore.datastore(whiskConfig) // initialize backend services - private implicit val loadBalancer = new LoadBalancerService(whiskConfig, entityStore) + private implicit val loadBalancer = new LoadBalancerService(whiskConfig, instance, entityStore) private implicit val consulServer = whiskConfig.consulServer private implicit val entitlementProvider = new LocalEntitlementProvider(whiskConfig, loadBalancer) private implicit val activationIdFactory = new ActivationIdGenerator {} @@ -115,6 +126,7 @@ class Controller( Collection.initialize(entityStore) /** The REST APIs. */ + implicit val controllerInstance = instance private val apiv1 = new RestAPIVersion("api", "v1") private val swagger = new SwaggerDocs(Uri.Path.Empty, "infoswagger.json") @@ -144,6 +156,7 @@ object Controller { // a value, and whose values are default values. 
A null value in the Map means there is // no default value specified, so it must appear in the properties file def requiredProperties = Map(WhiskConfig.servicePort -> 8080.toString) ++ + Map(WhiskConfig.controllerInstances -> 1.toString) ++ ExecManifest.requiredProperties ++ RestApiCommons.requiredProperties ++ LoadBalancerService.requiredProperties ++ @@ -164,7 +177,7 @@ object Controller { "runtimes" -> runtimes.toJson) // akka-style factory to create a Controller object - private class ServiceBuilder(config: WhiskConfig, instance: Int, logging: Logging) extends Creator[Controller] { + private class ServiceBuilder(config: WhiskConfig, instance: InstanceId, logging: Logging) extends Creator[Controller] { // this method is not reached unless ExecManifest was initialized successfully def create = new Controller(instance, ExecManifest.runtimesManifest, config, logging) } @@ -177,8 +190,8 @@ object Controller { val config = new WhiskConfig(requiredProperties, optionalProperties) // if deploying multiple instances (scale out), must pass the instance number as the - // second argument. (TODO .. 
seems fragile) - val instance = if (args.length > 0) args(1).toInt else 0 + require(args.length >= 1, "controller instance required") + val instance = args(0).toInt def abort() = { logger.error(this, "Bad configuration, cannot start.") @@ -194,7 +207,7 @@ object Controller { ExecManifest.initialize(config) match { case Success(_) => val port = config.servicePort.toInt - BasicHttpService.startService(actorSystem, "controller", "0.0.0.0", port, new ServiceBuilder(config, instance, logger)) + BasicHttpService.startService(actorSystem, "controller", "0.0.0.0", port, new ServiceBuilder(config, InstanceId(instance), logger)) case Failure(t) => logger.error(this, s"Invalid runtimes manifest: $t") diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala index 6202868..08cf874 100644 --- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala +++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala @@ -19,8 +19,8 @@ package whisk.core.controller import scala.concurrent.ExecutionContext +import RestApiCommons._ import akka.actor.ActorSystem - import spray.http.AllOrigins import spray.http.HttpHeaders._ import spray.http.StatusCodes._ @@ -31,7 +31,6 @@ import spray.json.DefaultJsonProtocol._ import spray.routing.Directive.pimpApply import spray.routing.Directives import spray.routing.Route - import whisk.common.Logging import whisk.common.TransactionId import whisk.core.WhiskConfig @@ -42,7 +41,6 @@ import whisk.core.entity._ import whisk.core.entity.ActivationId.ActivationIdGenerator import whisk.core.entity.types._ import whisk.core.loadBalancer.LoadBalancerService -import RestApiCommons._ /** * Abstract class which provides basic Directives which are used to construct route structures @@ -106,6 +104,7 @@ protected[controller] object RestApiCommons { override val webApiDirectives: WebApiDirectives)( implicit override val authStore: AuthStore, implicit 
val entityStore: EntityStore, + override val activeAckTopicIndex: InstanceId, override val activationStore: ActivationStore, override val entitlementProvider: EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, @@ -133,6 +132,7 @@ protected[controller] trait RespondWithHeaders extends Directives { * An object which creates the Routes that define v1 of the whisk REST API. */ protected[controller] class RestAPIVersion(apipath: String, apiversion: String)( + implicit val activeAckTopicIndex: InstanceId, implicit val authStore: AuthStore, implicit val entityStore: EntityStore, implicit val activationStore: ActivationStore, @@ -224,6 +224,7 @@ protected[controller] class RestAPIVersion(apipath: String, apiversion: String)( val apipath: String, val apiversion: String)( implicit override val actorSystem: ActorSystem, + override val activeAckTopicIndex: InstanceId, override val entityStore: EntityStore, override val activationStore: ActivationStore, override val entitlementProvider: EntitlementProvider, diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala index 9f98a71..0911a75 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/PrimitiveActions.scala @@ -29,8 +29,8 @@ import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId import whisk.core.connector.ActivationMessage -import whisk.core.controller.WhiskServices import whisk.core.controller.WhiskActionsApi +import whisk.core.controller.WhiskServices import whisk.core.database.NoDocumentException import whisk.core.entity._ import whisk.core.entity.types.ActivationStore @@ -48,6 +48,12 @@ protected[actions] trait PrimitiveActions { protected implicit val logging: Logging + /** + * The index of the active ack topic, this 
controller is listening for. + * Typically this is also the instance number of the controller + */ + protected val activeAckTopicIndex: InstanceId + /** Database service to CRUD actions. */ protected val entityStore: EntityStore @@ -99,6 +105,7 @@ protected[actions] trait PrimitiveActions { user, activationIdFactory.make(), // activation id created here activationNamespace = user.namespace.toPath, + activeAckTopicIndex, args, cause = cause) diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala index 4e71260..4ba44e9 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala +++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala @@ -21,12 +21,9 @@ import scala.concurrent.Future import scala.concurrent.duration.DurationInt import akka.actor.ActorSystem -import spray.json._ -import spray.json.DefaultJsonProtocol._ -import whisk.common.ConsulClient -import whisk.common.ConsulKV.ControllerKeys import whisk.common.Logging import whisk.common.Scheduler +import whisk.common.TransactionId import whisk.core.entity.Subject import whisk.core.loadBalancer.LoadBalancer @@ -54,34 +51,27 @@ class ActivationThrottler(consulServer: String, loadBalancer: LoadBalancer, conc private var userActivationCounter = Map.empty[String, Int] private val healthCheckInterval = 5.seconds - private val consul = new ConsulClient(consulServer) /** * Checks whether the operation should be allowed to proceed. 
*/ - def check(subject: Subject): Boolean = userActivationCounter.getOrElse(subject.asString, 0) < concurrencyLimit + def check(subject: Subject)(implicit tid: TransactionId): Boolean = { + val concurrentActivations = userActivationCounter.getOrElse(subject.asString, 0) + logging.info(this, s"subject = ${subject.toString}, concurrent activations = $concurrentActivations, below limit = $concurrencyLimit") + concurrentActivations < concurrencyLimit + } /** * Checks whether the system is in a generally overloaded state. */ - def isOverloaded = userActivationCounter.values.sum > systemOverloadLimit - - /** - * Publish into Consul KV values showing the controller's view - * of concurrent activations on a per-user basis. - */ - private def publishUserConcurrentActivation() = { - // Any sort of partitioning will be ok for monitoring - Future.sequence(userActivationCounter.groupBy(_._1.take(1)).map { - case (prefix, items) => - val key = ControllerKeys.userActivationCountKey + "/" + prefix - consul.kv.put(key, items.toJson.compactPrint) - }) + def isOverloaded()(implicit tid: TransactionId): Boolean = { + val concurrentActivations = userActivationCounter.values.sum + logging.info(this, s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit") + concurrentActivations > systemOverloadLimit } Scheduler.scheduleWaitAtLeast(healthCheckInterval) { () => userActivationCounter = loadBalancer.getActiveUserActivationCounts - publishUserConcurrentActivation() + Future.successful(Unit) } - } diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala index caf33ae..6c205b5 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala +++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala @@ -28,7 +28,6 @@ import Privilege.REJECT import akka.actor.ActorSystem import spray.http.StatusCodes.Forbidden 
import spray.http.StatusCodes.TooManyRequests -import whisk.common.ConsulClient import whisk.common.Logging import whisk.common.TransactionId import whisk.core.WhiskConfig @@ -87,8 +86,6 @@ protected[core] abstract class EntitlementProvider(config: WhiskConfig, loadBala private val triggerRateThrottler = new RateThrottler("triggers per minute", config.triggerFirePerMinuteLimit.toInt) private val concurrentInvokeThrottler = new ActivationThrottler(config.consulServer, loadBalancer, config.actionInvokeConcurrentLimit.toInt, config.actionInvokeSystemOverloadLimit.toInt) - private val consul = new ConsulClient(config.consulServer) - /** * Grants a subject the right to access a resources. * diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala index 436c247..d7e9025 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala @@ -56,6 +56,7 @@ import whisk.core.entity.DocRevision import whisk.core.entity.EntityName import whisk.core.entity.ExecManifest import whisk.core.entity.Identity +import whisk.core.entity.InstanceId import whisk.core.entity.Secret import whisk.core.entity.Subject import whisk.core.entity.UUID @@ -190,7 +191,7 @@ object InvokerPool { * This finite state-machine represents an Invoker in its possible * states "Healthy" and "Offline". 
*/ -class InvokerActor extends FSM[InvokerState, InvokerInfo] { +class InvokerActor(controllerInstance: InstanceId) extends FSM[InvokerState, InvokerInfo] { implicit val transid = TransactionId.invokerHealth implicit val logging = new AkkaLogging(context.system.log) def name = self.path.name @@ -311,6 +312,7 @@ class InvokerActor extends FSM[InvokerState, InvokerInfo] { // Create a new Activation ID for this activation activationId = new ActivationIdGenerator {}.make(), activationNamespace = action.namespace, + rootControllerIndex = controllerInstance, content = None) context.parent ! ActivationRequest(activationMessage, name) @@ -328,7 +330,7 @@ class InvokerActor extends FSM[InvokerState, InvokerInfo] { } object InvokerActor { - def props() = Props[InvokerActor] + def props(controllerInstance: InstanceId) = Props(new InvokerActor(controllerInstance)) val bufferSize = 10 val bufferErrorTolerance = 3 diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala index 7e633c5..636f157 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala @@ -18,7 +18,6 @@ package whisk.core.loadBalancer import java.nio.charset.StandardCharsets - import java.time.{ Clock, Instant } import scala.collection.concurrent.TrieMap @@ -50,6 +49,7 @@ import whisk.core.connector.{ ActivationMessage, CompletionMessage } import whisk.core.connector.MessageProducer import whisk.core.database.NoDocumentException import whisk.core.entity.{ ActivationId, CodeExec, WhiskAction, WhiskActivation } +import whisk.core.entity.InstanceId import whisk.core.entity.WhiskAction import whisk.core.entity.types.EntityStore import scala.annotation.tailrec @@ -77,7 +77,7 @@ trait LoadBalancer { } -class LoadBalancerService(config: WhiskConfig, entityStore: EntityStore)(implicit 
val actorSystem: ActorSystem, logging: Logging) extends LoadBalancer { +class LoadBalancerService(config: WhiskConfig, instance: InstanceId, entityStore: EntityStore)(implicit val actorSystem: ActorSystem, logging: Logging) extends LoadBalancer { /** The execution context for futures */ implicit val executionContext: ExecutionContext = actorSystem.dispatcher @@ -220,8 +220,9 @@ class LoadBalancerService(config: WhiskConfig, entityStore: EntityStore)(implici } val consul = new ConsulClient(config.consulServer) - val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, "health", "health") - val invokerFactory = (f: ActorRefFactory, name: String) => f.actorOf(InvokerActor.props, name) + // Each controller gets its own Group Id, to receive all messages + val pingConsumer = new KafkaConsumerConnector(config.kafkaHost, s"health${instance.toInt}", "health") + val invokerFactory = (f: ActorRefFactory, name: String) => f.actorOf(InvokerActor.props(instance), name) actorSystem.actorOf(InvokerPool.props(invokerFactory, consul.kv, invoker => { clearInvokerState(invoker) @@ -230,7 +231,7 @@ class LoadBalancerService(config: WhiskConfig, entityStore: EntityStore)(implici } /** Subscribes to active acks (completion messages from the invokers). */ - private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", "completed") + private val activeAckConsumer = new KafkaConsumerConnector(config.kafkaHost, "completions", s"completed${instance.toInt}") /** Registers a handler for received active acks from invokers. 
*/ activeAckConsumer.onMessage((topic, _, _, bytes) => { diff --git a/core/invoker/src/main/scala/whisk/core/container/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/container/ContainerPool.scala index 0b28f2e..af3d9e4 100644 --- a/core/invoker/src/main/scala/whisk/core/container/ContainerPool.scala +++ b/core/invoker/src/main/scala/whisk/core/container/ContainerPool.scala @@ -52,7 +52,7 @@ import whisk.core.entity._ */ class ContainerPool( config: WhiskConfig, - invokerInstance: Integer = 0, + invokerInstance: InstanceId = InstanceId(0), standalone: Boolean = false, saveContainerLog: Boolean = false)(implicit actorSystem: ActorSystem, val logging: Logging) extends ContainerUtils { @@ -429,7 +429,7 @@ class ContainerPool( // Sample container name: wsk1_1_joeibmcomhelloWorldDemo_20150901T202701852Z private def makeContainerName(localName: String): ContainerName = - ContainerCounter.containerName(invokerInstance.toString(), localName) + ContainerCounter.containerName(invokerInstance.toInt.toString, localName) private def makeContainerName(action: WhiskAction): ContainerName = makeContainerName(action.fullyQualifiedName(true).toString) diff --git a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala b/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala index a68be9d..2e0e378 100644 --- a/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala +++ b/core/invoker/src/main/scala/whisk/core/container/WhiskContainer.scala @@ -120,6 +120,7 @@ class WhiskContainer( WhiskAuth(Subject(), AuthKey()).toIdentity, ActivationId(), EntityPath("no_namespace"), + InstanceId(0), None) run(msg, params, 30000.milliseconds)(system, TransactionId.testing) } diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index 55c7999..1b8c7fe 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala 
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -96,7 +96,7 @@ case object ActivationCompleted */ class ContainerProxy( factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container], - sendActiveAck: (TransactionId, WhiskActivation) => Future[Any], + sendActiveAck: (TransactionId, WhiskActivation, InstanceId) => Future[Any], storeActivation: (TransactionId, WhiskActivation) => Future[Any], unusedTimeout: FiniteDuration, pauseGrace: FiniteDuration) extends FSM[ContainerState, ContainerData] with Stash { @@ -154,7 +154,7 @@ class ContainerProxy( // transitions to Running val activation = ContainerProxy.constructWhiskActivation(job, Interval.zero, response) self ! ActivationCompleted - sendActiveAck(transid, activation) + sendActiveAck(transid, activation, job.msg.rootControllerIndex) storeActivation(transid, activation) }.flatMap { container => @@ -361,7 +361,7 @@ class ContainerProxy( // asynchronous. activation.andThen { // the activation future will always complete with Success - case Success(ack) => sendActiveAck(tid, ack) + case Success(ack) => sendActiveAck(tid, ack, job.msg.rootControllerIndex) }.flatMap { activation => container.logs(job.action.limits.logs.asMegaBytes, job.action.exec.sentinelledLogs).map { logs => activation.withLogs(ActivationLogs(logs.toVector)) @@ -380,7 +380,7 @@ class ContainerProxy( object ContainerProxy { def props(factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container], - ack: (TransactionId, WhiskActivation) => Future[Any], + ack: (TransactionId, WhiskActivation, InstanceId) => Future[Any], store: (TransactionId, WhiskActivation) => Future[Any], unusedTimeout: FiniteDuration = 10.minutes, pauseGrace: FiniteDuration = 50.milliseconds) = Props(new ContainerProxy(factory, ack, store, unusedTimeout, pauseGrace)) diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala index 
4ccc129..813cc6b 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala @@ -18,7 +18,6 @@ package whisk.core.invoker import java.nio.charset.StandardCharsets - import java.time.{ Clock, Instant } import scala.concurrent.{ Await, ExecutionContext, Future } @@ -26,6 +25,7 @@ import scala.concurrent.Promise import scala.concurrent.duration.{ Duration, DurationInt } import scala.language.postfixOps import scala.util.{ Failure, Success } +import scala.util.Try import akka.actor.{ ActorRef, ActorSystem, actorRef2Scala } import akka.japi.Creator @@ -33,10 +33,13 @@ import spray.json._ import spray.json.DefaultJsonProtocol._ import whisk.common.{ Counter, Logging, LoggingMarkers, TransactionId } import whisk.common.AkkaLogging +import whisk.common.Scheduler import whisk.connector.kafka.{ KafkaConsumerConnector, KafkaProducerConnector } import whisk.core.WhiskConfig import whisk.core.WhiskConfig.{ consulServer, dockerImagePrefix, dockerRegistry, kafkaHost, logsDir, servicePort, whiskVersion, invokerUseReactivePool } import whisk.core.connector.{ ActivationMessage, CompletionMessage } +import whisk.core.connector.MessageProducer +import whisk.core.connector.PingMessage import whisk.core.container._ import whisk.core.dispatcher.{ Dispatcher, MessageHandler } import whisk.core.dispatcher.ActivationFeed.{ ActivationNotification, ContainerReleased, FailedActivation } @@ -44,10 +47,6 @@ import whisk.core.entity._ import whisk.http.BasicHttpService import whisk.http.Messages import whisk.utils.ExecutionContextFactory -import whisk.common.Scheduler -import whisk.core.connector.PingMessage -import scala.util.Try -import whisk.core.connector.MessageProducer /** * A kafka message handler that invokes actions as directed by message on topic "/actions/invoke". 
@@ -59,16 +58,16 @@ import whisk.core.connector.MessageProducer */ class Invoker( config: WhiskConfig, - instance: Int, + instance: InstanceId, activationFeed: ActorRef, producer: MessageProducer, runningInContainer: Boolean = true)(implicit actorSystem: ActorSystem, logging: Logging) - extends MessageHandler(s"invoker$instance") + extends MessageHandler(s"invoker${instance.toInt}") with ActionLogDriver { private implicit val executionContext: ExecutionContext = actorSystem.dispatcher - TransactionId.invoker.mark(this, LoggingMarkers.INVOKER_STARTUP(instance), s"starting invoker instance ${instance}") + TransactionId.invoker.mark(this, LoggingMarkers.INVOKER_STARTUP(instance.toInt), s"starting invoker instance ${instance.toInt}") /** * This is the handler for the kafka message @@ -257,7 +256,7 @@ class Invoker( val activationResult = makeWhiskActivation(msg, EntityPath(action.fullyQualifiedName(false).toString), action.version, activationResponse, activationInterval, Some(action.limits)) val completeMsg = CompletionMessage(transid, activationResult, this.name) - producer.send("completed", completeMsg) map { status => + producer.send(s"completed${msg.rootControllerIndex.toInt}", completeMsg) map { status => logging.info(this, s"posted completion of activation ${msg.activationId}") } @@ -409,7 +408,8 @@ object Invoker { logsDir -> null, dockerRegistry -> null, dockerImagePrefix -> null, - invokerUseReactivePool -> false.toString) ++ + invokerUseReactivePool -> false.toString, + WhiskConfig.invokerInstances -> null) ++ ExecManifest.requiredProperties ++ WhiskAuthStore.requiredProperties ++ WhiskEntityStore.requiredProperties ++ @@ -421,7 +421,7 @@ object Invoker { def main(args: Array[String]): Unit = { require(args.length == 1, "invoker instance required") - val instance = args(0).toInt + val invokerInstance = InstanceId(args(0).toInt) implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext() implicit val actorSystem: ActorSystem = ActorSystem( 
@@ -449,7 +449,7 @@ object Invoker { abort() } - val topic = s"invoker$instance" + val topic = s"invoker${invokerInstance.toInt}" val groupid = "invokers" val maxdepth = ContainerPool.getDefaultMaxActive(config) val consumer = new KafkaConsumerConnector(config.kafkaHost, groupid, topic, maxdepth) @@ -457,9 +457,9 @@ object Invoker { val dispatcher = new Dispatcher(consumer, 500 milliseconds, 2 * maxdepth, actorSystem) val invoker = if (Try(config.invokerUseReactivePool.toBoolean).getOrElse(false)) { - new InvokerReactive(config, instance, dispatcher.activationFeed, producer) + new InvokerReactive(config, invokerInstance, dispatcher.activationFeed, producer) } else { - new Invoker(config, instance, dispatcher.activationFeed, producer) + new Invoker(config, invokerInstance, dispatcher.activationFeed, producer) } logger.info(this, s"using $invoker") @@ -467,18 +467,15 @@ object Invoker { dispatcher.start() Scheduler.scheduleWaitAtMost(1.seconds)(() => { - producer.send("health", PingMessage(s"invoker$instance")).andThen { + producer.send("health", PingMessage(s"invoker${invokerInstance.toInt}")).andThen { case Failure(t) => logger.error(this, s"failed to ping the controller: $t") } }) val port = config.servicePort.toInt BasicHttpService.startService(actorSystem, "invoker", "0.0.0.0", port, new Creator[InvokerServer] { - def create = new InvokerServer { - override implicit val logging = logger - } + def create = new InvokerServer(invokerInstance, config.invokerInstances.toInt) }) - } } diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index cf7e3ae..b90db49 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -37,27 +37,27 @@ import whisk.core.connector.CompletionMessage import whisk.core.connector.MessageProducer import whisk.core.container.{ ContainerPool => 
OldContainerPool } import whisk.core.container.Interval +import whisk.core.containerpool.ContainerPool import whisk.core.containerpool.ContainerProxy import whisk.core.containerpool.PrewarmingConfig import whisk.core.containerpool.Run import whisk.core.containerpool.docker.DockerClientWithFileAccess import whisk.core.containerpool.docker.DockerContainer import whisk.core.containerpool.docker.RuncClient +import whisk.core.database.NoDocumentException import whisk.core.dispatcher.ActivationFeed.FailedActivation import whisk.core.dispatcher.MessageHandler import whisk.core.entity._ import whisk.core.entity.ExecManifest.ImageName import whisk.core.entity.size._ -import whisk.core.containerpool.ContainerPool -import whisk.core.database.NoDocumentException import whisk.http.Messages class InvokerReactive( config: WhiskConfig, - instance: Int, + instance: InstanceId, activationFeed: ActorRef, producer: MessageProducer)(implicit actorSystem: ActorSystem, logging: Logging) - extends MessageHandler(s"invoker$instance") { + extends MessageHandler(s"invoker${instance.toInt}") { implicit val ec = actorSystem.dispatcher @@ -107,9 +107,9 @@ class InvokerReactive( } /** Sends an active-ack. */ - val ack = (tid: TransactionId, activation: WhiskActivation) => { + val ack = (tid: TransactionId, activation: WhiskActivation, controllerInstance: InstanceId) => { implicit val transid = tid - producer.send("completed", CompletionMessage(tid, activation, s"invoker$instance")).andThen { + producer.send(s"completed${controllerInstance.toInt}", CompletionMessage(tid, activation, s"invoker${instance.toInt}")).andThen { case Success(_) => logging.info(this, s"posted completion of activation ${activation.activationId}") } } @@ -195,7 +195,7 @@ class InvokerReactive( }) activationFeed ! 
FailedActivation(msg.transid) - ack(msg.transid, activation) + ack(msg.transid, activation, msg.rootControllerIndex) store(msg.transid, activation) } } diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala index 9b71a8a..cd083cb 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerServer.scala @@ -18,13 +18,18 @@ package whisk.core.invoker import akka.actor.Actor +import whisk.common.Logging +import whisk.core.entity.InstanceId import whisk.http.BasicRasService /** * Implements web server to handle certain REST API calls. * Currently provides a health ping route, only. */ -trait InvokerServer +class InvokerServer( + override val instance: InstanceId, + override val numberOfInstances: Int)( + override implicit val logging: Logging) extends BasicRasService with Actor { diff --git a/tests/src/test/scala/common/LoggedFunction.scala b/tests/src/test/scala/common/LoggedFunction.scala index 467bc5a..e6c0aad 100644 --- a/tests/src/test/scala/common/LoggedFunction.scala +++ b/tests/src/test/scala/common/LoggedFunction.scala @@ -38,6 +38,15 @@ class LoggedFunction2[A1, A2, B](body: (A1, A2) => B) extends Function2[A1, A2, } } +class LoggedFunction3[A1, A2, A3, B](body: (A1, A2, A3) => B) extends Function3[A1, A2, A3, B] { + val calls = mutable.Buffer[(A1, A2, A3)]() + + override def apply(v1: A1, v2: A2, v3: A3): B = { + calls += ((v1, v2, v3)) + body(v1, v2, v3) + } +} + class LoggedFunction5[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) extends Function5[A1, A2, A3, A4, A5, B] { val calls = mutable.Buffer[(A1, A2, A3, A4, A5)]() @@ -49,5 +58,6 @@ class LoggedFunction5[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) ex object LoggedFunction { def apply[A1, A2, B](body: (A1, A2) => B) = new LoggedFunction2(body) + def apply[A1, A2, A3, B](body: (A1, A2, A3) => B) = new 
LoggedFunction3(body) def apply[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) = new LoggedFunction5(body) } diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java index 3591225..70176e7 100644 --- a/tests/src/test/scala/common/WhiskProperties.java +++ b/tests/src/test/scala/common/WhiskProperties.java @@ -238,20 +238,20 @@ public class WhiskProperties { return Integer.parseInt(whiskProperties.getProperty("edge.host.apiport")); } - public static String getLoadbalancerHost() { - return whiskProperties.getProperty("loadbalancer.host"); + public static String getControllerHosts() { + return whiskProperties.getProperty("controller.hosts"); } - public static int getLoadbalancerPort() { - return Integer.parseInt(whiskProperties.getProperty("loadbalancer.host.port")); + public static int getControllerBasePort() { + return Integer.parseInt(whiskProperties.getProperty("controller.host.basePort")); } - public static String getControllerHost() { - return whiskProperties.getProperty("controller.host"); + public static String getBaseControllerHost() { + return getControllerHosts().split(",")[0]; } - public static int getControllerPort() { - return Integer.parseInt(whiskProperties.getProperty("controller.host.port")); + public static String getBaseControllerAddress() { + return getBaseControllerHost() + ":" + getControllerBasePort(); } public static int getMaxActionInvokesPerMinute() { diff --git a/tests/src/test/scala/services/HeadersTests.scala b/tests/src/test/scala/services/HeadersTests.scala index 0f43d10..82a85d6 100644 --- a/tests/src/test/scala/services/HeadersTests.scala +++ b/tests/src/test/scala/services/HeadersTests.scala @@ -75,7 +75,7 @@ class HeadersTests extends FlatSpec val allowOrigin = `Access-Control-Allow-Origin`(AllOrigins) val allowHeaders = `Access-Control-Allow-Headers`("Authorization", "Content-Type") - val url = 
Uri(s"http://${WhiskProperties.getControllerHost}:${WhiskProperties.getControllerPort}") + val url = Uri(s"http://${WhiskProperties.getBaseControllerAddress()}") val pipeline: HttpRequest => Future[HttpResponse] = ( sendReceive ~> unmarshal[HttpResponse]) diff --git a/tests/src/test/scala/services/PingTests.scala b/tests/src/test/scala/services/PingTests.scala index 3883f7f..72d840d 100644 --- a/tests/src/test/scala/services/PingTests.scala +++ b/tests/src/test/scala/services/PingTests.scala @@ -17,19 +17,34 @@ package services -import org.junit.Assert.assertTrue - import java.io.File -import org.junit.Rule -import org.junit.Test -import org.junit.rules.TestRule +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.DurationInt +import scala.util.Try + +import org.junit.Assert.assertTrue +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.junit.JUnitRunner import com.jayway.restassured.RestAssured +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer +import akka.stream.Materializer import common.TestUtils -import common.WhiskProperties import common.TestUtils.RunResult +import common.WhiskProperties +import common.WskActorSystem +import common.WskTestHelpers +import akka.http.scaladsl.model.StatusCodes /** * Basic tests to check that a Whisk installation is healthy in that all @@ -38,14 +53,27 @@ import common.TestUtils.RunResult object PingTests { val bin: File = WhiskProperties.getFileRelativeToWhiskHome("tools/health") - def isAlive(name: String, whiskPropertyFile: String): RunResult = { + def isAliveScript(name: String, whiskPropertyFile: String): RunResult = { TestUtils.runCmd(TestUtils.SUCCESS_EXIT, bin, WhiskProperties.python, "isAlive", "-d", 
whiskPropertyFile, "--wait", "30", name) } + + def ping(host: String, port: Int)(implicit actorSystem: ActorSystem, ec: ExecutionContext, materializer: Materializer) = { + val response = Try { Await.result(Http().singleRequest(HttpRequest(uri = s"http://$host:$port/ping")), 10.seconds) }.toOption + + response.map { res => + (res.status, Await.result(Unmarshal(res).to[String], 10.seconds)) + } + } } -class PingTests { - @Rule - def watcher(): TestRule = TestUtils.makeTestWatcher +@RunWith(classOf[JUnitRunner]) +class PingTests extends FlatSpec + with Matchers + with WskTestHelpers + with ScalaFutures + with WskActorSystem { + + implicit val materializer = ActorMaterializer() /** * Check that the docker REST interface at endpoint is up. envVar is the @@ -61,45 +89,41 @@ class PingTests { assertTrue(response.contains("Containers")) } - /** - * Check that the main docker endpoint is functioning. - */ - @Test - def pingMainDocker(): Unit = { + behavior of "PingTest" + + it should "check that the main docker endpoint is functioning" in { pingDocker("main.docker.endpoint", WhiskProperties.getMainDockerEndpoint) } - /** - * Check the kafka docker endpoint is functioning. 
- */ - @Test - def pingKafkaDocker(): Unit = { + it should "Check the kafka docker endpoint is functioning" in { pingDocker("kafka.docker.endpoint", WhiskProperties.getKafkaDockerEndpoint) } - /** - * Check that the zookeeper endpoint is up and running - */ - @Test - def pingZookeeper(): Unit = { - PingTests.isAlive("zookeeper", WhiskProperties.getFileRelativeToWhiskHome(".").getAbsolutePath) + it should "check that the zookeeper endpoint is up and running" in { + PingTests.isAliveScript("zookeeper", WhiskProperties.getFileRelativeToWhiskHome(".").getAbsolutePath) } - /** - * Check that the invoker endpoints are up and running - */ - @Test - def pingInvoker(): Unit = { - for (i <- 0 until WhiskProperties.numberOfInvokers) { - PingTests.isAlive("invoker" + i, WhiskProperties.getFileRelativeToWhiskHome(".").getAbsolutePath) + it should "Check that the invoker endpoints are up and running" in { + val basePort = WhiskProperties.getProperty("invoker.hosts.baseport").toInt + val invokers = WhiskProperties.getInvokerHosts.zipWithIndex.map { + case (invoker, instance) => + val res = PingTests.ping(invoker, basePort + instance) + + res shouldBe defined + res.get._1 shouldBe StatusCodes.OK + res.get._2 shouldBe "pong" } } - /** - * Check that the controller endpoint is up and running - */ - @Test - def pingController(): Unit = { - PingTests.isAlive("controller", WhiskProperties.getFileRelativeToWhiskHome(".").getAbsolutePath) + it should "check that the controller endpoint is up and running" in { + val basePort = WhiskProperties.getControllerBasePort() + val controllers = WhiskProperties.getControllerHosts().split(",").zipWithIndex.map { + case (controller, instance) => + val res = PingTests.ping(controller, basePort + instance) + + res shouldBe defined + res.get._1 shouldBe StatusCodes.OK + res.get._2 shouldBe "pong" + } } } diff --git a/tests/src/test/scala/whisk/core/cli/test/WskBasicUsageTests.scala b/tests/src/test/scala/whisk/core/cli/test/WskBasicUsageTests.scala 
index ec526b4..a5cf39c 100644 --- a/tests/src/test/scala/whisk/core/cli/test/WskBasicUsageTests.scala +++ b/tests/src/test/scala/whisk/core/cli/test/WskBasicUsageTests.scala @@ -151,7 +151,7 @@ class WskBasicUsageTests val tmpwskprops = File.createTempFile("wskprops", ".tmp") try { val env = Map("WSK_CONFIG_FILE" -> tmpwskprops.getAbsolutePath()) - val apihost = s"http://${WhiskProperties.getControllerHost}:${WhiskProperties.getControllerPort}" + val apihost = s"http://${WhiskProperties.getBaseControllerAddress()}" wsk.cli(Seq("property", "set", "--apihost", apihost), env = env) val rr = wsk.cli(Seq("property", "get", "--apibuild", "-i"), env = env) rr.stdout should not include regex("""whisk API build\s*Unknown""") diff --git a/tests/src/test/scala/whisk/core/container/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/container/test/ContainerPoolTests.scala index 43fbae1..42d2e42 100644 --- a/tests/src/test/scala/whisk/core/container/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/whisk/core/container/test/ContainerPoolTests.scala @@ -41,6 +41,7 @@ import whisk.core.container.ContainerPool import whisk.core.entity.AuthKey import whisk.core.entity.EntityName import whisk.core.entity.EntityPath +import whisk.core.entity.InstanceId import whisk.core.entity.WhiskAction import whisk.core.entity.WhiskAuthStore import whisk.core.entity.WhiskEntityStore @@ -72,7 +73,7 @@ class ContainerPoolTests extends FlatSpec assert(config.isValid) - val pool = new ContainerPool(config, 0, true, true) + val pool = new ContainerPool(config, InstanceId(0), true, true) pool.logDir = "/tmp" val datastore = WhiskEntityStore.datastore(config) diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala index e262525..d386597 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala +++ 
b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala @@ -77,6 +77,7 @@ class ContainerPoolTests extends TestKit(ActorSystem("ContainerPool")) Identity(Subject(), invocationNamespace, AuthKey(), Set()), ActivationId(), invocationNamespace.toPath, + InstanceId(0), None) Run(action, message) } diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index f7a7e12..3f3f24b 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -79,6 +79,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys")) Identity(Subject(), invocationNamespace, AuthKey(), Set()), ActivationId(), invocationNamespace.toPath, + InstanceId(0), None) /* @@ -130,7 +131,7 @@ class ContainerProxyTests extends TestKit(ActorSystem("ContainerProxys")) } /** Creates an inspectable version of the ack method, which records all calls in a buffer */ - def createAcker = LoggedFunction { (_: TransactionId, _: WhiskActivation) => Future.successful(()) } + def createAcker = LoggedFunction { (_: TransactionId, _: WhiskActivation, _: InstanceId) => Future.successful(()) } /** Creates an inspectable factory */ def createFactory(response: Future[Container]) = LoggedFunction { diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala index 276e15f..8d8dc1d 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala @@ -37,8 +37,8 @@ import whisk.common.TransactionCounter import whisk.common.TransactionId import whisk.core.WhiskConfig import whisk.core.connector.ActivationMessage -import whisk.core.controller.WhiskServices import 
whisk.core.controller.RestApiCommons +import whisk.core.controller.WhiskServices import whisk.core.database.DocumentFactory import whisk.core.database.test.DbUtils import whisk.core.entitlement._ @@ -59,6 +59,10 @@ protected trait ControllerTestCommon with HttpService with StreamLogging { + override val instance = InstanceId(0) + override val numberOfInstances = 1 + val activeAckTopicIndex = InstanceId(0) + override val actorRefFactory = null implicit val routeTestTimeout = RouteTestTimeout(90 seconds) diff --git a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala index 7ad2eef..355c2ec 100644 --- a/tests/src/test/scala/whisk/core/database/test/DbUtils.scala +++ b/tests/src/test/scala/whisk/core/database/test/DbUtils.scala @@ -38,15 +38,16 @@ import whisk.core.database.ArtifactStore import whisk.core.database.CouchDbRestClient import whisk.core.database.DocumentFactory import whisk.core.database.NoDocumentException +import whisk.core.entity.AuthKey import whisk.core.entity.DocId import whisk.core.entity.DocInfo import whisk.core.entity.EntityPath import whisk.core.entity.Identity +import whisk.core.entity.InstanceId import whisk.core.entity.WhiskDocument import whisk.core.entity.WhiskEntityQueries import whisk.core.entity.types.AuthStore import whisk.core.entity.types.EntityStore -import whisk.core.entity.AuthKey /** * WARNING: the put/get/del operations in this trait operate directly on the datastore, @@ -56,6 +57,8 @@ import whisk.core.entity.AuthKey */ trait DbUtils extends TransactionCounter { implicit val dbOpTimeout = 15 seconds + override val numberOfInstances = 1 + override val instance = InstanceId(0) val docsToDelete = ListBuffer[(ArtifactStore[_], DocInfo)]() case class RetryOp() extends Throwable diff --git a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala b/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala index f867b75..3084270 100644 --- 
a/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala +++ b/tests/src/test/scala/whisk/core/dispatcher/test/DispatcherTests.scala @@ -65,7 +65,7 @@ class DispatcherTests val content = JsObject("payload" -> JsNumber(count)) val user = WhiskAuth(Subject(), AuthKey()).toIdentity val path = FullyQualifiedEntityName(EntityPath("test"), EntityName(s"count-$count"), Some(SemVer())) - val msg = Message(TransactionId.testing, path, DocRevision.empty, user, ActivationId(), EntityPath(user.subject.asString), Some(content)) + val msg = Message(TransactionId.testing, path, DocRevision.empty, user, ActivationId(), EntityPath(user.subject.asString), InstanceId(0), Some(content)) connector.send(msg) } diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala index aac1c43..d3feabe 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala @@ -61,6 +61,7 @@ import whisk.core.entity.EntityPath import whisk.core.entity.ExecManifest import whisk.core.entity.FullyQualifiedEntityName import whisk.core.entity.Identity +import whisk.core.entity.InstanceId import whisk.core.entity.Secret import whisk.core.entity.Subject import whisk.core.entity.UUID @@ -224,6 +225,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision")) user = Identity(Subject("unhealthyInvokerCheck"), EntityName("unhealthyInvokerCheck"), AuthKey(UUID(), Secret()), Set[Privilege]()), activationId = new ActivationIdGenerator {}.make(), activationNamespace = EntityPath("guest"), + rootControllerIndex = InstanceId(0), content = None) val msg = ActivationRequest(activationMessage, invokerName) @@ -241,7 +243,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision")) // offline -> unhealthy it should "start unhealthy, go 
offline if the state times out and goes unhealthy on a successful ping again" in { val pool = TestProbe() - val invoker = pool.system.actorOf(InvokerActor.props) + val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0))) within(timeout.duration) { pool.send(invoker, SubscribeTransitionCallBack(pool.ref)) @@ -257,7 +259,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision")) // unhealthy -> healthy it should "goto healthy again, if unhealthy and error buffer has enough successful invocations" in { val pool = TestProbe() - val invoker = pool.system.actorOf(InvokerActor.props) + val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0))) within(timeout.duration) { pool.send(invoker, SubscribeTransitionCallBack(pool.ref)) @@ -280,7 +282,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision")) // offline -> unhealthy it should "go offline when unhealthy, if the state times out and go unhealthy on a successful ping again" in { val pool = TestProbe() - val invoker = pool.system.actorOf(InvokerActor.props) + val invoker = pool.system.actorOf(InvokerActor.props(InstanceId(0))) within(timeout.duration) { pool.send(invoker, SubscribeTransitionCallBack(pool.ref)) @@ -295,7 +297,7 @@ class InvokerSupervisionTests extends TestKit(ActorSystem("InvokerSupervision")) } it should "start timer to send testactions when unhealthy" in { - val invoker = TestFSMRef(new InvokerActor) + val invoker = TestFSMRef(new InvokerActor(InstanceId(0))) invoker.stateName shouldBe UnHealthy invoker.isTimerActive(InvokerActor.timerName) shouldBe true diff --git a/tools/admin/wskadmin b/tools/admin/wskadmin index 659e81d..3e4b342 100755 --- a/tools/admin/wskadmin +++ b/tools/admin/wskadmin @@ -121,7 +121,7 @@ def parseArgs(): subparser = propmenu.add_subparsers(title='available commands', dest='subcmd') subcmd = subparser.add_parser('get', help='get logs') - subcmd.add_argument('components', help='components, one or more of 
[controller, invokerN] where N is invoker index', nargs='*', default=['controller', 'invoker0']) + subcmd.add_argument('components', help='components, one or more of [controllerN, invokerN] where N is the instance', nargs='*', default=['controller0', 'invoker0']) subcmd.add_argument('-t', '--tid', help='retrieve logs for the transaction id') subcmd.add_argument('-g', '--grep', help='retrieve logs that match grep expression') diff --git a/tools/build/checkLogs.py b/tools/build/checkLogs.py index b7f9e5d..cf315b7 100755 --- a/tools/build/checkLogs.py +++ b/tools/build/checkLogs.py @@ -91,7 +91,7 @@ if __name__ == "__main__": ("db-rules.log", [ partial(database_has_at_most_x_entries, 0) ]), ("db-triggers.log", [ partial(database_has_at_most_x_entries, 0) ]), # Assert that stdout of the container is correctly piped and empty - ("controller.log", [ partial(file_has_at_most_x_bytes, 0) ]), + ("controller0.log", [ partial(file_has_at_most_x_bytes, 0) ]), ("invoker0.log", [ partial(file_has_at_most_x_bytes, 0) ]) ] -- To stop receiving notification emails like this one, please contact commits@openwhisk.apache.org.