Flink SQL 作业, 开始运行正常, checkpoint也正常, 没有大状态(状态19.7KB),
但是执行一段时间后,开始报错, 然后恢复执行个几秒,也马上报这个错,然后报错恢复不停循环.
哪位帮忙看看,不胜感激.
2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task -
Source: KafkaTableSource(userId, utp, utrp, extendFields, requestTime) ->
SourceConversion(table=[default_catalog.default_database.user_visit_trace,
source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]],
fields=[userId, utp, utrp, extendFields, requestTime]) ->
Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime],
where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') >
_UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT
NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)],
joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id IS
NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) ->
Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000)
FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT
_UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':'
CONCAT item_id) AS redisKey, requestTime AS fieldName]) ->
SinkConversionToTuple2 -> Sink: Unnamed (1/8)
(ad7a2d51beccbb39f83ac2c16b923bd0) switched from RUNNING to FAILED.
java.lang.Exception: Could not perform checkpoint 401 for operator Source:
KafkaTableSource(userId, utp, utrp, extendFields, requestTime) ->
SourceConversion(table=[default_catalog.default_database.user_visit_trace,
source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]],
fields=[userId, utp, utrp, extendFields, requestTime]) ->
Calc(select=[getItemId(extendFields) AS item_id, userId, requestTime],
where=[((utp = _UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') >
_UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL AND userId IS NOT
NULL)]) -> LookupJoin(table=[JDBCTableSource(shop_id, user_id)],
joinType=[InnerJoin], async=[false], lookup=[user_id=userId], where=[shop_id IS
NOT NULL], select=[item_id, userId, requestTime, shop_id, user_id]) ->
Calc(select=[(_UTF-16LE'reco_rt:' CONCAT ((CAST(requestTime) / 1000)
FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT
_UTF-16LE':rt:feature:shop5item:cnt_show:' CONCAT shop_id CONCAT _UTF-16LE':'
CONCAT item_id) AS redisKey, requestTime AS fieldName]) ->
SinkConversionToTuple2 -> Sink: Unnamed (1/8).
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:802)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$3(StreamTask.java:777)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$595/1311782208.call(Unknown
Source)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1411)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:991)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:887)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$597/1708409807.run(Unknown
Source)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:860)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:793)
... 12 more
2020-06-05 14:13:19,795 INFO org.apache.flink.runtime.taskmanager.Task -
Attempting to fail task externally Source: KafkaTableSource(userId, utp, utrp,
extendFields, requestTime) ->
SourceConversion(table=[default_catalog.default_database.user_visit_trace,
source: [KafkaTableSource(userId, utp, utrp, extendFields, requestTime)]],
fields=[userId, utp, utrp, extendFields, requestTime]) ->
Calc(select=[(_UTF-16LE'search_rt:' CONCAT ((CAST(requestTime) / 1000)
FROM_UNIXTIME _UTF-16LE'yyyyMMdd') CONCAT _UTF-16LE':traffic:set:' CONCAT
getItemId(extendFields)) AS redisKey, requestTime AS fieldName], where=[((utp =
_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND
(((CAST(requestTime) / 1000) FROM_UNIXTIME _UTF-16LE'yyyyMMdd') >
_UTF-16LE'20200602') AND getItemId(extendFields) IS NOT NULL)]) ->
SinkConversionToTuple2 -> Sink: Unnamed (3/8)
(e0452995af60f6ed941b8dedd078def3).