Hi Lei,

If you want to write a custom partitioner, you can refer to the built-in FlinkFixedPartitioner [1].
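
As a rough sketch of what partitioning by warehouse_id could look like (not tested against a real cluster): the class below holds only the partition-selection logic in plain Java, so it can be tried without Flink on the classpath. The names WarehousePartitionLogic and WarehouseIdPartitioner are hypothetical, and the modulo mapping is just one possible choice, not what FlinkFixedPartitioner does. The commented-out wiring assumes the Flink 1.11 SQL connector, where the record type is RowData, and assumes warehouse_id is the first column and never NULL.

```java
/**
 * Sketch of partition-selection logic keyed by warehouse_id.
 * Hypothetical helper, kept free of Flink dependencies on purpose.
 */
final class WarehousePartitionLogic {

    private WarehousePartitionLogic() {}

    /**
     * Maps a warehouse_id to one of the Kafka partition ids of the target topic.
     * Math.floorMod keeps the index non-negative even for negative ids, so the
     * same warehouse_id always lands in the same partition.
     */
    static int partitionFor(int warehouseId, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("The target topic has no partitions.");
        }
        return partitions[Math.floorMod(warehouseId, partitions.length)];
    }

    // Hypothetical wiring into Flink (not compiled here). It assumes
    // FlinkKafkaPartitioner<RowData> from the Kafka connector and that
    // warehouse_id is column 0 of kafka_sink_table and is never NULL:
    //
    // public class WarehouseIdPartitioner extends FlinkKafkaPartitioner<RowData> {
    //     @Override
    //     public int partition(RowData record, byte[] key, byte[] value,
    //                          String targetTopic, int[] partitions) {
    //         return WarehousePartitionLogic.partitionFor(record.getInt(0), partitions);
    //     }
    // }
}
```

With four partitions {0, 1, 2, 3}, warehouse_id 7 maps to partition 3 and warehouse_id -3 maps to partition 1. The partitioner class would then be referenced from the table DDL through the 'sink.partitioner' option, using its fully qualified class name.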
[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java

Best,
Xingbo

wangl...@geekplus.com <wangl...@geekplus.com> wrote on Tue, Aug 18, 2020 at 8:18 PM:

> CREATE TABLE kafka_sink_table(
>   warehouse_id INT,
>   pack_task_order_id BIGINT,
>   out_order_code STRING,
>   pick_order_id BIGINT,
>   end_time BIGINT
> ) WITH (
>   'connector'='kafka',
>   'topic'='ods_wms_pack_task_order',
>   'properties.bootstrap.servers'='172.19.78.32:9092',
>   'format'='json'
> );
>
> INSERT INTO kafka_sink_table SELECT .......
>
> As described here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html
>
> I want to partition by warehouse_id.
>
> How should I write my custom partitioner? Is there any example?
>
> Thanks,
> Lei
>
> ------------------------------
> wangl...@geekplus.com <wangl...@geekplus.com.cn>