Hi Boyuan,

Thanks for your suggestion; that sounds like a good idea. However, where do
I open my connection to Kafka and start consuming messages? Is it in the
process method (turning it into a blocking method)? I have tried it like
this, but the timers never seem to fire. I suspect this is because of the
blocking nature of the process method.

So in short, my flow is like this:
- I have a stateful DoFn, which reads from a PCollection containing the
configuration of my KafkaConsumer.
- The stateful DoFn reads this configuration from the PCollection and
starts a KafkaConsumer (in the process method).
- Rather than yielding the messages coming in from the KafkaConsumer, they
are added to BagState, with a CombiningValueStateSpec to keep track of the
count.
- When the CombiningValueStateSpec reaches a count >= MAX_BUFFER_SIZE, we
set a timer (TimeDomain.REAL_TIME) to fire after a very short duration (1 ms).
- The timer method yields all elements in the buffer and clears both the
buffer state and the count state.

However, these timers never fire (i.e. the timer methods are never
executed).
Am I doing something wrong, or do you have any other suggestions?

On Thu, 8 Apr 2021 at 19:35, Boyuan Zhang <[email protected]> wrote:

> Hi Wouter,
>
> Dataflow fuses your operations into several stages at execution time. A
> downstream stage only receives input once the upstream stage finishes at
> least one bundle (a small portion of work that Dataflow gives to a
> worker). In your case, it seems like your _ConsumeKafkaTopic runs forever
> and never stops. Thus the operations in the same stage as
> _ConsumeKafkaTopic can see the output from _ConsumeKafkaTopic
> immediately, but the downstream stages never receive any input.
>
> One workaround I can come up with is to change your _ConsumeKafkaTopic
> into a stateful DoFn. You can give _ConsumeKafkaTopic a variable that
> limits how many records _ConsumeKafkaTopic outputs at once. Let's say the
> number is 1000. When your output exceeds 1000 messages, you can save the
> current consumer position into ValueState and set a now() + 5s
> processing-time timer. In the timer callback, you can output another 1000
> records and set the next timer. Does that sound workable for you?
>
> On Thu, Apr 8, 2021 at 3:47 AM Wouter Zorgdrager <[email protected]>
> wrote:
>
>> Dear Beam community,
>>
>> I'm currently trying to set up a Beam pipeline using the PythonSDK. I
>> need to read from an unbounded Kafka source. My flow looks like this:
>> [image: flow.png]
>> It reads events from a Kafka topic; using a (stateless) router, the
>> events are sent to different PTransforms, and the final results are
>> written to a Kafka topic again. I use side outputs to 'route' and
>> beam.Flatten() to merge PCollections again.
>>
>> In my first attempt, I used the built-in Kafka IO which uses an
>> ExternalTransform [1]. However, no data was being consumed from my Kafka
>> topic. I think this was also discussed in the mailing list before [2] and
>> has to do with this issue [3].
>>
>> In my second attempt, I used this Kafka connector from an external source
>> [4]. This connector worked initially, where I did not fully implement the
>> flow as described above. When using this connector, I see messages get
>> 'stuck' in the dataflow and not being fully processed until the end. I.e.
>> they get processed until a certain DoFn in the pipeline, but not any
>> further. Oddly enough, when I get rid of the beam.Flatten() (and therefore
>> I can't merge collections and just use a linear pipeline) it does work.
>> Moreover, when a substitute my kafka consumer with a simple beam.Create()
>> the full pipeline works as well.
>>
>> I think the problem is that the Kafka connector I'm using is not a
>> Splittable DoFn and simply blocks while consuming [5]. However, I'm
>> confused that it does work for a linear pipeline (without flattening).
>>
>> To give some more context; I'm planning to deploy this using a
>> FlinkRunner. Moreover, I have a 'hard' dependency on using Python and
>> Kafka. In other words, I can't move to another IO framework or programming
>> language.
>>
>> I hope you can help me out, provide some suggestions or workarounds.
>>
>> Thanks in advance!
>> Regards,
>> Wouter
>>
>> [1] -
>> https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.kafka.html
>> [2] -
>> https://lists.apache.org/x/thread.html/r9c74a8a7efa4b14f2f1d5f77ce8f12128cf1071861c1627f00702415@%3Cuser.beam.apache.org%3E
>> [3] - https://issues.apache.org/jira/browse/BEAM-11998
>> [4] - https://github.com/mohaseeb/beam-nuggets
>> [5] -
>> https://github.com/mohaseeb/beam-nuggets/blob/39d2493b161ebbcbff9f4857115d527f6fefda77/beam_nuggets/io/kafkaio.py#L76
>>
>>
>