> I also don't completely understand at this point why I can write the result of a group, tumble window aggregate to Kafka and not this window join / aggregate.
If you are doing a tumble window aggregate with watermark enabled, Flink will only fire a final result for each window at once, no modification or retractions will happen after a window is calculated and fired. But with some other arbitrary aggregations, there is not enough information for Flink to determine whether the data is complete or not, so the framework will keep calculating results when receiving new records and retract earlier results by firing retraction/deletion messages. Best, Kurt On Thu, Mar 5, 2020 at 10:13 PM Gyula Fóra <gyula.f...@gmail.com> wrote: > Thanks Benoît! > > I can see now how I can implement this myself through the provided sink > interfaces but I was trying to avoid having to write code for this :D > My initial motivation was to see whether we are able to write out any kind > of table to Kafka as a simple stream of "upserts". > > I also don't completely understand at this point why I can write the > result of a group, tumble window aggregate to Kafka and not this window > join / aggregate. > > Cheers, > Gyula > > On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris < > benoit.pa...@centraliens-lille.org> wrote: > >> Hi Gyula, >> >> I'm afraid conversion to see the retractions vs inserts can't be done in >> pure SQL (though I'd love that feature). >> >> You might want to go lower level and implement a RetractStreamTableSink >> [1][2] that you would wrap around a KafkaTableSink [3]. This will give you >> a emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream);, in which the >> Boolean flag will give you an 'accumulate' or 'retract' signal. >> You can then filter the DataStream accordingly before passing to the >> KafkaTableSink. >> >> Hope this helps. >> >> Best regards >> Benoît >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html >> [2] >> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink >> [3] >> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html >> >> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com> wrote: >> >>> Hi Roman, >>> >>> This is the core logic: >>> >>> CREATE TABLE QueryResult ( >>> queryId BIGINT, >>> itemId STRING, >>> quantity INT >>> ) WITH ( >>> 'connector.type' = 'kafka', >>> 'connector.version' = 'universal', >>> 'connector.topic' = 'query.output.log.1', >>> 'connector.properties.bootstrap.servers' = '<broker>', >>> 'format.type' = 'json' >>> ); >>> >>> INSERT INTO QueryResult >>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity >>> FROM >>> ItemTransactions AS t, >>> Queries AS q >>> WHERE >>> t.itemId = q.itemId AND >>> t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND >>> q.event_time >>> GROUP BY >>> t.itemId, q.event_time, q.queryId; >>> >>> And the error I get is: >>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL >>> update statement. >>> at >>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697) >>> at >>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576) >>> at >>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527) >>> at >>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548) >>> at >>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310) >>> at java.util.Optional.ifPresent(Optional.java:159) >>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211) >>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) >>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) >>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) >>> Caused by: org.apache.flink.table.api.TableException: >>> AppendStreamTableSink requires that Table has only insert changes. >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123) >>> at >>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48) >>> at >>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) >>> >>> I am wondering what could I do to just simply pump the result updates to >>> Kafka here. >>> >>> Gyula >>> >>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman < >>> khachatryan.ro...@gmail.com> wrote: >>> >>>> Hi Gyula, >>>> >>>> Could you provide the code of your Flink program, the error with >>>> stacktrace and the Flink version? >>>> >>>> Thanks., >>>> Roman >>>> >>>> >>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com> wrote: >>>> >>>>> Hi All! >>>>> >>>>> Excuse my stupid question, I am pretty new to the Table/SQL API and I >>>>> am trying to play around with it implementing and running a few use-cases. >>>>> >>>>> I have a simple window join + aggregation, grouped on some id that I >>>>> want to write to Kafka but I am hitting the following error: >>>>> >>>>> "AppendStreamTableSink requires that Table has only insert changes." >>>>> >>>>> If I understand correctly the problem here is that since updates are >>>>> possible within a single group, we have a retract stream and the Kafka >>>>> Sink >>>>> cannot handle that. I tried to search for the solution but I haven't found >>>>> any satisfying answers. >>>>> >>>>> How can I simply tell the INSERT logic to ignore previous values and >>>>> just always keep sending the latest (like you would see it on the CLI >>>>> output). >>>>> >>>>> Thank you! >>>>> Gyula >>>>> >>>> >> >> -- >> Benoît Paris >> Ingénieur Machine Learning Explicable >> Tél : +33 6 60 74 23 00 >> http://benoit.paris >> http://explicable.ml >> >