Hi Averell,
What you want to do is possible today, but it is still an early
experimental feature. The reason is that the Kafka source is a Java
transform used cross-language from a Python pipeline, and we only
recently enabled cross-language pipelines.
1. First of all, until 2.13.0 is released you will have to use the
latest master version, e.g.
$ git clone https://github.com/apache/beam
2. Set up and activate a Python virtual environment:
$ virtualenv ~/beam_environment
$ source ~/beam_environment/bin/activate
3. Build the Python SDK:
$ cd beam
$ ./gradlew :beam-sdks-python:buildSnapshot
$ cd sdks/python/build/
$ unzip apache-beam-2.13.0.dev0.zip
$ cd apache-beam-2.13.0.dev0
$ python setup.py install
4. Start the Flink JobServer / ExpansionServer (run this from the root
of the beam checkout):
$ ./gradlew :beam-runners-flink-1.7-job-server:runShadow
5. Create your Python pipeline and use the ReadFromKafka transform, e.g.
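from apache_beam import Map, Pipeline
from apache_beam.io.external.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=PortableRunner",
                           "--job_endpoint=localhost:8099"])
p = Pipeline(options=options)
(p
 | ReadFromKafka(consumer_config={'bootstrap.servers': 'kafka_broker:port'},
                 topics=['myTopic'])
 # Each element is a single (key, value) pair, so the lambda takes one argument.
 | Map(lambda kv: ...))
p.run()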
6. Run your file with the python command :)
Note: If you do not set key_deserializer or value_deserializer for
ReadFromKafka, you will receive the read data as KV[bytes, bytes]. That
means you have to perform decoding inside Python. If you set a Kafka
Deserializer, you can also receive the Kafka data already decoded.
However, you are limited to the coders in ModelCoders. For example, Int,
Long, KV, Iterable are supported; we also added String recently.
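
As a rough illustration of the "decode inside Python" case, here is a
minimal sketch. It assumes your keys and values are UTF-8 text, and the
helper name decode_kv is made up for this example (it is not part of
Beam). Adjust the decoding to whatever your producers actually write.

```python
def decode_kv(kv, encoding="utf-8"):
    """Decode a (key, value) pair of raw bytes into text.

    Assumption: both key and value are text in the given encoding.
    Kafka records may have no key, so None is passed through unchanged.
    """
    key, value = kv
    decoded_key = key.decode(encoding) if key is not None else None
    decoded_value = value.decode(encoding) if value is not None else None
    return decoded_key, decoded_value


if __name__ == "__main__":
    # Stand-in for one element as ReadFromKafka would emit it by default.
    print(decode_kv((b"user-1", b"hello")))
```

In the pipeline from step 5 you would then apply it right after the
read, e.g. Map(decode_kv), before doing any further processing.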
Hope that makes sense. Curious to see how your experiments go.
Cheers,
Max
PS: The best resources are in the doc comments of kafka.py:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py
or here:
https://beam.apache.org/documentation/runners/flink/
https://beam.apache.org/roadmap/portability/
On 10.05.19 14:33, lvhu...@gmail.com wrote:
Hi everyone,
I am trying to get started with Python on the Flink cluster runner, to build a
pipeline that reads data from Kafka and writes to S3 in Parquet format.
I searched the Beam website, but could not find any example (even for the basic
word count). E.g., on this page https://beam.apache.org/get-started/wordcount-example/, in
all Python - Flink-cluster sections, there's no content but "This runner is not yet
available for the Python SDK."
At this point in time, is it possible to create such a pipeline? From all the
slides / videos, it seems feasible. If so, could you please point me to a
step-by-step guide?
Thanks and best regards,
Averell