Hi,

I have been trying to write a temporal join in SQL against a rolling aggregate 
view. However, it does not work and throws:

org.apache.flink.table.api.ValidationException: Event-Time Temporal Table Join 
requires both primary key and row time attribute in versioned table, but no row 
time attribute can be found.

It seems that after the aggregation, the table loses the watermark, and it's 
not possible to add one with the SQL API because it's a view.

CREATE TABLE orders (
    order_id INT,
    price DECIMAL(6, 2),
    currency_id INT,
    order_time AS NOW(),
    WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
)
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.order_id.kind' = 'sequence',
        'fields.order_id.start' = '1',
        'fields.order_id.end' = '100000',
        'fields.currency_id.min' = '1',
        'fields.currency_id.max' = '20'
    );

CREATE TABLE currency_rates (
    currency_id INT,
    conversion_rate DECIMAL(4, 3),
    PRIMARY KEY (currency_id) NOT ENFORCED
)
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.currency_id.min' = '1',
        'fields.currency_id.max' = '20'
    );

CREATE TEMPORARY VIEW max_rates AS (
    SELECT
        currency_id,
        MAX(conversion_rate) AS max_rate
    FROM currency_rates
    GROUP BY currency_id
);

CREATE TEMPORARY VIEW temporal_join AS (
    SELECT
        order_id,
        max_rates.max_rate
    FROM orders
         LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
         ON orders.currency_id = max_rates.currency_id
);

SELECT * FROM temporal_join;

Am I missing something? What would be a good starting point to address this?

Thanks in advance,
Sébastien Chevalley
