Re: flink sink kafka error

2021-04-07, post by lp
There were also errors like this in the middle of the run:


20:14:48,707 WARN  org.apache.kafka.common.utils.AppInfoParser 
[] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException:
kafka.producer:type=app-info,id="producer-Source: Custom Source -> Flat Map
-> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-8"
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_231]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_231]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_231]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_231]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_231]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_231]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426) ~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[kafka-clients-2.4.1.jar:?]
at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:77) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1282) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_231]
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556) ~[?:1.8.0_231]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_231]
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) ~[?:1.8.0_231]
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_231]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_231]
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) ~[?:1.8.0_231]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[?:1.8.0_231]
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[?:1.8.0_231]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_231]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_231]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_231]
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[?:1.8.0_231]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.cleanUpUserContext(FlinkKafkaProducer.java:1249) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.finishRecoveringContext(FlinkKafkaProducer.java:1224) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:380) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1]
at
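For what it's worth, the InstanceAlreadyExistsException in the trace above comes straight from JMX: registering a second MBean under an ObjectName that is already taken fails, which is what happens when FlinkKafkaProducer spins up several Kafka producers sharing one client.id during transaction-abort recovery. A minimal standalone sketch of just that mechanism (the `demo.producer` domain and the `AppInfo`/`MBeanClashDemo` names are made up for illustration; they are not the Kafka classes):

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MBeanClashDemo {
    // Standard-MBean pair: the interface name must be <class name> + "MBean".
    public interface AppInfoMBean { String getId(); }
    public static class AppInfo implements AppInfoMBean {
        private final String id;
        public AppInfo(String id) { this.id = id; }
        @Override public String getId() { return id; }
    }

    /** Registers the same ObjectName twice; returns true if the second attempt clashes. */
    public static boolean secondRegistrationFails() throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Same naming pattern as kafka.producer:type=app-info,id="..."
        ObjectName name = new ObjectName("demo.producer:type=app-info,id=\"producer-1\"");
        server.registerMBean(new AppInfo("producer-1"), name);
        boolean clashed = false;
        try {
            server.registerMBean(new AppInfo("producer-1"), name);
        } catch (InstanceAlreadyExistsException e) {
            clashed = true;  // the name is already taken, exactly as in the WARN above
        } finally {
            server.unregisterMBean(name);  // clean up so the demo is repeatable
        }
        return clashed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("second registration rejected: " + secondRegistrationFails());
    }
}
```

Kafka only logs this as a WARN (the metrics mbean is a nice-to-have), so by itself it should not fail the job.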

flink sink kafka error

2021-04-07, post by lp
I wrote a streaming program that reads from Kafka topicA, does some simple preprocessing, and sinks the result back to Kafka topicB. The program runs normally on my local machine, but it logs some errors along the way, as follows:
-
20:11:47,078 INFO  org.apache.kafka.clients.Metadata   
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-31, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-31] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,079 INFO  org.apache.kafka.clients.Metadata   
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-26, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-26] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,079 INFO  org.apache.kafka.clients.Metadata   
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-7, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-7] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,066 WARN  org.apache.kafka.clients.NetworkClient  
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4] Error connecting to node
10.66.0.129:9092 (id: -1 rack: null)
java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648) ~[?:1.8.0_231]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280) ~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:258) ~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951) [kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71) [kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122) [kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010) [kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545) [kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42) [kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65) [kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:529) [kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:447) [kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311) [kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) [kafka-clients-2.4.1.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
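The SocketException: Permission denied: connect above means the producer's plain TCP connect to 10.66.0.129:9092 is being blocked at the OS level before Kafka is even involved; on Windows JVMs this is often a local firewall or the IPv6 stack (where `-Djava.net.preferIPv4Stack=true` is a commonly suggested workaround), though both are assumptions about the poster's environment. A quick standalone probe can separate OS-level connectivity from Kafka/Flink issues (`BrokerProbe` is a made-up name for this sketch):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerProbe {
    /** Try a plain TCP connect; returns null on success, else the failure message. */
    public static String probe(String host, int port, int timeoutMs) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), timeoutMs);
            return null;  // reachable at the TCP level
        } catch (IOException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // 10.66.0.129:9092 is the broker address from the log above
        String failure = probe("10.66.0.129", 9092, 3000);
        System.out.println(failure == null ? "broker reachable" : "connect failed -> " + failure);
    }
}
```

If this probe fails the same way from the machine running the job, the problem is in the network/OS configuration, not in the Flink program.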
20:11:47,081 WARN  org.apache.kafka.clients.NetworkClient  
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4] Bootstrap broker
10.66.0.129:9092 (id: -1 rack: null) disconnected
20:11:47,081 INFO  org.apache.kafka.clients.Metadata   
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-36, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-36] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,081 INFO  org.apache.kafka.clients.Metadata   
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-11, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-11] Cluster ID:
IKSZYfPVTaGGwDrkST0v_A
20:11:47,084 INFO 
org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl []
- Could not complete snapshot 28 for operator Source: Custom Source -> Flat
Map -> Map -> Sink: Unnamed (5/8)#0. Failure reason: Checkpoint was
declined.
org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete
snapshot 28 for operator Source: Custom Source -> Flat Map -> Map -> Sink: