Hi Gyula,

I'm afraid converting the stream so that you can tell the retractions from the inserts can't be done in pure SQL (though I'd love that feature).
You might want to go lower level and implement a RetractStreamTableSink [1][2] that wraps a KafkaTableSink [3]. This gives you an emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream), in which the Boolean flag carries the 'accumulate' vs. 'retract' signal. You can then filter the DataStream accordingly before passing it on to the KafkaTableSink.
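For illustration, a rough, untested sketch of such a wrapper (the class name and the wiring are mine; I'm assuming the Flink 1.10 interfaces, where consumeDataStream() is the non-deprecated sibling of emitDataStream()):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class InsertsOnlyKafkaSink implements RetractStreamTableSink<Row> {

    // The wrapped append-only sink, e.g. a fully configured KafkaTableSink.
    private final StreamTableSink<Row> appendSink;

    public InsertsOnlyKafkaSink(StreamTableSink<Row> appendSink) {
        this.appendSink = appendSink;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return appendSink.getTableSchema().toRowType();
    }

    @Override
    public TableSchema getTableSchema() {
        return appendSink.getTableSchema();
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(
            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // The schema is fixed by the wrapped sink, so nothing to reconfigure.
        return this;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
        // f0 == true  -> accumulate message: keep the row
        // f0 == false -> retract message: drop it
        DataStream<Row> insertsOnly = stream
                .filter(change -> change.f0)
                .map(change -> change.f1)
                .returns(getRecordType());
        // The filtered stream is append-only, so the Kafka sink accepts it.
        return appendSink.consumeDataStream(insertsOnly);
    }
}

In a Table API program you could then register it over your configured KafkaTableSink with something like tableEnv.registerTableSink("QueryResult", new InsertsOnlyKafkaSink(kafkaSink)); from the pure SQL CLI you would additionally have to expose it through a table factory.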
Hope this helps.

Best regards,
Benoît

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html

On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Roman,
>
> This is the core logic:
>
> CREATE TABLE QueryResult (
>   queryId BIGINT,
>   itemId STRING,
>   quantity INT
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'query.output.log.1',
>   'connector.properties.bootstrap.servers' = '<broker>',
>   'format.type' = 'json'
> );
>
> INSERT INTO QueryResult
> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
> FROM
>   ItemTransactions AS t,
>   Queries AS q
> WHERE
>   t.itemId = q.itemId AND
>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
> GROUP BY
>   t.itemId, q.event_time, q.queryId;
>
> And the error I get is:
>
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>   at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>   at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>   at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>   at java.util.Optional.ifPresent(Optional.java:159)
>   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>
> I am wondering what I could do to simply pump the result updates to Kafka here.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Could you provide the code of your Flink program, the error with the stacktrace, and the Flink version?
>>
>> Thanks,
>> Roman
>>
>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hi All!
>>>
>>> Excuse my stupid question, I am pretty new to the Table/SQL API and I am trying to play around with it, implementing and running a few use cases.
>>>
>>> I have a simple window join + aggregation, grouped on some id, that I want to write to Kafka, but I am hitting the following error:
>>>
>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>
>>> If I understand correctly, the problem here is that since updates are possible within a single group, we have a retract stream, and the Kafka sink cannot handle that. I tried to search for a solution but I haven't found any satisfying answers.
>>>
>>> How can I simply tell the INSERT logic to ignore previous values and just always keep sending the latest (like you would see it on the CLI output)?
>>>
>>> Thank you!
>>> Gyula

-- 
Benoît Paris
Explainable Machine Learning Engineer
Tel: +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml