Shuiqiang, can you include the import statements? Thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <acqua....@gmail.com> wrote:

> Hi Robert,
>
> The Kafka connector has been available in the Python DataStream API since
> release-1.12.0. The documentation for it is still lacking; we will fill
> that gap soon.
>
> The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:
> ```
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>
> # define the schema of the message from kafka, here the data is in json format.
> type_info = Types.ROW_NAMED(
>     ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
>     [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
> json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>
> # define the kafka connection properties.
> kafka_props = {'bootstrap.servers': 'localhost:9092',
>                'group.id': 'pyflink-e2e-source'}
>
> # create the KafkaConsumer and KafkaProducer with the specified topic name,
> # serialization/deserialization schema and properties.
> kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
> kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)
>
> # set the kafka source to consume data from earliest offset.
> kafka_consumer.set_start_from_earliest()
>
> # create a DataStream from kafka consumer source
> ds = env.add_source(kafka_consumer)
>
> result_stream = ...
>
> # write the result into kafka by a kafka producer sink.
> result_stream.add_sink(kafka_producer)
> ```
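
To make the schema above concrete: a message on the source topic would presumably be a flat JSON object with the five fields named in type_info. The sketch below builds one such record with the standard library only. The field names and their order come from the quoted example; the values and the names FIELDS, sample_order and message are made up for illustration.

```python
import json

# Field names and order taken from type_info in the quoted example.
FIELDS = ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId']

# Hypothetical sample values. Only the types are implied by the schema:
# LONG, LONG, DOUBLE, INT, INT.
sample_order = {
    'createTime': 1615568880000,  # assumed to be epoch milliseconds
    'orderId': 1001,
    'payAmount': 99.5,
    'payPlatform': 0,
    'provinceId': 3,
}

# Serialize to the JSON text that would be written to the Kafka topic.
message = json.dumps(sample_order)
print(message)
```

Publishing a few records of this shape to timer-stream-source is one way to check that the deserialization schema and the consumer are wired up as expected.
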
>
> Best,
> Shuiqiang
>
> On Sat, Mar 13, 2021 at 12:56 AM Robert Cullen <cinquate...@gmail.com> wrote:
>
>> I’ve scoured the web looking for an example of using a Kafka source for a
>> DataStream in Python. Can someone finish this example?
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> ds = env.from_collection( KAFKA_SOURCE )
>> ...
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>
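
For reference, the quoted example does not show its imports. The sketch below lists what it would likely need, assuming the PyFlink 1.12 module layout; later releases moved some of these classes, so treat the paths as an assumption to check against the installed version. The try/except guard and the PYFLINK_IMPORTS_OK flag are illustrative additions so the snippet still runs where PyFlink is absent.

```python
# Assumed module paths for PyFlink 1.12; verify them against the documentation
# for the version you have installed.
try:
    from pyflink.common.serialization import (
        JsonRowDeserializationSchema,
        SimpleStringSchema,
    )
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
    from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
    PYFLINK_IMPORTS_OK = True
except ImportError as exc:
    # PyFlink is not installed, or it uses a different module layout.
    PYFLINK_IMPORTS_OK = False
    print("PyFlink imports failed:", exc)
```

In a real job the guard can be dropped and the imports placed at the top of the script. The Kafka connector JAR typically also has to be available to the job for the consumer and producer to work.
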

-- 
Robert Cullen
240-475-4490
