This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new b99aaa3 Refactor and unit test PoolingRestClient. (#3416) b99aaa3 is described below commit b99aaa3c8b4e6e7dac167f7b6f9a5e9ed13989d5 Author: James Dubee <jwdu...@us.ibm.com> AuthorDate: Mon Jun 18 02:21:14 2018 -0400 Refactor and unit test PoolingRestClient. (#3416) --- .../logging/ElasticSearchRestClient.scala | 5 +- .../whisk/core/database/CloudantRestClient.scala | 3 + .../whisk/core/database/CouchDbRestClient.scala | 15 +- .../main/scala/whisk/http/PoolingRestClient.scala | 53 +++--- .../database/test/ExtendedCouchDbRestClient.scala | 13 +- .../scala/whisk/http/PoolingRestClientTests.scala | 181 +++++++++++++++++++++ 6 files changed, 227 insertions(+), 43 deletions(-) diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala index 2f34c26..1597ecc 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala @@ -32,6 +32,7 @@ import scala.util.Try import spray.json._ import whisk.http.PoolingRestClient +import whisk.http.PoolingRestClient._ trait EsQueryMethod trait EsOrder @@ -164,11 +165,11 @@ class ElasticSearchRestClient( private val baseHeaders: List[HttpHeader] = List(Accept(MediaTypes.`application/json`)) def info(headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] = { - requestJson[JsObject](mkRequest(GET, Uri./, baseHeaders ++ headers)) + requestJson[JsObject](mkRequest(GET, Uri./, headers = baseHeaders ++ headers)) } def index(index: String, headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] = { - requestJson[JsObject](mkRequest(GET, Uri(index), baseHeaders ++ headers)) + requestJson[JsObject](mkRequest(GET, Uri(index), headers = baseHeaders ++ headers)) } def search[T: RootJsonReader](index: String, diff --git a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala index 682df26..ce6a9a9 100644 --- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala +++ b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala @@ -22,9 +22,12 @@ import scala.concurrent.Future import akka.actor.ActorSystem import akka.http.scaladsl.model.HttpMethods import akka.http.scaladsl.model.StatusCode + import spray.json._ import spray.json.DefaultJsonProtocol._ + import whisk.common.Logging +import whisk.http.PoolingRestClient._ /** * This class only handles the basic communication to the proper endpoints diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala index 3e8143a..16e6cfa 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala @@ -30,6 +30,7 @@ import spray.json._ import spray.json.DefaultJsonProtocol._ import whisk.common.Logging import whisk.http.PoolingRestClient +import whisk.http.PoolingRestClient._ /** * This class only handles the basic communication to the proper endpoints @@ -73,15 +74,15 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid def getDoc(id: String): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders)) + requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), headers = baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid def getDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders ++ revHeader(rev))) + requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), headers = baseHeaders ++ revHeader(rev))) // http://docs.couchdb.org/en/1.6.1/api/document/common.html#delete--db-docid def deleteDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), baseHeaders ++ revHeader(rev))) + requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), headers = baseHeaders ++ revHeader(rev))) // http://docs.couchdb.org/en/1.6.1/api/ddoc/views.html def executeView(designDoc: String, viewName: String)(startKey: List[Any] = Nil, @@ -137,7 +138,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str val viewUri = uri(db, "_design", designDoc, "_view", viewName).withQuery(Uri.Query(argMap)) - requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, baseHeaders)) + requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, headers = baseHeaders)) } // Streams an attachment to the database @@ -149,7 +150,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str source: Source[ByteString, _]): Future[Either[StatusCode, JsObject]] = { val entity = HttpEntity.Chunked(contentType, source.map(bs => HttpEntity.ChunkStreamPart(bs))) val request = - mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev)) + mkRequest(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev)) requestJson[JsObject](request) } @@ -159,9 +160,9 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str rev: String, attName: String, sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = { - val request = mkRequest(HttpMethods.GET, uri(db, id, attName), baseHeaders ++ revHeader(rev)) + val httpRequest = mkRequest(HttpMethods.GET, uri(db, id, attName), headers = baseHeaders ++ revHeader(rev)) - request0(request) flatMap { response => + request(httpRequest) flatMap { response => if (response.status.isSuccess()) { response.entity.withoutSizeLimit().dataBytes.runWith(sink).map(r => Right(response.entity.contentType, r)) } else { diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala index 0da6630..b842e75 100644 --- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala +++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala @@ -17,10 +17,9 @@ package whisk.http -import scala.concurrent.Future -import scala.concurrent.Promise -import scala.util.{Failure, Success} -import scala.util.Try +import scala.concurrent.{Future, Promise} +import scala.util.{Failure, Success, Try} +import scala.concurrent.ExecutionContext import akka.actor.ActorSystem import akka.http.scaladsl.Http @@ -81,32 +80,10 @@ class PoolingRestClient( }))(Keep.left) .run - // Prepares a request with the proper headers. - def mkRequest0(method: HttpMethod, - uri: Uri, - body: Future[MessageEntity], - headers: List[HttpHeader] = List.empty): Future[HttpRequest] = { - body.map { b => - HttpRequest(method, uri, headers, b) - } - } - - protected def mkRequest(method: HttpMethod, uri: Uri, headers: List[HttpHeader] = List.empty): Future[HttpRequest] = { - mkRequest0(method, uri, Future.successful(HttpEntity.Empty), headers) - } - - protected def mkJsonRequest(method: HttpMethod, - uri: Uri, - body: JsValue, - headers: List[HttpHeader] = List.empty): Future[HttpRequest] = { - val b = Marshal(body).to[MessageEntity] - mkRequest0(method, uri, b, headers) - } - // Enqueue a request, and return a future capturing the corresponding response. // WARNING: make sure that if the future response is not failed, its entity // be drained entirely or the connection will be kept open until timeouts kick in. - def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = { + def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = { futureRequest flatMap { request => val promise = Promise[HttpResponse] @@ -131,8 +108,8 @@ class PoolingRestClient( } // Runs a request and returns either a JsObject, or a StatusCode if not 2xx. - protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = { - request0(futureRequest) flatMap { response => + def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = { + request(futureRequest) flatMap { response => if (response.status.isSuccess()) { Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o => Right(o) @@ -162,3 +139,21 @@ class PoolingRestClient( Future.successful(()) } } + +object PoolingRestClient { + + def mkRequest(method: HttpMethod, + uri: Uri, + body: Future[MessageEntity] = Future.successful(HttpEntity.Empty), + headers: List[HttpHeader] = List.empty)(implicit ec: ExecutionContext): Future[HttpRequest] = { + body.map { b => + HttpRequest(method, uri, headers, b) + } + } + + def mkJsonRequest(method: HttpMethod, uri: Uri, body: JsValue, headers: List[HttpHeader] = List.empty)( + implicit ec: ExecutionContext): Future[HttpRequest] = { + val b = Marshal(body).to[MessageEntity] + mkRequest(method, uri, b, headers) + } +} diff --git a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala index d554e31..a4019c9 100644 --- a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala +++ b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala @@ -21,10 +21,13 @@ import scala.concurrent.Future import akka.actor.ActorSystem import akka.http.scaladsl.model._ + import spray.json._ import spray.json.DefaultJsonProtocol._ + import whisk.common.Logging import whisk.core.database.CouchDbRestClient +import whisk.http.PoolingRestClient._ /** * Implementation of additional endpoints that should only be used in testing. @@ -39,22 +42,22 @@ class ExtendedCouchDbRestClient(protocol: String, // http://docs.couchdb.org/en/1.6.1/api/server/common.html#get-- def instanceInfo(): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, baseHeaders)) + requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, headers = baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/server/common.html#all-dbs def dbs(): Future[Either[StatusCode, List[String]]] = { - requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), baseHeaders)).map { either => + requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), headers = baseHeaders)).map { either => either.right.map(_.convertTo[List[String]]) } } // http://docs.couchdb.org/en/1.6.1/api/database/common.html#put--db def createDb(): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), baseHeaders)) + requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), headers = baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/database/common.html#delete--db def deleteDb(): Future[Either[StatusCode, JsObject]] = - requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), baseHeaders)) + requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), headers = baseHeaders)) // http://docs.couchdb.org/en/1.6.1/api/database/bulk-api.html#get--db-_all_docs def getAllDocs(skip: Option[Int] = None, @@ -75,6 +78,6 @@ class ExtendedCouchDbRestClient(protocol: String, .toMap val url = uri(db, "_all_docs").withQuery(Uri.Query(argMap)) - requestJson[JsObject](mkRequest(HttpMethods.GET, url, baseHeaders)) + requestJson[JsObject](mkRequest(HttpMethods.GET, url, headers = baseHeaders)) } } diff --git a/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala b/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala new file mode 100644 index 0000000..7213e4a --- /dev/null +++ b/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.http + +import org.junit.runner.RunWith +import org.scalatest.{FlatSpecLike, Matchers} +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.junit.JUnitRunner + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.Flow +import akka.stream.ActorMaterializer +import akka.testkit.TestKit +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.HttpMethods.{GET, POST} +import akka.http.scaladsl.model.StatusCodes.{InternalServerError, NotFound} +import akka.http.scaladsl.model.headers.RawHeader +import akka.http.scaladsl.unmarshalling.Unmarshaller.UnsupportedContentTypeException + +import common.StreamLogging + +import spray.json.JsObject +import spray.json.DefaultJsonProtocol._ + +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future, Promise, TimeoutException} +import scala.util.{Success, Try} + +import whisk.http.PoolingRestClient._ + +@RunWith(classOf[JUnitRunner]) +class PoolingRestClientTests + extends TestKit(ActorSystem("PoolingRestClientTests")) + with FlatSpecLike + with Matchers + with ScalaFutures + with StreamLogging { + implicit val ec: ExecutionContext = system.dispatcher + implicit val materializer: ActorMaterializer = ActorMaterializer() + + def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest()) + : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] = + Flow[(HttpRequest, Promise[HttpResponse])] + .mapAsyncUnordered(1) { + case (request, userContext) => + request shouldBe httpRequest + Future.successful((Success(httpResponse), userContext)) + } + + def failFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest()) + : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] = + Flow[(HttpRequest, Promise[HttpResponse])] + .mapAsyncUnordered(1) { + case (request, userContext) => + Future.failed(new Exception) + } + + def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout) + + behavior of "Pooling REST Client" + + it should "error when configuration protocol is invalid" in { + a[IllegalArgumentException] should be thrownBy new PoolingRestClient("invalid", "host", 443, 1) + } + + it should "get a non-200 status code when performing a request" in { + val httpResponse = HttpResponse(InternalServerError) + val httpRequest = HttpRequest() + val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse))) + + await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse + } + + it should "return payload from a request" in { + val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + val httpRequest = HttpRequest() + val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse))) + + await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse + } + + it should "send headers when making a request" in { + val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + val httpRequest = HttpRequest(headers = List(RawHeader("key", "value"))) + val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest))) + + await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse + } + + it should "send uri when making a request" in { + val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + val httpRequest = HttpRequest(uri = Uri("/some/where")) + val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest))) + + await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse + } + + it should "send a payload when making a request" in { + val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + val httpRequest = HttpRequest(POST, entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, "payload")) + val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest))) + + await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse + } + + it should "return JSON when making a request" in { + val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest))) + val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty) + + await(poolingRestClient.requestJson[JsObject](request)) shouldBe Right(JsObject()) + } + + it should "throw timeout exception when Future fails in httpFlow" in { + val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(failFlow(httpResponse, httpRequest))) + val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty) + + a[TimeoutException] should be thrownBy await(poolingRestClient.requestJson[JsObject](request)) + } + + it should "return a status code on request failure" in { + val httpResponse = HttpResponse(NotFound) + val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest))) + val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty) + + await(poolingRestClient.requestJson[JsObject](request)) shouldBe Left(NotFound) + } + + it should "throw an unsupported content-type exception when unexpected content-type is returned" in { + val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, "plain text")) + val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest))) + val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty) + + a[UnsupportedContentTypeException] should be thrownBy await(poolingRestClient.requestJson[JsObject](request)) + } + + it should "create an HttpRequest without a payload" in { + val httpRequest = HttpRequest() + + await(mkRequest(GET, Uri./)) shouldBe httpRequest + } + + it should "create an HttpRequest with a JSON payload" in { + val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + + await(mkJsonRequest(GET, Uri./, JsObject(), List.empty)) shouldBe httpRequest + } + + it should "create an HttpRequest with a payload" in { + val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)) + val request = mkRequest( + GET, + Uri./, + Future.successful(HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)), + List.empty) + + await(request) shouldBe httpRequest + } + +} -- To stop receiving notification emails like this one, please contact markusthoem...@apache.org.