[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-03 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207664852
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
   brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
   server = new KafkaServer(brokerConf)
   server.startup()
-  brokerPort = server.boundPort()
+  brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --

Isn't the test hanging on the line right before that change though?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-03 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207663151
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
   brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
   server = new KafkaServer(brokerConf)
   server.startup()
-  brokerPort = server.boundPort()
+  brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --

In particular @zsxwing , regarding the failure below, I wonder if my change 
on this line really wasn't quite equivalent. Is it possible it needs to be 
"PLAINTEXT" like in yours?


---




[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21955


---




[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-02 Thread koeninger
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207422889
  
--- Diff: external/kafka-0-10/pom.xml ---
@@ -28,7 +28,7 @@
   spark-streaming-kafka-0-10_2.11
   
 streaming-kafka-0-10
-0.10.0.1
+2.0.0
   
   jar
   Spark Integration for Kafka 0.10
--- End diff --

Probably worth updating the name to indicate it's for broker versions 0.10+.


---




[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-02 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207394183
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
   brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
   server = new KafkaServer(brokerConf)
   server.startup()
-  brokerPort = server.boundPort()
+  brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --

> @zsxwing comparing to your change... should this be "PLAINTEXT"?

I think both are fine.

> default.api.timeout.ms -> 3000

You don't need to add `default.api.timeout.ms` to `brokerConfiguration`; it's a 
client configuration. In fact, I don't think you need to add it anywhere: the 
DStreams Kafka tests don't require this config.
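For reference, the two spellings being compared are equivalent on a default test broker with a single PLAINTEXT listener (a sketch against the Kafka 2.0 server API; `server` and `brokerConf` are the `KafkaTestUtils` fields quoted in the diff above):

```scala
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol

// On a default single-listener broker both resolve to the same bound port:
brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
brokerPort = server.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
```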


---




[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207392624
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
   brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
   server = new KafkaServer(brokerConf)
   server.startup()
-  brokerPort = server.boundPort()
+  brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --

@zsxwing comparing to your change... should this be "PLAINTEXT"?


---




[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207392910
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
   brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
   server = new KafkaServer(brokerConf)
   server.startup()
-  brokerPort = server.boundPort()
+  brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --

And then in `brokerConfiguration` below, I need to add something like these 
props?

offsets.topic.replication.factor -> 1
group.initial.rebalance.delay.ms -> 10
default.api.timeout.ms -> 3000
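For concreteness, the first two are broker-side settings and could go into the `Properties` that `brokerConfiguration` builds (a hypothetical sketch; `props` stands in for that object, and the values are only the ones quoted above):

```scala
// Hypothetical sketch: broker-side test properties for a single-broker cluster.
val props = new java.util.Properties()
props.put("offsets.topic.replication.factor", "1")  // __consumer_offsets needs only 1 replica
props.put("group.initial.rebalance.delay.ms", "10") // don't delay the first group rebalance

// Note: default.api.timeout.ms is a client (consumer) setting, not a broker one.
```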


---




[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-02 Thread ijuma
Github user ijuma commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207318249
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -72,33 +72,39 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
 val mockTime = new MockTime()
-// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
-val logs = new Pool[TopicAndPartition, Log]()
+val logs = new Pool[TopicPartition, Log]()
 val logDir = kafkaTestUtils.brokerLogDir
 val dir = new File(logDir, topic + "-" + partition)
 dir.mkdirs()
 val logProps = new ju.Properties()
 logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
 logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
+// TODO is this new Log declaration correct?
+val logDirFailureChannel = new LogDirFailureChannel(0)
 val log = new Log(
   dir,
   LogConfig(logProps),
   0L,
+  0L,
   mockTime.scheduler,
-  mockTime
+  new BrokerTopicStats(),
+  mockTime,
+  Int.MaxValue,
+  Int.MaxValue,
+  new TopicPartition(topic, partition),
+  new ProducerStateManager(new TopicPartition(topic, partition), dir),
+  logDirFailureChannel
 )
 messages.foreach { case (k, v) =>
-  val msg = new ByteBufferMessageSet(
-NoCompressionCodec,
-new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue))
-  log.append(msg)
+  val records = new MemoryRecords()
--- End diff --

```java
public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) {
```
Maybe you can use the above?
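Applied to the loop in the diff above, that factory might be used roughly like this (a sketch against the Kafka 2.0 API; the `appendAsLeader` call and `leaderEpoch = 0` are assumptions, not part of the suggestion):

```scala
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

messages.foreach { case (k, v) =>
  // Build an in-memory record batch in place of the removed ByteBufferMessageSet
  val records = MemoryRecords.withRecords(
    CompressionType.NONE,
    new SimpleRecord(k.getBytes, v.getBytes))
  log.appendAsLeader(records, leaderEpoch = 0)
}
```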


---




[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-02 Thread ijuma
Github user ijuma commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207317252
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -72,31 +72,37 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
 val mockTime = new MockTime()
-// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
-val logs = new Pool[TopicAndPartition, Log]()
+val logs = new Pool[TopicPartition, Log]()
 val logDir = kafkaTestUtils.brokerLogDir
 val dir = new File(logDir, topic + "-" + partition)
 dir.mkdirs()
 val logProps = new ju.Properties()
 logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
 logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
+// TODO is this new Log declaration correct?
+val logDirFailureChannel = new LogDirFailureChannel(0)
--- End diff --

This should be `1` if we're assuming a single log directory.


---




[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-02 Thread ijuma
Github user ijuma commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207316897
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -72,31 +72,37 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 
   private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
 val mockTime = new MockTime()
-// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
-val logs = new Pool[TopicAndPartition, Log]()
+val logs = new Pool[TopicPartition, Log]()
 val logDir = kafkaTestUtils.brokerLogDir
 val dir = new File(logDir, topic + "-" + partition)
 dir.mkdirs()
 val logProps = new ju.Properties()
 logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
 logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
+// TODO is this new Log declaration correct?
+val logDirFailureChannel = new LogDirFailureChannel(0)
 val log = new Log(
   dir,
   LogConfig(logProps),
   0L,
+  0L,
   mockTime.scheduler,
-  mockTime
+  new BrokerTopicStats(),
+  mockTime,
+  Int.MaxValue,
+  Int.MaxValue,
+  new TopicPartition(topic, partition),
+  new ProducerStateManager(new TopicPartition(topic, partition), dir),
+  logDirFailureChannel
 )
 messages.foreach { case (k, v) =>
-  val msg = new ByteBufferMessageSet(
-NoCompressionCodec,
-new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue))
-  log.append(msg)
--- End diff --

There are `Log.appendAsLeader` and `Log.appendAsFollower`, depending on your 
goal here.


---




[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...

2018-08-01 Thread srowen
GitHub user srowen opened a pull request:

https://github.com/apache/spark/pull/21955

[SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 
2.0.0


## What changes were proposed in this pull request?

Update to Kafka 2.0.0 in the streaming-kafka module, and remove the override 
for Scala 2.12; it won't compile for 2.12 otherwise.

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srowen/spark SPARK-18057.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21955.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21955


commit b82db04a2601405f58c796e75b9e10bf6d2e6bdf
Author: Sean Owen 
Date:   2018-08-02T03:18:39Z

Update to kafka 2.0.0 in streaming-kafka module, and remove override for 
Scala 2.12




---
