markusthoemmes closed pull request #2957: splunk logstore
URL: https://github.com/apache/incubator-openwhisk/pull/2957
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 5d3fadab3b..c538d6ec56 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -258,4 +258,7 @@ object ConfigKeys {
 
   val transactions = "whisk.transactions"
   val stride = s"$transactions.stride"
+
+  val logStore = "whisk.logstore"
+  val splunk = s"$logStore.splunk"
 }
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
new file mode 100644
index 0000000000..465fb2532f
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.actor.ActorSystem
+import whisk.core.entity.Identity
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, 
WhiskActivation}
+
+import scala.concurrent.Future
+
/**
 * Docker log driver based LogStore implementation.
 *
 * Container logs are shipped to an external store by a docker log driver. The driver and its options are not set by
 * this class; they are expected to be supplied through `ContainerArgsConfig.extraArgs` (`--log-driver` / `--log-opt`).
 * See https://docs.docker.com/config/containers/logging/configure/#configure-the-logging-driver-for-a-container
 *
 * This base implementation neither collects nor fetches logs: `collectLogs` returns no log lines and `fetchLogs`
 * returns a fixed placeholder line. Extended versions (e.g. [[SplunkLogStore]]) override `fetchLogs` to read the logs
 * back from the external store (ELK, Splunk, ...).
 *
 * @param actorSystem actor system of the hosting process; not used by this base implementation
 */
class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore {

  /**
   * No additional container parameters: the --log-driver and --log-opt flags are expected to come from
   * ContainerArgsConfig.extraArgs.
   */
  override def containerParameters: Map[String, Set[String]] = Map.empty

  /** Returns no log lines: when a docker log driver is used, logs go to the external store instead. */
  def collectLogs(transid: TransactionId,
                  user: Identity,
                  activation: WhiskActivation,
                  container: Container,
                  action: ExecutableWhiskAction): Future[ActivationLogs] =
    Future.successful(ActivationLogs())

  /**
   * No logs are exposed to the API/CLI by this base implementation; a single placeholder line is returned.
   * Use an extended version (e.g. [[SplunkLogStore]]) to expose logs from an external source.
   */
  def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] =
    Future.successful(ActivationLogs(Vector("Logs are not available.")))
}
+
/** SPI provider that plugs the [[LogDriverLogStore]] in as the LogStore implementation. */
object LogDriverLogStoreProvider extends LogStoreProvider {
  override def logStore(actorSystem: ActorSystem): LogDriverLogStore = {
    new LogDriverLogStore(actorSystem)
  }
}
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
new file mode 100644
index 0000000000..596b776131
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/SplunkLogStore.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.client.RequestBuilding.Post
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.model.FormData
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.HttpResponse
+import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.headers.Authorization
+import akka.http.scaladsl.model.headers.BasicHttpCredentials
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import akka.stream.OverflowStrategy
+import akka.stream.QueueOfferResult
+import akka.stream.scaladsl.Flow
+import akka.stream.scaladsl.Keep
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
+import com.typesafe.sslconfig.akka.AkkaSSLConfig
+import pureconfig._
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+import spray.json._
+import whisk.common.AkkaLogging
+import whisk.core.ConfigKeys
+import whisk.core.entity.ActivationLogs
+import whisk.core.entity.WhiskActivation
+
/**
 * Settings of the [[SplunkLogStore]]; by default read from the `whisk.logstore.splunk` configuration section.
 *
 * @param host host name of the Splunk REST API
 * @param port port of the Splunk REST API (the SplunkLogStore connects to it over HTTPS)
 * @param username user name for basic authentication against the Splunk REST API
 * @param password password for basic authentication against the Splunk REST API
 * @param index Splunk index searched for activation logs
 * @param logMessageField name of the event field holding a single log line
 * @param activationIdField name of the event field holding the activation id
 * @param disableSNI if true, the TLS client is configured not to send the SNI extension
 */
case class SplunkLogStoreConfig(
  host: String,
  port: Int,
  username: String,
  password: String,
  index: String,
  logMessageField: String,
  activationIdField: String,
  disableSNI: Boolean)
/** The part of a Splunk search API JSON response that is consumed: one JSON object per result row. */
case class SplunkResponse(results: Vector[JsObject])

