Which runner are you using?

There's a known issue with SDFs not triggering for portable runners:
https://github.com/apache/beam/issues/20979

This should not occur for Dataflow.
For Flink, you can work around it with the option
`--experiments=use_deprecated_read`.
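
As a minimal sketch, the workaround is just an extra flag in the pipeline's
option list (runner choice and other flags here are placeholder assumptions):

```python
# Hypothetical option list for running the pipeline on Flink with the
# workaround enabled; only --experiments=use_deprecated_read is the fix itself.
pipeline_args = [
    '--streaming',
    '--runner=FlinkRunner',               # assumed runner choice
    '--experiments=use_deprecated_read',  # fall back to the legacy (non-SDF) read
]
```

You would then pass this list to `PipelineOptions(pipeline_args)` when
constructing the pipeline.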

Thanks,
Cham

On Fri, Mar 8, 2024 at 8:10 AM LDesire <[email protected]> wrote:

> Hello Apache Beam community.
> I'm asking because ReadFromKafka is not working in a Beam pipeline I'm
> creating in Python.
>
> My code looks like this
>
> ```
> import apache_beam as beam
> from apache_beam.io.kafka import ReadFromKafka
> from apache_beam.options.pipeline_options import PipelineOptions
>
>
> @beam.ptransform_fn
> def LogElements(input):
>     def log_element(elem):
>         print(elem)
>         return elem
>
>     return input | 'DoLog' >> beam.Map(log_element)
>
>
> if __name__ == '__main__':
>     consumer_config = {
>         'bootstrap.servers': '<bootstrap_server>'
>     }
>
>     with beam.Pipeline(options=PipelineOptions(['--streaming'])) as p:
>         (
>                 p
>                 | ReadFromKafka(consumer_config=consumer_config,
>                                 topics=['input_topic'])
>                 | 'ToLines' >> beam.Map(
>                     lambda x: '%s %s' % (x[0].decode('utf-8'),
>                                          x[1].decode('utf-8')))
>                 | 'Logging' >> LogElements()
>         )
> ```
>
> This is a simple pipeline that subscribes to a Kafka topic and prints the
> records to the console.
>
> I've seen in the documentation that using ReadFromKafka in Python spawns an
> external Java process.
> I also confirmed in VisualVM that this process is created.
>
> However, I'm not sure why I'm not getting messages from Kafka.
>
> * I checked the `_really_start_process` method of the `SubprocessServer`
> class in `subprocess_server.py` to get the logs from the external Java
> process.
