Hello,

代码在为 before 这条数据设置 RowKind 时抛出了 NPE,正常情况下 before 不应该为 null。
看起来是你的数据有问题:这是一条 update 类型的 changelog(op 为 "u"),但 before 为 null,
这是不合理的——没有 before 的数据,就无法处理 after 的数据。
如果确认是脏数据,可以开启 debezium-json.ignore-parse-errors 选项跳过这类数据[1]。

祝好
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/formats/debezium.html#debezium-json-ignore-parse-errors

{
    "payload": {
        "before": null,
        "after": {
            "id": 2,
            "name": "liushimin",
            "age": "24",
            "sex": "man",
            "phone": "155555555"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "postgres",
            "ts_ms": 1595409754151,
            "snapshot": "false",
            "db": "postgres",
            "schema": "public",
            "table": "person",
            "txId": 569,
            "lsn": 23632344,
            "xmin": null
        },
        "op": "u",
        "ts_ms": 1595409754270,
        "transaction": null
    }
}

> 在 2020年7月22日,17:34,[email protected] 写道:
> 
> {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":
{"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}

回复