Structured Streaming's file sink solves these problems by writing a
log/manifest of all the authoritative files written out (for any format).
So if you run batch or interactive queries on the output directory with
Spark, it will automatically read the manifest and only process files that
are in the manifest, thus skipping any partial files, etc.
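
For illustration only, a minimal sketch of that flow assuming a Kafka source; the broker, topic, and paths below are placeholders, not from this thread:

    // Structured Streaming file sink: writes data files plus a _spark_metadata
    // manifest of committed files. Broker, topic, and paths are placeholders.
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS json")

    stream.writeStream
      .format("parquet")
      .option("path", "/data/output")
      .option("checkpointLocation", "/data/checkpoints/output")
      .start()

    // A later batch or interactive query on the same directory consults the
    // manifest and skips any partial/uncommitted files automatically.
    val committed = spark.read.parquet("/data/output")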



On Fri, Mar 2, 2018 at 1:37 PM, Sunil Parmar <sunilosu...@gmail.com> wrote:

> Is there a way to get finer control over file writing in the Parquet file
> writer?
>
> We have a streaming application using Apache Apex (on a path of migration to
> Spark ...story for a different thread). The existing streaming application
> reads JSON from Kafka and writes Parquet to HDFS. We're trying to deal with
> partial files by writing .tmp files and renaming them as the last step. We
> only commit the offset after the rename is successful. This way we get
> at-least-once semantics and avoid the partial file write issue.
>
> Thoughts ?
>
>
> Sunil Parmar
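
For reference, a hedged sketch of the write-to-.tmp-then-rename pattern described above; the paths and surrounding stream setup are illustrative, not the original application's code:

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    kafkaStream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      val fs  = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
      val tmp = new Path(s"/data/out/batch-${rdd.id}.parquet.tmp")
      val dst = new Path(s"/data/out/batch-${rdd.id}.parquet")

      // ... write this batch's Parquet output to `tmp` here ...

      // HDFS rename is atomic, so readers never see a partial file; the Kafka
      // offsets are committed only after the rename succeeds (at-least-once).
      if (fs.rename(tmp, dst)) {
        kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    }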
>
> On Wed, Feb 28, 2018 at 1:59 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> There is no good way to save to parquet without causing downstream
>> consistency issues.
>> You could use foreachRDD to get each RDD, convert it to
>> DataFrame/Dataset, and write out as parquet files. But you will later run
>> into issues with partial files caused by failures, etc.
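
For reference, a minimal sketch of that foreachRDD approach using SparkSession rather than the deprecated SQLContext; the case class and output path are placeholders:

    import org.apache.spark.sql.SparkSession

    case class Message(key: String, value: String)

    kafkaStream.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Reuse (or lazily create) a SparkSession on the driver for this batch.
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._

        rdd.map(r => Message(r.key, r.value))
          .toDF()
          .write
          .mode("append")
          .parquet("/data/output")   // partial files remain possible on failure, as noted above
      }
    }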
>>
>>
>> On Wed, Feb 28, 2018 at 11:09 AM, karthikus <aswin8...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have a Kafka data stream and I need to save the data in Parquet format
>>> without using Structured Streaming (due to the lack of Kafka message
>>> header support).
>>>
>>> import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
>>>
>>> // Direct stream of ConsumerRecord[String, String] from Kafka
>>> val kafkaStream =
>>>   KafkaUtils.createDirectStream[String, String](
>>>     streamingContext,
>>>     LocationStrategies.PreferConsistent,
>>>     ConsumerStrategies.Subscribe[String, String](
>>>       topics,
>>>       kafkaParams
>>>     )
>>>   )
>>>
>>> // process the messages: keep only the record values (the JSON payloads)
>>> val messages = kafkaStream.map(record => (record.key, record.value))
>>> val lines = messages.map(_._2)
>>>
>>> Now, how do I save it as Parquet? All the examples that I have come across
>>> use SQLContext, which is deprecated! Any help appreciated!
>>>
>>>
>>>
>>
>
