Re: flink sink kafka 报错
中间还有这样的错误: 20:14:48,707 WARN org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id="producer-Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-8" at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[?:1.8.0_231] at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[?:1.8.0_231] at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[?:1.8.0_231] at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[?:1.8.0_231] at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[?:1.8.0_231] at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:1.8.0_231] at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:426) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298) ~[kafka-clients-2.4.1.jar:?] at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:77) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1282) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_231] at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556) ~[?:1.8.0_231] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_231] at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) ~[?:1.8.0_231] at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_231] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_231] at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) ~[?:1.8.0_231] at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[?:1.8.0_231] at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[?:1.8.0_231] at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_231] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_231] at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_231] at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[?:1.8.0_231] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.cleanUpUserContext(FlinkKafkaProducer.java:1249) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.finishRecoveringContext(FlinkKafkaProducer.java:1224) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:380) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195) ~[flink-connector-kafka_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-streaming-java_2.11-1.12.1.jar:1.12.1] at
flink sink kafka 报错
我写了一个stream程序,从kafka topicA中读取数据,进行简单预处理后,sink回kafka的topicB,程序本地正常running,但是中间报了一些错误如下: - 20:11:47,078 INFO org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-31, transactionalId=Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-31] Cluster ID: IKSZYfPVTaGGwDrkST0v_A 20:11:47,079 INFO org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-26, transactionalId=Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-26] Cluster ID: IKSZYfPVTaGGwDrkST0v_A 20:11:47,079 INFO org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-7, transactionalId=Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-7] Cluster ID: IKSZYfPVTaGGwDrkST0v_A 20:11:47,066 WARN org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4, transactionalId=Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4] Error connecting to node 10.66.0.129:9092 (id: -1 rack: null) java.net.SocketException: Permission denied: connect at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231] at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231] at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231] at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648) ~[?:1.8.0_231] at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.common.network.Selector.connect(Selector.java:258) ~[kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951) [kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71) [kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122) [kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010) [kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545) [kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.NetworkClientUtils.isReady(NetworkClientUtils.java:42) [kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65) [kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:529) [kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:447) [kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311) [kafka-clients-2.4.1.jar:?] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) [kafka-clients-2.4.1.jar:?] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231] 20:11:47,081 WARN org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4, transactionalId=Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-4] Bootstrap broker 10.66.0.129:9092 (id: -1 rack: null) disconnected 20:11:47,081 INFO org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-36, transactionalId=Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-36] Cluster ID: IKSZYfPVTaGGwDrkST0v_A 20:11:47,081 INFO org.apache.kafka.clients.Metadata [] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-11, transactionalId=Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-11] Cluster ID: IKSZYfPVTaGGwDrkST0v_A 20:11:47,084 INFO org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl [] - Could not complete snapshot 28 for operator Source: Custom Source -> Flat Map -> Map -> Sink: Unnamed (5/8)#0. Failure reason: Checkpoint was declined. org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 28 for operator Source: Custom Source -> Flat Map -> Map -> Sink: