Thanks Benoît! I can see now how I can implement this myself through the provided sink interfaces, but I was trying to avoid having to write code for this :D My initial motivation was to see whether we are able to write out any kind of table to Kafka as a simple stream of "upserts".
I also don't completely understand at this point why I can write the result of a GROUP BY tumble-window aggregate to Kafka but not this window join / aggregate.

Cheers,
Gyula

On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <benoit.pa...@centraliens-lille.org> wrote:

> Hi Gyula,
>
> I'm afraid the conversion to see retractions vs. inserts can't be done in pure SQL (though I'd love that feature).
>
> You might want to go lower level and implement a RetractStreamTableSink [1][2] that you would wrap around a KafkaTableSink [3]. This will give you an emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream), in which the Boolean flag will give you an 'accumulate' or 'retract' signal.
> You can then filter the DataStream accordingly before passing it to the KafkaTableSink.
>
> Hope this helps.
>
> Best regards
> Benoît
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
> [3] https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>
> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hi Roman,
>>
>> This is the core logic:
>>
>> CREATE TABLE QueryResult (
>>   queryId BIGINT,
>>   itemId STRING,
>>   quantity INT
>> ) WITH (
>>   'connector.type' = 'kafka',
>>   'connector.version' = 'universal',
>>   'connector.topic' = 'query.output.log.1',
>>   'connector.properties.bootstrap.servers' = '<broker>',
>>   'format.type' = 'json'
>> );
>>
>> INSERT INTO QueryResult
>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>> FROM
>>   ItemTransactions AS t,
>>   Queries AS q
>> WHERE
>>   t.itemId = q.itemId AND
>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>> GROUP BY
>>   t.itemId, q.event_time, q.queryId;
>>
>> And the error I get is:
>>
>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL update statement.
>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>>   at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>>   at org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>>   at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>>   at java.util.Optional.ifPresent(Optional.java:159)
>>   at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>>   at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>>   at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>>   at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>
>> I am wondering what I could do to simply pump the result updates to Kafka here.
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Gyula,
>>>
>>> Could you provide the code of your Flink program, the error with stacktrace, and the Flink version?
>>>
>>> Thanks,
>>> Roman
>>>
>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> Hi All!
>>>>
>>>> Excuse my stupid question, I am pretty new to the Table/SQL API and I am trying to play around with it, implementing and running a few use cases.
>>>>
>>>> I have a simple window join + aggregation, grouped on some id, that I want to write to Kafka, but I am hitting the following error:
>>>>
>>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>>
>>>> If I understand correctly, the problem here is that since updates are possible within a single group, we have a retract stream, and the Kafka sink cannot handle that. I tried to search for a solution but haven't found any satisfying answers.
>>>>
>>>> How can I simply tell the INSERT logic to ignore previous values and just always keep sending the latest (like you would see on the CLI output)?
>>>>
>>>> Thank you!
>>>> Gyula
>>>
>
> --
> Benoît Paris
> Ingénieur Machine Learning Explicable
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml
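
For reference, here is a minimal, untested sketch of the wrapper Benoît describes above: a RetractStreamTableSink that drops the retraction messages and forwards the remaining accumulate rows to a wrapped, append-only KafkaTableSink. It assumes the Flink 1.10 table sink interfaces linked in [1][3]; the class name is made up for illustration, and the exact abstract/default methods vary slightly between Flink versions, so signatures should be checked against the javadocs.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/**
 * Illustrative wrapper (not part of Flink): accepts the retract stream the
 * planner produces, drops the retraction messages, and forwards the remaining
 * rows to an append-only KafkaTableSink.
 */
public class FilteringKafkaRetractSink implements RetractStreamTableSink<Row> {

    private final TableSchema schema;      // must match the wrapped sink's schema
    private final KafkaTableSink kafkaSink;

    public FilteringKafkaRetractSink(TableSchema schema, KafkaTableSink kafkaSink) {
        this.schema = schema;
        this.kafkaSink = kafkaSink;
    }

    @Override
    public TableSchema getTableSchema() {
        return schema;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return schema.toRowType();
    }

    @Override
    public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
        return Types.TUPLE(Types.BOOLEAN, getRecordType());
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        // The schema is fixed by the wrapped sink in this sketch.
        return this;
    }

    // In older Flink versions this hook was the void method emitDataStream(...).
    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // f0 == true  -> accumulate (new/updated value): keep it
        // f0 == false -> retract (old value): drop it
        DataStream<Row> latestValues = dataStream
                .filter(change -> change.f0)
                .map(change -> change.f1)
                .returns(getRecordType());
        return kafkaSink.consumeDataStream(latestValues);
    }
}

Dropping retractions turns the topic into a stream of upserts keyed by the GROUP BY columns: consumers see every intermediate value and must keep only the latest message per key. Note also that to use such a sink from the SQL client you would still need to register it (for example via a custom table factory, or TableEnvironment#registerTableSink in a program); the DDL with 'connector.type' = 'kafka' will keep resolving to the built-in append-only sink.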
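
An alternative not mentioned in the thread, for when the job is assembled as a program rather than through the SQL client: convert the query result to a retract stream with toRetractStream, filter out the retractions, and write the rest with a plain FlinkKafkaProducer. A rough sketch assuming Flink 1.10 with the Blink planner and the universal Kafka connector; the table names, query, and topic come from the thread, while the Row-to-String serialization is only a placeholder (not the JSON format used in the DDL).

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class QueryResultUpsertsToKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // ItemTransactions and Queries are assumed to be registered already
        // (e.g. via the same DDL used in the SQL client).
        Table result = tEnv.sqlQuery(
                "SELECT q.queryId, t.itemId, SUM(t.quantity) AS quantity "
                        + "FROM ItemTransactions AS t, Queries AS q "
                        + "WHERE t.itemId = q.itemId "
                        + "AND t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time "
                        + "GROUP BY t.itemId, q.event_time, q.queryId");

        // toRetractStream yields Tuple2<Boolean, Row>: f0 == true is an
        // accumulate (new value), f0 == false is a retraction (old value).
        DataStream<Tuple2<Boolean, Row>> changes = tEnv.toRetractStream(result, Row.class);

        // Keep only the latest values; Row.toString() is just a placeholder serialization.
        DataStream<String> upserts = changes
                .filter(change -> change.f0)
                .map(change -> change.f1.toString())
                .returns(Types.STRING);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "<broker>");
        upserts.addSink(new FlinkKafkaProducer<>("query.output.log.1", new SimpleStringSchema(), props));

        env.execute("query-result-upserts-to-kafka");
    }
}

As with the sink wrapper, consumers of the topic should treat the last message per group key as the current value.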