Hi Robert,

You can refer to
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
for the whole example.
Best,
Shuiqiang

Robert Cullen <cinquate...@gmail.com> wrote on Sat, Mar 13, 2021 at 4:01 AM:

> Shuiqiang, can you include the import statements? Thanks.
>
> On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <acqua....@gmail.com> wrote:
>
>> Hi Robert,
>>
>> The Kafka connector has been provided in the Python DataStream API since
>> release 1.12.0. Its documentation is still lacking; we will fill it in
>> soon.
>>
>> The following code shows how to apply a FlinkKafkaConsumer and a
>> FlinkKafkaProducer:
>>
>> ```
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>
>> # Define the schema of the messages from Kafka; here the data is in
>> # JSON format.
>> type_info = Types.ROW_NAMED(
>>     ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
>>     [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
>> json_row_schema = JsonRowDeserializationSchema.builder() \
>>     .type_info(type_info).build()
>>
>> # Define the Kafka connection properties.
>> kafka_props = {'bootstrap.servers': 'localhost:9092',
>>                'group.id': 'pyflink-e2e-source'}
>>
>> # Create the KafkaConsumer and KafkaProducer with the specified topic
>> # names, serialization/deserialization schemas and properties.
>> kafka_consumer = FlinkKafkaConsumer("timer-stream-source",
>>                                     json_row_schema, kafka_props)
>> kafka_producer = FlinkKafkaProducer("timer-stream-sink",
>>                                     SimpleStringSchema(), kafka_props)
>>
>> # Make the Kafka source consume from the earliest offset.
>> kafka_consumer.set_start_from_earliest()
>>
>> # Create a DataStream from the Kafka consumer source.
>> ds = env.add_source(kafka_consumer)
>>
>> result_stream = ...
>>
>> # Write the result into Kafka via the Kafka producer sink.
>> result_stream.add_sink(kafka_producer)
>> ```
>>
>> Best,
>> Shuiqiang
>>
>> Robert Cullen <cinquate...@gmail.com> wrote on Sat, Mar 13, 2021 at
>> 12:56 AM:
>>
>>> I've scoured the web looking for an example of using a Kafka source for
>>> a DataStream in Python. Can someone finish this example?
>>>
>>> env = StreamExecutionEnvironment.get_execution_environment()
>>> env.set_parallelism(1)
>>> ds = env.from_collection( KAFKA_SOURCE )
>>> ...
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>
> --
> Robert Cullen
> 240-475-4490