Hello Flink community,
I'm experiencing an issue with nested row handling in **Flink SQL 2.2.0** that
doesn't occur in earlier versions (1.20.3, 2.1.1).
Problem Description
When performing a LEFT JOIN with a lookup table containing nested ROW fields,
the `COALESCE()` function returns an empty value instead of the fallback value
in Flink 2.2.0 when the nested field is NULL.
Expected Behavior (Flink 1.20.3, 2.1.1):
- When `lookup.httpcode500.error.message` is NULL, `COALESCE()` returns 'no error'
- When `lookup.httpcode500.error.message` has a value, `COALESCE()` returns that value
Actual Behavior (Flink 2.2.0):
- When `lookup.httpcode500.error.message` is NULL, `COALESCE()` returns an empty
  string instead of 'no error'
Workaround
Enabling `'table.legacy-nested-row-nullability' = 'true'` fixes the issue, but
I'd prefer to find a solution compatible with 2.2.0's new nullability handling.
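For reference, in the SQL client the legacy behavior can be switched on for the current session with a `SET` statement. The option name is the one quoted above; I'm assuming session scope is sufficient for this query:

```sql
-- Restore the pre-2.2 nested row nullability handling for this session only
SET 'table.legacy-nested-row-nullability' = 'true';
```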
Minimal Reproducible Example
Query:
```sql
SELECT
    id,
    `value`,
    i.type,
    subtype,
    COALESCE(lookup.httpcode500.error.message, 'no error') AS message
FROM input AS i
LEFT JOIN lookup
    ON MOD(i.type, 5) = lookup.type;
```
Lookup Table Schema:
```sql
CREATE TABLE lookup (
    type INTEGER,
    subtype BIGINT,
    httpcode500 ROW<
        `error` ROW<
            code BIGINT NOT NULL,
            message STRING NOT NULL
        > NOT NULL
    >,
    PRIMARY KEY (type) NOT ENFORCED
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///data/lookup',
    'format' = 'avro'
);
```
Test Data:
```
type | subtype | httpcode500
-----|---------|-------------------------------------------------------------
0    | 100     | {"error": {"code": 500, "message": "Internal Server Error"}}
1    | 200     | NULL
2    | NULL    | {"error": {"code": 500, "message": "Internal Server Error"}}
3    | NULL    | NULL
```
Input Table Schema:
```sql
CREATE TABLE input (
    id STRING,
    `value` BIGINT,
    type INTEGER,
    ingest_time TIMESTAMP(3),
    WATERMARK FOR ingest_time AS ingest_time - INTERVAL '0.5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'number-of-rows' = '5',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '0',
    'fields.id.end' = '14',
    'fields.value.min' = '1000',
    'fields.value.max' = '2000',
    'fields.type.kind' = 'sequence',
    'fields.type.start' = '0',
    'fields.type.end' = '14'
);
```
Results Comparison
Flink 2.2.0 (without legacy option) - UNEXPECTED:
```
id | value | type | subtype | message
---|-------|------|---------|----------------------
0  | 1983  | 0    | NULL    | [EMPTY]  ← Should be "no error"
0  | 1983  | 0    | 100     | Internal Server Error
1  | 1992  | 1    | 200     | [EMPTY]  ← Should be "no error"
2  | 1922  | 2    | NULL    | Internal Server Error
3  | 1912  | 3    | NULL    | [EMPTY]  ← Should be "no error"
4  | 1685  | 4    | NULL    | [EMPTY]  ← Should be "no error"
```
Flink 2.1.1 or 2.2.0 (with legacy option) - EXPECTED:
```
id | value | type | subtype | message
---|-------|------|---------|----------------------
0  | 1529  | 0    | NULL    | no error
0  | 1529  | 0    | 100     | Internal Server Error
1  | 1675  | 1    | 200     | no error
2  | 1862  | 2    | NULL    | Internal Server Error
3  | 1271  | 3    | NULL    | no error
4  | 1629  | 4    | NULL    | no error
```
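To narrow this down, it may help to compare the optimized plans with and without the legacy option. My unverified guess is that, under the new nullability handling, the planner derives `lookup.httpcode500.error.message` as NOT NULL (because `message` is declared NOT NULL) and simplifies the `COALESCE()` away. If that is the case, the fallback literal 'no error' should be missing from the 2.2.0 plan:

```sql
-- Run once with 'false' and once with 'true', then compare the printed plans
SET 'table.legacy-nested-row-nullability' = 'false';

EXPLAIN PLAN FOR
SELECT
    id,
    `value`,
    i.type,
    subtype,
    COALESCE(lookup.httpcode500.error.message, 'no error') AS message
FROM input AS i
LEFT JOIN lookup
    ON MOD(i.type, 5) = lookup.type;
```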
Environment
- Flink Version: 2.2.0 (issue), 1.20.3 and 2.1.1 (working)
Questions
1. Is this a known issue in Flink 2.2.0?
2. Is there a recommended fix/workaround other than enabling
`table.legacy-nested-row-nullability`?
3. Should I report this as a bug in Jira?
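One alternative I have not yet tested on 2.2.0 is to check the parent row for NULL explicitly instead of relying on `COALESCE()` over the NOT NULL leaf field. This assumes that the nullability of `httpcode500` itself (declared nullable, and additionally nullable on the right side of the LEFT JOIN) is still handled correctly:

```sql
-- Untested sketch: branch on the nullable parent row, not the NOT NULL leaf field
SELECT
    id,
    `value`,
    i.type,
    subtype,
    CASE
        WHEN lookup.httpcode500 IS NULL THEN 'no error'
        ELSE lookup.httpcode500.error.message
    END AS message
FROM input AS i
LEFT JOIN lookup
    ON MOD(i.type, 5) = lookup.type;
```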
Any guidance would be appreciated!
Best regards,
Fred Teunissen