This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 28328c1 Use proper API host and reduce logging noise. (#3276) 28328c1 is described below commit 28328c1043b5cbe0e55c4588fa43960e6c860b04 Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Wed Feb 14 08:08:04 2018 +0100 Use proper API host and reduce logging noise. (#3276) Co-authored-by: cbickel <git...@cbickel.de> Co-authored-by: jeremiaswerner <jeremias.wer...@gmail.com> --- tests/src/test/scala/common/rest/WskRest.scala | 72 ++++++++------------------ 1 file changed, 23 insertions(+), 49 deletions(-) diff --git a/tests/src/test/scala/common/rest/WskRest.scala b/tests/src/test/scala/common/rest/WskRest.scala index a8a9c16..6e30cd7 100644 --- a/tests/src/test/scala/common/rest/WskRest.scala +++ b/tests/src/test/scala/common/rest/WskRest.scala @@ -21,22 +21,19 @@ import java.io.File import java.time.Instant import java.util.Base64 import java.security.cert.X509Certificate + import org.apache.commons.io.FileUtils import org.scalatest.Matchers import org.scalatest.FlatSpec import org.scalatest.concurrent.ScalaFutures import org.scalatest.time.Span.convertDurationToSpan -import scala.Left -import scala.Right + import scala.collection.JavaConversions.mapAsJavaMap import scala.collection.mutable.Buffer import scala.collection.immutable.Seq import scala.concurrent.duration.Duration import scala.concurrent.duration.DurationInt -import scala.concurrent.{Future, Promise} import scala.language.postfixOps -import scala.util.Failure -import scala.util.Success import scala.util.Try import scala.util.{Failure, Success} import akka.http.scaladsl.model.StatusCode @@ -53,16 +50,13 @@ import akka.http.scaladsl.model.ContentTypes import akka.http.scaladsl.Http import akka.http.scaladsl.model.headers.BasicHttpCredentials import akka.http.scaladsl.model.Uri -import akka.http.scaladsl.model.Uri.Path +import akka.http.scaladsl.model.Uri.{Path, Query} import akka.http.scaladsl.model.HttpMethods.DELETE import akka.http.scaladsl.model.HttpMethods.GET import akka.http.scaladsl.model.HttpMethods.POST import akka.http.scaladsl.model.HttpMethods.PUT import akka.http.scaladsl.HttpsConnectionContext -import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{Keep, Sink, Source} -import akka.stream.{OverflowStrategy, QueueOfferResult} import spray.json._ import spray.json.DefaultJsonProtocol._ import spray.json.JsObject @@ -75,7 +69,6 @@ import common.HasActivation import common.RunWskCmd import common.TestUtils import common.TestUtils.SUCCESS_EXIT -import common.TestUtils.DONTCARE_EXIT import common.TestUtils.ANY_ERROR_EXIT import common.TestUtils.DONTCARE_EXIT import common.TestUtils.RunResult @@ -86,6 +79,7 @@ import common.WskProps import whisk.core.entity.ByteSize import whisk.utils.retry import javax.net.ssl.{HostnameVerifier, KeyManager, SSLContext, SSLSession, X509TrustManager} + import com.typesafe.sslconfig.akka.AkkaSSLConfig import java.nio.charset.StandardCharsets @@ -1230,49 +1224,29 @@ class RunWskRestCmd() extends FlatSpec with RunWskCmd with Matchers with ScalaFu else "" } - def request(method: HttpMethod, - uri: Uri, - body: Option[String] = None, - creds: BasicHttpCredentials): Future[HttpResponse] = { + def requestEntity(method: HttpMethod, path: Path, params: Map[String, String] = Map(), body: Option[String] = None)( + implicit wp: WskProps): HttpResponse = { + + val creds = getBasicHttpCredentials(wp) + + // startsWith(http) includes https + val hostWithScheme = if (wp.apihost.startsWith("http")) { + Uri(wp.apihost) + } else { + Uri().withScheme("https").withHost(wp.apihost) + } + val entity = body map { b => HttpEntity(ContentTypes.`application/json`, b) } getOrElse HttpEntity(ContentTypes.`application/json`, "") - val request = HttpRequest(method, uri, List(Authorization(creds)), entity = entity) - val connectionPoolSettings = - ConnectionPoolSettings(actorSystem).withMaxOpenRequests(maxOpenRequest).withIdleTimeout(idleTimeout) - val pool = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]]( - host = WhiskProperties.getApiHost, - connectionContext = connectionContext, - settings = connectionPoolSettings) - val queue = Source - .queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.dropNew) - .via(pool) - .toMat(Sink.foreach({ - case ((Success(resp), p)) => p.success(resp) - case ((Failure(e), p)) => p.failure(e) - }))(Keep.left) - .run - - val promise = Promise[HttpResponse] - val responsePromise = Promise[HttpResponse]() - queue.offer(request -> responsePromise).flatMap { - case QueueOfferResult.Enqueued => responsePromise.future - case QueueOfferResult.Dropped => - Future.failed(new RuntimeException("Queue has overflowed. Please try again later.")) - case QueueOfferResult.Failure(ex) => Future.failed(ex) - case QueueOfferResult.QueueClosed => - Future.failed( - new RuntimeException("Queue was closed (pool shut down) while running the request. Please try again later.")) - } - } - def requestEntity(method: HttpMethod, - path: Path, - params: Map[String, String] = Map(), - body: Option[String] = None, - whiskUrl: Uri = Uri(""))(implicit wp: WskProps): HttpResponse = { - val creds = getBasicHttpCredentials(wp) - request(method, whiskUrl.withPath(path).withQuery(Uri.Query(params)), body, creds = creds).futureValue + val request = + HttpRequest( + method, + hostWithScheme.withPath(path).withQuery(Query(params)), + List(Authorization(creds)), + entity = entity) + Http().singleRequest(request, connectionContext).futureValue } private def getBasicHttpCredentials(wp: WskProps): BasicHttpCredentials = { -- To stop receiving notification emails like this one, please contact cbic...@apache.org.