I noticed that there has been significant work on the SQL / Table subsystem and decided to evaluate it for one of our use cases. The use case requires joining two streams, which can be considered streams of table upserts. Critically, when joining the streams, we only want to join against the latest value per key in one of the tables/streams.
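To make the intended semantics concrete, here is a minimal sketch in plain Scala, with no Flink APIs involved. Everything in it is illustrative: the case classes, the `LatestPerKeyJoin` object and its method names are hypothetical, and using (item, source) as the upsert key is an assumption. It treats the telemetry records as upserts and joins a score record only against the latest telemetry record per key.

```scala
// Plain-Scala illustration of the desired "latest value per key" join.
// No Flink APIs are used; all names here are hypothetical.
case class Telemetry(tstamp: Long, item: String, score: Int, source: String)
case class Score(tstamp: Long, item: String, score: Int)
case class Joined(tstamp: Long, item: String, score: Int, source: String)

object LatestPerKeyJoin {
  // Treat telemetry records as upserts keyed by (item, source) (an assumed key):
  // a record with a newer or equal timestamp replaces the previous one.
  def latestTelemetry(records: Seq[Telemetry]): Map[(String, String), Telemetry] =
    records.foldLeft(Map.empty[(String, String), Telemetry]) { (acc, r) =>
      val key = (r.item, r.source)
      acc.get(key) match {
        case Some(prev) if prev.tstamp > r.tstamp => acc // keep the newer record
        case _                                    => acc.updated(key, r)
      }
    }

  // Join one score record against only the latest telemetry record per key,
  // using the same predicates as the SQL join (same item, different score,
  // score timestamp not older than the telemetry timestamp).
  def join(telemetry: Seq[Telemetry], score: Score): Seq[Joined] =
    latestTelemetry(telemetry).values.toSeq
      .filter(t => t.item == score.item && score.score != t.score && score.tstamp >= t.tstamp)
      .map(t => Joined(score.tstamp, score.item, score.score, t.source))
}
```

With three telemetry records for item "a" from source "s1" (timestamps 1, 2, 3) followed by `Score(4L, "a", 20)`, this produces a single joined record built from the telemetry record at timestamp 3, which is the behaviour we are after.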
Simply performing a join between the streams/tables is not sufficient, as it generates results for records other than the latest one. E.g. if you have two streams/tables with the schemas:

    Telemetry [
      tstamp: Long
      item: String
      score: Int
      source: String
    ]

    Scores [
      tstamp: Long
      item: String
      score: Int
    ]

and run the following query:

    tableEnv.sqlQuery("""
      SELECT s.tstamp, s.item, s.score, t.source
      FROM Telemetry t
      INNER JOIN Scores s ON s.item = t.item
      WHERE s.score <> t.score AND s.tstamp >= t.tstamp
    """)

If the job receives three records from the telemetry stream for the same source, followed by a record from the score stream that matches the item and updates the score, it generates three output records, even though we only want the latest telemetry record from that source to be considered.

If this were a regular database, we could do the following to get only the latest records from the telemetry table:

    tableEnv.sqlQuery("""
      SELECT a.tstamp, a.item, a.score, a.source
      FROM Telemetry a
      INNER JOIN (
        SELECT MAX(tstamp), item, source
        FROM Telemetry
        GROUP BY item, source
      ) b ON a.item = b.item AND a.source = b.source
    """)

and then execute the previous query against this LatestTelemetry table instead of Telemetry. But that does not work. The query executes within Flink, but it still outputs multiple records, regardless of the order in which the records arrive on the source streams.

I am wondering if there is a way to accomplish this within Flink's SQL/Table abstractions. Kafka Streams has the concept of a KTable, where records are considered upserts and update previous records that have the same key. Thus, when you join against a KTable, you only join against the latest record for a given key, rather than all previous records for that key.

Thoughts?