hi everyone,
小白通过 Debezium 将 PostgreSQL 的 CDC 数据同步到 Kafka 之后，使用 Flink 的 SQL Client 提交测试任务；但 Kafka 端的 CDC
JSON 数据一开始发送，任务就报错。在 Web UI 的日志页面看到的错误日志如下，还请各位大佬帮忙分析，谢谢！
====================================分割线======================================
SQL（DDL + DML）：
-- Source: Debezium change events for postgres.public.person, read from Kafka.
-- The Kafka messages carry the Debezium "schema" + "payload" envelope, so
-- 'debezium-json.schema-include' must be 'true' for the payload to be found.
--
-- NOTE: the debezium-json format in Flink 1.11 reads the "before" image of
-- update events ("op":"u"). The Debezium PostgreSQL connector only fills in
-- "before" when the captured table's REPLICA IDENTITY is FULL. With the
-- PostgreSQL default, "before" arrives as null, and deserialization fails with
-- "Corrupt Debezium JSON message" caused by a NullPointerException.
-- Run this once on the PostgreSQL side before producing change events:
--     ALTER TABLE public.person REPLICA IDENTITY FULL;
CREATE TABLE pgsql_person_cdc (
    id    BIGINT,  -- Debezium declares id as int32; BIGINT holds every int32 value
    name  STRING,
    age   STRING,
    sex   STRING,
    phone STRING
) WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'postgres.public.person',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id'          = 'pgsql_cdc',
    'format'                       = 'debezium-json',
    'debezium-json.schema-include' = 'true'
);

-- Sink: the built-in print connector writes every row it receives to the
-- task's standard output (visible in the TaskManager log / Web UI).
CREATE TABLE sql_out (
    id    BIGINT,
    name  STRING,
    age   STRING,
    sex   STRING,
    phone STRING
) WITH (
    'connector' = 'print'
);

-- Forward the CDC changelog to the print sink. The explicit select list keeps
-- the column mapping stable if either table's schema changes.
INSERT INTO sql_out
SELECT
    id,
    name,
    age,
    sex,
    phone
FROM pgsql_person_cdc;
====================================分割线======================================
错误日志:
java.io.IOException: Corrupt Debezium JSON message
'{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"string","optional":true,"field":"age"},{"type":"string","optional":true,"field":"sex"},{"type":"string","optional":true,"field":"phone"}],"optional":true,"name":"postgres.public.person.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"postgres.public.person.Envelope"},"payload":{"before":null,"after":{"id":2,"name":"liushimin","age":"24","sex":"man","phone":"155555555"},"source":{
"version":"1.2.0.Final","connector":"postgresql","name":"postgres","ts_ms":1595409754151,"snapshot":"false","db":"postgres","schema":"public","table":"person","txId":569,"lsn":23632344,"xmin":null},"op":"u","ts_ms":1595409754270,"transaction":null}}'.
at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:136)
~[flink-json-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[flink-connector-kafka_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~[flink-connector-kafka-base_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.11-1.11.0.jar:1.11.0]
Caused by: java.lang.NullPointerException
at
org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema.deserialize(DebeziumJsonDeserializationSchema.java:120)
~[flink-json-1.11.0.jar:1.11.0]
... 7 more
2020-07-22 17:22:34,415 INFO org.apache.flink.runtime.taskmanager.Task [] -
Freeing task resources for Source: TableSourceScan(table=[[default_catalog,
default_database, pgsql_person_cdc]], fields=[id, name, age, sex, phone]) ->
Sink: Sink(table=[default_catalog.default_database.sql_out], fields=[id, name,
age, sex, phone]) (1/1) (b553cb66df6e47a27e7dae8466b684ab).
2020-07-22 17:22:34,418 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor
[] - Un-registering task and sending final execution state FAILED to JobManager
for task Source: TableSourceScan(table=[[default_catalog, default_database,
pgsql_person_cdc]], fields=[id, name, age, sex, phone]) -> Sink:
Sink(table=[default_catalog.default_database.sql_out], fields=[id, name, age,
sex, phone]) (1/1) b553cb66df6e47a27e7dae8466b684ab.
2020-07-22 17:22:34,461 INFO
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot
TaskSlot(index:0, state:ACTIVE, resource profile:
ResourceProfile{cpuCores=1.0000000000000000, taskHeapMemory=384.000mb
(402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb
(536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId:
495bb5a0cd877808674b29890b6b8bc0, jobId: 3feda3a191fcb8e0da891b9fda1ee532).
2020-07-22 17:22:34,462 INFO
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job
3feda3a191fcb8e0da891b9fda1ee532 from job leader monitoring.
2020-07-22 17:22:34,462 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor
[] - Close JobManager connection for job 3feda3a191fcb8e0da891b9fda1ee532.
====================================分割线======================================
best!
[email protected]