Which runner are you using? There's a known issue with SDFs not triggering for portable runners: https://github.com/apache/beam/issues/20979
This should not occur for Dataflow. For Flink, you could use the option "--experiments=use_deprecated_read" to make it work.

Thanks,
Cham

On Fri, Mar 8, 2024 at 8:10 AM LDesire <[email protected]> wrote:

> Hello Apache Beam community.
> I'm asking because while creating a Beam pipeline in Python, ReadFromKafka
> is not working.
>
> My code looks like this:
>
> ```
> import apache_beam as beam
> from apache_beam import Map
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
>
> @beam.ptransform_fn
> def LogElements(input):
>     def log_element(elem):
>         print(elem)
>         return elem
>
>     return (
>         input | 'DoLog' >> Map(log_element)
>     )
>
>
> if __name__ == '__main__':
>     consumer_config = {
>         'bootstrap.servers': '<bootstrap_server>'
>     }
>
>     with beam.Pipeline(options=PipelineOptions(['--streaming'])) as p:
>         (
>             p
>             | ReadFromKafka(consumer_config=consumer_config,
>                             topics=['input_topic'])
>             | "ToLines" >> beam.Map(lambda x: "%s %s" %
>                                     (x[0].decode("utf-8"),
>                                      x[1].decode("utf-8")))
>             | 'Logging' >> LogElements()
>         )
> ```
>
> This is a simple pipeline that just subscribes to a topic in Kafka and
> prints the messages to the console.
>
> I've seen in the documentation that using ReadFromKafka in Python launches
> an external Java process.
> We also confirmed in VisualVM that the process is created just fine.
>
> However, I'm not sure why I'm not getting messages from Kafka.
>
> * I checked the `_really_start_process` method of the `SubprocessServer`
> class in subprocess_server.py to get the logs from the external Java
> process.
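Concretely, the flag is just another pipeline option. A minimal sketch of the flags you would pass (the FlinkRunner value is an assumption about your setup; adjust it to however you currently launch the job):

```python
# Sketch: pipeline option flags that force the legacy (non-SDF) read path
# for ReadFromKafka on portable runners such as Flink.
# "--runner=FlinkRunner" is assumed here; keep whatever runner flags you
# already use and just add the experiments flag.
flags = [
    "--streaming",
    "--runner=FlinkRunner",
    "--experiments=use_deprecated_read",
]

# With apache_beam installed, these flags would be consumed as:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   options = PipelineOptions(flags)
#   with beam.Pipeline(options=options) as p:
#       ...
```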
