Shuiqiang,

Can you include the import statements? Thanks.

On Fri, Mar 12, 2021 at 1:48 PM Shuiqiang Chen <acqua....@gmail.com> wrote:

> Hi Robert,
>
> The Kafka connector has been available in the Python DataStream API since
> release 1.12.0. Its documentation is still lacking; we will fill that in soon.
>
> The following code shows how to use FlinkKafkaConsumer and FlinkKafkaProducer:
> ```
> env = StreamExecutionEnvironment.get_execution_environment()
> env.set_parallelism(1)
> env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>
> # Define the schema of the messages from Kafka; here the data is in JSON format.
> type_info = Types.ROW_NAMED(
>     ['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'],
>     [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), Types.INT()])
> json_row_schema = JsonRowDeserializationSchema.builder().type_info(type_info).build()
>
> # Define the Kafka connection properties.
> kafka_props = {'bootstrap.servers': 'localhost:9092',
>                'group.id': 'pyflink-e2e-source'}
>
> # Create the consumer and producer with the topic name,
> # the (de)serialization schema, and the connection properties.
> kafka_consumer = FlinkKafkaConsumer("timer-stream-source", json_row_schema, kafka_props)
> kafka_producer = FlinkKafkaProducer("timer-stream-sink", SimpleStringSchema(), kafka_props)
>
> # Make the Kafka source consume from the earliest offset.
> kafka_consumer.set_start_from_earliest()
>
> # Create a DataStream from the Kafka consumer source.
> ds = env.add_source(kafka_consumer)
>
> result_stream = ...
>
> # Write the result to Kafka through the producer sink.
> result_stream.add_sink(kafka_producer)
> ```
>
> Best,
> Shuiqiang
>
> On Sat, Mar 13, 2021 at 12:56 AM, Robert Cullen <cinquate...@gmail.com> wrote:
>
>> I’ve scoured the web looking for an example of using a Kafka source for a
>> DataStream in Python. Can someone finish this example?
>>
>> env = StreamExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>> ds = env.from_collection( KAFKA_SOURCE )
>> ...
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>

--
Robert Cullen
240-475-4490
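
On the import question: as far as I can tell, on release 1.12 the names used in the quoted snippet live in pyflink.common.serialization (SimpleStringSchema, JsonRowDeserializationSchema), pyflink.common.typeinfo (Types), pyflink.datastream (StreamExecutionEnvironment, TimeCharacteristic), and pyflink.datastream.connectors (FlinkKafkaConsumer, FlinkKafkaProducer). Some of these moved in later releases, so treat the paths as an assumption to check against your installed version.

It may also help to see what a message on the source topic would need to look like for the JSON row schema above. The following is only a sketch in plain Python (no Flink required): the field names and types come from the quoted type_info, while the sample values, the Python type mapping, and the helper name matches_schema are made up for illustration.

```python
import json

# Field names and Flink types copied from the type_info in the quoted snippet.
# The mapping to Python types is an assumption made for this illustration.
SCHEMA = {
    "createTime": int,    # Types.LONG()
    "orderId": int,       # Types.LONG()
    "payAmount": float,   # Types.DOUBLE()
    "payPlatform": int,   # Types.INT()
    "provinceId": int,    # Types.INT()
}

# Hypothetical sample values; only the field names come from the thread.
sample = {
    "createTime": 1615536000000,
    "orderId": 1,
    "payAmount": 12.5,
    "payPlatform": 0,
    "provinceId": 3,
}


def matches_schema(record: dict) -> bool:
    """Return True if record has exactly the schema's fields with matching types."""
    return record.keys() == SCHEMA.keys() and all(
        isinstance(record[name], expected) for name, expected in SCHEMA.items()
    )


# The JSON string a test producer could write to the source topic.
message = json.dumps(sample)
print(message)
print(matches_schema(json.loads(message)))
```

Writing a few such messages to the source topic is one way to check that the consumer and schema are wired up before building the rest of the job.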