Hello Flink community,

I'm experiencing an issue with nested row handling in **Flink SQL 2.2.0** that 
doesn't occur in earlier versions (1.20.3, 2.1.1).

Problem Description
When performing a LEFT JOIN with a lookup table containing nested ROW fields, 
the `COALESCE()` function returns empty/null values instead of the fallback 
value in Flink 2.2.0.


Expected Behavior (Flink 1.20.3, 2.1.1):

- When `lookup.httpcode500.error.message` is NULL, `COALESCE()` returns 'no 
error'
- When `lookup.httpcode500.error.message` has a value, it returns that value


Actual Behavior (Flink 2.2.0):

- When `lookup.httpcode500.error.message` is NULL, `COALESCE()` returns an empty 
string instead of 'no error'

Workaround
Setting `'table.legacy-nested-row-nullability' = 'true'` fixes the issue, but 
I'd prefer to find a solution compatible with 2.2.0's new nullability handling.
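
For reference, this is how the option can be set for a single session. It is a minimal sketch that assumes the job is submitted through the SQL Client; other deployment paths would set the same key in their own configuration.

```sql
-- Assumption: run in the Flink SQL Client before the query is submitted.
-- Restores the pre-2.2.0 nullability handling for nested ROW fields.
SET 'table.legacy-nested-row-nullability' = 'true';
```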

Minimal Reproducible Example
Query:
SELECT
    id,
    `value`,
    i.type,
    subtype,
    COALESCE(lookup.httpcode500.error.message, 'no error') as message
FROM input AS i
    LEFT JOIN lookup
        ON MOD(i.type, 5) = lookup.type;

Lookup Table Schema:
CREATE TABLE lookup (
    type        INTEGER,
    subtype     BIGINT,
    httpcode500 ROW<
        `error` ROW<
            code BIGINT NOT NULL,
            message STRING NOT NULL
        > NOT NULL
    >,
    PRIMARY KEY (type) NOT ENFORCED
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///data/lookup',
    'format' = 'avro'
);

Test Data:
type | subtype | httpcode500
-----|---------|-------------------------------------------
  0  |   100   | {"error": {"code": 500, "message": "Internal Server Error"}}
  1  |   200   | NULL
  2  |   NULL  | {"error": {"code": 500, "message": "Internal Server Error"}}
  3  |   NULL  | NULL

Input Table Schema:
CREATE TABLE input (
    id          STRING,
    `value`     BIGINT,
    type        INTEGER,
    ingest_time TIMESTAMP(3),
    WATERMARK FOR ingest_time AS ingest_time - INTERVAL '0.5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'number-of-rows' = '5',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '0',
    'fields.id.end' = '14',
    'fields.value.min' = '1000',
    'fields.value.max' = '2000',
    'fields.type.kind' = 'sequence',
    'fields.type.start' = '0',
    'fields.type.end' = '14'
);

Results Comparison
Flink 2.2.0 (without legacy option) - UNEXPECTED:
id | value | type | subtype | message
---|-------|------|---------|------------------
0  | 1983  |  0   |  NULL   | [EMPTY]           ← Should be "no error"
0  | 1983  |  0   |   100   | Internal Server Error
1  | 1992  |  1   |   200   | [EMPTY]           ← Should be "no error"
2  | 1922  |  2   |  NULL   | Internal Server Error
3  | 1912  |  3   |  NULL   | [EMPTY]           ← Should be "no error"
4  | 1685  |  4   |  NULL   | [EMPTY]           ← Should be "no error"

Flink 2.1.1 or 2.2.0 (with legacy option) - EXPECTED:
id | value | type | subtype | message
---|-------|------|---------|------------------
0  | 1529  |  0   |  NULL   | no error
0  | 1529  |  0   |   100   | Internal Server Error
1  | 1675  |  1   |   200   | no error
2  | 1862  |  2   |  NULL   | Internal Server Error
3  | 1271  |  3   |  NULL   | no error
4  | 1629  |  4   |  NULL   | no error

Environment
- Flink Version: 2.2.0 (issue), 1.20.3 and 2.1.1 (working)


Questions
1. Is this a known issue in Flink 2.2.0?
2. Is there a recommended fix/workaround other than enabling 
`table.legacy-nested-row-nullability`?
3. Should I report this as a bug in Jira?
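
Regarding question 2, one candidate is to test the nullable parent row directly instead of relying on `COALESCE()` over the NOT NULL leaf field. This is only a sketch and has not been verified on 2.2.0. It assumes that an `IS NULL` check on the nullable `httpcode500` column is evaluated as written.

```sql
-- Untested sketch: check the nullable parent row explicitly, so the fallback
-- does not depend on the declared NOT NULL constraint of the nested fields.
SELECT
    id,
    `value`,
    i.type,
    subtype,
    CASE
        WHEN lookup.httpcode500 IS NULL THEN 'no error'
        ELSE lookup.httpcode500.error.message
    END AS message
FROM input AS i
    LEFT JOIN lookup
        ON MOD(i.type, 5) = lookup.type;
```

If this also returns an empty value for unmatched rows, that would suggest the behaviour is not specific to `COALESCE()`.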

Any guidance would be appreciated!

Best regards,
Fred Teunissen
