Fwd: flinkcdc: slave with the same server_uuid/server_id as this slave has connected to the master;

2022-03-14 Thread maker_d...@foxmail.com
I've run into this problem again after a month. Can anyone help me resolve it now?



maker_d...@foxmail.com
 
From: maker_d...@foxmail.com
Date: 2022-02-15 14:13
To: user-zh@flink.apache.org
Subject: flinkcdc: slave with the same server_uuid/server_id as this slave has connected to the master;
flink version:flink-1.13.5
cdc version:2.1.1

While using Flink CDC to sync multiple tables, I hit the following error:
org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=1)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at sun.reflect.GeneratedMethodAccessor135.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:223)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:154)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:116)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:294)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:69)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 3 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:148)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:103)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
... 1 more
Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/data/mysql/storage/logs/bin_log/bin.001086' at 426321679, the last byte read from '/data/mysql/storage/logs/bin_log/bin.001086' at 426321679. Error code: 1236; SQLSTATE: HY000.
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
... 5 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event '' at 4, the last event read from '/data/mysql/storage/logs/bin_log/bin.001086' at 426321679, the last byte read from '/data/mysql/storage/logs/bin_log/bin.001086' at 426321679.
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937)
... 3 more
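This error generally means that two binlog clients connected to MySQL with the same server id, for example two CDC jobs, or several parallel source subtasks, sharing one id. A common remedy is to give every MySQL CDC source its own id range via the connector's 'server-id' table option. A minimal sketch (table name, columns, connection details, and the 5400-5408 range are illustrative assumptions; the range should be unused by any other replication client and at least as wide as the source parallelism):

```sql
-- Hypothetical table; the 'server-id' option is the point here.
CREATE TABLE orders_cdc (
  `id` INT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'XXX',
  'port' = '3306',
  'username' = 'XXX',
  'password' = 'XXX',
  'database-name' = 'mydb',
  'table-name' = 'orders',
  'server-id' = '5400-5408'  -- pick a range no other binlog client uses
);
```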




maker_d...@foxmail.com


The Flink SQL client cannot submit jobs

2021-10-13 Thread maker_d...@foxmail.com
48)
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
... 21 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8081
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 19 more
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:8081
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)

Shutting down the session...
done.

I don't know where the problem is. In YARN the Flink session application looks healthy, and the jobs already inside it keep running normally; it just won't accept new submissions.
Any help would be appreciated.
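"Connection refused: localhost/127.0.0.1:8081" suggests the SQL client fell back to the default local REST endpoint instead of the YARN session's JobManager. One thing worth checking (a sketch; the host and port below are placeholders, not values from this thread) is whether the flink-conf.yaml seen by the client still points at the session cluster's REST endpoint:

```yaml
# flink-conf.yaml on the client side; replace with the actual
# JobManager REST endpoint of the running YARN session
rest.address: jobmanager-host
rest.port: 40631
```

If the client is started with a different or stale FLINK_CONF_DIR than the one the session was launched from, it can lose track of the endpoint and fall back to localhost:8081, which matches this symptom.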



maker_d...@foxmail.com


Flink on Kubernetes: problem using S3 for HA

2021-07-27 Thread maker_d...@foxmail.com
)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
at org.apache.flink.runtime.rest.handler.job.JobSubmitHandler.lambda$uploadJobGraphFiles$4(JobSubmitHandler.java:195)
... 11 more
Caused by: java.io.IOException: PUT operation failed: Could not transfer error message
at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:356)
at org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:405)
at org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:113)
at org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:105)
at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:83)
... 12 more
Caused by: java.io.IOException: Could not transfer error message
at org.apache.flink.runtime.blob.BlobUtils.readExceptionFromStream(BlobUtils.java:293)
at org.apache.flink.runtime.blob.BlobOutputStream.receiveAndCheckPutResponse(BlobOutputStream.java:161)
at org.apache.flink.runtime.blob.BlobOutputStream.finish(BlobOutputStream.java:107)
at org.apache.flink.runtime.blob.BlobClient.putInputStream(BlobClient.java:353)
... 16 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:76)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1986)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1850)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2160)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:633)
at java.lang.Throwable.readObject(Throwable.java:915)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
at org.apache.flink.runtime.blob.BlobUtils.readExceptionFromStream(BlobUtils.java:290)
... 19 more
]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:486)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:466)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 4 more

I'm not sure whether something is misconfigured or a jar is missing.
Any help would be appreciated.
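The root cause, `ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception`, means the process deserializing the server's error response has no S3 classes on its classpath. Flink loads its S3 filesystems through the plugins mechanism, so one thing to verify (a sketch; the exact jar name depends on the Flink version in use) is that the S3 filesystem jar sits in a plugins subdirectory on every image/node, client included:

```
flink/
└── plugins/
    └── s3-fs-presto/
        └── flink-s3-fs-presto-<flink-version>.jar   (copied from flink/opt/)
```

Note this error hides whatever the server-side failure originally was; once the class resolves, the real exception should become visible.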




maker_d...@foxmail.com


How to connect the SQL client to a Flink session cluster on Kubernetes

2021-07-22 Thread maker_d...@foxmail.com
Hi all,
I deployed Flink on a Kubernetes cluster in session mode following the official docs, and I can submit jobs normally.
Now I want to use the SQL client, but submitting a statement fails with:
[ERROR] Could not execute SQL statement. Reason:
java.net.UnknownHostException: flink-cluster
How do I connect the SQL client to a Flink session running on Kubernetes?
Flink version: 1.12.4.
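`UnknownHostException: flink-cluster` means the machine running the SQL client cannot resolve the session cluster's Kubernetes service name. From outside the cluster, a sketch of one workaround (the address and port below are assumptions) is to expose the JobManager REST service, e.g. as a NodePort, and point the client's flink-conf.yaml at an address that resolves locally:

```yaml
# flink-conf.yaml on the machine running sql-client.sh; placeholder values
rest.address: 192.0.2.10   # a node IP or load-balancer address reachable from the client
rest.port: 30081           # the exposed port of the cluster's REST service
```

Running the client inside the cluster (where the service DNS name resolves) avoids the need to expose anything.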



maker_d...@foxmail.com


Flink SQL: Chinese characters garbled when consuming Kafka and writing to Doris

2021-06-17 Thread maker_d...@foxmail.com
I use Flink SQL to consume Kafka and write the data into Doris, but Chinese characters come out garbled.

The SQL is as follows:

CREATE TABLE `datacollect_business_kafka` (
  `id` varchar(36),
  `chain_id` varchar(36),
  `app_id` varchar(32) ,
...
 CHARACTER SET `UTF-8`
) WITH (
  'connector' = 'kafka',
  'topic' = 'datacollect_business_stage',
  'properties.bootstrap.servers' = 'XXX',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE `datacollect_business_doris` (
  `id` varchar(36),
  `chain_id` varchar(36),
  `app_id` varchar(32) ,
  ...
   CHARACTER SET `UTF-8`
) WITH (
'connector' = 'doris',
'fenodes' = 'XXX',
'table.identifier' = 'stage_datacollect.datacollect_business',
'username' = 'XXX',
'password' = 'XXX',
'sink.batch.size' = '1'
);

insert into datacollect_business_doris select * from datacollect_business_kafka;


Following advice found online, I added this to flink-conf.yaml: env.java.opts: "-Dfile.encoding=UTF-8"

Flink version: 1.12.4
Deployment mode: on YARN

Hoping someone can help.
Thanks!


maker_d...@foxmail.com


Flink: error submitting a job in yarn-cluster mode

2021-06-08 Thread maker_d...@foxmail.com
(ContainerLocalizer.java:223)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

For more detailed output, check the application tracking page: http://vplc01:8088/cluster/app/application_1623148752688_0041 Then click on links to logs of each attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further 
investigate the issue:
yarn logs -applicationId application_1623148752688_0041
at org.apache.flink.yarn.YarnClusterDescriptor.startAppMaster(YarnClusterDescriptor.java:1200)
at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:592)
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:474)
... 24 more
2021-06-08 18:56:14,431 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Cancelling deployment from Deployment Failure Hook
2021-06-08 18:56:14,432 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Killing YARN application
2021-06-08 18:56:14,438 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl [] - Killed application application_1623148752688_0041
2021-06-08 18:56:14,538 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Deleting files in file:/root/.flink/application_1623148752688_0041.



maker_d...@foxmail.com


Scala 2.12 error: Error: scala/Product$class

2021-06-03 Thread maker_d...@foxmail.com
I built a Scala 2.12 Flink project with Maven.
I want Flink to consume Kafka, but the project fails at run time with:

scalac: Error: scala/Product$class
java.lang.NoClassDefFoundError: scala/Product$class
at org.apache.flink.api.scala.codegen.TypeDescriptors$RecursiveDescriptor.(TypeDescriptors.scala:155)
at org.apache.flink.api.scala.codegen.TypeAnalyzer$UDTAnalyzerInstance$UDTAnalyzerCache$$anonfun$getOrElseUpdate$2.apply(TypeAnalyzer.scala:479)
at org.apache.flink.api.scala.codegen.TypeAnalyzer$UDTAnalyzerInstance$UDTAnalyzerCache$$anonfun$getOrElseUpdate$2.apply(TypeAnalyzer.scala:478)
at scala.Option.getOrElse(Option.scala:138)
...

The relevant pom dependencies (XML tags restored; only the values below appeared in the original):

<properties>
    <flink.version>1.12.3</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <scala.version>2.12.8</scala.version>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
Flink Kafka consumer code:

...
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "xx:9092")
val kafkaConsumer = new FlinkKafkaConsumer[String]("xxx", new SimpleStringSchema(), properties)
kafkaConsumer.setStartFromGroupOffsets()
val kafkaDataStream: DataStream[String] = env.addSource(kafkaConsumer)
...
Searching online, this is mostly attributed to dependency version mismatches, but I checked lib and everything Scala-related is the 2.12 build. I can't find where the problem is; any advice is appreciated, thanks!



maker_d...@foxmail.com


Flink SQL insert: Illegal use of 'NULL'

2021-05-11 Thread maker_d...@foxmail.com
Hi all,

I'm using Flink CDC to read MySQL data and insert it into Doris. I created the Doris and MySQL tables in sql-client; the MySQL table syncs data, and up to this point everything works.
But after the INSERT job is submitted, no data ever syncs, and there is no error message.
I took rows from the database and ran them as single-row INSERT statements against the Doris table in Flink SQL, which fails with:
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 'NULL'
On inspection, several columns in my INSERT statement have the value NULL.
How should I handle this? Do I have to convert column by column? (The table is very wide, with many columns.)
Could this also be why the Doris table never syncs?

Any help appreciated.
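Calcite cannot infer a type for a bare NULL literal, which is what "Illegal use of 'NULL'" refers to; casting each NULL to the target column's type satisfies the validator. A sketch with a made-up table and type:

```sql
-- Rejected by the validator: the NULL has no inferable type
-- INSERT INTO doris_t VALUES (1, NULL);

-- Accepted: give the NULL an explicit type matching the target column
INSERT INTO doris_t VALUES (1, CAST(NULL AS STRING));
```

For a wide table this is tedious but mechanical; generating the CAST list from the table schema is one way to avoid writing it by hand.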



maker_d...@foxmail.com


Re: RE: Flink integrated into a CDH cluster: the web UI won't open after a Flink restart

2021-05-09 Thread maker_d...@foxmail.com
Check the YARN web UI to see which host the AM landed on; that host is the address.



--- Original message ---
From: "maker_d...@foxmail.com"

Flink integrated into a CDH cluster: the web UI won't open after a Flink restart

2021-05-09 Thread maker_d...@foxmail.com
I integrated Flink into a CDH cluster, and after restarting Flink the web UI no longer opens. Has anyone run into the same thing? Let's compare notes.



maker_d...@foxmail.com


Re: Re: Submitting a Flink SQL Kafka table fails with 'cannot load user class'

2021-04-25 Thread maker_d...@foxmail.com
hi

From the error, it looks like the flink-sql-kafka jar is missing. Add that jar to Flink/lib and run it again.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Re: Submitting a Flink SQL Kafka table fails with 'cannot load user class'

2021-04-25 Thread maker_d...@foxmail.com
Hello,
flink-sql-connector-kafka_2.11-1.11.3.jar
is already in Flink's lib directory.



maker_d...@foxmail.com
 
From: JasonLee
Date: 2021-04-25 17:56
To: user-zh
Subject: Re: Submitting a Flink SQL Kafka table fails with 'cannot load user class'
hi
 
From the error, it looks like the flink-sql-kafka jar is missing. Add that jar to Flink/lib and run it again.
 
 
 
-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/
 


Submitting a Flink SQL Kafka table fails with 'cannot load user class'

2021-04-25 Thread maker_d...@foxmail.com
)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
... 8 more

Environment info:
flink_version: 1.11.3, self-compiled and integrated into CDH 6.3.1
kafka_version: 2.2.1
Deployment mode: YARN


The Kafka table DDL:
CREATE TABLE default_catalog.default_database.kafka_user (
  `id` INT COMMENT 'primary key',
  `phone` STRING COMMENT 'phone number',
  `last_login_ip` STRING COMMENT 'last login IP',
  `last_login_time` TIMESTAMP COMMENT 'last login time',
  `create_by` INT COMMENT 'created by',
  `update_by` INT COMMENT 'last updated by',
  `create_date` TIMESTAMP COMMENT 'creation time',
  `update_date` TIMESTAMP COMMENT 'last update time',
  `sort` INT COMMENT 'sort order',
  `remark` STRING COMMENT 'remark',
  `deleted` INT COMMENT 'deleted flag (0 = no, 1 = yes)'
) WITH (
'connector' = 'kafka',
'topic' = 'test_user_topic',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'vplc02:9092',
'format' = 'changelog-json'
);

The MySQL table DDL:
CREATE TABLE default_catalog.default_database.mysql_user (
  `id` INT COMMENT 'primary key',
  `phone` STRING COMMENT 'phone number',
  `last_login_ip` STRING COMMENT 'last login IP',
  `last_login_time` TIMESTAMP COMMENT 'last login time',
  `create_by` INT COMMENT 'created by',
  `update_by` INT COMMENT 'last updated by',
  `create_date` TIMESTAMP COMMENT 'creation time',
  `update_date` TIMESTAMP COMMENT 'last update time',
  `sort` INT COMMENT 'sort order',
  `remark` STRING COMMENT 'remark',
  `deleted` INT COMMENT 'deleted flag (0 = no, 1 = yes)'
) WITH(
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.171.19',
  'port' = '3306',
  'username' = '***',
  'password' = '***',
  'database-name' = 'test_db',
  'table-name' = 'user'
);

The INSERT statement:
INSERT INTO default_catalog.default_database.kafka_user 
SELECT *
FROM default_catalog.default_database.mysql_user;


I'm new to Flink;
hoping someone can help!
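"Cannot load user class" at task start-up usually means a connector or format class that the client serialized into the job graph is not on the cluster-side classpath. For the DDLs above, the jars that would plausibly need to sit in Flink's lib directory (and be shipped to YARN) are, with version numbers as assumptions to verify against this setup:

```
flink/lib/
├── flink-sql-connector-kafka_2.11-1.11.3.jar      ('connector' = 'kafka')
├── flink-sql-connector-mysql-cdc-<version>.jar    ('connector' = 'mysql-cdc')
└── flink-format-changelog-json-<version>.jar      ('format' = 'changelog-json')
```

Note that 'format' = 'changelog-json' is served by a separate format jar from the Flink CDC project, not by the Kafka connector jar, so having the Kafka connector in lib alone would not be enough. After changing lib, the session/cluster generally needs a restart to pick up the new jars.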




maker_d...@foxmail.com