I have created a table that reads from a Kafka topic. What I want to do is
order the data by event time and add a new field that holds the previous
value, using the LAG function.

The problem arises when two records have exactly the same event time, which
produces a "strange" behavior: both records get the same previous value.


CREATE TABLE example (
    eventTimestamp BIGINT NOT NULL,
    msisdn INT NOT NULL,
    zoneIds ARRAY<INT NOT NULL> NOT NULL,
    ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
    `kafka_offset` BIGINT METADATA FROM 'offset' VIRTUAL,
    WATERMARK FOR ts AS ts
) WITH (
    'connector' = 'kafka',
    'topic' = 'example-offset',
    'properties.bootstrap.servers' = 'xxxx',
    'properties.auto.offset.reset' = 'latest',
    'scan.startup.mode' = 'latest-offset',
    'key.format' = 'raw',
    'key.fields' = 'msisdn',
    'value.format' = 'avro',
    'value.fields-include' = 'ALL',
    'scan.watermark.idle-timeout' = '1000'
);


INSERT INTO example (eventTimestamp, msisdn, zoneIds) VALUES
(1739996380000, 673944959, ARRAY[1]),
(1739996380000, 673944959, ARRAY[1]);


SELECT
    msisdn,
    eventTimestamp,
    ARRAY_REMOVE(IFNULL(zoneIds, ARRAY[-1]), -1) AS zoneIds,
    ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn
        ORDER BY ts), ARRAY[-1]), -1) AS prev_zoneIds,
    ts
FROM example;

*Actual Result:*

msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000

*Expected Result:*

msisdn     eventTimestamp  zoneIds  prev_zoneIds  ts
673944959  1739996380000   [1]      []            2025-02-19 21:19:40.000
673944959  1739996380000   [1]      [1]           2025-02-19 21:19:40.000
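
To make the difference between the two results concrete, here is a small Python model of the two records. It is not Flink code and does not claim to describe Flink's implementation. It compares two hypothetical tie-handling behaviors: (a) rows that share the same ts are treated as one peer group, and every row in the group sees the LAG value computed after the whole group has been added; (b) rows are processed one at a time in Kafka offset order. The offsets 0 and 1 and all helper names are assumptions made for illustration.

```python
from itertools import groupby

# The two records from the example as (ts, kafka_offset, zoneIds).
# The offsets 0 and 1 are hypothetical.
rows = [
    (1739996380000, 0, [1]),
    (1739996380000, 1, [1]),
]


def clean(prev):
    """Mimic ARRAY_REMOVE(IFNULL(prev, ARRAY[-1]), -1): NULL becomes []."""
    return [] if prev is None else [z for z in prev if z != -1]


def lag_row_by_row(rows):
    """Hypothesis (b): each row sees the row processed just before it,
    with ties on ts broken by the Kafka offset."""
    out, prev = [], None
    for _ts, _offset, zones in sorted(rows, key=lambda r: (r[0], r[1])):
        out.append(clean(prev))
        prev = zones
    return out


def lag_peer_group(rows):
    """Hypothesis (a): all rows with equal ts are added first, then every
    row in that group gets the value one position before the last one."""
    out, history = [], []
    ordered = sorted(rows, key=lambda r: (r[0], r[1]))
    for _ts, group in groupby(ordered, key=lambda r: r[0]):
        group = list(group)
        history.extend(zones for _, _, zones in group)
        prev = history[-2] if len(history) >= 2 else None
        out.extend(clean(prev) for _ in group)
    return out


print(lag_peer_group(rows))  # [[1], [1]]  (same shape as "Actual Result")
print(lag_row_by_row(rows))  # [[], [1]]   (same shape as "Expected Result")
```

Under these assumptions, the peer-group model reproduces the actual prev_zoneIds column and the row-by-row model reproduces the expected one, which is why a tie-breaker such as the Kafka offset seems necessary.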
------------------------------
*Is this behavior normal?*

I tried to get the expected result by exposing the Kafka offset as a
metadata column (kafka_offset) in the example table and adding it as a
tie-breaker to the ORDER BY of the OVER clause used by LAG. However, Flink
does not seem to allow ordering by more than one column:


ARRAY_REMOVE(IFNULL(LAG(zoneIds, 1) OVER (PARTITION BY msisdn
    ORDER BY ts, kafka_offset), ARRAY[-1]), -1) AS prev_zoneIds,

Results in:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: The window can only be ordered by a single time column.

------------------------------

Would you happen to know how to achieve the expected result?
