[ https://issues.apache.org/jira/browse/SPARK-35291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347302#comment-17347302 ]
Umar Asir edited comment on SPARK-35291 at 5/19/21, 5:19 AM:
-------------------------------------------------------------

It's an issue with Delta, but the NullPointerException is thrown from "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown Source)".

was (Author: umerasir): It's an issue with Delta.
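For illustration (this sketch is not part of the original report): the NPE originates in the row deserializer that Spark generates for the target schema. Because TGT_NAME is declared non-nullable, the generated CreateExternalRow code calls .toString on the column value without a null check, so a null in that column surfaces as a NullPointerException rather than a constraint violation. Assuming Spark 3.0.x (RowEncoder and createDeserializer are internal APIs), the same failure mode can be reproduced without Delta, e.g. from spark-shell:

{code:java}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Target-side schema: TGT_NAME is NOT NULL, matching
// StructField(TGT_NAME,StringType,false) in the stack trace below.
val schema = StructType(Seq(
  StructField("TGTID", IntegerType, nullable = false),
  StructField("TGT_NAME", StringType, nullable = false)))

// Deserializer Spark generates for this schema (SpecificSafeProjection / CreateExternalRow).
val deserializer = RowEncoder(schema).resolveAndBind().createDeserializer()

// Internal row whose non-nullable string column is null, which is the state the
// MERGE produces when a null source value lands in TGT_NAME.
val rowWithNull = InternalRow(1, null)

// Throws: java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
deserializer(rowWithNull)
{code}

This matches the stack trace in the report below, where the failure happens while MergeIntoCommand decodes the joined row.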
> NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown Source)
> --------------------------------------------------------------------------------------------------------------------------------------------
>
>                  Key: SPARK-35291
>                  URL: https://issues.apache.org/jira/browse/SPARK-35291
>              Project: Spark
>           Issue Type: Bug
>           Components: Optimizer, SQL
>     Affects Versions: 2.3.1, 3.0.2
>             Reporter: Umar Asir
>             Priority: Major
>          Attachments: NotNullIssue.scala, cdwqasourceupdate.7z, cdwqatgtupdate.7z, pom.xml, run1.log
>
>
> We are trying to merge data using DeltaTable's merge API. Inserting a null value into a not-null column results in a NullPointerException instead of a constraint violation error.
>
> *Code:*
> {code:java}
> package com.uasir.cdw.delta
>
> import org.apache.spark.sql._
> import io.delta.tables._
>
> object NotNullIssue {
>   def main(args: Array[String]): Unit = {
>     System.setProperty("hadoop.home.dir", "C:\\Tools\\hadoop\\")
>
>     val spark = SparkSession
>       .builder()
>       .appName("DFMergeTest")
>       .master("local[*]")
>       .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
>       .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
>       .config("spark.testing.memory", "571859200")
>       .getOrCreate()
>
>     println("Reading from the source table")
>     val df = spark.read.format("delta").load("C:\\Input\\cdwqasourceupdate") // PFA cdwqasourceupdate.7z
>     df.show()
>
>     println("Reading from the target table")
>     val tgtDf = spark.read.format("delta").load("C:\\Input\\cdwqatgtupdate") // PFA cdwqatgtupdate.7z
>     tgtDf.show()
>
>     val sourceTable = "source"
>     val targetDataTable = "target"
>
>     // Map each target column to the source column at the same position.
>     val colMap = scala.collection.mutable.Map[String, String]()
>     val sourceFields = df.schema.fieldNames
>     val targetFields = tgtDf.schema.fieldNames
>     for (i <- 0 until targetFields.length) {
>       colMap(targetFields(i)) = sourceTable + "." + sourceFields(i)
>     }
>     /* colMap will be generated as:
>        TGTID       -> c1_ID
>        TGT_NAME    -> c2_NAME
>        TGT_ADDRESS -> c3_address
>        TGT_DOB     -> c4_dob
>      */
>
>     println("update")
>     DeltaTable.forPath(spark, "C:\\Input\\cdwqatgtupdate")
>       .as(targetDataTable)
>       .merge(
>         df.as(sourceTable),
>         targetDataTable + "." + "TGTID" + " = " + sourceTable + "." + "c1_ID")
>       .whenMatched()
>       .updateExpr(colMap.toMap)
>       .execute()
>
>     println("Reading from the target table after operation")
>     tgtDf.show()
>   }
> }
> {code}
>
> *Error:*
> {code:java}
> Caused by: java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
> createexternalrow(input[0, int, false], input[1, string, false].toString, input[2, string, true].toString, staticinvoke(class org.apache.spark.sql.catalyst.util.DateTimeUtils$, ObjectType(class java.sql.Date), toJavaDate, input[3, date, true], true, false), StructField(TGTID,IntegerType,false), StructField(TGT_NAME,StringType,false), StructField(TGT_ADDRESS,StringType,true), StructField(TGT_DOB,DateType,true))
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:188)
>   at org.apache.spark.sql.delta.commands.MergeIntoCommand$JoinedRowProcessor.$anonfun$processPartition$9(MergeIntoCommand.scala:565)
>   at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
>   at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
>   at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:265)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:127)
>   at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.CreateExternalRow_0$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
>   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
>   ... 17 more
> {code}
>
> *PFA:*
> # Sample program: [^NotNullIssue.scala]
> # Source data (parquet files): [^cdwqasourceupdate.7z]
> # Target data (parquet files): [^cdwqatgtupdate.7z]
> # Log: [^run1.log]
> # [^pom.xml]
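A possible pre-merge guard (not from the original report; the helper name is hypothetical and the column names are taken from the colMap example above, where c1_ID feeds the NOT NULL column TGTID and c2_NAME feeds the NOT NULL column TGT_NAME): fail fast when any source column that maps onto a NOT NULL target column contains nulls, so the run stops with a readable message instead of the decoder NPE.

{code:java}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Check the source columns that the merge writes into NOT NULL target columns.
def assertNoNullsInNotNullColumns(source: DataFrame, columns: Seq[String]): Unit = {
  columns.foreach { c =>
    val nullCount = source.filter(col(c).isNull).count()
    require(nullCount == 0,
      s"$nullCount source row(s) have NULL in '$c', which is mapped to a NOT NULL target column")
  }
}

// Example usage before calling merge:
// assertNoNullsInNotNullColumns(df, Seq("c1_ID", "c2_NAME"))
{code}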