[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4628: Embedded Kafka support in OpenWhisk Standalone mode

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4628: Embedded Kafka support 
in OpenWhisk Standalone mode
URL: https://github.com/apache/openwhisk/pull/4628#discussion_r326465642
 
 

 ##
 File path: core/standalone/README.md
 ##
 @@ -204,7 +219,35 @@ Api Gateway mode can be enabled via `--api-gw` flag. In 
this mode upon launch a
 would be launched on port `3234` (can be changed with `--api-gw-port`). In 
this mode you can make use of the
 [api gateway][4] support.
 
+ Using Kafka
+
+Standalone OpenWhisk supports launching an [embedded kafka][5]. This mode is 
mostly useful for developers working on OpenWhisk
+implementation itself.
+
+```
+java -jar openwhisk-standalone.jar --kafka
+```
+
+It also supports launching a Kafka UI based on [Kafdrop 3][6] which enables 
seeing the topics created and structure of messages
 
 Review comment:
   No thats the actual name [Kafdrop 
3](https://github.com/obsidiandynamics/kafdrop)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4628: Embedded Kafka support in OpenWhisk Standalone mode

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4628: Embedded Kafka support 
in OpenWhisk Standalone mode
URL: https://github.com/apache/openwhisk/pull/4628#discussion_r326198456
 
 

 ##
 File path: 
core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
 ##
 @@ -393,6 +417,42 @@ object StandaloneOpenWhisk extends SLF4JLogging {
 Await.result(g, 5.minutes)
   }
 
+  private def startKafka(workDir: File, dockerClient: StandaloneDockerClient, 
conf: Conf, kafkaUi: Boolean)(
+implicit logging: Logging,
+as: ActorSystem,
+ec: ExecutionContext,
+materializer: ActorMaterializer): (Int, Seq[ServiceContainer]) = {
+val kafkaPort = getPort(conf.kafkaPort.toOption, 9092)
 
 Review comment:
   Ack. Done as you suggested by moving defaults to constant and reusing that 
across various places


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4628: Embedded Kafka support in OpenWhisk Standalone mode

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4628: Embedded Kafka support 
in OpenWhisk Standalone mode
URL: https://github.com/apache/openwhisk/pull/4628#discussion_r326198147
 
 

 ##
 File path: 
core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
 ##
 @@ -63,6 +64,25 @@ class Conf(arguments: Seq[String]) extends 
ScallopConf(arguments) {
   val apiGwPort = opt[Int](descr = "Api Gateway Port", default = Some(3234), 
noshort = true)
   val dataDir = opt[File](descr = "Directory used for storage", default = 
Some(StandaloneOpenWhisk.defaultWorkDir))
 
+  val kafka = opt[Boolean](descr = "Enable embedded Kafka support", noshort = 
true)
+  val kafkaUi = opt[Boolean](descr = "Enable Kafka UI", noshort = true)
+
+  val kafkaPort = opt[Int](
+descr = "Kafka port. If not specified then 9092 or some random free port 
(if 9092 is busy) would be used",
+noshort = true,
+required = false)
 
 Review comment:
   👍 Done that now


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4628: Embedded Kafka support in OpenWhisk Standalone mode

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4628: Embedded Kafka support 
in OpenWhisk Standalone mode
URL: https://github.com/apache/openwhisk/pull/4628#discussion_r326193031
 
 

 ##
 File path: 
core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.standalone
+
+import java.io.File
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import kafka.server.KafkaConfig
+import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import org.apache.commons.io.FileUtils
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.kafkaHosts
+import org.apache.openwhisk.core.entity.ControllerInstanceId
+import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, 
LoadBalancerProvider}
+import 
org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, 
containerName, createRunCmd}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.reflect.io.Directory
+import scala.util.Try
+
+class KafkaLauncher(docker: StandaloneDockerClient,
+kafkaPort: Int,
+kafkaDockerPort: Int,
+zkPort: Int,
+workDir: File,
+kafkaUi: Boolean)(implicit logging: Logging,
+  ec: ExecutionContext,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  tid: TransactionId) {
+
+  def run(): Future[Seq[ServiceContainer]] = {
+for {
+  kafkaSvcs <- runKafka()
+  uiSvcs <- if (kafkaUi) runKafkaUI() else 
Future.successful(Seq.empty[ServiceContainer])
+} yield kafkaSvcs ++ uiSvcs
+  }
+
+  def runKafka(): Future[Seq[ServiceContainer]] = {
+
+//Below setting based on 
https://rmoff.net/2018/08/02/kafka-listeners-explained/
+// We configure two listeners where one is used for host based application 
and other is used for docker based application
+// to connect to Kafka server running on host
+val brokerProps = Map(
+  KafkaConfig.ListenersProp -> 
s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://localhost:$kafkaDockerPort",
+  KafkaConfig.AdvertisedListenersProp -> 
s"LISTENER_LOCAL://localhost:$kafkaPort,LISTENER_DOCKER://${StandaloneDockerSupport
+.getLocalHostIp()}:$kafkaDockerPort",
+  KafkaConfig.ListenerSecurityProtocolMapProp -> 
"LISTENER_LOCAL:PLAINTEXT,LISTENER_DOCKER:PLAINTEXT",
+  KafkaConfig.InterBrokerListenerNameProp -> "LISTENER_LOCAL")
+implicit val config: EmbeddedKafkaConfig =
+  EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, 
customBrokerProperties = brokerProps)
+
+val t = Try {
+  EmbeddedKafka.startZooKeeper(createDir("zookeeper"))
+  EmbeddedKafka.startKafka(createDir("kafka"))
+}
+
+Future
+  .fromTry(t)
+  .map(
+_ =>
+  Seq(
+ServiceContainer(kafkaPort, s"localhost:$kafkaPort", "kafka"),
+ServiceContainer(
+  kafkaDockerPort,
+  s"${StandaloneDockerSupport.getLocalHostIp()}:$kafkaDockerPort",
+  "kafka-docker"),
+ServiceContainer(zkPort, "Zookeeper", "zookeeper")))
+  }
+
+  def runKafkaUI(): Future[Seq[ServiceContainer]] = {
+val hostIp = StandaloneDockerSupport.getLocalHostIp()
+val port = checkOrAllocatePort(9000)
+val env = Map(
+  "ZOOKEEPER_CONNECT" -> s"$hostIp:$zkPort",
+  "KAFKA_BROKERCONNECT" -> s"$hostIp:$kafkaDockerPort",
+  "JVM_OPTS" -> "-Xms32M -Xmx64M",
+  "SERVER_SERVLET_CONTEXTPATH" -> "/")
+
+logging.info(this, s"Starting Kafka Drop UI port: $port")
+val name = containerName("kafka-drop-ui")
+val params = Map("-p" -> Set(s"$port:9000"))
+val args = createRunCmd(name, env, params)
+
+val f = docker.runDetached("obsidiandynamics/kafdrop", args, true)
+f.map(_ => Seq(ServiceContainer(port, s"http://localhost:$port";, name)))
+ 

[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4628: Embedded Kafka support in OpenWhisk Standalone mode

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4628: Embedded Kafka support 
in OpenWhisk Standalone mode
URL: https://github.com/apache/openwhisk/pull/4628#discussion_r326186606
 
 

 ##
 File path: 
core/standalone/src/main/scala/org/apache/openwhisk/standalone/StandaloneOpenWhisk.scala
 ##
 @@ -63,6 +64,25 @@ class Conf(arguments: Seq[String]) extends 
ScallopConf(arguments) {
   val apiGwPort = opt[Int](descr = "Api Gateway Port", default = Some(3234), 
noshort = true)
   val dataDir = opt[File](descr = "Directory used for storage", default = 
Some(StandaloneOpenWhisk.defaultWorkDir))
 
+  val kafka = opt[Boolean](descr = "Enable embedded Kafka support", noshort = 
true)
+  val kafkaUi = opt[Boolean](descr = "Enable Kafka UI", noshort = true)
+
+  val kafkaPort = opt[Int](
+descr = "Kafka port. If not specified then 9092 or some random free port 
(if 9092 is busy) would be used",
+noshort = true,
+required = false)
 
 Review comment:
   This is done on purpose. Port 9092 is more like a "preferred" port. If no 
explicit option is provided then system would try to start Kafka by default at 
9092. However if that port is busy then a random port would be selected. 
   
   This is done to ensure that Standalone OpenWhisk needs minimum required free 
port (for now only 3233). Rest all services can be started at any port and then 
dependent service would be configured as per randomly selected port
   
   If a user explicitly provides a port like `--kafka-port 9010` then Kafka 
would be started at that port. If the port is busy then Kafka would fail to 
start. This support is mostly meant for test which would like to know where 
Kafka would start


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4628: Embedded Kafka support in OpenWhisk Standalone mode

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4628: Embedded Kafka support 
in OpenWhisk Standalone mode
URL: https://github.com/apache/openwhisk/pull/4628#discussion_r326184627
 
 

 ##
 File path: 
core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala
 ##
 @@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.standalone
+
+import java.io.File
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import kafka.server.KafkaConfig
+import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import org.apache.commons.io.FileUtils
+import org.apache.openwhisk.common.{Logging, TransactionId}
+import org.apache.openwhisk.core.WhiskConfig
+import org.apache.openwhisk.core.WhiskConfig.kafkaHosts
+import org.apache.openwhisk.core.entity.ControllerInstanceId
+import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, 
LoadBalancerProvider}
+import 
org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, 
containerName, createRunCmd}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.reflect.io.Directory
+import scala.util.Try
+
+class KafkaLauncher(docker: StandaloneDockerClient,
+kafkaPort: Int,
+kafkaDockerPort: Int,
+zkPort: Int,
+workDir: File,
+kafkaUi: Boolean)(implicit logging: Logging,
+  ec: ExecutionContext,
+  actorSystem: ActorSystem,
+  materializer: ActorMaterializer,
+  tid: TransactionId) {
+
+  def run(): Future[Seq[ServiceContainer]] = {
+for {
+  kafkaSvcs <- runKafka()
+  uiSvcs <- if (kafkaUi) runKafkaUI() else 
Future.successful(Seq.empty[ServiceContainer])
+} yield kafkaSvcs ++ uiSvcs
+  }
+
+  def runKafka(): Future[Seq[ServiceContainer]] = {
+
+//Below setting based on 
https://rmoff.net/2018/08/02/kafka-listeners-explained/
+// We configure two listeners where one is used for host based application 
and other is used for docker based application
+// to connect to Kafka server running on host
 
 Review comment:
   👍 Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4628: Embedded Kafka support in OpenWhisk Standalone mode

2019-09-18 Thread GitBox
chetanmeh commented on a change in pull request #4628: Embedded Kafka support 
in OpenWhisk Standalone mode
URL: https://github.com/apache/openwhisk/pull/4628#discussion_r325713056
 
 

 ##
 File path: core/standalone/src/main/resources/logback-standalone.xml
 ##
 @@ -29,6 +29,8 @@
 
 
 
 
 Review comment:
   Yes


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services