Dear Spark user community,

I have received some insight regarding filtering separate DataFrames in my Spark Structured Streaming job. However, I now wish to write each of the DataFrames mentioned in the Stack Overflow question below to a separate location using a parquet writer. My initial impression is that this requires multiple sinks, but I'm being discouraged from doing that. I think it might also be possible using the foreach / foreachBatch writers, but I'm not sure how those interact with the parquet writer, or what the caveats of that approach are. Can some more advanced users or developers suggest how to go about this, particularly without using multiple streams? A sketch of what I have in mind follows below.
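For concreteness, here is roughly the shape I have in mind with foreachBatch (an untested sketch for Spark 2.4+; `parsedEvents`, the `eventType` column, the checkpoint location, and the output paths are all placeholders, not real names from my job):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    val query = parsedEvents.writeStream
      .option("checkpointLocation", "/tmp/checkpoints/split") // placeholder
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        batch.persist() // both filters consume the same micro-batch
        batch.filter(col("eventType") === "a")
          .write.mode("append").parquet("/tmp/parquet/type_a") // placeholder
        batch.filter(col("eventType") =!= "a")
          .write.mode("append").parquet("/tmp/parquet/type_b") // placeholder
        batch.unpersist()
        () // foreachBatch expects a Unit-returning function
      }
      .start()

If I understand the docs correctly, the caveats would be that foreachBatch needs Spark 2.4+, and that the batch writes inside it are only at-least-once: a replayed micro-batch can append duplicate files, and a failure between the two writes leaves the sinks inconsistent for that batch. Is that right, and is this still preferable to two streams?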
On Wed, Dec 26, 2018 at 6:01 PM Colin Williams
<colin.williams.seat...@gmail.com> wrote:
>
> https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming
>
> On Wed, Dec 26, 2018 at 2:42 PM Colin Williams
> <colin.williams.seat...@gmail.com> wrote:
> >
> > My initial impression is that I'd need to create my own `from_json`
> > using `jsonToStructs` as a reference, but handle
> > `case _: BadRecordException => null` or similar, so that the
> > non-matching string can be written to a corrupt records column.
> >
> > On Wed, Dec 26, 2018 at 1:55 PM Colin Williams
> > <colin.williams.seat...@gmail.com> wrote:
> > >
> > > Hi,
> > >
> > > I'm trying to figure out how I can write records that don't match a
> > > JSON read schema via Spark Structured Streaming to an output sink /
> > > parquet location. Previously I did this in batch via the corrupt
> > > record column features of the batch reader. But in Spark Structured
> > > Streaming I'm reading a string from Kafka and using from_json on the
> > > value of that string. If it doesn't match my schema, from_json
> > > returns null for all the rows and does not populate a corrupt record
> > > column. But I want to somehow obtain the source Kafka string in a
> > > DataFrame and write it to an output sink / parquet location.
> > >
> > > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
> > >   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
> > >   jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
> > > }
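In the meantime, the workaround I'm experimenting with is to keep the raw Kafka string next to the parsed struct instead of selecting it away, roughly like this (untested; the `raw` and `event` column names are just what I picked):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, from_json}
    import org.apache.spark.sql.types.StructType

    // Keep the source Kafka string alongside the parsed struct so it can
    // be routed to a corrupt-records location when parsing fails.
    def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType): DataFrame = {
      rawKafkaDataFrame
        .select(col("value").cast("string").as("raw"))
        .withColumn("event", from_json(col("raw"), schema))
    }

Since from_json returns a null struct on malformed JSON (at least in Spark 2.x), the split would then be:

    val parsed  = getKafkaEventDataFrame(rawKafkaDataFrame, schema)
    val corrupt = parsed.filter(col("event").isNull).select("raw")
    val valid   = parsed.filter(col("event").isNotNull).select("event.*")

which is exactly the two-DataFrame situation I'd like to write to two parquet locations from a single stream.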