Re: Spark streaming with Confluent kafka
The error is clear:

    Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config

On Fri, 3 Jul 2020, 15:40 dwgw wrote:

> Hi
>
> I am trying to stream a Confluent Kafka topic in the spark shell. For that I
> have invoked spark-shell using the following command. [...]
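For anyone hitting the same exception, a hedged guess at where the serviceName requirement comes from (an assumption, not something stated in this thread): the Kafka Java client reads the SASL mechanism from "sasl.mechanism" (singular). The option used in the message below, "kafka.sasl.mechanisms", is the librdkafka spelling, so the Java consumer ignores it, falls back to the default GSSAPI mechanism, and GSSAPI then requires a Kerberos serviceName. A minimal sketch of the corrected options:

// Hedged sketch: pass the mechanism under the key the Java client expects;
// the JAAS file and the SASL_SSL setting from the message below can stay as they are.
val fixedKafkaOptions = Map(
  "kafka.security.protocol" -> "SASL_SSL",
  "kafka.sasl.mechanism"    -> "PLAIN")   // singular, not "kafka.sasl.mechanisms"

// usable as: spark.readStream.format("kafka").options(fixedKafkaOptions). ...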
Spark streaming with Confluent kafka
Hi

I am trying to stream a Confluent Kafka topic in the spark shell. For that I have invoked spark-shell using the following command:

# spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
    --driver-java-options "-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
    --files /home/spark/kafka_jaas.conf

kafka_jaas.conf
---------------

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="XXX"
  password="XXX";
};

Readstream
----------

scala> val df = spark.
     |   readStream.
     |   format("kafka").
     |   option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
     |   option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
     |   option("kafka.sasl.mechanisms", "PLAIN").
     |   option("kafka.security.protocol", "SASL_SSL").
     |   option("startingOffsets", "earliest").
     |   load.
     |   select($"value".cast("string").alias("value"))
df: org.apache.spark.sql.DataFrame = [value: string]

Writestream
-----------

scala> df.writeStream.
     |   format("console").
     |   outputMode("append").
     |   trigger(Trigger.ProcessingTime("20 seconds")).
     |   start
2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741

scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in attempt 1 getting Kafka offsets:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
        at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:368)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at
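For completeness, a minimal sketch of a variant that drops the external kafka_jaas.conf file altogether (again an assumption, not something confirmed in this thread): the Kafka Java client also accepts the JAAS entry inline through "sasl.jaas.config", and Spark forwards every reader option prefixed with "kafka." to the consumer, so both the mechanism and the login module can be set directly on the stream. Broker, topic and the XXX placeholders are copied from the post above.

import spark.implicits._   // already in scope inside spark-shell

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092")
  .option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS")
  .option("kafka.security.protocol", "SASL_SSL")
  // singular "sasl.mechanism", the key the Java client actually reads
  .option("kafka.sasl.mechanism", "PLAIN")
  // inline JAAS entry; replaces the KafkaClient section of kafka_jaas.conf and the
  // -Djava.security.auth.login.config driver/executor options
  .option("kafka.sasl.jaas.config",
    """org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";""")
  .option("startingOffsets", "earliest")
  .load
  .select($"value".cast("string").alias("value"))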
Re: Spark Streaming with Confluent
Hi Arkadiusz,

Try 'rooting' your import. It looks like the import is being interpreted as relative to the previous one. Rooting is done by adding the '_root_' prefix to your import:

import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.io.confluent.kafka.serializers.KafkaAvroDecoder

kr, Gerard.

On Wed, Dec 13, 2017 at 6:05 PM, Arkadiusz Bicz wrote:

> Hi,
>
> I am trying to test Spark Streaming 2.2.0 with Confluent 3.3.0 and I get a lot
> of errors during compilation. [...]
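To spell out why the compiler ends up looking under org.apache.spark.io, a small sketch (not from the original message; it only assumes the sbt dependencies quoted below are on the classpath). In Scala 2 imports are relative: once org.apache.spark._ is imported, the bare name io resolves to the package org.apache.spark.io, so a plain "import io.confluent..." is looked up as org.apache.spark.io.confluent and fails with exactly the error shown below. Rooting the import, or simply putting the io.confluent import before the org.apache.spark._ wildcard, avoids the clash:

import org.apache.spark._                 // brings the package org.apache.spark.io into scope
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils

// import io.confluent.kafka.serializers.KafkaAvroDecoder        // resolved as org.apache.spark.io.confluent -> compile error
import _root_.io.confluent.kafka.serializers.KafkaAvroDecoder    // rooted: resolved from the top-level io package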
Spark Streaming with Confluent
Hi,

I am trying to test Spark Streaming 2.2.0 with Confluent 3.3.0 and I get a lot of errors during compilation. This is my sbt:

lazy val sparkstreaming = (project in file("."))
  .settings(
    name := "sparkstreaming",
    organization := "org.arek",
    version := "0.1-SNAPSHOT",
    scalaVersion := "2.11.8",
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "2.2.0",
      "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
      "io.confluent" % "kafka-avro-serializer" % "3.3.0"
    )
  )

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
import io.confluent.kafka.serializers.KafkaAvroDecoder

object Transformation extends Serializable {

  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("StreamingTranformation").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "local:2181",
      "schema.registry.url" -> "http://local:8081",
      "auto.offset.reset" -> "smallest")

    val topicSet = Set("GEXPPROD_ROUTE")
    val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](
      streamingContext, kafkaParams, topicSet).map(_._2)

    val lines = messages.foreachRDD(rdd => {
      rdd.foreach({ avroRecord =>
        println(avroRecord)
      })
    })
  }
}

[warn] Found version conflict(s) in library dependencies; some are suspected to be binary incompatible:
[warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final, 3.7.0.Final}
[warn]      +- org.apache.spark:spark-core_2.11:2.2.0                 (depends on 3.7.0.Final)
[warn]      +- org.apache.zookeeper:zookeeper:3.4.8                   (depends on 3.7.0.Final)
[warn]      +- org.apache.zookeeper:zookeeper:3.4.6                   (depends on 3.6.2.Final)
[warn]      +- org.apache.hadoop:hadoop-hdfs:2.6.5                    (depends on 3.6.2.Final)
[warn]  * commons-net:commons-net:2.2 is selected over 3.1
[warn]      +- org.apache.spark:spark-core_2.11:2.2.0                 (depends on 3.1)
[warn]      +- org.apache.hadoop:hadoop-common:2.6.5                  (depends on 3.1)
[warn]  * com.google.guava:guava:11.0.2 is selected over {12.0.1, 16.0.1}
[warn]      +- org.apache.hadoop:hadoop-yarn-client:2.6.5             (depends on 11.0.2)
[warn]      +- org.apache.hadoop:hadoop-yarn-api:2.6.5                (depends on 11.0.2)
[warn]      +- org.apache.hadoop:hadoop-yarn-common:2.6.5             (depends on 11.0.2)
[warn]      +- org.apache.hadoop:hadoop-yarn-server-nodemanager:2.6.5 (depends on 11.0.2)
[warn]      +- org.apache.hadoop:hadoop-common:2.6.5                  (depends on 11.0.2)
[warn]      +- org.apache.hadoop:hadoop-yarn-server-common:2.6.5      (depends on 11.0.2)
[warn]      +- org.apache.hadoop:hadoop-hdfs:2.6.5                    (depends on 11.0.2)
[warn]      +- org.apache.curator:curator-framework:2.6.0             (depends on 16.0.1)
[warn]      +- org.apache.curator:curator-client:2.6.0                (depends on 16.0.1)
[warn]      +- org.apache.curator:curator-recipes:2.6.0               (depends on 16.0.1)
[warn]      +- org.htrace:htrace-core:3.0.4                           (depends on 12.0.1)
[warn] Run 'evicted' to see detailed eviction warnings
[info] Compiling 1 Scala source to /home/adminuser/data-streaming-platform/sparkstreaming/target/scala-2.11/classes ...
[error] /home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:6:11: object confluent is not a member of package org.apache.spark.io
[error] import io.confluent.kafka.serializers.KafkaAvroDecoder
[error]           ^
[error] /home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:66: not found: type KafkaAvroDecoder
[error] val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams, topicSet).map(_._2)
[error]                                                                  ^
[error] /home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:84: not found: type KafkaAvroDecoder
[error] val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams, topicSet).map(_._2)
[error]

When changing to library "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0":

[warn] Found version conflict(s) in library dependencies; some are suspected to be binary incompatible:
[warn]  * com.101tec:zkclient:0.10 is selected over 0.8
[warn]      +- io.confluent:common-utils:3.3.0                        (depends on 0.10)
[warn]      +- org.apache.kafka:kafka_2.11:0.10.0.1                   (depends on 0.8)
[warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final, 3.7.0.Final}
[warn]      +- org.apache.spark:spark-core_2.11:2.2.0
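On the last point, switching to "spark-streaming-kafka-0-10" also means switching APIs: the 0-10 integration uses ConsumerStrategies/LocationStrategies and Kafka deserializer classes instead of the old Decoder type parameters, so the KafkaAvroDecoder-based createDirectStream call will not compile against it. A minimal sketch of what the 0-10 variant could look like; the broker address, group id and deserializer wiring below are placeholders (assumptions, not taken from this thread), and the Confluent import is rooted as suggested in the reply above:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import _root_.io.confluent.kafka.serializers.KafkaAvroDeserializer

object Transformation010 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingTransformation010").setMaster("local[*]")
    val ssc  = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"   -> "localhost:9092",           // placeholder: the Kafka broker, not ZooKeeper
      "key.deserializer"    -> classOf[StringDeserializer],
      "value.deserializer"  -> classOf[KafkaAvroDeserializer],
      "schema.registry.url" -> "http://local:8081",
      "group.id"            -> "sparkstreaming-test",       // placeholder group id
      "auto.offset.reset"   -> "earliest")

    val stream = KafkaUtils.createDirectStream[String, Object](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, Object](Set("GEXPPROD_ROUTE"), kafkaParams))

    // print each decoded Avro record, mirroring the foreachRDD in the original code
    stream.foreachRDD(rdd => rdd.foreach(record => println(record.value())))

    ssc.start()
    ssc.awaitTermination()
  }
}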