Hi Roman,

This is the core logic:

CREATE TABLE QueryResult (
queryId    BIGINT,
  itemId    STRING,
  quantity INT
) WITH (
'connector.type'     = 'kafka',
'connector.version' = 'universal',
'connector.topic'   = 'query.output.log.1',
'connector.properties.bootstrap.servers' = '<broker>',
'format.type' = 'json'
);

INSERT INTO QueryResult
SELECT q.queryId, t.itemId, sum(t.quantity) AS quantity
FROM
  ItemTransactions AS t,
  Queries AS q
WHERE
  t.itemId = q.itemId AND
  t.event_time BETWEEN q.event_time - INTERVAL '5' SECOND AND q.event_time
GROUP BY
  t.itemId, q.event_time, q.queryId;

And the error I get is:
org.apache.flink.table.client.gateway.SqlExecutionException: Invalid SQL
update statement.
at
org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:697)
at
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:576)
at
org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:527)
at
org.apache.flink.table.client.cli.CliClient.callInsert(CliClient.java:548)
at
org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:310)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink
requires that Table has only insert changes.
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:123)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)

I am wondering what could I do to just simply pump the result updates to
Kafka here.

Gyula

On Thu, Mar 5, 2020 at 2:37 PM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hi Gyula,
>
> Could you provide the code of your Flink program, the error with
> stacktrace and the Flink version?
>
> Thanks.,
> Roman
>
>
> On Thu, Mar 5, 2020 at 2:17 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hi All!
>>
>> Excuse my stupid question, I am pretty new to the Table/SQL API and I am
>> trying to play around with it implementing and running a few use-cases.
>>
>> I have a simple window join + aggregation, grouped on some id that I want
>> to write to Kafka but I am hitting the following error:
>>
>> "AppendStreamTableSink requires that Table has only insert changes."
>>
>> If I understand correctly the problem here is that since updates are
>> possible within a single group, we have a retract stream and the Kafka Sink
>> cannot handle that. I tried to search for the solution but I haven't found
>> any satisfying answers.
>>
>> How can I simply tell the INSERT logic to ignore previous values and just
>> always keep sending the latest (like you would see it on the CLI output).
>>
>> Thank you!
>> Gyula
>>
>

Reply via email to