您好! 我在使用flink时遇到一些问题。 flink-1.14.4 sqlserver-cdc-2.2.1 yarn-per-job
我有一个任务,先是双流join,再与mysql维表lookup join,开启增量检查点; sqlsever-cdc短暂故障,任务失败,自动恢复,但是lookup join对应task不再输出数据; 检查发现,加载维表数据为0,即任务恢复时未加载一次全量维表数据; 以上,可能是什么问题,应该如何解决呢? 期待回复! best wishes! 附日志: 2022-06-24 14:55:45,950 ERROR com.ververica.cdc.debezium.internal.Handover [] - Reporting error: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1] at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1] at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1] at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_301] at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_301] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_301] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_301] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301] Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数 cdc.fn_cdc_get_all_changes_ ... 提供的参数数目不足。 at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1] at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1] at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1] at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1] at io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1] at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1] at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608) ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1] at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1] at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171) ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1] ... 7 more 2022-06-24 14:55:45,953 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping the embedded engine 2022-06-24 14:55:45,954 INFO org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl [] - Source: TableSourceScan(table=[[default_catalog, default_database, carflow]], fields=[id, plate_license, site_id, create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, (create_time + -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 discarding 0 drained requests 2022-06-24 14:55:45,955 INFO io.debezium.embedded.EmbeddedEngine [] - Stopping the embedded engine 2022-06-24 14:55:45,957 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, carflow]], fields=[id, plate_license, site_id, create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, (create_time + -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 (71206ba8149ac20bb39d8169ff3d2f02) switched from RUNNING to FAILED with failure cause: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292) at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152) at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数 cdc.fn_cdc_get_all_changes_ ... 提供的参数数目不足。 at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262) at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427) at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758) at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016) at io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181) at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608) at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171) ... 7 more 2022-06-24 14:55:45,957 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: TableSourceScan(table=[[default_catalog, default_database, carflow]], fields=[id, plate_license, site_id, create_time, flow_type, circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, (create_time + -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 (71206ba8149ac20bb39d8169ff3d2f02). 2022-06-24 15:03:57,819 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task Source: TableSourceScan(table=[[default_catalog, default_database, sitecar]], fields=[car_number, site_id, first_oil_time, is_tq_car, relation_id]) -> WatermarkAssigner(rowtime=[first_oil_time], watermark=[first_oil_time]) (1/1)#1 9ebcb0fc15ced6db3f2a579510e415ee. 2022-06-24 15:06:35,005 INFO io.debezium.connector.common.BaseSourceTask [] - 23591 records sent during previous 00:05:19.499, last recorded offset: {transaction_id=null, event_serial_no=1, commit_lsn=0000162c:00016205:00d0, change_lsn=0000162c:00016205:00c7} amber_...@qq.com