Hi Wouter,

So in short my flow is like this:
> - I have a stateful DoFn, which reads from a PCollection with the
> configuration of my KafkaConsumer.
> - The stateful DoFn reads this message from the PCollection and starts a
> KafkaConsumer (in the process method)
> - Rather than yielding message incoming from the KafkaConsumer, they are
> added to BagState and CombiningValueStateSpec to keep track of the count.
> - When the CombiningValueStateSpec has a count >= MAX_BUFFER_SIZE we set a
> timer (TimeDomain.REAL_TIME) to be fired in a very small duration (1 ms).
> - The timer method yields all elements in the buffer and clears both the
> buffer state as well as the count state.
>
> However, these timers are never fired (i.e. timer methods are never
> executed).
>

Your guessing is correct.  The timer is scheduled only after the current
bundle finishes processing. I would change your KafkaReadDoFn to

   - in process(), you initialize your Kafka consumer and output first N
   records, when it exceeds N, you set a real-time timer and save the
   next available Kafka offset to ValueState. Then we should stop the process
   by returning from it.
   - in onTimer(), you can retrieve the next offset to read from ValueState
   and assign that offset to the consumer, and continue to read for next N
   elements. When you have outputed N elements, you save the offset again to
   the ValueState, reset the timers.
   - When timer fires, all steps in onTimer will be repeated.


Though this approach should work, it may not have a good performance since
we don't do anything special around distributing works.

What runner is your target runner? If it's Dataflow, I believe
ExternalTransform should work.

On Mon, May 10, 2021 at 3:13 AM Wouter Zorgdrager <[email protected]>
wrote:

> Hi Boyuan,
>
> Thanks for your suggestion, that sounds like a good idea. However, where
> do I open my connection with Kafka and start consuming messages? Is it in
> the process method (turning it into a blocking method?). I currently try it
> like this, but timers never seem to be fired. I have the feeling this is
> because of the blocking nature of the process method.
>
> So in short my flow is like this:
> - I have a stateful DoFn, which reads from a PCollection with the
> configuration of my KafkaConsumer.
> - The stateful DoFn reads this message from the PCollection and starts a
> KafkaConsumer (in the process method)
> - Rather than yielding message incoming from the KafkaConsumer, they are
> added to BagState and CombiningValueStateSpec to keep track of the count.
> - When the CombiningValueStateSpec has a count >= MAX_BUFFER_SIZE we set a
> timer (TimeDomain.REAL_TIME) to be fired in a very small duration (1 ms).
> - The timer method yields all elements in the buffer and clears both the
> buffer state as well as the count state.
>
> However, these timers are never fired (i.e. timer methods are never
> executed).
> Is there something I'm doing wrong or do you have any other suggestions
> there?
>
> On Thu, 8 Apr 2021 at 19:35, Boyuan Zhang <[email protected]> wrote:
>
>> Hi Wouter,
>>
>> Dataflow fuses your operations into several stages during execution time.
>> The downstream stage can have inputs only when upstream finishes at least
>> one bundle(a small portion of work that Dataflow gives to the worker). In
>> your case, it seems like your _ConsumeKafkaTopic runs forever and never
>> stops. Thus the operations in the same stage of _ConsumeKafkaTopic can see
>> the output from _ConsumeKafkaTopic immediately but the downstream stages
>> can never have the input there.
>>
>> One workaround I can come up is to change you _ConsumeKafkaTopic to a
>> stateful dofn. You can give _ConsumeKafkaTopic a variable to identify how
>> many records  _ConsumeKafkaTopic outputs once. Let's say the number is
>> 1000. When your output messages exceed 1000, you can save the current
>> consumer position into ValueState and set a now() + 5s processing time
>> timer. In the timer callback, you can output another 1000 records again and
>> set next timer. Does it sound work for you?
>>
>> On Thu, Apr 8, 2021 at 3:47 AM Wouter Zorgdrager <[email protected]>
>> wrote:
>>
>>> Dear Beam community,
>>>
>>> I'm currently trying to set up a Beam pipeline using the PythonSDK. I
>>> need to read from an unbounded Kafka source. My flow looks like this:
>>> [image: flow.png]
>>> It reads event from a Kafka topic, using a (stateless) router the events
>>> get to different PTransforms and the final results get written to a Kafka
>>> topic again. I use side outputs to 'route' and beam.Flatten() to merge
>>> PCollections again.
>>>
>>> In my first attempt, I used the built-in Kafka IO which uses an
>>> ExternalTransform [1]. However, no data was being consumed from my Kafka
>>> topic. I think this was also discussed in the mailing list before [2] and
>>> has to do with this issue [3].
>>>
>>> In my second attempt, I used this Kafka connector from an external
>>> source [4]. This connector worked initially, where I did not fully
>>> implement the flow as described above. When using this connector, I see
>>> messages get 'stuck' in the dataflow and not being fully processed until
>>> the end. I.e. they get processed until a certain DoFn in the pipeline, but
>>> not any further. Oddly enough, when I get rid of the beam.Flatten() (and
>>> therefore I can't merge collections and just use a linear pipeline) it does
>>> work. Moreover, when a substitute my kafka consumer with a simple
>>> beam.Create() the full pipeline works as well.
>>>
>>> I think the problem is that the Kafka connector I'm using is not a
>>> Splittable DoFn and just blocks completely while consuming [5]. However,
>>> I'm confused that this does work for a linear pipeline (without
>>> flattening).
>>>
>>> To give some more context; I'm planning to deploy this using a
>>> FlinkRunner. Moreover, I have a 'hard' dependency on using Python and
>>> Kafka. In other words, I can't move to another IO framework or programming
>>> language.
>>>
>>> I hope you can help me out, provide some suggestions or workarounds.
>>>
>>> Thanks in advance!
>>> Regards,
>>> Wouter
>>>
>>> [1] -
>>> https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.kafka.html
>>> [2] -
>>> https://lists.apache.org/x/thread.html/r9c74a8a7efa4b14f2f1d5f77ce8f12128cf1071861c1627f00702415@%3Cuser.beam.apache.org%3E
>>>
>>> [3] - https://issues.apache.org/jira/browse/BEAM-11998
>>> [4] - https://github.com/mohaseeb/beam-nuggets
>>> [5] -
>>> https://github.com/mohaseeb/beam-nuggets/blob/39d2493b161ebbcbff9f4857115d527f6fefda77/beam_nuggets/io/kafkaio.py#L76
>>>
>>>
>>

Reply via email to