Dear Beam community,

I'm currently trying to set up a Beam pipeline using the Python SDK. I need to read from an unbounded Kafka source. My flow looks like this:

[image: flow.png]

It reads events from a Kafka topic; a (stateless) router sends the events to different PTransforms, and the final results are written to a Kafka topic again. I use side outputs to 'route' and beam.Flatten() to merge the PCollections again.
In my first attempt, I used the built-in Kafka IO, which uses an ExternalTransform [1]. However, no data was being consumed from my Kafka topic. I think this was discussed on the mailing list before [2] and has to do with this issue [3].

In my second attempt, I used a Kafka connector from an external source [4]. This connector worked initially, when I had not yet fully implemented the flow described above. With this connector, I see messages get 'stuck' in the dataflow and not being processed through to the end. I.e. they get processed up to a certain DoFn in the pipeline, but not any further.

Oddly enough, when I get rid of the beam.Flatten() (so I can't merge collections and just use a linear pipeline), it does work. Moreover, when I substitute my Kafka consumer with a simple beam.Create(), the full pipeline works as well.

I think the problem is that the Kafka connector I'm using is not a Splittable DoFn and just blocks completely while consuming [5]. However, I'm confused that this does work for a linear pipeline (without flattening).

To give some more context: I'm planning to deploy this using the FlinkRunner. Moreover, I have a 'hard' dependency on using Python and Kafka. In other words, I can't move to another IO framework or programming language.

I hope you can help me out or provide some suggestions or workarounds.

Thanks in advance!

Regards,
Wouter

[1] - https://beam.apache.org/releases/pydoc/2.27.0/apache_beam.io.kafka.html
[2] - https://lists.apache.org/x/thread.html/r9c74a8a7efa4b14f2f1d5f77ce8f12128cf1071861c1627f00702415@%3Cuser.beam.apache.org%3E
[3] - https://issues.apache.org/jira/browse/BEAM-11998
[4] - https://github.com/mohaseeb/beam-nuggets
[5] - https://github.com/mohaseeb/beam-nuggets/blob/39d2493b161ebbcbff9f4857115d527f6fefda77/beam_nuggets/io/kafkaio.py#L76
