Thanks Benoît!

I can see now how I can implement this myself through the provided sink
interfaces but I was trying to avoid having to write code for this :D
My initial motivation was to see whether we are able to write out any kind
of table to Kafka as a simple stream of "upserts".
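
For the archive, here is a minimal sketch of the filtering step as I
understand it. It is plain Java with no Flink dependency: Map.Entry<Boolean, T>
stands in for Flink's Tuple2<Boolean, T>, and the class name, method name and
sample rows are made up for illustration. In a real sink the same predicate
would presumably be applied to the DataStream handed to emitDataStream.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a retract stream: each change is a (flag, row)
// pair, where flag == true means "accumulate" and flag == false means "retract".
class RetractFilterSketch {

    // Keeps only the accumulate messages and drops the retractions, so the
    // output is a simple stream of the latest values per update.
    static <T> List<T> keepAccumulates(List<Map.Entry<Boolean, T>> changes) {
        List<T> out = new ArrayList<>();
        for (Map.Entry<Boolean, T> change : changes) {
            if (change.getKey()) {
                out.add(change.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Illustrative rows only: queryId,itemId,quantity
        List<Map.Entry<Boolean, String>> changes = new ArrayList<>();
        changes.add(new SimpleEntry<>(true, "q1,item1,5"));   // first result
        changes.add(new SimpleEntry<>(false, "q1,item1,5"));  // retraction of it
        changes.add(new SimpleEntry<>(true, "q1,item1,8"));   // updated result
        System.out.println(keepAccumulates(changes));
    }
}
```

Dropping the retractions only makes sense if the consumer treats each record
as an upsert keyed on the grouping columns; deletions would not be visible.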

I also don't fully understand yet why I can write the result of a grouped
tumble-window aggregate to Kafka, but not the result of this window join +
aggregate.

Cheers,
Gyula

On Thu, Mar 5, 2020 at 3:00 PM Benoît Paris <
benoit.pa...@centraliens-lille.org> wrote:

> Hi Gyula,
>
> I'm afraid that distinguishing retractions from inserts can't be done in
> pure SQL (though I'd love that feature).
>
> You might want to go lower level and implement a RetractStreamTableSink
> [1][2] that wraps a KafkaTableSink [3]. This gives you an
> emitDataStream(DataStream<Tuple2<Boolean, T>> dataStream) method, in which
> the Boolean flag is an 'accumulate' (true) or 'retract' (false) signal.
> You can then filter the DataStream accordingly before passing it to the
> KafkaTableSink.
>
> Hope this helps.
>
> Best regards
> Benoît
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/sinks/RetractStreamTableSink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html#retractstreamtablesink
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.html
>
> On Thu, Mar 5, 2020 at 2:50 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hi Roman,
>>
>> This is the core logic:
>>
>> CREATE TABLE QueryResult (
>>   queryId  BIGINT,
>>   itemId   STRING,
>>   quantity INT
>> ) WITH (
>>   'connector.type'    = 'kafka',
>>   'connector.version' = 'universal',
>>   'connector.topic'   = 'query.output.log.1',
>>   'connector.properties.bootstrap.servers' = '<broker>',
>>   'format.type'       = 'json'
>> );
>>
>> INSERT INTO QueryResult
>> SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
>> FROM
>>   ItemTransactions AS t,
>>   Queries AS q
>> WHERE
>>   t.itemId = q.itemId AND
>>   t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
>> GROUP BY
>>   t.itemId, q.event_time, q.queryId;
>>
>> And the error I get is:
>> org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
>> update statement.
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
>> at
>> org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
>> at
>> org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
>> at
>> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
>> at java.util.Optional.ifPresent(Optional.java:159)
>> at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
>> at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
>> Caused by: org.apache.flink.table.api.TableException:
>> AppendStreamTableSink requires that Table has only insert changes.
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>
>> I am wondering what I could do to simply pump the result updates to
>> Kafka here.
>>
>> Gyula
>>
>> On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
>> khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Gyula,
>>>
>>> Could you provide the code of your Flink program, the error with
>>> stacktrace and the Flink version?
>>>
>>> Thanks,
>>> Roman
>>>
>>>
>>> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>>
>>>> Hi All!
>>>>
>>>> Excuse my stupid question, I am pretty new to the Table/SQL API and I
>>>> am trying to play around with it by implementing and running a few
>>>> use cases.
>>>>
>>>> I have a simple window join + aggregation, grouped on some id that I
>>>> want to write to Kafka but I am hitting the following error:
>>>>
>>>> "AppendStreamTableSink requires that Table has only insert changes."
>>>>
>>>> If I understand correctly the problem here is that since updates are
>>>> possible within a single group, we have a retract stream and the Kafka Sink
>>>> cannot handle that. I tried to search for the solution but I haven't found
>>>> any satisfying answers.
>>>>
>>>> How can I simply tell the INSERT logic to ignore previous values and
>>>> always send just the latest one (as you would see in the CLI
>>>> output)?
>>>>
>>>> Thank you!
>>>> Gyula
>>>>
>>>
>
> --
> Benoît Paris
> Ingénieur Machine Learning Explicable
> Tél : +33 6 60 74 23 00
> http://benoit.paris
> http://explicable.ml
>
