社区各位大佬大家好,
我想通过flinkcdc读取mysql表,然后发送到kafka表。
在我使用sql-client客户端向kafka表插入数据时,报如下错误:

2021-04-25 17:21:03
org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:590)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:384)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
load user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
ClassLoader info: URL ClassLoader:
file: 
'/data/yarn/nm/usercache/flink/appcache/application_1618375297719_0009/blobStore-656e7e03-d94c-4861-b492-aeca2e5b4218/job_3682b0f430839794beb0d09e8e53b416/blob_p-e79c4e89fbdd13c78a3a0602a35a8c6f2ab35ebc-2f20c3259bf505db1bb258562da113c0'
 (valid JAR)
Class not resolvable through given classloader.
    at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:272)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
    at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
    at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
    at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at 
org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
    ... 8 more

环境信息:
flink_version: 1.11.3 自编译集成到CDH6.3.1
kafka_version: 2.2.1
部署模式:yarn


kafka表如下:
CREATE TABLE default_catalog.default_database.kafka_user (
  `id` INT  COMMENT '主键',
  `phone` STRING  COMMENT '手机号',
  `last_login_ip` STRING  COMMENT '最后登录ip',
  `last_login_time` TIMESTAMP  COMMENT '最后登录时间',
  `create_by` INT  COMMENT '创建人',
  `update_by` INT  COMMENT '最后更新人',
  `create_date` TIMESTAMP  COMMENT '创建时间',
  `update_date` TIMESTAMP  COMMENT '最后更新时间',
  `sort` INT  COMMENT '排序',
  `remark` STRING  COMMENT '备注',
  `deleted` INT  COMMENT '是否删除(0.,否 1.是)'
) WITH (
    'connector' = 'kafka',
    'topic' = 'test_user_topic',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'vplc02:9092',
    'format' = 'changelog-json'
);

mysql表如下:
CREATE TABLE default_catalog.default_database.mysql_user (
  `id` INT  COMMENT '主键',
  `phone` STRING  COMMENT '手机号',
  `last_login_ip` STRING  COMMENT '最后登录ip',
  `last_login_time` TIMESTAMP  COMMENT '最后登录时间',
  `create_by` INT  COMMENT '创建人',
  `update_by` INT  COMMENT '最后更新人',
  `create_date` TIMESTAMP  COMMENT '创建时间',
  `update_date` TIMESTAMP  COMMENT '最后更新时间',
  `sort` INT  COMMENT '排序',
  `remark` STRING  COMMENT '备注',
  `deleted` INT  COMMENT '是否删除(0.,否 1.是)'
) WITH(
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.171.19',
  'port' = '3306',
  'username' = '***',
  'password' = '***',
  'database-name' = 'test_db',
  'table-name' = 'user'
);

插入语句:
INSERT INTO default_catalog.default_database.kafka_user 
SELECT *
FROM default_catalog.default_database.mysql_user;


Flink入门选手,
希望大佬帮助解答!




maker_d...@foxmail.com

回复