RE: Running Spark Kafka streaming job in Azure HDInsight

2021-10-06 Thread Muhammed Favas
Hi,

Yes, but I verified the Spark version in HDInsight against the one used in my 
code, and both are the same.

HDInsight (HDI 4.0) has Spark version 3.0 and Scala version 2.12:

[screenshot: cluster configuration showing the Spark and Scala versions]
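For reference, the same can be confirmed from spark-shell on the cluster head 
node (a quick check; the printed values here are examples):

println(sc.version)                          // Spark version, e.g. "3.0.0"
println(scala.util.Properties.versionString) // Scala library version, e.g. "version 2.12.10"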

I used the Livy API in HDInsight to submit the job; it is the API HDInsight 
exposes for submitting jobs remotely. I passed all dependent jars in the 
submit call.


Regards,
Favas

From: Stelios Philippou 
Sent: Wednesday, October 6, 2021 16:51
To: Muhammed Favas 
Cc: user@spark.apache.org
Subject: Re: Running Spark Kafka streaming job in Azure HDInsight

Hi Favas,

The error states that you are using mismatched library versions.


Exception in thread "streaming-start" java.lang.NoSuchMethodError: 
org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V


Keep in mind that Spark ships its own internal libraries for most of this, so 
the Kafka client versions must be aligned between Spark and your code.
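
One quick way to see which kafka-clients jar is actually serving the consumer 
class at runtime is to log its code source from inside the job; a minimal 
sketch you could drop into the driver code:

// Print the jar that KafkaConsumer was actually loaded from, to spot a mismatch.
val src = classOf[org.apache.kafka.clients.consumer.KafkaConsumer[_, _]]
  .getProtectionDomain.getCodeSource
println(s"kafka-clients loaded from: ${if (src == null) "<bootstrap classpath>" else src.getLocation}")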

Can you verify that your HDI Spark version is indeed 3.0?

Also, how are you submitting the job?


On Wed, 6 Oct 2021 at 14:10, Muhammed Favas 
<favas.muham...@expeedsoftware.com> wrote:
Hi,

I am facing a dependency issue when running a Spark streaming job in Azure 
HDInsight. The job connects to a Kafka broker that is hosted on a LAN and 
exposed through a public IP.

Spark job pom.xml setup – Spark version 3.0.0, Scala version 2.12


<dependency>
  <groupId>org.scala-lang</groupId>
  <artifactId>scala-library</artifactId>
  <version>2.12.12</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.4</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>3.0.0</version>
</dependency>
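
Note that spark-streaming-kafka-0-10_2.12:3.0.0 already pulls in kafka-clients 
as a transitive dependency, so supplying a different kafka-clients jar 
alongside it can diverge from what the integration was compiled against. 
Running "mvn dependency:tree -Dincludes=org.apache.kafka:kafka-clients" shows 
which kafka-clients version Maven actually resolves.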


HDInsight version - Spark 3.0 (HDI 4.0)
I am using the Livy API to start the job in Azure remotely. Below is the list 
of files passed in the "jars" option in Livy:

kafka-clients-2.7.0.jar (https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/kafka-clients-2.7.0.jar),
spark-streaming-kafka-0-10_2.12-3.0.0.jar (https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-streaming-kafka-0-10_2.12-3.0.0.jar),
spark-token-provider-kafka-0-10_2.12-3.0.0.jar (https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-token-provider-kafka-0-10_2.12-3.0.0.jar)
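
For illustration, the Livy submission amounts to a POST to the cluster's 
/livy/batches endpoint with a JSON body along these lines (the application 
jar path and class name below are placeholders, not the job's actual values):

POST https://<cluster-name>.azurehdinsight.net/livy/batches
{
  "file": "wasbs://<container>@<storage-account>.blob.core.windows.net/<application>.jar",
  "className": "<main-class>",
  "jars": [
    "https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/kafka-clients-2.7.0.jar",
    "https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-streaming-kafka-0-10_2.12-3.0.0.jar",
    "https://expeediotsparkstorage.blob.core.windows.net/sparkjobs/spark-token-provider-kafka-0-10_2.12-3.0.0.jar"
  ]
}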

The job starts in the Azure Spark cluster, but it does not receive data from 
my Kafka broker. Here is the error I am getting:


Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:93)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)
        at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)
        at org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
        at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
        at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
        at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
        at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)

Here is the Scala code used to connect to the broker:


import org.apache.kafka.common.serialization.StringDeserializer // for the key/value deserializers below
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> kafkaServer,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> connectionID,
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (true: java.lang.Boolean),
  "partition.assignment.strategy" -> "org.apache.kafka.clients.consumer.RangeAssignor"
)

val topics = Array(connectionID)
