Hi Boyuan,

I understand. I basically have to re-open the Kafka connection for every
'bundle', since my Kafka consumer is not serializable. It might be a bit
inefficient, but this is mostly for testing purposes, so I guess it's ok for
now. I'm using the FlinkRunner and, unfortunately, Dataflow is not an option.
I'm hoping that at some point this issue gets resolved [1] so that I can use
the built-in Beam Kafka connector.
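
For now I keep the consumer out of the DoFn's pickled state and re-open it per
bundle, roughly like this (a simplified sketch; it assumes the kafka-python
client, and the topic/servers values are placeholders):

import apache_beam as beam
from kafka import KafkaConsumer  # kafka-python client (not serializable)

class ConsumeKafka(beam.DoFn):
    def __init__(self, topic, bootstrap_servers):
        # Only plain, picklable configuration lives on the DoFn itself.
        self._topic = topic
        self._servers = bootstrap_servers
        self._consumer = None

    def start_bundle(self):
        # The non-serializable consumer is re-opened for every bundle.
        self._consumer = KafkaConsumer(
            self._topic, bootstrap_servers=self._servers,
            auto_offset_reset='earliest')

    def process(self, element):
        # Poll a bounded number of records so the bundle can finish.
        for records in self._consumer.poll(timeout_ms=1000).values():
            for record in records:
                yield record.value

    def finish_bundle(self):
        if self._consumer is not None:
            self._consumer.close()
            self._consumer = None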

Thanks for your help!

Wouter

[1] - https://issues.apache.org/jira/browse/BEAM-11998

On Mon, 10 May 2021 at 19:50, Boyuan Zhang <[email protected]> wrote:

> Hi Wouter,
>
>> So in short, my flow is like this:
>> - I have a stateful DoFn which reads from a PCollection containing the
>> configuration of my KafkaConsumer.
>> - The stateful DoFn reads this message from the PCollection and starts a
>> KafkaConsumer (in the process method).
>> - Rather than yielding the messages coming in from the KafkaConsumer, they
>> are added to a BagState, and a CombiningValueStateSpec keeps track of the
>> count.
>> - When the CombiningValueStateSpec has a count >= MAX_BUFFER_SIZE, we set
>> a timer (TimeDomain.REAL_TIME) to fire after a very short duration (1 ms).
>> - The timer method yields all elements in the buffer and clears both the
>> buffer state and the count state.
>>
>> However, these timers never fire (i.e. the timer methods are never
>> executed).
>>
>
> Your guess is correct.  The timer is scheduled only after the current
> bundle finishes processing. I would change your KafkaReadDoFn as follows
> (see the sketch below this list):
>
>    - in process(), initialize your Kafka consumer and output the first N
>    records; once you exceed N, set a real-time timer, save the next
>    available Kafka offset to ValueState, and stop processing by returning
>    from process().
>    - in onTimer(), retrieve the next offset from ValueState, assign that
>    offset to the consumer, and continue reading the next N elements. Once
>    you have output N elements, save the offset to ValueState again and
>    reset the timer.
>    - Each time the timer fires, the steps in onTimer() are repeated.
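>
> A rough, untested sketch of that pattern (it assumes the kafka-python
> client; the broker, topic, partition and N are placeholders):
>
> import apache_beam as beam
> from apache_beam.coders import VarIntCoder
> from apache_beam.transforms.timeutil import TimeDomain
> from apache_beam.transforms.userstate import (
>     ReadModifyWriteStateSpec, TimerSpec, on_timer)
> from apache_beam.utils.timestamp import Timestamp
> from kafka import KafkaConsumer, TopicPartition
>
> N = 1000  # records to output per activation
>
> class KafkaReadDoFn(beam.DoFn):
>     OFFSET = ReadModifyWriteStateSpec('offset', VarIntCoder())
>     RESUME = TimerSpec('resume', TimeDomain.REAL_TIME)
>
>     def _read_batch(self, offset, offset_state, timer):
>         # Read up to N records starting at `offset`, remember the next
>         # offset in state, and schedule the next activation via the timer.
>         consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
>         partition = TopicPartition('my-topic', 0)
>         consumer.assign([partition])
>         consumer.seek(partition, offset)
>         emitted = 0
>         while emitted < N:
>             batches = consumer.poll(timeout_ms=1000, max_records=N - emitted)
>             if not batches:
>                 break  # nothing available right now
>             for records in batches.values():
>                 for record in records:
>                     yield record.value
>                     offset = record.offset + 1
>                     emitted += 1
>         offset_state.write(offset)
>         timer.set(Timestamp.now() + 0.001)  # fire (almost) immediately
>
>     def process(self, element,  # stateful DoFn: element is a (key, value) pair
>                 offset_state=beam.DoFn.StateParam(OFFSET),
>                 timer=beam.DoFn.TimerParam(RESUME)):
>         yield from self._read_batch(0, offset_state, timer)
>         # Returning here ends the bundle, which allows the timer to fire.
>
>     @on_timer(RESUME)
>     def resume(self,
>                offset_state=beam.DoFn.StateParam(OFFSET),
>                timer=beam.DoFn.TimerParam(RESUME)):
>         yield from self._read_batch(
>             offset_state.read() or 0, offset_state, timer)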
>
>
> Though this approach should work, it may not perform well, since we don't
> do anything special to distribute the work.
>
> Which runner are you targeting? If it's Dataflow, I believe
> ExternalTransform should work.
>
> On Mon, May 10, 2021 at 3:13 AM Wouter Zorgdrager <[email protected]>
> wrote:
>
>> Hi Boyuan,
>>
>> Thanks for your suggestion, that sounds like a good idea. However, where
>> do I open my connection to Kafka and start consuming messages? Is it in
>> the process method (turning it into a blocking method)? I'm currently
>> trying it like this, but the timers never seem to fire. I have the feeling
>> this is because of the blocking nature of the process method.
>>
>> So in short, my flow is like this:
>> - I have a stateful DoFn which reads from a PCollection containing the
>> configuration of my KafkaConsumer.
>> - The stateful DoFn reads this message from the PCollection and starts a
>> KafkaConsumer (in the process method).
>> - Rather than yielding the messages coming in from the KafkaConsumer, they
>> are added to a BagState, and a CombiningValueStateSpec keeps track of the
>> count.
>> - When the CombiningValueStateSpec has a count >= MAX_BUFFER_SIZE, we set
>> a timer (TimeDomain.REAL_TIME) to fire after a very short duration (1 ms).
>> - The timer method yields all elements in the buffer and clears both the
>> buffer state and the count state.
>>
>> However, these timers never fire (i.e. the timer methods are never
>> executed).
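>>
>> Roughly, the DoFn looks something like this (simplified; the kafka-python
>> client and the MAX_BUFFER_SIZE value are placeholders):
>>
>> import apache_beam as beam
>> from apache_beam.coders import PickleCoder, VarIntCoder
>> from apache_beam.transforms.timeutil import TimeDomain
>> from apache_beam.transforms.userstate import (
>>     BagStateSpec, CombiningValueStateSpec, TimerSpec, on_timer)
>> from apache_beam.utils.timestamp import Timestamp
>> from kafka import KafkaConsumer
>>
>> MAX_BUFFER_SIZE = 100
>>
>> class KafkaReadDoFn(beam.DoFn):
>>     BUFFER = BagStateSpec('buffer', PickleCoder())
>>     COUNT = CombiningValueStateSpec('count', VarIntCoder(), sum)
>>     FLUSH = TimerSpec('flush', TimeDomain.REAL_TIME)
>>
>>     def process(self, config,  # (key, consumer config) pair
>>                 buffer=beam.DoFn.StateParam(BUFFER),
>>                 count=beam.DoFn.StateParam(COUNT),
>>                 flush=beam.DoFn.TimerParam(FLUSH)):
>>         _, consumer_config = config
>>         consumer = KafkaConsumer(**consumer_config)
>>         for record in consumer:  # blocks here, so the bundle never ends
>>             buffer.add(record.value)
>>             count.add(1)
>>             if count.read() >= MAX_BUFFER_SIZE:
>>                 flush.set(Timestamp.now() + 0.001)
>>
>>     @on_timer(FLUSH)
>>     def flush_buffer(self,
>>                      buffer=beam.DoFn.StateParam(BUFFER),
>>                      count=beam.DoFn.StateParam(COUNT)):
>>         # Never reached in practice: the timer can only fire once the
>>         # bundle finishes, and process() above never returns.
>>         yield from buffer.read()
>>         buffer.clear()
>>         count.clear()
>>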
>> Is there something I'm doing wrong, or do you have any other suggestions?
>>
>> On Thu, 8 Apr 2021 at 19:35, Boyuan Zhang <[email protected]> wrote:
>>
>>> Hi Wouter,
>>>
>>> Dataflow fuses your operations into several stages during execution
>>> time. A downstream stage only gets input when an upstream stage finishes
>>> at least one bundle (a small portion of work that Dataflow gives to the
>>> worker). In your case, it seems like your _ConsumeKafkaTopic runs forever
>>> and never stops. Thus the operations in the same stage as
>>> _ConsumeKafkaTopic can see its output immediately, but the downstream
>>> stages never receive any input.
>>>
>>> One workaround I can come up with is to change your _ConsumeKafkaTopic
>>> into a stateful DoFn. You can give _ConsumeKafkaTopic a variable that
>>> limits how many records it outputs at a time; let's say the number is
>>> 1000. When your output exceeds 1000 messages, you save the current
>>> consumer position into ValueState and set a now() + 5s processing-time
>>> timer. In the timer callback, you output another 1000 records and set
>>> the next timer (rough sketch below). Does that sound workable for you?
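>>>
>>> A minimal sketch of the timer part might look like this (Python SDK
>>> state/timer API; the offset bookkeeping and the Kafka reads are left out):
>>>
>>> import apache_beam as beam
>>> from apache_beam.transforms.timeutil import TimeDomain
>>> from apache_beam.transforms.userstate import TimerSpec, on_timer
>>> from apache_beam.utils.timestamp import Timestamp
>>>
>>> class _ConsumeKafkaTopic(beam.DoFn):
>>>     RESUME = TimerSpec('resume', TimeDomain.REAL_TIME)
>>>
>>>     def process(self, element,  # element must be a (key, value) pair
>>>                 timer=beam.DoFn.TimerParam(RESUME)):
>>>         # ... output up to 1000 records and save the consumer position ...
>>>         timer.set(Timestamp.now() + 5)  # fire again in ~5 seconds
>>>
>>>     @on_timer(RESUME)
>>>     def resume(self, timer=beam.DoFn.TimerParam(RESUME)):
>>>         # ... restore the position, output the next 1000 records ...
>>>         timer.set(Timestamp.now() + 5)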
>>>
>>> On Thu, Apr 8, 2021 at 3:47 AM Wouter Zorgdrager <[email protected]>
>>> wrote:
>>>
>>>> Dear Beam community,
>>>>
>>>> I'm currently trying to set up a Beam pipeline using the Python SDK. I
>>>> need to read from an unbounded Kafka source. My flow looks like this:
>>>> [image: flow.png]
>>>> It reads events from a Kafka topic, a (stateless) router sends the
>>>> events to different PTransforms, and the final results get written to a
>>>> Kafka topic again. I use side outputs to 'route' and beam.Flatten() to
>>>> merge the PCollections again, roughly as in the snippet below.
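>>>>
>>>> The routing and merging part looks roughly like this (simplified, with
>>>> beam.Create standing in for the Kafka source and trivial handlers):
>>>>
>>>> import apache_beam as beam
>>>>
>>>> class Router(beam.DoFn):
>>>>     def process(self, event):
>>>>         # Route each event to a tagged side output based on its type.
>>>>         tag = 'route_a' if event['type'] == 'a' else 'route_b'
>>>>         yield beam.pvalue.TaggedOutput(tag, event)
>>>>
>>>> with beam.Pipeline() as p:
>>>>     events = p | beam.Create(
>>>>         [{'type': 'a', 'v': 1}, {'type': 'b', 'v': 2}])
>>>>     routed = events | beam.ParDo(Router()).with_outputs(
>>>>         'route_a', 'route_b')
>>>>     merged = (
>>>>         (routed.route_a | 'HandleA' >> beam.Map(lambda e: {**e, 'by': 'a'}),
>>>>          routed.route_b | 'HandleB' >> beam.Map(lambda e: {**e, 'by': 'b'}))
>>>>         | beam.Flatten())
>>>>     merged | beam.Map(print)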
>>>>
>>>> In my first attempt, I used the built-in Kafka IO, which uses an
>>>> ExternalTransform [1]. However, no data was being consumed from my Kafka
>>>> topic. I think this was also discussed on the mailing list before [2]
>>>> and has to do with this issue [3].
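>>>>
>>>> That attempt looked roughly like this (the broker and topic names are
>>>> placeholders):
>>>>
>>>> import apache_beam as beam
>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>
>>>> with beam.Pipeline() as p:
>>>>     events = p | ReadFromKafka(
>>>>         consumer_config={'bootstrap.servers': 'localhost:9092',
>>>>                          'auto.offset.reset': 'earliest'},
>>>>         topics=['my-topic'])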
>>>>
>>>> In my second attempt, I used a Kafka connector from an external
>>>> source [4]. This connector worked initially, when I had not yet fully
>>>> implemented the flow described above. With this connector, I see
>>>> messages get 'stuck' in the dataflow and they are not processed all the
>>>> way to the end, i.e. they get processed up to a certain DoFn in the
>>>> pipeline, but no further. Oddly enough, when I get rid of the
>>>> beam.Flatten() (so I can't merge collections and just use a linear
>>>> pipeline), it does work. Moreover, when I substitute my Kafka consumer
>>>> with a simple beam.Create(), the full pipeline works as well.
>>>>
>>>> I think the problem is that the Kafka connector I'm using is not a
>>>> Splittable DoFn and just blocks completely while consuming [5]. However,
>>>> I'm confused as to why it does work for a linear pipeline (without
>>>> flattening).
>>>>
>>>> To give some more context: I'm planning to deploy this using the
>>>> FlinkRunner. Moreover, I have a 'hard' dependency on Python and Kafka;
>>>> in other words, I can't move to another IO framework or programming
>>>> language.
>>>>
>>>> I hope you can help me out with some suggestions or workarounds.
>>>>
>>>> Thanks in advance!
>>>> Regards,
>>>> Wouter
>>>>
>>>> [1] -
>>>> https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.kafka.html
>>>> [2] -
>>>> https://lists.apache.org/x/thread.html/r9c74a8a7efa4b14f2f1d5f77ce8f12128cf1071861c1627f00702415@%3Cuser.beam.apache.org%3E
>>>>
>>>> [3] - https://issues.apache.org/jira/browse/BEAM-11998
>>>> [4] - https://github.com/mohaseeb/beam-nuggets
>>>> [5] -
>>>> https://github.com/mohaseeb/beam-nuggets/blob/39d2493b161ebbcbff9f4857115d527f6fefda77/beam_nuggets/io/kafkaio.py#L76
>>>>
>>>>
>>>
