Hi Robert,

You can refer to
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
for the whole example.
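
By the way, messages on the source topic should be JSON objects whose field names match the ROW_NAMED schema in my previous mail, since JsonRowDeserializationSchema maps the JSON fields to the named Row fields. Here is a quick stdlib-only sanity check of the expected payload shape (the values are made up for illustration):

```python
import json

# A sample payload matching the ROW_NAMED schema
# ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'].
# The values below are illustrative only, not from a real topic.
sample = ('{"createTime": 1615500000000, "orderId": 1001, '
          '"payAmount": 99.5, "payPlatform": 2, "provinceId": 32}')

record = json.loads(sample)

# The deserialization schema requires these exact field names.
print(sorted(record))
```

If a field is missing or misnamed, the JSON row deserialization will not populate the corresponding Row field as expected, so it is worth validating the producer side against this schema.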

Best,
Shuiqiang

Robert Cullen <cinquate...@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:

> Shuiqiang, can you include the import statements? Thanks.
>
> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <acqua....@gmail.com>
> wrote:
>
>> Hi Robert,
>>
>> The Kafka connector has been available in the Python DataStream API since
>> release-1.12.0. Its documentation is still missing; we will add it soon.
>>
>> The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:
>> ```
>> from pyflink.common.serialization import JsonRowDeserializationSchema, SimpleStringSchema
>> from pyflink.common.typeinfo import Types
>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>> from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>
>> # define the schema of the messages from kafka; here the data is in json format.
>> type_info = Types.ROW_NAMED(
>>     ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
>>     [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
>> json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>>
>> # define the kafka connection properties.
>> kafka_props = {'bootstrap.servers': 'localhost:9092',
>>                'group.id': 'pyflink-e2e-source'}
>>
>> # create the KafkaConsumer and KafkaProducer with the specified topic names,
>> # serialization/deserialization schemas and properties.
>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
>> kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)
>>
>> # set the kafka source to consume data from the earliest offset.
>> kafka_consumer.set_start_from_earliest()
>>
>> # create a DataStream from the kafka consumer source.
>> ds = env.add_source(kafka_consumer)
>>
>> result_stream = ...
>>
>> # write the result into kafka by a kafka producer sink.
>> result_stream.add_sink(kafka_producer)
>> ```
>>
>> Best,
>> Shuiqiang
>>
>> Robert Cullen <cinquate...@gmail.com> wrote on Sat, Mar 13, 2021 at 12:56 AM:
>>
>>> I’ve scoured the web looking for an example of using a Kafka source for
>>> a DataStream in python. Can someone finish this example?
>>>
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> ds = env.from_collection( KAFKA_SOURCE )
>>> ...
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>
> --
> Robert Cullen
> 240-475-4490
>