/** spray-json format for [[SplunkResponse]]. */
object SplunkResponseJsonProtocol extends DefaultJsonProtocol {
  // Declared with an explicit type: implicit members without one are fragile for implicit search
  // (and invisible before their definition in the same file). The name is kept for compatibility.
  implicit val orderFormat: RootJsonFormat[SplunkResponse] = jsonFormat1(SplunkResponse)
}
+
/**
 * A Splunk based implementation of [[LogDriverLogStore]]. Logs are routed to Splunk via a docker log driver, and
 * retrieved via the Splunk REST search API.
 *
 * @param actorSystem actor system used for the HTTP client, stream materialization and logging
 * @param httpFlow optional Flow to use for HttpRequest handling instead of the HTTPS connection pool to the
 *                 configured Splunk host (to enable stream based tests)
 * @param splunkConfig Splunk connection and query settings; by default loaded from the `whisk.logstore.splunk`
 *                     config section, which throws if that section is missing or incomplete
 */
class SplunkLogStore(
  actorSystem: ActorSystem,
  httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None,
  splunkConfig: SplunkLogStoreConfig = loadConfigOrThrow[SplunkLogStoreConfig](ConfigKeys.splunk))
    extends LogDriverLogStore(actorSystem) {
  implicit val as: ActorSystem = actorSystem
  implicit val ec = as.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  private val logging = new AkkaLogging(actorSystem.log)

  //see http://docs.splunk.com/Documentation/Splunk/6.6.3/RESTREF/RESTsearch#search.2Fjobs
  private val splunkApi = Path / "services" / "search" / "jobs"

  import SplunkResponseJsonProtocol._

  /** Capacity of the request queue; requests offered while it is full are rejected (see queueRequest). */
  val maxPendingRequests = 500

  /**
   * HTTPS connection pool flow to the configured Splunk host. Lazy, so no TLS context is built when a custom
   * `httpFlow` is supplied.
   */
  lazy val defaultHttpFlow = Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](
    host = splunkConfig.host,
    port = splunkConfig.port,
    connectionContext =
      if (splunkConfig.disableSNI)
        Http().createClientHttpsContext(AkkaSSLConfig().mapSettings(s => s.withLoose(s.loose.withDisableSNI(true))))
      else Http().defaultClientHttpsContext)

  /**
   * Queries Splunk (oneshot search) for the log lines of the given activation, in chronological order.
   *
   * The search window is [activation.start, activation.end + 5s]. A non-2xx response from Splunk fails the returned
   * future with a RuntimeException that carries the response status.
   */
  override def fetchLogs(activation: WhiskActivation): Future[ActivationLogs] = {

    //example curl request:
    //    curl -u  username:password -k https://splunkhost:port/services/search/jobs -d exec_mode=oneshot -d output_mode=json -d "search=search index=\"someindex\" | spath=activation_id | search activation_id=a930e5ae4ad4455c8f2505d665aad282 |  table log_message" -d "earliest_time=2017-08-29T12:00:00" -d "latest_time=2017-10-29T12:00:00"
    //example response:
    //    {"preview":false,"init_offset":0,"messages":[],"fields":[{"name":"log_message"}],"results":[{"log_message":"some log message"}], "highlighted":{}}
    //note: splunk returns results in reverse-chronological order, therefore we include "| reverse" to cause results
    //to arrive in chronological order
    val search =
      s"""search index="${splunkConfig.index}"| spath ${splunkConfig.activationIdField}| search ${splunkConfig.activationIdField}=${activation.activationId.toString}| table ${splunkConfig.logMessageField}| reverse"""

    val entity = FormData(
      Map(
        "exec_mode" -> "oneshot",
        "search" -> search,
        "output_mode" -> "json",
        //assume that activation start/end are UTC zone, and splunk events are the same
        "earliest_time" -> activation.start.toString,
        //add 5s to avoid a timerange of 0 on short-lived activations
        "latest_time" -> activation.end.plusSeconds(5).toString)).toEntity

    logging.debug(this, "sending request")
    queueRequest(
      Post(Uri(path = splunkApi))
        .withEntity(entity)
        .withHeaders(List(Authorization(BasicHttpCredentials(splunkConfig.username, splunkConfig.password)))))
      .flatMap { response =>
        logging.debug(this, s"splunk API response ${response}")
        if (response.status.isSuccess) {
          Unmarshal(response.entity)
            .to[SplunkResponse]
            .map(r => ActivationLogs(r.results.map(_.fields(splunkConfig.logMessageField).convertTo[String])))
        } else {
          //the entity will not be read: drain it so the pooled connection is released
          response.entity.discardBytes()
          Future.failed(new RuntimeException(s"Splunk API request failed with status ${response.status}"))
        }
      }
  }

  //based on http://doc.akka.io/docs/akka-http/10.0.6/scala/http/client-side/host-level.html
  /** Bounded request queue feeding the HTTP flow; each result completes the promise offered with its request. */
  val queue =
    Source
      .queue[(HttpRequest, Promise[HttpResponse])](maxPendingRequests, OverflowStrategy.dropNew)
      .via(httpFlow.getOrElse(defaultHttpFlow))
      .toMat(Sink.foreach({
        case ((Success(resp), p)) => p.success(resp)
        case ((Failure(e), p))    => p.failure(e)
      }))(Keep.left)
      .run()

  /**
   * Enqueues a request and returns a future of its response.
   * Fails with a RuntimeException if the queue is full or closed, or with the queue's own failure cause.
   */
  def queueRequest(request: HttpRequest): Future[HttpResponse] = {
    val responsePromise = Promise[HttpResponse]()
    queue.offer(request -> responsePromise).flatMap {
      case QueueOfferResult.Enqueued => responsePromise.future
      case QueueOfferResult.Dropped =>
        Future.failed(new RuntimeException("Splunk API Client Queue overflowed. Try again later."))
      case QueueOfferResult.Failure(ex) => Future.failed(ex)
      case QueueOfferResult.QueueClosed =>
        Future.failed(
          new RuntimeException(
            "Splunk API Client Queue was closed (pool shut down) while running the request. Try again later."))
    }
  }
}
+
/** SPI provider that plugs the [[SplunkLogStore]] in, configured from the `whisk.logstore.splunk` section. */
object SplunkLogStoreProvider extends LogStoreProvider {
  override def logStore(actorSystem: ActorSystem): SplunkLogStore = {
    new SplunkLogStore(actorSystem)
  }
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala
new file mode 100644
index 0000000000..b87e42322e
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/LogDriverLogStoreTests.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.actor.ActorSystem
+import akka.testkit.TestKit
+import org.scalatest.FlatSpecLike
+import org.scalatest.Matchers
+import whisk.core.containerpool.ContainerArgsConfig
+
class LogDriverLogStoreTests
    extends TestKit(ActorSystem("LogDriverLogStore"))
    with FlatSpecLike
    with Matchers
    with org.scalatest.BeforeAndAfterAll {

  /**
   * Example of how a deployment routes container logs through a docker log driver: the flags are supplied via
   * ContainerArgsConfig.extraArgs, which is why the LogDriverLogStore itself contributes no container parameters.
   */
  val testConfig = ContainerArgsConfig(
    network = "network",
    extraArgs =
      Map("log-driver" -> Set("fluentd"), "log-opt" -> Set("fluentd-address=localhost:24225", "tag=OW_CONTAINER")))

  override protected def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  behavior of "LogDriver LogStore"

  it should "not add container parameters (log driver flags come from ContainerArgsConfig.extraArgs)" in {
    val logDriverLogStore = new LogDriverLogStore(system)
    logDriverLogStore.containerParameters shouldBe Map()
  }
}
diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
new file mode 100644
index 0000000000..08faa479f0
--- /dev/null
+++ b/tests/src/test/scala/whisk/core/containerpool/logging/SplunkLogStoreTests.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.http.javadsl.model.headers.Authorization
+import akka.http.scaladsl.model.ContentTypes
+import akka.http.scaladsl.model.FormData
+import akka.http.scaladsl.model.HttpEntity
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.HttpResponse
+import akka.http.scaladsl.model.StatusCodes
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.stream.ActorMaterializer
+import akka.stream.StreamTcpException
+import akka.stream.scaladsl.Flow
+import akka.testkit.TestKit
+import common.StreamLogging
+import java.time.ZonedDateTime
+import org.scalatest.Matchers
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.concurrent.ScalaFutures
+import scala.util.Failure
+import whisk.core.entity.ActivationLogs
+import org.scalatest.FlatSpecLike
+import pureconfig.error.ConfigReaderException
+import scala.concurrent.Await
+import scala.concurrent.Promise
+import scala.concurrent.duration._
+import scala.util.Success
+import scala.util.Try
+import spray.json.JsNumber
+import spray.json.JsObject
+import spray.json._
+import whisk.core.entity.ActionLimits
+import whisk.core.entity.ActivationId
+import whisk.core.entity.ActivationResponse
+import whisk.core.entity.EntityName
+import whisk.core.entity.EntityPath
+import whisk.core.entity.LogLimit
+import whisk.core.entity.MemoryLimit
+import whisk.core.entity.Parameters
+import whisk.core.entity.Subject
+import whisk.core.entity.TimeLimit
+import whisk.core.entity.WhiskActivation
+import whisk.core.entity.size._
+
class SplunkLogStoreTests
    extends TestKit(ActorSystem("SplunkLogStore"))
    with FlatSpecLike
    with Matchers
    with ScalaFutures
    with StreamLogging
    with org.scalatest.BeforeAndAfterAll {
  val testConfig = SplunkLogStoreConfig(
    "splunk-host",
    8080,
    "splunk-user",
    "splunk-pass",
    "splunk-index",
    "log_message",
    "activation_id",
    false)

  behavior of "Splunk LogStore"

  val startTime = "2007-12-03T10:15:30Z"
  val endTime = "2007-12-03T10:15:45Z"
  val endTimePlus5 = "2007-12-03T10:15:50Z" //queried end time range is endTime+5

  val activation = WhiskActivation(
    namespace = EntityPath("ns"),
    name = EntityName("a"),
    Subject(),
    activationId = ActivationId(),
    start = ZonedDateTime.parse(startTime).toInstant,
    end = ZonedDateTime.parse(endTime).toInstant,
    response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))),
    annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson),
    duration = Some(123))

  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  override protected def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  /**
   * Asserts the structure of the Splunk search request and answers with a canned two-line response.
   * A failed assertion (or unmarshalling error) is turned into a failed response, so it surfaces as the failure
   * of the fetchLogs future under test.
   */
  val testFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
    Flow[(HttpRequest, Promise[HttpResponse])]
      .mapAsyncUnordered(1) {
        case (request, userContext) =>
          //we use cachedHostConnectionPoolHttps so won't get the host+port with the request
          Unmarshal(request.entity)
            .to[FormData]
            .map { form =>
              val earliestTime = form.fields.get("earliest_time")
              val latestTime = form.fields.get("latest_time")
              val outputMode = form.fields.get("output_mode")
              val search = form.fields.get("search")
              val execMode = form.fields.get("exec_mode")

              request.uri.path.toString() shouldBe "/services/search/jobs"
              request.headers shouldBe List(Authorization.basic(testConfig.username, testConfig.password))
              earliestTime shouldBe Some(startTime)
              latestTime shouldBe Some(endTimePlus5)
              outputMode shouldBe Some("json")
              execMode shouldBe Some("oneshot")
              search shouldBe Some(
                s"""search index="${testConfig.index}"| spath ${testConfig.activationIdField}| search ${testConfig.activationIdField}=${activation.activationId.toString}| table ${testConfig.logMessageField}| reverse""")

              (
                Success(
                  HttpResponse(
                    StatusCodes.OK,
                    entity = HttpEntity(
                      ContentTypes.`application/json`,
                      """{"preview":false,"init_offset":0,"messages":[],"fields":[{"name":"log_message"}],"results":[{"log_message":"some log message"},{"log_message":"some other log message"}], "highlighted":{}}"""))),
                userContext)
            }
            .recover {
              case e => (Failure(e), userContext)
            }
      }

  /** Answers every request with an HTTP 500 response. */
  val failFlow: Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
    Flow[(HttpRequest, Promise[HttpResponse])]
      .map {
        case (request, userContext) =>
          (Success(HttpResponse(StatusCodes.InternalServerError)), userContext)
      }

  it should "fail when loading out of box configs (because whisk.logstore.splunk doesn't exist)" in {
    assertThrows[ConfigReaderException[_]] {
      new SplunkLogStore(system)
    }
  }

  it should "find logs based on activation timestamps" in {
    //use the a flow that asserts the request structure and provides a response in the expected format
    val splunkStore = new SplunkLogStore(system, Some(testFlow), testConfig)
    val result = Await.result(splunkStore.fetchLogs(activation), 1.second)
    result shouldBe ActivationLogs(Vector("some log message", "some other log message"))
  }

  it should "fail to connect to bogus host" in {
    //use the default http flow with the default bogus-host config;
    //this goes through real DNS resolution, so allow more than a second before giving up
    val splunkStore = new SplunkLogStore(system, splunkConfig = testConfig)
    val result = splunkStore.fetchLogs(activation)
    whenReady(result.failed, Timeout(5.seconds)) { ex =>
      ex shouldBe an[StreamTcpException]
    }
  }

  it should "display an error if API cannot be reached" in {
    //use a flow that generates a 500 response
    val splunkStore = new SplunkLogStore(system, Some(failFlow), testConfig)
    val result = splunkStore.fetchLogs(activation)
    whenReady(result.failed, Timeout(1.second)) { ex =>
      ex shouldBe an[RuntimeException]
    }
  }

}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to