Source table test:
CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'test'
);
Source table status:
CREATE TABLE status (
`id` INT,
`name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = '1',
  'database-name' = 'ai_audio_lyric_task',
  'table-name' = 'status'
);

Sink table:
CREATE TABLE test_status (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
`status_name` VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'xxx',
  'index' = 'xxx',
  'username' = 'xxx',
  'password' = 'xxx',
  'sink.bulk-flush.backoff.max-retries' = '100000',
  'sink.bulk-flush.backoff.strategy' = 'CONSTANT',
  'sink.bulk-flush.max-actions' = '5000',
  'sink.bulk-flush.max-size' = '10mb',
  'sink.bulk-flush.interval' = '1s'
);


Insert statement:
INSERT into test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;

The MySQL tables already contain data:
test:
0, name0, 2020-07-06 00:00:00, 0
1, name1, 2020-07-06 00:00:00, 1
2, name2, 2020-07-06 00:00:00, 1
.....

status:
0, status0
1, status1
2, status2
.....
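For anyone trying to reproduce this, a minimal MySQL script that creates the two source tables and loads the rows listed above might look like the following. The MySQL-side schema is an assumption (the post only shows the Flink DDL); DATETIME(3) is used for `time` so that MySQL does not auto-update the column, and the "....." rows are not included.

```sql
-- Assumed MySQL schema matching the Flink DDL for the two source tables
CREATE TABLE test (
  `id` INT PRIMARY KEY,
  `name` VARCHAR(255),
  `time` DATETIME(3),
  `status` INT
);

CREATE TABLE status (
  `id` INT PRIMARY KEY,
  `name` VARCHAR(255)
);

-- Only the rows shown in the post
INSERT INTO test (`id`, `name`, `time`, `status`) VALUES
  (0, 'name0', '2020-07-06 00:00:00', 0),
  (1, 'name1', '2020-07-06 00:00:00', 1),
  (2, 'name2', '2020-07-06 00:00:00', 1);

INSERT INTO status (`id`, `name`) VALUES
  (0, 'status0'),
  (1, 'status1'),
  (2, 'status2');
```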

Steps to reproduce:
1. Start the job with parallelism 40.
Once the existing table data has been processed, take a savepoint with /data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink
savepoint, then cancel the job in the web UI.
  ==> The data in test_status is correct:
    0, name0, 2020-07-06 00:00:00, 0, status0
    1, name1, 2020-07-06 00:00:00, 1, status1
    2, name2, 2020-07-06 00:00:00, 1, status1

2. In MySQL, change the name of the row with id=1 in status to status1_modify.
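Step 2 amounts to a single MySQL statement, assuming it is the `name` column of status that was changed. mysql-cdc reads it as an update event (an update-before/update-after pair), so the LEFT JOIN has to retract the previously joined rows for test.id = 1 and 2 and emit new ones with status1_modify.

```sql
-- Change the display name of status id=1; both test rows with status = 1 are affected
UPDATE status SET `name` = 'status1_modify' WHERE `id` = 1;
```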

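3. Next, restart the job above from the savepoint with different parallelism: once with 1 and once with more than 1. With parallelism greater than 1, the result does not match expectations.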
With parallelism 1:
/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 1
  ==> The data in test_status is correct:
    0, name0, 2020-07-06 00:00:00, 0, status0
    1, name1, 2020-07-06 00:00:00, 1, status1_modify
    2, name2, 2020-07-06 00:00:00, 1, status1_modify
With parallelism 40:
/data/home/jindyliu/flink-demo/flink-1.11.2/bin/flink -s savepoint -p 40
  ==> The data in test_status is wrong; the two rows with id = 1 and 2 are missing:
    0, name0, 2020-07-06 00:00:00, 0, status0
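To check what test_status ought to contain after step 2, the same join as the Flink INSERT statement can be run directly against MySQL. With only the rows listed above, it should return the same three rows as the parallelism-1 run (id 1 and 2 with status1_modify).

```sql
-- Reference result computed by MySQL itself, mirroring the Flink LEFT JOIN
SELECT t.`id`, t.`name`, t.`time`, t.`status`, s.`name` AS status_name
FROM test AS t
LEFT JOIN status AS s ON t.`status` = s.`id`
ORDER BY t.`id`;
```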


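I suspect that when the CDC changelog is written out by the sink at parallelism greater than 1, the insert for id=1 is executed before the delete for id=1, which leaves the data wrong!

Is this a bug? Or is the operator state wrong when restoring from the savepoint?
If so, is it possible to set the parallelism of only the sink to 1?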







--
Sent from: http://apache-flink.147419.n8.nabble.com/
