Hi Lei,
You can check how the FlinkFixedPartitioner [1] or
Tuple2FlinkPartitioner [2] is implemented. Since you are using the
newest generation of SQL connectors, your partitioner will receive
instances of org.apache.flink.table.data.RowData.
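A minimal sketch of such a partitioner could look like the following. It assumes warehouse_id is the first column (index 0) of the sink table's schema, and the class name WarehouseIdPartitioner is just a placeholder:

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;

/**
 * Routes each record to a Kafka partition based on the warehouse_id column.
 * Assumes warehouse_id is an INT at position 0 of the sink schema.
 */
public class WarehouseIdPartitioner extends FlinkKafkaPartitioner<RowData> {

    @Override
    public int partition(RowData record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        Preconditions.checkArgument(
            partitions != null && partitions.length > 0,
            "Partitions of the target topic is empty.");
        // Read warehouse_id from the row and map it onto the available partitions.
        int warehouseId = record.getInt(0);
        return partitions[Math.abs(warehouseId) % partitions.length];
    }
}
```

All rows with the same warehouse_id end up in the same partition; Math.abs guards against negative ids.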
You can create a Maven project with a provided flink-connector-kafka_2.11
dependency, build a JAR containing the class file, and then pass the JAR
to SQL as you would any other JAR.
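Once the JAR is on the classpath, the class can be referenced in the DDL via the 'sink.partitioner' option (the fully qualified class name below is a placeholder for wherever you put the class):

```sql
CREATE TABLE kafka_sink_table (
  ...
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_wms_pack_task_order',
  'properties.bootstrap.servers' = '172.19.78.32:9092',
  'format' = 'json',
  -- fully qualified name of the custom FlinkKafkaPartitioner subclass
  'sink.partitioner' = 'com.example.WarehouseIdPartitioner'
);
```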
Regards,
Timo
[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
[2]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java
On 18.08.20 12:57, wangl...@geekplus.com wrote:
CREATE TABLE kafka_sink_table (
    warehouse_id INT,
    pack_task_order_id BIGINT,
    out_order_code STRING,
    pick_order_id BIGINT,
    end_time BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'ods_wms_pack_task_order',
    'properties.bootstrap.servers' = '172.19.78.32:9092',
    'format' = 'json'
);
INSERT INTO kafka_sink_table SELECT .......
As described here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html
I want to partition by warehouse_id.
How should I write my custom partitioner? Is there any example?
Thanks,
Lei
------------------------------------------------------------------------
wangl...@geekplus.com