[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207664852

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
       brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
       server = new KafkaServer(brokerConf)
       server.startup()
-      brokerPort = server.boundPort()
+      brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --

Isn't the test hanging on the line right before that change, though?
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207663151

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
       brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
       server = new KafkaServer(brokerConf)
       server.startup()
-      brokerPort = server.boundPort()
+      brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --

In particular @zsxwing, regarding the failure below, I wonder if my change on this line really wasn't quite equivalent. Is it possible it needs to be "PLAINTEXT" like in yours?
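For concreteness, a minimal sketch of the two candidate calls being compared, assuming the Kafka 2.0.0 `KafkaServer`/`ListenerName` API (the helper name is illustrative, not from the PR):

```scala
import kafka.server.{KafkaConfig, KafkaServer}
import org.apache.kafka.common.network.ListenerName

// Hypothetical helper showing both variants side by side.
def resolveBrokerPort(server: KafkaServer, brokerConf: KafkaConfig): Int = {
  // This PR: ask for the port of the broker's inter-broker listener,
  // which defaults to PLAINTEXT when no other listeners are configured.
  server.boundPort(brokerConf.interBrokerListenerName)
  // The alternative under discussion pins the listener name explicitly:
  // server.boundPort(ListenerName.normalised("PLAINTEXT"))
}
```

The two should resolve to the same port unless the embedded test broker is configured with a non-default inter-broker listener.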
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21955
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user koeninger commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207422889

--- Diff: external/kafka-0-10/pom.xml ---
@@ -28,7 +28,7 @@
   <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
   <properties>
     <sbt.project.name>streaming-kafka-0-10</sbt.project.name>
-    <kafka.version>0.10.0.1</kafka.version>
+    <kafka.version>2.0.0</kafka.version>
   </properties>
   <packaging>jar</packaging>
   <name>Spark Integration for Kafka 0.10</name>
--- End diff --

Probably worth updating the name to indicate it's for brokers version 0.10+.
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207394183

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
       brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
       server = new KafkaServer(brokerConf)
       server.startup()
-      brokerPort = server.boundPort()
+      brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --

> @zsxwing comparing to your change... should this be "PLAINTEXT"?

I think both are fine.

> default.api.timeout.ms -> 3000

You don't need to add `default.api.timeout.ms` to `brokerConfiguration`; it's a client configuration. I don't think you need to add it anywhere, since the DStreams Kafka module doesn't have tests that require this config.
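To illustrate the broker-side vs. client-side split being made here (a sketch using standard Kafka property names; the bootstrap address is assumed from the test's `brokerPort` field):

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

// default.api.timeout.ms is read by the consumer, not the broker, so
// putting it into brokerConfiguration would have no effect. If it were
// needed at all, it would go into consumer properties like these:
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", s"localhost:$brokerPort")
consumerProps.put("group.id", "test")
consumerProps.put("key.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer",
  "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("default.api.timeout.ms", "3000")
val consumer = new KafkaConsumer[String, String](consumerProps)
```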
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207392624

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
       brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
       server = new KafkaServer(brokerConf)
       server.startup()
-      brokerPort = server.boundPort()
+      brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --

@zsxwing comparing to your change... should this be "PLAINTEXT"?
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207392910

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---
@@ -109,7 +109,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
       brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
      server = new KafkaServer(brokerConf)
       server.startup()
-      brokerPort = server.boundPort()
+      brokerPort = server.boundPort(brokerConf.interBrokerListenerName)
--- End diff --

And then in `brokerConfiguration` below, I need to add something like these props?

    offsets.topic.replication.factor -> 1
    group.initial.rebalance.delay.ms -> 10
    default.api.timeout.ms -> 3000
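A sketch of how those might slot into the existing `brokerConfiguration` helper (existing entries abbreviated and assumed from the test utils; only the first two turned out to be broker-side settings, per the reply above):

```scala
import java.util.Properties

private def brokerConfiguration: Properties = {
  val props = new Properties()
  // ... existing entries (broker.id, host.name, zookeeper.connect, ...) ...
  // Single-broker test setup, so the offsets topic can't replicate further.
  props.put("offsets.topic.replication.factor", "1")
  // Shrink the initial consumer-group rebalance delay to keep tests fast.
  props.put("group.initial.rebalance.delay.ms", "10")
  props
}
```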
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user ijuma commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207318249

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---
@@ -72,33 +72,39 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
   private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
     val mockTime = new MockTime()
-    // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
-    val logs = new Pool[TopicAndPartition, Log]()
+    val logs = new Pool[TopicPartition, Log]()
     val logDir = kafkaTestUtils.brokerLogDir
     val dir = new File(logDir, topic + "-" + partition)
     dir.mkdirs()
     val logProps = new ju.Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
+    // TODO is this new Log declaration correct?
+    val logDirFailureChannel = new LogDirFailureChannel(0)
     val log = new Log(
       dir,
       LogConfig(logProps),
       0L,
+      0L,
       mockTime.scheduler,
-      mockTime
+      new BrokerTopicStats(),
+      mockTime,
+      Int.MaxValue,
+      Int.MaxValue,
+      new TopicPartition(topic, partition),
+      new ProducerStateManager(new TopicPartition(topic, partition), dir),
+      logDirFailureChannel
     )
     messages.foreach { case (k, v) =>
-      val msg = new ByteBufferMessageSet(
-        NoCompressionCodec,
-        new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue))
-      log.append(msg)
+      val records = new MemoryRecords()
--- End diff --

```java
public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) {
```

Maybe you can use the above?
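A sketch of that suggestion in the test's Scala: `MemoryRecords` is built through static factories in the 2.0 client rather than a public constructor, so the loop body might become the following (`SimpleRecord(key, value)` is assumed from the clients API; note the key/value order differs from the old `Message` constructor):

```scala
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

messages.foreach { case (k, v) =>
  // Build an in-memory batch of one uncompressed record via the factory
  // quoted above, instead of `new MemoryRecords()`.
  val records = MemoryRecords.withRecords(
    CompressionType.NONE,
    new SimpleRecord(k.getBytes, v.getBytes))
  // ... append `records` to the log (see the appendAsLeader note below) ...
}
```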
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user ijuma commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207317252

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---
@@ -72,31 +72,37 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
   private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
     val mockTime = new MockTime()
-    // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
-    val logs = new Pool[TopicAndPartition, Log]()
+    val logs = new Pool[TopicPartition, Log]()
     val logDir = kafkaTestUtils.brokerLogDir
     val dir = new File(logDir, topic + "-" + partition)
     dir.mkdirs()
     val logProps = new ju.Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
+    // TODO is this new Log declaration correct?
+    val logDirFailureChannel = new LogDirFailureChannel(0)
--- End diff --

This should be `1` if we're assuming a single log directory.
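A one-line sketch of that fix, assuming the constructor argument is the number of log directories (it appears to size an internal queue, so a capacity of 0 would plausibly fail at construction):

```scala
import kafka.server.LogDirFailureChannel

// One log directory in this test, so capacity 1 rather than 0.
val logDirFailureChannel = new LogDirFailureChannel(1)
```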
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
Github user ijuma commented on a diff in the pull request:

https://github.com/apache/spark/pull/21955#discussion_r207316897

--- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala ---
@@ -72,31 +72,37 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
   private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) {
     val mockTime = new MockTime()
-    // LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api
-    val logs = new Pool[TopicAndPartition, Log]()
+    val logs = new Pool[TopicPartition, Log]()
     val logDir = kafkaTestUtils.brokerLogDir
     val dir = new File(logDir, topic + "-" + partition)
     dir.mkdirs()
     val logProps = new ju.Properties()
     logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
+    // TODO is this new Log declaration correct?
+    val logDirFailureChannel = new LogDirFailureChannel(0)
     val log = new Log(
       dir,
       LogConfig(logProps),
       0L,
+      0L,
       mockTime.scheduler,
-      mockTime
+      new BrokerTopicStats(),
+      mockTime,
+      Int.MaxValue,
+      Int.MaxValue,
+      new TopicPartition(topic, partition),
+      new ProducerStateManager(new TopicPartition(topic, partition), dir),
+      logDirFailureChannel
     )
     messages.foreach { case (k, v) =>
-      val msg = new ByteBufferMessageSet(
-        NoCompressionCodec,
-        new Message(v.getBytes, k.getBytes, Message.NoTimestamp, Message.CurrentMagicValue))
-      log.append(msg)
--- End diff --

There is `Log.appendAsLeader` and `Log.appendAsFollower`, depending on your goal here.
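Following that pointer, the append in the loop sketched above might become (the `leaderEpoch` parameter name is assumed from the 2.0 `Log` API; since the test just populates a log for compaction, writing as leader seems the natural goal):

```scala
// Populate the log as if this broker were the partition leader; epoch 0
// is adequate for a single-broker test. appendAsFollower(records) would
// instead preserve the records' own offsets, skipping leader-side
// validation and offset assignment.
log.appendAsLeader(records, leaderEpoch = 0)
```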
[GitHub] spark pull request #21955: [SPARK-18057][FOLLOW-UP][SS] Update Kafka client ...
GitHub user srowen opened a pull request:

https://github.com/apache/spark/pull/21955

[SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0

## What changes were proposed in this pull request?

Update to Kafka 2.0.0 in the streaming-kafka module, and remove the override for Scala 2.12. It won't compile for 2.12 otherwise.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srowen/spark SPARK-18057.2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21955.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21955

commit b82db04a2601405f58c796e75b9e10bf6d2e6bdf
Author: Sean Owen
Date: 2018-08-02T03:18:39Z

    Update to kafka 2.0.0 in streaming-kafka module, and remove override for Scala 2.12