Hi Anna,

Have you added the spark-sql-kafka-0-10_2.11:2.2.0 package as well? Further info can be found here:
https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#deploying

The same --packages option can be used with spark-shell as well.
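For example, something like this (a sketch — adjust the Scala suffix and version to match your actual Spark build, and substitute your own main class and jar; `your.main.Class` and `your-app.jar` are placeholders):

```
# interactive shell with the Kafka sink on the classpath
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

# or for a packaged application
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
  --class your.main.Class \
  your-app.jar
```

A NoSuchMethodError like the one below usually indicates mismatched Spark artifact versions on the classpath, so it's worth keeping the connector at exactly the same version as spark-sql_2.11.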
BR, G

On Mon, Mar 18, 2019 at 10:07 PM anna stax <[email protected]> wrote:
> Hi all,
> I am unable to write the contents of a Spark dataframe to Kafka.
> I am using Spark 2.2
>
> This is my code:
>
> val df = Seq(("1","One"),("2","two")).toDF("key","value")
> df.printSchema()
> df.show(false)
> df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
>   .write
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "127.0.0.1:9092")
>   .option("topic", "testtopic")
>   .save()
>
> and I am getting the following error message:
>
> [Stage 0:> (0 + 2) / 2]Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$3()Lscala/Option;
>   at org.apache.spark.sql.kafka010.KafkaWriteTask.createProjection(KafkaWriteTask.scala:112)
>   at org.apache.spark.sql.kafka010.KafkaWriteTask.<init>(KafkaWriteTask.scala:39)
>   at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:90)
>   at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
>   at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>   at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
>   at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:99)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
> I have added this dependency:
>
> <dependency>
>   <groupId>org.apache.spark</groupId>
>   <artifactId>spark-sql_2.11</artifactId>
>   <version>2.2.2</version>
> </dependency>
>
> Appreciate any help. Thanks.
>
> https://stackoverflow.com/questions/55229945/writing-the-contents-of-spark-dataframe-to-kafka-with-spark-2-2
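For completeness: if the job is built and run from Maven rather than launched via --packages, the Kafka sink lives in a separate artifact, so alongside spark-sql_2.11 the pom would also need something like the following (version chosen here to match the spark-sql_2.11 2.2.2 dependency above — a mismatch between the two artifacts is a likely cause of the NoSuchMethodError):

```
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>2.2.2</version>
</dependency>
```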
