Forgive me for the long email; I figured more detail was better. I also asked
on SO, if you prefer to answer there:

https://stackoverflow.com/questions/66325929/dataflow-reading-from-kafka-without-data-loss



We're currently heavy users of Beam/Dataflow batch jobs and want to start
using streaming, if it can be done reliably.

Here is a common scenario: we have a very large Kafka topic that we need to
run some basic ETL or aggregation on, and a non-idempotent upstream queue.
Here is an example of our Kafka data:

  ID   | msg    | timestamp (mm:ss)
  ---------------------------------
  1    | A      | 01:00
  2    | B      | 01:01
  3    | D      | 06:00
  4    | E      | 06:01
  4.3  | F      | 06:01
  .... | ...... | ...... (millions more)
  4.5  | ZZ     | 19:58

Oops, the data changes from integers to decimals at some point, which will
eventually cause some elements to fail, forcing us to kill the pipeline,
possibly modify the downstream service, and possibly make minor code
changes to the Dataflow pipeline.

In Spark Structured Streaming, because of the ability to use external
checkpoints, we would be able to restart a streaming job and resume
processing the queue where the previous job left off (that is, after the
last successfully processed record), giving exactly-once processing. In a
vanilla or Spring Boot Java application we could loop with a Kafka consumer
and commit offsets only after writing the results to our 'sink'.
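
To make the commit-after-write pattern concrete, here is a minimal sketch in
plain Python. It is a simulation only: `FakePartition`, `write_to_sink`,
`tolerant_write` and `run_consumer` are hypothetical names invented for this
illustration, not Kafka, Spring or Beam APIs, and the records are the first
few rows of the sample data above.

```python
from dataclasses import dataclass


@dataclass
class FakePartition:
    """In-memory stand-in for one Kafka partition (hypothetical, not a Kafka API)."""
    records: list
    committed: int = 0  # offset of the next record to read after a restart


class SinkError(Exception):
    """Raised when the simulated sink rejects a record."""


def write_to_sink(sink, record):
    """Hypothetical sink that only accepts integer IDs."""
    record_id, msg = record
    if not isinstance(record_id, int):
        raise SinkError(f"unsupported id {record_id!r}")
    sink.append((record_id, msg))


def tolerant_write(sink, record):
    """Hypothetical 'fixed' sink that accepts decimal IDs as well."""
    sink.append(record)


def run_consumer(partition, sink, write=write_to_sink):
    """Read from the committed offset; commit only after the write succeeds."""
    offset = partition.committed
    while offset < len(partition.records):
        write(sink, partition.records[offset])  # may raise SinkError
        offset += 1
        partition.committed = offset  # commit happens after the write
    return partition.committed


partition = FakePartition(
    records=[(1, "A"), (2, "B"), (3, "D"), (4, "E"), (4.3, "F")]
)
sink = []

# First run: the job dies at record F, so F's offset is never committed.
try:
    run_consumer(partition, sink)
except SinkError:
    pass
committed_after_failure = partition.committed

# Restart with the fixed sink: processing resumes at F, nothing is lost.
run_consumer(partition, sink, write=tolerant_write)
```

After the first run the committed offset stops just before F, and the
restarted run picks up from there. Note that in a real consumer a crash
between the write and the commit would replay that one record, so a
non-idempotent sink would still need some care.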

My overall question is *can we achieve similar functionality in
Dataflow/Beam?* I'll list some of my assumptions and concerns:

   1. It seems that here in KafkaIO
   <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1860>
   there is no relationship between the offset-commit PCollection and the
   user's PCollection. Does that mean they can drift apart?
   2. It seems that here in KafkaCommitOffset
   <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L124>
   the transform takes a window of five minutes and emits the highest offset,
   but this is *not wall time*; it is Kafka record time. Going back to
   our sample data, it looks to me like the entire queue's offsets would be
   committed (in chunks of five minutes) as fast as possible! *This means
   that if we have only finished processing up to record F in the first five
   minutes, we may have committed almost the entire queue's offsets?*
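
The second concern can be illustrated with a small plain-Python model. This
is only a sketch of the assumption described in point 2 (fixed five-minute
windows on record time, highest offset per window); it is not Beam code and
does not model watermarks or when windows actually fire. The offsets 0 to 5
are made up for the sample rows, and the "millions more" rows are left out.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60  # five-minute fixed windows, as assumed in point 2

# (offset, msg, record timestamp in seconds); offsets are illustrative only
records = [
    (0, "A", 1 * 60 + 0),     # 01:00
    (1, "B", 1 * 60 + 1),     # 01:01
    (2, "D", 6 * 60 + 0),     # 06:00
    (3, "E", 6 * 60 + 1),     # 06:01
    (4, "F", 6 * 60 + 1),     # 06:01
    (5, "ZZ", 19 * 60 + 58),  # 19:58
]


def max_offset_per_window(records, window_seconds=WINDOW_SECONDS):
    """Group records by record-time window and keep the highest offset in each."""
    highest = defaultdict(lambda: -1)
    for offset, _msg, timestamp in records:
        window_start = (timestamp // window_seconds) * window_seconds
        highest[window_start] = max(highest[window_start], offset)
    return dict(highest)


# Offsets that would be committed, keyed by window start (seconds).
commits = max_offset_per_window(records)

# Suppose the user's branch of the pipeline has only finished up to F.
last_processed_offset = 4
ahead_of_processing = max(commits.values()) > last_processed_offset
```

In this model the commits depend only on the record timestamps, not on how
far the rest of the pipeline has progressed, which is exactly the drift the
question is asking about.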

Now, in our scenario the pipeline started failing around F, so it seems our
only choice is to start from the beginning or lose data? I believe this
might be overcome with a lot of custom code (a custom DoFn to ensure the
Kafka consumer never commits) plus some custom code for our upstream sink
that would eventually commit offsets. Is there a better way to do this,
and/or are some of my assumptions about how offset management is handled in
Beam/Dataflow wrong?
Thanks in advance for any help/pointers/ideas!

*~Vincent*
