Hi Lei,

You can check how the FlinkFixedPartitioner [1] or the Tuple2FlinkPartitioner [2] is implemented. Since you are using the newest generation of SQL connectors, your partitioner should receive an instance of org.apache.flink.table.data.RowData.
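As a rough illustration of partitioning by warehouse_id, here is a minimal sketch of the selection logic only. The class name WarehousePartitionChooser and the method choose are hypothetical, and the class is deliberately kept free of Flink imports. It assumes that your own FlinkKafkaPartitioner<RowData> subclass would call it from partition(record, key, value, targetTopic, partitions), reading the id with record.getInt(0), where position 0 is an assumption based on the column order in your DDL.

```java
/**
 * Hypothetical helper holding only the partition-selection logic.
 * A FlinkKafkaPartitioner<RowData> subclass could delegate to it, e.g.
 *   return WarehousePartitionChooser.choose(record.getInt(0), partitions);
 * (field position 0 for warehouse_id is an assumption, not taken from the thread).
 */
final class WarehousePartitionChooser {

    private WarehousePartitionChooser() {
        // static helper, not meant to be instantiated
    }

    /**
     * Maps a warehouse id to one of the Kafka partition ids supplied by the producer.
     * The same id always maps to the same partition as long as the partition list is unchanged.
     */
    static int choose(int warehouseId, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Kafka partition list must not be empty");
        }
        // floorMod keeps the index non-negative even if the id is negative
        return partitions[Math.floorMod(warehouseId, partitions.length)];
    }
}
```

Keeping the logic in a plain class like this makes it easy to unit-test without a Kafka or Flink setup; a null warehouse_id would still need to be handled in the wrapper (for example via record.isNullAt(0)) before calling it.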

You can create a Maven project with flink-connector-kafka_2.11 as a provided dependency and build a JAR that contains the class file. You should then be able to pass the JAR to SQL in the same way you pass other JARs.

Regards,
Timo

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitioner.java
[2] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/Tuple2FlinkPartitioner.java


On 18.08.20 12:57, wangl...@geekplus.com wrote:


CREATE TABLE kafka_sink_table(
  warehouse_id INT,
  pack_task_order_id BIGINT,
  out_order_code STRING,
  pick_order_id BIGINT,
  end_time BIGINT
) WITH (
  'connector'='kafka',
  'topic'='ods_wms_pack_task_order',
  'properties.bootstrap.servers'='172.19.78.32:9092',
  'format'='json'
);


INSERT INTO kafka_sink_table SELECT  .......

As described here: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html
I want to partition the output according to warehouse_id.

How should I write my custom partitioner? Is there an example?

Thanks,
Lei

------------------------------------------------------------------------
wangl...@geekplus.com <mailto:wangl...@geekplus.com.cn>
