This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new c184cc6 Put active-ack consumers in their own consumer-groups. (#3337) c184cc6 is described below commit c184cc6fdabbe857bfc1bd33aa7acda07179d1e5 Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Mon Feb 26 17:26:12 2018 +0100 Put active-ack consumers in their own consumer-groups. (#3337) Just like with the invoker consumers, it doesn't make sense to have those in one group as crash of one will cause a rebalancing pause for the other. --- .../scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala | 8 +++----- .../whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala | 7 ++----- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala index dfa57bb..de2e56e 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala @@ -205,14 +205,12 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId) * Subscribes to active acks (completion messages from the invokers), and * registers a handler for received active acks from invokers. */ + val activeAckTopic = s"completed${controllerInstance.toInt}" val maxActiveAcksPerPoll = 128 val activeAckPollDuration = 1.second private val activeAckConsumer = - messagingProvider.getConsumer( - config, - "completions", - s"completed${controllerInstance.toInt}", - maxPeek = maxActiveAcksPerPoll) + messagingProvider.getConsumer(config, activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll) + val activationFeed = actorSystem.actorOf(Props { new MessageFeed( "activeack", diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index 607670d..9b7aaec 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -205,14 +205,11 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins * Subscribes to active acks (completion messages from the invokers), and * registers a handler for received active acks from invokers. */ + private val activeAckTopic = s"completed${controllerInstance.toInt}" private val maxActiveAcksPerPoll = 128 private val activeAckPollDuration = 1.second private val activeAckConsumer = - messagingProvider.getConsumer( - config, - "completions", - s"completed${controllerInstance.toInt}", - maxPeek = maxActiveAcksPerPoll) + messagingProvider.getConsumer(config, activeAckTopic, activeAckTopic, maxPeek = maxActiveAcksPerPoll) private val activationFeed = actorSystem.actorOf(Props { new MessageFeed( -- To stop receiving notification emails like this one, please contact cbic...@apache.org.