This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 78eb1f6 Add cache invalidation between controllers (#2624) 78eb1f6 is described below commit 78eb1f6f1fb5aae2dd12d1371391695fd92fe3af Author: Christian Bickel <git...@cbickel.de> AuthorDate: Thu Aug 31 15:31:15 2017 +0200 Add cache invalidation between controllers (#2624) CUD operations will generate cache invalidation traffic on dedicated message bus topic. These are picked up by other controllers which in turn invalidate their caches. --- ansible/roles/kafka/tasks/deploy.yml | 5 +- .../whisk/core/database/DocumentFactory.scala | 16 +- .../MultipleReadersSingleWriterCache.scala | 48 +++-- .../core/database/RemoteCacheInvalidation.scala | 88 ++++++++++ .../main/scala/whisk/core/entity/CacheKey.scala | 55 ++++++ .../main/scala/whisk/core/entity/Identity.scala | 6 +- .../main/scala/whisk/core/entity/WhiskAction.scala | 4 +- .../scala/whisk/core/entity/WhiskActivation.scala | 1 - .../scala/whisk/core/entity/WhiskPackage.scala | 1 - .../main/scala/whisk/core/entity/WhiskRule.scala | 1 - .../scala/whisk/core/entity/WhiskTrigger.scala | 1 - .../main/scala/whisk/core/controller/Actions.scala | 4 + .../scala/whisk/core/controller/ApiUtils.scala | 7 +- .../scala/whisk/core/controller/Controller.scala | 7 + .../scala/whisk/core/controller/Packages.scala | 4 + .../scala/whisk/core/controller/RestAPIs.scala | 39 +++-- .../main/scala/whisk/core/controller/Rules.scala | 4 + .../scala/whisk/core/controller/Triggers.scala | 6 +- .../core/controller/actions/SequenceActions.scala | 2 +- .../core/loadBalancer/LoadBalancerService.scala | 20 ++- .../scala/whisk/core/invoker/InvokerReactive.scala | 2 +- .../src/test/scala/ha/CacheInvalidationTests.scala | 179 +++++++++++++++++++ .../core/controller/test/ActionsApiTests.scala | 6 +- .../core/controller/test/AuthenticateTests.scala | 4 +- .../controller/test/ControllerTestCommon.scala | 8 +- .../MultipleReadersSingleWriterCacheTests.scala | 195 +++------------------ .../whisk/core/entity/test/DatastoreTests.scala | 3 + .../whisk/core/entity/test/MigrationEntities.scala | 2 - 28 files changed, 477 insertions(+), 241 deletions(-) diff --git a/ansible/roles/kafka/tasks/deploy.yml b/ansible/roles/kafka/tasks/deploy.yml index fb0e2b7..615c496 100644 --- a/ansible/roles/kafka/tasks/deploy.yml +++ b/ansible/roles/kafka/tasks/deploy.yml @@ -51,10 +51,13 @@ delay: 5 - name: create the health topic - shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic health --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.health.retentionBytes }} --config retention.ms={{ kafka.topics.health.retentionMS }} --config segment.bytes={{ kafka.topics.health.segmentBytes }}'" + shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic {{ item }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.health.retentionBytes }} --config retention.ms={{ kafka.topics.health.retentionMS }} --config segment.bytes={{ kafka.topics.health.segmentBytes }}'" register: command_result failed_when: "not ('Created topic' in command_result.stdout or 'already exists' in command_result.stdout)" changed_when: "'Created topic' in command_result.stdout" + with_items: + - health + - cacheInvalidation - name: create the active-ack topics shell: "docker exec kafka bash -c 'unset JMX_PORT; kafka-topics.sh --create --topic completed{{ item.0 }} --replication-factor 1 --partitions 1 --zookeeper {{ ansible_host }}:{{ zookeeper.port }} --config retention.bytes={{ kafka.topics.completed.retentionBytes }} --config retention.ms={{ kafka.topics.completed.retentionMS }} --config segment.bytes={{ kafka.topics.completed.segmentBytes }}'" diff --git a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala index fd3f41b..451bae9 100644 --- a/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala +++ b/common/scala/src/main/scala/whisk/core/database/DocumentFactory.scala @@ -30,6 +30,7 @@ import akka.stream.IOResult import akka.stream.scaladsl.StreamConverters import spray.json.JsObject import whisk.common.TransactionId +import whisk.core.entity.CacheKey import whisk.core.entity.DocId import whisk.core.entity.DocInfo import whisk.core.entity.DocRevision @@ -129,10 +130,11 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] { * @param db the datastore client to fetch entity from * @param doc the entity to store * @param transid the transaction id for logging + * @param notifier an optional callback when cache changes * @return Future[DocInfo] with completion to DocInfo containing the save document id and revision */ def put[Wsuper >: W](db: ArtifactStore[Wsuper], doc: W)( - implicit transid: TransactionId): Future[DocInfo] = { + implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = { Try { require(db != null, "db undefined") require(doc != null, "doc undefined") @@ -140,7 +142,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] { implicit val logger = db.logging implicit val ec = db.executionContext - val key = cacheKeyForUpdate(doc) + val key = CacheKey(doc) cacheUpdate(doc, key, db.put(doc) map { docinfo => doc match { @@ -157,7 +159,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] { } def attach[Wsuper >: W](db: ArtifactStore[Wsuper], doc: DocInfo, attachmentName: String, contentType: ContentType, bytes: InputStream)( - implicit transid: TransactionId): Future[DocInfo] = { + implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = { Try { require(db != null, "db undefined") @@ -166,7 +168,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] { implicit val logger = db.logging implicit val ec = db.executionContext - val key = doc.id.asDocInfo + val key = CacheKey(doc.id.asDocInfo) // invalidate the key because attachments update the revision; // do not cache the new attachment (controller does not need it) cacheInvalidate(key, { @@ -180,7 +182,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] { } def del[Wsuper >: W](db: ArtifactStore[Wsuper], doc: DocInfo)( - implicit transid: TransactionId): Future[Boolean] = { + implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[Boolean] = { Try { require(db != null, "db undefined") require(doc != null, "doc undefined") @@ -188,7 +190,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] { implicit val logger = db.logging implicit val ec = db.executionContext - val key = doc.id.asDocInfo + val key = CacheKey(doc.id.asDocInfo) cacheInvalidate(key, db.del(doc)) } match { case Success(f) => f @@ -221,7 +223,7 @@ trait DocumentFactory[W] extends MultipleReadersSingleWriterCache[W, DocInfo] { implicit val logger = db.logging implicit val ec = db.executionContext val key = doc.asDocInfo(rev) - _ => cacheLookup(key, db.get[W](key), fromCache) + _ => cacheLookup(CacheKey(key), db.get[W](key), fromCache) } match { case Success(f) => f case Failure(t) => Future.failed(t) diff --git a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala index d824948..f767c60 100644 --- a/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala +++ b/common/scala/src/main/scala/whisk/core/database/MultipleReadersSingleWriterCache.scala @@ -22,22 +22,24 @@ package whisk.core.database -import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.ConcurrentMap import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise +import scala.language.implicitConversions import scala.util.Failure import scala.util.Success -import scala.language.implicitConversions +import scala.util.control.NonFatal + +import com.github.benmanes.caffeine.cache.Caffeine import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId -import com.github.benmanes.caffeine.cache.Caffeine -import scala.util.control.NonFatal +import whisk.core.entity.CacheKey /** * A cache that allows multiple readers, but only a single writer, at @@ -89,6 +91,8 @@ private object MultipleReadersSingleWriterCache { case class StaleRead(actualState: State) extends Exception(s"Attempted read of invalid entry due to $actualState.") } +trait CacheChangeNotification extends (CacheKey => Future[Unit]) + trait MultipleReadersSingleWriterCache[W, Winfo] { import MultipleReadersSingleWriterCache._ import MultipleReadersSingleWriterCache.State._ @@ -96,9 +100,6 @@ trait MultipleReadersSingleWriterCache[W, Winfo] { /** Subclasses: Toggle this to enable/disable caching for your entity type. */ protected val cacheEnabled = true - /** Subclasses: tell me what key to use for updates. */ - protected def cacheKeyForUpdate(w: W): Any - private object Entry { def apply(transid: TransactionId, state: State, value: Option[Future[W]]): Entry = { new Entry(transid, new AtomicReference(state), value) @@ -155,11 +156,13 @@ trait MultipleReadersSingleWriterCache[W, Winfo] { * This method posts a delete to the backing store, and either directly invalidates the cache entry * or informs any outstanding transaction that it must invalidate the cache on completion. */ - protected def cacheInvalidate[R](key: Any, invalidator: => Future[R])( - implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[R] = { + protected def cacheInvalidate[R](key: CacheKey, invalidator: => Future[R])( + implicit ec: ExecutionContext, transid: TransactionId, logger: Logging, notifier: Option[CacheChangeNotification]): Future[R] = { if (cacheEnabled) { logger.info(this, s"invalidating $key on delete") + notifier.foreach(_(key)) + // try inserting our desired entry... val desiredEntry = Entry(transid, InvalidateInProgress, None) cache(key)(desiredEntry) flatMap { actualEntry => @@ -210,7 +213,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] { /** * This method may initiate a read from the backing store, and potentially stores the result in the cache. */ - protected def cacheLookup[Wsuper >: W](key: Any, generator: => Future[W], fromCache: Boolean = cacheEnabled)( + protected def cacheLookup[Wsuper >: W](key: CacheKey, generator: => Future[W], fromCache: Boolean = cacheEnabled)( implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[W] = { if (fromCache) { val promise = Promise[W] // this promise completes with the generator value @@ -253,9 +256,12 @@ trait MultipleReadersSingleWriterCache[W, Winfo] { /** * This method posts an update to the backing store, and potentially stores the result in the cache. */ - protected def cacheUpdate(doc: W, key: Any, generator: => Future[Winfo])( - implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[Winfo] = { + protected def cacheUpdate(doc: W, key: CacheKey, generator: => Future[Winfo])( + implicit ec: ExecutionContext, transid: TransactionId, logger: Logging, notifier: Option[CacheChangeNotification]): Future[Winfo] = { if (cacheEnabled) { + + notifier.foreach(_(key)) + // try inserting our desired entry... val desiredEntry = Entry(transid, WriteInProgress, Some(Future.successful(doc))) cache(key)(desiredEntry) flatMap { actualEntry => @@ -288,10 +294,16 @@ trait MultipleReadersSingleWriterCache[W, Winfo] { def cacheSize: Int = cache.size /** + * This method removes an entry from the cache immediately. You can use this method + * if you do not need to perform any updates on the backing store but only to the cache. + */ + protected[database] def removeId(key: CacheKey)(implicit ec: ExecutionContext): Unit = cache.remove(key) + + /** * Log a cache hit * */ - private def makeNoteOfCacheHit(key: Any)(implicit transid: TransactionId, logger: Logging) = { + private def makeNoteOfCacheHit(key: CacheKey)(implicit transid: TransactionId, logger: Logging) = { transid.mark(this, LoggingMarkers.DATABASE_CACHE_HIT, s"[GET] serving from cache: $key")(logger) } @@ -299,7 +311,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] { * Log a cache miss * */ - private def makeNoteOfCacheMiss(key: Any)(implicit transid: TransactionId, logger: Logging) = { + private def makeNoteOfCacheMiss(key: CacheKey)(implicit transid: TransactionId, logger: Logging) = { transid.mark(this, LoggingMarkers.DATABASE_CACHE_MISS, s"[GET] serving from datastore: $key")(logger) } @@ -308,7 +320,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] { * 1. either cache the result if there is no intervening delete or update, or * 2. invalidate the cache because there was an intervening delete or update. */ - private def listenForReadDone(key: Any, entry: Entry, generator: => Future[W], promise: Promise[W])( + private def listenForReadDone(key: CacheKey, entry: Entry, generator: => Future[W], promise: Promise[W])( implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Unit = { generator onComplete { @@ -362,7 +374,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] { * 1. either cache the result if there is no intervening delete or update, or * 2. invalidate the cache cache because there was an intervening delete or update */ - private def listenForWriteDone(key: Any, entry: Entry, generator: => Future[Winfo])( + private def listenForWriteDone(key: CacheKey, entry: Entry, generator: => Future[Winfo])( implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[Winfo] = { generator andThen { @@ -398,7 +410,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] { } /** Immediately invalidates the given entry. */ - private def invalidateEntry(key: Any, entry: Entry)( + private def invalidateEntry(key: CacheKey, entry: Entry)( implicit transid: TransactionId, logger: Logging): Unit = { logger.info(this, s"invalidating $key") entry.invalidate() @@ -406,7 +418,7 @@ trait MultipleReadersSingleWriterCache[W, Winfo] { } /** Invalidates the given entry after a given invalidator completes. */ - private def invalidateEntryAfter[R](invalidator: => Future[R], key: Any, entry: Entry)( + private def invalidateEntryAfter[R](invalidator: => Future[R], key: CacheKey, entry: Entry)( implicit ec: ExecutionContext, transid: TransactionId, logger: Logging): Future[R] = { entry.grabInvalidationLock() diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala new file mode 100644 index 0000000..902bf7d --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.database + +import java.nio.charset.StandardCharsets + +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +import akka.actor.ActorSystem +import akka.actor.Props +import spray.json._ +import whisk.common.Logging +import whisk.core.WhiskConfig +import whisk.core.connector.Message +import whisk.core.connector.MessageFeed +import whisk.core.connector.MessagingProvider +import whisk.core.entity.CacheKey +import whisk.core.entity.InstanceId +import whisk.core.entity.WhiskAction +import whisk.core.entity.WhiskPackage +import whisk.core.entity.WhiskRule +import whisk.core.entity.WhiskTrigger +import whisk.spi.SpiLoader + +case class CacheInvalidationMessage(key: CacheKey, instanceId: String) extends Message { + override def serialize = CacheInvalidationMessage.serdes.write(this).compactPrint +} + +object CacheInvalidationMessage extends DefaultJsonProtocol { + def parse(msg: String) = Try(serdes.read(msg.parseJson)) + implicit val serdes = jsonFormat(CacheInvalidationMessage.apply _, "key", "instanceId") +} + +class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: InstanceId)(implicit logging: Logging, as: ActorSystem) { + + implicit private val ec = as.dispatcher + + private val topic = "cacheInvalidation" + private val instanceId = s"$component${instance.toInt}" + + private val msgProvider = SpiLoader.get[MessagingProvider]() + private val cacheInvalidationConsumer = msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128) + private val cacheInvalidationProducer = msgProvider.getProducer(config, ec) + + def notifyOtherInstancesAboutInvalidation(key: CacheKey): Future[Unit] = { + cacheInvalidationProducer.send(topic, CacheInvalidationMessage(key, instanceId)).map(_ => Unit) + } + + private val invalidationFeed = as.actorOf(Props { + new MessageFeed("cacheInvalidation", logging, cacheInvalidationConsumer, cacheInvalidationConsumer.maxPeek, 1.second, removeFromLocalCache) + }) + + private def removeFromLocalCache(bytes: Array[Byte]): Future[Unit] = Future { + val raw = new String(bytes, StandardCharsets.UTF_8) + + CacheInvalidationMessage.parse(raw) match { + case Success(msg: CacheInvalidationMessage) => { + if (msg.instanceId != instanceId) { + WhiskAction.removeId(msg.key) + WhiskPackage.removeId(msg.key) + WhiskRule.removeId(msg.key) + WhiskTrigger.removeId(msg.key) + } + } + case Failure(t) => logging.error(this, s"failed processing message: $raw with $t") + } + invalidationFeed ! MessageFeed.Processed + } +} diff --git a/common/scala/src/main/scala/whisk/core/entity/CacheKey.scala b/common/scala/src/main/scala/whisk/core/entity/CacheKey.scala new file mode 100644 index 0000000..de0f6fe --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/entity/CacheKey.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.entity + +import spray.json.DefaultJsonProtocol + +class UnsupportedCacheKeyTypeException(msg: String) extends Exception(msg) + +/** + * A key that is used to store an entity on the cache. + * + * @param mainId The main part for the key to be used. For example this is the id of a document. + * @param secondaryId A second part of the key. For example the revision of an entity. This part + * of the key will not be written to the logs. + */ +case class CacheKey(mainId: String, secondaryId: Option[String]) { + override def toString() = { + s"CacheKey($mainId)" + } +} + +object CacheKey extends DefaultJsonProtocol { + implicit val serdes = jsonFormat2(CacheKey.apply) + + def apply(key: Any): CacheKey = { + key match { + case e: EntityName => CacheKey(e.asString, None) + case a: AuthKey => CacheKey(a.uuid.asString, Some(a.key.asString)) + case d: DocInfo => { + val revision = if (d.rev.empty) None else Some(d.rev.asString) + CacheKey(d.id.asString, revision) + } + case w: WhiskEntity => CacheKey(w.docid.asDocInfo) + case s: String => CacheKey(s, None) + case others => { + throw new UnsupportedCacheKeyTypeException(s"Unable to apply the entity ${others.getClass} on CacheKey.") + } + } + } +} diff --git a/common/scala/src/main/scala/whisk/core/entity/Identity.scala b/common/scala/src/main/scala/whisk/core/entity/Identity.scala index d272f7a..32093c8 100644 --- a/common/scala/src/main/scala/whisk/core/entity/Identity.scala +++ b/common/scala/src/main/scala/whisk/core/entity/Identity.scala @@ -45,7 +45,6 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with private val viewName = "subjects/identities" override val cacheEnabled = true - override def cacheKeyForUpdate(i: Identity) = i.authkey implicit val serdes = jsonFormat5(Identity.apply) /** @@ -58,8 +57,9 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with implicit val logger: Logging = datastore.logging implicit val ec = datastore.executionContext val ns = namespace.asString + val key = CacheKey(namespace) - cacheLookup(ns, { + cacheLookup(key, { list(datastore, List(ns), limit = 1) map { list => list.length match { case 1 => @@ -80,7 +80,7 @@ object Identity extends MultipleReadersSingleWriterCache[Identity, DocInfo] with implicit val logger: Logging = datastore.logging implicit val ec = datastore.executionContext - cacheLookup(authkey, { + cacheLookup(CacheKey(authkey), { list(datastore, List(authkey.uuid.asString, authkey.key.asString)) map { list => list.length match { case 1 => diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala index c4e90f4..f788ac4 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskAction.scala @@ -30,6 +30,7 @@ import spray.json.DefaultJsonProtocol._ import whisk.common.TransactionId import whisk.core.database.ArtifactStore import whisk.core.database.DocumentFactory +import whisk.core.database.CacheChangeNotification import whisk.core.entity.Attachments._ import whisk.core.entity.types.EntityStore @@ -215,11 +216,10 @@ object WhiskAction override implicit val serdes = jsonFormat(WhiskAction.apply, "namespace", "name", "exec", "parameters", "limits", "version", "publish", "annotations") override val cacheEnabled = true - override def cacheKeyForUpdate(w: WhiskAction) = w.docid.asDocInfo // overriden to store attached code override def put[A >: WhiskAction](db: ArtifactStore[A], doc: WhiskAction)( - implicit transid: TransactionId): Future[DocInfo] = { + implicit transid: TransactionId, notifier: Option[CacheChangeNotification]): Future[DocInfo] = { Try { require(db != null, "db undefined") diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala index b4f2f89..0fe0dcd 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskActivation.scala @@ -117,5 +117,4 @@ object WhiskActivation // Caching activations doesn't make much sense in the common case as usually, // an activation is only asked for once. override val cacheEnabled = false - override def cacheKeyForUpdate(w: WhiskActivation) = w.docid.asDocInfo } diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala index e29f03f..ac73147 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskPackage.scala @@ -198,7 +198,6 @@ object WhiskPackage } override val cacheEnabled = true - override def cacheKeyForUpdate(w: WhiskPackage) = w.docid.asDocInfo } /** diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala index c91cc9c..749b6f5 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskRule.scala @@ -233,7 +233,6 @@ object WhiskRule } override val cacheEnabled = false - override def cacheKeyForUpdate(w: WhiskRule) = w.docid.asDocInfo } object WhiskRuleResponse extends DefaultJsonProtocol { diff --git a/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala b/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala index 128a7e5..d2271c4 100644 --- a/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala +++ b/common/scala/src/main/scala/whisk/core/entity/WhiskTrigger.scala @@ -119,7 +119,6 @@ object WhiskTrigger override implicit val serdes = jsonFormat8(WhiskTrigger.apply) override val cacheEnabled = true - override def cacheKeyForUpdate(w: WhiskTrigger) = w.docid.asDocInfo } object WhiskTriggerPut extends DefaultJsonProtocol { diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala index ba74de8..d4a59cc 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala @@ -39,6 +39,7 @@ import spray.json.DefaultJsonProtocol._ import whisk.common.TransactionId import whisk.core.WhiskConfig import whisk.core.controller.actions.PostActionActivation +import whisk.core.database.CacheChangeNotification import whisk.core.database.NoDocumentException import whisk.core.entitlement._ import whisk.core.entity._ @@ -84,6 +85,9 @@ trait WhiskActionsApi /** Database service to CRUD actions. */ protected val entityStore: EntityStore + /** Notification service for cache invalidation. */ + protected implicit val cacheChangeNotification: Some[CacheChangeNotification] + /** Database service to get activations. */ protected val activationStore: ActivationStore diff --git a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala index a52bd24..e6f3c51 100644 --- a/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala +++ b/core/controller/src/main/scala/whisk/core/controller/ApiUtils.scala @@ -22,22 +22,20 @@ import scala.concurrent.Future import scala.util.Failure import scala.util.Success +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.model.StatusCode import akka.http.scaladsl.model.StatusCodes.Conflict import akka.http.scaladsl.model.StatusCodes.InternalServerError import akka.http.scaladsl.model.StatusCodes.NotFound import akka.http.scaladsl.model.StatusCodes.OK -import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.server.Directives import akka.http.scaladsl.server.RequestContext import akka.http.scaladsl.server.RouteResult - import spray.json.DefaultJsonProtocol._ import spray.json.JsBoolean import spray.json.JsObject import spray.json.JsValue import spray.json.RootJsonFormat - import whisk.common.Logging import whisk.common.TransactionId import whisk.core.controller.PostProcess.PostProcessEntity @@ -46,6 +44,7 @@ import whisk.core.database.ArtifactStoreException import whisk.core.database.DocumentConflictException import whisk.core.database.DocumentFactory import whisk.core.database.DocumentTypeMismatchException +import whisk.core.database.CacheChangeNotification import whisk.core.database.NoDocumentException import whisk.core.entity.DocId import whisk.core.entity.WhiskDocument @@ -276,6 +275,7 @@ trait WriteOps extends Directives { postProcess: Option[PostProcessEntity[A]] = None)( implicit transid: TransactionId, format: RootJsonFormat[A], + notifier: Option[CacheChangeNotification], ma: Manifest[A]) = { // marker to return an existing doc with status OK rather than conflict if overwrite is false case class IdentityPut(self: A) extends Throwable @@ -347,6 +347,7 @@ trait WriteOps extends Directives { postProcess: Option[PostProcessEntity[A]] = None)( implicit transid: TransactionId, format: RootJsonFormat[A], + notifier: Option[CacheChangeNotification], ma: Manifest[A]) = { onComplete(factory.get(datastore, docid) flatMap { entity => diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala index 90d4023..4037e0d 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala @@ -29,11 +29,14 @@ import akka.http.scaladsl.server.Route import akka.stream.ActorMaterializer import spray.json._ import spray.json.DefaultJsonProtocol._ + import whisk.common.AkkaLogging import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId import whisk.core.WhiskConfig +import whisk.core.database.RemoteCacheInvalidation +import whisk.core.database.CacheChangeNotification import whisk.core.entitlement._ import whisk.core.entity._ import whisk.core.entity.ActivationId.ActivationIdGenerator @@ -96,6 +99,10 @@ class Controller( private implicit val authStore = WhiskAuthStore.datastore(whiskConfig) private implicit val entityStore = WhiskEntityStore.datastore(whiskConfig) private implicit val activationStore = WhiskActivationStore.datastore(whiskConfig) + private implicit val cacheChangeNotification = Some(new CacheChangeNotification { + val remoteCacheInvalidaton = new RemoteCacheInvalidation(whiskConfig, "controller", instance) + override def apply(k: CacheKey) = remoteCacheInvalidaton.notifyOtherInstancesAboutInvalidation(k) + }) // initialize backend services private implicit val loadBalancer = new LoadBalancerService(whiskConfig, instance, entityStore) diff --git a/core/controller/src/main/scala/whisk/core/controller/Packages.scala b/core/controller/src/main/scala/whisk/core/controller/Packages.scala index 925d5c1..cca510e 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Packages.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Packages.scala @@ -31,6 +31,7 @@ import spray.json._ import whisk.common.TransactionId import whisk.core.database.DocumentTypeMismatchException +import whisk.core.database.CacheChangeNotification import whisk.core.database.NoDocumentException import whisk.core.entitlement._ import whisk.core.entity._ @@ -46,6 +47,9 @@ trait WhiskPackagesApi extends WhiskCollectionAPI with ReferencedEntities { /** Database service to CRUD packages. */ protected val entityStore: EntityStore + /** Notification service for cache invalidation. */ + protected implicit val cacheChangeNotification: Some[CacheChangeNotification] + /** Route directives for API. The methods that are supported on packages. */ protected override lazy val entityOps = put | get | delete diff --git a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala index 0e374f6..f75750e 100644 --- a/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala +++ b/core/controller/src/main/scala/whisk/core/controller/RestAPIs.scala @@ -17,31 +17,31 @@ package whisk.core.controller +import scala.concurrent.ExecutionContext + import akka.actor.ActorSystem -import akka.stream.ActorMaterializer import akka.http.scaladsl.model.StatusCodes._ import akka.http.scaladsl.model.Uri +import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.server.Directives import akka.http.scaladsl.server.Route import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.stream.ActorMaterializer import spray.json._ import spray.json.DefaultJsonProtocol._ - -import scala.concurrent.ExecutionContext - +import whisk.common.Logging import whisk.common.TransactionId +import whisk.core.database.CacheChangeNotification import whisk.core.WhiskConfig import whisk.core.WhiskConfig.whiskVersionBuildno import whisk.core.WhiskConfig.whiskVersionDate -import whisk.core.entity.WhiskAuthStore -import whisk.common.Logging -import whisk.common.TransactionId -import whisk.core.entity._ -import whisk.core.entity.types._ import whisk.core.entitlement._ +import whisk.core.entity._ import whisk.core.entity.ActivationId.ActivationIdGenerator +import whisk.core.entity.WhiskAuthStore +import whisk.core.entity.types._ import whisk.core.loadBalancer.LoadBalancerService /** @@ -89,10 +89,10 @@ protected[controller] object RestApiCommons { Authenticate.requiredProperties ++ Collection.requiredProperties + import akka.http.scaladsl.model.HttpCharsets + import akka.http.scaladsl.model.MediaTypes.`application/json` import akka.http.scaladsl.unmarshalling.FromEntityUnmarshaller import akka.http.scaladsl.unmarshalling.Unmarshaller - import akka.http.scaladsl.model.MediaTypes.`application/json` - import akka.http.scaladsl.model.HttpCharsets /** * Extract an empty entity into a JSON object. This is useful for the @@ -140,6 +140,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( implicit val entitlementProvider: EntitlementProvider, implicit val activationIdFactory: ActivationIdGenerator, implicit val loadBalancer: LoadBalancerService, + implicit val cacheChangeNotification: Some[CacheChangeNotification], implicit val activationStore: ActivationStore, implicit val whiskConfig: WhiskConfig) extends SwaggerDocs(Uri.Path(apiPath) / apiVersion, "apiv1swagger.json") @@ -208,12 +209,12 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( private val web = new WebActionsApi(Seq("web"), this.WebApiDirectives) class NamespacesApi( - val apiPath: String, - val apiVersion: String)( - implicit override val entityStore: EntityStore, - override val entitlementProvider: EntitlementProvider, - override val executionContext: ExecutionContext, - override val logging: Logging) + val apiPath: String, + val apiVersion: String)( + implicit override val entityStore: EntityStore, + override val entitlementProvider: EntitlementProvider, + override val executionContext: ExecutionContext, + override val logging: Logging) extends WhiskNamespacesApi class ActionsApi( @@ -226,6 +227,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val entitlementProvider: EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, override val loadBalancer: LoadBalancerService, + override val cacheChangeNotification: Some[CacheChangeNotification], override val executionContext: ExecutionContext, override val logging: Logging, override val whiskConfig: WhiskConfig) @@ -250,6 +252,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val entitlementProvider: EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, override val loadBalancer: LoadBalancerService, + override val cacheChangeNotification: Some[CacheChangeNotification], override val executionContext: ExecutionContext, override val logging: Logging, override val whiskConfig: WhiskConfig) @@ -263,6 +266,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val entitlementProvider: EntitlementProvider, override val activationIdFactory: ActivationIdGenerator, override val loadBalancer: LoadBalancerService, + override val cacheChangeNotification: Some[CacheChangeNotification], override val executionContext: ExecutionContext, override val logging: Logging, override val whiskConfig: WhiskConfig) @@ -277,6 +281,7 @@ class RestAPIVersion(config: WhiskConfig, apiPath: String, apiVersion: String)( override val activationStore: ActivationStore, override val activationIdFactory: ActivationIdGenerator, override val loadBalancer: LoadBalancerService, + override val cacheChangeNotification: Some[CacheChangeNotification], override val executionContext: ExecutionContext, override val logging: Logging, override val whiskConfig: WhiskConfig, diff --git a/core/controller/src/main/scala/whisk/core/controller/Rules.scala b/core/controller/src/main/scala/whisk/core/controller/Rules.scala index e65c845..6f657b2 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Rules.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Rules.scala @@ -30,6 +30,7 @@ import spray.json.DeserializationException import whisk.common.TransactionId import whisk.core.database.DocumentConflictException +import whisk.core.database.CacheChangeNotification import whisk.core.database.NoDocumentException import whisk.core.entity._ import whisk.core.entity.types.EntityStore @@ -54,6 +55,9 @@ trait WhiskRulesApi extends WhiskCollectionAPI with ReferencedEntities { /** JSON response formatter. */ import RestApiCommons.jsonDefaultResponsePrinter + /** Notification service for cache invalidation. */ + protected implicit val cacheChangeNotification: Some[CacheChangeNotification] + /** Path to Rules REST API. */ protected val rulesPath = "rules" diff --git a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala index d77d981..c9d1444 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Triggers.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Triggers.scala @@ -45,6 +45,7 @@ import spray.json._ import spray.json.DefaultJsonProtocol.RootJsObjectFormat import whisk.common.TransactionId +import whisk.core.database.CacheChangeNotification import whisk.core.entitlement.Collection import whisk.core.entity.ActivationResponse import whisk.core.entity.EntityPath @@ -57,9 +58,9 @@ import whisk.core.entity.WhiskTrigger import whisk.core.entity.WhiskTriggerPut import whisk.core.entity.types.ActivationStore import whisk.core.entity.types.EntityStore -import whisk.http.ErrorResponse.terminate import whisk.core.entity.Identity import whisk.core.entity.FullyQualifiedEntityName +import whisk.http.ErrorResponse.terminate /** A trait implementing the triggers API. */ trait WhiskTriggersApi extends WhiskCollectionAPI { @@ -73,6 +74,9 @@ trait WhiskTriggersApi extends WhiskCollectionAPI { /** Database service to CRUD triggers. */ protected val entityStore: EntityStore + /** Notification service for cache invalidation. */ + protected implicit val cacheChangeNotification: Some[CacheChangeNotification] + /** Database service to get activations. */ protected val activationStore: ActivationStore diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala index 55bfac4..e60e1dd 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala @@ -160,7 +160,7 @@ protected[actions] trait SequenceActions { */ private def storeSequenceActivation(activation: WhiskActivation)(implicit transid: TransactionId): Unit = { logging.info(this, s"recording activation '${activation.activationId}'") - WhiskActivation.put(activationStore, activation) onComplete { + WhiskActivation.put(activationStore, activation)(transid, notifier = None) onComplete { case Success(id) => logging.info(this, s"recorded activation") case Failure(t) => logging.error(this, s"failed to record activation ${activation.activationId} with error ${t.getLocalizedMessage}") } diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala index b4c79ee..07ca455 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancerService.scala @@ -18,6 +18,8 @@ package whisk.core.loadBalancer import java.nio.charset.StandardCharsets + +import scala.annotation.tailrec import scala.concurrent.Await import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -25,31 +27,33 @@ import scala.concurrent.Promise import scala.concurrent.duration.DurationInt import scala.util.Failure import scala.util.Success + import org.apache.kafka.clients.producer.RecordMetadata + import akka.actor.ActorRefFactory import akka.actor.ActorSystem import akka.actor.Props -import akka.pattern.ask import akka.util.Timeout +import akka.pattern.ask + import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId import whisk.core.WhiskConfig import whisk.core.WhiskConfig._ -import whisk.core.connector.MessagingProvider import whisk.core.connector.{ ActivationMessage, CompletionMessage } import whisk.core.connector.MessageFeed import whisk.core.connector.MessageProducer +import whisk.core.connector.MessagingProvider import whisk.core.database.NoDocumentException import whisk.core.entity.{ ActivationId, WhiskActivation } -import whisk.core.entity.InstanceId +import whisk.core.entity.EntityName import whisk.core.entity.ExecutableWhiskAction +import whisk.core.entity.Identity +import whisk.core.entity.InstanceId import whisk.core.entity.UUID import whisk.core.entity.WhiskAction import whisk.core.entity.types.EntityStore -import scala.annotation.tailrec -import whisk.core.entity.EntityName -import whisk.core.entity.Identity import whisk.spi.SpiLoader trait LoadBalancer { @@ -162,9 +166,9 @@ class LoadBalancerService( private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = { implicit val tid = TransactionId.loadbalancer WhiskAction.get(db, action.docid).flatMap { oldAction => - WhiskAction.put(db, action.revision(oldAction.rev)) + WhiskAction.put(db, action.revision(oldAction.rev))(tid, notifier = None) }.recover { - case _: NoDocumentException => WhiskAction.put(db, action) + case _: NoDocumentException => WhiskAction.put(db, action)(tid, notifier = None) }.map(_ => {}).andThen { case Success(_) => logging.info(this, "test action for invoker health now exists") case Failure(e) => logging.error(this, s"error creating test action for invoker health: $e") diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index 1f64625..0f8a2cf 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -140,7 +140,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa val store = (tid: TransactionId, activation: WhiskActivation) => { implicit val transid = tid logging.info(this, "recording the activation result to the data store") - WhiskActivation.put(activationStore, activation).andThen { + WhiskActivation.put(activationStore, activation)(tid, notifier = None).andThen { case Success(id) => logging.info(this, s"recorded activation") case Failure(t) => logging.error(this, s"failed to record activation") } diff --git a/tests/src/test/scala/ha/CacheInvalidationTests.scala b/tests/src/test/scala/ha/CacheInvalidationTests.scala new file mode 100644 index 0000000..ecda623 --- /dev/null +++ b/tests/src/test/scala/ha/CacheInvalidationTests.scala @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ha + +import scala.concurrent.Await +import scala.concurrent.duration.DurationInt + +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import org.scalatest.junit.JUnitRunner + +import akka.http.scaladsl.Http +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.marshalling.Marshal +import akka.http.scaladsl.model.HttpMethods +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.RequestEntity +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.Uri +import akka.http.scaladsl.model.headers.Authorization +import akka.http.scaladsl.model.headers.BasicHttpCredentials +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer +import common.WhiskProperties +import common.WskActorSystem +import common.WskTestHelpers +import spray.json._ +import spray.json.DefaultJsonProtocol._ +import whisk.core.WhiskConfig +import akka.http.scaladsl.model.StatusCode + +@RunWith(classOf[JUnitRunner]) +class CacheInvalidationTests + extends FlatSpec + with Matchers + with WskTestHelpers + with WskActorSystem { + + implicit val materializer = ActorMaterializer() + + val hosts = WhiskProperties.getProperty("controller.hosts").split(",") + val authKey = WhiskProperties.readAuthKey(WhiskProperties.getAuthFileForTesting) + + val timeout = 15.seconds + + def retry[T](fn: => T) = whisk.utils.retry(fn, 15, Some(1.second)) + + def updateAction(name: String, code: String, controllerInstance: Int = 0) = { + require(controllerInstance >= 0 && controllerInstance < hosts.length, "Controller instance not known.") + + val host = hosts(controllerInstance) + val port = WhiskProperties.getControllerBasePort + controllerInstance + + val body = JsObject("namespace" -> JsString("_"), "name" -> JsString(name), "exec" -> JsObject("kind" -> JsString("nodejs:default"), "code" -> JsString(code))) + + val request = Marshal(body).to[RequestEntity].flatMap { entity => + Http().singleRequest(HttpRequest( + method = HttpMethods.PUT, + uri = Uri().withScheme("http").withHost(host).withPort(port).withPath(Uri.Path(s"/api/v1/namespaces/_/actions/$name")).withQuery(Uri.Query("overwrite" -> true.toString)), + headers = List(Authorization(BasicHttpCredentials(authKey.split(":")(0), authKey.split(":")(1)))), + entity = entity)).flatMap { response => + val action = Unmarshal(response).to[JsObject].map { resBody => + withClue(s"Error in Body: $resBody")(response.status shouldBe StatusCodes.OK) + resBody + } + action + } + } + + Await.result(request, timeout) + } + + def getAction(name: String, controllerInstance: Int = 0, expectedCode: StatusCode = StatusCodes.OK) = { + require(controllerInstance >= 0 && controllerInstance < hosts.length, "Controller instance not known.") + + val host = hosts(controllerInstance) + val port = WhiskProperties.getControllerBasePort + controllerInstance + + val request = Http().singleRequest(HttpRequest( + method = HttpMethods.GET, + uri = Uri().withScheme("http").withHost(host).withPort(port).withPath(Uri.Path(s"/api/v1/namespaces/_/actions/$name")), + headers = List(Authorization(BasicHttpCredentials(authKey.split(":")(0), authKey.split(":")(1)))))).flatMap { response => + val action = Unmarshal(response).to[JsObject].map { resBody => + withClue(s"Wrong statuscode from controller. Body is: $resBody")(response.status shouldBe expectedCode) + resBody + } + action + } + + Await.result(request, timeout) + } + + def deleteAction(name: String, controllerInstance: Int = 0, expectedCode: Option[StatusCode] = Some(StatusCodes.OK)) = { + require(controllerInstance >= 0 && controllerInstance < hosts.length, "Controller instance not known.") + + val host = hosts(controllerInstance) + val port = WhiskProperties.getControllerBasePort + controllerInstance + + val request = Http().singleRequest(HttpRequest( + method = HttpMethods.DELETE, + uri = Uri().withScheme("http").withHost(host).withPort(port).withPath(Uri.Path(s"/api/v1/namespaces/_/actions/$name")), + headers = List(Authorization(BasicHttpCredentials(authKey.split(":")(0), authKey.split(":")(1)))))).flatMap { response => + val action = Unmarshal(response).to[JsObject].map { resBody => + expectedCode.map { code => + withClue(s"Wrong statuscode from controller. Body is: $resBody")(response.status shouldBe code) + } + resBody + } + action + } + + Await.result(request, timeout) + } + + behavior of "the cache" + + it should "be invalidated on updating an entity" in { + if (WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt >= 2) { + val actionName = "invalidateRemoteCacheOnUpdate" + + deleteAction(actionName, 0, None) + deleteAction(actionName, 1, None) + + // Create an action on controller0 + val createdAction = updateAction(actionName, "CODE_CODE_CODE", 0) + + // Get action from controller1 + val actionFromController1 = getAction(actionName, 1) + createdAction shouldBe actionFromController1 + + // Update the action on controller0 + val updatedAction = updateAction(actionName, "CODE_CODE", 0) + + retry({ + // Get action from controller1 + val updatedActionFromController1 = getAction(actionName, 1) + updatedAction shouldBe updatedActionFromController1 + }) + } + } + + it should "be invalidated on deleting an entity" in { + if (WhiskProperties.getProperty(WhiskConfig.controllerInstances).toInt >= 2) { + val actionName = "invalidateRemoteCacheOnDelete" + + deleteAction(actionName, 0, None) + deleteAction(actionName, 1, None) + + // Create an action on controller0 + val createdAction = updateAction(actionName, "CODE_CODE_CODE", 0) + // Get action from controller1 (Now its in the cache of controller 0 and 1) + val actionFromController1 = getAction(actionName, 1) + createdAction shouldBe actionFromController1 + + retry({ + // Delete the action on controller0 (It should be deleted automatically from the cache of controller1) + val updatedAction = deleteAction(actionName, 0) + // Get action from controller1 should fail with 404 + getAction(actionName, 1, StatusCodes.NotFound) + }) + } + } +} diff --git a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala index 150a04e..1aae330 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ActionsApiTests.scala @@ -469,7 +469,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { action.parameters, action.limits, action.version, action.publish, action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6))) } - stream.toString should include regex (s"caching*.*${action.docid.asDocInfo}") + stream.toString should include(s"caching ${CacheKey(action)}") stream.reset() // second request should fetch from cache @@ -481,7 +481,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { action.publish, action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6))) } - stream.toString should include regex (s"serving from cache:*.*${action.docid.asDocInfo}") + stream.toString should include(s"serving from cache: ${CacheKey(action)}") stream.reset() // delete should invalidate cache @@ -492,7 +492,7 @@ class ActionsApiTests extends ControllerTestCommon with WhiskActionsApi { action.parameters, action.limits, action.version, action.publish, action.annotations ++ Parameters(WhiskAction.execFieldName, NODEJS6))) } - stream.toString should include regex (s"invalidating*.*${action.docid.asDocInfo}") + stream.toString should include(s"invalidating ${CacheKey(action)}") stream.reset() } diff --git a/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala b/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala index e964856..2c6415b 100644 --- a/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala +++ b/tests/src/test/scala/whisk/core/controller/test/AuthenticateTests.scala @@ -62,14 +62,14 @@ class AuthenticateTests extends ControllerTestCommon with Authenticate { user.get shouldBe Identity(subject, ns.name, ns.authkey, Privilege.ALL) // first lookup should have been from datastore - stream.toString should include regex (s"serving from datastore: ${ns.authkey.uuid.asString}") + stream.toString should include(s"serving from datastore: ${CacheKey(ns.authkey)}") stream.reset() // repeat query, now should be served from cache val cachedUser = Await.result(validateCredentials(Some(pass))(transid()), dbOpTimeout) cachedUser.get shouldBe Identity(subject, ns.name, ns.authkey, Privilege.ALL) - stream.toString should include regex (s"serving from cache: ${ns.authkey.uuid.asString}") + stream.toString should include(s"serving from cache: ${CacheKey(ns.authkey)}") stream.reset() } diff --git a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala index dbfa17f..f899e81 100644 --- a/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala +++ b/tests/src/test/scala/whisk/core/controller/test/ControllerTestCommon.scala @@ -42,6 +42,7 @@ import whisk.core.connector.ActivationMessage import whisk.core.controller.RestApiCommons import whisk.core.controller.WhiskServices import whisk.core.database.DocumentFactory +import whisk.core.database.CacheChangeNotification import whisk.core.database.test.DbUtils import whisk.core.entitlement._ import whisk.core.entity._ @@ -85,6 +86,12 @@ protected trait ControllerTestCommon override def make = fixedId } + implicit val cacheChangeNotification = Some { + new CacheChangeNotification { + override def apply(k: CacheKey): Future[Unit] = Future.successful(()) + } + } + val entityStore = WhiskEntityStore.datastore(whiskConfig) val activationStore = WhiskActivationStore.datastore(whiskConfig) val authStore = WhiskAuthStore.datastore(whiskConfig) @@ -169,7 +176,6 @@ protected trait ControllerTestCommon with DefaultJsonProtocol { implicit val serdes = jsonFormat5(BadEntity.apply) override val cacheEnabled = true - override def cacheKeyForUpdate(w: BadEntity) = w.docid.asDocInfo } } diff --git a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala index f59c5f1..6a8f272 100644 --- a/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala +++ b/tests/src/test/scala/whisk/core/database/test/MultipleReadersSingleWriterCacheTests.scala @@ -17,197 +17,58 @@ package whisk.core.database.test -import java.util.concurrent.CountDownLatch -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future -import scala.concurrent.duration.DurationInt -import scala.concurrent.duration.FiniteDuration -import scala.language.postfixOps -import scala.util.Failure -import scala.util.Success +import org.junit.runner.RunWith import org.scalatest.FlatSpec import org.scalatest.Matchers +import org.scalatest.junit.JUnitRunner import common.StreamLogging import common.WskActorSystem -import whisk.common.Logging import whisk.common.TransactionId +import whisk.core.database.CacheChangeNotification import whisk.core.database.MultipleReadersSingleWriterCache +import whisk.core.entity.CacheKey -class MultipleReadersSingleWriterCacheTests(nIters: Int = 3) extends FlatSpec +@RunWith(classOf[JUnitRunner]) +class MultipleReadersSingleWriterCacheTests extends FlatSpec with Matchers with MultipleReadersSingleWriterCache[String, String] with WskActorSystem with StreamLogging { - "the cache" should "support simple CRUD" in { - val inhibits = doReadWriteRead("foo").go(0 seconds) - inhibits.debug(this) + behavior of "the cache" - inhibits.nReadInhibits.get should be(0) - cacheSize should be(1) - } - - "the cache" should "support concurrent CRUD to different keys" in { - // - // for the first iter, all reads are not-cached and each thread - // requests a different key, so we expect no read inhibits, and a - // bunch of write inhibits - // - val inhibits = doCRUD("CONCURRENT CRUD to different keys", { i => "foop_" + i }) - inhibits.nReadInhibits.get should be(0) - inhibits.nWriteInhibits.get should not be (0) - - // - // after the first iter, the keys already exist, so the first read - // should be cached, resulting in the writes proceeding more - // smoothly this time, thus inhibiting some of the second reads - // - for (i <- 1 to nIters - 1) { - doCRUD("CONCURRENT CRUD to different keys", { i => "foop_" + i }) - .nReadInhibits.get should not be (0) - } - } - - "the cache" should "support concurrent CRUD to shared keys" in { - for (i <- 1 to nIters) { - doCRUD("CONCURRENT CRUD to shared keys", sharedKeys) - .nWriteInhibits.get should not be (0) - } - } - - "the cache" should "support concurrent CRUD to shared keys (zero latency)" in { - var hasInhibits = false - for (i <- 1 to nIters) { - hasInhibits = doCRUD("concurrent CRUD to shared keys (zero latency)", sharedKeys, 0 seconds) - .hasInhibits - } - hasInhibits should not be (false) - } - - "the cache" should "support concurrent CRUD to shared keys (short latency)" in { - for (i <- 1 to nIters) { - doCRUD("concurrent CRUD to shared keys (short latency)", sharedKeys, 10 milliseconds) - .hasInhibits should be(true) - } - } - - "the cache" should "support concurrent CRUD to shared keys (medium latency)" in { - for (i <- 1 to nIters) { - doCRUD("concurrent CRUD to shared keys (medium latency)", sharedKeys, 100 milliseconds) - .hasInhibits should be(true) - } - } - - "the cache" should "support concurrent CRUD to shared keys (long latency)" in { - for (i <- 1 to nIters) { - doCRUD("CONCURRENT CRUD to shared keys (long latency)", sharedKeys, 5 seconds) - .nWriteInhibits.get should not be (0) - } - } - - "the cache" should "support concurrent CRUD to shared keys, with update first" in { - for (i <- 1 to nIters) { - doCRUD("CONCURRENT CRUD to shared keys, with update first", sharedKeys, 1 second, false) - .nWriteInhibits.get should be(0) - } - } - - def sharedKeys = { i: Int => "foop_" + (i % 2) } - - def doCRUD( - testName: String, - key: Int => String, - delay: FiniteDuration = 1 second, - readsFirst: Boolean = true, - nThreads: Int = 10): Inhibits = { - - System.out.println(testName); - - val exec = Executors.newFixedThreadPool(nThreads) - val inhibits = Inhibits() + it should "execute the callback on invalidating and updating an entry" in { + val ctr = new AtomicInteger(0) + val key = CacheKey("key") - for (i <- 1 to nThreads) { - exec.submit(new Runnable { def run() = { doReadWriteRead(key(i), inhibits, readsFirst).go(delay) } }) - } - - exec.shutdown - exec.awaitTermination(2, TimeUnit.MINUTES) - - inhibits.debug(this) - inhibits - } - - case class Inhibits( - nReadInhibits: AtomicInteger = new AtomicInteger(0), - nWriteInhibits: AtomicInteger = new AtomicInteger(0)) { - - def debug(from: AnyRef) = { - logging.debug(from, "InhibitedReads: " + nReadInhibits) - logging.debug(from, "InhibitedWrites: " + nWriteInhibits) - } - - def hasInhibits: Boolean = { nReadInhibits.get > 0 || nWriteInhibits.get > 0 } - } - - private case class doReadWriteRead(key: String, inhibits: Inhibits = Inhibits(), readFirst: Boolean = true)(implicit logging: Logging) { - def go(implicit delay: FiniteDuration): Inhibits = { - val latch = new CountDownLatch(2) - - implicit val transId = TransactionId.testing - - if (!readFirst) { - // we want to do the update before the first read - cacheUpdate(key, key, delayed("bar_b")) onFailure { - case t => - inhibits.nWriteInhibits.incrementAndGet(); - } - } - - cacheLookup(key, delayed("bar"), true) onComplete { - case Success(s) => { - latch.countDown() - } - case Failure(t) => { - latch.countDown() - inhibits.nReadInhibits.incrementAndGet(); - } - } - - if (readFirst) { - // we did the read before the update, so do the write next - cacheUpdate(key, key, delayed("bar_b")) onFailure { - case t => - inhibits.nWriteInhibits.incrementAndGet(); - } - } - - cacheLookup(key, delayed("bar_c"), true) onComplete { - case Success(s) => { - latch.countDown(); - } - case Failure(t) => { - inhibits.nReadInhibits.incrementAndGet(); - latch.countDown(); + implicit val transId = TransactionId.testing + lazy implicit val cacheUpdateNotifier = Some { + new CacheChangeNotification { + override def apply(key: CacheKey) = { + ctr.incrementAndGet() + Future.successful(()) } } + } - latch.await(2, TimeUnit.MINUTES) + // Create an cache entry + cacheUpdate("doc", key, Future.successful("db save successful")) + ctr.get shouldBe 1 - inhibits - } - } + // Callback should be called if entry exists + cacheInvalidate(key, Future.successful(())) + ctr.get shouldBe 2 + cacheUpdate("docdoc", key, Future.successful("update in db successful")) + ctr.get shouldBe 3 - private def delayed[W](v: W)(implicit delay: FiniteDuration): Future[W] = { - akka.pattern.after(duration = delay, using = actorSystem.scheduler)( - Future.successful { v }) + // Callback should be called if entry does not exist + cacheInvalidate(CacheKey("abc"), Future.successful(())) + ctr.get shouldBe 4 } - - /** we are using cache keys, so the update key is just the string itself */ - override protected def cacheKeyForUpdate(w: String): String = (w) } diff --git a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala index 466f326..f881510 100644 --- a/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala +++ b/tests/src/test/scala/whisk/core/entity/test/DatastoreTests.scala @@ -32,6 +32,7 @@ import common.StreamLogging import common.WskActorSystem import whisk.core.WhiskConfig import whisk.core.database.DocumentConflictException +import whisk.core.database.CacheChangeNotification import whisk.core.database.NoDocumentException import whisk.core.database.test.DbUtils import whisk.core.entity._ @@ -50,6 +51,8 @@ class DatastoreTests extends FlatSpec val datastore = WhiskEntityStore.datastore(config) val authstore = WhiskAuthStore.datastore(config) + implicit val cacheUpdateNotifier: Option[CacheChangeNotification] = None + override def afterAll() { println("Shutting down store connections") datastore.shutdown() diff --git a/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala b/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala index 0c83ca5..a6af6e4 100644 --- a/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala +++ b/tests/src/test/scala/whisk/core/entity/test/MigrationEntities.scala @@ -58,7 +58,6 @@ object OldWhiskRule override val collectionName = "rules" override implicit val serdes = jsonFormat8(OldWhiskRule.apply) - override def cacheKeyForUpdate(t: OldWhiskRule) = t.docid.asDocInfo } /** @@ -86,5 +85,4 @@ object OldWhiskTrigger override val collectionName = "triggers" override implicit val serdes = jsonFormat7(OldWhiskTrigger.apply) - override def cacheKeyForUpdate(t: OldWhiskTrigger) = t.docid.asDocInfo } -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].