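Suppose for Code X the first record has X.Amount=0, the second has X.Amount=100, and the third has X.Amount=200.
1. Because Code is the primary key, the table keeps only the latest row for X at any time, so the output of select sum(X.Amount) from table is: 0, 100, 200
2. In the UDAF I defined, for the same Code X the accumulate method executes acc.lastAmount = Amount on every call to update the state of acc. Judging from the results, though, acc.lastAmount is 0 every time the method is entered for that same Code X? Is that also because the table keeps only one row for Code X?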
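
To make point 1 concrete, here is a minimal plain-Java sketch (not Flink code) of how a retracting SUM behaves when each new record for a primary key replaces the previous one. It assumes the upsert source is turned into "retract old row, accumulate new row" changes before the aggregation, which is my understanding of the behaviour described in this thread; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class UpsertSumSketch {
    /**
     * Replays the records of one key as upsert changes into a retracting SUM
     * and returns the sum emitted after each record.
     */
    static List<Long> sumOverUpsert(String code, long[] amounts) {
        Map<String, Long> latest = new HashMap<>(); // last row kept per primary key
        long sum = 0L;
        List<Long> out = new ArrayList<>();
        for (long amount : amounts) {
            Long previous = latest.put(code, amount);
            if (previous != null) {
                sum -= previous; // retract the row that is being replaced
            }
            sum += amount;       // accumulate the new row
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [0, 100, 200]
        System.out.println(sumOverUpsert("X", new long[] {0L, 100L, 200L}));
    }
}
```

Since the old row is always subtracted before the new one is added, the sum for a key can never exceed its latest Amount, which matches the 0, 100, 200 output above.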


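So with an upsert kafka table (where only the latest row for Code X is kept), suppose I want to accumulate the Amount of Code X and expect the output 0, 100, 300, ...; how should that be implemented?
Any help from the experts would be much appreciated ><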
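
For comparison, a running total of 0, 100, 300 is what you get when every record counts as an independent insert rather than as a replacement of the previous row. The plain-Java sketch below (again not Flink code, names invented for illustration) shows that append-only interpretation. In Flink this would presumably correspond to reading the topic as an append-only table, for example through the regular kafka connector, which the first message quoted below reports as updating normally; I have not verified that setup here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AppendSumSketch {
    /**
     * Treats every (code, amount) pair as an insert and returns the running
     * total of the record's key after each record.
     */
    static List<Long> runningTotals(String[] codes, long[] amounts) {
        Map<String, Long> totals = new HashMap<>(); // running sum per code
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < codes.length; i++) {
            // merge adds the amount to the existing total, or starts a new one
            out.add(totals.merge(codes[i], amounts[i], Long::sum));
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [0, 100, 300]
        System.out.println(runningTotals(
                new String[] {"X", "X", "X"}, new long[] {0L, 100L, 200L}));
    }
}
```

Nothing is ever retracted here, so earlier amounts stay in the total.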

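At 2020-12-10 13:47:57, "Jark Wu" <imj...@gmail.com> wrote:
>Because the pk of your upsert kafka table is code, the data within each code group is already unique (a code has
>only one row; the last row is taken as the latest data). I guess the amount value is the same for a given code in your data, so sum(amount) naturally does not change.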
>
>Best,
>Jark
>
>On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote:
>
>> // kafka table
>> tableEnv.executeSql("CREATE TABLE  market_stock(\n" +
>>                 "    Code STRING,\n" +
>>                 "    Amount BIGINT,\n" +
>>                 ......
>>                 "    PRIMARY KEY (Code) NOT ENFORCED\n" +
>>                 ") WITH (\n" +
>>                 "    'connector' = 'upsert-kafka',\n" +
>>                 "    'topic' = 'zzz',\n" +
>>                 "    'properties.bootstrap.servers' = '10.0.3.20:9092,10.0.3.24:9092,10.0.3.26:9092',\n" +
>>                 "    'properties.group.id' = 'sqltest46',\n" +
>>                 "    'key.format' = 'raw',\n" +
>>                 "    'value.format' = 'json'\n" +
>>                 ")");
>> // compute with the UDAF
>> Table table = bsTableEnv.sqlQuery(
>>         "select Code,MainFundFlowFunc(Amount,AskPrice1,BidPrice1,Last) " +
>>         "from market_stock GROUP BY Code");
>> env.toRetractStream(table,Row.class).print();
>>
>>
>> // the UDAF is defined as follows
>> public class MainFundFlowFunc extends AggregateFunction<Row, AmountAccum> {
>>     @Override
>>     public Row getValue(AmountAccum acc) {
>>         Long mf = acc.ebb + acc.bb - acc.ebs - acc.bs;
>>         double mfr = acc.lastAmount > 0
>>                 ? MyNumericCalculator.divide(mf,acc.lastAmount,2).doubleValue()
>>                 : 0.0;
>>         return Row.of(acc.ebs,acc.bs,acc.ms,acc.ss,acc.ebb,acc.bb,acc.mb,
>>                 acc.sb,mf,mfr);
>>     }
>>     @Override
>>     public AmountAccum createAccumulator() {
>>         return new AmountAccum();
>>     }
>>
>>     public void accumulate(AmountAccum acc, Long amount, Double askPrice1,
>>             Double bidPrice1, Double last) {
>>         //......
>>         acc.lastAmount = amount;
>>         acc.lastAskPrice1 = askPrice1;
>>         acc.lastBidPrice1 = bidPrice1;
>>     }
>>     public void retract(AmountAccum acc, Long amount, Double askPrice1,
>>             Double bidPrice1, Double last) {
>>         acc.lastAmount = amount;
>>         acc.lastAskPrice1 = askPrice1;
>>         acc.lastBidPrice1 = bidPrice1;
>>     }
>>
>> }
>>
>>
>>
>>
>> // acc
>> public class AmountAccum {
>>     public Double lastAskPrice1;
>>     public Double lastBidPrice1;
>>     public Long lastAmount = 0L;
>>     public Long ebs = 0L;
>>     public Long bs = 0L;
>>     public Long ms = 0L;
>>     public Long ss = 0L;
>>     public Long ebb = 0L;
>>     public Long bb = 0L;
>>     public Long mb = 0L;
>>     public Long sb = 0L;
>> }
>>
>>
>> Watching acc in the debugger, its lastAmount value is always 0.
>>
>>
>> I just tried the built-in sum() function: running select Code,sum(Amount) from market_stock GROUP BY
>> Code did not accumulate the Amount field; every output is simply the latest Amount value.
>> Am I using it the wrong way? = =
>>
>> At 2020-12-10 11:30:31, "Jark Wu" <imj...@gmail.com> wrote:
>> >Could you post the code?
>> >
>> >On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote:
>> >
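>> >> The upstream is a table created with the upsert-kafka connector.
>> >> When using a UDAF, I found that the variables in the accumulator are never updated? With the kafka connector they are updated normally.
>> >> (To make testing easier, the table only contains data for a single PK.)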
>>
