Folks,

A while ago I opened a PR that makes it possible to merge a custom data
type (UDT) into a native data type. The need for this is new and comes from
the introduction of Delta.

For some background: I have a Dataset with fields of the type
XMLGregorianCalendarType. I don't care about this type and would like to
convert it to a standard data type, mainly because any other job that reads
the data back would need the custom data type to be registered, which is
not possible in the SQL API. The magic bit here is that I'm overriding
jsonValue so that the information about the custom data type is lost. When
you do this, you have to make sure that the value is serialized as a normal
timestamp.
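
To illustrate the "serialized as a normal timestamp" part, here is a minimal
sketch of only the value conversion, not of the UDT itself. CalendarConversion
and its methods are hypothetical names made up for this example. It assumes
the target is the epoch-microseconds Long that Spark uses internally for
TimestampType values.

```scala
import javax.xml.datatype.{DatatypeFactory, XMLGregorianCalendar}

// Hypothetical helper for illustration; not part of Spark or of the PR.
object CalendarConversion {

  // Parse an xs:dateTime string such as "1970-01-01T00:00:01Z".
  def fromLexical(value: String): XMLGregorianCalendar =
    DatatypeFactory.newInstance().newXMLGregorianCalendar(value)

  // Convert to microseconds since the Unix epoch.
  // Note: if the calendar carries no timezone, toGregorianCalendar
  // falls back to the JVM default timezone.
  def toEpochMicros(cal: XMLGregorianCalendar): Long =
    cal.toGregorianCalendar.getTimeInMillis * 1000L
}
```

A UDT whose underlying SQL type is a timestamp would have to produce a value
like this from its serialize method, otherwise the data on disk would not
match the schema that the overridden jsonValue advertises.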

Before Delta, appending to the table worked fine because compatibility was
not checked on write. With Delta, things are different: when writing, it
first checks whether the two schemas can be merged:

OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512m; support was removed in 8.0
Warning: Ignoring non-spark config property: eventLog.rolloverIntervalSeconds=3600
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to merge fields 'EventTimestamp' and 'EventTimestamp'. Failed to merge incompatible data types TimestampType and org.apache.spark.sql.types.CustomXMLGregorianCalendarType@6334178e;;
    at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:685)
    at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:674)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.com$databricks$sql$transaction$tahoe$schema$SchemaUtils$$merge$1(SchemaUtils.scala:674)
    at com.databricks.sql.transaction.tahoe.schema.SchemaUtils$.mergeSchemas(SchemaUtils.scala:750)
    at com.databricks.sql.transaction.tahoe.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:63)
    at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.updateMetadata(WriteIntoDelta.scala:50)
    at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.write(WriteIntoDelta.scala:90)
    at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:119)
    at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand$$anonfun$run$2.apply(CreateDeltaTableCommand.scala:93)
    at com.databricks.logging.UsageLogging$$anonfun$recordOperation$1.apply(UsageLogging.scala:405)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:235)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:230)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:272)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.logging.UsageLogging$class.recordOperation(UsageLogging.scala:386)
    at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
    at com.databricks.spark.util.UsageLogger$class.recordOperation(UsageLogger.scala:67)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
    at com.databricks.spark.util.UsageLogging$class.recordOperation(UsageLogger.scala:342)
    at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordOperation(CreateDeltaTableCommand.scala:45)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:108)
    at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.recordDeltaOperation(CreateDeltaTableCommand.scala:45)
    at com.databricks.sql.transaction.tahoe.commands.CreateDeltaTableCommand.run(CreateDeltaTableCommand.scala:93)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:115)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:115)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
    at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:508)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:483)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:430)
    at com.ahold.IngestFild$.writeUnmanagedTable(IngestFild.scala:49)
    at com.ahold.IngestFild$.ingestFild(IngestFild.scala:69)
    at com.ahold.IngestFild$.main(IngestFild.scala:31)
    at com.ahold.IngestFild.main(IngestFild.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Currently, the merge function does not support UDTs, so I've extended the
merge rules to handle them. The PR has been quiet for the last few weeks.
Is this something that we can merge into Spark? I would like to get your
opinion on this.
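
To make the idea concrete, here is a rough, self-contained sketch of what
such an extended rule could look like. It does not use Spark's classes:
SimpleType, TimestampT, StringT, UdtT and MergeSketch are hypothetical
stand-ins invented for this example, and the actual change in the PR may well
look different.

```scala
// Hypothetical, simplified stand-ins for Spark's data types; not the real API.
sealed trait SimpleType
case object TimestampT extends SimpleType
case object StringT extends SimpleType
// A user-defined type that is stored as an underlying native type.
final case class UdtT(name: String, sqlType: SimpleType) extends SimpleType

object MergeSketch {
  // Merge two field types, or report why they are incompatible.
  def merge(left: SimpleType, right: SimpleType): Either[String, SimpleType] =
    (left, right) match {
      // Identical types merge trivially.
      case (l, r) if l == r => Right(l)
      // Extended rule: a UDT merges into the native type it is stored as.
      case (UdtT(_, underlying), r) if underlying == r => Right(r)
      case (l, UdtT(_, underlying)) if underlying == l => Right(l)
      // Anything else is still rejected.
      case (l, r) =>
        Left(s"Failed to merge incompatible data types $l and $r")
    }
}
```

With a rule like this, merging a timestamp column with a UDT column backed by
a timestamp yields the native timestamp type instead of the exception above,
while unrelated types are still rejected.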

 Cheers, Fokko