Hi Kiran. I was using

   - Kafka Cluster 2.12-1.1.0
   - Spark Streaming 2.3, Spark SQL 2.3
   - Scala 2.11.8

Your Kafka version 0.10 seems to be pretty old. That may be the issue here.
Try upgrading Kafka in a test environment to see if it helps.


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 11 Jun 2021 at 08:55, Kiran Biswal <biswalki...@gmail.com> wrote:

> Hello Mich
>
> When you were using dstream, what version of kafka, spark and scala were
> you using?
>
> I am using kafka 0.10 with spark 3.0.1 and scala 2.12. Do you feel this
> combination can reliably work from streaming point of view.?
>
> I get below error when invoke createDirectStreamException. Any
> suggestions on how to move forard here?
>
> Thanks a kit for ak help.
> Thanks
> kiran
>
> in thread "streaming-start" java.lang.NoClassDefFoundError:
> org/apache/kafka/common/security/JaasContext
>
>       at 
> org.apache.spark.kafka010.KafkaTokenUtil$.isGlobalJaasConfigurationProvided(KafkaTokenUtil.scala:155)
>       at 
> org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:72)
>       at 
> org.apache.spark.kafka010.KafkaConfigUpdater.setAuthenticationConfigIfNeeded(KafkaConfigUpdater.scala:62)
>       at 
> org.apache.spark.streaming.kafka010.ConsumerStrategy.setAuthenticationConfigIfNeeded(ConsumerStrategy.scala:64)
>       at 
> org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:91)
>       at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
>       at 
> org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:258)
>       at 
> org.apache.spark.streaming.DStreamGraph.$anonfun$start$7(DStreamGraph.scala:55)
>       at 
> org.apache.spark.streaming.DStreamGraph.$anonfun$start$7$adapted(DStreamGraph.scala:55)
>       at scala.collection.Iterator.foreach(Iterator.scala:941)
>       at scala.collection.Iterator.foreach$(Iterator.scala:941)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>       at 
> scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:974)
>       at scala.collection.parallel.Task.$anonfun$tryLeaf$1(Tasks.scala:53)
>       at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>       at scala.util.control.Breaks$$anon$1.catchBreak(Breaks.scala:67)
>       at scala.collection.parallel.Task.tryLeaf(Tasks.scala:56)
>       at scala.collection.parallel.Task.tryLeaf$(Tasks.scala:50)
>       at 
> scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:971)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute(Tasks.scala:153)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask.compute$(Tasks.scala:149)
>       at 
> scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:440)
>       at java.util.concurrent.RecursiveAction.exec(RecursiveAction.java:189)
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>       at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>       at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.security.JaasContext
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
>       ... 27 more
>
>
>
> On Tue, Jun 8, 2021 at 7:26 AM Mich Talebzadeh <mich.talebza...@gmail.com>
> wrote:
>
>> Hi Kiran,
>>
>> I don't seem to have a reference to handling offsets in my old code.
>>
>> However, in Spark structured streaming (SSS) I handle it using a
>> reference to checkpointLocation as below: (this is in Python)
>>
>>        checkpoint_path = "file:///ssd/hduser/avgtemperature/chkpt"
>>
>>           result = resultMF.withColumn("uuid",uuidUdf()) \
>>                      .selectExpr("CAST(uuid AS STRING) AS key",
>> "to_json(struct(startOfWindow, endOfWindow, AVGTemperature)) AS value") \
>>                      .writeStream \
>>                      .outputMode('complete') \
>>                      .format("kafka") \
>>                      .option("kafka.bootstrap.servers",
>> config['MDVariables']['bootstrapServers'],) \
>>                      .option("topic", "avgtemperature") \
>>                     * .option('checkpointLocation', checkpoint_path) \*
>>                      .queryName("avgtemperature") \
>>                      .start()
>>
>> Now within that  checkpoint_path directory you have five sub-directories
>> containing all you need  including offsets
>>
>> /ssd/hduser/avgtemperature/chkpt> ls
>> commits  metadata  offsets  sources  state
>>
>> HTH
>>
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 8 Jun 2021 at 01:21, Kiran Biswal <biswalki...@gmail.com> wrote:
>>
>>> Hello Mich
>>>
>>> Thanks a lot. Using code similar to yours I was able to compile.
>>>
>>> One outstanding question is in my older code the *getConsumerOffsets *older
>>> method was handling offsets(latestLeaderOffsets, earliestLeaderOffsets
>>> etc, was calling kafkaCluster).
>>>
>>>  Will there be data loss if I don't handle offsets? In your example
>>> handling offsets was not required? If I were to handle offsets any examples
>>> you could share?
>>>
>>> Thanks a lot again and appreciate the great help.
>>> Regards
>>> Kiran
>>>
>>> On Mon, Jun 7, 2021 at 2:58 AM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Hi Kiran,
>>>>
>>>> As you be aware  createDirectStream is depreciated and you ought to
>>>> use Spark Structured streaming, especially that you are moving to version
>>>> 3.0.1.
>>>>
>>>> If you still want to use dstream then that page seems to be correct
>>>>
>>>> Looking at my old code I have
>>>>
>>>> import org.apache.spark.streaming._
>>>> import org.apache.spark.streaming.kafka._
>>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>>
>>>>     val kafkaParams = Map[String, String](
>>>>                                       "bootstrap.servers" ->
>>>> bootstrapServers,
>>>>                                       "schema.registry.url" ->
>>>> schemaRegistryURL,
>>>>                                        "zookeeper.connect" ->
>>>> zookeeperConnect,
>>>>                                        "group.id" -> sparkAppName,
>>>>                                        "zookeeper.connection.timeout.ms"
>>>> -> zookeeperConnectionTimeoutMs,
>>>>                                        "rebalance.backoff.ms" ->
>>>> rebalanceBackoffMS,
>>>>                                        "zookeeper.session.timeout.ms"
>>>> -> zookeeperSessionTimeOutMs,
>>>>                                        "auto.commit.interval.ms" ->
>>>> autoCommitIntervalMS
>>>>                                      )
>>>>     //val topicsSet = topics.split(",").toSet
>>>>     val topicsValue = Set(topics)
>>>>     val dstream = KafkaUtils.createDirectStream[String, String,
>>>> StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsValue)
>>>>     dstream.cache()
>>>>
>>>> HTH,
>>>>
>>>>
>>>> Mich
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, 7 Jun 2021 at 10:34, Kiran Biswal <biswalki...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Mich, Thanks a lot for your response. I am basically trying to get
>>>>> some older code(streaming job to read from kafka) in 2.0.1 spark to work 
>>>>> in
>>>>> 3.0,1. The specific area where I am having problem (KafkaCluster) has most
>>>>> likely to do with get/ set commit offsets in kafka
>>>>>
>>>>>
>>>>>
>>>>> // Create message Dstream for each (topic, schema class)
>>>>>
>>>>>
>>>>>     val msgStreams = config.getTopicSchemaClassMap.map {
>>>>>
>>>>>
>>>>>       case (kafkaTopic, schemaClass) => {
>>>>>
>>>>>
>>>>>         val consumerOffsets = *getConsumerOffsets*(kafkaTopic)
>>>>>
>>>>>
>>>>>         val msgDStream = (KafkaUtils.createDirectStream[Array[Byte],
>>>>> Array[Byte], DefaultDecoder, DefaultDecoder,
>>>>>
>>>>>           Tuple2[Array[Byte],Array[Byte]]]
>>>>>
>>>>>
>>>>>           (ssc, kafkaParams, consumerOffsets,
>>>>>
>>>>>
>>>>>           (msg: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
>>>>> (msg.key, msg.message)
>>>>>
>>>>>           ))
>>>>>
>>>>>
>>>>>         (kafkaTopic, schemaClass, msgDStream)
>>>>>
>>>>>
>>>>>       }
>>>>>
>>>>>
>>>>>     }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *The getConsumerOffsets  *method  internally used KafkaCluter which
>>>>> is probably deprecated.
>>>>>
>>>>>
>>>>>
>>>>> Do You think I need to mimic the code shown here to get/set offsets
>>>>> rather than use kafkaCluster?
>>>>>
>>>>>
>>>>>
>>>>> https://spark.apache.org/docs/3.0.0-preview/streaming-kafka-0-10-integration.html
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>> Kiran
>>>>>
>>>>> On Mon, Jun 7, 2021 at 1:04 AM Mich Talebzadeh <
>>>>> mich.talebza...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Are you trying to read topics from Kafka in spark 3.0.1?
>>>>>>
>>>>>> Have you checked Spark 3.0.1 documentation?
>>>>>>
>>>>>> Integrating Spark with Kafka is pretty straight forward. with 3.0.1
>>>>>> and higher
>>>>>>
>>>>>>
>>>>>> HTH
>>>>>>
>>>>>>
>>>>>>    view my Linkedin profile
>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>> may
>>>>>> arise from relying on this email's technical content is explicitly
>>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>>> arising from such loss, damage or destruction.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, 6 Jun 2021 at 21:18, Kiran Biswal <biswalki...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> *I am using spark 3.0.1 AND Kafka 0.10 AND Scala 2.12. Getting an
>>>>>>> error related to KafkaCluster (not found: type KafkaCluster). Is this 
>>>>>>> class
>>>>>>> deprecated? How do I find a replacement?*
>>>>>>>
>>>>>>> *I am upgrading from spark 2.0.1 to spark 3.0.1*
>>>>>>>
>>>>>>> *In spark 2.0.1 KafkaCluster was supported*
>>>>>>>
>>>>>>> https://spark.apache.org/docs/2.0.1/api/java/org/apache/spark/streaming/kafka/KafkaCluster.html
>>>>>>>
>>>>>>> just looking for ideas how to achieve same functionality in spark
>>>>>>> 3.0.1. Any thoughts and examples will be highly appreciated.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Kiran
>>>>>>>
>>>>>>

Reply via email to