[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-07-18 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r203561847
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
 assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
 // Poll to get the latest assigned partitions
-consumer.poll(0)
+consumer.poll(JDuration.ofMillis(0))
--- End diff --

That's a good point. However, supporting all these versions is pretty 
cheap for Spark right now, since Spark uses only APIs available in 0.10. In 
addition, if the Kafka client version we pick up here has some critical issue, 
users can simply switch back to an older version.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-07-17 Thread ijuma
Github user ijuma commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r203256766
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
 assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
 // Poll to get the latest assigned partitions
-consumer.poll(0)
+consumer.poll(JDuration.ofMillis(0))
--- End diff --

@zsxwing Why do you want to support Kafka client jars from 0.10 to 2.0? 
Since newer client jars support older brokers, we recommend that people use the 
latest Kafka client jar whenever possible.


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-07-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r203109788
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
 assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
 // Poll to get the latest assigned partitions
-consumer.poll(0)
+consumer.poll(JDuration.ofMillis(0))
--- End diff --

@tedyu I just realized this is `ofMillis` rather than `toMillis`. We 
definitely cannot use it, as this `poll` overload doesn't exist in previous 
versions and we want to support Kafka versions from 0.10 to 2.0.
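A short sketch (not code from the PR; the consumer setup and method name are assumptions) of the compatibility point being made here. Spark compiles against one kafka-clients jar, but users may put a different version on the classpath at runtime, so calling an overload that only exists in 2.0 fails on older jars:

```scala
import org.apache.kafka.clients.consumer.KafkaConsumer

// Sketch only: the two poll overloads under discussion.
def pollForAssignment(consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit = {
  // poll(long) exists in every client from 0.10 through 2.0 (deprecated in 2.0),
  // so it is safe for a connector that must run against any of those jars.
  consumer.poll(0)

  // poll(java.time.Duration) was only added in 2.0 (KIP-266). Compiling it in
  // would throw NoSuchMethodError at runtime against a pre-2.0 kafka-clients jar:
  // consumer.poll(java.time.Duration.ofMillis(0))
}
```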


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-07-17 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r203106522
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
 assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
 // Poll to get the latest assigned partitions
-consumer.poll(0)
+consumer.poll(JDuration.ofMillis(0))
--- End diff --

Depending on the Kafka release we agree upon, I can revert.
Duration is the recommended API as of the 2.0.0 release.


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-07-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r203104176
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
 assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
 // Poll to get the latest assigned partitions
-consumer.poll(0)
+consumer.poll(JDuration.ofMillis(0))
--- End diff --

Could you revert these changes? We don't use java.time.Duration in Spark.


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-07-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r203103522
  
--- Diff: external/kafka-0-10-sql/pom.xml ---
@@ -74,6 +74,11 @@
   ${kafka.version}
   test
 
+
+  org.eclipse.jetty
--- End diff --

Where does this come from? Or can it be just a test dependency?


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-06-06 Thread guozhangwang
Github user guozhangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r193574097
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -203,7 +215,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, 
Object] = Map.empty) extends L
 
   /** Add new partitions to a Kafka topic */
   def addPartitions(topic: String, partitions: Int): Unit = {
-AdminUtils.addPartitions(zkUtils, topic, partitions)
+val existingAssignment = zkClient.getReplicaAssignmentForTopics(
+  collection.immutable.Set(topic)).map {
+case (topicPartition, replicas) => topicPartition.partition -> 
replicas
+}
--- End diff --

+1


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-06-06 Thread eric-maynard
Github user eric-maynard commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r193549547
  
--- Diff: external/kafka-0-10-sql/pom.xml ---
@@ -29,7 +29,7 @@
   spark-sql-kafka-0-10_2.11
   
 sql-kafka-0-10
-0.10.0.1
+2.0.0-SNAPSHOT
   
   jar
   Kafka 0.10 Source for Structured Streaming
--- End diff --

We should change this line to reflect the new version too.


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-06-05 Thread ijuma
Github user ijuma commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r192961847
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -203,7 +215,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, 
Object] = Map.empty) extends L
 
   /** Add new partitions to a Kafka topic */
   def addPartitions(topic: String, partitions: Int): Unit = {
-AdminUtils.addPartitions(zkUtils, topic, partitions)
+val existingAssignment = zkClient.getReplicaAssignmentForTopics(
+  collection.immutable.Set(topic)).map {
+case (topicPartition, replicas) => topicPartition.partition -> 
replicas
+}
--- End diff --

We can get replica assignment information via AdminClient too. I think we 
should try to avoid the internal `ZkUtils` and `KafkaZkClient` as much as we 
can.
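A sketch of what that could look like (not code from the PR; the `admin` instance is assumed to be configured elsewhere): the public `AdminClient.describeTopics` call exposes the same per-partition replica information without touching `KafkaZkClient`:

```scala
import java.util.Collections
import org.apache.kafka.clients.admin.AdminClient
import scala.collection.JavaConverters._

// Sketch only: read a topic's current replica assignment through the public
// AdminClient rather than the internal ZooKeeper clients. Returns a map from
// partition id to the broker ids hosting its replicas.
def replicaAssignment(admin: AdminClient, topic: String): Map[Int, Seq[Int]] = {
  val description = admin.describeTopics(Collections.singleton(topic))
    .all().get().get(topic)
  description.partitions().asScala.map { p =>
    p.partition() -> p.replicas().asScala.map(_.id()).toSeq
  }.toMap
}
```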


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-06-03 Thread ijuma
Github user ijuma commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r192602632
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -96,10 +101,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, 
Object] = Map.empty) extends L
   // Set up the Embedded Zookeeper server and get the proper Zookeeper port
   private def setupEmbeddedZookeeper(): Unit = {
 // Zookeeper server startup
-zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+val zkSvr = s"$zkHost:$zkPort";
+zookeeper = new EmbeddedZookeeper(zkSvr)
 // Get the actual zookeeper binding port
 zkPort = zookeeper.actualPort
-zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, 
zkConnectionTimeout, false)
+zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
+zkClient = KafkaZkClient(zkSvr, false, 6000, 1, Int.MaxValue, 
Time.SYSTEM)
+adminZkClient = new AdminZkClient(zkClient)
--- End diff --

AdminClient.create gives you a concrete instance. createPartitions is the 
method you're looking for.
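A minimal sketch of that suggestion (not code from the PR; the bootstrap address is a placeholder): `AdminClient.create` returns a usable instance, and `createPartitions` with `NewPartitions.increaseTo` replaces the ZooKeeper-based helper:

```scala
import java.util.Collections
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}

// Sketch only: add partitions to a topic via the public Java AdminClient
// instead of the internal AdminZkClient used in the diff above.
def addPartitions(topic: String, partitions: Int): Unit = {
  val props = new java.util.Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  val admin = AdminClient.create(props)
  try {
    // NewPartitions.increaseTo takes the desired *total* partition count,
    // not the number of partitions to add.
    val request = Collections.singletonMap(topic, NewPartitions.increaseTo(partitions))
    admin.createPartitions(request).all().get()
  } finally {
    admin.close()
  }
}
```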


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-06-03 Thread tedyu
Github user tedyu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r192601997
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -96,10 +101,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, 
Object] = Map.empty) extends L
   // Set up the Embedded Zookeeper server and get the proper Zookeeper port
   private def setupEmbeddedZookeeper(): Unit = {
 // Zookeeper server startup
-zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+val zkSvr = s"$zkHost:$zkPort";
+zookeeper = new EmbeddedZookeeper(zkSvr)
 // Get the actual zookeeper binding port
 zkPort = zookeeper.actualPort
-zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, 
zkConnectionTimeout, false)
+zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
+zkClient = KafkaZkClient(zkSvr, false, 6000, 1, Int.MaxValue, 
Time.SYSTEM)
+adminZkClient = new AdminZkClient(zkClient)
--- End diff --

AdminClient is abstract, and KafkaAdminClient doesn't provide addPartitions.

Mind giving some pointers?


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-06-03 Thread ijuma
Github user ijuma commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r192601219
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -96,10 +101,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, 
Object] = Map.empty) extends L
   // Set up the Embedded Zookeeper server and get the proper Zookeeper port
   private def setupEmbeddedZookeeper(): Unit = {
 // Zookeeper server startup
-zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+val zkSvr = s"$zkHost:$zkPort";
+zookeeper = new EmbeddedZookeeper(zkSvr)
 // Get the actual zookeeper binding port
 zkPort = zookeeper.actualPort
-zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, 
zkConnectionTimeout, false)
+zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
+zkClient = KafkaZkClient(zkSvr, false, 6000, 1, Int.MaxValue, 
Time.SYSTEM)
+adminZkClient = new AdminZkClient(zkClient)
--- End diff --

Can we use the Java AdminClient instead of these internal classes?


---




[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-06-03 Thread tedyu
GitHub user tedyu opened a pull request:

https://github.com/apache/spark/pull/21488

SPARK-18057 Update structured streaming kafka from 0.10.0.1 to 2.0.0

## What changes were proposed in this pull request?

This PR upgrades to the Kafka 2.0.0 release where KIP-266 is integrated.

## How was this patch tested?

This PR relies on the existing Kafka-related unit tests.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tedyu/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21488


commit 0a22686d9a388a21d5dd38513854341d3f37f738
Author: tedyu 
Date:   2018-06-03T19:54:22Z

SPARK-18057 Update structured streaming kafka from 0.10.0.1 to 2.0.0




---
