There has been some discussion in the past about adding a "drain" feature
to Apache Beam which would allow this intermediate data to be output so it
isn't lost. The caveat is that you'll be outputting partial results.

The design doc was shared here:
https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit

On Tue, Oct 2, 2018 at 8:58 AM Raghu Angadi <[email protected]> wrote:

> > I am curious about what you mentioned (30 min of a 1 hr window would be
> > lost), just a noob question: why?
>
> Say you have 1 hour windowing in your pipeline. The aggregation is emitted
> at the end of the window. 30 minutes into the window, there might already be
> many incoming messages processed. Where should the information about this
> partially processed window be stored? Managing this kind of state is an
> important part of a runner. It is checkpointed on some persistent storage.
> If you restart the pipeline at that time, the new job may not have access to
> state from the previous job, so you would lose 30 minutes' worth of messages.
>
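> As a rough illustration (a minimal sketch, not your actual pipeline; the
> element type and the counting aggregation are just placeholders), a 1 hour
> fixed window in the Java SDK looks like this. Everything accumulated for a
> window that is still open lives in runner-managed, checkpointed state until
> the window fires:
>
>   // import org.apache.beam.sdk.transforms.Count;
>   // import org.apache.beam.sdk.transforms.windowing.FixedWindows;
>   // import org.apache.beam.sdk.transforms.windowing.Window;
>   // import org.joda.time.Duration;
>   PCollection<KV<String, Long>> hourlyCounts =
>       events
>           // Assign each incoming element to a 1 hour fixed window.
>           .apply(Window.<KV<String, Long>>into(
>               FixedWindows.of(Duration.standardHours(1))))
>           // Partial counts for the in-progress window are held in the
>           // runner's checkpointed state until the window closes.
>           .apply(Count.perKey());
>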
> The reliability has to be provided by the runner. I don't know much about
> the details of the SparkRunner, but you can certainly try another runner
> like Flink or Dataflow.
>
> Raghu.
> On Tue, Oct 2, 2018 at 8:09 AM Juan Carlos Garcia <[email protected]>
> wrote:
>
>> Thanks for your input on this matter. By data loss I meant data that is
>> on Kafka but was not written to HDFS due to restarting the pipeline
>> (SparkRunner), or because it failed due to connectivity and was killed by
>> YARN, and when we restarted the pipeline those records were skipped.
>>
>> I am curious about what you mentioned (30 min of a 1 hr window would be
>> lost), just a noob question: why?
>>
>> > read from any source and write on any supported sink.
>>
>> I have no doubt about it.
>>
>> With a multi-stage pipeline (where we sort, manipulate, and group the data),
>> my purpose is to reliably sink data to HDFS, regardless of any interruption
>> of the pipeline, like other ingestion libraries do in "batch" (Camus /
>> Gobblin from LinkedIn).
>>
>> For now we are using HDFS as the sink with Spark, with a window to avoid
>> hitting HDFS too hard. Would you recommend using Flink instead (at least
>> for this requirement)?
>>
>> Thanks in advance
>>
>> Raghu Angadi <[email protected]> schrieb am Di., 2. Okt. 2018, 08:25:
>>
>>> This is mostly a question about the SparkRunner and, to a certain extent,
>>> FileIO. You might want to elaborate a bit more on what you mean by data
>>> loss. In most cases, restarting a pipeline from scratch loses the
>>> checkpointed state from the previous job (e.g. the first 30 minutes of a
>>> 1 hour window would be lost), unless you have a way to restart from a
>>> 'snapshot' of the pipeline (i.e. starting from a 'savepoint' in Flink or
>>> 'updating' a pipeline in Dataflow).
>>>
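>>> As a sketch of the 'update' path (option names from memory, so please
>>> double-check them against the Dataflow documentation; the job name is a
>>> placeholder), relaunching the same pipeline as an update of the running
>>> job looks roughly like this, so the in-flight state is carried over
>>> instead of starting from scratch:
>>>
>>>   // import org.apache.beam.sdk.options.PipelineOptions;
>>>   // import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>   String[] args = {
>>>       "--runner=DataflowRunner",
>>>       "--jobName=kafka-to-hdfs",  // must match the currently running job
>>>       "--update"                  // replace it in place, not start fresh
>>>   };
>>>   PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
>>>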
>>> Regarding 'commitOnFinalize()' in KafkaIO: it runs 'soon after' the
>>> corresponding messages are processed/checkpointed. In the case of Spark and
>>> Dataflow, that is after the messages pass through the first stage of the
>>> pipeline.
>>>
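>>> For reference, this is roughly how that option is wired up in the Java SDK
>>> (a minimal sketch; broker, topic, and deserializer choices are
>>> placeholders, and in current KafkaIO the method is spelled
>>> commitOffsetsInFinalize()):
>>>
>>>   // import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>   // import org.apache.kafka.common.serialization.StringDeserializer;
>>>   PCollection<KafkaRecord<String, String>> records =
>>>       pipeline.apply(KafkaIO.<String, String>read()
>>>           .withBootstrapServers("broker1:9092")
>>>           .withTopic("events")
>>>           .withKeyDeserializer(StringDeserializer.class)
>>>           .withValueDeserializer(StringDeserializer.class)
>>>           // Offsets are committed back to Kafka only after the reader's
>>>           // checkpoint is finalized, i.e. after the first fused stage.
>>>           .commitOffsetsInFinalize());
>>>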
>>> > Please advise if this use case (data ingestion to HDFS) is something
>>> > Beam could achieve without losing data from KafkaIO.
>>>
>>> Yes, reading from any supported source and writing to any supported sink
>>> is supported. Otherwise, it would be a bug.
>>>
>>> On Mon, Oct 1, 2018 at 10:25 PM Juan Carlos Garcia <[email protected]>
>>> wrote:
>>>
>>>> Hi folks, we are running a pipeline with which, as the subject says, we
>>>> are having issues with data loss.
>>>>
>>>> Using KafkaIO (2.0.4, due to the version of our brokers) with
>>>> commitOnFinalize, we would like to understand how this finalization works
>>>> together with FileIO.
>>>>
>>>> I studied KafkaIO and saw that the records are committed to Kafka inside
>>>> the consumerPollLoop method only when a checkpoint is produced, but when
>>>> is this checkpoint produced? How does it cope with windowed data and a
>>>> FileIO that produces files?
>>>>
>>>> When running with Spark our batchInterval is 30 secs, and the pipeline has
>>>> a fixed window of 1 hr for FileIO to write to HDFS. We are constantly
>>>> restarting the pipeline (1 to 3 times a day, or YARN reaches its maximum
>>>> restart attempts and then kills it completely due to network
>>>> interruptions), and we have detected that we have missing data on HDFS.
>>>>
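>>>> Simplified, the pipeline looks roughly like this (broker, topic, HDFS path
>>>> and shard count are placeholders, and method names are from memory):
>>>>
>>>>   pipeline
>>>>       .apply(KafkaIO.<String, String>read()
>>>>           .withBootstrapServers("broker1:9092")
>>>>           .withTopic("events")
>>>>           .withKeyDeserializer(StringDeserializer.class)
>>>>           .withValueDeserializer(StringDeserializer.class)
>>>>           .commitOffsetsInFinalize()
>>>>           .withoutMetadata())           // -> PCollection<KV<String, String>>
>>>>       .apply(Values.<String>create())   // keep only the record values
>>>>       // 1 hr fixed windows so we do not write to HDFS on every micro-batch.
>>>>       .apply(Window.<String>into(
>>>>           FixedWindows.of(Duration.standardHours(1))))
>>>>       .apply(FileIO.<String>write()
>>>>           .via(TextIO.sink())
>>>>           .to("hdfs://namenode/data/events")
>>>>           .withNumShards(4));
>>>>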
>>>> Initially we were running without specifying a checkpoint directory
>>>> (SparkRunner), and we found that on each deployment a random directory was
>>>> generated under /tmp. Recently we started to use a fixed directory for
>>>> checkpoints (via --checkpointDir on the Spark runner), but we still have
>>>> doubts that this will completely solve our data loss problems when
>>>> restarting the pipeline multiple times a day (or is our assumption
>>>> incorrect?).
>>>>
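>>>> Concretely, we now create the pipeline roughly like this (the path is a
>>>> placeholder, and please correct me if checkpointDir is not the right
>>>> option):
>>>>
>>>>   // import org.apache.beam.runners.spark.SparkPipelineOptions;
>>>>   // import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>   SparkPipelineOptions options =
>>>>       PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
>>>>   // A stable, durable location instead of the random /tmp directory that
>>>>   // was generated on every deployment before.
>>>>   options.setCheckpointDir("hdfs://namenode/beam/checkpoints/our-pipeline");
>>>>   Pipeline pipeline = Pipeline.create(options);
>>>>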
>>>> Please advise if this use case (data ingestion to HDFS) is something Beam
>>>> could achieve without losing data from KafkaIO.
>>>>
>>>> Thanks
>>>> JC
