I've been teaching myself Beam and here is an example pipeline that uses
Kafka IO (read and write). I hope it helps.
*Prerequisites*
1. Kafka runs on Docker and its external listener is exposed on port 29092
(i.e. its bootstrap server address can be specified as localhost:29092). A
quick connectivity check is sketched right after this list.
2. The following entry exists in /etc/hosts, and the multi-language pipeline
(Kafka IO) runs in a Docker container that uses the Docker host network
(i.e. --network host):
*127.0.0.1 host.docker.internal*
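To sanity-check the listener and the /etc/hosts mapping before running the
pipeline, something like the following can be used (a minimal sketch, assuming
the kafka-python package is installed; it just lists whatever topics already
exist on the broker):

from kafka import KafkaConsumer  # pip install kafka-python

# Both addresses should reach the same broker once the /etc/hosts entry is in place.
for server in ("localhost:29092", "host.docker.internal:29092"):
    consumer = KafkaConsumer(bootstrap_servers=server)
    print(server, "->", sorted(consumer.topics()))
    consumer.close()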
*Key notes*
1. Use FlinkRunner with or without specifying *flink_master*. If it is not
specified, an embedded Flink cluster is used. Otherwise, add the
*flink_master* option (e.g. "flink_master": "localhost:8081"). (See the
--use_own argument in the script below.)
2. The Kafka bootstrap server address should be specified as
*host.docker.internal:29092* (change the port if yours is different).
3. Add the *use_deprecated_read* experiment as indicated in the earlier
reply; otherwise the pipeline may get stuck. (See the snippet right after
this list.)
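For reference, the --experiments=use_deprecated_read flag from Cham's reply
can also be passed to PipelineOptions directly (a minimal sketch; the full
script below sets the same experiment via an options dict instead):

from apache_beam.options.pipeline_options import PipelineOptions

# Equivalent to the "experiments": ["use_deprecated_read"] entry in the options dict.
# Workaround for https://github.com/apache/beam/issues/20979 on portable runners.
options = PipelineOptions(["--streaming", "--experiments=use_deprecated_read"])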
import os
import argparse
import json
import logging
import typing

import apache_beam as beam
from apache_beam.io import kafka
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


def decode_message(kafka_kv: tuple):
    # Incoming Kafka records must have a key associated.
    # Otherwise, Beam throws an exception with null keys.
    # Example: (b'key', b'value')
    return kafka_kv[1].decode("utf-8")


def create_message(element: dict):
    # Re-key the record on its "id" field and serialize both key and value as JSON bytes.
    key = {"id": element["id"]}
    print(key)
    return json.dumps(key).encode("utf-8"), json.dumps(element).encode("utf-8")


def parse_json(element: str):
    return json.loads(element)


def run():
    parser = argparse.ArgumentParser(description="Kafka IO example")
    parser.add_argument(
        "--runner", default="FlinkRunner", help="Specify Apache Beam Runner"
    )
    parser.add_argument(
        "--use_own",
        action="store_true",
        default=False,
        help="Flag to indicate whether to use an own local cluster",
    )
    opts = parser.parse_args()

    pipeline_opts = {
        "runner": opts.runner,
        "job_name": "kafka-io",
        "environment_type": "LOOPBACK",
        "streaming": True,
        "parallelism": 3,
        "experiments": [
            "use_deprecated_read"
        ],  ## https://github.com/apache/beam/issues/20979
        "checkpointing_interval": "60000",
    }
    if opts.use_own is True:
        # Use an existing local Flink cluster instead of the embedded one.
        pipeline_opts = {**pipeline_opts, **{"flink_master": "localhost:8081"}}
    print(pipeline_opts)
    options = PipelineOptions([], **pipeline_opts)
    # Required, else it will complain when importing worker functions
    options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=options)
    (
        p
        | "Read from Kafka"
        >> kafka.ReadFromKafka(
            consumer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                ),
                "auto.offset.reset": "earliest",
                # "enable.auto.commit": "true",
                "group.id": "kafka-io",
            },
            topics=["input-topic"],
        )
        | "Decode messages" >> beam.Map(decode_message)
        | "Parse elements" >> beam.Map(parse_json)
        | "Create messages"
        >> beam.Map(create_message).with_output_types(typing.Tuple[bytes, bytes])
        | "Write to Kafka"
        >> kafka.WriteToKafka(
            producer_config={
                "bootstrap.servers": os.getenv(
                    "BOOTSTRAP_SERVERS",
                    "host.docker.internal:29092",
                )
            },
            topic="output-topic",
        )
    )

    logging.getLogger().setLevel(logging.WARN)
    logging.info("Building pipeline ...")
    p.run().wait_until_finish()


if __name__ == "__main__":
    run()
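To try the pipeline out, a small producer can seed input-topic with keyed JSON
records (a sketch assuming kafka-python; the records only need an "id" field
because create_message reads it, and a key because decode_message expects one):

import json
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(bootstrap_servers="localhost:29092")
for i in range(10):
    record = {"id": i, "text": f"message-{i}"}
    producer.send(
        "input-topic",
        key=json.dumps({"id": i}).encode("utf-8"),  # keys are required; see decode_message
        value=json.dumps(record).encode("utf-8"),
    )
producer.flush()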
On Sat, 9 Mar 2024 at 08:12, Chamikara Jayalath via user <
[email protected]> wrote:
> Which runner are you using ?
>
> There's a known issue with SDFs not triggering for portable runners:
> https://github.com/apache/beam/issues/20979
>
> This should not occur for Dataflow.
> For Flink, you could use the option "--experiments=use_deprecated_read" to
> make it work.
>
> Thanks,
> Cham
>
> On Fri, Mar 8, 2024 at 8:10 AM LDesire <[email protected]> wrote:
>
>> Hello Apache Beam community.
>> I'm asking because while creating a beam pipeline in Python,
>> ReadFromKafka is not working.
>>
>> My code looks like this
>>
>> ```
>> @beam.ptransform_fn
>> def LogElements(input):
>> def log_element(elem):
>> print(elem)
>> return elem
>>
>> return (
>> input | 'DoLog' >> Map(log_element)
>> )
>>
>>
>> if __name__ == '__main__':
>> consumer_config = {
>> 'bootstrap.servers': '<bootstrap_server>'
>> }
>>
>> with beam.Pipeline(options=PipelineOptions(['--streaming'])) as p:
>> (
>> p | ReadFromKafka(consumer_config=consumer_config,
>> topics=['input_topic'])
>> | "ToLines" >> beam.Map(lambda x: "%s %s" %
>> (x[0].decode("utf-8"), x[1].decode("utf-8")))
>> | 'Logging' >> LogElements()
>> )
>> ```
>>
>> This is a simple pipeline that simply subscribes to a topic in Kafka and
>> outputs to the console.
>>
>> I've seen in the documentation that using ReadFromKafka in Python creates
>> an external java processor.
>> We also confirmed that it creates a process in VisualVM just fine.
>>
>> However, I'm not sure why I'm not getting messages from Kafka.
>>
>> * I checked the `_really_start_process` method of the `SubprocessServer`
>> class in subprocess_server.py to get the logs from the external java
>> process.
>>