Dear Beam community,

I'm currently trying to set up a Beam pipeline using the Python SDK. I need
to read from an unbounded Kafka source. My flow looks like this:
[image: flow.png]
It reads events from a Kafka topic, sends them through a (stateless) router
to different PTransforms, and writes the final results back to a Kafka
topic. I use side outputs to 'route' the events and beam.Flatten() to merge
the PCollections again.

In my first attempt, I used the built-in Kafka IO, which uses an
ExternalTransform [1]. However, no data was consumed from my Kafka topic. I
think this was discussed on the mailing list before [2] and is related to
this issue [3].

In my second attempt, I used a third-party Kafka connector [4]. This
connector worked initially, when I had not yet implemented the full flow
described above. With the full flow, however, messages get 'stuck' in the
pipeline and are not processed all the way to the end. That is, they are
processed up to a certain DoFn in the pipeline, but no further. Oddly
enough, when I remove the beam.Flatten() (so that I can't merge collections
and am left with a linear pipeline), it does work. Moreover, when I
substitute my Kafka consumer with a simple beam.Create(), the full pipeline
works as well.

I think the problem is that the Kafka connector I'm using is not a
Splittable DoFn and simply blocks while consuming [5]. However, I'm
confused that it does work for a linear pipeline (without flattening).

To give some more context: I'm planning to deploy this using the
FlinkRunner. Moreover, I have a 'hard' dependency on Python and Kafka; in
other words, I can't move to another IO framework or programming language.

I hope you can help me out with some suggestions or workarounds.

Thanks in advance!
Regards,
Wouter

[1] - https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.kafka.html
[2] - https://lists.apache.org/x/thread.html/r9c74a8a7efa4b14f2f1d5f77ce8f12128cf1071861c1627f00702415@%3Cuser.beam.apache.org%3E
[3] - https://issues.apache.org/jira/browse/BEAM-11998
[4] - https://github.com/mohaseeb/beam-nuggets
[5] - https://github.com/mohaseeb/beam-nuggets/blob/39d2493b161ebbcbff9f4857115d527f6fefda77/beam_nuggets/io/kafkaio.py#L76
