jasonliangyc created FLINK-32642:
------------------------------------

             Summary: The upsert mode doesn't work for the compound keys
                 Key: FLINK-32642
                 URL: https://issues.apache.org/jira/browse/FLINK-32642
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Client
            Reporter: jasonliangyc
         Attachments: image-2023-07-21-23-55-47-399.png, 
image-2023-07-21-23-56-56-543.png, image-2023-07-21-23-57-22-186.png, 
image-2023-07-22-00-11-56-363.png

 
Hi, the issue can be reproduced with the following steps:
 
*1.* Create two tables in SQL Server: a sink table and a CDC source table. 
The sink table has a compound key (date_str, time_str).
*2.* Create the corresponding Flink tables in sql-client.
 
{code:java}
--create sink table in sqlserver
CREATE TABLE cumulative_cnt (
  date_str VARCHAR(50),
  time_str VARCHAR(50),
  cnt INTEGER,
  CONSTRAINT PK_cumulative_cnt PRIMARY KEY (date_str,time_str)
);

--create source cdc table in sqlserver
CREATE TABLE user_behavior (
  id INTEGER NOT NULL IDENTITY(101,1) PRIMARY KEY,
  create_date datetime NOT NULL,
  click_event VARCHAR(255) NOT NULL
);

EXEC sys.sp_cdc_enable_table  
@source_schema = N'dbo',  
@source_name   = N'user_behavior',  
@role_name     = NULL,  
@supports_net_changes = 1  
GO

--create flink tables through sql-client
CREATE TABLE cumulative_cnt (
    date_str STRING,
    time_str STRING,
    cnt BIGINT,
    PRIMARY KEY (date_str, time_str) NOT ENFORCED
  )  WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:sqlserver://xxxx:1433;databaseName=xxxx',
    'username' = 'xxxx',
    'password' = 'xxxx',
    'table-name' = 'cumulative_cnt'
);

--create flink cdc table through sql-client
CREATE TABLE user_behavior (
   id int,
   create_date TIMESTAMP(0),
   click_event STRING
 ) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'xxxx',
    'port' = '1433',
    'username' = 'xxxx',
    'password' = 'xxxx',
    'database-name' = 'xxxx',
    'schema-name' = 'dbo',
    'table-name' = 'user_behavior'
 ); {code}
*3.* Run the SQL below in sql-client to start a job that captures the CDC 
data, aggregates it, and inserts the result into the target table.
{code:java}
insert into cumulative_cnt
select
date_str,
max(time_str) as time_str,
count(*) as cnt
from
(
  select
  DATE_FORMAT(create_date, 'yyyy-MM-dd') as date_str,
  SUBSTR(DATE_FORMAT(create_date, 'HH:mm'),1,4) || '0' as time_str
  from user_behavior
) group by date_str;
{code}
*4.* Insert two records for testing.
{code:java}
INSERT INTO user_behavior(create_date, click_event) VALUES ('2023-06-01 01:01:00','click1');
INSERT INTO user_behavior(create_date, click_event) VALUES ('2023-06-01 02:20:00','click1');
{code}
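To make the key values concrete, here is a minimal Python sketch of what the inner select in step 3 computes for the two test records. It only mirrors the DATE_FORMAT and SUBSTR expressions; the function name to_keys is made up for illustration and nothing here runs inside Flink.

```python
from datetime import datetime
from typing import Tuple


def to_keys(create_date: str) -> Tuple[str, str]:
    """Mirror the inner select of step 3 for one create_date value."""
    ts = datetime.strptime(create_date, "%Y-%m-%d %H:%M:%S")
    # DATE_FORMAT(create_date, 'yyyy-MM-dd')
    date_str = ts.strftime("%Y-%m-%d")
    # SUBSTR(DATE_FORMAT(create_date, 'HH:mm'), 1, 4) || '0'
    # keeps the first four characters (e.g. '01:0') and appends '0',
    # which rounds the minute down to a 10-minute bucket.
    time_str = ts.strftime("%H:%M")[:4] + "0"
    return date_str, time_str


keys = [to_keys(s) for s in ("2023-06-01 01:01:00", "2023-06-01 02:20:00")]
print(keys)  # [('2023-06-01', '01:00'), ('2023-06-01', '02:20')]
```

Both records share the same date_str but get different time_str values, so their (date_str, time_str) pairs are distinct.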
*5.* I checked the result in the database (see screen 1) and found that the 
target table has only one record. This is unexpected: the two source records 
have different times, so their compound keys (date_str, time_str) should 
differ (see screen 2).
 
There should be two records in the target table:
2023-06-01    01:00    1
2023-06-01    02:20    2
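 
The expectation can be written down as a small Python model of key-based upserts. This is only a sketch of the semantics I expect, not Flink's JDBC sink code; apply_upserts is a hypothetical helper, and the input rows are the two results listed above.

```python
def apply_upserts(rows, key_columns):
    """Apply rows in order; a row overwrites an earlier one only if all key columns match."""
    table = {}
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        table[key] = row
    return list(table.values())


# Successive results of the step 3 aggregation after each test record
# (taken from the expected rows above).
results = [
    {"date_str": "2023-06-01", "time_str": "01:00", "cnt": 1},
    {"date_str": "2023-06-01", "time_str": "02:20", "cnt": 2},
]

compound = apply_upserts(results, ("date_str", "time_str"))
single = apply_upserts(results, ("date_str",))

print(len(compound))  # 2
print(len(single))    # 1
```

With the compound key both rows survive. Keying on date_str alone leaves a single row, which is the same row count I see in the target table; that comparison is only an illustration, not a confirmed diagnosis.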
 
screen 1
!image-2023-07-22-00-11-56-363.png|width=274,height=199!
 
screen 2
!image-2023-07-21-23-57-22-186.png|width=572,height=120!
 
 
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
