您好!
    我在使用flink时遇到一些问题。
flink-1.14.4
sqlserver-cdc-2.2.1
yarn-per-job

我有一个任务,先是双流join,再与mysql维表lookup join,开启增量检查点;
sqlsever-cdc短暂故障,任务失败,自动恢复,但是lookup join对应task不再输出数据;
检查发现,加载维表数据为0,即任务恢复时未加载一次全量维表数据;

以上,可能是什么问题,应该如何解决呢?

期待回复!
best wishes!

附日志:
2022-06-24 14:55:45,950 ERROR com.ververica.cdc.debezium.internal.Handover      
           [] - Reporting error:
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
 An exception occurred in the change event producer. This connector will be 
stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) 
~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
 ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
 ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
~[?:1.8.0_301]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_301]
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_301]
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_301]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数 
cdc.fn_cdc_get_all_changes_ ...  提供的参数数目不足。
at 
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
 ~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608) 
~[flink-sql-connector-mysql-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
 ~[flink-sql-connector-sqlserver-cdc-2.2.1.jar:2.2.1]
... 7 more
2022-06-24 14:55:45,953 INFO  io.debezium.embedded.EmbeddedEngine               
           [] - Stopping the embedded engine
2022-06-24 14:55:45,954 INFO  
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl
 [] - Source: TableSourceScan(table=[[default_catalog, default_database, 
carflow]], fields=[id, plate_license, site_id, create_time, flow_type, 
circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, 
(create_time + -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> 
WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 discarding 0 
drained requests
2022-06-24 14:55:45,955 INFO  io.debezium.embedded.EmbeddedEngine               
           [] - Stopping the embedded engine
2022-06-24 14:55:45,957 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: TableSourceScan(table=[[default_catalog, 
default_database, carflow]], fields=[id, plate_license, site_id, create_time, 
flow_type, circle_id]) -> Calc(select=[id, plate_license, site_id, create_time, 
(create_time + -28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> 
WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 
(71206ba8149ac20bb39d8169ff3d2f02) switched from RUNNING to FAILED with failure 
cause: 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
 An exception occurred in the change event producer. This connector will be 
stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:292)
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:152)
at 
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:119)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: 为过程或函数 
cdc.fn_cdc_get_all_changes_ ...  提供的参数数目不足。
at 
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:262)
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5427)
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1758)
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1016)
at 
io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:181)
at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:608)
at 
io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:226)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:171)
... 7 more

2022-06-24 14:55:45,957 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Freeing task resources for Source: 
TableSourceScan(table=[[default_catalog, default_database, carflow]], 
fields=[id, plate_license, site_id, create_time, flow_type, circle_id]) -> 
Calc(select=[id, plate_license, site_id, create_time, (create_time + 
-28800000:INTERVAL HOUR) AS c_time, flow_type, circle_id]) -> 
WatermarkAssigner(rowtime=[c_time], watermark=[c_time]) (1/1)#0 
(71206ba8149ac20bb39d8169ff3d2f02).
2022-06-24 15:03:57,819 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
Un-registering task and sending final execution state FINISHED to JobManager 
for task Source: TableSourceScan(table=[[default_catalog, default_database, 
sitecar]], fields=[car_number, site_id, first_oil_time, is_tq_car, 
relation_id]) -> WatermarkAssigner(rowtime=[first_oil_time], 
watermark=[first_oil_time]) (1/1)#1 9ebcb0fc15ced6db3f2a579510e415ee.
2022-06-24 15:06:35,005 INFO  io.debezium.connector.common.BaseSourceTask       
           [] - 23591 records sent during previous 00:05:19.499, last recorded 
offset: {transaction_id=null, event_serial_no=1, 
commit_lsn=0000162c:00016205:00d0, change_lsn=0000162c:00016205:00c7}





amber_...@qq.com

回复