FW: Unable to achieve Flink kafka connector exactly once delivery semantics.

2023-10-27 Post by Gopal Chennupati (gchennup)
Hi,
Can someone please help me resolve the issue below when running a Flink job,
or point me to any doc/example that describes the exactly-once delivery
guarantee semantics?

Thanks,
Gopal.

From: Gopal Chennupati (gchennup) 
Date: Friday, 27 October 2023 at 11:00 AM
To: commun...@flink.apache.org , 
u...@flink.apache.org 
Subject: Unable to achieve Flink kafka connector exactly once delivery 
semantics.
Hi Team,


I am trying to configure my Kafka sink connector with the “exactly-once” delivery
guarantee; however, the Flink job fails when I run it with this configuration.
Here is the full exception stack trace from the job logs.


[Source: SG-SGT-TransformerJob -> Map -> Sink: Writer -> Sink: Committer (5/10)#12] WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean

javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-sgt-4-1

  at java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
  at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
  at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
  at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
  at java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
  at java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
  at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:433)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:316)
  at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:301)
  at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:55)
  at org.apache.flink.connector.kafka.sink.KafkaWriter.getOrCreateTransactionalProducer(KafkaWriter.java:332)
  at org.apache.flink.connector.kafka.sink.TransactionAborter.abortTransactionOfSubtask(TransactionAborter.java:104)
  at org.apache.flink.connector.kafka.sink.TransactionAborter.abortTransactionsWithPrefix(TransactionAborter.java:82)
  at org.apache.flink.connector.kafka.sink.TransactionAborter.abortLingeringTransactions(TransactionAborter.java:66)
  at org.apache.flink.connector.kafka.sink.KafkaWriter.abortLingeringTransactions(KafkaWriter.java:295)
  at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:176)
  at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111)
  at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57)
  at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:117)
  at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:146)
  at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
  at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
  at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
  at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
  at java.base/java.lang.Thread.run(Thread.java:834)

And here is the producer configuration:

KafkaSink sink = KafkaSink
        .builder()
        .setBootstrapServers(producerConfig.getProperty("bootstrap.servers"))
        .setKafkaProducerConfig(producerConfig)
        .setRecordSerializer(new GenericMessageSerialization<>(generic_key.class,
                generic_value.class,
                producer
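For reference, here is a minimal sketch of an exactly-once KafkaSink configuration. It is not the poster's actual setup: the broker address, topic name, timeout value and the use of SimpleStringSchema in place of GenericMessageSerialization are all placeholders. The key points are DeliveryGuarantee.EXACTLY_ONCE, a transactionalIdPrefix that is stable and unique per job, a transaction.timeout.ms no larger than the broker-side transaction.max.timeout.ms, and checkpointing enabled, since transactions are only committed when a checkpoint completes.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "broker:9092");      // placeholder address
// Must not exceed the broker-side transaction.max.timeout.ms (15 minutes by default).
producerConfig.setProperty("transaction.timeout.ms", "600000");

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(producerConfig.getProperty("bootstrap.servers"))
        .setKafkaProducerConfig(producerConfig)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")                             // placeholder topic
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // Prefix unique to this job, so a restarted job aborts only its own lingering transactions.
        .setTransactionalIdPrefix("sgt-transformer-sink")
        .build();

Note that the InstanceAlreadyExistsException above is logged at WARN level during AppInfo mbean registration; the actual cause of the job failure may be reported elsewhere in the logs.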

flink-kafka-connector cannot find the topic-partition when consuming

2022-11-25 Post by 朱文忠
I enabled this option on the kafka connector: 'properties.allow.auto.create.topics' = 'true',
which is also mentioned in the documentation.
However, when the flinkKafkaConsumer starts consuming a new topic, it still reports an error that the topic cannot be found. Could someone help explain this?
The error is as follows:
This is my configuration:
The kafka broker also has automatic topic creation enabled.


Re:Re: Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-26 Post by gulugulucxg
Thanks for the reply. The problem on my side is fixed now; changing the kafka-clients version to 2.4.1 did the trick.

On 2022-08-26 16:20:27, "Weihua Hu" wrote:
>You can try upgrading to 2.5+.
>
>Best,
>Weihua
>
>
>On Thu, Aug 25, 2022 at 6:41 PM gulugulucxg  wrote:
>
>> Hi, the cluster version is 1.1.1, which is indeed quite old. Is that the cause? What version should we upgrade to?
>> On 2022-08-25 18:31:06, "Weihua Hu" wrote:
>> >What is the version of the kafka cluster? It looks like the cluster version is a bit too old.
>> >
>> >Best,
>> >Weihua
>> >
>> >
>> >On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg  wrote:
>> >
>> >> Hi all,
>> >>
>> >>
>> After I specify EXACTLY_ONCE for writing to kafka, the job starts fine when launched directly, but it always fails when started from a savepoint. The kafka-clients version is 2.5.0, and Flink and the related dependencies are all 1.12.4.
>> >>
>> >> The exception is:
>> >>
>> >> 2022-08-25 10:42:44
>> >>
>> >> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted
>> to write a non-default producerId at version 0
>> >>
>> >> The relevant code is:
>> >> Properties properties = new Properties();
>> >> properties.put("bootstrap.servers",
>> >> KafkaConstant.bootstrap_servers_01);
>> >> properties.put("transaction.timeout.ms", 15 * 60 * 1000);
>> >> FlinkKafkaProducer statsLogV2Producer = new
>> >> FlinkKafkaProducer<>(
>> >> KafkaConstant.topic_01,
>> >> new MyKafkaSerializationSchema(KafkaConstant.topic_01),
>> >> properties ,
>> >> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>> >>
>>


Re: Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-26 Post by Weihua Hu
You can try upgrading to 2.5+.

Best,
Weihua


On Thu, Aug 25, 2022 at 6:41 PM gulugulucxg  wrote:

> Hi, the cluster version is 1.1.1, which is indeed quite old. Is that the cause? What version should we upgrade to?
> On 2022-08-25 18:31:06, "Weihua Hu" wrote:
> >What is the version of the kafka cluster? It looks like the cluster version is a bit too old.
> >
> >Best,
> >Weihua
> >
> >
> >On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg  wrote:
> >
> >> Hi all,
> >>
> >>
> After I specify EXACTLY_ONCE for writing to kafka, the job starts fine when launched directly, but it always fails when started from a savepoint. The kafka-clients version is 2.5.0, and Flink and the related dependencies are all 1.12.4.
> >>
> >> The exception is:
> >>
> >> 2022-08-25 10:42:44
> >>
> >> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted
> to write a non-default producerId at version 0
> >>
> >> The relevant code is:
> >> Properties properties = new Properties();
> >> properties.put("bootstrap.servers",
> >> KafkaConstant.bootstrap_servers_01);
> >> properties.put("transaction.timeout.ms", 15 * 60 * 1000);
> >> FlinkKafkaProducer statsLogV2Producer = new
> >> FlinkKafkaProducer<>(
> >> KafkaConstant.topic_01,
> >> new MyKafkaSerializationSchema(KafkaConstant.topic_01),
> >> properties ,
> >> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
> >>
>


Re:Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-25 Post by gulugulucxg
Hi, the cluster version is 1.1.1, which is indeed quite old. Is that the cause? What version should we upgrade to?
On 2022-08-25 18:31:06, "Weihua Hu" wrote:
>What is the version of the kafka cluster? It looks like the cluster version is a bit too old.
>
>Best,
>Weihua
>
>
>On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg  wrote:
>
>> Hi all,
>>
>> After I specify EXACTLY_ONCE for writing to kafka, the job starts fine when launched directly, but it always fails when started from a savepoint. The kafka-clients version is 2.5.0, and Flink and the related dependencies are all 1.12.4.
>>
>> The exception is:
>>
>> 2022-08-25 10:42:44
>>
>> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to 
>> write a non-default producerId at version 0
>>
>> The relevant code is:
>> Properties properties = new Properties();
>> properties.put("bootstrap.servers",
>> KafkaConstant.bootstrap_servers_01);
>> properties.put("transaction.timeout.ms", 15 * 60 * 1000);
>> FlinkKafkaProducer statsLogV2Producer = new
>> FlinkKafkaProducer<>(
>> KafkaConstant.topic_01,
>> new MyKafkaSerializationSchema(KafkaConstant.topic_01),
>> properties ,
>> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>


Re: Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-25 Post by Weihua Hu
What is the version of the kafka cluster? It looks like the cluster version is a bit too old.

Best,
Weihua


On Thu, Aug 25, 2022 at 3:41 PM gulugulucxg  wrote:

> Hi all,
>
> After I specify EXACTLY_ONCE for writing to kafka, the job starts fine when launched directly, but it always fails when started from a savepoint. The kafka-clients version is 2.5.0, and Flink and the related dependencies are all 1.12.4.
>
> The exception is:
>
> 2022-08-25 10:42:44
>
> org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to 
> write a non-default producerId at version 0
>
> The relevant code is:
> Properties properties = new Properties();
> properties.put("bootstrap.servers",
> KafkaConstant.bootstrap_servers_01);
> properties.put("transaction.timeout.ms", 15 * 60 * 1000);
> FlinkKafkaProducer statsLogV2Producer = new
> FlinkKafkaProducer<>(
> KafkaConstant.topic_01,
> new MyKafkaSerializationSchema(KafkaConstant.topic_01),
> properties ,
> FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>


Flink Kafka Sink Error: org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write a non-default producerId at version 0

2022-08-25 Post by gulugulucxg
Hi all,
After I specify EXACTLY_ONCE for writing to kafka, the job starts fine when launched directly, but it always fails when started from a savepoint. The kafka-clients version is 2.5.0, and Flink and the related dependencies are all 1.12.4.


The exception is:

2022-08-25 10:42:44
org.apache.kafka.common.errors.UnsupportedVersionException: Attempted to write 
a non-default producerId at version 0


The relevant code is:
Properties properties = new Properties();
properties.put("bootstrap.servers", KafkaConstant.bootstrap_servers_01);
properties.put("transaction.timeout.ms", 15 * 60 * 1000);
FlinkKafkaProducer statsLogV2Producer = new 
FlinkKafkaProducer<>(
KafkaConstant.topic_01,
new MyKafkaSerializationSchema(KafkaConstant.topic_01),
properties ,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

Re: Flink Kafka e2e exactly-once question

2022-01-03 Post by zilong xiao
Suppose my checkpoint timeout is 15 min and the kafka transaction timeout is 10 min. If a checkpoint
gets stuck for a long time and exceeds the transaction timeout, will there be a problem in that case?

赵珩 wrote on Fri, Dec 31, 2021 at 21:23:

> My understanding is that loss of uncommitted data only happens when the flink job is
> restarted after the kafka transaction timeout has been exceeded;
> kafka does not keep uncommitted transaction data indefinitely.
> A normal flink restart will not lose data.
>
> On 2021/12/31 11:31, zilong xiao wrote:
> > The official docs say that data loss may occur when a kafka transaction times out. Does that mean Flink
> > cannot fully guarantee end-to-end exactly-once? Is my understanding correct? I have always heard that Flink
> writing to kafka can guarantee end-to-end exactly-once, so the wording in the docs confused me.
> >
> > Doc link:
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-producers-and-fault-tolerance
> >
> > Key sentence: "If the time between Flink application crash and completed restart is
> > larger than Kafka’s transaction timeout there will be data loss (Kafka
> will
> > automatically abort transactions that exceeded timeout time)."
>
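For reference, here is a minimal sketch of keeping the Kafka transaction timeout above the checkpoint interval and timeout, which is the usual way to avoid the stuck-checkpoint scenario described above. The broker address and the timing values are placeholders; the producer construction mirrors the code quoted in this thread, so MyKafkaSerializationSchema and KafkaConstant.topic_01 are the original poster's own classes.

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every minute; an unfinished checkpoint is abandoned after 10 minutes.
env.enableCheckpointing(60_000);
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");               // placeholder address
// Keep the transaction timeout above the checkpoint interval + timeout, but at or below
// the broker-side transaction.max.timeout.ms (15 minutes by default).
props.setProperty("transaction.timeout.ms", String.valueOf(15 * 60 * 1000));

// Same constructor shape as in the quoted code.
FlinkKafkaProducer producer = new FlinkKafkaProducer<>(
        KafkaConstant.topic_01,
        new MyKafkaSerializationSchema(KafkaConstant.topic_01),
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

If a checkpoint still stalls past the transaction timeout, the broker aborts the open transaction and the commit during recovery can no longer succeed, which is the data-loss scenario quoted from the documentation above.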


Re: Flink Kafka e2e exactly-once question

2022-01-03 Post by zilong xiao
kafka does have a mechanism for rejecting commits of timed-out transactions.

Michael Ran wrote on Fri, Dec 31, 2021 at 14:40:

> I haven't tested it, but assuming kafka does drop the messages of timed-out transactions, say after a 10-minute timeout: 1. flink
> sends message A and enters phase one. 2. flink waits for kafka's phase-one ack. 3. flink
> receives the ack, sends the phase-two confirmation, and performs the checkpoint. Exception case:
> at this point the phase-two confirmation fails to be sent (and the flink application is also down, for whatever reason, for more than 10 minutes). 3.1 After 10 minutes, kafka
> discards the timed-out transactional messages. 3.2 flink restarts and re-commits the phase-two transaction id (but since the kafka
> messages are already gone, the commit has no effect). My guess: when the phase-two commit fails, is this handled by re-sending the messages, together with an idempotence guarantee?
> On 2021-12-31 11:31:49, "zilong xiao" wrote:
> >The official docs say that data loss may occur when a kafka transaction times out. Does that mean Flink
> >cannot fully guarantee end-to-end exactly-once? Is my understanding correct? I have always heard that Flink
> writing to kafka can guarantee end-to-end exactly-once, so the wording in the docs confused me.
> >
> >Doc link:
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-producers-and-fault-tolerance
> >
> >Key sentence: "If the time between Flink application crash and completed restart is
> >larger than Kafka’s transaction timeout there will be data loss (Kafka
> will
> >automatically abort transactions that exceeded timeout time)."
>


Re: Flink Kafka e2e exactly-once question

2021-12-31 Post by 赵珩
My understanding is that loss of uncommitted data only happens when the flink job is
restarted after the kafka transaction timeout has been exceeded;
kafka does not keep uncommitted transaction data indefinitely.
A normal flink restart will not lose data.


On 2021/12/31 11:31, zilong xiao wrote:

The official docs say that data loss may occur when a kafka transaction times out. Does that mean Flink cannot fully
guarantee end-to-end exactly-once? Is my understanding correct? I have always heard that Flink writing to kafka can guarantee end-to-end exactly-once, so the wording in the docs confused me.

Doc link:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-producers-and-fault-tolerance

Key sentence: "If the time between Flink application crash and completed restart is
larger than Kafka’s transaction timeout there will be data loss (Kafka will
automatically abort transactions that exceeded timeout time)."


Flink Kafka e2e exactly-once question

2021-12-30 Post by zilong xiao
The official docs say that data loss may occur when a kafka transaction times out. Does that mean Flink cannot fully
guarantee end-to-end exactly-once? Is my understanding correct? I have always heard that Flink writing to kafka can guarantee end-to-end exactly-once, so the wording in the docs confused me.

Doc link:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-producers-and-fault-tolerance

Key sentence: "If the time between Flink application crash and completed restart is
larger than Kafka’s transaction timeout there will be data loss (Kafka will
automatically abort transactions that exceeded timeout time)."


Reply: Flink kafka custom metrics fail to parse in influxdb

2021-11-26 Post by Jimmy Zhang
Hi, I did not develop a connector myself; I use the kafka connector, and influxdb is only the storage backend for the metrics.
Do you need an influxdb connector? I only use the metrics system and define a custom Counter object; this has nothing to do with the connector.



Sent from Netease Mail Master




 Original message replied to 
| From | 信华哺 |
| Date | Nov 26, 2021, 17:22 |
| To | user-zh@flink.apache.org |
| Cc | |
| Subject | Reply: Flink kafka custom metrics fail to parse in influxdb |
Hi:
   I'd like to ask: is the flink sql connector you use developed by yourself? Online I can only find a datastream influxdb connector.




On Jul 23, 2021, 10:11, Jimmy Zhang wrote:
Hi everyone, Flink version 1.13.1.
In the invoke method of FlinkKafkaProducer I create a counter that uses sinkTableName + "_receive_count" as the final
metric (measurement) name, with influxDB as the reporting backend; sinkTableName is taken from the catalog information and passed down through the constructors.

I found a strange problem: with a simple SQL statement of the form insert into a select from b where
b.c='d', the measurement is created successfully in influxDB;
but once a UDF is added, for example
insert into a select
CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME,
'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where
Record.datasource='xx', influxDB reports the parse error below. Has anyone run into this? It has bothered me for a whole day; any advice is greatly appreciated!
org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to 
parse 
'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= 
insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
unable to parse ''tablename'\, 
'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\, 
'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)

|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

Reply: Flink kafka custom metrics fail to parse in influxdb

2021-11-26 Post by 信华哺
Hi:
I'd like to ask: is the flink sql connector you use developed by yourself? Online I can only find a datastream influxdb connector.




On Jul 23, 2021, 10:11, Jimmy Zhang wrote:
Hi everyone, Flink version 1.13.1.
In the invoke method of FlinkKafkaProducer I create a counter that uses sinkTableName + "_receive_count" as the final
metric (measurement) name, with influxDB as the reporting backend; sinkTableName is taken from the catalog information and passed down through the constructors.

I found a strange problem: with a simple SQL statement of the form insert into a select from b where
b.c='d', the measurement is created successfully in influxDB;
but once a UDF is added, for example
insert into a select
CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME,
'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where
Record.datasource='xx', influxDB reports the parse error below. Has anyone run into this? It has bothered me for a whole day; any advice is greatly appreciated!
org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to 
parse 
'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= 
insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
unable to parse ''tablename'\, 
'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\, 
'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)

|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

Reply: Flink kafka custom metrics fail to parse in influxdb

2021-07-26 Post by Jimmy Zhang
Hi Caizhi, thanks a lot for your reply!
At the beginning of createDynamicTableSink(Context context) in KafkaDynamicTableFactory.java I obtain
sinkTableName via context.getObjectIdentifier().getObjectName(). The ObjectIdentifier class uniquely identifies a table: it contains catalogName, databaseName and objectName, i.e. the catalog name, the database name and the table name. I then pass it down into FlinkKafkaProducer through the constructors and use it there.
I have solved the problem. The root cause was that influxDB failed to parse the sql! The reason is that the flink sql statement I wrote (insert into or insert
overwrite) contained line breaks inside the single quotes, so writing to influxdb failed. Also, the WITH options used when creating the table must be kept consistent with the version!




|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

On Jul 23, 2021, 10:32, Caizhi Weng wrote:
Hi!

How do you obtain the sink table name? From the error message, what you get is probably not a table name like "a" but the digest of an insert
statement, so the metric (measurement) name becomes rather complex and fails to parse.

Of course, you can also choose to escape the metric name.

Jimmy Zhang wrote on Fri, Jul 23, 2021 at 10:11 AM:

> Hi everyone, Flink version 1.13.1.
> In the invoke method of FlinkKafkaProducer I create a counter that uses sinkTableName + "_receive_count" as the final
> metric (measurement) name, with influxDB as the reporting backend; sinkTableName is taken from the catalog information and passed down through the constructors.
>
> I found a strange problem: with a simple SQL statement of the form insert into a select from b where
> b.c='d', the measurement is created successfully in influxDB;
> but once a UDF is added, for example
> insert into a select
> CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME,
> 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where
> Record.datasource='xx', influxDB reports the parse error below. Has anyone run into this? It has bothered me for a whole day; any advice is greatly appreciated!
> org.influxdb.InfluxDBException$UnableToParseException: partial write:
> unable to parse
> 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name=
> insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
> unable to parse ''tablename'\,
> 'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\,
> 'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)
>
> |
> Best,
> Jimmy
> |
>
> Signature is customized by Netease Mail Master
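For reference, here is a minimal sketch of the escaping Caizhi suggests: sanitize the table name before building the metric name, so characters that the InfluxDB line protocol cannot parse (spaces, commas, quotes) never reach the reporter. The class and field names are made up for illustration and are not from the thread; the table name is assumed to come from ObjectIdentifier#getObjectName() as Jimmy describes.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CountingSink extends RichSinkFunction<String> {
    private final String sinkTableName;   // e.g. obtained from ObjectIdentifier#getObjectName()
    private transient Counter receiveCount;

    public CountingSink(String sinkTableName) {
        this.sinkTableName = sinkTableName;
    }

    @Override
    public void open(Configuration parameters) {
        // Keep only characters that are safe in a metric name; everything else becomes '_'.
        String safeName = sinkTableName.replaceAll("[^A-Za-z0-9_]", "_");
        receiveCount = getRuntimeContext().getMetricGroup().counter(safeName + "_receive_count");
    }

    @Override
    public void invoke(String value, Context context) {
        receiveCount.inc();
        // ... actual write logic omitted
    }
}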


Re: Flink kafka custom metrics fail to parse in influxdb

2021-07-22 Post by Caizhi Weng
Hi!

How do you obtain the sink table name? From the error message, what you get is probably not a table name like "a" but the digest of an insert
statement, so the metric (measurement) name becomes rather complex and fails to parse.

Of course, you can also choose to escape the metric name.

Jimmy Zhang wrote on Fri, Jul 23, 2021 at 10:11 AM:

> Hi everyone, Flink version 1.13.1.
> In the invoke method of FlinkKafkaProducer I create a counter that uses sinkTableName + "_receive_count" as the final
> metric (measurement) name, with influxDB as the reporting backend; sinkTableName is taken from the catalog information and passed down through the constructors.
>
> I found a strange problem: with a simple SQL statement of the form insert into a select from b where
> b.c='d', the measurement is created successfully in influxDB;
> but once a UDF is added, for example
> insert into a select
> CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME,
> 'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where
> Record.datasource='xx', influxDB reports the parse error below. Has anyone run into this? It has bothered me for a whole day; any advice is greatly appreciated!
> org.influxdb.InfluxDBException$UnableToParseException: partial write:
> unable to parse
> 'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name=
> insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
> unable to parse ''tablename'\,
> 'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\,
> 'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)
>
> |
> Best,
> Jimmy
> |
>
> Signature is customized by Netease Mail Master


Flink kafka custom metrics fail to parse in influxdb

2021-07-22 Post by Jimmy Zhang
Hi everyone, Flink version 1.13.1.
In the invoke method of FlinkKafkaProducer I create a counter that uses sinkTableName + "_receive_count" as the final
metric (measurement) name, with influxDB as the reporting backend; sinkTableName is taken from the catalog information and passed down through the constructors.

I found a strange problem: with a simple SQL statement of the form insert into a select from b where
b.c='d', the measurement is created successfully in influxDB;
but once a UDF is added, for example
insert into a select
CreateJsonOuter('tablename',CaptTimeSlewTime(Record.CAPTURE_TIME,
'YEAR_,MONTH'),'data',CreateJsonInner(true)) from source where
Record.datasource='xx', influxDB reports the parse error below. Has anyone run into this? It has bothered me for a whole day; any advice is greatly appreciated!
org.influxdb.InfluxDBException$UnableToParseException: partial write: unable to 
parse 
'taskmanager_job_task_operator_xx_ODS_INFO_receive_count,host=xx,job_name= 
insert \ into\ ODS_INFO\ select \ CreateJsonOuter(' :missing fields
unable to parse ''tablename'\, 
'ODS_INFO'\,CaptTimeSlewTime(Record.CAPTURE_TIME\, 
'YEAR_,MONTH')\,'data'\,CreateJsonInner(true\,': missing fields)

|
Best,
Jimmy
|

Signature is customized by Netease Mail Master

flink Kafka producer fails at runtime

2021-04-30 Post by tanggen...@163.com
In flink I consume a kafka topic and send late data to a new topic through a side output. It keeps reporting the error below, basically on every checkpoint commit, and then the job restarts.
Please advise: do I need to change any other settings?
2021-04-30 17:00:51
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker.
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1282)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:703)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:794)
at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:58)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.sideOutput(WindowOperator.java:558)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:422)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:185)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:569)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:534)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.




tanggen...@163.com
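For reference, here is a minimal sketch (placeholder broker address and values) of the producer properties that usually matter for this kind of ProducerFencedException: the transaction timeout has to comfortably exceed the checkpoint interval while staying at or below the broker-side transaction.max.timeout.ms.

import java.util.Properties;

Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "broker:9092");       // placeholder address
// Transactions are only committed when a checkpoint completes, so this timeout must be
// well above the checkpoint interval (plus any expected checkpoint delay), while staying
// at or below the broker-side transaction.max.timeout.ms (15 minutes by default).
producerProps.setProperty("transaction.timeout.ms", String.valueOf(10 * 60 * 1000));
// Hand producerProps to the FlinkKafkaProducer used for the side-output topic,
// together with Semantic.EXACTLY_ONCE, the same way as for the main sink.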


Re: Flink-kafka-connector Consumer configuration warning

2021-04-20 Post by 范 佳兴
The flink.partition-discovery.interval-millis setting does take effect in Flink: the flink kafka connectors
fetch the kafka topic's partition information at the configured interval; for the implementation, see the createAndStartDiscoveryLoop
method in FlinkKafkaConsumerBase.

19:38:37,557 WARN  org.apache.kafka.clients.consumer.ConsumerConfig
[] - The configuration 'flink.partition-discovery.interval-millis' was
supplied but isn't a known config.

This WARN is reported by kafka: it means kafka received the supplied parameter but does not recognize it.
The parameter is not meant for kafka; it is just that, when fetching the kafka partitions, a KafkaConsumer instance is created and all configured properties are passed along to Kafka.
The warning comes from the config.logUnused() call in the KafkaConsumer constructor.


On 2021/4/18, 7:45 PM, "lp" <973182...@qq.com> wrote:

In a normally running flink 1.12 program there is the following warning:

19:38:37,557 WARN  org.apache.kafka.clients.consumer.ConsumerConfig 
   
[] - The configuration 'flink.partition-discovery.interval-millis' was
supplied but isn't a known config.

I have the following line of configuration:

properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,10);



According to https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#topic-discovery :
By default, partition discovery is disabled. To enable it, set a
non-negative value for flink.partition-discovery.interval-millis in the
provided properties config, representing the discovery interval in
milliseconds.


The configuration above should be valid, so why is this warning reported?



--
Sent from: http://apache-flink.147419.n8.nabble.com/
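For reference, here is a minimal sketch (placeholder broker address, group id and topic) of enabling partition discovery as discussed above. Note the interval value has to be passed as a string because it goes through java.util.Properties, and the resulting Kafka-side "not a known config" WARN is harmless.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");   // placeholder address
props.setProperty("group.id", "my-group");               // placeholder group id
// Check for newly created partitions every 10 seconds. Kafka itself will log the
// harmless "not a known config" WARN for this key, as explained above.
props.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10000");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);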




Reply: Flink-kafka-connector Consumer configuration warning

2021-04-19 Post by 飞翔
You can look at the source code:



The props object is just the configuration used to initialize the FlinkKafkaConsumer; it is not only used to configure kafka,
it is simply handed over in its entirety to the kafka consumer client's initialization, so it has no impact at all.
Just like when you initialize a kafka consumer yourself and put extra parameters into the props: it will also warn, but nothing is affected.


-- Original message --
From: "user-zh"
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#topic-discovery says:
 By default, partition discovery is disabled. To enable it, set a
 non-negative value for flink.partition-discovery.interval-millis in the
 provided properties config, representing the discovery interval in
 milliseconds.
 
 
 The configuration above should be valid, so why is this warning reported?
 
 
 
 --
 Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink-kafka-connector Consumer configuration warning

2021-04-19 Post by Paul Lam
This is a warning from the Kafka client. The configuration key is added by Flink, and Kafka does not recognize it.

Best,
Paul Lam

> On Apr 18, 2021, at 19:45, lp <973182...@qq.com> wrote:
> 
> In a normally running flink 1.12 program there is the following warning:
> 
> 19:38:37,557 WARN  org.apache.kafka.clients.consumer.ConsumerConfig   
>  
> [] - The configuration 'flink.partition-discovery.interval-millis' was
> supplied but isn't a known config.
> 
> I have the following line of configuration:
> properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,10);
> 
> 
> According to https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#topic-discovery :
> By default, partition discovery is disabled. To enable it, set a
> non-negative value for flink.partition-discovery.interval-millis in the
> provided properties config, representing the discovery interval in
> milliseconds.
> 
> 
> The configuration above should be valid, so why is this warning reported?
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Flink-kafka-connector Consumer configuration warning

2021-04-18 Post by lp
In a normally running flink 1.12 program there is the following warning:

19:38:37,557 WARN  org.apache.kafka.clients.consumer.ConsumerConfig
[] - The configuration 'flink.partition-discovery.interval-millis' was
supplied but isn't a known config.

I have the following line of configuration:
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,10);


According to https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#topic-discovery :
By default, partition discovery is disabled. To enable it, set a
non-negative value for flink.partition-discovery.interval-millis in the
provided properties config, representing the discovery interval in
milliseconds.


The configuration above should be valid, so why is this warning reported?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Problem caused by flink-kafka-connector Producer.setWriteTimestampToKafka(true)

2021-04-16 Post by lp
I am using flink 1.12 with the flink-kafka-connector to read data from kafka topicA and sink it back to topicB. When the FlinkProducer that sinks
to topicB has the following setting, the program occasionally reports an error; after removing it, the exception disappears. What could be the reason?


flinkKafkaProducer.setWriteTimestampToKafka(true);




--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink kafka connector occasionally reports error: Permission denied: connect

2021-04-07 Post by lp
I wrote a flink kafka connector job that consumes data from kafka topicA, processes it, and sinks it back to topicB.
While the program is running normally, the following error occasionally appears:


java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280)
~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:258)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:321)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
[kafka-clients-2.4.1.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
10:23:15,951 WARN  org.apache.kafka.clients.NetworkClient  
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-5, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-5] Error connecting to node
10.66.0.129:9092 (id: -1 rack: null)
java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280)
~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:258)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:545)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:321)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
[kafka-clients-2.4.1.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_231]
10:23:15,953 WARN  org.apache.kafka.clients.NetworkClient  
[] - [Producer clientId=producer-Source: Custom Source -> Flat Map -> Map ->
Sink: Unnamed-3d05135cf7d8f1375d8f655ba9d20255-2, transactionalId=Source:
Custom Source -> Flat Map -> Map -> Sink:
Unnamed-3d05135cf7d8f1375d8f655ba9d20255-2] Error connecting to node
10.66.0.129:9092 (id: -1 rack: null)
java.net.SocketException: Permission denied: connect
at sun.nio.ch.Net.connect0(Native Method) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:454) ~[?:1.8.0_231]
at sun.nio.ch.Net.connect(Net.java:446) ~[?:1.8.0_231]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
~[?:1.8.0_231]
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:280)
~[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:258)
~[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:951)
[kafka-clients-2.4.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.access$500(NetworkClient.java:71)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1122)
[kafka-clients-2.4.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1010)
[kafka-clients-2.4.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClien

flink with kafka and mysql [subject partially lost to encoding corruption]

2021-03-14 Post by Asahi Lee
[Message body lost to encoding corruption; the question concerns flink processing involving kafka and mysql.]

flink-Kafka error: ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer

2021-01-22 Post by lp
The test code is as follows:
--
public class Sink_KafkaSink_1{
public static void main(String[] args) throws Exception {
final ParameterTool params =
ParameterTool.fromPropertiesFile(Sink_KafkaSink_1.class.getResourceAsStream("/pro.properties"));
String host = params.get("host");
int kafkaPort = Integer.parseInt(params.get("kafkaPort"));
produceTestdata2kafka(new
StringJoiner(":").add(host).add(String.valueOf(kafkaPort)).toString());
}

private static void produceTestdata2kafka(String kafkaAddr) throws
Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
   
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

DataStreamSource<String> text = env.addSource(new CustomsourceFuncation()).setParallelism(1);

Properties properties = new Properties();
properties.setProperty("bootstrap.servers",kafkaAddr);

FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("flinktest", // topic
new SimpleStringSchema(), // message serialization
properties
);
// attach the record's event timestamp when writing to Kafka
producer.setWriteTimestampToKafka(true);
text.addSink(producer);
env.execute("[kafkaSink with custom source]");
}
}

class CustomsourceFuncation implements SourceFunction<String> {
//private long count = 1L;
private boolean isRunning = true;

@Override
public void run(SourceContext<String> ctx) throws Exception {
while(isRunning){
// sample data (a book ranking list)
List<String> books = new ArrayList<>();
books.add("msg1");
books.add("msg2");
books.add("msg3");
books.add("msg4");
books.add("msg5");
int i = new Random().nextInt(5);
ctx.collect(books.get(i));
// produce one record every 2 seconds
Thread.sleep(2000);
}
}

// method called when the source is cancelled
@Override
public void cancel() {
isRunning = false;
}
}
--

Local testing shows no exceptions, but after packaging with maven and submitting to the yarn cluster in application mode, the jobmanager keeps reporting the following error in a loop:
--
2021-01-22 07:54:31,929 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RUNNING to RESTARTING.
2021-01-22 07:54:32,930 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Job
[kafkaSink with custom source] (c256bf309be7e543182c5e1d9af659ef) switched
from state RESTARTING to RUNNING.
2021-01-22 07:54:32,931 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - No
checkpoint found during restore.
2021-01-22 07:54:32,931 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from CREATED to SCHEDULED.
2021-01-22 07:54:32,932 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from SCHEDULED to DEPLOYING.
2021-01-22 07:54:32,932 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Deploying
Source: Custom Source -> Sink: Unnamed (1/1) (attempt #2) with attempt id
ca057bcbb78c0a81fc471d81db89ec28 to container_1611044725922_0027_01_02 @
slave02 (dataPort=37913) with allocation id 3f0f1dc64e898272d68989ca9a8feff2
2021-01-22 07:54:32,950 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from DEPLOYING to RUNNING.
2021-01-22 07:54:32,969 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
Custom Source -> Sink: Unnamed (1/1) (ca057bcbb78c0a81fc471d81db89ec28)
switched from RUNNING to FAILED on container_1611044725922_0027_01_02 @
slave02 (dataPort=37913).
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:432)
~[quickstart-0.1.jar:?]
at
org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:298)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.(FlinkKafkaInternalProducer.java:77)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
~[quickstart-0.1.jar:?]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)

Reply: flink consuming kafka offsets [subject partially lost to encoding corruption]

2021-01-17 Post by [author name lost to encoding corruption]
[Most of this reply was lost to encoding corruption. The surviving fragments mention savepoints and the flink-kafka-connector, and point to the Kafka connector documentation:]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/kafka.html

18500348...@163.com

On Jan 16, 2021, 19:38, the following was written:
[Quoted question largely lost to encoding corruption. The surviving fragments ask: with checkpointing enabled in flink, are the kafka offsets kept in flink state or committed back to kafka's __consumer_offsets topic for the group id (i.e. only as a backup)? And when restarting from a checkpoint/savepoint with -s, does flink use the offsets in its own state, or kafka's committed offsets / the latest/earliest position, and what role does the __consumer_offsets topic play then?]

flink??????kafka offset

2021-01-16 文章 ????
??flink ??checkpoint,kafka 
offsetstate,checkpointingoffset??kafka,??kafka 
consumer_topic??
groupid??offset,??bakup


checkpoint??-s 
??chkpoint/savepoint??
flink??stateoffset,??kafka??lastest/earliestj
 ??kafka??consumer_topic??topic??


??

Re: flink-kafka-sink

2021-01-12 Post by r pp
hi, what exactly do you mean by "it had no effect"?

cxx <1156531...@qq.com> wrote on Thu, Jan 7, 2021 at 9:53 AM:

>  I consume one record from kafka, split the message into pieces, and send them on to a downstream kafka, but this cannot be guaranteed to happen within one transaction.
> For example: I split one record into 10 pieces and an exception is thrown at the fifth one, but the first four have already been sent to the downstream kafka.
> I set the transactional id, isolation level, client
> id, enable.idempotence, max.in.flight.requests.per.connection and retries,
> but it had no effect.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best,
  pp


flink-kafka-sink

2021-01-06 Post by cxx
 I consume one record from kafka, split the message into pieces, and send them on to a downstream kafka, but this cannot be guaranteed to happen within one transaction.
For example: I split one record into 10 pieces and an exception is thrown at the fifth one, but the first four have already been sent to the downstream kafka.
I set the transactional id, isolation level, client
id, enable.idempotence, max.in.flight.requests.per.connection and retries,
but it had no effect.



--
Sent from: http://apache-flink.147419.n8.nabble.com/
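For reference, here is a minimal sketch of the usual way to get this atomicity: let Flink's two-phase-commit Kafka sink own the transaction rather than tuning the raw producer properties, and have downstream consumers read only committed data. The broker address and topic are placeholders, and the inline serialization schema merely stands in for whatever schema the job really uses.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092");               // placeholder address
props.setProperty("transaction.timeout.ms", String.valueOf(10 * 60 * 1000));

// Serialize each String as the value of a record for the placeholder downstream topic.
KafkaSerializationSchema<String> schema = (element, timestamp) ->
        new ProducerRecord<>("downstream-topic", element.getBytes(StandardCharsets.UTF_8));

// All records emitted between two checkpoints are committed in one Kafka transaction;
// if the job fails mid-split, the uncommitted records are aborted rather than exposed.
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "downstream-topic", schema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

// Downstream consumers must set isolation.level=read_committed, otherwise they will
// still see records from aborted transactions.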

Can the flink kafka producer dynamically discover new partitions after expansion?

2020-12-25 Post by [author name lost to encoding corruption]
Hi, I have a source Kafka and a sink Kafka. When the amount of data to process grows,
I need to expand the Kafka topic's partition number, but I don't want
to restart the job for this to take effect.
For the source Kafka, I use flink.partition-discovery.interval-millis and it can
consume the new partitions after I expand the Kafka topic's partition
number.

But the sink kafka does not work like this.

The flink kafka producer gets the topic's partition list and caches it in
topicPartitionsMap, as shown in the class FlinkKafkaProducer.


Re: How should duplicated data be handled after a Flink Kafka job restarts abnormally and recovers from a checkpoint?

2020-08-27 Post by Kevin Dai
Thanks for your reply. I just saw that the official documentation for the DataStream Kafka connector also has a related explanation.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How should duplicated data be handled after a Flink Kafka job restarts abnormally and recovers from a checkpoint?

2020-08-27 Post by Congxian Qiu
Hi
   A checkpoint only guarantees exactly-once for state; a single record may still be processed multiple times. If the problem is the sink
emitting data multiple times, you could look at TwoPhaseCommitSinkFunction; this article has a related description [1]

[1]
https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka
Best,
Congxian


Kevin Dai <154434...@qq.com> wrote on Fri, Aug 28, 2020 at 9:44 AM:

> A Flink ETL job produces real-time DWD wide-table data and writes it into Kafka.
>
> When the ETL job's TM fails and the job automatically restarts, it recovers from the last checkpoint state, but part of the data is pushed again, so all the downstream DWS jobs have to deduplicate, which adds cost to them.
> My idea for a solution: extend the Kafka
> Sink so that, during initialization, it first reads all data after the position recorded in the current state, deduplicates while writing, and clears that data once it has caught back up to the normal position.
> I'd like to ask whether this approach is reasonable, or whether there is a better solution.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


How should duplicated data be handled after a Flink Kafka job restarts abnormally and recovers from a checkpoint?

2020-08-27 Post by Kevin Dai
A Flink ETL job produces real-time DWD wide-table data and writes it into Kafka.
When the ETL job's TM fails and the job automatically restarts, it recovers from the last checkpoint state, but part of the data is pushed again, so all the downstream DWS jobs have to deduplicate, which adds cost to them.
My idea for a solution: extend the Kafka
Sink so that, during initialization, it first reads all data after the position recorded in the current state, deduplicates while writing, and clears that data once it has caught back up to the normal position.
I'd like to ask whether this approach is reasonable, or whether there is a better solution.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink kafka partition question

2020-08-24 Post by steven chen
Hi all, there is something I have never understood:
1. When FlinkKafkaProducer writes to a kafka topic with 10 partitions using round-robin partitioning, every task
writes to all downstream partitions in turn.
Why do the offsets produced across the 10 partitions drift apart?
2. Multi-stream joins produce skewed data; how can this be fixed?
3. Can flink sql repartition?

Re: flink kafka data loss

2020-08-19 Post by 赵一旦
I'll leave questions 1-3 to others. Question 4: I feel there is essentially little difference if the resources are fully equivalent, e.g. the total allocated memory is the same. There is also no need to start 10 taskmanagers; the only benefit is better isolation, e.g. one failed taskmanager does not take down all slots. From the Linux point of view, the 10-TM setup probably wastes a bit of resources.

steven chen wrote on Thu, Aug 20, 2020 at 9:23 AM:

> hi:
>    Version: flink 1.10 + kafka + hive catalog
>
>    I use flinkSql DDL to register metadata tables in the hivecatalog. I started 4 jobs, kafka has 4 partitions, and the 4 jobs
> each run their own aggregations against the hivecatalog metadata tables. Two of the jobs each lost 1 record; looking at the kafka
> partition offsets, the offset of the partition that lost data is 1 lower. All jobs effectively share one set of metadata, but the totals of the different aggregation jobs, e.g. the daily and hourly ones, drift apart: the daily count is 50 while the hourly count is 51.
> 分区偏移量丢失数据分区的偏移量少1。相当于所有任务都共用一个元数据,但是各个统计的任务比如天统计和时统计最后的总数发生偏移,比如天统计为50,时统计则51
> | Partition | Latest Offset | Leader | Replicas | In Sync Replicas |
> Preferred Leader? | Under Replicated? |
> | 0 | 352 | 0 | (0) | (0) | true | false |
> | 1 | 351 | 0 | (0) | (0) | true | false |
> | 2 | 352 | 0 | (0) | (0) | true | false |
> | 3 | 352 | 0 | (0) | (0) | true | false |
> | 4 | 351 | 0 | (0) | (0) | true | false |
>
> 问题1:当使用hive catalog 后 只需注册一次数据表?那么指定的kafka group id 在分区消费的时候是随机的?
> 问题2:我使用checkpoint  重新恢复计算还是会出现该情况?有什么好思路排查?
> 问题3:这种实时任务数据如何修复,因为我这个全部flink sql,遇到数据计算异常,无从下手。
> 问题4:本地测试1个taskmanager 10个solt 和10个taskmanager 10个solt ,他们有什么区别?


flink kafka data loss

2020-08-19 Post by steven chen
hi:
   Version: flink 1.10 + kafka + hive catalog

   I use flinkSql DDL to register metadata tables in the hivecatalog. I started 4 jobs, kafka has 4 partitions, and the 4 jobs
each run their own aggregations against the hivecatalog metadata tables. Two of the jobs each lost 1 record; looking at the kafka
partition offsets, the offset of the partition that lost data is 1 lower. All jobs effectively share one set of metadata, but the totals of the different aggregation jobs, e.g. the daily and hourly ones, drift apart: the daily count is 50 while the hourly count is 51.
| Partition | Latest Offset | Leader | Replicas | In Sync Replicas | Preferred 
Leader? | Under Replicated? |
| 0 | 352 | 0 | (0) | (0) | true | false |
| 1 | 351 | 0 | (0) | (0) | true | false |
| 2 | 352 | 0 | (0) | (0) | true | false |
| 3 | 352 | 0 | (0) | (0) | true | false |
| 4 | 351 | 0 | (0) | (0) | true | false |

Question 1: when using the hive catalog, does the table only need to be registered once? Is the specified kafka group id then random for partition consumption?
Question 2: will this still happen if I recover the computation from a checkpoint? Any good ideas for troubleshooting?
Question 3: how can the data of such a real-time job be repaired? Everything here is flink sql, so when a computation goes wrong I have nowhere to start.
Question 4: in local testing, what is the difference between 1 taskmanager with 10 slots and 10 taskmanagers with 10 slots?

Reply: Re: Flink reading Kafka and writing Mysql End-To-End Exactly-Once question [subject partially lost to encoding corruption]

2020-08-02 Post by [author name lost to encoding corruption]
[Most of this message was lost to encoding corruption. The surviving fragments concern MySQL Connections in two-phase commit:
a transaction T opened and pre-committed on Connection A, the TwoPhaseCommitSinkFunction's pendingCommitTransactions,
and whether T can later be committed from a different Connection B, which is described as the difficulty of doing 2PC against MySQL.]

-- Original message --
From: "user-zh"
> https://github.com/lusecond/flink_help --depth=1
>
> [Quoted text largely lost to encoding corruption; it mentions the four methods of
> TwoPhaseCommitSinkFunction — beginTransaction, preCommit, commit, abort — for a jdbc sink.]

Re: flink kafka SQL Connectors passing kerberos parameters

2020-07-30 Post by Leonard Xu
I'm not sure whether this solves your problem.

I noticed that the current documentation is missing the part about passing kafka properties through, so I created an issue [1] to complete the docs.

Best
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-18768 



> On Jul 30, 2020, at 17:52, lydata wrote:
> 
> 
> 
> 
> 
> 
> 
> Thanks, I'll give it a try.
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-07-30 17:34:41, "Leonard Xu" wrote:
>> Hi, 
>> The kafka properties can be passed through; try the following:
>> 
>> ‘properties.security.protocol'='SASL_PLAINTEXT',
>> ‘properties.sasl.mechanism'='GSSAPI’,
>> ‘properties.sasl.kerberos.service.name'='kafka',
>> 
>> Best
>> Leonard
>> 
>> 
>>> On Jul 30, 2020, at 17:00, lydata wrote:
>>> 
>>> 
>>> 
>>> Are these 3 parameters needed, or are the parameters below supported?
>>> 
>>> 
>>> 
>>> 
>>> 'security.protocol'='SASL_PLAINTEXT',
>>> 'sasl.mechanism'='GSSAPI',
>>> 'sasl.kerberos.service.name'='kafka',
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
 On 2020-07-30 16:38:11, "lydata" wrote:
 flink v1.11.1, kafka uses kerberos
 Does the DDL below support the kerberos parameters?
 
 
 CREATE TABLE kafkaTable (
 ...
 ) WITH ('connector'='kafka',
 'topic'='user_behavior',
 'properties.bootstrap.servers'='localhost:9092',
 'properties.group.id'='testGroup', 'security.protocol'='SASL_PLAINTEXT',
 'sasl.mechanism'='GSSAPI',
 'sasl.kerberos.service.name'='kafka',
 'format'='csv',
 'scan.startup.mode'='earliest-offset'
 )
 
 
 Are the parameters above supported?



Re:Re: flink kafka SQL Connectors passing kerberos parameters

2020-07-30 Post by lydata





Thanks, I'll give it a try.














On 2020-07-30 17:34:41, "Leonard Xu" wrote:
>Hi, 
>The kafka properties can be passed through; try the following:
>
>‘properties.security.protocol'='SASL_PLAINTEXT',
>‘properties.sasl.mechanism'='GSSAPI’,
>‘properties.sasl.kerberos.service.name'='kafka',
>
>Best
>Leonard
>
>
>> On Jul 30, 2020, at 17:00, lydata wrote:
>> 
>> 
>> 
>> Are these 3 parameters needed, or are the parameters below supported?
>> 
>> 
>> 
>> 
>> 'security.protocol'='SASL_PLAINTEXT',
>> 'sasl.mechanism'='GSSAPI',
>> 'sasl.kerberos.service.name'='kafka',
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> On 2020-07-30 16:38:11, "lydata" wrote:
>>> flink v1.11.1, kafka uses kerberos
>>> Does the DDL below support the kerberos parameters?
>>> 
>>> 
>>> CREATE TABLE kafkaTable (
>>> ...
>>> ) WITH ('connector'='kafka',
>>> 'topic'='user_behavior',
>>> 'properties.bootstrap.servers'='localhost:9092',
>>> 'properties.group.id'='testGroup', 'security.protocol'='SASL_PLAINTEXT',
>>> 'sasl.mechanism'='GSSAPI',
>>> 'sasl.kerberos.service.name'='kafka',
>>> 'format'='csv',
>>> 'scan.startup.mode'='earliest-offset'
>>> )
>>> 
>>> 
>>> Are the parameters above supported?


Re: flink kafka SQL Connectors passing kerberos parameters

2020-07-30 Post by Leonard Xu
Hi, 
The kafka properties can be passed through; try the following:

‘properties.security.protocol'='SASL_PLAINTEXT',
‘properties.sasl.mechanism'='GSSAPI’,
‘properties.sasl.kerberos.service.name'='kafka',

Best
Leonard


> On Jul 30, 2020, at 17:00, lydata wrote:
> 
> 
> 
> Are these 3 parameters needed, or are the parameters below supported?
> 
> 
> 
> 
> 'security.protocol'='SASL_PLAINTEXT',
> 'sasl.mechanism'='GSSAPI',
> 'sasl.kerberos.service.name'='kafka',
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2020-07-30 16:38:11, "lydata" wrote:
>> flink v1.11.1, kafka uses kerberos
>> Does the DDL below support the kerberos parameters?
>> 
>> 
>> CREATE TABLE kafkaTable (
>> ...
>> ) WITH ('connector'='kafka',
>> 'topic'='user_behavior',
>> 'properties.bootstrap.servers'='localhost:9092',
>> 'properties.group.id'='testGroup', 'security.protocol'='SASL_PLAINTEXT',
>> 'sasl.mechanism'='GSSAPI',
>> 'sasl.kerberos.service.name'='kafka',
>> 'format'='csv',
>> 'scan.startup.mode'='earliest-offset'
>> )
>> 
>> 
>> Are the parameters above supported?



flink kafka SQL Connectors passing kerberos parameters

2020-07-30 Post by lydata
flink v1.11.1, kafka uses kerberos
Does the DDL below support the kerberos parameters?


CREATE TABLE kafkaTable (
...
) WITH ('connector'='kafka',
'topic'='user_behavior',
'properties.bootstrap.servers'='localhost:9092',
'properties.group.id'='testGroup', 'security.protocol'='SASL_PLAINTEXT',
'sasl.mechanism'='GSSAPI',
'sasl.kerberos.service.name'='kafka',
'format'='csv',
'scan.startup.mode'='earliest-offset'
)


Are the parameters above supported?
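For reference, here is a minimal sketch combining Leonard's answer with the DDL above: the Kafka client's kerberos options are passed through by prefixing them with properties. . It assumes Flink 1.11's TableEnvironment.executeSql; the column list is omitted exactly as in the DDL above, so the statement needs real columns before it can run.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

tEnv.executeSql(
        "CREATE TABLE kafkaTable (\n"
        + "  ...\n"                            // column definitions omitted as in the post
        + ") WITH (\n"
        + "  'connector' = 'kafka',\n"
        + "  'topic' = 'user_behavior',\n"
        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
        + "  'properties.group.id' = 'testGroup',\n"
        + "  'properties.security.protocol' = 'SASL_PLAINTEXT',\n"
        + "  'properties.sasl.mechanism' = 'GSSAPI',\n"
        + "  'properties.sasl.kerberos.service.name' = 'kafka',\n"
        + "  'format' = 'csv',\n"
        + "  'scan.startup.mode' = 'earliest-offset'\n"
        + ")");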

flink consuming kafka: consumer group offsets [subject partially lost to encoding corruption]

2020-07-29 Post by [author name lost to encoding corruption]
[Most of this message was lost to encoding corruption. The surviving fragments mention flink 1.11, the kafka consumer group
offsets, and that the offsets shown by a kafka tool do not match what flink has consumed.]

flink consuming kafka [subject lost to encoding corruption]

2020-07-23 Post by [author name lost to encoding corruption]
[Message body lost to encoding corruption; only the words flink and kafka survive.]

Flink writing to kafka with EXACTLY_ONCE checkpointing reports an error [subject partially lost to encoding corruption]

2020-07-20 Post by [author name lost to encoding corruption]
Hi,
When Flink writes to kafka with checkpointing and the EXACTLY_ONCE semantic, the job fails with:

Producer attempted an operation with an old epoch. Either there is a newer
producer with the same transactionalId, or the producer's transaction has been
expired by the broker

[The remainder of the message was lost to encoding corruption.]

Re: getting kafka metadata in the flink kafka connector

2020-07-07 Post by Dream-底限
OK.

On Tue, Jul 7, 2020 at 5:30 PM Leonard Xu  wrote:

> Right, this will be supported in FLIP-107; currently there is no way to get this meta data. You can follow the progress of FLIP-107.
>
> Best,
> Leonard Xu
>
> > On Jul 7, 2020, at 17:26, Dream-底限 wrote:
> >
> > hi
> > Yes, I'd like to get it in the following way:
> >
> > CREATE TABLE MyUserTable (key string, topic string, ... and the other data fields) WITH
> > ('connector.type' = 'kafka','connector.version' = '0.11' ,...)
> >
> >
> > On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:
> >
> >> Hi,
> >> By kafka metadata, do you mean the meta data in the kafka record, e.g. kafka's own timestamp and the kafka key?
> >> If so, the Table/SQL API currently has no way to get it; FLIP-107 [1] will support this.
> >>
> >> Best,
> >> Leonard Xu
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> >> <
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
> >>>
> >>
> >>> On Jul 7, 2020, at 17:12, Dream-底限 wrote:
> >>>
> >>> kafka metadata
> >>
> >>
>
>


Re: getting kafka metadata in the flink kafka connector

2020-07-07 Post by Leonard Xu
Right, this will be supported in FLIP-107; currently there is no way to get this meta data. You can follow the progress of FLIP-107.

Best,
Leonard Xu

> On Jul 7, 2020, at 17:26, Dream-底限 wrote:
> 
> hi
> Yes, I'd like to get it in the following way:
> 
> CREATE TABLE MyUserTable (key string, topic string, ... and the other data fields) WITH
> ('connector.type' = 'kafka','connector.version' = '0.11' ,...)
> 
> 
> On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:
> 
>> Hi,
>> By kafka metadata, do you mean the meta data in the kafka record, e.g. kafka's own timestamp and the kafka key?
>> If so, the Table/SQL API currently has no way to get it; FLIP-107 [1] will support this.
>> 
>> Best,
>> Leonard Xu
>> 
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
>> <
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
>>> 
>> 
>>> On Jul 7, 2020, at 17:12, Dream-底限 wrote:
>>> 
>>> kafka metadata
>> 
>> 



Re: getting kafka metadata in the flink kafka connector

2020-07-07 Post by Dream-底限
hi
Yes, I'd like to get it in the following way:

CREATE TABLE MyUserTable (key string, topic string, ... and the other data fields) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...)


On Tue, Jul 7, 2020 at 5:19 PM Leonard Xu  wrote:

> Hi,
> By kafka metadata, do you mean the meta data in the kafka record, e.g. kafka's own timestamp and the kafka key?
> If so, the Table/SQL API currently has no way to get it; FLIP-107 [1] will support this.
>
> Best,
> Leonard Xu
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-107:+Reading+table+columns+from+different+parts+of+source+records
> >
>
> > On Jul 7, 2020, at 17:12, Dream-底限 wrote:
> >
> > kafka metadata
>
>


Re: getting kafka metadata in the flink kafka connector

2020-07-07 Post by Leonard Xu
Hi,
 By kafka metadata, do you mean the meta data in the kafka record, e.g. kafka's own timestamp and the kafka key?
If so, the Table/SQL API currently has no way to get it; FLIP-107 [1] will support this.

Best,
Leonard Xu

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A+Reading+table+columns+from+different+parts+of+source+records
 


> On Jul 7, 2020, at 17:12, Dream-底限 wrote:
> 
> kafka metadata



Getting kafka metadata in the flink kafka connector

2020-07-07 Post by Dream-底限
hi,
In the flink table/sql api, is there a way to get the kafka metadata?

tableEnvironment.sqlUpdate(CREATE TABLE MyUserTable (...) WITH
('connector.type' = 'kafka','connector.version' = '0.11' ,...))


Reply: flink consuming kafka [subject partially lost to encoding corruption]

2020-06-29 Post by Yichao Yang
Hi


[Most of this reply was lost to encoding corruption. The surviving fragments mention checking the dercd_seeme-3
consumer group/partition and the kafka partition assignment.]


Best,
Yichao Yang




-- Original message --
From: [lost to encoding corruption]

flink writing to Kafka with eos enabled fails [subject partially lost to encoding corruption]

2020-05-22 Post by [author name lost to encoding corruption]
Hi All,


[Most of the message text was lost to encoding corruption.] Writing from flink to kafka with eos (exactly-once semantics) enabled fails;
the log is below:

2020-05-21 16:52:15,057 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/1) (f65b2869d898a050238c53f9fbc9573b) switched from DEPLOYING to RUNNING.
2020-05-21 16:52:15,062 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Process-Broadcast -> Map -> Sink: Unnamed (1/1) (d0739aa81367223f83a63a86307fffb3) switched from DEPLOYING to RUNNING.
2020-05-21 16:52:15,276 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Process-Broadcast -> Map -> Sink: Unnamed (1/1) (d0739aa81367223f83a63a86307fffb3) switched from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: org.apache.kafka.common.KafkaException: Could not find a coordinator with type TRANSACTION with key Co-Process-Broadcast -> Map -> Sink: Unnamed-c4ffe334eee7821772b24597621064ce-32 due to unexpected error: The server experienced an unexpected error when processing the request.
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593)
  at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
  at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
  at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
  at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
  at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
  at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
  at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1101)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1037)
  at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
  at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Could not find a coordinator with type TRANSACTION with key Co-Process-Broadcast -> Map -> Sink: Unnamed-c4ffe334eee7821772b24597621064ce-32 due to unexpected error: The server experienced an unexpected error when processing the request.
  at org.apache.kafka.clients.producer.internals.TransactionManager$FindCoordinatorHandler.handleResponse(TransactionManager.java:1142)
  at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:909)
  at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
  at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:557)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
  at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:288)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
  ... 1 more

Reply: Re: Re: Re: flink reading kafka source error [subject partially lost to encoding corruption]

2020-01-15 Post by Others
[Most of this message was lost to encoding corruption; it discusses whether the connector/format jar was placed in the flink lib directory.]

-- Original message --
From: "JingsongLee"
https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#usage

From: Others <41486...@qq.com>
Send Time: 2020-01-15 15:54
To: user-zh@flink.apache.org, JingsongLee
http://maven.apache.org/plugins/maven-shade-plugin/
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT.jar with /Users/lidonghao/Documents/doumob/flinkjob/target/flinkjob-1.0-SNAPSHOT-shaded.jar
[INFO] Dependency-reduced POM written at: /Users/lidonghao/Documents/doumob/flinkjob/dependency-reduced-pom.xml
[INFO] 
[INFO] BUILD SUCCESS
[INFO] 
[INFO] Total time: 9.447 s
[INFO] Finished at: 2020-01-15T15:24:56+08:00
[INFO] Final Memory: 69M/781M
[INFO] 

Process finished with exit code 0

[INFO] Including org.apache.flink:flink-json:jar:1.9.1 in the shaded jar.

-- Original message --
From: "JingsongLee"
http://www.myorganization.org
https://repository.apache.org/content/repositories/snapshots/

Reply: flink reading kafka source error [subject partially lost to encoding corruption]

2020-01-15 Post by Others
[Message body mostly lost to encoding corruption; it concerns putting the connector/format jar into the flink lib directory.]

-- Original message --
From: "AS"
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#dependencies
[The quoted reply is partly lost to encoding corruption; it appears to suggest placing the required connector/format jar into flink's lib directory and restarting.]

On 2020-01-15 14:59, Others <41486...@qq.com> wrote:
flink version 1.9.1
The error is:
2020-01-15 11:57:44,255 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
    at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
    at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
    at com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
    ... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 
'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
    at org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
    at

Re: flink kafka source (subject partially garbled)

2020-01-15 文章 AS
Hi:
[Reply partially garbled; it points at the missing kafka/format factory and refers to
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#dependencies
and to placing the required jar into flink's lib directory.]






On 2020-01-15 14:59, Others <41486...@qq.com> wrote:
The flink version is 1.9.1.
The error is:
2020-01-15 11:57:44,255 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled 
exception.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: findAndCreateTableSource failed.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource 
failed.
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at 
com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 
'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
at 
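
The NoMatchingTableFactoryException above means that no DeserializationSchemaFactory for format.type=json could be found on the classpath. Below is a minimal sketch of the kind of descriptor-based registration that fails at BuoyDataJob.java:86; the field list is shortened, the rowtime attribute is omitted, and the table name is hypothetical. The JSON format factory lives in the flink-json artifact, so that jar either has to be bundled into the job jar with its META-INF/services entries intact or dropped into flink/lib, as the replies suggest.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class KafkaJsonSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // format.type=json is what triggers the DeserializationSchemaFactory lookup;
        // that factory is provided by the flink-json artifact.
        tEnv.connect(
                new Kafka()
                        .version("universal")
                        .topic("flink_etl_pro")
                        .property("bootstrap.servers", "172.16.0.148:9092")
                        .property("group.id", "consumer_flink_etl_test")
                        .startFromLatest())
            .withFormat(new Json()
                        .failOnMissingField(false)
                        .deriveSchema())
            .withSchema(new Schema()
                        .field("event", Types.STRING)
                        .field("appkey", Types.STRING)
                        .field("buoyId", Types.LONG))
            .inAppendMode()
            .registerTableSource("buoy_events");   // hypothetical table name

        Table result = tEnv.sqlQuery("SELECT event, appkey FROM buoy_events");
        tEnv.toAppendStream(result, Row.class).print();
        env.execute("kafka-json-source-sketch");
    }
}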

flink kafka source (subject partially garbled)

2020-01-14 文章 Others
The flink version is 1.9.1.
The error is:
2020-01-15 11:57:44,255 ERROR 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler- Unhandled 
exception.
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: findAndCreateTableSource failed.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource 
failed.
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at 
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at 
com.doumob.flink.BuoyDataJob.main(BuoyDataJob.java:86)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
... 9 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.DeserializationSchemaFactory' in
the classpath.

Reason: No factory implements 
'org.apache.flink.table.factories.DeserializationSchemaFactory'.

The following properties are requested:
connector.properties.0.key=group.id
connector.properties.0.value=consumer_flink_etl_test
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=172.16.0.148:9092,172.16.0.149:9092,172.16.0.150:9092
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=flink_etl_pro
connector.type=kafka
connector.version=universal
format.derive-schema=true
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=cTime
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=2000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=event
schema.1.type=VARCHAR
schema.2.name=adSpaceKey
schema.2.type=VARCHAR
schema.3.name=appkey
schema.3.type=VARCHAR
schema.4.name=build
schema.4.type=VARCHAR
schema.5.name=buoyId
schema.5.type=BIGINT
schema.6.name=gameHtmlId
schema.6.type=BIGINT
schema.7.name=uid
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByFactoryClass(TableFactoryService.java:243)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:186)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.getDeserializationSchema(KafkaTableSourceSinkFactoryBase.java:259)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactoryBase.createStreamTableSource(KafkaTableSourceSinkFactoryBase.java:144)
at 
org.apache.flink.table.factories.StreamTableSourceFactory.createTableSource(StreamTableSourceFactory.java:49)
at 

Re: flink consuming kafka: how to check kafka consumption progress (subject partially garbled)

2020-01-12 文章 Evan
You can check the group's kafka offsets with:
kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zkhost:2181 
--group ${group.id} --topic ${topic_name}
Replace zkhost, ${group.id} and ${topic_name} with your own values.

Group   Topic      Pid   Offset     logSize      Lag        Owner
test    dy_event   0     8115733    10658588     2542855    none
test    dy_event   1     8114221    10658585     2544364    none
test    dy_event   2     8115173    10658587     2543414    none
test    dy_event   3     8115127    10658585     2543458    none
test    dy_event   4     8115160    10658587     2543427    none

Look at the Offset and Lag for each pid (partition). [Remainder garbled.]




------------------ Original mail ------------------
From: "Benchao Li" (body garbled; contains the link
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration)

wqpapa (body garbled; contains the link
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-start-position-configuration)
Re: flink consuming Kafka (subject partially garbled)

2020-01-09 文章 ZhangChangjun
[Body garbled; concerns the kafka source.]

------------------ Original mail ------------------
From: "sunfulin" (body garbled; contains the link
https://manual.sensorsdata.cn/sa/latest/page-1573828.html and mentions Flink reading Kafka,
the kafka console consumer, and the flink job)

Re: Flink reads Kafka and writes MySQL, End-To-End Exactly-Once (subject partially garbled)

2020-01-02 文章 ????2008
[Body partially garbled.]
Question 1: which mysql-connector-java version to use: latest, 5.1.6 or 5.1.48
(project: https://github.com/lusecond/flink_help --depth=1).

The question also asks how the four methods of TwoPhaseCommitSinkFunction (beginTransaction,
preCommit, commit, abort) should be implemented for a jdbc sink.
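
For readers hitting the same question, here is a minimal, hypothetical sketch of the four TwoPhaseCommitSinkFunction hooks for a jdbc sink. It buffers rows inside the checkpointed transaction object instead of holding an open connection (connections cannot be checkpointed); whether the project linked above takes the same approach is not known. The JDBC URL, credentials, table and column names are invented placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class BufferingJdbcTwoPhaseSink
        extends TwoPhaseCommitSinkFunction<String, BufferingJdbcTwoPhaseSink.TxnBuffer, Void> {

    /** Per-transaction state that gets checkpointed until the transaction is committed. */
    public static class TxnBuffer {
        public List<String> rows = new ArrayList<>();
    }

    public BufferingJdbcTwoPhaseSink() {
        super(new KryoSerializer<>(TxnBuffer.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected TxnBuffer beginTransaction() {
        // Called when a new transaction (checkpoint interval) starts: open an empty buffer.
        return new TxnBuffer();
    }

    @Override
    protected void invoke(TxnBuffer txn, String value, Context context) {
        // Records are only buffered here; nothing reaches MySQL yet.
        txn.rows.add(value);
    }

    @Override
    protected void preCommit(TxnBuffer txn) {
        // Called on checkpoint. With a pure in-state buffer there is nothing to flush yet;
        // a connection-based variant would flush batched statements here without committing.
    }

    @Override
    protected void commit(TxnBuffer txn) {
        // Called once the checkpoint has completed everywhere; must be safe to retry.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test?useSSL=false", "user", "password")) {
            conn.setAutoCommit(false);
            try (PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO events(payload) VALUES (?)")) {
                for (String row : txn.rows) {
                    ps.setString(1, row);
                    ps.addBatch();
                }
                ps.executeBatch();
            }
            conn.commit();
        } catch (Exception e) {
            throw new RuntimeException("commit failed", e);
        }
    }

    @Override
    protected void abort(TxnBuffer txn) {
        // Drop the buffered rows of a failed or obsolete transaction.
        txn.rows.clear();
    }
}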

flink consuming kafka (subject partially garbled)

2019-12-05 文章 cs
Hi all,
[Body garbled; asks about the kafka topic and the group id used by the flink consumer versus the
group id that kafka itself manages.]

Some messages are not consumed by the flink kafka consumer

2019-09-27 文章 zenglong chen
I have a test topic and add messages to it by hand. Flink consumes the topic and prints the output,
but some of the added messages are missing. Where could the problem be? Has anyone run into
something similar?


flink consuming kafka from a specified position (subject partially garbled)

2019-08-26 文章 1900
flink on yarn; flink 1.7.2, hadoop 2.8.5, kafka 1.0.0.

[Partially garbled; the question concerns where the flink kafka consumer starts reading (offsets)
when the topic already contains data.]

First attempt:
Properties props = new Properties();
props.put("auto.offset.reset", "latest");
DataStream data = env.addSource(new FlinkKafkaConsumer<>("topic", new EventSchema(), props));

Second attempt:
Properties props = new Properties();
DataStream data = env.addSource(new FlinkKafkaConsumer<>("topic", new EventSchema(),
props).setStartFromLatest());

[Partially garbled; neither attempt starts from the intended offset. How can the right offset be
obtained and specified?]


Reply:

You can use the consumer's setStartFromTimestamp API:
DataStream data = env.addSource(new FlinkKafkaConsumer<>("topic", new
EventSchema(), props).setStartFromTimestamp(1566797693000));

[Remainder garbled; mentions kafka.]
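
As a reference, here is a small self-contained sketch of the start-position options on FlinkKafkaConsumer, including setStartFromSpecificOffsets for starting at explicit per-partition offsets. This is an assumed setup, not the poster's code: the broker, topic, group id and offsets are placeholders, and SimpleStringSchema stands in for the poster's EventSchema.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class KafkaStartPositionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");   // placeholder
        props.setProperty("group.id", "my-group");              // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);

        // Pick exactly one start position; it only applies when there is no restored state.
        // consumer.setStartFromEarliest();
        // consumer.setStartFromLatest();
        // consumer.setStartFromGroupOffsets();                  // default: committed group offsets
        // consumer.setStartFromTimestamp(1566797693000L);       // by record timestamp

        // Start from explicit offsets per partition (placeholder values):
        Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new KafkaTopicPartition("topic", 0), 42L);
        offsets.put(new KafkaTopicPartition("topic", 1), 1000L);
        consumer.setStartFromSpecificOffsets(offsets);

        DataStream<String> data = env.addSource(consumer);
        data.print();
        env.execute("kafka-start-position-sketch");
    }
}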

Re: Questions about the Flink Kafka Connector

2019-08-22 文章 戴鑫铉
Hi Victor:
   Got your reply, thank you for the detailed explanation! Much appreciated!

Victor Wong wrote on Fri, 23 Aug 2019 at 10:20 AM:

> Hi 鑫铉:
>   Let me try to answer.
>
>   1. Does this mean that with FlinkKafkaConsumer09 checkpointing must be enabled, otherwise offsets are not committed to kafka periodically?
>   According to the official docs [1], checkpointed offsets are a Flink feature, while auto commit offset is a kafka client feature; the two overlap to some extent, so the behaviour depends on the configuration.
>   If Flink checkpointing is not enabled, offset persistence relies on the kafka client's auto commit.
>   If Flink checkpointing is enabled, auto commit is disabled by Flink; after a checkpoint completes, Flink decides, based on the user configuration, whether to commit offsets to zookeeper (kafka 0.8) or to the kafka broker (kafka 0.8+).
>   Conclusion: without checkpointing, as long as the kafka properties set "enable.auto.commit" to true and "auto.commit.interval.ms" to a value greater than 0, offsets are committed to kafka periodically.
>
>   2. current-offsets, committed-offsets, consumer lag;
>   According to the official docs [2],
>   current-offsets is the latest offset Flink has read so far;
>   committed-offsets is the offset committed to zookeeper / the kafka broker;
>   consumer lag is the difference between the topic's latest offset (log end offset) and committed-offsets; Flink does not expose consumer lag, that information has to come from kafka and its operational tooling.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-connector-metrics
>
>
> On 2019/8/22, 7:21 PM, "戴鑫铉"  wrote:
>
>     Hello, I am writing to ask a few questions about the Flink Kafka Connector:
>
>     1. The official docs mention that even with checkpointing disabled, flink still commits offsets periodically. But after testing and reading the source, it seems only FlinkKafkaConsumer08 creates a periodic offset committer in createFetcher, while FlinkKafkaConsumer09 and FlinkKafkaConsumer010 appear to rely solely on commitInternalOffsetsToKafka(), which is only called when checkpointing is enabled. Does that mean FlinkKafkaConsumer09 requires checkpointing, otherwise offsets are never committed to kafka?
>
>     2. Also, for the flink kafka connector's currentoffset and commitoffset: is currentoffset minus commitoffset the lag? The docs mention monitoring lag, but I feel currentoffset is not the same thing as kafka's Log End Offset. Could you explain in more detail?
>
>
>
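
To make the committing behaviour described above concrete, here is a small sketch under assumed placeholder settings (broker address, group id, topic); SimpleStringSchema is used only to keep it self-contained. It shows the two configurations side by side: kafka-client auto commit when checkpointing is off, and commit-on-checkpoint when it is on.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetCommitBehaviourSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");        // placeholder
        props.setProperty("group.id", "monitoring-group");           // placeholder

        // Case A: no checkpointing. Offset committing falls back to the kafka client's
        // auto commit, so these properties are what matters:
        // props.setProperty("enable.auto.commit", "true");
        // props.setProperty("auto.commit.interval.ms", "5000");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), props);

        // Case B: checkpointing enabled. Auto commit is ignored; offsets are committed back
        // to the broker when a checkpoint completes and the flag below is true (the default).
        env.enableCheckpointing(30_000);
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("offset-commit-behaviour-sketch");
    }
}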


Re: Questions about the Flink Kafka Connector

2019-08-22 文章 Victor Wong
Hi 鑫铉:
  Let me try to answer.

  1. Does this mean that with FlinkKafkaConsumer09 checkpointing must be enabled, otherwise offsets are not committed to kafka periodically?
  According to the official docs [1], checkpointed offsets are a Flink feature, while auto commit offset is a kafka client feature; the two overlap to some extent, so the behaviour depends on the configuration.
  If Flink checkpointing is not enabled, offset persistence relies on the kafka client's auto commit.
  If Flink checkpointing is enabled, auto commit is disabled by Flink; after a checkpoint completes, Flink decides, based on the user configuration, whether to commit offsets to zookeeper (kafka 0.8) or to the kafka broker (kafka 0.8+).
  Conclusion: without checkpointing, as long as the kafka properties set "enable.auto.commit" to true and "auto.commit.interval.ms" to a value greater than 0, offsets are committed to kafka periodically.

  2. current-offsets, committed-offsets, consumer lag;
  According to the official docs [2],
  current-offsets is the latest offset Flink has read so far;
  committed-offsets is the offset committed to zookeeper / the kafka broker;
  consumer lag is the difference between the topic's latest offset (log end offset) and committed-offsets; Flink does not expose consumer lag, that information has to come from kafka and its operational tooling.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-connector-metrics


On 2019/8/22, 7:21 PM, "戴鑫铉"  wrote:

    Hello, I am writing to ask a few questions about the Flink Kafka Connector:

    1. The official docs mention that even with checkpointing disabled, flink still commits offsets periodically. But after testing and reading the source, it seems only FlinkKafkaConsumer08 creates a periodic offset committer in createFetcher, while FlinkKafkaConsumer09 and FlinkKafkaConsumer010 appear to rely solely on commitInternalOffsetsToKafka(), which is only called when checkpointing is enabled. Does that mean FlinkKafkaConsumer09 requires checkpointing, otherwise offsets are never committed to kafka?

    2. Also, for the flink kafka connector's currentoffset and commitoffset: is currentoffset minus commitoffset the lag? The docs mention monitoring lag, but I feel currentoffset is not the same thing as kafka's Log End Offset. Could you explain in more detail?




Questions about the Flink Kafka Connector

2019-08-22 文章 戴鑫铉
Hello, I am writing to ask a few questions about the Flink Kafka Connector:

1. The official docs mention that even with checkpointing disabled, flink still commits offsets periodically. But after testing and reading the source, it seems only FlinkKafkaConsumer08 creates a periodic offset committer in createFetcher, while FlinkKafkaConsumer09 and FlinkKafkaConsumer010 appear to rely solely on commitInternalOffsetsToKafka(), which is only called when checkpointing is enabled. Does that mean FlinkKafkaConsumer09 requires checkpointing, otherwise offsets are never committed to kafka?

2. Also, for the flink kafka connector's currentoffset and commitoffset: is currentoffset minus commitoffset the lag? The docs mention monitoring lag, but I feel currentoffset is not the same thing as kafka's Log End Offset. Could you explain in more detail?


Re: Re: flink kafka-related task fails and the taskmanager exits (subject partially garbled)

2019-07-26 文章 ????
[Body garbled; mentions a jar and flink.]

------------------ Original mail ------------------
From: rockey...@163.com
Sent: 2019-07-26 10:22
To: user-zh
Subject: Re: Re: flink kafka-related task fails and the taskmanager exits

Hello zhisheng,
We are not using flink-metrics-prometheus; the job simply reports that the class
org/apache/kafka/common/metrics/stats/Rate$1 cannot be found while it runs, and we are not sure
whether flink itself calls into it.

rockey...@163.com

From: zhisheng
Sent: 2019-07-25 17:27
To: user-zh
Subject: Re: flink kafka-related task fails and the taskmanager exits

hi, rockeycui
Are you using flink-metrics-prometheus to report metrics? I see quite a few metrics-related strings.

rockey...@163.com wrote on Thu, 25 Jul 2019 at 5:12 PM:

> Hello, flink throws the following exception while running; any advice on how to troubleshoot it?
>
> 2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser
>  - Kafka version : 0.9.0.1
> 2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser
>  - Kafka commitId : 23c69d62a0cabf06
> 2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Attempting to cancel task Source: Custom Source -> Map
> -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3)
> (02dd0b294a75cb672899e83c53985034).
> 2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Source: Custom Source -> Map -> Filter -> Map -> Map ->
> to: Row -> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034)
> switched from RUNNING to CANCELING.
> 2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Triggering cancellation of task code Source: Custom
> Source -> Map -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed
> (3/3) (02dd0b294a75cb672899e83c53985034).
> 2019-07-23 10:52:01,563 INFO
> org.apache.kafka.clients.producer.KafkaProducer   - Closing the
> Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Source: Custom Source -> Map -> Filter -> Map -> Map ->
> to: Row -> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034)
> switched from CANCELING to CANCELED.
> 2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Freeing task resources for Source: Custom Source -> Map
> -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3)
> (02dd0b294a75cb672899e83c53985034).
> 2019-07-23 10:52:01,572 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Ensuring all FileSystem streams are closed for task
> Source: Custom Source -> Map -> Filter -> Map -> Map -> to: Row -> Map ->
> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034) [CANCELED]
> 2019-07-23 10:52:01,572 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Un-registering task and sending final execution state CANCELED to
> JobManager for task Source: Custom Source -> Map -> Filter -> Map -> Map ->
> to: Row -> Map -> Sink: Unnamed 02dd0b294a75cb672899e83c53985034.
> 2019-07-23 10:52:01,574 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
> Thread 'flink-metrics-57' produced an uncaught exception. Stopping the
> process...
> java.lang.NoClassDefFoundError:
> org/apache/kafka/common/metrics/stats/Rate$1
> at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:98)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:67)
> at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> at
> org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:39)
> at
> org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:29)
> at
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:254)
> at
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:53)
> at
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:166)
> at
> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:118)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scal

Re: Re: flink kafka-related task fails and the taskmanager exits

2019-07-25 文章 rockey...@163.com
Hello zhisheng,
We are not using flink-metrics-prometheus; the job simply reports that the class
org/apache/kafka/common/metrics/stats/Rate$1 cannot be found while it runs, and we are not sure
whether flink itself calls into it.



rockey...@163.com
 
From: zhisheng
Sent: 2019-07-25 17:27
To: user-zh
Subject: Re: flink kafka-related task fails and the taskmanager exits
hi, rockeycui
Are you using flink-metrics-prometheus to report metrics? I see quite a few metrics-related strings.
 
rockey...@163.com wrote on Thu, 25 Jul 2019 at 5:12 PM:
 
> Hello, flink throws the following exception while running; any advice on how to troubleshoot it?
>
> 2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser
>  - Kafka version : 0.9.0.1
> 2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser
>  - Kafka commitId : 23c69d62a0cabf06
> 2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Attempting to cancel task Source: Custom Source -> Map
> -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3)
> (02dd0b294a75cb672899e83c53985034).
> 2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Source: Custom Source -> Map -> Filter -> Map -> Map ->
> to: Row -> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034)
> switched from RUNNING to CANCELING.
> 2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Triggering cancellation of task code Source: Custom
> Source -> Map -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed
> (3/3) (02dd0b294a75cb672899e83c53985034).
> 2019-07-23 10:52:01,563 INFO
> org.apache.kafka.clients.producer.KafkaProducer   - Closing the
> Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Source: Custom Source -> Map -> Filter -> Map -> Map ->
> to: Row -> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034)
> switched from CANCELING to CANCELED.
> 2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Freeing task resources for Source: Custom Source -> Map
> -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3)
> (02dd0b294a75cb672899e83c53985034).
> 2019-07-23 10:52:01,572 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Ensuring all FileSystem streams are closed for task
> Source: Custom Source -> Map -> Filter -> Map -> Map -> to: Row -> Map ->
> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034) [CANCELED]
> 2019-07-23 10:52:01,572 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Un-registering task and sending final execution state CANCELED to
> JobManager for task Source: Custom Source -> Map -> Filter -> Map -> Map ->
> to: Row -> Map -> Sink: Unnamed 02dd0b294a75cb672899e83c53985034.
> 2019-07-23 10:52:01,574 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
> Thread 'flink-metrics-57' produced an uncaught exception. Stopping the
> process...
> java.lang.NoClassDefFoundError:
> org/apache/kafka/common/metrics/stats/Rate$1
> at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:98)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:67)
> at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> at
> org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:39)
> at
> org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:29)
> at
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:254)
> at
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:53)
> at
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:166)
> at
> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:118)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor

Re: Re: flink kafka-related task fails and the taskmanager exits

2019-07-25 文章 rockey...@163.com
Thanks, we will look into it on our side.



rockey...@163.com
 
From: athlon...@gmail.com
Sent: 2019-07-25 17:30
To: user-zh
Subject: Re: flink kafka-related task fails and the taskmanager exits
 
Check whether the kafka-clients jar is on the classpath; the missing Rate class lives in that jar.
If it is there, check that the kafka version you run matches the kafka-clients version; I hit a
similar problem where a version mismatch meant some classes could not be found.

 
 
athlon...@gmail.com
From: rockey...@163.com
Sent: 2019-07-25 17:26
To: user-zh
Subject: flink kafka-related task fails and the taskmanager exits
Hello, flink throws the following exception while running; any advice on how to troubleshoot it?
2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser   
- Kafka version : 0.9.0.1
2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser   
- Kafka commitId : 23c69d62a0cabf06
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Attempting to cancel task Source: Custom Source -> Map -> Filter 
-> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source -> Map -> Filter -> Map -> Map -> to: Row 
-> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034) switched from 
RUNNING to CANCELING.
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Triggering cancellation of task code Source: Custom Source -> Map 
-> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,563 INFO  org.apache.kafka.clients.producer.KafkaProducer   
- Closing the Kafka producer with timeoutMillis = 
9223372036854775807 ms.
2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source -> Map -> Filter -> Map -> Map -> to: Row 
-> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034) switched from 
CANCELING to CANCELED.
2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task 
- Freeing task resources for Source: Custom Source -> Map -> Filter 
-> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,572 INFO  org.apache.flink.runtime.taskmanager.Task 
- Ensuring all FileSystem streams are closed for task Source: 
Custom Source -> Map -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed 
(3/3) (02dd0b294a75cb672899e83c53985034) [CANCELED]
2019-07-23 10:52:01,572 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Un-registering 
task and sending final execution state CANCELED to JobManager for task Source: 
Custom Source -> Map -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed 
02dd0b294a75cb672899e83c53985034.
2019-07-23 10:52:01,574 ERROR 
org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL: Thread 
'flink-metrics-57' produced an uncaught exception. Stopping the process...
java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/stats/Rate$1
at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:98)
at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:67)
at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
at 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:39)
at 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:29)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:254)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:53)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:166)
at 
org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:118)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.metrics.stats.Rate$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassL

Re: flink kafka-related task fails and the taskmanager exits

2019-07-25 文章 athlon...@gmail.com

Check whether the kafka-clients jar is on the classpath; the missing Rate class lives in that jar.
If it is there, check that the kafka version you run matches the kafka-clients version; I hit a
similar problem where a version mismatch meant some classes could not be found.



athlon...@gmail.com
 
From: rockey...@163.com
Sent: 2019-07-25 17:26
To: user-zh
Subject: flink kafka-related task fails and the taskmanager exits
Hello, flink throws the following exception while running; any advice on how to troubleshoot it?
 
 
2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser   
- Kafka version : 0.9.0.1
2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser   
- Kafka commitId : 23c69d62a0cabf06
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Attempting to cancel task Source: Custom Source -> Map -> Filter 
-> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source -> Map -> Filter -> Map -> Map -> to: Row 
-> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034) switched from 
RUNNING to CANCELING.
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Triggering cancellation of task code Source: Custom Source -> Map 
-> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,563 INFO  org.apache.kafka.clients.producer.KafkaProducer   
- Closing the Kafka producer with timeoutMillis = 
9223372036854775807 ms.
2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source -> Map -> Filter -> Map -> Map -> to: Row 
-> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034) switched from 
CANCELING to CANCELED.
2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task 
- Freeing task resources for Source: Custom Source -> Map -> Filter 
-> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,572 INFO  org.apache.flink.runtime.taskmanager.Task 
- Ensuring all FileSystem streams are closed for task Source: 
Custom Source -> Map -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed 
(3/3) (02dd0b294a75cb672899e83c53985034) [CANCELED]
2019-07-23 10:52:01,572 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Un-registering 
task and sending final execution state CANCELED to JobManager for task Source: 
Custom Source -> Map -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed 
02dd0b294a75cb672899e83c53985034.
2019-07-23 10:52:01,574 ERROR 
org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL: Thread 
'flink-metrics-57' produced an uncaught exception. Stopping the process...
java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/stats/Rate$1
at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:98)
at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:67)
at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
at 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:39)
at 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:29)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:254)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:53)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:166)
at 
org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:118)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.metrics.stats.Rate$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoa

Re: flink kafka-related task fails and the taskmanager exits

2019-07-25 文章 zhisheng
hi, rockeycui
Are you using flink-metrics-prometheus to report metrics? I see quite a few metrics-related strings.

rockey...@163.com wrote on Thu, 25 Jul 2019 at 5:12 PM:

> Hello, flink throws the following exception while running; any advice on how to troubleshoot it?
>
> 2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser
>  - Kafka version : 0.9.0.1
> 2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser
>  - Kafka commitId : 23c69d62a0cabf06
> 2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Attempting to cancel task Source: Custom Source -> Map
> -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3)
> (02dd0b294a75cb672899e83c53985034).
> 2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Source: Custom Source -> Map -> Filter -> Map -> Map ->
> to: Row -> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034)
> switched from RUNNING to CANCELING.
> 2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Triggering cancellation of task code Source: Custom
> Source -> Map -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed
> (3/3) (02dd0b294a75cb672899e83c53985034).
> 2019-07-23 10:52:01,563 INFO
> org.apache.kafka.clients.producer.KafkaProducer   - Closing the
> Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Source: Custom Source -> Map -> Filter -> Map -> Map ->
> to: Row -> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034)
> switched from CANCELING to CANCELED.
> 2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Freeing task resources for Source: Custom Source -> Map
> -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3)
> (02dd0b294a75cb672899e83c53985034).
> 2019-07-23 10:52:01,572 INFO  org.apache.flink.runtime.taskmanager.Task
>  - Ensuring all FileSystem streams are closed for task
> Source: Custom Source -> Map -> Filter -> Map -> Map -> to: Row -> Map ->
> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034) [CANCELED]
> 2019-07-23 10:52:01,572 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Un-registering task and sending final execution state CANCELED to
> JobManager for task Source: Custom Source -> Map -> Filter -> Map -> Map ->
> to: Row -> Map -> Sink: Unnamed 02dd0b294a75cb672899e83c53985034.
> 2019-07-23 10:52:01,574 ERROR
> org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL:
> Thread 'flink-metrics-57' produced an uncaught exception. Stopping the
> process...
> java.lang.NoClassDefFoundError:
> org/apache/kafka/common/metrics/stats/Rate$1
> at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:98)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:67)
> at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
> at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
> at
> org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:39)
> at
> org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:29)
> at
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:254)
> at
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:53)
> at
> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:166)
> at
> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:118)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.kafka.common.metrics.stats.Rate$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 20 more
> 2019-07-23 10:52:01,576 INFO
> org.apache.flink.runtime.blob.TransientBlobCache  

flink kafka-related task fails and the taskmanager exits

2019-07-25 文章 rockey...@163.com
Hello, flink throws the following exception while running; any advice on how to troubleshoot it?


2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser   
- Kafka version : 0.9.0.1
2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser   
- Kafka commitId : 23c69d62a0cabf06
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Attempting to cancel task Source: Custom Source -> Map -> Filter 
-> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source -> Map -> Filter -> Map -> Map -> to: Row 
-> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034) switched from 
RUNNING to CANCELING.
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Triggering cancellation of task code Source: Custom Source -> Map 
-> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,563 INFO  org.apache.kafka.clients.producer.KafkaProducer   
- Closing the Kafka producer with timeoutMillis = 
9223372036854775807 ms.
2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source -> Map -> Filter -> Map -> Map -> to: Row 
-> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034) switched from 
CANCELING to CANCELED.
2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task 
- Freeing task resources for Source: Custom Source -> Map -> Filter 
-> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,572 INFO  org.apache.flink.runtime.taskmanager.Task 
- Ensuring all FileSystem streams are closed for task Source: 
Custom Source -> Map -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed 
(3/3) (02dd0b294a75cb672899e83c53985034) [CANCELED]
2019-07-23 10:52:01,572 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Un-registering 
task and sending final execution state CANCELED to JobManager for task Source: 
Custom Source -> Map -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed 
02dd0b294a75cb672899e83c53985034.
2019-07-23 10:52:01,574 ERROR 
org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL: Thread 
'flink-metrics-57' produced an uncaught exception. Stopping the process...
java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/stats/Rate$1
at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:98)
at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:67)
at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
at 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:39)
at 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:29)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:254)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:53)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:166)
at 
org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:118)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.metrics.stats.Rate$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 20 more
2019-07-23 10:52:01,576 INFO  org.apache.flink.runtime.blob.TransientBlobCache  
- Shutting down BLOB cache
2019-07-23 10:52:01,576 INFO  org.apache.flink.runtime.blob.PermanentBlobCache  
- Shutting down BLOB cache
2019-07-23 10:52:01,577 INFO  

flink kafka-related task fails and the taskmanager exits

2019-07-25 文章 rockey...@163.com
Hello, flink throws the following exception while running; any advice on how to troubleshoot it?

2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser   
- Kafka version : 0.9.0.1
2019-07-23 10:52:01,420 INFO  org.apache.kafka.common.utils.AppInfoParser   
- Kafka commitId : 23c69d62a0cabf06
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Attempting to cancel task Source: Custom Source -> Map -> Filter 
-> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source -> Map -> Filter -> Map -> Map -> to: Row 
-> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034) switched from 
RUNNING to CANCELING.
2019-07-23 10:52:01,560 INFO  org.apache.flink.runtime.taskmanager.Task 
- Triggering cancellation of task code Source: Custom Source -> Map 
-> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,563 INFO  org.apache.kafka.clients.producer.KafkaProducer   
- Closing the Kafka producer with timeoutMillis = 
9223372036854775807 ms.
2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source -> Map -> Filter -> Map -> Map -> to: Row 
-> Map -> Sink: Unnamed (3/3) (02dd0b294a75cb672899e83c53985034) switched from 
CANCELING to CANCELED.
2019-07-23 10:52:01,571 INFO  org.apache.flink.runtime.taskmanager.Task 
- Freeing task resources for Source: Custom Source -> Map -> Filter 
-> Map -> Map -> to: Row -> Map -> Sink: Unnamed (3/3) 
(02dd0b294a75cb672899e83c53985034).
2019-07-23 10:52:01,572 INFO  org.apache.flink.runtime.taskmanager.Task 
- Ensuring all FileSystem streams are closed for task Source: 
Custom Source -> Map -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed 
(3/3) (02dd0b294a75cb672899e83c53985034) [CANCELED]
2019-07-23 10:52:01,572 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Un-registering 
task and sending final execution state CANCELED to JobManager for task Source: 
Custom Source -> Map -> Filter -> Map -> Map -> to: Row -> Map -> Sink: Unnamed 
02dd0b294a75cb672899e83c53985034.
2019-07-23 10:52:01,574 ERROR 
org.apache.flink.runtime.util.FatalExitExceptionHandler   - FATAL: Thread 
'flink-metrics-57' produced an uncaught exception. Stopping the process...
java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/stats/Rate$1
at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:98)
at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:67)
at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
at 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:39)
at 
org.apache.flink.streaming.connectors.kafka.internal.metrics.KafkaMetricMutableWrapper.getValue(KafkaMetricMutableWrapper.java:29)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:254)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:53)
at 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:166)
at 
org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:118)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: 
org.apache.kafka.common.metrics.stats.Rate$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 20 more
2019-07-23 10:52:01,576 INFO  org.apache.flink.runtime.blob.TransientBlobCache  
- Shutting down BLOB cache
2019-07-23 10:52:01,576 INFO  org.apache.flink.runtime.blob.PermanentBlobCache  
- Shutting down BLOB cache
2019-07-23 10:52:01,577 INFO  

How does the flink kafka source decide, when running in parallel, which kafka partition each subtask consumes?

2019-05-23 文章 junming liu
Hi All,

When we write a kafka consumer ourselves we usually do not need to care which partition it reads;
the consumers negotiate the assignment automatically according to the strategy configured via
partition.assignment.strategy.

However, the KafkaConsumerThread used by FlinkKafkaConsumer contains the following code:


try {
    if (hasAssignedPartitions) {
        newPartitions = unassignedPartitionsQueue.pollBatch();
    }
    else {
        // if no assigned partitions block until we get at least one
        // instead of hot spinning this loop. We rely on a fact that
        // unassignedPartitionsQueue will be closed on a shutdown, so
        // we don't block indefinitely
        newPartitions = unassignedPartitionsQueue.getBatchBlocking();
    }
    if (newPartitions != null) {
        reassignPartitions(newPartitions);
    }
} catch (AbortedReassignmentException e) {
    continue;
}

Let us assume the configured parallelism equals the number of kafka partitions; normally each
subtask should then consume exactly one partition. I am looking at what happens when a new job is
initialized for the first time.

Question 1: does the KafkaConsumerThread run inside every subtask?

Question 2: on first initialization a new job presumably goes through
unassignedPartitionsQueue.getBatchBlocking(); can a single subtask end up fetching several
partitions there? How does it make sure it only takes one partition?

Question 3: my understanding is probably off somewhere; could someone walk through, roughly based
on the code, how each subtask determines which partition it consumes?

Thanks a lot!!

Best,
YunKillerE
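
For intuition, here is a simplified sketch of the deterministic partition-to-subtask assignment the Flink Kafka source uses, modeled on KafkaTopicPartitionAssigner; the exact start-index derivation in a given Flink release may differ. With parallelism equal to the partition count, every subtask ends up with exactly one partition.

public class PartitionAssignmentSketch {

    static int assign(String topic, int partition, int numParallelSubtasks) {
        // A topic-dependent but deterministic start index spreads different topics across subtasks.
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        // Partitions of one topic then go round-robin from that start index.
        return (startIndex + partition) % numParallelSubtasks;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        for (int p = 0; p < 4; p++) {
            System.out.printf("partition %d -> subtask %d%n", p, assign("dy_event", p, parallelism));
        }
    }
}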


Re: Re: flink-kafka Trigger does not fire

2019-05-15 文章 13341000780
Many thanks for your reply. I will investigate along the lines you suggested.







On 2019-05-15 16:23:04, "Terry Wang" wrote:
>It may be that with a high parallelism some of the source subtasks receive no data, so the
>eventTime never advances. Please check whether that is what is happening.
>
>> On 15 May 2019 at 2:18 PM, 13341000780 <13341000...@163.com> wrote:
>> 
>> hi, experts!
>>   I wrote a custom window trigger and register an EventTimeTimer in onElement. Something very odd happens: when the parallelism is set lower than the number of slots and CPU cores, onEventTime fires fine, but when it is set higher than the number of slots or CPU cores, onEventTime is never called, even though elements definitely enter the window, i.e. onElement does get triggered. Has anyone seen something similar?
>> 
>> 
>> Many thanks.
>> 
>> 
>> 
>> 
>> 
>
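
For context, a minimal sketch of a custom event-time trigger of the kind described in this thread (assumed structure, not the poster's code). Whether onEventTime is ever called depends on the operator's watermark reaching the registered timestamp; if some source subtasks never emit data, for example when the parallelism exceeds the partitions or slots that actually carry data, the watermark stays at its minimum and the timer never fires, which matches the behaviour Terry describes.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EndOfWindowTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // Ask to be called back when the watermark passes the end of the window.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // Fires only once the watermark has reached the registered timestamp.
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}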


Re: flink-kafka Trigger does not fire

2019-05-15 文章 Terry Wang
It may be that with a high parallelism some of the source subtasks receive no data, so the eventTime never advances. Please check whether that is what is happening.

> On 15 May 2019 at 2:18 PM, 13341000780 <13341000...@163.com> wrote:
> 
> hi, experts!
>   I wrote a custom window trigger and register an EventTimeTimer in onElement. Something very odd happens: when the parallelism is set lower than the number of slots and CPU cores, onEventTime fires fine, but when it is set higher than the number of slots or CPU cores, onEventTime is never called, even though elements definitely enter the window, i.e. onElement does get triggered. Has anyone seen something similar?
> 
> 
> Many thanks.
> 
> 
> 
> 
> 



flink-kafka Trigger does not fire

2019-05-15 文章 13341000780
hi, experts!
   I wrote a custom window trigger and register an EventTimeTimer in onElement. Something very odd happens: when the parallelism is set lower than the number of slots and CPU cores, onEventTime fires fine, but when it is set higher than the number of slots or CPU cores, onEventTime is never called, even though elements definitely enter the window, i.e. onElement does get triggered. Has anyone seen something similar?


Many thanks.





 

Re: flink-kafka consumer group question

2019-04-30 文章 Becket Qin
Flink's Kafka source does not use Kafka's own consumer group management. Different Flink jobs will
therefore consume the same messages even when they use the same group id.

On Mon, Apr 22, 2019 at 1:24 PM 13341000780 <13341000...@163.com> wrote:

> hi, experts!
> With kafka as the source, I set group.id in the Properties, yet consumers in the same group all
> receive the same message from the same topic.
> When I use a plain kafka consumer on its own this does not happen: within one consumer group only
> one consumer receives a given message.
> Has anyone run into this? Any advice is appreciated.
>
>
> Many thanks.


flink-kafka consumer group question

2019-04-21 文章 13341000780
hi, experts!
With kafka as the source, I set group.id in the Properties, yet consumers in the same group all
receive the same message from the same topic. When I use a plain kafka consumer on its own this
does not happen: within one consumer group only one consumer receives a given message.
Has anyone run into this? Any advice is appreciated.


Many thanks.