Hi sunfulin, Is this the real query you submit? AFAIK, insert with column list is not allowed for now, i.e. the `INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt)`.
Could you attach the full SQL text, including DDLs of ES6_ZHANGLE_OUTPUT table and kafka_zl_etrack_event_stream table. If you have a minimal program that can reproduce this problem, that would be great. Best, Jark On Fri, 14 Feb 2020 at 22:53, Robert Metzger <rmetz...@apache.org> wrote: > > > ---------- Forwarded message --------- > From: sunfulin <sunfulin0...@163.com> > Date: Fri, Feb 14, 2020 at 2:59 AM > Subject: Re:Flink 1.10 es sink exception > To: user@flink.apache.org <user@flink.apache.org> > > > Anyone can share a little advice on the reason of this exception? I > changed to use old planner, the same sql runs well. > > > > > > At 2020-02-13 16:07:18, "sunfulin" <sunfulin0...@163.com> wrote: > > Hi, guys > When running the same Flink sql like the following, I met exception like > "org.apache.flink.table.api.TableException: UpsertStreamTableSink requires > that Table has a full primary keys if it is updated". I am using the latest > Flink 1.10 release with blink planner enabled. Because the same logic runs > well within Flink 1.8.2 old planner. Does the SQL usage has some problem or > may has a bug here ? > > > INSERT INTO ES6_ZHANGLE_OUTPUT(aggId, pageId, ts, expoCnt, clkCnt) > SELECT aggId, pageId, ts_min as ts, > count(case when eventId = 'exposure' then 1 else null end) as expoCnt, > count(case when eventId = 'click' then 1 else null end) as clickCnt > FROM > ( > SELECT > 'ZL_001' as aggId, > pageId, > eventId, > recvTime, > ts2Date(recvTime) as ts_min > from kafka_zl_etrack_event_stream > where eventId in ('exposure', 'click') > ) as t1 > group by aggId, pageId, ts_min > > > > > > > > >