I think you'd have to manually implement a pipeline that uses the Kafka API
to capture the current end offset of each partition (say, as a
PCollection<KV<Integer, Long>>, since Kafka offsets are longs) and passes
them to a ParDo that uses the Kafka API to read each partition up to its
captured offset.
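
To make the two steps concrete, here is a rough sketch of just the stopping
logic, in plain Java with no Kafka or Beam dependency. PartitionLog,
InMemoryLog, snapshotEndOffsets and readUntil are hypothetical names made up
for illustration. In a real pipeline I'd expect the snapshot to come from
KafkaConsumer.endOffsets(...) and the read loop to use assign/seek/poll
inside the ParDo. The sketch also assumes offsets are dense and start at 0,
which a real topic does not guarantee (retention and compaction leave gaps),
so real code should track the offset of each record it receives.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class BoundedLogRead {

    /** Hypothetical stand-in for a Kafka consumer; not a Kafka or Beam type. */
    interface PartitionLog {
        /** Offset that the next appended record would get. */
        long endOffset(int partition);
        /** Up to maxRecords values starting at fromOffset; empty if none exist. */
        List<String> fetch(int partition, long fromOffset, int maxRecords);
    }

    /** Toy in-memory log with dense offsets starting at 0 (an assumption of this sketch). */
    static class InMemoryLog implements PartitionLog {
        private final Map<Integer, List<String>> data = new LinkedHashMap<>();

        void append(int partition, String value) {
            data.computeIfAbsent(partition, p -> new ArrayList<>()).add(value);
        }

        @Override
        public long endOffset(int partition) {
            return data.getOrDefault(partition, new ArrayList<>()).size();
        }

        @Override
        public List<String> fetch(int partition, long fromOffset, int maxRecords) {
            List<String> records = data.getOrDefault(partition, new ArrayList<>());
            int from = (int) Math.min(fromOffset, records.size());
            int to = Math.min(from + maxRecords, records.size());
            return new ArrayList<>(records.subList(from, to));
        }
    }

    /** Step 1: capture partition -> end offset once, before any reading starts. */
    static Map<Integer, Long> snapshotEndOffsets(PartitionLog log, List<Integer> partitions) {
        Map<Integer, Long> snapshot = new LinkedHashMap<>();
        for (int p : partitions) {
            snapshot.put(p, log.endOffset(p));
        }
        return snapshot;
    }

    /** Step 2: read one partition from offset 0 up to, but not including, stopOffset. */
    static List<String> readUntil(PartitionLog log, int partition, long stopOffset) {
        List<String> out = new ArrayList<>();
        long next = 0;
        while (next < stopOffset) {
            // Never ask for more than what remains below the captured offset.
            int want = (int) Math.min(2, stopOffset - next); // tiny batch size for the demo
            List<String> batch = log.fetch(partition, next, want);
            if (batch.isEmpty()) {
                break; // toy log has nothing more; a real consumer would retry with a timeout
            }
            out.addAll(batch);
            next += batch.size();
        }
        return out;
    }
}
```

Because the stop offset is captured once up front, records appended after the
snapshot are never read, so the job terminates without guessing a duration or
a record count.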

This could be something useful to contribute to KafkaIO.read() if this
approach plays out well for you.

On Wed, Dec 6, 2017 at 2:40 PM Vilhelm von Ehrenheim <[email protected]>
wrote:

> Hi!
> I have a job that I would like to run in batch that reads all contents of
> a Kafka log.
>
> To do this I need to use something like
>
> ```
> KafkaIO.read()
>     .updateConsumerProperties(ImmutableMap.of(
>         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"))
>     .withMaxReadTime(Duration.standardMinutes(10))
> ```
>
> which requires me to guess how long it takes to read the whole log.
> That feels a bit flaky, as this might change over time.
>
> The other alternative, as I understand it, is to use
> `withMaxNumRecords(...)`, but that makes even less sense in my case as I do
> not know the number of records in the log.
>
> So my question is: is there any way I can instead read up to the offsets
> that the log is at when the job starts, or something similar?
>
> / Vilhelm
>
