Hi Gyula,

I'm afraid the conversion that separates retractions from inserts can't be
done in pure SQL (though I'd love that feature).

You might want to go lower level and implement a RetractStreamTableSink
[1][2] that wraps a KafkaTableSink [3]. This gives you an
emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream) method, in which
the Boolean flag carries the 'accumulate' or 'retract' signal.
You can then filter the DataStream accordingly before passing it to the
KafkaTableSink, as in the sketch below.
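
Something along these lines might work (an untested sketch against the
Flink 1.10 interfaces; it overrides consumeDataStream, the non-deprecated
variant of emitDataStream, and the class name UpsertToKafkaSink and the
pre-configured inner KafkaTableSink are just illustrative):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class UpsertToKafkaSink implements RetractStreamTableSink<Row> {

    // An already-configured KafkaTableSink (topic, schema, serialization).
    private final KafkaTableSink inner;

    public UpsertToKafkaSink(KafkaTableSink inner) {
        this.inner = inner;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return inner.getTableSchema().toRowType();
    }

    @Override
    public TableSchema getTableSchema() {
        return inner.getTableSchema();
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(
            String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        return this;
    }

    @Override
    public DataStreamSink<?> consumeDataStream(
            DataStream<Tuple2<Boolean, Row>> stream) {
        // f0 == true means accumulate (insert/update), false means retract.
        // Keep only the accumulate messages and forward them to Kafka.
        DataStream<Row> accumulates = stream
                .filter(t -> t.f0)
                .map(t -> t.f1)
                .returns(getRecordType());
        return inner.consumeDataStream(accumulates);
    }
}

Note that this simply drops the retraction messages: downstream consumers
will see every intermediate accumulate and have to keep the latest value
per key themselves. You would then register the sink, e.g. with
tableEnv.registerTableSink("QueryResult", new UpsertToKafkaSink(kafkaSink)),
and keep your INSERT INTO statement as-is.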

Hope this helps.

Best regards
Benoît

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html

On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hi Roman,
>
> This is the core logic:
>
> CREATE TABLE QueryResult (
>   queryId  BIGINT,
>   itemId   STRING,
>   quantity INT
> ) WITH (
>   'connector.type'    = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic'   = 'query.output.log.1',
>   'connector.properties.bootstrap.servers' = '<broker>',
>   'format.type' = 'json'
> );
>
> INSERT INTO QueryResult
> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
> FROM
>   ItemTransactions AS t,
>   Queries AS q
> WHERE
>   t.itemId = q.itemId AND
>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
> GROUP BY
>   t.itemId, q.event_time, q.queryId;
>
> And the error I get is:
> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
> update statement.
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
> at
> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
> at
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
> at java.util.Optional.ifPresent(Optional.java:159)
> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: org.apache.flink.table.api.TableException:
> AppendStreamTableSink requires that Table has only insert changes.
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>
> I am wondering what I could do to simply pump the result updates to
> Kafka here.
>
> Gyula
>
> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
> khachatryan.ro...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Could you provide the code of your Flink program, the error with
>> stacktrace and the Flink version?
>>
>> Thanks,
>> Roman
>>
>>
>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hi All!
>>>
>>> Excuse my stupid question; I am pretty new to the Table/SQL API and am
>>> trying to play around with it by implementing and running a few use-cases.
>>>
>>> I have a simple window join + aggregation, grouped on some id, that I
>>> want to write to Kafka, but I am hitting the following error:
>>>
>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>
>>> If I understand correctly, the problem here is that since updates are
>>> possible within a single group, we have a retract stream, and the Kafka
>>> sink cannot handle that. I searched for a solution but haven't found any
>>> satisfying answers.
>>>
>>> How can I simply tell the INSERT logic to ignore previous values and
>>> just always keep sending the latest (like you would see it on the CLI
>>> output)?
>>>
>>> Thank you!
>>> Gyula
>>>
>>

-- 
Benoît Paris
Ingénieur Machine Learning Explicable
Tél : +33 6 60 74 23 00
http://benoit.paris
http://explicable.ml
