This is mostly a question about SparkRunner and, to a certain extent, FileIO.
You might want to elaborate a bit more on what you mean by data loss. In most
cases, restarting a pipeline from scratch loses the checkpointed state from the
previous job (e.g. the first 30 minutes of a 1-hour window would be lost),
unless you have a way to restart from a 'snapshot' of the pipeline (e.g.
starting from a 'savepoint' in Flink or 'updating' a pipeline in Dataflow).
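
For the SparkRunner, the closest analogue is resuming from a fixed, durable
checkpoint directory rather than the default temporary one. A rough sketch of
pinning that location (assuming the --checkpointDir option mentioned further
down maps onto SparkPipelineOptions in the usual way; the path here is purely
illustrative):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

SparkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
// A fixed location on durable storage (e.g. HDFS) instead of a random
// directory under /tmp, so a restarted job can find the previous
// checkpoint. The path is a placeholder.
options.setCheckpointDir("hdfs:///beam/checkpoints/my-pipeline");
Pipeline pipeline = Pipeline.create(options);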

Regarding 'commitOnFinalize()' in KafkaIO: it runs 'soon after' the
corresponding messages are processed/checkpointed. In the case of Spark and
Dataflow, that would be after the messages pass through the first stage of
the pipeline.
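
For reference, a minimal sketch of such a read (the method on KafkaIO.Read is
commitOffsetsInFinalize() in current Beam releases; the broker, topic, and
deserializers below are placeholders, and 'pipeline' is as created above):

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Offsets are committed back to Kafka only when the runner finalizes
// the checkpoint covering these records, i.e. after they have passed
// through the first stage of the pipeline.
PCollection<KV<Long, String>> records =
    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker1:9092")
            .withTopic("events")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .commitOffsetsInFinalize()
            .withoutMetadata());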

> Please advise if this use case (data ingestion to HDFS) is something Beam
> could achieve without losing data from KafkaIO.
Yes, reading from any supported source and writing to any supported sink is
supported. Otherwise, it would be a bug.
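
As a rough sketch of that Kafka-to-HDFS shape, continuing from the read above
(the window size, shard count, and output path are illustrative; an explicit
number of shards is needed for windowed writes from an unbounded source):

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

records
    // Keep only the message payload.
    .apply(Values.<String>create())
    // 1-hour fixed windows; FileIO emits one set of files per window.
    .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))))
    .apply(FileIO.<String>write()
        .via(TextIO.sink())
        .to("hdfs:///data/ingest/events")
        .withNumShards(4));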

On Mon, Oct 1, 2018 at 10:25 PM Juan Carlos Garcia <[email protected]>
wrote:

> Hi folks, we are running a pipeline and, as the subject says, we are
> having issues with data loss.
>
> Using KafkaIO (2.0.4 due to the version of our brokers) with
> commitOnFinalize, we would like to understand how this finalization works
> together with a FileIO.
>
> I studied KafkaIO and saw that the records are committed to Kafka
> inside the consumerPollLoop method only when a checkpoint is produced, but
> when is this checkpoint produced? How does it cope with windowed data and
> a FileIO that produces files?
>
> When running with Spark our batchInterval is 30 secs, and the pipeline has
> a fixed window of 1 hr for FileIO to write to HDFS. We are constantly
> restarting the pipeline (1 to 3 times a day, or YARN reaches its maximum
> restart attempts and then kills it completely due to network
> interruptions); however, we have detected missing data on HDFS.
>
> Initially we were running without specifying a checkpoint directory
> (SparkRunner), and we found that on each deployment a random directory was
> generated under /tmp. Recently we started to use a fixed directory for
> checkpoints (via --checkpointDir on the Spark runner), but we still have
> doubts that this will completely solve our data loss problems when
> restarting the pipeline multiple times a day (or is our assumption
> incorrect?).
>
> Please advise if this use case (data ingestion to HDFS) is something Beam
> could achieve without losing data from KafkaIO.
>
> Thanks
> JC