Hi everyone, I have pretty challenging problem with reading/writing multiple parquet files with streaming, but let me introduce my data flow:
I have a lot of json events streaming to my platform. All of them have the same structure, but fields are mostly optional. Some of the fields are arrays with structs inside. These arrays can be empty, but sometimes they contain the data (structs). Now I'm using Spark SQL & Streaming to: 0. Stream data from Kafka val stream = KafkaUtils.createDirectStream ... 1. read json data to json dataframe: stream.foreachRDD( rdd => { val dataRdd : RDD[String] = myTransform(rdd) val jsonDf = sql.read.json(dataRdd) 2. write jsonDf to Parquet files: if (firstRun) { jsonRdd.write.parquet("parquet-events") firstRun = false } else { // the table has to exist to be able to append data. jsonRdd.write.mode(SaveMode.Append).parquet("parquet-events") } }) All the writing goes fine. It produces multiple files, each for one batch of data. But the problem is on reading the data: scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) scala> val events = sqlContext.read.parquet("parquet-events") org.apache.spark.SparkException: Failed to merge incompatible schemas StructType... Caused by: org.apache.spark.SparkException: Failed to merge incompatible data types StringType and StructType(StructField(key,StringType,true), StructField(value,StringType,true)) Indeed the printed schemas contain mismatched types of few fields, e.g.: StructField(details,ArrayType(StringType,true),true) vs StructField(details,ArrayType(StructType(StructField(key,StringType,true), StructField(value,StringType,tru e)),true),true) It seems that bad thing happened in read.json: it recognized my array fields differently: when array is empty as containing Strings; when filled with data as containing structs. The code of json/InferSchema indeed suggests that: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala#L127Where canonicalizeType method replaces NullType with StringType in my empty arrays. This is a serious problem for someone trying to stream data from json to parquet tables . Does anyone have ideas how to handle this problem? My ideas are (some non-exclusive): 1. Have schema perfectly defined on my data. This is a last resort as I wanted to create schema-less solution. 2. Write my own schema inference, that removes empty arrays from schema. Then pass schema directly to read method. I could even use & modify InferSchema class from spark source, but it is private unfortunately... So I need to copy paste it. 3. Submit a bug to Spark about it. Do you also think it is a bug? It's a blocker for me currently, any help will be appreciated! Cheers, Krzysztof