Hi Roman, This is the core logic:
CREATE TABLE QueryResult ( queryId BIGINT, itemId STRING, quantity INT ) WITH ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'query.output.log.1', 'connector.properties.bootstrap.servers' = '<broker>', 'format.type' = 'json' ); INSERT INTO QueryResult SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity FROM ItemTransactions AS t, Queries AS q WHERE t.itemId = q.itemId AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time GROUP BY t.itemId, q.event_time, q.queryId; And the error I get is: org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement. at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697) at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576) at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527) at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548) at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211) at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) I am wondering what could I do to just simply pump the result updates to Kafka here. Gyula On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman < khachatryan.ro...@gmail.com> wrote: > Hi Gyula, > > Could you provide the code of your Flink program, the error with > stacktrace and the Flink version? > > Thanks., > Roman > > > On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com> wrote: > >> Hi All! >> >> Excuse my stupid question, I am pretty new to the Table/SQL API and I am >> trying to play around with it implementing and running a few use-cases. >> >> I have a simple window join + aggregation, grouped on some id that I want >> to write to Kafka but I am hitting the following error: >> >> "AppendStreamTableSink requires that Table has only insert changes." >> >> If I understand correctly the problem here is that since updates are >> possible within a single group, we have a retract stream and the Kafka Sink >> cannot handle that. I tried to search for the solution but I haven't found >> any satisfying answers. >> >> How can I simply tell the INSERT logic to ignore previous values and just >> always keep sending the latest (like you would see it on the CLI output). >> >> Thank you! >> Gyula >> >