Hello,

Any updates on this bug?
Thanks!

On Fri, Nov 20, 2020 at 11:51 AM Rex Fenley <r...@remind101.com> wrote:

> Btw, this is what our source and sink essentially look like, with some
> columns redacted.
>
> CREATE TABLE source_kafka_data (
>   id BIGINT,
>   roles ARRAY<STRING NOT NULL>,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic',
>   'properties.bootstrap.servers' = 'kafka',
>   'properties.group.id' = 'group_id',
>   'properties.auto.offset.reset' = 'earliest',
>   'debezium-json.schema-include' = 'true',
>   'format' = 'debezium-json'
> )
>
> CREATE TABLE sink_es_data (
>   id BIGINT NOT NULL,
>   roles ARRAY<STRING>,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'elasticsearch-7',
>   'hosts' = 'eshost',
>   'index' = 'data',
>   'format' = 'json',
>   'sink.bulk-flush.max-actions' = '8192',
>   'sink.bulk-flush.max-size' = '16mb',
>   'sink.bulk-flush.interval' = '5000',
>   'sink.bulk-flush.backoff.delay' = '1000',
>   'sink.bulk-flush.backoff.max-retries' = '4',
>   'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
> )
>
> On Thu, Nov 19, 2020 at 7:41 PM Rex Fenley <r...@remind101.com> wrote:
>
>> Thanks!
>>
>> Update: We've confirmed with a test copy of our data now that if we
>> remove all the null values from arrays, everything works smoothly and
>> as expected. So this definitely appears to be the culprit.
>>
>> On Thu, Nov 19, 2020 at 6:41 PM Jark Wu <imj...@gmail.com> wrote:
>>
>>> Thanks Rex! This is very helpful. Will check it out later.
>>>
>>> On Fri, 20 Nov 2020 at 03:02, Rex Fenley <r...@remind101.com> wrote:
>>>
>>>> Below is a highly redacted set of data that should represent the
>>>> problem. As you can see, the "roles" field has "[null]" in it, a
>>>> null value within the array. We also see in our DB corresponding
>>>> rows like the following.
>>>>  id       | roles
>>>> ----------+--------
>>>>  16867433 | {NULL}
>>>>
>>>> We have confirmed that by not selecting "roles" all data passes
>>>> through without failure on a single operator, but selecting "roles"
>>>> will eventually always fail with java.lang.NullPointerException,
>>>> repeatedly. What is odd about this is that there is no additional
>>>> stack trace, just the exception, in our logs and in the Flink UI. We
>>>> only have INFO logging on; however, other exceptions we've
>>>> encountered in our development have always revealed a stack trace.
>>>>
>>>> {
>>>>   "schema": {
>>>>     "type": "struct",
>>>>     "fields": [
>>>>       {
>>>>         "type": "struct",
>>>>         "fields": [
>>>>           { "type": "int32", "optional": false, "field": "id" },
>>>>           {
>>>>             "type": "array",
>>>>             "items": { "type": "string", "optional": true },
>>>>             "optional": false,
>>>>             "field": "roles"
>>>>           }
>>>>         ],
>>>>         "optional": true,
>>>>         "name": "db.public.data.Value",
>>>>         "field": "before"
>>>>       },
>>>>       {
>>>>         "type": "struct",
>>>>         "fields": [
>>>>           { "type": "int32", "optional": false, "field": "id" },
>>>>           {
>>>>             "type": "array",
>>>>             "items": { "type": "string", "optional": true },
>>>>             "optional": false,
>>>>             "field": "roles"
>>>>           }
>>>>         ],
>>>>         "optional": true,
>>>>         "name": "db.public.data.Value",
>>>>         "field": "after"
>>>>       },
>>>>       {
>>>>         "type": "struct",
>>>>         "fields": [
>>>>           { "type": "string", "optional": false, "field": "version" },
>>>>           { "type": "string", "optional": false, "field": "connector" },
>>>>           { "type": "string", "optional": false, "field": "name" },
>>>>           { "type": "int64", "optional": false, "field": "ts_ms" },
>>>>           {
>>>>             "type": "string",
>>>>             "optional": true,
>>>>             "name": "io.debezium.data.Enum",
>>>>             "version": 1,
>>>>             "parameters": { "allowed": "true,last,false" },
>>>>             "default": "false",
>>>>             "field": "snapshot"
>>>>           },
>>>>           { "type": "string", "optional": false, "field": "db" },
>>>>           { "type": "string", "optional": false, "field": "schema" },
>>>>           { "type": "string", "optional": false, "field": "table" },
>>>>           { "type": "int64", "optional": true, "field": "txId" },
>>>>           { "type": "int64", "optional": true, "field": "lsn" },
>>>>           { "type": "int64", "optional": true, "field": "xmin" }
>>>>         ],
>>>>         "optional": false,
>>>>         "name": "io.debezium.connector.postgresql.Source",
>>>>         "field": "source"
>>>>       },
>>>>       { "type": "string", "optional": false, "field": "op" },
>>>>       { "type": "int64", "optional": true, "field": "ts_ms" },
>>>>       {
>>>>         "type": "struct",
>>>>         "fields": [
>>>>           { "type": "string", "optional": false, "field": "id" },
>>>>           { "type": "int64", "optional": false, "field": "total_order" },
>>>>           { "type": "int64", "optional": false, "field": "data_collection_order" }
>>>>         ],
>>>>         "optional": true,
>>>>         "field": "transaction"
>>>>       }
>>>>     ],
>>>>     "optional": false,
>>>>     "name": "db.public.data.Envelope"
>>>>   },
>>>>   "payload": {
>>>>     "before": null,
>>>>     "after": {
>>>>       "id": 76704,
>>>>       "roles": [null]
>>>>     },
>>>>     "source": {
>>>>       "version": "1.3.0.Final",
>>>>       "connector": "postgresql",
>>>>       "name": "db",
>>>>       "ts_ms": 1605739197360,
>>>>       "snapshot": "true",
>>>>       "db": "db",
>>>>       "schema": "public",
>>>>       "table": "data",
>>>>       "txId": 1784,
>>>>       "lsn": 1305806608,
>>>>       "xmin": null
>>>>     },
>>>>     "op": "r",
>>>>     "ts_ms": 1605739197373,
>>>>     "transaction": null
>>>>   }
>>>> }
>>>>
>>>> cc Brad
>>>>
>>>> On Thu, Nov 19, 2020 at 7:29 AM Dylan Forciea <dy...@oseberg.io> wrote:
>>>>
>>>>> Ah yes, missed the Kafka part and just saw the array part.
>>>>> FLINK-19771 definitely was solely in the postgres-specific code.
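[Editor's note: the envelope above can be checked for exactly this failure shape. The following is a minimal, hypothetical Python sketch, not code from the thread: it scans a Debezium envelope's row images for array values containing null elements. The field names ("payload", "before", "after", "roles") come from the payload above; the helper itself is an illustration.]

```python
import json

def null_array_fields(envelope: dict) -> list:
    """Return (image, field) pairs where an array value contains a null element."""
    hits = []
    payload = envelope.get("payload") or {}
    for image in ("before", "after"):
        row = payload.get(image)
        if not row:
            continue  # e.g. "before" is null on snapshot reads ("op": "r")
        for field, value in row.items():
            if isinstance(value, list) and any(v is None for v in value):
                hits.append((image, field))
    return hits

# A trimmed-down version of the payload shown in the thread.
envelope = json.loads('{"payload": {"before": null, "after": {"id": 76704, "roles": [null]}, "op": "r"}}')
print(null_array_fields(envelope))  # [('after', 'roles')]
```

Running such a check against the topic would confirm which messages carry the `[null]` elements before they ever reach the source operator.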
>>>>>
>>>>> Dylan
>>>>>
>>>>> *From: *Jark Wu <imj...@gmail.com>
>>>>> *Date: *Thursday, November 19, 2020 at 9:12 AM
>>>>> *To: *Dylan Forciea <dy...@oseberg.io>
>>>>> *Cc: *Danny Chan <danny0...@apache.org>, Rex Fenley
>>>>> <r...@remind101.com>, Flink ML <user@flink.apache.org>
>>>>> *Subject: *Re: Filter Null in Array in SQL Connector
>>>>>
>>>>> Hi Dylan,
>>>>>
>>>>> I think Rex encountered another issue, because he is using Kafka
>>>>> with Debezium format.
>>>>>
>>>>> Hi Rex,
>>>>>
>>>>> If you can share the json data and the exception stack, that would
>>>>> be helpful!
>>>>>
>>>>> Besides, you can try to enable the 'debezium-json.ignore-parse-errors'
>>>>> option [1] to skip the dirty data.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>> [1]:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors
>>>>>
>>>>> On Thu, 19 Nov 2020 at 21:13, Dylan Forciea <dy...@oseberg.io> wrote:
>>>>>
>>>>> Do you mean that the array contains values that are null, or that
>>>>> the entire array itself is null? If it’s the latter, I have an issue
>>>>> written, along with a PR to fix it that has been pending review [1].
>>>>>
>>>>> Regards,
>>>>> Dylan Forciea
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-19771
>>>>>
>>>>> *From: *Danny Chan <danny0...@apache.org>
>>>>> *Date: *Thursday, November 19, 2020 at 2:24 AM
>>>>> *To: *Rex Fenley <r...@remind101.com>
>>>>> *Cc: *Flink ML <user@flink.apache.org>
>>>>> *Subject: *Re: Filter Null in Array in SQL Connector
>>>>>
>>>>> Hi, Fenley ~
>>>>>
>>>>> You are right, parsing nulls in an ARRAY field is not supported
>>>>> yet. I have logged an issue [1] and will fix it soon ~
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-20234
>>>>>
>>>>> Rex Fenley <r...@remind101.com> wrote on Thu, Nov 19, 2020 at 2:51 PM:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I recently discovered that some of our data has NULL values arriving
>>>>> in an ARRAY<STRING> column. This column is being consumed by Flink
>>>>> via the Kafka connector in Debezium format. We seem to be receiving
>>>>> NullPointerExceptions when these NULL values in the arrays arrive,
>>>>> which restarts the source operator in a loop.
>>>>>
>>>>> Is there any way to not throw, or to filter out NULLs in an Array of
>>>>> Strings in Flink?
>>>>>
>>>>> We're somewhat stuck on how to solve this problem; we'd like to be
>>>>> defensive about this on Flink's side.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> (P.S. The exception was not that informative; there may be room for
>>>>> improvement in terms of a richer error message when this happens.)
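[Editor's note: one defensive option, in the spirit of the filtering Rex asks about, is to strip null elements from array fields upstream, before Flink consumes the messages. The following Python sketch illustrates the idea on a Debezium-shaped dict; it is not Flink API code, and the preprocessing step, helper name, and field handling are assumptions for illustration only.]

```python
import copy

def strip_null_array_elements(envelope: dict) -> dict:
    """Return a copy of a Debezium envelope with null elements removed
    from any array-valued field in the 'before'/'after' row images."""
    cleaned = copy.deepcopy(envelope)  # leave the original message intact
    payload = cleaned.get("payload") or {}
    for image in ("before", "after"):
        row = payload.get(image)
        if not row:
            continue
        for field, value in row.items():
            if isinstance(value, list):
                row[field] = [v for v in value if v is not None]
    return cleaned

msg = {"payload": {"before": None,
                   "after": {"id": 76704, "roles": [None, "admin"]}}}
print(strip_null_array_elements(msg)["payload"]["after"]["roles"])  # ['admin']
```

A step like this could sit between Debezium and the Flink job (for example, in whatever process republishes to the source topic), so the `ARRAY<STRING NOT NULL>` column never sees a null element.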
--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>