Hello all, I'm running a simple Structured Streaming job on GCP Dataproc that reads data from Kafka and prints it to the console.
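For context, here is a minimal sketch of the kind of read the script performs at line 49 (`spark.read.format('kafka')`, a bounded batch read, as the `validateBatchOptions` frame in the trace suggests). The broker address, topic name, and the helper names `kafka_batch_options` / `read_and_print` are hypothetical placeholders, not taken from the actual script:

```python
def kafka_batch_options(bootstrap_servers, topic):
    """Options for a bounded (batch) read of a single Kafka topic."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # A batch read must be bounded: read everything currently in the topic.
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
    }


def read_and_print(bootstrap_servers, topic):
    # Imported here so the options helper above works without a Spark install.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("kafka-batch-console").getOrCreate()
    df = (
        spark.read.format("kafka")
        .options(**kafka_batch_options(bootstrap_servers, topic))
        .load()
    )
    # Kafka keys and values arrive as binary; cast them to strings for display.
    df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value").show(
        truncate=False
    )
```

The `.load()` call is the point where the Kafka source provider initializes, which is why a missing Kafka class surfaces there rather than at `format('kafka')`.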
Command:

    gcloud dataproc jobs submit pyspark /Users/karanalang/Documents/Technology/gcp/DataProc/StructuredStreaming_Kafka_GCP-Batch-feb1.py --cluster dataproc-ss-poc --jars gs://spark-jars-karan/spark-sql-kafka-0-10_2.12-3.1.2.jar gs://spark-jars-karan/spark-core_2.12-3.1.2.jar --region us-central1

I'm getting this error:

    File "/tmp/01c16a55009a42a0a29da6dde9aae4d5/StructuredStreaming_Kafka_GCP-Batch-feb1.py", line 49, in <module>
      df = spark.read.format('kafka')\
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 210, in load
    File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o69.load.
    : java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<init>(KafkaSourceProvider.scala:599)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider$.<clinit>(KafkaSourceProvider.scala)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateBatchOptions(KafkaSourceProvider.scala:348)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:128)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)

Additional details are on Stack Overflow: https://stackoverflow.com/questions/70951195/gcp-dataproc-java-lang-noclassdeffounderror-org-apache-kafka-common-serializa

Do we need to pass any other jar? What needs to be done to debug/fix this?

TIA!
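Some background on the missing class: `org.apache.kafka.common.serialization.ByteArraySerializer` lives in the `kafka-clients` library. The `spark-sql-kafka-0-10` connector depends on it but does not bundle it, so shipping only the connector jar leaves it off the classpath. One common alternative, sketched below under assumptions, is to let Spark resolve the connector plus its transitive dependencies through the `spark.jars.packages` property passed via gcloud's `--properties` flag. This assumes the cluster can reach Maven Central and that its Spark/Scala versions match the 3.1.2 / 2.12 in the original jar names. The helper names are hypothetical. If jars are passed explicitly instead, note that `--jars` takes a comma-separated list.

```python
import shlex

# Assumed versions, copied from the jar names in the original command;
# they should match the Spark build on the Dataproc cluster.
SPARK_VERSION = "3.1.2"
SCALA_BINARY = "2.12"


def kafka_package_coordinate(spark_version=SPARK_VERSION, scala=SCALA_BINARY):
    """Maven coordinate of the Kafka connector; Spark resolves its dependencies."""
    return f"org.apache.spark:spark-sql-kafka-0-10_{scala}:{spark_version}"


def build_submit_command(py_file, cluster, region, extra_jars=()):
    """Hypothetical helper: build a gcloud argument list that loads the connector
    via spark.jars.packages instead of a hand-picked list of jars."""
    cmd = [
        "gcloud", "dataproc", "jobs", "submit", "pyspark", py_file,
        f"--cluster={cluster}",
        f"--region={region}",
        f"--properties=spark.jars.packages={kafka_package_coordinate()}",
    ]
    if extra_jars:
        # gcloud expects multiple jars as one comma-separated value.
        cmd.append("--jars=" + ",".join(extra_jars))
    return cmd


def format_command(cmd):
    """Render the argument list as a shell-safe string for copy/paste."""
    return shlex.join(cmd)
```

For example, `print(format_command(build_submit_command("StructuredStreaming_Kafka_GCP-Batch-feb1.py", "dataproc-ss-poc", "us-central1")))` prints a command you can run as-is, or the list can be passed to `subprocess.run(cmd, check=True)`. On a cluster without outbound internet access, the same dependencies would need to be uploaded to GCS and listed in `--jars` instead.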