转发: flinkcdc:slave with the same server_uuid/server_id as this slave has connected to the master;
时隔一个月又遇到了这个问题,现在有人能帮忙解决一下吗? maker_d...@foxmail.com 发件人: maker_d...@foxmail.com 发送时间: 2022-02-15 14:13 收件人: user-zh@flink.apache.org 主题: flinkcdc:slave with the same server_uuid/server_id as this slave has connected to the master; flink version:flink-1.13.5 cdc version:2.1.1 在使用flinkcdc同步多个表时遇到报错: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=1) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) at sun.reflect.GeneratedMethodAccessor135.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223) at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154) at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: SplitFetcher thread 3 received unexpected exception while polling the records at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103
flinkcdc:slave with the same server_uuid/server_id as this slave has connected to the master;
) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42) at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185) at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973) at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606) at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850) ... 1 more Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/data/mysql/storage/logs/bin_log/bin.001086' at 426321679, the last byte read from '/data/mysql/storage/logs/bin_log/bin.001086' at 426321679. Error code: 1236; SQLSTATE: HY000. at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146) ... 5 more Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/data/mysql/storage/logs/bin_log/bin.001086' at 426321679, the last byte read from '/data/mysql/storage/logs/bin_log/bin.001086' at 426321679. at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937) ... 3 more maker_d...@foxmail.com
flinksql客户端不能提交任务
48) Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted. at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386) ... 21 more Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8081 at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ... 19 more Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8081 Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:748) Shutting down the session... done. 不知道是哪里出问题了,在yarn上看flink-session任务是正常的,其中的其他任务都正常运行,就是不能提交了。 请求各位大佬帮助。 maker_d...@foxmail.com
flink k8s部署使用s3做HA问题
) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not upload job files. at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86) at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:195) ... 11 more Caused by: java.io.IOException: PUT operation failed: Could not transfer error message at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:356) at org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:405) at org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:113) at org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:105) at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:83) ... 12 more Caused by: java.io.IOException: Could not transfer error message at org.apache.flink.runtime.blob.BlobUtils.readExceptionFromStream(BlobUtils.java:293) at org.apache.flink.runtime.blob.BlobOutputStream.receiveAndCheckPutResponse(BlobOutputStream.java:161) at org.apache.flink.runtime.blob.BlobOutputStream.finish(BlobOutputStream.java:107) at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:353) ... 16 more Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405) at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:633) at java.lang.Throwable.readObject(Throwable.java:915) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587) at org.apache.flink.runtime.blob.BlobUtils.readExceptionFromStream(BlobUtils.java:290) ... 19 more ] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:486) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ... 4 more 不知道是哪里配置有问题还是缺少什么jar包之类的。 请各位帮忙解答。 maker_d...@foxmail.com
k8s session模式SQLclient怎样连接
大家好, 我将flink部署在k8s集群上,使用官方文档上的session模式进行部署,可以正常提交任务。 现在我想使用sqlclient,在提交任务时提示 : [ERROR] Could not execute SQL statement. Reason: java.net.UnknownHostException: flink-cluster 请问大家,如何使用sqlclient连接k8s上的flink session。 flink版本 1.12.4. maker_d...@foxmail.com
flinksql消费kafka写入doris中文乱码
我使用flinksql消费kafka并将数据写入doris,但出现中文乱码。 SQL如下: CREATE TABLE `datacollect_business_kafka` ( `id` varchar(36), `chain_id` varchar(36), `app_id` varchar(32) , ... CHARACTER SET `UTF-8` ) WITH ( 'connector' = 'kafka', 'topic' = 'datacollect_business_stage', 'properties.bootstrap.servers' = 'XXX', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE `datacollect_business_doris` ( `id` varchar(36), `chain_id` varchar(36), `app_id` varchar(32) , ... CHARACTER SET `UTF-8` ) WITH ( 'connector' = 'doris', 'fenodes' = 'XXX', 'table.identifier' = 'stage_datacollect.datacollect_business', 'username' = 'XXX', 'password' = 'XXX', 'sink.batch.size' = '1' ); insert into datacollect_business_doris select * from datacollect_business_kafka; 在网上查找信息,flink-conf.yaml文件中添加:env.java.opts: "-Dfile.encoding=UTF-8” flink版本:1.12.4 部署模式:on yarn 希望各位大佬帮助。 谢谢! maker_d...@foxmail.com
Flink on yarn-cluster模式提交任务报错
(ContainerLocalizer.java:223) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) For more detailed output, check the application tracking page: http://vplc01:8088/cluster/app/application_1623148752688_0041 Then click on links to logs of each attempt. . Failing the application. If log aggregation is enabled on your cluster, use this command to further investigate the issue: yarn logs -applicationId application_1623148752688_0041 at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1200) at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:592) at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:474) ... 24 more 2021-06-08 18:56:14,431 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Cancelling deployment from Deployment Failure Hook 2021-06-08 18:56:14,432 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Killing YARN application 2021-06-08 18:56:14,438 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl[] - Killed application application_1623148752688_0041 2021-06-08 18:56:14,538 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Deleting files in file:/root/.flink/application_1623148752688_0041. maker_d...@foxmail.com
scala2.12错误:Error: scala/Product$class
我使用maven构建了一个scala2.12的flink项目 我希望使用flink消费kafka,但项目运行时报错: scalac: Error: scala/Product$class java.lang.NoClassDefFoundError: scala/Product$class at org.apache.flink.api.scala.codegen.TypeDescriptors$RecursiveDescriptor.(TypeDescriptors.scala:155) at org.apache.flink.api.scala.codegen.TypeAnalyzer$UDTAnalyzerInstance$UDTAnalyzerCache$$anonfun$getOrElseUpdate$2.apply(TypeAnalyzer.scala:479) at org.apache.flink.api.scala.codegen.TypeAnalyzer$UDTAnalyzerInstance$UDTAnalyzerCache$$anonfun$getOrElseUpdate$2.apply(TypeAnalyzer.scala:478) at scala.Option.getOrElse(Option.scala:138) ... pom文件相关依赖: 1.12.3 2.12 2.12.8 org.apache.flink flink-scala_${scala.binary.version} ${flink.version} provided org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} provided org.apache.flink flink-clients_${scala.binary.version} ${flink.version} provided org.scala-lang scala-library ${scala.version} provided org.apache.flink flink-connector-kafka_${scala.binary.version} ${flink.version} flink消费kafka代码:...val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() properties.setProperty("bootstrap.servers", "xx:9092") val kafkaConsumer = new FlinkKafkaConsumer[String]("xxx", new SimpleStringSchema(), properties) kafkaConsumer.setStartFromGroupOffsets() val kafkaDataStream: DataStream[String] = env.addSource(kafkaConsumer) ...在网上搜索基本都是说依赖版本问题,但我检查了lib,与scala相关的都是2.12版本的。找不到问题在哪儿,请各位指教,谢谢! maker_d...@foxmail.com
flinkSQL插入数据Illegal use of 'NULL'
大家好, 我在使用FlinkCDC读取mysql数据插入到Doris,在sql-client中创建doris表与mysql表,mysql表可以同步数据,到此都没有问题。 但是当insert任务提交后数据一直不能同步,并且没有报错信息。 我将数据库中的数据编辑为单条的insert语句插入flinkSQL中的doris表,报错如下: [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 'NULL' 经过观察,我发现我的insert语句中,多个列的值为NULL。 请问,这种情况要如何解决,要一列一列的转化吗?(这张表非常大,列很多) 到Doris表数据不同步是否也是这个的原因? 请求各位帮助。 maker_d...@foxmail.com
Re: 回复:Flink集成到CDH集群,Flink重启后web页面打不开
去yarn后台,看看am落在哪个机器上面,地址就是那个。 ---原始邮件--- 发件人: "maker_d...@foxmail.com"
Flink集成到CDH集群,Flink重启后web页面打不开
Flink集成到CDH集群,Flink重启后web页面打不开,有没有遇到同样情况的,大家交流一下。 maker_d...@foxmail.com
Re: Re: 提交FlinkSQLKafka表报异常cannt load user class
hi 从报错上看是缺少了 Flink-sql-kafka 那个包,把这个包添加到 Flink/lib 下面再跑一下 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: 提交FlinkSQLKafka表报异常cannt load user class
您好, flink-sql-connector-kafka_2.11-1.11.3.jar 这个包已经在flink的lib目录下了。 maker_d...@foxmail.com 发件人: JasonLee 发送时间: 2021-04-25 17:56 收件人: user-zh 主题: Re: 提交FlinkSQLKafka表报异常cannt load user class hi 从报错上看是缺少了 Flink-sql-kafka 那个包,把这个包添加到 Flink/lib 下面再跑一下 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
提交FlinkSQLKafka表报异常cannt load user class
) at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260) ... 8 more 环境信息: flink_version: 1.11.3 自编译集成到CDH6.3.1 kafka_version: 2.2.1 部署模式:yarn kafka表如下: CREATE TABLE default_catalog.default_database.kafka_user ( `id` INT COMMENT '主键', `phone` STRING COMMENT '手机号', `last_login_ip` STRING COMMENT '最后登录ip', `last_login_time` TIMESTAMP COMMENT '最后登录时间', `create_by` INT COMMENT '创建人', `update_by` INT COMMENT '最后更新人', `create_date` TIMESTAMP COMMENT '创建时间', `update_date` TIMESTAMP COMMENT '最后更新时间', `sort` INT COMMENT '排序', `remark` STRING COMMENT '备注', `deleted` INT COMMENT '是否删除(0.,否 1.是)' ) WITH ( 'connector' = 'kafka', 'topic' = 'test_user_topic', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 'vplc02:9092', 'format' = 'changelog-json' ); mysql表如下: CREATE TABLE default_catalog.default_database.mysql_user ( `id` INT COMMENT '主键', `phone` STRING COMMENT '手机号', `last_login_ip` STRING COMMENT '最后登录ip', `last_login_time` TIMESTAMP COMMENT '最后登录时间', `create_by` INT COMMENT '创建人', `update_by` INT COMMENT '最后更新人', `create_date` TIMESTAMP COMMENT '创建时间', `update_date` TIMESTAMP COMMENT '最后更新时间', `sort` INT COMMENT '排序', `remark` STRING COMMENT '备注', `deleted` INT COMMENT '是否删除(0.,否 1.是)' ) WITH( 'connector' = 'mysql-cdc', 'hostname' = '192.168.171.19', 'port' = '3306', 'username' = '***', 'password' = '***', 'database-name' = 'test_db', 'table-name' = 'user' ); 插入语句: INSERT INTO default_catalog.default_database.kafka_user SELECT * FROM default_catalog.default_database.mysql_user; Flink入门选手, 希望大佬帮助解答! maker_d...@foxmail.com