Hello,
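I am using Flink SQL on Flink 1.11.2 to process two streams. Both streams carry database change records, and I need to join the latest version of each record, so I deduplicate each stream separately by keeping only the row with row_number = 1: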
(SELECT [column_list] FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name) 
WHERE rownum = 1)
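After deduplication I left-join the two streams. At first, changes on the left stream caused no problems and the results were as expected. When the right stream received data, the first record was also as expected. But when the right stream then sent an update to that record, an unexpected record appeared: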
        
left> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14 15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,8,1607932790000)
left> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14 15:59:50,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
left> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14 15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,1607932790000)
left> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,,2020-12-14 15:59:50,4f28c1211e274bba819cc63680a3b386,1,0,8,1607932790000)
left> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,1607932790000)
right> (false,3774bca649224249bdbcb8e7c80b52f9,1,0,8,1607932790000)
right> (true,3774bca649224249bdbcb8e7c80b52f9,1,0,1,1607933006000)
left> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,8,1607932790000)
left> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
left> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
left> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,1,0,1,1607933006000)

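Row 1: data arrives on the left stream, flagged true; the right stream has no data yet, so the right-side columns are null.
Row 2: data arrives on the right stream, flagged true (I print the right stream's result separately).
Row 3: the left-stream result is retracted.
Row 4: the left and right records are joined and displayed correctly.
Row 5: the left-stream record changes, so its result is retracted.
Row 6: the changed result is displayed.
Row 7: the right-stream record changes, so it is retracted.
Row 8: the latest right-stream result is displayed.
Row 9: because the right-stream record changed, the joined left-stream result is retracted.
Rows 10 and 11 are not what I expected.
What I expected: when the right stream changes, row 9 retracts the joined result, and then the latest right-stream record joins the left-stream record and produces row 12 directly.
So I would like to ask for everyone's advice.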

1607998361520> (true,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
1607998361520> (false,3774bca649224249bdbcb8e7c80b52f9,TS202012141551420518,3,2a30402475dd4d89b461d2e457b297f0,NP02,NP020101,NP020202,yangkg001,yangkg001,null,4f28c1211e274bba819cc63680a3b386,null,null,null,null)
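To reason about rows 9 to 12, here is a simplified plain-Java model of a non-windowed LEFT JOIN over a retract stream for a single sheetId. It is only a sketch, not Flink's join operator: the class and method names are hypothetical, and it assumes that the right stream's update reaches the join as two independent messages (the retraction in row 7, then the insertion in row 8) and that the join reacts to each one separately.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a LEFT OUTER JOIN on retract streams for ONE join key.
// Assumption: every retraction/insertion is processed on its own, with no batching.
public class RetractLeftJoinSketch {
    private final List<String> leftRows = new ArrayList<>();
    private final List<String> rightRows = new ArrayList<>();
    private final List<String> output = new ArrayList<>();

    private void emit(boolean add, String left, String right) {
        output.add("(" + add + "," + left + "," + right + ")");
    }

    public void onLeft(boolean add, String row) {
        if (add) {
            leftRows.add(row);
        } else {
            leftRows.remove(row);
        }
        // With no right match, the left row is null-padded.
        if (rightRows.isEmpty()) {
            emit(add, row, "null");
        } else {
            for (String r : rightRows) {
                emit(add, row, r);
            }
        }
    }

    public void onRight(boolean add, String row) {
        if (add) {
            boolean wasEmpty = rightRows.isEmpty();
            rightRows.add(row);
            for (String l : leftRows) {
                // First right match: the null-padded row must be retracted first.
                if (wasEmpty) {
                    emit(false, l, "null");
                }
                emit(true, l, row);
            }
        } else {
            rightRows.remove(row);
            for (String l : leftRows) {
                emit(false, l, row);
                // Last right match gone: fall back to a null-padded row.
                if (rightRows.isEmpty()) {
                    emit(true, l, "null");
                }
            }
        }
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        RetractLeftJoinSketch join = new RetractLeftJoinSketch();
        join.onLeft(true, "L1");   // row 1
        join.onRight(true, "R8");  // rows 3-4 (right insertion, row 2)
        join.onLeft(false, "L1");  // row 5
        join.onLeft(true, "L2");   // row 6
        join.onRight(false, "R8"); // rows 9-10 (right retraction, row 7)
        join.onRight(true, "R1");  // rows 11-12 (right insertion, row 8)
        join.output().forEach(System.out::println);
    }
}
```

Under these assumptions the model emits the same shape as the left-side rows 1 and 3 to 12: when the retraction removes the last matching right row, the join falls back to a null-padded row (row 10), and the following insertion retracts it again (row 11) before emitting the new joined row (row 12).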

My SQL statement is as follows:
String sql = "SELECT a.sheetId sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," +
        " sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId,provided,satisfied,score,operateTime " +
        " from (SELECT sheetId,sheetCode,sheetStatus,sheetType,sheetScene,sheetObject," +
        " sheetPresentation,sheetAcceptor,sheetHandler,updateTime,dealTaskId" +
        " FROM (SELECT *," +
        "     ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime desc) AS rownum " +
        "   FROM sheetMain)" +
        " WHERE rownum = 1 ) a" +
        " left JOIN " +
        " (select sheetId,provided,satisfied,score,operateTime from (SELECT *," +
        "     ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime desc) AS rownum " +
        "   FROM sheetAnswers)" +
        " WHERE rownum = 1 ) c" +
        " ON a.sheetId = c.sheetId ";
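Each `ROW_NUMBER() ... WHERE rownum = 1` subquery above keeps, per sheetId, only the row with the latest operateTime. The following hypothetical plain-Java model of that keep-latest-per-key step shows why one update to a sheetAnswers record reaches the join as two messages: a retraction of the old top row followed by an insertion of the new one. It is not Flink's Rank/Deduplicate operator; the class name, the `Row` fields and the tie rule (only a strictly larger operateTime replaces the current row) are my assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of:
//   ROW_NUMBER() OVER (PARTITION BY sheetId ORDER BY operateTime desc) ... WHERE rownum = 1
// Not Flink's operator; it only shows the changelog such a query produces.
public class LatestPerKeySketch {

    public static final class Row {
        final String sheetId;
        final long operateTime;
        final String payload; // remaining columns, e.g. "provided,satisfied,score"

        public Row(String sheetId, long operateTime, String payload) {
            this.sheetId = sheetId;
            this.operateTime = operateTime;
            this.payload = payload;
        }

        @Override
        public String toString() {
            return sheetId + "," + payload + "," + operateTime;
        }
    }

    // Current rownum = 1 row for each sheetId.
    private final Map<String, Row> latest = new HashMap<>();
    // Emitted changelog in the "(flag,columns...)" format of the printed output.
    private final List<String> changelog = new ArrayList<>();

    public void process(Row row) {
        Row current = latest.get(row.sheetId);
        // Assumption: only a strictly newer operateTime replaces the current row.
        if (current != null && row.operateTime <= current.operateTime) {
            return;
        }
        if (current != null) {
            changelog.add("(false," + current + ")"); // retract the old rownum = 1 row
        }
        latest.put(row.sheetId, row);
        changelog.add("(true," + row + ")"); // emit the new rownum = 1 row
    }

    public List<String> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        LatestPerKeySketch answers = new LatestPerKeySketch();
        answers.process(new Row("3774bca649224249bdbcb8e7c80b52f9", 1607932790000L, "1,0,8"));
        answers.process(new Row("3774bca649224249bdbcb8e7c80b52f9", 1607933006000L, "1,0,1"));
        answers.changelog().forEach(System.out::println);
    }
}
```

With the two sheetAnswers values from the printed output, `main` prints the same three entries as the right-stream rows 2, 7 and 8.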


