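Flink sink to Kafka fails with "Failed to construct kafka producer"

2021-01-21 Thread lp
Flink 1.11.2.
A custom source generates data in a loop and sinks it to Kafka.
The job is deployed to YARN in Application Mode.
jobmanager.log reports the error below (containers for both the JobManager and the TaskManager were allocated, and both logs show the same error):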

2021-01-21 10:52:17,742 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:17,742 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 1 tasks should be restarted to recover the failed task cbc357ccb763df2852fee8c4fc7d55f2_0.
2021-01-21 10:52:17,742 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job kafkaSink -- flink ???kafka??? (315f9a7b42afb08b4de1841a5b3c0f76) switched from state RUNNING to RESTARTING.
2021-01-21 10:52:18,743 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job kafkaSink -- flink ???kafka??? (315f9a7b42afb08b4de1841a5b3c0f76) switched from state RESTARTING to RUNNING.
2021-01-21 10:52:18,743 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from CREATED to SCHEDULED.
2021-01-21 10:52:18,743 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from SCHEDULED to DEPLOYING.
2021-01-21 10:52:18,744 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Deploying Source: Custom Source -> Sink: Unnamed (1/1) (attempt #229) to container_1611044725922_0017_01_02 @ slave02 (dataPort=39278)
2021-01-21 10:52:18,748 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from DEPLOYING to RUNNING.
2021-01-21 10:52:18,753 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source: Custom Source -> Sink: Unnamed (1/1) (c3ab2e8ae832a93924c02f50e17e2250) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@75c6d62a.
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) ~[quickstart-0.1.jar:?]
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1141) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1242) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1238) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:940) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290) ~[quickstart-0.1.jar:?]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479) ~[quickstart-0.1.jar:?]
at

Re: Flink sink to Kafka

2020-07-19 Thread 明启 孙
Thanks, I'll give it a try.

From: godfrey he
Sent: July 19, 2020, 23:06
To: user-zh
Subject: Re: Flink sink to Kafka

If you are using Flink SQL, you can define the Kafka sink through DDL; see [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
Godfrey

smq <374060...@qq.com> wrote on Sunday, July 19, 2020 at 9:36 PM:

> Hi everyone, I would like to sink to Kafka in Avro format. How can this be done? I could not find a relevant method on the official website.



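Re: Flink sink to Kafka

2020-07-19 Thread godfrey he
If you are using Flink SQL, you can define the Kafka sink through DDL; see [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html

Best,
Godfrey

smq <374060...@qq.com> wrote on Sunday, July 19, 2020 at 9:36 PM:

> Hi everyone, I would like to sink to Kafka in Avro format. How can this be done? I could not find a relevant method on the official website.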
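As a rough illustration of the DDL approach suggested in this reply, the sketch below assembles a CREATE TABLE statement for a Kafka table that writes Avro. The table name, columns, topic, and broker address are hypothetical, and the option keys ('connector', 'topic', 'properties.bootstrap.servers', 'format') reflect one reading of the Flink 1.11 Kafka SQL connector page linked in [1], so they should be checked against that page. The code only builds and prints the statement; submitting it would presumably go through TableEnvironment.executeSql with the Kafka connector and Avro format jars on the classpath.

```java
public class KafkaAvroSinkDdl {

    // Builds a CREATE TABLE statement for a Kafka-backed table that uses the Avro format.
    // The two columns (user_id, item) are hypothetical placeholders for a real schema.
    static String buildDdl(String tableName, String topic, String bootstrapServers) {
        return String.join("\n",
            "CREATE TABLE " + tableName + " (",
            "  user_id BIGINT,",
            "  item STRING",
            ") WITH (",
            "  'connector' = 'kafka',",
            "  'topic' = '" + topic + "',",
            "  'properties.bootstrap.servers' = '" + bootstrapServers + "',",
            "  'format' = 'avro'",
            ")");
    }

    public static void main(String[] args) {
        // Hypothetical table name, topic, and broker address.
        String ddl = buildDdl("avro_sink", "demo-topic", "localhost:9092");
        System.out.println(ddl);
        // In a Flink job the statement would then be handed to the Table API, e.g.
        //   tableEnv.executeSql(ddl);
        //   tableEnv.executeSql("INSERT INTO avro_sink SELECT ...");
    }
}
```

Keeping the statement in a small helper makes it easy to reuse the same table definition across jobs; the actual write happens only when an INSERT INTO statement targeting the table is executed.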


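Flink sink to Kafka

2020-07-19 Thread smq
Hi everyone, I would like to sink to Kafka in Avro format. How can this be done? I could not find a relevant method on the official website.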