Hi Averell,

What you want to do is possible today, but at this point it is an early experimental feature. The reason is that Kafka is used as a cross-language Java transform inside a Python pipeline, and we only recently enabled cross-language pipelines.

1. First of all, until 2.13.0 is released you will have to use the
   latest master version, e.g.
   $ git clone https://github.com/apache/beam

2. Setup and activate a Python virtual environment:
   $ virtualenv ~/beam_environment
   $ source ~/beam_environment/bin/activate

3. Build the Python SDK:
   $ cd beam
   $ ./gradlew :beam-sdks-python:buildSnapshot
   $ cd sdks/python/build/
   $ unzip apache-beam-2.13.0.dev0.zip
   $ cd apache-beam-2.13.0.dev0
   $ python setup.py install

4. Start the Flink JobServer / ExpansionServer:
   $ ./gradlew :beam-runners-flink-1.7-job-server:runShadow

5. Create your Python pipeline and use the ReadFromKafka transform, e.g.

   options = ["--runner=PortableRunner",
              "--job_endpoint=localhost:8099"]
   p = Pipeline(options)
   (p
    |
    ReadFromKafka(consumer_config={'bootstrap.servers':
                                   'kafka_broker:port'},
                  topics=['myTopic'])
    |
    Map(lambda x,y: ...)
   p.run()

6. Run your file with the python command :)
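   For example, assuming you saved the pipeline above as
   kafka_pipeline.py (the file name is just an example):
   $ python kafka_pipeline.py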

Note: If you do not set key_deserializer or value_deserializer for ReadFromKafka, you will receive the read data as KV[bytes, bytes], which means you have to perform the decoding inside Python. If you set a Kafka Deserializer, you can receive the Kafka data already decoded. However, you are then limited to the coders in ModelCoders; for example, Int, Long, KV, and Iterable are supported, and we also added String recently.
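
For illustration, here is a sketch of both options (the broker address and topic are placeholders as above; StringDeserializer is the standard Kafka deserializer class):

   # Option A: read KV[bytes, bytes] and decode inside Python
   (p
    | ReadFromKafka(consumer_config={'bootstrap.servers':
                                     'kafka_broker:port'},
                    topics=['myTopic'])
    | Map(lambda kv: kv[1].decode('utf-8')))  # decode the value bytes

   # Option B: let Kafka deserialize the values, so Beam uses the
   # String coder mentioned above
   (p
    | ReadFromKafka(
          consumer_config={'bootstrap.servers': 'kafka_broker:port'},
          topics=['myTopic'],
          value_deserializer='org.apache.kafka.common.serialization.'
                             'StringDeserializer'))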
        
Hope that makes sense. Curious to see how your experiments go.

Cheers,
Max

PS: The best resources are in the doc comments of kafka.py: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py
or here:
https://beam.apache.org/documentation/runners/flink/
https://beam.apache.org/roadmap/portability/

On 10.05.19 14:33, lvhu...@gmail.com wrote:
Hi everyone,

I am trying to get started with Python on the Flink-cluster runner, to build a 
pipeline that reads data from Kafka and writes to S3 in Parquet format.
I tried searching the Beam website, but could not find any example (even for the basic 
word count). E.g., on this page https://beam.apache.org/get-started/wordcount-example/, 
all the Python - Flink-cluster sections have no content other than "This runner is not yet 
available for the Python SDK."

At this point in time, is it possible to create such a pipeline? From all the 
slides / videos, it seems feasible. But could you please point me to some 
step-by-step guide?

Thanks and best regards,
Averell
