Kafka consumer fetch request keeps disconnecting (tcp Recv-Q backlog)

2020-02-20, by claylin
Hi, our kafka consumer has been hitting tcp disconnects for some time (the tcp Recv-Q backs up), with errors like:
org.apache.kafka.clients.FetchSessionHandler - [Consumer 
clientId=consumer-108, 
groupId=push-life-cycle-trace-consumer-for-flink-1.1.0-no-checkpoint] Error 
sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 213: 
org.apache.kafka.common.errors.DisconnectException.




Re: How does Flink manage the kafka offset

2020-02-20, by Jin Yi
Hi Benchao,

Thanks a lot!
Eleanore



Flink job fails after its AMRMToken expires

2020-02-20, by faaron zheng
Hi all,
I'd like to ask about a Flink job that runs normally for a while and then fails because its AMRMToken expires. The environment is Flink 1.7.2 with Kerberos authentication on Hadoop 3.1.1.
The JobManager log shows checkpoints completing normally until it suddenly reports the error in the attachment.


There is a related community issue, FLINK-12623, but that one is said to be tied to the Hadoop version. Besides that, what other causes could lead to this problem?


Re: Re: Exception when submitting a job with run -m yarn-cluster on flink-1.10.0

2020-02-20, by tison
This is a common problem.

Flink no longer bundles Hadoop, so you need to set HADOOP_CLASSPATH.
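For example (assuming the hadoop command is on your PATH; the jar path is illustrative):

export HADOOP_CLASSPATH=`hadoop classpath`
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar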

Best,
tison.


amenhub wrote on Tue, Feb 18, 2020 at 11:51 AM:

> hi, Weihua
>
>
> As you said, I want to submit a job to the cluster with flink on yarn's run mode, but when I run ./bin/flink run -m
> yarn-cluster ../examples/batch/WordCount.jar I still get the same error,
> and this is all the log output there is. If, as you say, it is because FlinkYarnSessionCli was not loaded successfully, what could cause it to fail to load? Thanks!
>
>
> Best regards, amenhub
>
>
>
>
>
>
>
> On 2020-02-18 11:29:13, "Weihua Hu" wrote:
> >Hi, amenhub
> >
> >You want to submit the job to YARN, right? This error should be caused by FlinkYarnSessionCli not being loaded
> >correctly; these log lines are not the root cause of the failure. Could you provide more of the log to look at?
> >
> >
> >Best
> >Weihua Hu
> >
> >> On Feb 18, 2020, at 10:56, amenhub wrote:
> >>
> >> parseHostPortAddress
> >
>


Re: How does Flink manage the kafka offset

2020-02-20, by Benchao Li
Hi Jin,

See below inline replies:

My understanding is that upon startup, the Flink Job Manager will contact Kafka to
> get the offset of each partition for this consumer group and distribute
> the tasks to the task managers; it does not use Kafka to manage the
> consumer group.


Generally, yes. If you are not using checkpoints and are starting from
group-offsets, Flink reads the offsets from Kafka at startup.

And when the 2nd job cluster starts up, it does the same thing, so the 1st
> job cluster is not aware that new consumers from the same consumer group
> have joined.


Yes.

But if I add more task managers to the same job cluster, then the job manager
> is aware that more consumers from this consumer group have joined, and it
> will rebalance the partition consumption if needed.


No. Flink does not rebalance the partitions when new task managers join the
cluster. It only does so when the job restarts and the job parallelism changes.

Hope it helps.
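To make the startup behavior concrete, here is a minimal sketch using Flink's own FlinkKafkaConsumer rather than Beam's KafkaIO (broker, topic, and group names are placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GroupOffsetStartupDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
        props.setProperty("group.id", "my-consumer-group");    // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Committed group offsets are read from Kafka once, at job start.
        // From then on the job tracks positions in its own state, so a
        // second cluster started with the same group.id reads the same data.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute("group-offset-startup-demo");
    }
}

Neither job participates in Kafka's consumer-group rebalancing, which is why running two job clusters leads to double processing.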



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


How does Flink manage the kafka offset

2020-02-20, by Jin Yi
Hi there,

We are running an Apache Beam application with Flink as the runner.

We use the KafkaIO connector to read from topics:
https://beam.apache.org/releases/javadoc/2.19.0/

We have the following configuration, which enables auto commit of
offsets; no checkpointing is enabled, and the pipeline performs element-wise
processing.

We run our application in Flink Job Cluster mode, and if I run the same
job twice, meaning start 2 flink job clusters, I see messages being
processed twice.

My understanding is that upon startup, the Flink Job Manager will contact Kafka to
get the offset of each partition for this consumer group and distribute
the tasks to the task managers; it does not use Kafka to manage the
consumer group.

And when the 2nd job cluster starts up, it does the same thing, so the 1st
job cluster is not aware that new consumers from the same consumer group
have joined.

But if I add more task managers to the same job cluster, then the job manager
is aware that more consumers from this consumer group have joined, and it
will rebalance the partition consumption if needed.

Is my understanding correct?

Thanks a lot!
Eleanore

Map<String, Object> consumerConfig = ImmutableMap.<String, Object>builder()
    .put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup)
    .put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
    .put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG")
    .build();

return KafkaIO.read()
    .withBootstrapServers(kafkaSettings.getBootstrapServers())
    .withTopic(topic)
    .withKeyDeserializer(KeyDeserializer.class)
    .withValueDeserializerAndCoder(getDeserializer(encoding), new JsonNodeCoder<>())
    .withConsumerConfigUpdates(consumerConfig)
    .withoutMetadata();


Does the JIRA FLINK-14091 also exist in Flink 1.7?

2020-02-20, by tao wang
https://issues.apache.org/jira/browse/FLINK-14091

In our production environment, Flink 1.7 hits the same problem, causing checkpoints to fail.


Re: [ANNOUNCE] Apache Flink Python API(PyFlink) 1.9.2 released

2020-02-20, by Xingbo Huang
Thanks a lot for the release.
Great Work, Jincheng!
Also thanks to the participants who contributed to this release.

Best,
Xingbo


Till Rohrmann wrote on Tue, Feb 18, 2020 at 11:40 PM:

> Thanks for updating the 1.9.2 release wrt Flink's Python API, Jincheng!
>
> Cheers,
> Till
>
> On Thu, Feb 13, 2020 at 12:25 PM Hequn Cheng  wrote:
>
>> Thanks a lot for the release, Jincheng!
>> Also thanks to everyone who made this release possible!
>>
>> Best,
>> Hequn
>>
>> On Thu, Feb 13, 2020 at 2:18 PM Dian Fu  wrote:
>>
>> > Thanks for the great work, Jincheng.
>> >
>> > Regards,
>> > Dian
>> >
>> > On Feb 13, 2020, at 1:32 PM, jincheng sun wrote:
>> >
>> > Hi everyone,
>> >
>> > The Apache Flink community is very happy to announce the release of
>> Apache
>> > Flink Python API(PyFlink) 1.9.2, which is the first release to PyPI for
>> the
>> > Apache Flink Python API 1.9 series.
>> >
>> > Apache Flink® is an open-source stream processing framework for
>> > distributed, high-performing, always-available, and accurate data
>> streaming
>> > applications.
>> >
>> > The release is available for download at:
>> >
>> > https://pypi.org/project/apache-flink/1.9.2/#files
>> >
>> > Or installed using pip command:
>> >
>> > pip install apache-flink==1.9.2
>> >
>> > We would like to thank all contributors of the Apache Flink community
>> who
>> > helped to verify this release and made this release possible!
>> >
>> > Best,
>> > Jincheng
>> >
>> >
>> >
>>
>


Re: Physical memory overrun with the flink rocksdb state backend

2020-02-20, by Yu Li
I suggest upgrading to 1.10.0; by default, that version limits the memory usage of the RocksDB backend. See the official documentation for more details [1].
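For reference, a sketch of the relevant flink-conf.yaml settings in 1.10 (the sizes are illustrative, not recommendations):

state.backend: rocksdb
# In 1.10, RocksDB is bounded by Flink's managed memory by default.
state.backend.rocksdb.memory.managed: true
# Total memory of the TaskManager process, i.e. the container size.
taskmanager.memory.process.size: 10g
# Share of Flink memory reserved as managed memory (used by RocksDB).
taskmanager.memory.managed.fraction: 0.4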

Best Regards,
Yu

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#memory-management


On Thu, 20 Feb 2020 at 17:42, chanamper  wrote:

> Could I ask for some advice? I'm using flink
> 1.8 with the rocksdb state backend. After the job has been running for a while, containers overrun their physical memory; each container has 10 GB, yet heap usage stays low at only about 1 GB. How should I go about analyzing the memory usage in this situation?


Physical memory overrun with the flink rocksdb state backend

2020-02-20, by chanamper
Could I ask for some advice? I'm using flink 1.8 with the rocksdb state backend. After the job has been running for a while, containers overrun their physical memory; each container has 10 GB, yet heap usage stays low at only about 1 GB. How should I go about analyzing the memory usage in this situation?

Exception when joining a temporal table with flink sql (Flink-1.10.0)

2020-02-20, by amenhub
Hi all,


Flink-1.10.0 supports temporal joins using a processing-time attribute. When I submit a flink job with the sql below,
【 SELECT m.name, m.age, m.score FROM mysql_out AS m JOIN kafka_out FOR
SYSTEM_TIME AS OF m.update_time AS k ON m.name = k.name 】


I get the following exception:
【 Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There 
are not enough rules to produce a node with desired properties: 
convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, 
MiniBatchIntervalTraitDef=None: 0, UpdateAsRetractionTraitDef=false, 
AccModeTraitDef=UNKNOWN.Missing conversion is FlinkLogicalJoin[convention: 
LOGICAL -> STREAM_PHYSICAL] 】
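For reference, in 1.10 the documented form of this join uses the stream as the probe side with a processing-time attribute, and puts a lookup-capable table (e.g. the JDBC connector; the Kafka connector does not support lookups) after FOR SYSTEM_TIME AS OF. A hedged sketch that flips the two tables above accordingly (the proc column and the abbreviated DDL are hypothetical):

CREATE TABLE kafka_out (
  name STRING,
  proc AS PROCTIME()  -- processing-time attribute on the probe side
) WITH (
  ...
);

SELECT k.name, m.age, m.score
FROM kafka_out AS k
JOIN mysql_out FOR SYSTEM_TIME AS OF k.proc AS m
ON k.name = m.name;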


Could someone shed some light on this? Thanks.


Best regards, amenhub