Hi everyone,
I have a pretty challenging problem with reading/writing multiple parquet
files with streaming, but first let me introduce my data flow:

I have a lot of json events streaming into my platform. All of them have the
same structure, but most of the fields are optional. Some of the fields are
arrays with structs inside.
These arrays can be empty, but sometimes they contain data (structs).

Now I'm using Spark SQL & Streaming to:
0. stream data from Kafka:

val stream = KafkaUtils.createDirectStream ...
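For reference, the direct stream is set up roughly like this (the broker
address, topic name and decoders below are placeholders, not my real config):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// ssc is the StreamingContext; broker and topic are placeholders
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val topics = Set("events")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)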

1. read the json data into a json DataFrame:

stream.foreachRDD(rdd => {

  // transform the raw Kafka records into json strings
  val dataRdd: RDD[String] = myTransform(rdd)

  // let Spark infer the schema from the json strings
  val jsonDf = sql.read.json(dataRdd)

2. write jsonDf to Parquet files:

  if (firstRun) {

    jsonDf.write.parquet("parquet-events")

    firstRun = false

  } else { // the table has to exist to be able to append data

    jsonDf.write.mode(SaveMode.Append).parquet("parquet-events")

  }

})

All the writing goes fine. It produces multiple files, one for each batch
of data.

But the problem shows up when reading the data back:

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

scala> val events = sqlContext.read.parquet("parquet-events")

org.apache.spark.SparkException: Failed to merge incompatible schemas
StructType...

Caused by: org.apache.spark.SparkException: Failed to merge
incompatible data types StringType and
StructType(StructField(key,StringType,true),
StructField(value,StringType,true))


Indeed, the printed schemas contain mismatched types for a few fields, e.g.:

StructField(details,ArrayType(StringType,true),true)

vs

StructField(details,ArrayType(StructType(StructField(key,StringType,true),
StructField(value,StringType,true)),true),true)


It seems the bad thing happens in read.json: it recognizes my array
fields differently: when the array is empty, as containing Strings; when
it is filled with data, as containing structs.

The code of json/InferSchema indeed suggests that:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala#L127
where the canonicalizeType method replaces NullType with StringType in my
empty arrays.
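
To illustrate, here is a minimal repro of the inference difference (the
field name is just an example; sc / sqlContext as in the spark-shell):

val emptyArr  = sc.parallelize(Seq("""{"details": []}"""))
val filledArr = sc.parallelize(Seq("""{"details": [{"key": "k", "value": "v"}]}"""))

sqlContext.read.json(emptyArr).printSchema()
// 'details' is inferred as an array of strings: the empty array gives the
// inference nothing to work with, so NullType is canonicalized to StringType

sqlContext.read.json(filledArr).printSchema()
// 'details' is inferred as an array of struct<key: string, value: string>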

This is a serious problem for anyone trying to stream data from json
into parquet tables. Does anyone have ideas how to handle this problem?
My ideas are (not mutually exclusive):

1. Define the schema for my data explicitly. This is a last resort, as
I wanted to build a schema-less solution.

2. Write my own schema inference that removes empty arrays from the
schema, then pass that schema directly to the read method (a rough
sketch of that is below, after this list). I could even reuse & modify
the InferSchema class from the Spark source, but unfortunately it is
private... so I would need to copy-paste it.

3. Submit a bug to Spark about it. Do you also think it is a bug?
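
For ideas 1/2, the rough sketch I have in mind is to build the schema by
hand (or post-process an inferred one) and pass it to the reader, so that
read.json never has to guess the element type of an empty array. Field
names here are just an example:

import org.apache.spark.sql.types._

// element type of the 'details' array, declared explicitly
val detailsType = ArrayType(
  StructType(Seq(
    StructField("key", StringType, nullable = true),
    StructField("value", StringType, nullable = true))),
  containsNull = true)

val eventSchema = StructType(Seq(
  StructField("details", detailsType, nullable = true)
  // ... the remaining event fields would go here
))

// With an explicit schema, empty arrays no longer degrade to ArrayType(StringType)
val jsonDf = sql.read.schema(eventSchema).json(dataRdd)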

It's currently a blocker for me; any help would be appreciated!

Cheers,

Krzysztof
