Re: FlinkKafkaProducer: timeout initializing transactional state after enabling Exactly Once

2019-09-01 Thread Wesley Peng

Hi

on 2019/9/2 11:49, 陈赋赟 wrote:

2019-09-02 10:24:28,599 INFO  org.apache.flink.runtime.taskmanager.Task
 - Interval Join -> Sink: Unnamed (1/4) 
(e8b85b6f144879efbb0b4209f226c69b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
initializing transactional state in 6ms.


You may reference this:

https://stackoverflow.com/questions/54295588/kafka-streams-failed-to-rebalance-error

Possible options:

- As this answer says, switch off Exactly Once for your streamer. It then
doesn't use transactions and all seems to work OK. Not helpful if you
require EOS or some other client code requires transactions.
- Restart any brokers that are reporting warnings to force them to
re-resolve the IP address. They would need to be restarted in a way that
they don't change IP address themselves. Not usually possible in Kubernetes.
- Defect raised: KAFKA-7958 - Transactions are broken with kubernetes
hosted brokers


Update 2019-02-20: This may have been resolved in Kafka 2.1.1 (Confluent 
5.1.2), released today. See the linked issue.


Re: FlinkKafkaProducer: timeout initializing transactional state after enabling Exactly Once

2019-09-01 Thread 陈赋赟




2019-09-02 10:24:28,599 INFO  org.apache.flink.runtime.taskmanager.Task 
- Interval Join -> Sink: Unnamed (1/4) 
(e8b85b6f144879efbb0b4209f226c69b) switched from RUNNING to FAILED.
org.apache.kafka.common.errors.TimeoutException: Timeout expired while 
initializing transactional state in 6ms.



On 2019-09-02 11:29:35, "陈赋赟" wrote:

I am using a Kafka producer in Flink with exactly-once semantics enabled. The first 
deployment in our test environment started up fine, but when rolling out a new version 
I killed the previous job and redeployed it, and then the problem below appeared: the 
log shows a timeout exception while initializing transactional state during checkpointing.
The exact exception is:


The checkpoint interval is set to 30 s.
The producer transaction timeout (transaction.timeout.ms) is set to 5 minutes.




 

FlinkKafkaProducer: timeout initializing transactional state after enabling Exactly Once

2019-09-01 Thread 陈赋赟
I am using a Kafka producer in Flink with exactly-once semantics enabled. The first 
deployment in our test environment started up fine, but when rolling out a new version 
I killed the previous job and redeployed it, and then the problem below appeared: the 
log shows a timeout exception while initializing transactional state during checkpointing.
The exact exception is:


The checkpoint interval is set to 30 s.
The producer transaction timeout (transaction.timeout.ms) is set to 5 minutes.
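
For reference, a minimal sketch (in Scala; the topic name, bootstrap servers, and schema are placeholders, and the constructor overload follows the Flink Kafka connector documentation rather than the original poster's code) of a producer configured as described above, with a 30 s checkpoint interval and a 5 minute transaction.timeout.ms:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Checkpoint every 30 s, as described above; an exactly-once sink only
// commits its Kafka transaction when a checkpoint completes.
env.enableCheckpointing(30000)

val props = new Properties()
props.setProperty("bootstrap.servers", "kafka:9092")   // placeholder
// 5 minutes, as described above. This must not exceed the broker-side
// transaction.max.timeout.ms (15 minutes by default), otherwise the producer
// fails while initializing its transactional state.
props.setProperty("transaction.timeout.ms", "300000")

// Semantic.EXACTLY_ONCE turns on Kafka transactions; switching this to
// Semantic.AT_LEAST_ONCE (the first option in the earlier reply) avoids
// transactions entirely.
val producer = new FlinkKafkaProducer[String](
  "output-topic",                                      // placeholder
  new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
  props,
  FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

env.fromElements("a", "b", "c").addSink(producer)
env.execute("exactly-once producer sketch")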

FLINK WEEKLY 2019/35

2019-09-01 Thread Zili Chen
FLINK WEEKLY 2019/35 

The Flink community is busily developing the new features for 1.10, and many discussions
about Flink's current limitations, covering functionality, configuration, and documentation,
are in full swing. Last week the activity of the user-zh list increased significantly, and
community developers and users replied to questions very quickly; the growth of the Chinese
Flink community is plain to see. As usual, this digest of last week's Flink community news
is split into three parts: user-list Q&A, Flink development progress, and community events.
USER

flink 1.9 error when consuming from Kafka


The actual issue was with the Blink planner; an Alibaba developer explained the correct way to use it.

flink1.9 Blink planner table DDL usage issue

flink1.9 Blink planner create view issue


Likewise, questions about how to use the Blink planner correctly.

The Elasticsearch table sink is overly complex to construct


How to connect query results to an Elasticsearch sink.

Serialization issues when using RocksDB as the Flink state backend


Upgrade to Flink 1.8 and use POJO schema evolution to support evolving the state schema.

Using checkpoints


How to restore a job from a checkpoint rather than a savepoint; the parallelism can be adjusted to some extent when restoring.

Flink 1.9 Docker images

The Flink 1.9 Docker images have been released, with builds for both Scala 2.11 and 2.12.

How can TMs distribute evenly over Flink on YARN cluster?


Flink currently cannot guarantee that TaskManagers are spread evenly across nodes when a job is started on YARN.

type error with generics


With the Flink Java API you sometimes have to add type information manually; thanks to implicits in Scala, the two APIs can behave quite differently in this respect.

Re: Flink operators for Kubernetes


A Flink operator for Kubernetes has been developed by members of the Apache Beam community; anyone who needs Flink on Kubernetes can give it a try.

Is there Go client for Flink?


Flink currently has only a Java client and a REST API; Go users can submit and monitor Flink
jobs through the REST API.

How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?


Best practices for large Flink jobs with large uber jars, which are mainly limited by some
shortcomings of the Flink ResourceManager. Developers from Alibaba and Tencent shared their approaches to handling large jobs with large jars.
DEV

[DISCUSS] FLIP-57 - Rework FunctionCatalog


Bowen Li's FLIP-57 aims to provide a better development and authoring experience for Flink SQL.

[DISCUSS] FLIP-60: Restructure the Table API & SQL documentation


Timo Walther's FLIP-60 aims to promote the Table API & SQL documentation, currently attached to
the DataStream API docs, to a first-class section. Many Flink users write SQL for their jobs, so better documentation will improve the experience of looking up relevant information during development.

[DISCUSS] FLIP-59: Enable execution configuration from Configuration object


Dawid Wysakowicz's FLIP-59 is closely related to FLIP-54; both focus on improving Flink's
configuration. Currently, the execution configuration can only be set programmatically when writing the program, unlike many other settings that can also be passed through configuration files or command-line arguments.

[DISCUSS] Simplify Flink's cluster level RestartStrategy configuration


Till Rohrmann started a discussion on simplifying Flink's cluster-level restart strategy
configuration. The restart strategy configuration has grown complicated over time, mainly because there is a lot of default behavior in addition to the recommended restart-strategy option.

Re: [DISCUSS] Flink client api enhancement for downstream project


Kostas Kloudas gave an update on the client API rework; prototypes of the JobClient and of
Executors for multiple deployment backends, following the design document, are already under development.
NEWS

[ANNOUNCE] Apache Flink-shaded 8.0 released


Apache flink-shaded 8.0 has been released, with Chesnay Schepler as release manager; this
project provides shaded dependencies for Flink.

[DISCUSS] Releasing Flink 1.8.2


jincheng sun started the release discussion for Flink 1.8.2, which is expected to be released soon.

Best,
tison.


Viewing the jobs scheduled onto a TaskManager

2019-09-01 Thread 163
Dear community

Does Flink provide a way to see, from a TaskManager's point of view, which jobs are running on that TaskManager?


Background:
Flink 1.7.0, Flink on YARN.
The load on one node in the cluster suddenly spiked and we need to identify which job is
consuming a large amount of resources. Our current approach is to go through the jobs one by one in the Running Jobs menu of the UI, but we have dozens of jobs. It's driving us crazy...




Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-09-01 Thread Zhu Zhu
Hi Elkhan,

>>Regarding "One optimization that we take is letting yarn to reuse the
flink-dist jar which was localized when running previous jobs."
>>We are intending to use Flink Real-time pipeline for Replay from
Hive/HDFS (from offline source), to have 1 single pipeline for both batch
and real-time. So for batch Flink job, the containers will be released
once the job is done.
>>I guess your job is real-time flink, so you can share the jars from
already long-running jobs.

This optimization is done by registering the flink-dist jar as a public
distributed-cache entry in YARN.
That way, the localized dist jar can be shared by different YARN
applications and is not removed when the YARN application that
localized it terminates.
This requires some changes in Flink, though.
We will open an issue to contribute this optimization to the community.
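
For illustration only (this is not the Flink change itself, just the underlying YARN mechanism): a resource registered with PUBLIC visibility is localized once per node and can be reused by later applications. A minimal sketch using the standard YARN client API, assuming the jar is already on HDFS and world-readable:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

// Describe an HDFS jar as a PUBLIC YARN local resource so the NodeManager-local
// copy can be shared across applications instead of being re-localized (and
// removed) per application.
def publicJarResource(fs: FileSystem, jarPath: Path): LocalResource = {
  val status = fs.getFileStatus(jarPath)
  val res = Records.newRecord(classOf[LocalResource])
  res.setResource(ConverterUtils.getYarnUrlFromPath(jarPath))
  res.setSize(status.getLen)
  res.setTimestamp(status.getModificationTime)
  res.setType(LocalResourceType.FILE)
  // PUBLIC visibility is what allows reuse across YARN applications; the file
  // itself must be world-readable on HDFS for the NodeManager to accept it.
  res.setVisibility(LocalResourceVisibility.PUBLIC)
  res
}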

Thanks,
Zhu Zhu

SHI Xiaogang wrote on Saturday, August 31, 2019 at 12:57 PM:

> Hi Dadashov,
>
> You may have a look at method YarnResourceManager#onContainersAllocated
> which will launch containers (via NMClient#startContainer) after containers
> are allocated.
> The launching is performed in the main thread of YarnResourceManager and
> the launching is synchronous/blocking. Consequently, the containers will be
> launched one by one.
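
For illustration (this is not Flink's actual code; the ContainerLauncher class below is hypothetical, only the YARN NMClient API is real): handing the blocking startContainer call to a separate thread pool is the kind of change that would let launches overlap instead of being serialized on the YarnResourceManager main thread.

import java.util.concurrent.Executors

import org.apache.hadoop.yarn.api.records.{Container, ContainerLaunchContext}
import org.apache.hadoop.yarn.client.api.NMClient

// Hypothetical sketch: NMClient#startContainer blocks until the NodeManager
// accepts the launch request, so running it off the main thread allows many
// containers to be started in parallel.
class ContainerLauncher(nmClient: NMClient, poolSize: Int) {
  private val pool = Executors.newFixedThreadPool(poolSize)

  def launch(container: Container, ctx: ContainerLaunchContext): Unit = {
    pool.submit(new Runnable {
      override def run(): Unit = nmClient.startContainer(container, ctx)
    })
  }
}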
>
> Regards,
> Xiaogang
>
> Elkhan Dadashov wrote on Saturday, August 31, 2019 at 2:37 AM:
>
>> Thanks everyone for the valuable input and for sharing your experience
>> tackling the issue.
>>
>> Regarding suggestions :
>> - We provision some common jars on all cluster nodes *-->* but this
>> requires depending on the Infra Team's schedule for handling/updating common jars
>> - Making the Uberjar slimmer *-->* tried even with a 200 MB Uberjar (half
>> size), it did not improve much. Only 100 containers could be started in time,
>> but then receiving:
>>
>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
>> start container.
>> This token is expired. current time is 1566422713305 found 1566422560552
>> Note: System times on machines may be out of sync. Check system time and 
>> time zones.
>>
>>
>> - It would be nice to see FLINK-13184, but the expected
>> version it will land in is 1.10
>> - Increase the replication factor --> It would be nice to have a Flink conf for
>> setting the replication factor for only the Flink job jars, but not the output. It
>> is also challenging to set a replication factor for a not-yet-existing directory;
>> new files get the default replication factor. Will explore the HDFS cache
>> option. (See the sketch just below.)
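
A minimal sketch (path and factor are placeholders) of raising the replication factor of an already-uploaded job jar with the plain HDFS client, which sidesteps the problem that a not-yet-existing directory cannot carry a replication setting:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Raise replication only for the uploaded job jar, not for job output.
// New files always start with the default replication factor, so the factor
// has to be bumped after the upload.
val fs = FileSystem.get(new Configuration())
val jarOnHdfs = new Path("/user/flink/.flink/application_1234/job.jar")  // placeholder path
fs.setReplication(jarOnHdfs, 10.toShort)                                 // e.g. 10 replicas for faster localization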
>>
>> Maybe another option could be:
>> - Letting yet-to-be-started TaskManagers (or NodeManagers) download the
>> jars from already-started TaskManagers in a P2P fashion, so that HDFS
>> replication is not a blocker.
>>
>> A Spark job with the exact same size jar and 800 executors, without any tuning,
>> can start on the same cluster without any issue in less than a minute.
>>
>> *Further questions:*
>>
>> *@SHI Xiaogang:*
>>
>> I see that all 800 requests are sent concurrently :
>>
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
>> container with resources . Number pending requests
>> 793.
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>> AllocationID{cb016f7ce1eac1342001ccdb1427ba07}.
>>
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Requesting new TaskExecutor
>> container with resources . Number pending requests
>> 794.
>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-dispatcher-37] INFO
>>  org.apache.flink.yarn.YarnResourceManager  - Request slot with profile
>> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
>> nativeMemoryInMB=0, networkMemoryInMB=0} for job
>> e908cb4700d5127a0b67be035e4494f7 with allocation id
>> AllocationID{71bbb917374ade66df4c058c41b81f4e}.
>> ...
>>
>> Can you please elaborate the part  "As containers are launched and
>> stopped one after another" ? Any pointer to class/method in Flink?
>>
>> *@Zhu Zhu:*
>>
>> Regarding "One optimization that we take is letting yarn to reuse the
>> flink-dist jar which was localized when running previous jobs."
>>
>> We are intending to use Flink Real-time pipeline for Replay from
>> Hive/HDFS (from offline source), to have 1 single pipeline for both batch
>> and real-time. So for batch Flink job, the containers will be released once
>> the job is done.
>> I guess your job is real-time flink, so  you can share the  jars from
>> already long-running jobs.
>>
>> Thanks.
>>
>>
>> On Fri, Aug 30, 2019 at 12:46 AM Jeff Zhang  wrote:
>>
>>> I can think of 2 approaches:
>>>
>>> 1. Allow 

Re: [ANNOUNCE] Kinesis connector becomes part of Flink releases

2019-09-01 Thread Yu Li
Great to know, thanks for the efforts Bowen!

And I believe it's worth a release note in the original JIRA, wdyt? Thanks.

Best Regards,
Yu


On Sat, 31 Aug 2019 at 11:01, Bowen Li  wrote:

> Hi all,
>
> I'm glad to announce that, as #9494 was merged today,
> flink-connector-kinesis is now officially under the Apache 2.0 license in the
> master branch, and its artifact will be deployed to Maven Central as part of
> Flink releases starting from Flink 1.10.0. Users can then use the artifact off
> the shelf and no longer have to build and maintain it on their own.
>
> It brings a much better user experience to our large AWS customer base by
> making their work simpler, smoother, and more productive!
>
> Thanks to everyone who participated in coding and review to drive this
> initiative forward.
>
> Cheers,
> Bowen
>


Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-01 Thread Yu Li
-1 on increasing the default delay to non-zero, for the reasons below:

a) I can see some concerns about setting the delay to zero in the very
original JIRA (FLINK-2993), but later on in FLINK-9158 we still decided to
make the change, so I'm wondering whether that decision also came from a
customer requirement? If so, how do we judge whether one requirement
overrides the other?

b) There could be valid reasons for both default values depending on the
use case, as well as reasonable workarounds (e.g., with the latest policy,
manually setting the config to 10 s resolves the problem mentioned), and
from the earlier replies in this thread we can see that users have already
taken action. Changing it back to non-zero won't affect such users but
might surprise those depending on 0 as the default.

Last but not least, no matter what decision we make this time, I'd suggest
making it final and documenting it explicitly in our release notes. Checking
the 1.5.0 release notes [1][2], it seems we didn't mention the change to the
default restart delay, and we'd better learn from that this time. Thanks.

[1]
https://flink.apache.org/news/2018/05/25/release-1.5.0.html#release-notes
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html

Best Regards,
Yu


On Sun, 1 Sep 2019 at 04:33, Steven Wu  wrote:

> +1 on what Zhu Zhu said.
>
> We also override the default to 10 s.
>
> On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu  wrote:
>
>> In our production, we usually override the restart delay to be 10 s.
>> We once encountered cases where external services were overwhelmed by
>> reconnections from frequently restarted tasks.
>> As a safer, though not optimal, option, a default delay larger than 0 s
>> is better in my opinion.
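
For reference, a minimal sketch (the attempt count is a placeholder) of overriding the delay to 10 s, either in code or through flink-conf.yaml:

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Equivalent flink-conf.yaml settings:
//   restart-strategy: fixed-delay
//   restart-strategy.fixed-delay.attempts: 3
//   restart-strategy.fixed-delay.delay: 10 s
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(
  RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))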
>>
>>
>> 未来阳光 <2217232...@qq.com> wrote on Friday, August 30, 2019 at 10:23 PM:
>>
>>> Hi,
>>>
>>>
>>> I think it's better to increase the default value. +1
>>>
>>>
>>> Best.
>>>
>>>
>>>
>>>
>>> -- Original message --
>>> From: "Till Rohrmann";
>>> Sent: Friday, August 30, 2019, 10:07 PM
>>> To: "dev"; "user";
>>> Subject: [SURVEY] Is the default restart delay of 0s causing problems?
>>>
>>>
>>>
>>> Hi everyone,
>>>
>>> I wanted to reach out to you and ask whether decreasing the default delay
>>> to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
>>> user reported that he would like to increase the default value because it
>>> can cause restart storms in case of systematic faults [2].
>>>
>>> The downside of increasing the default delay would be a slightly
>>> increased
>>> restart time if this config option is not explicitly set.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-9158
>>> [2] https://issues.apache.org/jira/browse/FLINK-11218
>>>
>>> Cheers,
>>> Till
>>
>>


Re: Re: Persistent errors when deploying a YARN cluster with flink 1.8.1

2019-09-01 Thread Yun Tang
Hi

The job is submitted to 0.0.0.0:8030 because the correct YARN configuration cannot be found at
submission time, so the client falls back to the default local port 8030. Check whether the HADOOP_CONF_DIR or HADOOP_HOME environment variables are set correctly; once they point to the directory containing these configuration files, the job can be submitted.

BTW, this is not a Flink problem; every big-data compute engine that uses YARN to manage jobs can run into it.

Best,
Yun Tang

From: 周虓岗 
Sent: Sunday, September 1, 2019 15:31
To: user-zh@flink.apache.org 
Subject: Re: Persistent errors when deploying a YARN cluster with flink 1.8.1


I'm fairly sure the YARN configuration is basically correct, and I don't know why Flink keeps connecting to the YARN scheduler via 0.0.0.0.







On 2019-09-01 13:55:50, "周虓岗" wrote:
>Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 0 time(s); 
>retry policy is RetryUpToMaximumCountWithFixedSleep
>
>
>
>
>Deploying with 1.7.2 on the same machine works fine. Could anyone help me figure out where the problem is?


Re: Non incremental window function accumulates unbounded state with RocksDb

2019-09-01 Thread Yun Tang
Hi William

I think there might be another possible cause, since RocksDB can perform roughly 10x 
worse than the heap state backend. Have you checked the current watermark of the 
job (from the web UI) to see whether the windows are triggered as expected, and whether the 
RocksDB job is back-pressured? If state stays in the windows without being triggered, we 
could see larger state. (However, that still cannot account for a factor of 400.)

Best
Yun Tang

From: William Jonsson 
Sent: Friday, August 30, 2019 18:22
To: Yun Tang ; user@flink.apache.org 
Subject: Re: Non incremental window function accumulates unbounded state with 
RocksDb


Thanks for your answer Yun.



I agree, I don't believe that either; however, that is my empirical observation. 
Those statistics are from savepoints. Basically the jobs run against a production 
Kafka, so no, not exactly the same input. However, these statistics are from several 
runs spread out over time, so they should not contain temporal effects. There are no 
failovers in the pipeline during runtime. Doing some calculations on the size and the 
pace of the data in the pipeline (how often we receive data and how big the datatype is) 
shows that the data buffered in the windows should be a little less than 200 MB, and 
the heap backend behaves accordingly. I agree, the space amplification can't be a 
factor of 400 and still keep growing for RocksDB. I've spent some time trying to figure 
out whether we are doing anything obscure, but I can't find anything. So it would be 
interesting to hear if anyone has had the same experience.



The pipeline is currently running on Flink 1.7.2



Best regards and wish you a pleasant day,

William



From: Yun Tang 
Date: Friday, 30 August 2019 at 11:42
To: William Jonsson , "user@flink.apache.org" 

Cc: Fleet Perception for Maintenance 

Subject: Re: Non incremental window function accumulates unbounded state with 
RocksDb



Hi William



I don't believe the same job would have 70~80GB state for RocksDB while it's 
only 200MB for HeapStateBackend even though RocksDB has some space 
amplification. Are you sure the job received the same input throughput with 
different state backends and they both run well without any failover? Could you 
take a savepoint for the job with different state backends and compare the size 
of the savepoints? What's more, what version of Flink did you use?



Best

Yun Tang



From: William Jonsson 
Sent: Friday, August 30, 2019 17:04
To: user@flink.apache.org 
Cc: Fleet Perception for Maintenance 

Subject: Non incremental window function accumulates unbounded state with 
RocksDb



Hello,

I have a Flink pipeline reading data from Kafka which is keyed (in the 
pseudocode example below it is keyed on the first letter of the string; in our 
pipeline it is keyed on a predefined key) and processed in sliding windows with 
a duration of 60 minutes, sliding every 10 minutes. The time characteristic is event 
time and the windows process the data when they fire; there is no incremental 
processing of the windowed data.

When running with the heap backend the state behaves "normally", i.e. it stays 
within the data size that is expected when the windows have buffered the data 
(~200 MB for this application) and is bounded to around this size independent 
of the lifetime of the processing pipeline. However, if the state backend is 
changed to the RocksDB backend the state starts to grow indefinitely (that is our 
observation; we haven't seen it stop growing at least), reaching 70-80 GB in just 
over a month of runtime.

I made a savepoint of the state, downloaded it and analysed it, which showed 
that the state consisted of data from the whole lifetime of the pipeline, in 
roughly equal amounts for each day. I interpret this as the state having 
accumulated old data that should have been deleted when the windows were cleared. 
It is worth noting that the state consists of the input Strings only, so it 
should have nothing to do with the histogram calculation.

I have tried reducing level0_file_num_compaction_trigger to 1, and the base 
file size as well as the target file size, to trigger more compactions in the 
hope that the compactions would remove the obsolete data, which resulted in no 
improvement at all (it basically got worse).
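
For context, a minimal sketch (Flink 1.7-era API; the checkpoint URI is a placeholder, and these are the settings described above rather than a recommendation) of how such compaction options are typically passed to the RocksDB backend:

import org.apache.flink.contrib.streaming.state.{OptionsFactory, RocksDBStateBackend}
import org.rocksdb.{ColumnFamilyOptions, DBOptions}

val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints")  // placeholder URI
backend.setOptions(new OptionsFactory {
  override def createDBOptions(current: DBOptions): DBOptions = current

  override def createColumnOptions(current: ColumnFamilyOptions): ColumnFamilyOptions =
    current
      .setLevel0FileNumCompactionTrigger(1)      // compact as soon as a single L0 file exists
      .setTargetFileSizeBase(2 * 1024 * 1024L)   // smaller SST files, more frequent compactions
})
// then: env.setStateBackend(backend)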

Please see the pseudocode below for a code example. The real pipeline is more 
complex than this and runs on other classes than a String input and a 
"histogram" output class. Do you have any input or ideas on how the state can be 
manageable in the heap case but completely unmanageable with the RocksDB backend?

Best regards,

William



// Pseudocode: keyed on the first letter of the string here; the real pipeline uses a predefined key
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class Histogram extends WindowFunction[String, Histogram, Char, TimeWindow] {

  // Calculate the histogram from the elements buffered in the window
  private def process(key: Char, window: TimeWindow, input: Iterable[String]): Histogram = ???

  override def apply(key: Char, window: TimeWindow, input: Iterable[String],
                     out: Collector[Histogram]): Unit =
    out.collect(process(key, window, input))
}
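
For context, a minimal sketch (reusing the Histogram class from the pseudocode above; the source and key are placeholders, and timestamp/watermark assignment is omitted although it is required for event-time windows to fire) of how such a window function is wired up with the 60-minute windows sliding every 10 minutes described earlier:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Placeholder source; the real pipeline reads from Kafka.
val stream: DataStream[String] = env.fromElements("alpha", "beta", "gamma")

stream
  .keyBy(_.head)                                                           // keyed on the first letter
  .window(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(10)))  // 60 min windows, every 10 min
  .apply(new Histogram)                                                    // non-incremental: buffers all elements until the window fires
  .print()

env.execute("histogram sketch")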

Re: Persistent errors when deploying a YARN cluster with flink 1.8.1

2019-09-01 Thread 周虓岗

I'm fairly sure the YARN configuration is basically correct, and I don't know why Flink keeps connecting to the YARN scheduler via 0.0.0.0.







On 2019-09-01 13:55:50, "周虓岗" wrote:
>Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 0 time(s); 
>retry policy is RetryUpToMaximumCountWithFixedSleep
>
>
>
>
>Deploying with 1.7.2 on the same machine works fine. Could anyone help me figure out where the problem is?