Greetings:

I have an app working fine with Hudi 0.6.0. Now I need to upgrade it so that I
can run Spark 3.1.2.

I have the following dependencies:

<properties>
    <java.version>8</java.version>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <spark.version>3.1.2</spark.version>
    <scala.version>2.12.11</scala.version>
    <scala.compat.version>2.12</scala.compat.version>
    <elastic-search.version>7.13.4</elastic-search.version>
    <jackson.version>2.10.0</jackson.version>
    <kafka.version>2.8.0</kafka.version>
</properties>
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-spark${spark.version}-bundle_${scala.compat.version}</artifactId>
    <version>0.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.compat.version}</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>



The method I use to write to Hudi is the same as before:
def writeHudis(df: DataFrame,
               databaseName: String, tableName: String, pk: String,
               path: String,
               timestampKey: String, partitionKey: String = null,
               hiveStyle: Boolean = false): Unit = {
  val env = setup.value.env
  val db = ensureDatabase(databaseName)
  val hudiOptions = Map[String, String](
    HoodieWriteConfig.TBL_NAME.key() -> tableName,
    DataSourceWriteOptions.RECORDKEY_FIELD.key() -> pk,
    DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> timestampKey,
    DataSourceWriteOptions.HIVE_SYNC_ENABLED.key() -> (env != Setup.LOCAL).toString,
    DataSourceWriteOptions.HIVE_TABLE.key() -> tableName,
    DataSourceWriteOptions.HIVE_DATABASE.key() -> db,
    DataSourceWriteOptions.HIVE_URL.key() -> s"jdbc:hive2://${emsMaster(env)}:10000/;ssl=true"
  )

  // Write a DataFrame as a Hudi dataset
  val write = df.write
    .format("org.apache.hudi")
    .options(hudiOptions)
  if (partitionKey == null) {
    write
      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(),
        "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(),
        "org.apache.hudi.hive.NonPartitionedExtractor")
      .option(DataSourceWriteOptions.OPERATION.key(),
        DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
      .option("hoodie.datasource.write.hive_style_partitioning", hiveStyle)
      .mode(SaveMode.Overwrite)
      .save(path)
  } else {
    write
      .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionKey)
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), partitionKey)
      .mode(SaveMode.Append)
      .save(path)
    spark.sql(s"MSCK REPAIR TABLE $db.$tableName")
  }
}





But now I am getting:

java.lang.RuntimeException: scala.None$ is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_4$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_4$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:209)
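
For what it's worth, my guess is that some nullable string value is now reaching Spark's encoder as scala.None where the schema expects a plain String; Spark 3 rejects that at projection time. A minimal illustration of the workaround I am considering, in plain Scala with made-up field names (either unwrap the Option with .orNull before the value reaches the DataFrame, or declare the case-class field as Option[String] so the encoder handles None itself):

```scala
// Hypothetical row model: a nullable column modelled as Option[String].
// An encoder for a plain String field cannot accept scala.None, so we
// unwrap the Option with .orNull; None becomes null, which Spark's
// string schema does accept.
case class Raw(id: String, partition: Option[String])

def toExternal(r: Raw): (String, String) =
  (r.id, r.partition.orNull) // None -> null

val rows = Seq(Raw("a", Some("2022-01")), Raw("b", None))
val external = rows.map(toExternal)
println(external) // List((a,2022-01), (b,null))
```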





Please help, thanks.

Andrew


