Hi everyone,
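I am using Flink SQL to implement a dimension-table (lookup) join. The source is Kafka (transaction data) and the dimension table is MySQL (accounts). I have run into a problem. When a record arrives from Kafka and its account is not yet in the dimension table, I manually insert that account. The next record that arrives then matches the account information correctly, but the accumulated output is missing the earlier records that did not match. For example: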
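Kafka input:
account  amount  count
1111     100     1   -> no match
1111     100     1   -> no match
1111     100     1   -> matched

Dimension table:
account  company
2222     BBBB
1111     AAAA   -> account inserted afterwards

Actual output:
company  amount  count
AAAA     100     1

Expected output:
company  amount  count
AAAA     300     3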

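The symptom can be reproduced with a simplified model. The sketch below is plain Java, not Flink code: the class LookupJoinSketch, the method run and the insertAt parameter are hypothetical. It assumes the usual semantics of a processing-time LEFT lookup join (each record is matched against the dimension table as it exists when that record is processed, and a record with no match is still emitted with a null company), and it models a single account column instead of the outgoing/incoming pair used in the real query.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LookupJoinSketch {

    /**
     * Hypothetical model of a processing-time LEFT lookup join followed by GROUP BY.
     * Each record only sees the dimension table as it is at processing time;
     * a miss yields a null company and the record is summed under that null key.
     *
     * @param insertAt index of the record before which account 1111 is inserted
     */
    static Map<String, long[]> run(String[] accounts, long[] amounts, int insertAt) {
        Map<String, String> dim = new LinkedHashMap<>();
        dim.put("2222", "BBBB");
        Map<String, long[]> totals = new LinkedHashMap<>(); // company -> {amount, count}
        for (int i = 0; i < accounts.length; i++) {
            if (i == insertAt) {
                dim.put("1111", "AAAA"); // simulates the manual insert into MySQL
            }
            String corp = dim.get(accounts[i]);            // lookup at processing time
            String key = (corp == null) ? "<null>" : corp; // LEFT JOIN miss -> null company
            long[] t = totals.computeIfAbsent(key, k -> new long[2]);
            t[0] += amounts[i];
            t[1] += 1;
        }
        return totals;
    }

    private static void print(Map<String, long[]> totals) {
        totals.forEach((corp, t) -> System.out.println(corp + " " + t[0] + " " + t[1]));
    }

    public static void main(String[] args) {
        String[] accounts = {"1111", "1111", "1111"};
        long[] amounts = {100, 100, 100};
        // Account inserted just before the third record, as in the example.
        print(run(accounts, amounts, 2)); // prints "<null> 200 2" then "AAAA 100 1"
        // Account already present before the first record arrives.
        print(run(accounts, amounts, 0)); // prints "AAAA 300 3"
    }
}
```

In this model the two unmatched records are not dropped; they accumulate under a null company key, and only the third record lands under AAAA. The 300/3 total appears only when the account exists before the first record is looked up. Whether the real job behaves the same way (for example, how the sink handles a row whose outCorpId is null) is an assumption worth checking against the actual output table.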
The SQL is as follows:
String sql2 = "insert into dws_b2b_trade_year_index\n" +
               "WITH temp AS (\n" +
               "select \n" +
               "  ta.gmtStatistical as gmtStatistical,\n" +
               "  ta.paymentMethod as paymentMethod,\n" +
               "  tb.CORP_ID as outCorpId,\n" +
               "  tc.CORP_ID as inCorpId,\n" +
               "  sum(ta.tradeAmt) as tranAmount,\n" +
               "  sum(ta.tradeCnt) as tranNum \n" +
               "from dws_a2a_trade_year_index ta \n" +
               // processing-time lookup join on the outgoing account
               "left join dob_dim_account for system_time as of ta.proc as tb on ta.outAcctCode = tb.ACCT_CODE \n" +
               // processing-time lookup join on the incoming account
               "left join dob_dim_account for system_time as of ta.proc as tc on ta.inAcctCode = tc.ACCT_CODE \n" +
               // with LEFT JOIN, an unmatched account leaves CORP_ID null, and that null is part of the grouping key
               "group by \n" +
               " ta.gmtStatistical, \n" +
               " ta.paymentMethod, \n" +
               " tb.CORP_ID, \n" +
               " tc.CORP_ID \n" +
               ") \n" +
               "SELECT \n" +
               "   DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss') as gmtUpdate, \n" +
               "   gmtStatistical, \n" +
               "   paymentMethod, \n" +
               "   outCorpId, \n" +
               "   inCorpId, \n" +
               "   tranAmount, \n" +
               "   tranNum \n" +
               "FROM temp";

Jason_H
hyb_he...@163.com