Hi,

I am using EMR-5.33.0 (Spark 2.4.7). I am writing a Structured Streaming job
that reads from one Kafka topic and writes to another Kafka topic, with
checkpointing enabled on the Kafka sink.
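
For reference, the job looks roughly like the sketch below (simplified;
broker addresses, topic names, and the checkpoint location are placeholders,
not our real settings):

import org.apache.spark.sql.SparkSession

object KafkaToKafkaJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("demo_job")
      .getOrCreate()

    // Read the source Kafka topic as a streaming DataFrame.
    val source = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "input_topic")               // placeholder
      .load()

    // The Kafka sink expects string or binary key/value columns.
    val output = source.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    // Write to the target Kafka topic, checkpointing offsets to durable storage.
    val query = output.writeStream
      .format("kafka")
      .queryName("demo_job")
      .option("kafka.bootstrap.servers", "broker:9092")                  // placeholder
      .option("topic", "output_topic")                                   // placeholder
      .option("checkpointLocation", "s3://bucket/checkpoints/demo_job")  // placeholder
      .start()

    query.awaitTermination()
  }
}

When the job runs, the query terminates with the following error: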

2021-08-23 17:01:13.373 MicroBatchExecution stream execution thread for demo_job [id = 3c8b0825-d737-4e5b-929f-0988acd563e2, runId = c015f08c-0429-4c39-93e5-71d06d6e13d3] [ERROR] Query demo_job [id = 3c8b0825-d737-4e5b-929f-0988acd563e2, runId = c015f08c-0429-4c39-93e5-71d06d6e13d3] terminated with error
java.lang.IllegalStateException: createWriterFactory should not be called with SupportsWriteInternalRow.
    at org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow.createWriterFactory(SupportsWriteInternalRow.java:36)
    at org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter.createWriterFactory(MicroBatchWriter.scala:36)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:55)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:173)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:169)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:197)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:289)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:338)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3416)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2791)
    at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2791)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3391)
    at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
    at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3390)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2791)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:540)
    at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
    at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

Can anyone help me understand why this happens and what the resolution is?

-- 
Thanks & Regards,
Amit Sharma
