Hi all,

Following the upgrade to 3.1.1, I see a couple of issues.

Spark Structured Streaming (SSS) does not seem to work with the newer
spark-sql-kafka-0-10_2.12-3.1.1.jar for Spark. It throws


java.lang.NoSuchMethodError:
org.apache.spark.kafka010.KafkaTokenUtil$.needTokenUpdate(Ljava/util/Map;Lscala/Option;)Z

So I have to use the previous jar file spark-sql-kafka-0-10_2.12-3.0.1.jar

However, we can set aside that for now

The second point is that with the following jars under $SPARK_HOME/jars


   1. spark-sql-kafka-0-10_2.12-3.0.1.jar
   2. commons-pool2-2.9.0.jar
   3. kafka-clients-2.7.0.jar


The SSS job runs in local mode as a single JVM

In Yarn mode this fails with the following error


*java.lang.NoClassDefFoundError: Could not initialize class
org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer*

Even other executors running on the same node fail as well. with the above
error.

I have ensured that those jar files are available on all three nodes of the
cluster (on-prem)  but still no luck,

Any ideas appreciated.

Thanks



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 22 Feb 2021 at 22:55, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Many thanks Muru. That was a great help!
>
> -
> -----------------------------------+---------------------------------------------------------------------------------------------------------------------+-------+
> |key                                 |value
>
>     |headers|
>
> +------------------------------------+---------------------------------------------------------------------------------------------------------------------+-------+
> |b8f3bffd-42f6-4bb4-80fa-eafb6e1dd9a1|{"rowkey":"b8f3bffd-42f6-4bb4-80fa-eafb6e1dd9a1","ticker":"SBRY",
> "timeissued":"2021-02-20T19:10:18", "price":374.6} |null   |
> |d38c7771-9d1b-4cf1-94cf-97c8d4b7fd5e|{"rowkey":"d38c7771-9d1b-4cf1-94cf-97c8d4b7fd5e","ticker":"ORCL",
> "timeissued":"2021-02-20T19:10:22", "price":19.24} |null   |
> |1870f59a-2ef5-469d-a3e1-f756ab4de90c|{"rowkey":"1870f59a-2ef5-469d-a3e1-f756ab4de90c","ticker":"MRW",
> "timeissued":"2021-02-20T19:10:25", "price":263.05} |null   |
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Mon, 22 Feb 2021 at 22:46, muru <mmur...@gmail.com> wrote:
>
>> You should include commons-pool2-2.9.0.jar and remove
>> spark-streaming-kafka-0-10_2.12-3.0.1.jar (unnecessary jar).
>>
>> On Mon, Feb 22, 2021 at 12:42 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Trying to make PySpark with PyCharm work with Structured Streaming
>>>
>>> spark-3.0.1-bin-hadoop3.2
>>> kafka_2.12-1.1.0
>>>
>>> Basic code
>>>
>>> from __future__ import print_function
>>> from src.config import config, hive_url
>>> import sys
>>> from sparkutils import sparkstuff as s
>>>
>>> class MDStreaming:
>>>     def __init__(self, spark_session,spark_context):
>>>         self.spark = spark_session
>>>         self.sc = spark_context
>>>         self.config = config
>>>
>>>     def startStreaming(self):
>>>         self.sc.setLogLevel("ERROR")
>>>         try:
>>>             kafkaReaderWithHeaders = self.spark \
>>>                 .readStream \
>>>                 .format("kafka") \
>>>                 .option("kafka.bootstrap.servers",
>>> config['MDVariables']['bootstrapServers'],) \
>>>                 .option("schema.registry.url",
>>> config['MDVariables']['schemaRegistryURL']) \
>>>                 .option("group.id", config['common']['appName']) \
>>>                 .option("zookeeper.connection.timeout.ms",
>>> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
>>>                 .option("rebalance.backoff.ms",
>>> config['MDVariables']['rebalanceBackoffMS']) \
>>>                 .option("zookeeper.session.timeout.ms",
>>> config['MDVariables']['zookeeperSessionTimeOutMs']) \
>>>                 .option("auto.commit.interval.ms",
>>> config['MDVariables']['autoCommitIntervalMS']) \
>>>                 .option("subscribe", config['MDVariables']['topic']) \
>>>                 .option("failOnDataLoss", "false") \
>>>                 .option("includeHeaders", "true") \
>>>                 .option("startingOffsets", "earliest") \
>>>                 .load()
>>>         except Exception as e:
>>>                 print(f"""{e}, quitting""")
>>>                 sys.exit(1)
>>>
>>>         kafkaReaderWithHeaders.selectExpr("CAST(key AS STRING)",
>>> "CAST(value AS STRING)", "headers") \
>>>             .writeStream \
>>>             .format("console") \
>>>             .option("truncate","false") \
>>>             .start() \
>>>             .awaitTermination()
>>>         kafkaReaderWithHeaders.printSchema()
>>>
>>> if __name__ == "__main__":
>>>     appName = config['common']['appName']
>>>     spark_session = s.spark_session(appName)
>>>     spark_context = s.sparkcontext()
>>>     mdstreaming = MDStreaming(spark_session, spark_context)
>>>     mdstreaming.startStreaming()
>>>
>>> I have used the following jars in $SYBASE_HOME/jars
>>>
>>>   spark-sql-kafka-0-10_2.12-3.0.1.jar
>>>  kafka-clients-2.7.0.jar
>>>  spark-streaming-kafka-0-10_2.12-3.0.1.jar
>>>  spark-token-provider-kafka-0-10_2.12-3.0.1.jar
>>>
>>> and also in $SPARK_HOME/conf/spark-defaults.conf
>>>
>>> spark.driver.extraClassPath        $SPARK_HOME/jars/*.jar
>>> spark.executor.extraClassPath      $SPARK_HOME/jars/*.jar
>>>
>>>
>>> The error is this:
>>>
>>> 2021-02-22 16:40:38,886 ERROR executor.Executor: Exception in task 3.0
>>> in stage 0.0 (TID 3)
>>> *java.lang.NoClassDefFoundError: Could not initialize class
>>> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer$*
>>> at
>>> org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.<init>(KafkaBatchPartitionReader.scala:52)
>>> at
>>> org.apache.spark.sql.kafka010.KafkaBatchReaderFactory$.createReader(KafkaBatchPartitionReader.scala:40)
>>> at
>>> org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:60)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>>> at
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:127)
>>> at
>>> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
>>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>>
>>> pyspark.sql.utils.StreamingQueryException: Writing job aborted.
>>> === Streaming Query ===
>>> Identifier: [id = 0706dcd1-01de-4d7f-a362-81257b45e38c, runId =
>>> d61d9807-6f6c-4de1-a60f-8ae31c8a3c36]
>>> Current Committed Offsets: {}
>>> Current Available Offsets: {KafkaV2[Subscribe[md]]:
>>> {"md":{"8":1905351,"2":1907338,"5":1905175,"4":1904978,"7":1907880,"1":1903797,"3":1906072,"6":1904936,"0":1903896}}}
>>>
>>> Current State: ACTIVE
>>> Thread State: RUNNABLE
>>>
>>> Logical Plan:
>>> WriteToMicroBatchDataSource ConsoleWriter[numRows=20, truncate=false]
>>> +- Project [cast(key#8 as string) AS key#24, cast(value#9 as string) AS
>>> value#25, headers#15]
>>>    +- StreamingDataSourceV2Relation [key#8, value#9, topic#10,
>>> partition#11, offset#12L, timestamp#13, timestampType#14, headers#15],
>>> org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@1cf1e26d,
>>> KafkaV2[Subscribe[md]]
>>>
>>> Process finished with exit code 1
>>>
>>> The thing is that the class is in the jar file below in $SPARK_HOME/jars
>>>
>>>
>>> find $SPARK_HOME/jars/  -name "*.jar" | xargs grep
>>> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
>>>
>>>
>>> Binary file jars/spark-sql-kafka-0-10_2.12-3.0.1.jar matches
>>>
>>> Appreciate any feedback.
>>>
>>>
>>> Thanks
>>>
>>>
>>> Mich
>>>
>>>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>>

Reply via email to