Re: Spark streaming with Confluent kafka

2020-07-03 Thread Gabor Somogyi
The error is clear:
Caused by: java.lang.IllegalArgumentException: No serviceName defined in
either JAAS or Kafka config
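
For reference, a rough sketch of options that usually avoid that error with SASL/PLAIN (untested against this cluster; the broker, topic and credentials below are the placeholders from the original post). The Kafka consumer property is sasl.mechanism (singular); with only "sasl.mechanisms" set, the consumer keeps the default GSSAPI mechanism, which is what then asks for a serviceName. The JAAS entry can also be passed inline so no external kafka_jaas.conf or --files is needed:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092")
  .option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS")
  .option("kafka.security.protocol", "SASL_SSL")
  // "kafka.sasl.mechanism" (singular) is the property the Kafka consumer reads;
  // "kafka.sasl.mechanisms" is not a consumer property and is ignored
  .option("kafka.sasl.mechanism", "PLAIN")
  // inline JAAS entry, passed straight through to the consumer
  .option("kafka.sasl.jaas.config",
    """org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";""")
  .option("startingOffsets", "earliest")
  .load()
  .select($"value".cast("string").alias("value"))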

On Fri, 3 Jul 2020, 15:40 dwgw,  wrote:

> Hi
>
> I am trying to stream a Confluent Kafka topic in the Spark shell. For that I
> have invoked the Spark shell with the following command.
>
> # spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 \
>   --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
>   --driver-java-options "-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
>   --files /home/spark/kafka_jaas.conf
>
> kafka_jaas.conf
> -
>
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="XXX"
>   password="XXX";
> };
>
> Readstream
> -
>
> scala> val df = spark.
> | readStream.
> | format("kafka").
> | option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
> | option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
> | option("kafka.sasl.mechanisms", "PLAIN").
> | option("kafka.security.protocol", "SASL_SSL").
> | option("startingOffsets", "earliest").
> | load.
> | select($"value".cast("string").alias("value"))
> df: org.apache.spark.sql.DataFrame = [value: string]
>
> Writestream
> --
>
> scala> df.writeStream.
> | format("console").
> | outputMode("append").
> | trigger(Trigger.ProcessingTime("20 seconds")).
> | start
> 2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary
> checkpoint location created which is deleted normally when the query didn't
> fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required
> to delete it under any circumstances, please set
> spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
> know deleting temp checkpoint folder is best effort.
> res0: org.apache.spark.sql.streaming.StreamingQuery =
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741
>
> scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in
> attempt 1 getting Kafka offsets:
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
> at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
> at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
> at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
> at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
> at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
> at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
> at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
> at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
> at org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
> at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
> at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
> at scala.Option.getOrElse(Option.scala:189)
> at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
> at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
> at scala.Option.getOrElse(Option.scala:189)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
> at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
> at 

Spark streaming with Confluent kafka

2020-07-03 Thread dwgw
Hi

I am trying to stream a Confluent Kafka topic in the Spark shell. For that I
have invoked the Spark shell with the following command.

# spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
  --driver-java-options "-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
  --files /home/spark/kafka_jaas.conf

kafka_jaas.conf
-

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="XXX"
  password="XXX";
};

Readstream
-

scala> val df = spark.
| readStream.
| format("kafka").
| option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
| option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
| option("kafka.sasl.mechanisms", "PLAIN").
| option("kafka.security.protocol", "SASL_SSL").
| option("startingOffsets", "earliest").
| load.
| select($"value".cast("string").alias("value"))
df: org.apache.spark.sql.DataFrame = [value: string]

Writestream
--

scala> df.writeStream.
| format("console").
| outputMode("append").
| trigger(Trigger.ProcessingTime("20 seconds")).
| start
2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary
checkpoint location created which is deleted normally when the query didn't
fail: /tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required
to delete it under any circumstances, please set
spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to
know deleting temp checkpoint folder is best effort.
res0: org.apache.spark.sql.streaming.StreamingQuery =
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741

scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in
attempt 1 getting Kafka offsets:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:368)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at

Re: Spark Streaming with Confluent

2017-12-13 Thread Gerard Maas
Hi Arkadiusz,

Try 'rooting' your import. It looks like the import is being interpreted as
relative to the previous one. 'Rooting' is done by adding the '_root_' prefix
to your import:

import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.io.confluent.kafka.serializers.KafkaAvroDecoder
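
Another option (untested sketch) might be to avoid the wildcard import that
brings org.apache.spark.io into scope in the first place, so the plain import
resolves from the root package:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import io.confluent.kafka.serializers.KafkaAvroDecoder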

kr, Gerard.

On Wed, Dec 13, 2017 at 6:05 PM, Arkadiusz Bicz 
wrote:

> Hi,
>
> I am trying to test Spark Streaming 2.2.0 with Confluent 3.3.0.
>
> I get a lot of errors during compilation. This is my sbt build:
>
> lazy val sparkstreaming = (project in file("."))
>   .settings(
>   name := "sparkstreaming",
>   organization := "org.arek",
>   version := "0.1-SNAPSHOT",
>   scalaVersion := "2.11.8",
> libraryDependencies ++=  Seq(
>   "org.apache.spark" %% "spark-streaming" % "2.2.0",
>   "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
>   "io.confluent" % "kafka-avro-serializer" % "3.3.0"
> )
>   )
>
>
> import org.apache.spark._
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
> import io.confluent.kafka.serializers.KafkaAvroDecoder
>
> object Transformation extends Serializable {
>
>   def main(args: Array[String]) = {
> val conf = new SparkConf().setAppName("StreamingTranformation").
> setMaster("local[*]")
> val streamingContext = new StreamingContext(conf, Seconds(1))
>
> val kafkaParams = Map[String, String]("metadata.broker.list" ->
> "local:2181",
>   "schema.registry.url" -> "http://local:8081;,
>   "auto.offset.reset" -> "smallest")
>
> val topicSet = Set("GEXPPROD_ROUTE")
> val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
>
> val lines = messages.foreachRDD(rdd => {
>   rdd.foreach({ avroRecord =>
> println(avroRecord)
>   })
> })
>   }
>
>
> [warn] Found version conflict(s) in library dependencies; some are
> suspected to be binary incompatible:
> [warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final,
> 3.7.0.Final}
> [warn]  +- org.apache.spark:spark-core_2.11:2.2.0
>  (depends on 3.7.0.Final)
> [warn]  +- org.apache.zookeeper:zookeeper:3.4.8
>  (depends on 3.7.0.Final)
> [warn]  +- org.apache.zookeeper:zookeeper:3.4.6
>  (depends on 3.6.2.Final)
> [warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5
> (depends on 3.6.2.Final)
> [warn]  * commons-net:commons-net:2.2 is selected over 3.1
> [warn]  +- org.apache.spark:spark-core_2.11:2.2.0
>  (depends on 3.1)
> [warn]  +- org.apache.hadoop:hadoop-common:2.6.5
> (depends on 3.1)
> [warn]  * com.google.guava:guava:11.0.2 is selected over {12.0.1, 16.0.1}
> [warn]  +- org.apache.hadoop:hadoop-yarn-client:2.6.5
>  (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-api:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-common:2.6.5
>  (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-server-nodemanager:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-common:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-yarn-server-common:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5
> (depends on 11.0.2)
> [warn]  +- org.apache.curator:curator-framework:2.6.0
>  (depends on 16.0.1)
> [warn]  +- org.apache.curator:curator-client:2.6.0
> (depends on 16.0.1)
> [warn]  +- org.apache.curator:curator-recipes:2.6.0
>  (depends on 16.0.1)
> [warn]  +- org.htrace:htrace-core:3.0.4   (depends
> on 12.0.1)
> [warn] Run 'evicted' to see detailed eviction warnings
> [info] Compiling 1 Scala source to /home/adminuser/data-
> streaming-platform/sparkstreaming/target/scala-2.11/classes ...
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:6:11:
> object confluent is not a member of package org.apache.spark.io
> [error] import io.confluent.kafka.serializers.KafkaAvroDecoder
> [error]   ^
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:66:
> not found: type KafkaAvroDecoder
> [error] val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
> [error]  ^
> [error] /home/adminuser/data-streaming-platform/
> sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:84:
> not found: type KafkaAvroDecoder
> [error] val messages = KafkaUtils.createDirectStream[Object, Object,
> KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
> topicSet).map(_._2)
> [error]
>
>
> When changing to library  "org.apache.spark" %%
> "spark-streaming-kafka-0-10" % "2.2.0" :
>
>
> [warn] 

Spark Streaming with Confluent

2017-12-13 Thread Arkadiusz Bicz
Hi,

I am trying to test Spark Streaming 2.2.0 with Confluent 3.3.0.

I get a lot of errors during compilation. This is my sbt build:

lazy val sparkstreaming = (project in file("."))
  .settings(
  name := "sparkstreaming",
  organization := "org.arek",
  version := "0.1-SNAPSHOT",
  scalaVersion := "2.11.8",
libraryDependencies ++=  Seq(
  "org.apache.spark" %% "spark-streaming" % "2.2.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
  "io.confluent" % "kafka-avro-serializer" % "3.3.0"
)
  )


import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
import io.confluent.kafka.serializers.KafkaAvroDecoder

object Transformation extends Serializable {

  def main(args: Array[String]) = {
val conf = new
SparkConf().setAppName("StreamingTranformation").setMaster("local[*]")
val streamingContext = new StreamingContext(conf, Seconds(1))

val kafkaParams = Map[String, String]("metadata.broker.list" ->
"local:2181",
  "schema.registry.url" -> "http://local:8081;,
  "auto.offset.reset" -> "smallest")

val topicSet = Set("GEXPPROD_ROUTE")
val messages = KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
topicSet).map(_._2)

val lines = messages.foreachRDD(rdd => {
  rdd.foreach({ avroRecord =>
println(avroRecord)
  })
})
  }


[warn] Found version conflict(s) in library dependencies; some are
suspected to be binary incompatible:
[warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final,
3.7.0.Final}
[warn]  +- org.apache.spark:spark-core_2.11:2.2.0 (depends
on 3.7.0.Final)
[warn]  +- org.apache.zookeeper:zookeeper:3.4.8   (depends
on 3.7.0.Final)
[warn]  +- org.apache.zookeeper:zookeeper:3.4.6   (depends
on 3.6.2.Final)
[warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5(depends
on 3.6.2.Final)
[warn]  * commons-net:commons-net:2.2 is selected over 3.1
[warn]  +- org.apache.spark:spark-core_2.11:2.2.0 (depends
on 3.1)
[warn]  +- org.apache.hadoop:hadoop-common:2.6.5  (depends
on 3.1)
[warn]  * com.google.guava:guava:11.0.2 is selected over {12.0.1, 16.0.1}
[warn]  +- org.apache.hadoop:hadoop-yarn-client:2.6.5 (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-api:2.6.5(depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-common:2.6.5 (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-server-nodemanager:2.6.5
(depends on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-common:2.6.5  (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-yarn-server-common:2.6.5  (depends
on 11.0.2)
[warn]  +- org.apache.hadoop:hadoop-hdfs:2.6.5(depends
on 11.0.2)
[warn]  +- org.apache.curator:curator-framework:2.6.0 (depends
on 16.0.1)
[warn]  +- org.apache.curator:curator-client:2.6.0(depends
on 16.0.1)
[warn]  +- org.apache.curator:curator-recipes:2.6.0   (depends
on 16.0.1)
[warn]  +- org.htrace:htrace-core:3.0.4   (depends
on 12.0.1)
[warn] Run 'evicted' to see detailed eviction warnings
[info] Compiling 1 Scala source to
/home/adminuser/data-streaming-platform/sparkstreaming/target/scala-2.11/classes
...
[error]
/home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:6:11:
object confluent is not a member of package org.apache.spark.io
[error] import io.confluent.kafka.serializers.KafkaAvroDecoder
[error]   ^
[error]
/home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:66:
not found: type KafkaAvroDecoder
[error] val messages = KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
topicSet).map(_._2)
[error]  ^
[error]
/home/adminuser/data-streaming-platform/sparkstreaming/src/main/scala/com/arek/streaming/spark/Transformation.scala:19:84:
not found: type KafkaAvroDecoder
[error] val messages = KafkaUtils.createDirectStream[Object, Object,
KafkaAvroDecoder, KafkaAvroDecoder](streamingContext, kafkaParams,
topicSet).map(_._2)
[error]


When changing to library  "org.apache.spark" %%
"spark-streaming-kafka-0-10" % "2.2.0" :


[warn] Found version conflict(s) in library dependencies; some are
suspected to be binary incompatible:
[warn]  * com.101tec:zkclient:0.10 is selected over 0.8
[warn]  +- io.confluent:common-utils:3.3.0(depends
on 0.10)
[warn]  +- org.apache.kafka:kafka_2.11:0.10.0.1   (depends
on 0.8)
[warn]  * io.netty:netty:3.9.9.Final is selected over {3.6.2.Final,
3.7.0.Final}
[warn]  +- org.apache.spark:spark-core_2.11:2.2.0