Hi Slim,

In 1.11, I think you have to implement a custom FlinkKafkaPartitioner and
set its fully qualified class name as the value of the 'sink.partitioner'
option.
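For 1.11, the sink table would refer to that partitioner by class name in
its WITH clause. This is only a sketch under assumptions:
com.example.ItemIdPartitioner is a hypothetical FlinkKafkaPartitioner
subclass (for example, one that picks a partition from the row's item_id)
that must be on the job's classpath, and 'format' = 'json' is an assumed
serialization format.

```sql
-- Flink 1.11 sketch: route every record through a custom partitioner.
-- 'com.example.ItemIdPartitioner' is HYPOTHETICAL: a FlinkKafkaPartitioner
-- subclass you would write yourself and ship with the job.
CREATE TABLE output_kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_partition_by_iid',
 'properties.bootstrap.servers' = 'localhost:9092',
 'format' = 'json',  -- assumed value format
 'sink.partitioner' = 'com.example.ItemIdPartitioner'
);
```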

In 1.12, you can repartition the data by declaring which field(s) make up
the message key (the Kafka producer partitions records by message key by
default). This only takes a few additional options in the WITH clause:

CREATE TABLE output_kafkaTable (
 user_id BIGINT,
 item_id BIGINT,
 category_id BIGINT,
 behavior STRING,
 ts TIMESTAMP(3)
) WITH (
 'connector' = 'kafka',
 'topic' = 'user_behavior_partition_by_iid',
 'properties.bootstrap.servers' = 'localhost:9092',
 'key.fields' = 'item_id',  -- specify which columns will be written to the message key
 'key.format' = 'raw',
 'value.format' = 'json'
);
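With the sink declared this way, the repartitioning itself is just a
continuous INSERT from the input table. A minimal sketch, assuming
input_kafkaTable from the original question is declared with a valid value
format (its DDL in the question omits one, e.g. 'format' = 'json'):

```sql
-- Continuously copy rows from the user_id-partitioned topic into the
-- item_id-keyed topic; per the options above, item_id is written to the
-- message key, which the Kafka producer uses to choose the partition.
INSERT INTO output_kafkaTable
SELECT user_id, item_id, category_id, behavior, ts
FROM input_kafkaTable;
```

Note that 'value.fields-include' defaults to 'ALL' in 1.12, so item_id is
written to the JSON value as well as to the key.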


Best,
Jark



On Tue, 17 Nov 2020 at 13:53, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi,
>
> I'm pulling in some Flink SQL experts (in CC) to help you with this one :)
>
> Cheers,
> Gordon
>
> On Tue, Nov 17, 2020 at 7:30 AM Slim Bouguerra <slim.bougue...@gmail.com>
> wrote:
>
>> Hi,
>> I am trying to author a SQL job that repartitions one Kafka SQL table
>> into another Kafka SQL table.
>> For example, the input and output tables have exactly the same SQL schema
>> (see below) and data; the only difference is that the new Kafka stream
>> needs to be repartitioned by a simple projection such as item_id (the
>> input stream is partitioned by user_id).
>> Is there a way to do this via SQL only, without using
>> org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner?
>>
>> In other words, how can we express the stream key (keyBy) via the SQL
>> layer?
>>
>> For instance, Hive exposes system columns called __key and __partition
>> that can be used to do this at the SQL layer (see
>> https://github.com/apache/hive/tree/master/kafka-handler#table-definitions
>> ).
>>
>> CREATE TABLE input_kafkaTable (
>>  user_id BIGINT,
>>  item_id BIGINT,
>>  category_id BIGINT,
>>  behavior STRING,
>>  ts TIMESTAMP(3)
>> ) WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'user_behavior_partition_by_uid',
>>  'properties.bootstrap.servers' = 'localhost:9092'
>> );
>>
>> CREATE TABLE output_kafkaTable (
>>  user_id BIGINT,
>>  item_id BIGINT,
>>  category_id BIGINT,
>>  behavior STRING,
>>  ts TIMESTAMP(3)
>> ) WITH (
>>  'connector' = 'kafka',
>>  'topic' = 'user_behavior_partition_by_iid',
>>  'properties.bootstrap.servers' = 'localhost:9092'
>> );
>>
>>
>>
>> --
>>
>> B-Slim
>> _______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______/\/\/\_______
>>
>
