Greetings,

I have an app that works fine with Hudi 0.6.0. Now I need to upgrade it so that I can run Spark 3.1.2.
I have the following dependencies (with these properties, the Hudi artifact resolves to hudi-spark3.1.2-bundle_2.12):

    <properties>
        <java.version>8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <spark.version>3.1.2</spark.version>
        <scala.version>2.12.11</scala.version>
        <scala.compat.version>2.12</scala.compat.version>
        <elastic-search.version>7.13.4</elastic-search.version>
        <jackson.version>2.10.0</jackson.version>
        <kafka.version>2.8.0</kafka.version>
    </properties>

    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-spark${spark.version}-bundle_${scala.compat.version}</artifactId>
        <version>0.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_${scala.compat.version}</artifactId>
        <version>${spark.version}</version>
        <scope>provided</scope>
    </dependency>

The method I use to write to Hudi is the same as before:

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.keygen.constant.KeyGeneratorOptions
    import org.apache.spark.sql.{DataFrame, SaveMode}

    // setup, ensureDatabase, emsMaster and spark come from elsewhere in our app
    def writeHudis(df: DataFrame,
                   databaseName: String,
                   tableName: String,
                   pk: String,
                   path: String,
                   timestampKey: String,
                   partitionKey: String = null,
                   hiveStyle: Boolean = false): Unit = {
      val env = setup.value.env
      val db = ensureDatabase(databaseName)
      val hudiOptions = Map[String, String](
        HoodieWriteConfig.TBL_NAME.key() -> tableName,
        DataSourceWriteOptions.RECORDKEY_FIELD.key() -> pk,
        DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> timestampKey,
        DataSourceWriteOptions.HIVE_SYNC_ENABLED.key() -> (env != Setup.LOCAL).toString,
        DataSourceWriteOptions.HIVE_TABLE.key() -> tableName,
        DataSourceWriteOptions.HIVE_DATABASE.key() -> db,
        DataSourceWriteOptions.HIVE_URL.key() -> s"jdbc:hive2://${emsMaster(env)}:10000/;ssl=true"
      )

      // Write a DataFrame as a Hudi dataset
      val write = df.write
        .format("org.apache.hudi")
        .options(hudiOptions)

      if (partitionKey == null) {
        write
          .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
          .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS.key(), "org.apache.hudi.hive.NonPartitionedExtractor")
          .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
          .option("hoodie.datasource.write.hive_style_partitioning", hiveStyle)
          .mode(SaveMode.Overwrite)
          .save(path)
      } else {
        write
          .option(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), partitionKey)
          .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS.key(), partitionKey)
          .mode(SaveMode.Append)
          .save(path)
        spark.sql(s"MSCK REPAIR TABLE $db.$tableName")
      }
    }

But now I am getting:

    java.lang.RuntimeException: scala.None$ is not a valid external type for schema of string
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.StaticInvoke_4$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_4$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:209)

Please help, thanks
Andrew
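P.S. For context, this is roughly how the method gets called; the DataFrame, table names and path here are made up for illustration, not our real ones:

    writeHudis(
      df = eventsDf,                             // hypothetical DataFrame
      databaseName = "analytics",
      tableName = "events",
      pk = "event_id",
      path = "s3://my-bucket/analytics/events",
      timestampKey = "updated_at",
      partitionKey = "event_date"                // omitted for non-partitioned tables
    )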
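P.P.S. While digging I managed to reproduce the exact same stack trace outside of Hudi, so maybe this points at the cause. As far as I can tell, createDataFrame(RDD[Row], schema) goes through the row encoder (the ExpressionEncoder$Serializer in the trace above), which rejects a scala.None sitting where the schema expects a plain string. The schema below is illustrative, not my real one; run it in spark-shell:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val schema = StructType(Seq(StructField("name", StringType, nullable = true)))

    // Throws at the first action, with the same error as above:
    //   java.lang.RuntimeException: scala.None$ is not a valid external type for schema of string
    spark.createDataFrame(spark.sparkContext.parallelize(Seq(Row(None))), schema).show()

    // Unwrapping the Option first (orNull) encodes fine:
    spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(Row(Option.empty[String].orNull))), schema).show()

So my current guess is that somewhere a column in the DataFrame I pass to writeHudis ends up holding an unwrapped Option instead of a null, though I haven't found where yet.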