I have created a table that reads from a Kafka topic. What I want to do is order the data by eventTime and add a new field that represents the previous value using the LAG function.
The problem arises when two records have exactly the same eventTime, which produces "strange" behavior.

    CREATE TABLE example (
        eventTimestamp BIGINT NOT NULL,
        msisdn INT NOT NULL,
        zoneIds ARRAY<INT NOT NULL> NOT NULL,
        ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
        `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
        WATERMARK FOR ts AS ts
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'example-offset',
        'properties.bootstrap.servers' = 'xxxx',
        'properties.auto.offset.reset' = 'latest',
        'scan.startup.mode' = 'latest-offset',
        'key.format' = 'raw',
        'key.fields' = 'msisdn',
        'value.format' = 'avro',
        'value.fields-include' = 'ALL',
        'scan.watermark.idle-timeout' = '1000'
    );

    INSERT INTO example (eventTimestamp, msisdn, zoneIds)
    VALUES
        (1739996380000, 673944959, ARRAY[1]),
        (1739996380000, 673944959, ARRAY[1]);

    SELECT
        msisdn,
        eventTimestamp,
        ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
        ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
        ts
    FROM example;

*Actual Result:*

    msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
    673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
    673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000

*Expected Result:*

    msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
    673944959  1739996380000   [1]      [ ]           2025-02-19 21:19:40.000
    673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000

------------------------------

*Is this behavior normal?*

I tried to get the expected behavior by exposing the Kafka offset as a metadata column in the example table and adding it to the ORDER BY of the OVER clause in the LAG function. However, it seems that Flink does not allow ordering by more than one column:

    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,

Results in:

    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.TableException: The window can only be ordered by a single time column.
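To make the expected semantics concrete, here is a minimal sketch in plain Python (not Flink) of what LAG ordered by timestamp with the offset as a tie-breaker should return for the two records above. The offsets 0 and 1, the tuple layout, and the helper name `lag_zone_ids` are assumptions made only for illustration; they are not part of the actual job.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical stand-ins for the two Kafka records:
# (msisdn, eventTimestamp, kafka_offset, zoneIds). Offsets 0 and 1 are assumed.
records = [
    (673944959, 1739996380000, 0, [1]),
    (673944959, 1739996380000, 1, [1]),
]


def lag_zone_ids(rows):
    """Emulate LAG(zoneIds, 1) partitioned by msisdn, ordered by (timestamp, offset)."""
    out = []
    # Sort by partition key first, then by timestamp, then by offset as tie-breaker.
    ordered = sorted(rows, key=itemgetter(0, 1, 2))
    for _, group in groupby(ordered, key=itemgetter(0)):
        prev = []  # first row of a partition has no predecessor, so an empty array
        for msisdn, event_ts, _offset, zone_ids in group:
            out.append((msisdn, event_ts, zone_ids, prev))
            prev = zone_ids
    return out


result = lag_zone_ids(records)
for row in result:
    print(row)
```

With the offset as a secondary sort key, the first record gets an empty previous value and the second gets [1], which matches the expected result rather than the actual one.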
------------------------------

Would you happen to know how to achieve the expected result?