Hi Fabian, Could you give an example that the query has a unique key? What is the mechanism flink infer which field is the unique key(s)? Thanks a lot!
Best, Henry > 在 2018年8月11日,上午5:21,Fabian Hueske <fhue...@gmail.com> 写道: > > Hi Henry, > > The problem is that the table that results from the query does not have a > unique key. > You can only use an upsert sink if the table has a (composite) unique key. > Since this is not the case, you cannot use upsert sink. > However, you can implement a StreamRetractionTableSink which allows to write > any kind of Table (append-only/update, keyed/non-keyed) to an external system. > > Best, Fabian > > 2018-08-10 17:06 GMT+02:00 徐涛 <happydexu...@gmail.com > <mailto:happydexu...@gmail.com>>: > Hi All, > I am using flink 1.6 to generate some realtime programs. I want to > write the output to table sink, the code is as below. At first I use append > table sink, which error message tells me that I should use upsert table sink, > so I write one. But still another error “Caused by: > org.apache.flink.table.api.TableException: UpsertStreamTableSink requires > that Table has a full primary keys if it is updated.” comes out,which blocks > me. My questions is how to modify a table keys in this scenario? I also check > the exception stack, and found that the system infer the keys field by > val tableKeys: Option[Array[String]] = > UpdatingPlanChecker.getUniqueKeyFields(optimizedPlan), I wonder how to make > the function return value ? > Thanks a lot !!! > var praise = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as PU FROM > praise GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' > DAY),article_id" ) > tableEnv.registerTable("praiseAggr", praise) > > var comment = tableEnv.sqlQuery(s"SELECT article_id,hll(from_uid) as CU > FROM comment GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' > DAY),article_id" ) > tableEnv.registerTable("commentAggr", comment) > > var reader = tableEnv.sqlQuery(s"SELECT article_id,hll(uid) as RU FROM > reader GROUP BY HOP(updated_time, INTERVAL '1' MINUTE,INTERVAL '3' > DAY),article_id" ) > tableEnv.registerTable("readerAggr", reader) > > var finalTable = tableEnv.sqlUpdate(s"insert into "+ sinkTableName + " " > + " SELECT p.article_id,p.PU,c.CU,r.RU from praiseAggr p FULL OUTER JOIN > commentAggr c on p.article_id=c.article_id FULL OUTER JOIN readerAggr r on > c.article_id=r.article_id") > > > > > > Thank, > Henry Xu >