Hello,

I am using Flink as the query engine to build an alerting/monitoring
application. One of the use cases in our product requires continuously
tracking and charting the output of an aggregate-only SQL query,
for example `SELECT SUM(revenue) FROM lineorder`. A desirable property
of the output of a Flink job for such a query is that there is always
exactly one row in the result set (i.e., the number of rows never drops
to 0 due to retractions of previous output). In other words, I need
upsert-"like" semantics for the output of the query.
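To make the requirement concrete, here is a small Python sketch (not Flink code; the names and the downstream "store" are purely illustrative) contrasting retract-style emission, where the result set can momentarily be empty between a retraction and the re-add, with upsert-style emission keyed on a constant key, where the single row is only ever replaced in place:

```python
# Illustrative simulation (not Flink code): how a downstream store sees
# retract-style vs upsert-style updates for a single-row aggregate.

def apply_retract(store, change):
    """Retract stream: change is (is_add, row). A retraction removes the
    old row, so the store is briefly empty until the new row arrives."""
    is_add, row = change
    if is_add:
        store["result"] = row
    else:
        store.pop("result", None)

def apply_upsert(store, row):
    """Upsert stream keyed on a constant key: the row is replaced
    in place, so the store never becomes empty."""
    store["result"] = row

retract_store, upsert_store = {}, {}

# SUM(revenue) goes 10 -> 25 as new records arrive.
sizes = []
for change in [(True, 10), (False, 10), (True, 25)]:
    apply_retract(retract_store, change)
    sizes.append(len(retract_store))
print(sizes)  # [1, 0, 1] -- result set dips to 0 rows mid-update

sizes = []
for row in [10, 25]:
    apply_upsert(upsert_store, row)
    sizes.append(len(upsert_store))
print(sizes)  # [1, 1] -- always exactly one row
```

The dip to zero rows in the retract case is exactly what the charting client must never observe.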

I was hopeful after reading the comments in UpsertStreamTableSink.java
that this case is accounted for in the implementation; however, a
pipeline with the above query writing to a concrete UpsertStreamTableSink
fails with the following error: "UpsertStreamTableSink requires that
Table has a full primary keys if it is updated." Here are the relevant
comments from UpsertStreamTableSink.java for reference:

```
Configures the unique key fields of the {@link Table} to write. The method
is called after {@link TableSink#configure(String[], TypeInformation[])}.

<p>The keys array might be empty, if the table consists of a single
(updated) record. If the table does not have a key and is append-only, the
keys attribute is null.

@param keys the field names of the table's keys, an empty array if the
table has a single row, and null if the table is append-only and has no key.
void setKeyFields(String[] keys);
```

The code in StreamExec(Legacy)Sink.scala appears to be consistent with
the observed failure, and does not honor the comment about an "empty key
array if the table consists of a single (updated) record".

With that context, I have the following questions:

1. Is UpsertStreamTableSink expected to consume the output of such
aggregate-only queries? Or is my interpretation of the code and comment
wrong, and I have misconfigured the UpsertStreamTableSink?
2. If the answer to (1) is no, are there any recommended patterns for
solving this use case such that the client never observes an empty
result set for the output of this query?

Regards,
Satyam
