Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 文章 Biao Liu
Congrats!

Thanks,
Biao /'bɪ.aʊ/



On Tue, 28 Mar 2023 at 10:29, Hang Ruan  wrote:

> Congratulations!
>
> Best,
> Hang
>
> yu zelin  于2023年3月28日周二 10:27写道:
>
>> Congratulations!
>>
>> Best,
>> Yu Zelin
>>
>> 2023年3月27日 17:23,Yu Li  写道:
>>
>> Dear Flinkers,
>>
>>
>>
>> As you may have noticed, we are pleased to announce that Flink Table Store 
>> has joined the Apache Incubator as a separate project called Apache 
>> Paimon(incubating) [1] [2] [3]. The new project still aims at building a 
>> streaming data lake platform for high-speed data ingestion, change data 
>> tracking and efficient real-time analytics, with the vision of supporting a 
>> larger ecosystem and establishing a vibrant and neutral open source 
>> community.
>>
>>
>>
>> We would like to thank everyone for their great support and efforts for the 
>> Flink Table Store project, and warmly welcome everyone to join the 
>> development and activities of the new project. Apache Flink will continue to 
>> be one of the first-class citizens supported by Paimon, and we believe that 
>> the Flink and Paimon communities will maintain close cooperation.
>>
>>
>> 亲爱的Flinkers,
>>
>>
>> 正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入 Apache
>> 孵化器独立孵化 [1] [2] [3]。新项目的名字是
>> Apache 
>> Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,并建立一个充满活力和中立的开源社区。
>>
>>
>> 在这里我们要感谢大家对 Flink Table Store
>> 项目的大力支持和投入,并热烈欢迎大家加入新项目的开发和社区活动。Apache Flink 将继续作为 Paimon 支持的主力计算引擎之一,我们也相信
>> Flink 和 Paimon 社区将继续保持密切合作。
>>
>>
>> Best Regards,
>> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>>
>> 致礼,
>> 李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)
>>
>> [1] https://paimon.apache.org/
>> [2] https://github.com/apache/incubator-paimon
>> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
>>
>>
>>


Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 文章 Biao Liu
Congrats!

Thanks,
Biao /'bɪ.aʊ/



On Fri, 17 Jan 2020 at 13:43, Rui Li  wrote:

> Congratulations Dian, well deserved!
>
> On Thu, Jan 16, 2020 at 5:58 PM jincheng sun 
> wrote:
>
>> Hi everyone,
>>
>> I'm very happy to announce that Dian accepted the offer of the Flink PMC
>> to become a committer of the Flink project.
>>
>> Dian Fu has been contributing to Flink for many years. Dian Fu played an
>> essential role in PyFlink/CEP/SQL/Table API modules. Dian Fu has
>> contributed several major features, reported and fixed many bugs, spent a
>> lot of time reviewing pull requests and also frequently helping out on the
>> user mailing lists and check/vote the release.
>>
>> Please join in me congratulating Dian for becoming a Flink committer !
>>
>> Best,
>> Jincheng(on behalf of the Flink PMC)
>>
>
>
> --
> Best regards!
> Rui Li
>


Re: Error:java: 无效的标记: --add-exports=java.base/sun.net.util=ALL-UNNAMED

2019-11-13 文章 Biao Liu
Hi,

I have encountered the same issue when setting up a dev environment.

It seems that the my Intellij (2019.2.1) unexpectedly activates java11
profile of maven. It doesn't match the Java compiler (JDK8). I'm not sure
why it happened silently.

So for me, the solution is "Intellij" -> "View" -> "Tool Windows" ->
"Maven" -> "Profiles" -> uncheck the "java11" -> reimport maven project.

Thanks,
Biao /'bɪ.aʊ/



On Mon, 4 Nov 2019 at 18:01, OpenInx  wrote:

> Hi
> I met the same problem before. After some digging,  I find that the idea
> will detect the JDK version
> and choose whether to use the jdk11 option to run the flink maven building.
> if you are in jdk11 env,  then
> it will add the option --add-exports when maven building in IDEA.
>
> For my case,  I was in IntelliJIdea2019.2 which depends on the jdk11, and
> once I re-import the flink
> modules then the IDEA will add the --add-exports flag even if  I removed
> all the flags in .idea/compile.xml
> explicitly.  I noticed that the Intellij's JDK affected the flink maven
> building, so I turned to use the Intellij with JDK8
> bundled,  then the problem was gone.
>
> You can verify it, and if  it's really the same. can just replace your IDEA
> with the pkg suffix with "with bundled JBR 8" in
> here [1].
> Say if you are using MacOS, then should download the package "2019.2.4 for
> macOS with bundled JBR 8 (dmg)"
>
> Hope it works for you
> Thanks.
>
> [1]. https://www.jetbrains.com/idea/download/other.html
>
>
> On Mon, Nov 4, 2019 at 5:44 PM Till Rohrmann  wrote:
>
> > Try to reimport that maven project. This should resolve this issue.
> >
> > Cheers,
> > Till
> >
> > On Mon, Nov 4, 2019 at 10:34 AM 刘建刚  wrote:
> >
> > >   Hi, I am using flink 1.9 in idea. But when I run a unit test in
> > idea.
> > > The idea reports the following error:"Error:java: 无效的标记:
> > > --add-exports=java.base/sun.net.util=ALL-UNNAMED".
> > >   Everything is ok when I use flink 1.6. I am using jdk 1.8. Is it
> > > related to the java version?
> > >
> >
>


Re: Mac操作系统下Ask timed out问題

2019-11-04 文章 Biao Liu
你好,

MacOS 可以跑 Flink,我自己刚试了下,复制你的命令就可以跑。
建议再查一下你本地的环境,你本地的 Flink 是自己编译的吗?如果不行试一下 Flink 提供的 binary 包 [1]?

[1] https://flink.apache.org/downloads.html

Thanks,
Biao /'bɪ.aʊ/



On Tue, 5 Nov 2019 at 12:30, jeff kit  wrote:

> HI,大家好:
> 我在运行Flink官方的Quick
> Start就遇到了问題。为了避免自己问蠢问題,我先做了很多尝试,如换Flink的版本,从1.7到1.8及至1.9都试过,在我自己的Mac OS
> X上这个问題是必然出现的,而换到其他操作系统例如Windows,则是正常的。
>
> 这也许不是一个常见的问題,更多是我本机的运行环境问題,但多天尝试下来仍然没有找到解决方法,才在这里求助一下。
>
> 操作步骤:
> 1. ./bin/start-cluster.sh  # 启动flink。
> 2. ./bin/flink run examples/batch/WordCount.jar   # 提交wordCount 包
>
> 随后就是抛了异常:
> Starting execution of program
> Executing WordCount example with default input data set.
> Use --input to specify file input.
> Printing result to stdout. Use --output to specify output path.
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 81bc8720dee57710788cc8e41079ba4d)
> at
>
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
> at
>
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
> at
>
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:820)
> at org.apache.flink.api.java.DataSet.collect(DataSet.java:413)
> at org.apache.flink.api.java.DataSet.print(DataSet.java:1652)
> at
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:88)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:483)
> at
>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at
>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at
>
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at
>
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at
>
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at
> org.apache.flink.client.cli.CliFrontend$$Lambda$31/1990451863.call(Unknown
> Source)
> at
>
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
>
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
> at
>
> org.apache.flink.client.program.rest.RestClusterClient$$Lambda$44/1067599825.apply(Unknown
> Source)
> at
>
> java.util.concurrent.CompletableFuture$ExceptionCompletion.run(CompletableFuture.java:1246)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
> at
>
> java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
> at
>
> java.util.concurrent.CompletableFuture$ThenApply.run(CompletableFuture.java:723)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
> at
>
> java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
> at
>
> java.util.concurrent.CompletableFuture$ThenCopy.run(CompletableFuture.java:1333)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
> at
>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2361)
> at
>
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
> at
>
> org.apache.flink.runtime.concurrent.FutureUtils$$Lambda$63/31844.accept(Unknown
> Source)
> at
>
> java.util.concurrent.CompletableFuture$WhenCompleteCompletion.run(CompletableFuture.java:1298)
> at
>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193)
> at
>
> java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210)
> at
>
> java.util.concurrent.CompletableFuture$AsyncCompose.exec(CompletableFuture.java:626)
> at
>
> java.util.concurrent.CompletableFuture$Async.run(CompletableFuture.java:428)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(

Re: 在 Trigger 里可以知道 Window 中数据的状况吗

2019-11-04 文章 Biao Liu
你好,

countWindow [1] 能满足你的需求吗?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/datastream/KeyedStream.html#countWindow-long-

Thanks,
Biao /'bɪ.aʊ/



On Tue, 5 Nov 2019 at 14:01, Utopia  wrote:

> 大家好,
>
> 我想根据 Window 中数据的信息,比如数据的数量来决定是否 Fire,应该怎么实现呢?是否必须自己维护这样的状态。
>
> Best  regards
> Utopia
>


Fwd: 从 state 中恢复数据,更改 yarn container 个数会有影响吗

2019-11-04 文章 Biao Liu
Hi, this topic should be sent to user-zh mailing list. Just forward there.

Thanks,
Biao /'bɪ.aʊ/



-- Forwarded message -
From: Yun Tang 
Date: Tue, 5 Nov 2019 at 13:20
Subject: Re: 从 state 中恢复数据,更改 yarn container 个数会有影响吗
To: wangl...@geekplus.com.cn , user <
u...@flink.apache.org>


Hi



首先先判断作业是否在不断地failover,是否有“maximum parallelism”
相关的异常,如果有,说明因为改了并发度而不兼容,实际作业一直都没有从checkpoint正常恢复。



如果作业成功地从checkpoint恢复了,再判断是不是因为task端正在因为正在改并发而导致恢复数据中,如果你的state
比较大,这一步骤可能会比较耗时,一般这种情况是source端消费了数据,但是无法向下游发送,整个作业看上去像是一直卡在那边。可以通过task端的
jstak看调用栈,看是否有restore相关的栈hang住。



如果以上都不是,那请自行jstack看一下source和下游task的CPU在进行什么操作,再做之后的判断。



祝好

唐云





*From: *"wangl...@geekplus.com.cn" 
*Date: *Tuesday, November 5, 2019 at 11:48 AM
*To: *user 
*Subject: *从 state 中恢复数据,更改 yarn container 个数会有影响吗





从 RocketMQ 中消费数据做处理。

代码中最大的并行度为 8, 提交任务时指定 -ys 4 ,会自动分配 2 个 container 作为 taskMgr

运行一段时间后以 savepoint 方式停止。

再从 savepoint 恢复,此时指定 -ys 2 , 会分配 4 个container 作为 taskMgr , 但任务提交后程序不从
RocketMQ 消费数据了,消费 TPS 一直是 0,这是什么原因呢?





谢谢,

王磊




--

wangl...@geekplus.com.cn


Re: [metrics] metrics 中 Availability 和 Checkpointing 这两组没有显示

2019-11-04 文章 Biao Liu
你好,

JM 的 metric 应该也会直接 report。
可以考虑缩小下问题范围,是 metrics 还是 reporter 的问题。
例如加个 slf4j reporter [1],看下 JM log 中有没有相应的 metrics,如果有那就是 reporter 的问题。

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

Thanks,
Biao /'bɪ.aʊ/



On Tue, 22 Oct 2019 at 17:37, Blake  wrote:

> 使用 PrometheusReporter  去 report metric 信息
> 发现:9250 端口 没有显示 Availability 和 Checkpointing 这两部分的信息
> 是需要单独配置吗?在文档里面没有看到相关说明。
> 我注意到:这两个的 scope 都是 Job (only available on JobManager)
> 是要在启动时,指定额外的参数吗?
>
>
>
>
> 配置如下:
> flink-conf.yml
> metrics.reporters: prom
>
> metrics.reporter.prom.class:
> org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port: 9250
>
> metrics.system-resource: true
>
>
>
>
>
>
>
> docker-compose.yml 局部:
> services:
>   jobmanager:
> # image: flink:1.9.0
> build: ./job_manager
> container_name: jobmanager_1.9.0
> volumes:
>   - ./prometheus/:/etc/prometheus/
>   - prometheus_data:/prometheus
> ports:
>   - "8081:8081"
>   - "9250:9250"
> expose:
>   - "6123"
> networks:
>   - back-tier
>   # - host-tier
> command: jobmanager
> environment:
>   - JOB_MANAGER_RPC_ADDRESS=jobmanager
>   taskmanager:
> # image: flink:1.9.0
> build: ./task_manager
> container_name: taskmanager_1.9.0
> ports:
>   # - "9001:9001"
>   - "9251:9251"
> expose:
>   - "6121"
>   - "6122"
> networks:
>   - back-tier
>   # - host-tier
> command: taskmanager
> depends_on:
>   - jobmanager
> environment:
>   - JOB_MANAGER_RPC_ADDRESS=jobmanager
>
>
>
>
> Docker
> FROM flink:1.9.0
>
> COPY flink-conf.yaml ./conf/
>
> RUN cp ./opt/flink-metrics-prometheus-1.9.0.jar ./lib/


Re: Re: 怎样把 state 定时写到外部存储

2019-11-04 文章 Biao Liu
你好,

对你的问题描述有一些疑问

> 每一条消息来都会更改 state 值,如果每一条消息来都写外部存储下游撑不住
> 有没有什么方式可以定期读 state 写到外部存储?

这里是什么意思呢?更改 state 值和写外部系统存储应该是两个独立的事件。state 是 Flink 内部使用的,给外部系统使用的数据一般通过
sink 写出去,和 state 没有直接关系。

从你的描述中,只看到貌似是写 Mysql (是通过 sink 吗?) 扛不住。批量写一下?比如在 sink 中处理一下

如果没理解对你的问题,你可以再详细描述一下

Thanks,
Biao /'bɪ.aʊ/



On Fri, 1 Nov 2019 at 11:21, misaki L  wrote:

> 使用 window 聚合一下批量写呢?
>
> wangl...@geekplus.com.cn  于2019年11月1日周五
> 上午10:17写道:
>
> > Hi Congxian,
> >
> > 以 sink 的方式写出去最终也要落在某个地方才能供查询使用啊。
> > 我们的 case 是写到 MySQL 中
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
> > Sender: Congxian Qiu
> > Send Time: 2019-11-01 10:10
> > Receiver: user-zh
> > Subject: Re: 怎样把 state 定时写到外部存储
> > 好奇为什么要把 State 定期写到外存呢?是外部系统需要使用这些 State 吗?那为什么不把 State 以 sink 的方式写出去呢?
> >
> > Best,
> > Congxian
> >
> >
> > Jun Zhang <825875...@qq.com> 于2019年10月31日周四 上午10:36写道:
> >
> > > 是否可以注册一个定时器?
> > >
> > >
> > > 你看看这个文章,是否对你有帮助
> > >
> > >
> > > https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA
> > >  在2019年10月31日 10:16,wangl...@geekplus.com.cn<
> > wangl...@geekplus.com.cn>
> > > 写道:
> > >
> > >
> > > 消息驱动,QPS 很高, 每一条消息来都会更改 state 值,如果每一条消息来都写外部存储下游撑不住。
> > > 有没有什么方式可以定期读 state 写到外部存储?
> > > 我现在用的是 Flink1.7.2 版本。
> > >
> > >
> > >
> > >
> > >
> > > wangl...@geekplus.com.cn
> >
>


Re: Could not forward element to next operator

2019-09-28 文章 Biao Liu
问题可能出在被省略的部分,检查一下最底层的 caused by

Thanks,
Biao /'bɪ.aʊ/



On Sun, 29 Sep 2019 at 13:17, <18612537...@163.com> wrote:

> 我看过这个我不是这个问题,作业没有设置水印,作业可以正常运行,最近可能是运行一天多会报这个异常
>
> 发自我的 iPhone
>
> > 在 2019年9月29日,上午11:49,Wesley Peng  写道:
> >
> > Hello,
> >
> > May this article match your issue?
> > https://blog.csdn.net/qq_41910230/article/details/90411237
> >
> > regards.
> >
> >> On Sun, Sep 29, 2019 at 10:33 AM allan <18612537...@163.com> wrote:
> >>
> >> Hi,
> >>
> >> 最近发现作业一直在报错,我的窗口是一分钟的窗口。这是什么原因,谁能帮助一下?flink版本1.6 ,错误如下:
> >>
> >>
> >>
> >>
> TimerException{org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> >> Could not forward element to next operator}
> >>
> >>   at
> >>
> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:288)
> >>
> >>   at
> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >>
> >>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >>
> >>   at
> >>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >>
> >>   at
> >>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >>
> >>   at
> >>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >>
> >>   at
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >>
> >>   at java.lang.Thread.run(Thread.java:748)
> >>
> >> Caused by:
> >>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> >> Could not forward element to next operator
> >>
> >>   at org.apache.flink.streaming.
> >>
> >>
> >>
> runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
> >>
> >>   at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
> >>
> >>   at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
> >>
> >>   at
> >>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:689)
>
>
>


Re: task-manager中taskslot的classloader隔离问题

2019-09-28 文章 Biao Liu
同一个 TM 中,相同 job 的 subtask 会共享一个 classloader

Thanks,
Biao /'bɪ.aʊ/



On Sat, 28 Sep 2019 at 09:30, Ever <439674...@qq.com> wrote:

> 有一个job有2个task,每个task分别有3个subtask(并行度为3), 如下图所示。
>
> 每个subtask会占用一个taskslot, 但是同一个job的不同task的subtask可以共享同一个taskslot,
> 所以这里应该是一个taskslot会有2个subtask。
> 那么这两个share同一个taskslot的subtask, 其classloader是同一个,
> 还是说每个subtask都有不同的classloader呢?
>
> 因为我的job中会用到一个静态类(Scala的Object或者java中的单例类),
> 类里面有个包含基础数据的集合成员变量。我想知道这个变量是需要在每个subtask中初始化, 还是只需要在jvm范围内初始化一次。
>
>


Re: yarn application -kill application_1565677682535_0431直接去kill任务,未正常关闭文件

2019-09-25 文章 Biao Liu
不要用 yarn application kill,flink cancel 可以满足需求吗?

Thanks,
Biao /'bɪ.aʊ/



On Mon, 9 Sep 2019 at 11:54, 646208563  wrote:

> 大家好,我flink任务运行在yarn上,然后sink里面是写hdfs文件,close方面关闭文件,然后我发现当我用yarn application
> -kill
> application_1565677682535_0431去kill任务的时候,sink的close方法未执行,导致我文件未正常关闭。请问下这个问题怎么去避免?


Re: Flink大state读取磁盘,磁盘IO打满,任务相互影响的问题探讨

2019-09-22 文章 Biao Liu
Hello,

IO 量这么大符合预期吗?而且是读硬盘打满。
有没有尝试过调优?
1. 业务方面的调优,例如对 state 的使用是否合理
2. 系统层面的调优,例如 incremental checkpoint [1]

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/checkpointing.html#state-backend-incremental

Thanks,
Biao /'bɪ.aʊ/



On Tue, 10 Sep 2019 at 14:39, Wesley Peng  wrote:

>
>
> on 2019/9/10 13:47, 蒋涛涛 wrote:
> > 尝试手段:
> >
> > 1. 手动迁移IO比较高的任务到其他机器,但是yarn任务提交比较随机,只能偶尔为之
> >
> > 2. 目前没有SSD,只能用普通STATA盘,目前加了两块盘提示磁盘IO能力,但是单盘对单任务的磁盘IO瓶颈还在
> >
> > 还有哪些策略可以解决或者缓解么?
>
> It seems the tricks to improve RocksDB's throughput might be helpfu.
>
> With writes and reads accessing mostly the recent data, our goal is to
> let them stay in memory as much as possible without using up all the
> memory on the server. The following parameters are worth tuning:
>
> Block cache size: When uncompressed blocks are read from SSTables, they
> are cached in memory. The amount of data that can be stored before
> eviction policies apply is determined by the block cache size. The
> bigger the better.
>
> Write buffer size: How big can Memtable get before it is frozen.
> Generally, the bigger the better. The tradeoff is that big write buffer
> takes more memory and longer to flush to disk and to recover.
>
> Write buffer number: How many Memtables to keep before flushing to
> SSTable. Generally, the bigger the better. Similarly, the tradeoff is
> that too many write buffers take up more memory and longer to flush to
> disk.
>
> Minimum write buffers to merge: If most recently written keys are
> frequently changed, it is better to only flush the latest version to
> SSTable. This parameter controls how many Memtables it will try to merge
> before flushing to SSTable. It should be less than the write buffer
> number. A suggested value is 2. If the number is too big, it takes
> longer to merge buffers and there is less chance of duplicate keys in
> that many buffers.
>
> The list above is far from being exhaustive, but tuning them correctly
> can have a big impact on performance. Please refer to RocksDB’s Tuning
> Guide for more details on these parameters. Figuring out the optimal
> combination of values for all of them is an art in itself.
>
> please ref: https://klaviyo.tech/flinkperf-c7bd28acc67
>
> regards.
>


Re: 关于Async I/O的exactly-once

2019-09-22 文章 Biao Liu
1. 首先你描述的场景,不只存在于 Async IO operator,其他 operator 也会有类似问题
2. Flink 的 exactly once 是针对 Flink 内部而言,例如 state 等,[1]
3. 如果你想针对外部系统也保证 exactly once 语义,需要对应的 connector 支持 [2]

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/internals/stream_checkpointing.html
2.
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/guarantees.html

Thanks,
Biao /'bɪ.aʊ/



On Wed, 4 Sep 2019 at 12:58, star <3149768...@qq.com> wrote:

> 感谢您的回复,异步io只能针对失败的异步请求 重发,而不能保证每个请求只发一次,那应该是at least once?附文档:The
> asynchronous I/O operator offers full exactly-once fault tolerance
> guarantees. It stores the records for in-flight asynchronous requests in
> checkpoints and restores/re-triggers the requests when recovering from a
> failure.
>
> 发自我的iPhone
>
>
> -- 原始邮件 --
> 发件人: Dino Zhang  发送时间: 2019年9月4日 09:20
> 收件人: user-zh  主题: 回复:关于Async I/O的exactly-once
>
>
>
> hi star,
>
> exactly-once指flink内部的,要保证end-to-end
> exactly可以通过两阶段提交,需要实现TwoPhaseCommitSinkFunction,或者做幂等处理
>
> On Wed, Sep 4, 2019 at 8:20 AM star <3149768...@qq.com> wrote:
>
> >
> 看文档我的理解是会将异步的请求保存在检查点中,failover的时候重新触发请求。我的问题是既然是重新触发请求,并没有回滚,那之前的请求已经对外部系统造成影响了,不就是at
> > least-once了吗?
> > 比如ck1:发送了a b c三个请求更新外部数据库,ck2:发送:d,e,f。假设ck1做完了checkpoint,a
> ,b请求成功,c没成功.
> >
> >
> >
> ck2在执行到e的时候任务被cancel了,但c,d都已经成功了。那么我重新启动的时候从最近一次成功的ck1拉起,c,d岂不是又要被重新请求一次
> >
> >
> > 谢谢
> >
> > 发自我的iPhone
>
>
>
> --
> Regards,
> DinoZhang


Re: build Flink master brach fail due to npm

2019-08-20 文章 Biao Liu
HI,

Could you try adding "-Dskip.npm" when building the project?

This is just a work-around way to skip this installation. Though I don't
have much experience of npm. Maybe someone else could help.

Thanks,
Biao /'bɪ.aʊ/



On Tue, 20 Aug 2019 at 16:23, 刘波 <1091643...@qq.com> wrote:

> Hi, users!
>
> When I'm build the Flink master branch  from github,  it's failed due to
> the flink-runtime-web module.
>   nodejs v10.15.2
>   npm6.10.3
>   Javaopenjdk version "11.0.4" 2019-07-16
>   maven 3.6.0
>
> anyone got the same problem? any suggestions to  resolve it... thanks~
>
>
> [INFO]
> [INFO] --- maven-remote-resources-plugin:1.5:process
> (process-resource-bundles) @ flink-runtime-web_2.11 ---
> [INFO]
> [INFO] --- frontend-maven-plugin:1.6:install-node-and-npm (install node
> and npm) @ flink-runtime-web_2.11 ---
> [INFO] Node v10.9.0 is already installed.
> [INFO]
> [INFO] --- frontend-maven-plugin:1.6:npm (npm install) @
> flink-runtime-web_2.11 ---
> [INFO] Running 'npm ci --cache-max=0 --no-save' in
> /home/ben/work/flink/flink-runtime-web/web-dashboard
> [WARNING] npm WARN prepare removing existing node_modules/ before
> installation
> [INFO]
> [INFO] > fsevents@1.2.7 install
> /home/ben/work/flink/flink-runtime-web/web-dashboard/node_modules/fsevents
> [INFO] > node install
> [INFO]
> [ERROR] Aborted (core dumped)
> [INFO]
> [INFO] > node-sass@4.11.0 install
> /home/ben/work/flink/flink-runtime-web/web-dashboard/node_modules/node-sass
> [INFO] > node scripts/install.js
> [INFO]
> [INFO] Cached binary found at
> /home/ben/.npm/node-sass/4.11.0/linux-x64-64_binding.node
> [ERROR] Aborted (core dumped)
> [INFO]
> [INFO] > husky@1.3.1 install
> /home/ben/work/flink/flink-runtime-web/web-dashboard/node_modules/husky
> [INFO] > node husky install
> [INFO]
> [INFO] husky > setting up git hooks
> [INFO] HUSKY_SKIP_INSTALL environment variable is set to 'true', skipping
> Git hooks installation.
> [ERROR] Aborted (core dumped)
> [ERROR] npm ERR! code ELIFECYCLE
> [ERROR] npm ERR! errno 134
> [ERROR] npm ERR! husky@1.3.1 install: `node husky install`
> [ERROR] npm ERR! Exit status 134
> [ERROR] npm ERR!
> [ERROR] npm ERR! Failed at the husky@1.3.1 install script.
> [ERROR] npm ERR! This is probably not a problem with npm. There is likely
> additional logging output above.
> [ERROR]
> [ERROR] npm ERR! A complete log of this run can be found in:
> [ERROR] npm ERR!
>  /home/ben/.npm/_logs/2019-08-20T07_59_08_468Z-debug.log
> [INFO]
> 
>


Re: need help

2019-08-08 文章 Biao Liu
你好,

异常里可以看出 AskTimeoutException, 可以调整两个参数 akka.ask.timeout 和 web.timeout
再试一下,默认值如下

akka.ask.timeout: 10 s
web.timeout: 1

PS: 搜 “AskTimeoutException Flink” 可以搜到很多相关答案

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 8, 2019 at 7:33 PM 陈某  wrote:

>
>
> -- Forwarded message -
> 发件人: 陈某 
> Date: 2019年8月8日周四 下午7:25
> Subject: need help
> To: 
>
>
> 你好,我是一个刚接触flink的新手,在搭建完flink on
> yarn集群后,依次启动zookeeper,hadoop,yarn,flkink集群,并提交认识到yarn上时运行遇到问题,网上搜索相关问题,暂未找到解决方式,希望能得到帮助,谢谢。
>
> 使用的运行指令为:
> [root@flink01 logs]# flink run -m  yarn-cluster
> ./examples/streaming/WordCount.jar
> 查看log后错误信息如下:(附件中为完整的log文件)
> org.apache.flink.client.program.ProgramInvocationException: Could not
> retrieve the execution result. (JobID: 91e82fd8626bde4c901bf0b1639e12e7)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:483)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> at
> org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:89)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:423)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit JobGraph.
> at
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:388)
> at
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:208)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException:
> [Internal server error.,  akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/dispatcher#2035575525]] after [1 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
> at
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverSchedul

Re: Flink官网barrier疑问

2019-08-07 文章 Biao Liu
你好,范瑞

Barrier alignment 这里并不会涉及 output/input queue,pending 的只是用于 alignment
的一小部分数据。

如果想了解 checkpoint 的原理,建议阅读文档中引用的两篇论文。[1] [2]

如果想了解 Flink 的具体实现,这里的文档是 internal 部分的,可能需要阅读一下相关代码了。[3] [4]

1. https://arxiv.org/abs/1506.08603
2.
https://www.microsoft.com/en-us/research/publication/distributed-snapshots-determining-global-states-distributed-system/?from=https%3A%2F%2Fresearch.microsoft.com%2Fen-us%2Fum%2Fpeople%2Flamport%2Fpubs%2Fchandy.pdf
3.
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
4.
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java

Thanks,
Biao /'bɪ.aʊ/



On Wed, Aug 7, 2019 at 2:11 PM ❄ <836961...@qq.com> wrote:

> Hi,老师:
> 老师,你好flink官网这个页面(
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/internals/stream_checkpointing.htm)介绍barrier对齐的这里第三步
>  •  Once the last stream has received barrier n, the operator emits all
> pending outgoing records, and then emits snapshot n barriers itself.
>
> 这句话,说的是一旦接受到上游所有流的barrier n,这个Operator实例会发送所有 pending的输出记录,然后发送
> 把自己的barrier n发出去到下游。这里的pending的输出记录是指什么数据?是指barrier之前的那些还在Output
> Queue中的数据吗?不是barrier之后的数据吧,因为精准一次语义的话,snapshot之前,barrier之后的数据应该还没开始处理,等同步快照结束后才能开始处理。如果这里指的是barrier之前那些还在Output
> Queue中的数据,那么也不能马上把这些数据发出去吧,应该还要考虑下游的Input Queue中有足够空间
>
>
>
> 望解答,谢谢老师!
>
> 范瑞


Re: jobmanager 日志异常

2019-08-05 文章 Biao Liu
你好,

> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED
> SIGNAL 15: SIGTERM. Shutting down as requested.

这是收到了 signal 15 了 [1],Wong 说得对,搜一下 yarn node manager 或者 yarn resource
manager 的 log

1. https://access.redhat.com/solutions/737033

Thanks,
Biao /'bɪ.aʊ/



On Tue, Aug 6, 2019 at 12:30 PM Wong Victor 
wrote:

> Hi,
>   可以查看一下jobmanager所在节点的yarn log,搜索一下对应的container为什么被kill;
>
> Regards
>
> On 2019/8/6, 11:40 AM, "戴嘉诚"  wrote:
>
> 大家好:
>
>
>
> 我的flink是部署在yarn上左session,今天早上jobmanager自动退出了,然后yarn把他重新拉起了,导致里面跑的job重新启动了,但是我查看日志,看到jobmanager的日志没有任何异常,同时jobmanager也没有长时间的full
> gc和频繁的gc,以下是jobmanager的日志:
> 就是在06:44分的是偶,日志上标记了收收到停止请求,然后jobmanager直接停止了...请问是由于什么原因导致的呢?
>
> 2019-08-06 06:43:58,891 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7843 for job e49624208fe771c4c9527799fd46f2a3 (5645215
> bytes in
> > 801 ms).
> > 2019-08-06 06:43:59,336 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7852 @ 1565045039321 for job
> a9a7464ead55474bea6f42ed8e5de60f.
> > 2019-08-06 06:44:00,971 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7852 @ 1565045040957 for job
> 79788b218e684cb31c1ca0fcc641e89f.
> > 2019-08-06 06:44:01,357 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7852 for job a9a7464ead55474bea6f42ed8e5de60f (25870658
> bytes in
> > 1806 ms).
> > 2019-08-06 06:44:02,887 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7852 for job 79788b218e684cb31c1ca0fcc641e89f (29798945
> bytes in
> > 1849 ms).
> > 2019-08-06 06:44:05,101 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7852 @ 1565045045092 for job
> 03f3a0bd53c21f90f70ea01916dc9f78.
> > 2019-08-06 06:44:06,547 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7844 @ 1565045046522 for job
> 486a1949d75863f823013d87b509d228.
> > 2019-08-06 06:44:07,311 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7844 for job 486a1949d75863f823013d87b509d228 (62458942
> bytes in
> > 736 ms).
> > 2019-08-06 06:44:07,506 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7852 for job 03f3a0bd53c21f90f70ea01916dc9f78 (105565032
> bytes
> > in 2366 ms).
> > 2019-08-06 06:44:08,087 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7853 @ 1565045048055 for job
> 32783d371464265ef536454055ae6182.
> > 2019-08-06 06:44:09,626 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Checkpoint
> > 7050 of job 4b542195824ff7b7cdf749543fd368cb expired before
> completing.
> > 2019-08-06 06:44:09,647 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7051 @ 1565045049626 for job
> 4b542195824ff7b7cdf749543fd368cb.
> > 2019-08-06 06:44:12,006 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7853 for job 32783d371464265ef536454055ae6182 (299599482
> bytes
> > in 3912 ms).
> > 2019-08-06 06:44:12,972 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7853 @ 1565045052962 for job
> 16db5afe9a8cd7c6278030d5dec4c80c.
> > 2019-08-06 06:44:13,109 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7853 @ 1565045053080 for job
> 9c1394a2d2ff47c7852eff9f1f932535.
> > 2019-08-06 06:44:16,779 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7853 for job 16db5afe9a8cd7c6278030d5dec4c80c (152643149
> bytes
> > in 3666 ms).
> > 2019-08-06 06:44:18,598 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7828 for job 8df2b47f2a4c1ba0f7019ee5989f6e71 (837558245
> bytes
> > in 23472 ms).
> > 2019-08-06 06:44:19,193 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7853 for job 9c1394a2d2ff47c7852eff9f1f932535 (594628825
> bytes
> > in 6067 ms).
> > 2019-08-06 06:44:19,238 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 5855 for job 108ce7f6f5f3e76b12fad9dbdbc8feba (45917615
> bytes in
> > 61819 ms).
> > 2019-08-06 06:44:19,248 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 5856 @ 1565045059238 

Re: BucketingSink 内存使用分析

2019-08-02 文章 Biao Liu
把 heap dump 出来分析一下?

Thanks,
Biao /'bɪ.aʊ/



On Thu, Aug 1, 2019 at 6:07 PM 九思 <1048095...@qq.com> wrote:

> 请教各位老师,flink on
> yarn,1个TaskManager,4个slot,TaskManager内存4G,JobManager内存1G。使用BucketingSink写入hdfs,3秒checkpoint一次。每秒大概数据100条,3秒也就是300左右。输入数据,大概427个字节,427字节*300=128100字节=125KB;输出数据,大概80字节,80字节*300=24000字节=23KB。但是Flink
> 的 TaskManager的 jvm 不知道为啥占用了这么多,高的时候有1G。
> JVM (Heap/Non-Heap)
> Type Committed   Used Maximum
> Heap 2.68 GB 863 MB   2.68 GB
> Non-Heap 84.3 MB 82.8 MB  -1 B
> Total2.76 GB 946 MB   2.68 GB
>
>
>
> 按理说,3s chepoint 一次,会写入一次hdfs。也就是说,只有3s的数据才会缓存在内存。还有什么数据也在内存嘛?


Re: StreamingFileSink自定义输出的文件名

2019-07-31 文章 Biao Liu
貌似没有现成的办法,有人提了个相关的相关的 JIRA ticket [1],可以关注下

1. https://issues.apache.org/jira/browse/FLINK-13428

Thanks,
Biao /'bɪ.aʊ/



On Tue, Jul 30, 2019 at 11:51 AM 王佩  wrote:

> 用StreamingFileSink.forBulkFormat写到HDFS上的文件,默认文件名为
> part-subtaskID-bucketID,如: part-3-62529。
>
> 现在想自定义输出的文件名,不用part-subtaskID-bucketID这种格式。但在Flink中没有找到入口可以自定义文件名。
>
> 请教下各位大大,怎么才能实现。
>
>
> 感谢!辛苦!
>


Re: env.readFile只读取新文件

2019-07-31 文章 Biao Liu
恐怕没有现成的,自己写一个,继承 SourceFunction

Thanks,
Biao /'bɪ.aʊ/



On Wed, Jul 31, 2019 at 4:49 PM 王佩  wrote:

> 如下代码:
>
> DataStreamSource source = env.readFile(
> textInputFormat,
> "/data/appData/streamingWatchFile/source",
> FileProcessingMode.PROCESS_CONTINUOUSLY,
> 10 * 1000
>  );
>
> 当被监控目录下的某个文件被修改,如touch了一下,整个文件会重复处理一遍。
>
> 有没有什么方法,可以做到只读取新文件。想实现只读取新的Parquet文件的效果。
>


Re: 多滑动窗口问题

2019-07-18 文章 Biao Liu
比如这样?

dataStream.timeWindow(...) // 5m
dataStream.timeWindow(...) // 15m
dataStream.timeWindow(...) // 30m
...


刘晨  于2019年7月19日周五 下午2:07写道:

> 您好:
>  请问, 在进行流计算时, source相同, 处理逻辑相同, 但要计算不同的滑动时间窗口,
>  比如 每分钟统计最近 5m,15m,30m 以及 每15分钟计算, 1h, 3h , 12h的数据
>  除去每种窗口写一个程序外, 有其他更加便捷的解决方式吗 ?
>
>
>   谢谢


Re: Re: Flink 的 log 文件夹下产生了 44G 日志

2019-07-18 文章 Biao Liu
最根本的解法当然是去掉打日志的地方,这 source 不是 Flink 内置的,Flink 当然不能控制你们自定义 source 的行为。

你可以考虑自己改一下 log4j.properties,手动关掉这个 logger, Flink 内置的 log4j.properties 里有
example,参考着改一下

log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR,
file
改成 log4j.logger.com.JavaCustoms.FlinkJMSStreamSource=OFF, file

但是这明显是个 ERROR,最好还是解决一下,要不就是掩耳盗铃啊


Henry  于2019年7月19日周五 下午2:20写道:

>
>
>
> 你好,谢谢!是的,这个Source是用JMS实现的自定义Source。目前还在查原因,但是怎么能够让Flink不这样爆炸写log日志呢?20分钟就能写满磁盘,写了40G多。
>
>
>
>
>
> 在 2019-07-19 11:11:37,"Caizhi Weng"  写道:
> >Hi Henry,
> >
> >这个 source 看起来不像是 Flink 提供的 source,应该是 source 本身实现的问题。你可能需要修改 source
> >的源码让它出错后关闭或者进行其它处理...
> >
> >Henry  于2019年7月19日周五 上午9:31写道:
> >
> >>  大家好,之前那个报错图片大家没看到,重新弄一下。
> >> 报错图片链接:
> >> https://img-blog.csdnimg.cn/20190719092540880.png
> >> https://img-blog.csdnimg.cn/20190719092848500.png
> >>
> >>
> >>
> 我看报错的原因是,我这里Source用的是ActiveMQ,从昨天早上9点开始运行Flink任务接收消息,到今天早上8点都很正常。然后在今天早上8点4分的时候开始猛报错flink往log文件夹下写日志。第二个图是报错开始,显示ActiveMQ好像超时,然后就是消费者关闭一直猛写log。
> >>
> 我想问一下有没有什么办法设置flink不让他一直写log,这样感觉没办法测试运行啊,只要一报错就一直写日志,把服务器磁盘都干满了直接崩溃了。求助。
>


Re: Flink 的 log 文件夹下产生了 44G 日志

2019-07-17 文章 Biao Liu
Hi Henry,

邮件列表貌似不能支持直接贴图,所以无法理解“里面不停的在产生 error
信息,但是也没有看到具体哪里报错”是什么意思。尝试贴图到第三方,然后贴链接上来?不知道还有没有更好的办法

To zhisheng2018, 你为什么老回空邮件?


zhisheng2...@gmail.com  于2019年7月18日周四 上午12:49写道:

>


Re: 关于Flink Chain的一些问题

2019-07-09 文章 Biao Liu
你好,

> 1. 在文档中提到的 Chain 的概念我理解为 Source ->  -> Sink 单管道输送数据?

Chain 的 operator 合并到一个 task 中,可以避免潜在的网络传输,见 [1]

> 2. 对于 Chain 的概念是否有更多的资料可以参考,以及什么时候会破坏这个 Chain

文档参考:[1], [2]
Chain 的条件有很多种,最常见就是上下游并发要一致,有兴趣可以读一下相关代码 [3] 关于 isChainable 的部分

> 3. 调优中,如果 Sink 的 Parallel 小于前一个 Operatior 的 Parallel 对比 相同的 Parallel
> 会有什么区别,如何进行 Parallel 方面的调优,是否有资料可以供观摩参照的

Parallel 不相同的情况一定不会 chain,这是一个可能的问题
Parallel 调优没有什么具体的模式,适合你的就好,可以参考 back pressure 的情况,尽量在少占用资源和少 back pressure
之间平衡


1.
https://ci.apache.org/projects/flink/flink-docs-master/concepts/runtime.html#tasks-and-operator-chains
2.
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#task-chaining-and-resource-groups
3.
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java

runtime  于2019年7月10日周三 上午11:35写道:

> Hi all,
> 我在生产环境使用Flink一段时间后,随着集群不断变大,有些问题愈发明显
> 1. 在文档中提到的 Chain 的概念我理解为 Source ->  -> Sink 单管道输送数据?
> 2. 对于 Chain 的概念是否有更多的资料可以参考,以及什么时候会破坏这个 Chain
> 3. 调优中,如果 Sink 的 Parallel 小于前一个 Operatior 的 Parallel 对比 相同的 Parallel
> 会有什么区别,如何进行 Parallel 方面的调优,是否有资料可以供观摩参照的
>


Re: 基于事件时间的滑动窗口,如何加速窗口的关闭

2019-07-09 文章 Biao Liu
大概明白了你的场景,果然贴代码好很多 :)

我理解你的问题是:
1. source 数据不连续,10秒一个 batch
2. 而你用的又是 event time,event time 依赖数据提供的时间
3. 没数据来的这个间隙,event time 无法更新,没有 watermark 发下去,也就导致 window 不能关闭

我能想到的几个方面
1. 你真的需要 event time 吗?如果用 processing time 就没这问题
2. 10秒一个 batch 的数据源,可以优化吗?如果是连续输入,也就没有这问题了
3. 语义上来说,你的下一批数据不到,Flink 也没办法发送 watermark,因为不知道会不会来更早的数据,这个是你使用的模型决定的。Flink
提供了一套机制来解决类似问题,不过不确定你的 source 是否能支持,可以参考下. [1]

1.
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_time.html#idling-sources


Ever <439674...@qq.com> 于2019年7月10日周三 上午11:45写道:

> 是10秒输出一次。 但每次的结果都会延迟10几秒。
>
> 问题的关键在于数据源是通过定时器批量发数据过来, 每10秒发一批, 发完再等10秒才发下一批。 flink收到一批数据后,
> 还得等window关闭才能处理这批数据并sink出去。 而window的关闭又要等下一批数据的到来。
>
> 下面是示范代码, source是kafka。
> ```
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
>
> env.setParallelism(parallel)
>
> env.addSource(source)
>   .map(json => {
>   new InvokingInfoWrapper(xxx)
> })
>   .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seconds(5))
> {
> override def extractTimestamp(invoking: InvokingInfoWrapper): Long
> = {
>   invoking.timestamp
> }
>   })
>   .keyBy(invokingInfo => {
> s"${invokingInfo.caller}_${invokingInfo.callee}"
>   })
>   .timeWindow(Time.seconds(60), Time.seconds(10))
>   .reduce(innerReducer).map(invokingInfo => {
>   //some mapping code
>   invokingInfo
>   })
>   .addSink(new
> WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-sink")
> ```
>
> -- 原始邮件 --
> *发件人:* "Biao Liu";
> *发送时间:* 2019年7月10日(星期三) 中午11:34
> *收件人:* "user-zh";
> *主题:* Re: 基于事件时间的滑动窗口,如何加速窗口的关闭
>
> 你好,看完你的描述,有一些疑问
>
> > “一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
> > 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
> > 但是我们的数据源会每10秒集中过来一批数据。”
>
> 1. 你的 sliding window 不是10秒吗?不就应该10秒输出一次吗?
>
> > 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据...
>
> 2. 你用了 watermark,但是又和 wall clock 比较,到底是基于 processing time 还是 event time?
>
> 3. 关于窗口关闭这个描述,没看懂,不如贴代码来的直接
>
> 4. 关于 window 的文档,可以参考
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#sliding-windows
>
>
> Ever <439674...@qq.com> 于2019年7月10日周三 上午10:39写道:
>
> > 一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
> >
> > 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
> >
> > 但是我们的数据源会每10秒集中过来一批数据。
> >
> >
> > 现在的问题是, 数据过来后, 窗口没法及时关闭, 而是要等下一个10秒的数据到来后, 才会触发窗口关闭。
> >
> >
> > 这种场景下, 如何加速窗口关闭?
> > 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据,
> > 但是, 到9:02:3x的时候收到9:02:2x的数据的时候, 才会关闭9:02:1x这个窗口期。
> >
> >
> > 这时候计算结果会延迟10秒+。
> >
> >
> > 如何加速这个窗口的关闭呢?
> > 如果想在收到的最后一条数据的时间戳+watermark之后就关闭窗口, 如何设置?
>


Re: 基于事件时间的滑动窗口,如何加速窗口的关闭

2019-07-09 文章 Biao Liu
你好,看完你的描述,有一些疑问

> “一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
> 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
> 但是我们的数据源会每10秒集中过来一批数据。”

1. 你的 sliding window 不是10秒吗?不就应该10秒输出一次吗?

> 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据...

2. 你用了 watermark,但是又和 wall clock 比较,到底是基于 processing time 还是 event time?

3. 关于窗口关闭这个描述,没看懂,不如贴代码来的直接

4. 关于 window 的文档,可以参考
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html#sliding-windows


Ever <439674...@qq.com> 于2019年7月10日周三 上午10:39写道:

> 一个job,基于事件时间,窗口大小为60秒,滑动窗口10秒(每10秒更新过去60秒的数据), 水印5秒。
>
> 正常情况下, 整个流程最多延迟5秒即可看到计算后的结果。
>
> 但是我们的数据源会每10秒集中过来一批数据。
>
>
> 现在的问题是, 数据过来后, 窗口没法及时关闭, 而是要等下一个10秒的数据到来后, 才会触发窗口关闭。
>
>
> 这种场景下, 如何加速窗口关闭?
> 例如9:02:2x(wall clock)的时候, 收到9:02:10~9:02:19的数据,
> 但是, 到9:02:3x的时候收到9:02:2x的数据的时候, 才会关闭9:02:1x这个窗口期。
>
>
> 这时候计算结果会延迟10秒+。
>
>
> 如何加速这个窗口的关闭呢?
> 如果想在收到的最后一条数据的时间戳+watermark之后就关闭窗口, 如何设置?


Re: 注册缓存文件的热更新问题

2019-07-04 文章 Biao Liu
据我所知,没有
自己写代码实现吧

戴嘉诚  于2019年7月5日周五 上午10:36写道:

> 好的,那我想问问,如果要定期更新文件的这个场景,flink有没有其他功能是否支持呢?
> 谢谢!
>
> 发件人: Biao Liu
> 发送时间: 2019年7月5日 10:20
> 收件人: user-zh
> 主题: Re: 注册缓存文件的热更新问题
>
> 这个接口只会在提交 job 时工作一次,不会检测更新
>
> Xintong Song  于2019年7月4日周四 下午7:39写道:
>
> > 你好,
> >
> > 这个应该是不可以的。
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Thu, Jul 4, 2019 at 4:29 PM 戴嘉诚  wrote:
> >
> > > 大家好:
> > >
> > >
> >
> 我在flink中看到可以注册一个分布式缓存文件StreamExecutionEnvironment.registerCachedFile()然后可以广播到每个tm上给算子使用,那么我想问问,这个文件可以检测到文件更新了,然后会重新广播过去嘛?因为ip会可能会每天都有改变,所以ip库要每天都更新。
> > >
> > >
> >
>
>


Re: 注册缓存文件的热更新问题

2019-07-04 文章 Biao Liu
这个接口只会在提交 job 时工作一次,不会检测更新

Xintong Song  于2019年7月4日周四 下午7:39写道:

> 你好,
>
> 这个应该是不可以的。
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Jul 4, 2019 at 4:29 PM 戴嘉诚  wrote:
>
> > 大家好:
> >
> >
> 我在flink中看到可以注册一个分布式缓存文件StreamExecutionEnvironment.registerCachedFile()然后可以广播到每个tm上给算子使用,那么我想问问,这个文件可以检测到文件更新了,然后会重新广播过去嘛?因为ip会可能会每天都有改变,所以ip库要每天都更新。
> >
> >
>


Re: blink 版本 消费kafka 看不到group id

2019-06-24 文章 Biao Liu
哦,看你贴了 pom.xml 还以为是指 maven 的 group id
扫了眼 Kafka consumer,blink 和 flink 两边的代码有不一样的地方,blink 并不会保证一定和 flink 行为一致

建议
1. 再调试一下,可以参考两边 Kafka consumer 具体的实现,例如FlinkKafkaConsumerBase 的具体代码
2. 看到一个可能的原因,试一试 disable checkpointing
3. 继续向社区求助,最好提供更详细的配置/代码/log等,估计不会有人"用blink那个版本的pom测试一下”
4. 还不行的话可以向 @Becket 求助,他是 Kafka 的大拿,也负责 blink 这边的 Kafka consumer


雷水鱼  于2019年6月25日周二 下午12:25写道:

> 问题实际上就是在kafka server 那里看不到消费者组,也就是group id ,
> 表现出来的现象就是就是flink重启后,根本就没有按上次的offset开始继续消费
> 也就是之前flink,一直都没有提交过offset,也看不到消费者组,
>  可以用blink那个版本的pom测试一下
>
> ------
> 发件人:Biao Liu 
> 发送时间:2019年6月25日(星期二) 12:10
> 收件人:user-zh ; 雷水鱼 
> 主 题:Re: blink 版本 消费kafka 看不到group id
>
> 抱歉,没看懂你的问题
> 1. group id 和 offset 有什么关系?
> 2. "在kafka 里看不到group id" 指什么?
>
> 雷水鱼  于2019年6月24日周一 下午5:05写道:
> 现象
> 使用这个pom ,在kafka 里看不到group id
> 
>  com.alibaba.blink
>  flink-streaming-scala_2.11
>  blink-3.2.2
>  
>  
>  org.slf4j
>  slf4j-api
>  
>  
> 
> 
>  com.alibaba.blink
>  flink-connector-kafka-0.11_2.11
>  blink-3.2.0
>  
>  
>  org.slf4j
>  slf4j-api
>  
>  
> 
> 使用开源版本,可以看到在kafka 看到group id
> 
>  org.apache.flink
>  flink-streaming-java_2.12
>  1.7.1
> 
>
> 
> 
>  org.apache.flink
>  flink-connector-kafka-0.11_2.12
>  1.7.1
> 
>
> 有没有大佬能看一下,在业务场景 中 还是需要看下 offset 的
>
>
>
>
>


Re: 你好!

2019-06-24 文章 Biao Liu
抱歉,没完全看懂你的问题,尝试回答下,更新拓扑目前需要重启

杨胜松(鼓翅)  于2019年6月24日周一 下午4:26写道:

>
> 你好!
>
> 请教下,假设我一个拓扑现在有三个sql已经在跑了,现在我想加第四个sql进来,那么我一定要重发这个拓扑么?有什么办法可以在不影响这三个sql计算的情况下,把第四个sql也加进来么?
> 刚接触flink,还处在摸索阶段,有什么资料能分享下更好啦~
>
>
>
>
>
>
> --
> 风险能力中台-产品技术部-风控平台-计算平台
> 地址:浙江-杭州-余杭-西溪园区
> 电话:1575183XXX
>
>


Re: blink 版本 消费kafka 看不到group id

2019-06-24 文章 Biao Liu
抱歉,没看懂你的问题
1. group id 和 offset 有什么关系?
2. "在kafka 里看不到group id" 指什么?

雷水鱼  于2019年6月24日周一 下午5:05写道:

> 现象
> 使用这个pom ,在kafka 里看不到group id
> 
>  com.alibaba.blink
>  flink-streaming-scala_2.11
>  blink-3.2.2
>  
>  
>  org.slf4j
>  slf4j-api
>  
>  
> 
> 
>  com.alibaba.blink
>  flink-connector-kafka-0.11_2.11
>  blink-3.2.0
>  
>  
>  org.slf4j
>  slf4j-api
>  
>  
> 
> 使用开源版本,可以看到在kafka 看到group id
> 
>  org.apache.flink
>  flink-streaming-java_2.12
>  1.7.1
> 
>
> 
> 
>  org.apache.flink
>  flink-connector-kafka-0.11_2.12
>  1.7.1
> 
>
> 有没有大佬能看一下,在业务场景 中 还是需要看下 offset 的
>
>
>
>


Re: Flink程序长期运行后报错退出 PartitionRequestQueue - Encountered error while consuming partitions

2019-06-24 文章 Biao Liu
你好,"Connection reset by peer" 是网络 TCP 层的异常,见[1]
建议查一下该 task 的下游消费者 task

1.
https://stackoverflow.com/questions/1434451/what-does-connection-reset-by-peer-mean

罗学焕/予之  于2019年6月18日周二 下午5:08写道:

> 大家好:
> Flink应用,如以100笔/s的交易量向kafka写入数据(数据量不大),Flink程序接受并处理数据,涉及到 20个左右的 流表 Join
> 。和大量的异步操作读取hbase 维表。
> 运行1-2小时后,Flink应用停止运行并报错,(报错关键堆栈如下,省略部分为flink.shaded.netty部分的堆栈)
> 观察过内存未溢出,网络负载也不高。
> 不知道是啥原因,大家能帮忙看下吗?
>
> 主要报错:
> ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue -
> Encountered error while consuming partitions
> java.io.IOException: Connection reset by peer
> ...
> ...
> org.apache.flink.runtime.io
> .network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:264)
> org.apache.flink.runtime.io
> .network.netty.PartitionRequestQueue.enqueueAvailableReader(PartitionRequestQueue.java:116)
> org.apache.flink.runtime.io
> .network.netty.PartitionRequestQueue.userEnentTriggered(PartitionRequestQueue.java:176)
> ...
> ...
> ...
> ...
>
>


Re: Flink tps 速度问题

2019-06-24 文章 Biao Liu
你好,建议自己先排查一下,把问题范围缩小,别人才好帮忙
可以看看 back pressure 具体慢在哪里,再具体分析

haibin <191560...@qq.com> 于2019年6月19日周三 上午5:50写道:

> hello,各位大佬:
>
>
> 在做实时etl的时候,source(kafka)->map->filter->flatmap->map->sink(kafka)这样流程etl的时候,发现处理速度很慢,有什么好的方法提高处理速度。
>有25个作业同时消费同一个topic(32个分区),会不会有性能问题?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink filesystem 1.7.2 on Hadoop 2.7 BucketingSink.reflectTruncat() 有写入很多小文件到hdfs的风险

2019-06-24 文章 Biao Liu
你好,看了下代码,1.7.2 确实有这问题,最新的代码已经 fix,见[1]
如果可以的话,升级到1.8.0就包含了该 fixing

1.
https://github.com/apache/flink/commit/24c2e17c8d52ae2f0f897a5806a3a44fdf62b0a5

巫旭阳  于2019年6月24日周一 下午2:40写道:

> 源码在 BucketingSink 615行
> Path testPath = new Path(basePath, UUID.randomUUID().toString());
> try (FSDataOutputStream outputStream = fs.create(testPath)) {
>outputStream.writeUTF("hello");
> } catch (IOException e) {
> LOG.error("Could not create file for checking if truncate works.", e);
>throw new RuntimeException("Could not create file for checking if
> truncate works. " +
> "You can disable support for truncate() completely via " +
> "BucketingSink.setUseTruncate(false).", e);
> }
>
> try {
>m.invoke(fs, testPath, 2);
> } catch (IllegalAccessException | InvocationTargetException e) {
> LOG.debug("Truncate is not supported.", e);
> m = null;
> }
>
> try {
>fs.delete(testPath, false);
> } catch (IOException e) {
> LOG.error("Could not delete truncate test file.", e);
>throw new RuntimeException("Could not delete truncate test file. " +
> "You can disable support for truncate() completely via " +
> "BucketingSink.setUseTruncate(false).", e);
> }
> line 635 开始创建一个测试文件 “FSDataOutputStream outputStream = fs.create(testPath)”
> line 636 尝试写入 一段 测试文字"hello" "outputStream.writeUTF("hello")"
> line 645 调用 truncate 方法“m.invoke(fs, testPath, 2);”
> line 652 删除测试文件 “fs.delete(testPath, false);“
> 上述逻辑有一些瑕疵 :
>  1 在635行创建一个测试文件后,636行写入hello 失败,抛出异常(导致程序重启或退出)
>  2 在645行调用m.invocate 失败 抛出异常(导致程序重启或退出)
>  两行操作都抛出异常终止程序或重启程序,导致创建的测试文件无法被删除,极端情况下。程序一直在抛出异常然后重启,根据我阅读的代码
> reflectTruncat(Filesystem fs)是程序初始化 state的时候会执行。
>
>
> 望大佬能指点一下,是我的姿势不对还是这块的设计有瑕疵。
>
>


Re: flink连续窗口

2019-06-24 文章 Biao Liu
你好,我觉得问题有点太抽象,建议先看看官网文档[1],还有 Flink 自带的例子[2]

1.
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html
2.
https://github.com/apache/flink/tree/release-1.8/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing


残翅2008 <770968...@qq.com> 于2019年6月24日周一 上午11:14写道:

> 求flink连续窗口资料 demo 调优场景,谢谢~~


Re: UnknownTaskExecutorException: No TaskExecutor registered

2019-05-28 文章 Biao Liu
你好,
你贴的异常栈的意思是,webUI 请求的这个 TM
已经不存在了,引起问题的原因可能是你之前在浏览器打开的一个页面无法继续访问,把这个页面关掉应该就不会报错了
这个问题不会导致“不可用”,只是个展示错误,你说的“并且程序不能正常工作”可能是其他问题引起的

xuzhong...@wsmtec.com  于2019年5月27日周一 下午5:17写道:

> 大家好!
>
> flink启动一段时间后,一直输出以下错误日志,并且程序不能正常工作了。
> 请问有遇到过的么,怎么解决的?
> 是否是内存等资源不够了
>
> Flink版本V1.7.2
> 2019-05-27 16:03:03,316 ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler
> - Implementation error: Unhandled exception.
> org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
> No TaskExecutor registered under
> container_e371_1558915641092_0246_01_02.
> at
> org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:563)
> at sun.reflect.GeneratedMethodAccessor38.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> https://issues.apache.org/jira/browse/FLINK-8929 这有同样问题,但没有解决
>
>
>
>
> walt.xu
>
>


Re: flink消费kafka问题

2019-05-20 文章 Biao Liu
Hi, 你需要正确处理“落库失败”的数据,例如可以直接抛异常出来,这样 job 会不停 failover,直到不再落库失败


方伟  于2019年5月20日周一 下午6:02写道:

> Hi 你好~:
>
> 请教个问题:我用flink消费kafka的数据,使用了checkpoint记录分区的偏移量,5s做一次checkpoint,并设置了EXACTLY_ONCE,让消费的数据落到mysql中,如何保证落库失败了(比如数据库中字段长度设置小了),当重新消费时还会消费到上次那条数据(我的理解是此时可能那条数据已经做了checkpoint了,下次消费就会跳过这条数据,是这样吗?该如何解决呢?),谢谢!


Re: java.lang.NoClassDefFoundError: org/apache/flink/table/api/TableEnvironment

2019-04-15 文章 Biao Liu
Hi,
猜测是因为 flink-dist.jar 中未包含 table 相关 jar,opt/ 下应该有
flink-table-xxx.jar,带上这个包再运行试试 (可以先粗暴点把这个包放到 lib/ 下再运行)

Han Mingcong  于2019年4月15日周一 下午5:05写道:

> 您好,
>近期在调研Flink,从官网下载的flink-1.7.2-bin-hadoop28-scala_2.11版本,在使用flink run
> 提交一个Table作业时有以下报错:
>
> [bin]$ ./flink run -c WordCountTable ../../WC.jar
> Starting execution of program
> java.lang.NoClassDefFoundError: org/apache/flink/table/api/TableEnvironment
> at WordCountTable.main(WordCountTable.java:22)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.table.api.TableEnvironment
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 18 more
>
> 程序源码来自于
> https://github.com/apache/flink/blob/release-1.7/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/WordCountTable.java
>
> pom.xml:
>
> 
> 
> org.apache.flink
> flink-table_2.11
> 1.7.2
> 
> 
> org.apache.flink
> flink-streaming-scala_2.11
> 1.7.2
> 
> 
>
> 请问如何解决这个问题?
>
>


Re: flink on yarn 启动任务失败

2019-04-08 文章 Biao Liu
Hi,
“Queue's AM resource limit exceeded”
-> 这个应该是 YARN 对 AM 的使用资源进行了限制吧,上限是 4096M 内存?你启动的应该是 job mode 吧,每个 job
都会启动单独的 AM,每个 AM 占用 2048M 内存?如果按这样算的话确实只够启动两个

1900 <575209...@qq.com> 于2019年4月4日周四 下午4:54写道:

> 目前整体采用flink on yarn ha 部署,flink版本为社区版1.7.2,hadoop版本为社区版2.8.5
>
>
> 目前总共有5台flink集群,每台服务器CPU4核,内存8G
>
>
> flink基本配置为
> jobmanager.heap.size: 2048m
> taskmanager.heap.size: 2048m
> taskmanager.numberOfTaskSlots: 4
>
>
> 采用run a job on flink 启动任务,现在每个任务一个并行度
> 命令如 flink run -d -m yarn-cluster  ...
>
>
> 当发布两个任务成功后,第三个任务就启动不了
> 部分启动日志如下
> 360 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor   -
> Submitting application master application_1554100483755_0013
> 2019-04-04 16:24:23,389 INFO
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted
> application application_1554100483755_0013
> 2019-04-04 16:24:23,389 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Waiting for
> the cluster to be allocated
> 2019-04-04 16:24:23,390 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deploying
> cluster, current state ACCEPTED
> 2019-04-04 16:25:23,625 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> 2019-04-04 16:25:23,876 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> 2019-04-04 16:25:24,127 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
> 2019-04-04 16:25:24,378 INFO
> org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Deployment
> took more than 60 seconds. Please check if the requested resources are
> available in the YARN cluster
>
>
>
> 其他找不到任何跟踪信息,查看yarn 控台后,发现容器分配不了,页面上的信息如下
> YarnApplicationState:   ACCEPTED: waiting for AM container to be
> allocated, launched and register with RM.
>
>
> Diagnostics:[Thu Apr 04 16:33:49 +0800 2019] Application is added to
> the scheduler and is not yet activated.
> Queue's AM resource limit exceeded. Details : AM Partition =
> ; AM Resource Request = ;
> Queue Resource Limit for AM = ; User AM Resource
> Limit of the queue = ; Queue AM Resource Usage =
> ;
>
>
>
> 1.按照上面的机器划分跟启动设置并行度,还有yarn控台节点查看,还有很多内存跟CPU没有使用到,
> 为什么会出现这种情况,是还需要什么配置吗?
>
> 2.对于上面几个基本配置,jobmanager.heap.size,taskmanager.heap.size,taskmanager.numberOfTaskSlots有什么设置注意点吗?
> 一般要怎么设置?我现在发现这种启动模式下,每个任务都会有一个jobmanager跟一个taskmanger


Re: 求助,blink资源配置的问题,为什么资源还不足啊。。。

2019-04-08 文章 Biao Liu
Hi,可以提供更详细的信息吗?例如
1. 版本号
2. 完整的日志
3. 完整的集群配置文件
4. 集群是 on YARN 还是 standalone? 启动集群命令?
5. 完整的 job 信息?启动 job 的命令?


邓成刚【qq】  于2019年4月4日周四 下午6:13写道:

> 求助,blink资源配置的问题,为什么资源还不足啊。。。
> 盼回复,谢谢!
>
> 为什么
>
> 2019-04-04 17:49:32,495 INFO
>  org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Requesting
> slot with profile
> ResourceProfile{cpuCores=26.44, heapMemoryInMB=4144,
> directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=101,
> MANAGED_MEMORY_MB=0.0,
> FLOATING_MANAGED_MEMORY_MB=0.0} from resource manager (request =
> ba5acbd5a1fdeb37bcfc05e43695a622).
> 2019-04-04 17:49:32,496 INFO
>  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Request slot with profile
> ResourceProfile{cpuCores=26.44, heapMemoryInMB=4144,
> directMemoryInMB=0, nativeMemoryInMB=0, networkMemoryInMB=101,
> MANAGED_MEMORY_MB=0.0,
> FLOATING_MANAGED_MEMORY_MB=0.0} for job eef62bb1555d0450fa097f1eccc9d6d8
> with allocation id 91e5bb6ded9ef90104db0179c1415ade.
> 2019-04-04 17:54:32,460 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Batch
> request 88 slots, but only 0 are fulfilled.
>
>
> TS 和CPU 都有 200,UserHeap MEM:500G,Managed MEM:300G,Network MEM:320M
>
> 配置如下: 共5 台机器 参数配置都一模一样的。
>
> jobmanager.heap.mb: 20480
>
> taskmanager.heap.mb: 102400
>
> taskmanager.memory.off-heap: true
>
> taskmanager.memory.size : 61440
>
> taskmanager.cpu.core: 40
>
> taskmanager.numberOfTaskSlots: 40
>
> 邓成刚【qq】


Re: 请教关于Keyed() 方法的问题。

2019-04-08 文章 Biao Liu
Hi, 尝试理解fli一下你的疑问
“其中,每个具体mapFunc处理的数据,应该是相同的key数据。不知理解是否正确”
-> keyby 只会保证相同 key 的数据会被分在相同 mapFunc 中,每个 mapFunc 可能会处理不同 key 的数据,详见官网文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/

Yaoting Gong  于2019年4月4日周四 下午2:00写道:

> 大家好,
>
> 先说本人的理解,keyed(..).flatmap(mapFunc())
> 其中,每个具体mapFunc处理的数据,应该是相同的key数据。不知理解是否正确。
>
> 我的具体情况是
>   我对数据对校验处理。首先根据设备id (uuid) 分组,然后针对不同分组进行数据校验。
> 部分代码如下:
>
> rowData.filter(legalData _)
>.map(data => BehaviorComVO(getText(data, "id"), getText(data, "uuid"),
> getText(data, "session_id"), getText(data, "source"), getText(data,
> "product_version")))
>*  .keyBy(_.uuid)*
> * .flatMap(new RepeatIdCheckDispatch())*
>  .addSink()
>
> *RepeatIdCheckDispatch*  细节:
>
> *  override def flatMap(in: BehaviorComVO, out: Collector[String]): Unit =
> {*
>
> *in match {*
> *  case BehaviorComVO(_, _, _, "visit", _) =>*
> *if (!repeatIdChecker.isOK) out.collect(repeatIdChecker.result)*
>
> *repeatIdChecker = RepeatIdChecker(in)*
>
> *  case _: BehaviorComVO => repeatIdChecker.doCheck(in)*
> *}*
> *  }*
>
> "visit" 是一个周期数据的开始。。但是运行之后,我发现,有其他uuid的数据,进入到同一个 *RepeatIdChecker 中*,
>


Re: flink on yarn 模式 日志问题

2019-04-08 文章 Biao Liu
1. 这个日志确实会存在,如果你觉得5秒打印两行不能接受的话,我能想到的几种解决方法
  1.1. 加大 checkpoint 间隔
  1.2. 单独指定该 logger 的 level,修改
log4j.properties,增加一行:log4j.logger.org.apache.flink.runtime.checkpoint.CheckpointCoordinator=WARN
  1.3. 修改源代码重新编译
2. 确实在 YARN 模式下,日志的位置不固定,和你的需求不匹配,standalone 模式可能更友好些。硬核一点的方法,可以扩展 log4j
appender,不只打到文件,可以搜一下有没有现成的解决方案
3. Flink session/job 挂掉的话,仍可以通过 YARN 获取日志,只是无法和 Flink task
映射,需要自己分析对应关系来排查问题。可以考虑使用 Flink history server 来协助排查,参见:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/historyserver.html

1900 <575209...@qq.com> 于2019年4月4日周四 下午1:42写道:

> 很高兴回复的这么详细,以后问题会继续描述详细点
>
>
> 现在目前flink用的版本是社区版1.7.2,hadoop版本是2.8.5,采用flink on yarn ha部署,服务启动采用 run a
> job on yarn
>
>
> 1.代码中配置了env.enableCheckpointing(5000);想屏蔽的日志如下
>
>
> 2019-04-04 13:23:50,176 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 2707 @ 1554355430174 for job c6028596fef272ae93bf4cfb625a48c9.
> 2019-04-04 13:23:50,218 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 2707 for job c6028596fef272ae93bf4cfb625a48c9 (13472 bytes in 33
> ms).
>
>
>
> 假设配置的checkpoint时间为5秒,那么时间会会越来越大,在yarn 的控台查看jobmanager.log会越来越卡,一天下来就打不开了
>
>
> 如 页面通过点击如下面查看
> jobmanager.err : Total file length is 573 bytes.
> jobmanager.log : Total file length is 363370 bytes.
> jobmanager.out : Total file length is 0 bytes.
>
>
>
>
> 3.是想通过其他日志搜集服务,将日志搜集到其他服务器作为监控(比如放到es里等),现在根据第二个回答,找到路径了,这种情况下,每次启动都随机生成container_id,路径是随机变化的,
> 这样貌似不怎么好搜集把?大家是什么处理日志的?
>
> 4.根据上面的,突然想到个问题是,假设现在flink任务挂了或者停了,就没法在yarn控台继续跟踪了(想通过日志追踪到底发生了什么错误),没法映射过去,这个该怎么设置了?或者用上面第3种进行搜集汇总到日志服务器上?
>
>
>
>
>
>
>
>
> -- 原始邮件 --
> 发件人: "Biao Liu";
> 发送时间: 2019年4月4日(星期四) 中午11:14
> 收件人: "user-zh";
>
> 主题: Re: flink on yarn 模式 日志问题
>
>
>
> Hi,
> 首先,Flink 框架的日志应该不多,不知道具体涉及到 checkpoint 的是哪些 log 呢?(建议以后提问给出尽可能详细的信息,例如使用版本和
> log 文件)
> 1. log 是通过 log4j/logback 等第三方系统控制,conf 文件夹中有相应配置文件,可以调整整体或单个 logger 的
> level,建议查阅相关系统资料
> 2. Flink on YARN 模式下,一般访问 log 是通过 Flink web UI 跳转查看。如果坚持想找到具体 log
> 文件,Application 结束前在本地文件,可通过 container 进程启动命令看到具体 log 文件,结束后可能归档到 HDFS 上,请查阅
> YARN 相关资料
> 3. log 使用方式,建议阅读官网文档,详见
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/logging.html
> ,另,Flink
> 本身不带“日志监控系统”,日志路径详见上述第二个回答
>
> 1900 <575209...@qq.com> 于2019年4月4日周四 上午10:45写道:
>
> > 程序中设置了检查点,env.enableCheckpointing(5000);
> > 日志中就不断的有日志,时间长了日志就会越来越大,在yarn控台 看job日志,会越来越大,根本就不能看了
> > 1.请问检查点日志打印可以关闭吗?或者有其他方式看吗?
> > 2.请问在这种模式下,如何去找日志文件存放文件路径,目前找不到,是不是在hdfs中
> > 3.请问flink任务中大家是怎么打印日志的?假设slf4j+logback,在on yarn
> > 模式下如何设置,日志会打到什么地方,能不能被搜集到监控系统中搜集到(日志文件在具体某个路径下,还是在hdfs中)


Re: flink on yarn 模式 日志问题

2019-04-03 文章 Biao Liu
Hi,
首先,Flink 框架的日志应该不多,不知道具体涉及到 checkpoint 的是哪些 log 呢?(建议以后提问给出尽可能详细的信息,例如使用版本和
log 文件)
1. log 是通过 log4j/logback 等第三方系统控制,conf 文件夹中有相应配置文件,可以调整整体或单个 logger 的
level,建议查阅相关系统资料
2. Flink on YARN 模式下,一般访问 log 是通过 Flink web UI 跳转查看。如果坚持想找到具体 log
文件,Application 结束前在本地文件,可通过 container 进程启动命令看到具体 log 文件,结束后可能归档到 HDFS 上,请查阅
YARN 相关资料
3. log 使用方式,建议阅读官网文档,详见
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/logging.html,另,Flink
本身不带“日志监控系统”,日志路径详见上述第二个回答

1900 <575209...@qq.com> 于2019年4月4日周四 上午10:45写道:

> 程序中设置了检查点,env.enableCheckpointing(5000);
> 日志中就不断的有日志,时间长了日志就会越来越大,在yarn控台 看job日志,会越来越大,根本就不能看了
> 1.请问检查点日志打印可以关闭吗?或者有其他方式看吗?
> 2.请问在这种模式下,如何去找日志文件存放文件路径,目前找不到,是不是在hdfs中
> 3.请问flink任务中大家是怎么打印日志的?假设slf4j+logback,在on yarn
> 模式下如何设置,日志会打到什么地方,能不能被搜集到监控系统中搜集到(日志文件在具体某个路径下,还是在hdfs中)


Re: 1.7.1版本getDistributedCache().getFile("key")失败问题

2019-04-03 文章 Biao Liu
不要用 detached job-mode 就好了呀,用 session mode

天边的云  于2019年4月3日周三 下午4:55写道:

> Hi,除了DistributedCache机制。flink on yarn还有什么方法可以解决 在TaskManager中读取配置文件路径的办法吗?
> 在2019年4月3日 16:39,Biao Liu 写道:
> Hi, 该 issue 中 Till 已经进行了说明,看起来是 detached job-mode 的 bug,目前社区版确实对 job-mode
> 支持的不太好,坑比较多,建议不要使用
>
> 天边的云  于2019年4月3日周三 下午4:28写道:
>
> 我看社区已经有人反馈了此问题,看上去像是1.6对BlobServer做了改动导致的?
>
>
> https://issues.apache.org/jira/browse/FLINK-10370?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22registerCachedFile%22
>
>
> 在2019年4月3日 16:26,Biao Liu 写道:
> Hi, 按照你给出的部分代码,看不出问题所在,按道理是不应该的
> 建议给出更详细的信息,可以考虑以下几个方面
> 1. 完整可复现的代码(可以放到附件中),方便其他同学复现问题
> 2. 具体使用版本
>
> PS: 如果你确认不是你测试环境/代码的问题,甚至可以直接建 jira issue 来反馈 bug,参见
> https://flink.apache.org/how-to-contribute.html#file-a-bug-report
>
>
> 天边的云  于2019年4月3日周三 下午3:24写道:
>
> 如题,按照官方例子
> env9.registerCachedFile("hdfs://xxx/key", "key");
>
>
> ...
> .map(
> new RichMapFunction() {
> @Override
> public String map(String value) throws Exception {
> return value;
> }
> @Override
> public void open(Configuration parameters) {
> File file =
> getRuntimeContext().getDistributedCache().getFile("key");
> System.out.println(file.getAbsolutePath());
> }
>
>
> });
> ...
> 之前1.5版本没有任何问题,相同的代码更换到1.7.1版本之后报如下错误:
> java.lang.IllegalArgumentException: File with name 'key' is not available.
> Did you forget to register the file?
>
>
> 有谁遇到相同的问题或对这块代码有研究的吗?
>
>


Re: 1.7.1版本getDistributedCache().getFile("key")失败问题

2019-04-03 文章 Biao Liu
Hi, 该 issue 中 Till 已经进行了说明,看起来是 detached job-mode 的 bug,目前社区版确实对 job-mode
支持的不太好,坑比较多,建议不要使用

天边的云  于2019年4月3日周三 下午4:28写道:

> 我看社区已经有人反馈了此问题,看上去像是1.6对BlobServer做了改动导致的?
>
> https://issues.apache.org/jira/browse/FLINK-10370?jql=project%20%3D%20FLINK%20AND%20text%20~%20%22registerCachedFile%22
>
>
> 在2019年4月3日 16:26,Biao Liu 写道:
> Hi, 按照你给出的部分代码,看不出问题所在,按道理是不应该的
> 建议给出更详细的信息,可以考虑以下几个方面
> 1. 完整可复现的代码(可以放到附件中),方便其他同学复现问题
> 2. 具体使用版本
>
> PS: 如果你确认不是你测试环境/代码的问题,甚至可以直接建 jira issue 来反馈 bug,参见
> https://flink.apache.org/how-to-contribute.html#file-a-bug-report
>
>
> 天边的云  于2019年4月3日周三 下午3:24写道:
>
> 如题,按照官方例子
> env9.registerCachedFile("hdfs://xxx/key", "key");
>
>
> ...
> .map(
> new RichMapFunction() {
> @Override
> public String map(String value) throws Exception {
> return value;
> }
> @Override
> public void open(Configuration parameters) {
> File file =
> getRuntimeContext().getDistributedCache().getFile("key");
> System.out.println(file.getAbsolutePath());
> }
>
>
> });
> ...
> 之前1.5版本没有任何问题,相同的代码更换到1.7.1版本之后报如下错误:
> java.lang.IllegalArgumentException: File with name 'key' is not available.
> Did you forget to register the file?
>
>
> 有谁遇到相同的问题或对这块代码有研究的吗?
>


Re: 1.7.1版本getDistributedCache().getFile("key")失败问题

2019-04-03 文章 Biao Liu
Hi, 按照你给出的部分代码,看不出问题所在,按道理是不应该的
建议给出更详细的信息,可以考虑以下几个方面
1. 完整可复现的代码(可以放到附件中),方便其他同学复现问题
2. 具体使用版本

PS: 如果你确认不是你测试环境/代码的问题,甚至可以直接建 jira issue 来反馈 bug,参见
https://flink.apache.org/how-to-contribute.html#file-a-bug-report


天边的云  于2019年4月3日周三 下午3:24写道:

> 如题,按照官方例子
> env9.registerCachedFile("hdfs://xxx/key", "key");
>
>
> ...
> .map(
> new RichMapFunction() {
>   @Override
>   public String map(String value) throws Exception {
> return value;
>   }
>   @Override
>   public void open(Configuration parameters) {
>File file =
> getRuntimeContext().getDistributedCache().getFile("key");
> System.out.println(file.getAbsolutePath());
>   }
>
>
> });
> ...
> 之前1.5版本没有任何问题,相同的代码更换到1.7.1版本之后报如下错误:
> java.lang.IllegalArgumentException: File with name 'key' is not available.
> Did you forget to register the file?
>
>
> 有谁遇到相同的问题或对这块代码有研究的吗?


Re: 远程提交代码到Flink集群

2019-04-02 文章 Biao Liu
Hi, 由于你提供的细节并不多,无法很好地理解你的需求
你的作业中没有自定义代码 (例如 java/scala 代码) 吗?如果有的话,就必须上传 jar,Flink 接受的是编译后的字节码,并不提供编译功能

PS:我理解"自动化"和"上传 jar"并没有直接联系

文报 <1010467...@qq.com> 于2019年4月2日周二 下午2:40写道:

> 谢谢各位的回复。
>
>  
> 我通过将代码推到git上,利用jenkins打包生成jar文件,然后调用shell脚本完成了简单的自动化。今天看见了JobManager是可以直接接受JobGraph,那么我能不能在代码中直接获取到自己代码生成的JobGraph,提交到JobManager上,这样就不需要通过jar包的形式上传运行了。如果能实现,第一步我应该怎么去做。
> 期待各位的回信,感谢。
>
>
>
>
> -- 原始邮件 --
> 发件人: "我自己的邮箱"<1010467...@qq.com>;
> 发送时间: 2019年3月29日(星期五) 下午2:19
> 收件人: "user-zh";
>
> 主题: 回复: 远程提交代码到Flink集群
>
>
>
> 谢谢各位的解答,我试试。
>
>
>
>
> -- 原始邮件 --
> 发件人: "Lifei Chen";
> 发送时间: 2019年3月29日(星期五) 中午11:10
> 收件人: "user-zh";
>
> 主题: Re: 远程提交代码到Flink集群
>
>
>
> 有一个小巧的go cli, 支持直接部署jar包到flink manager上。
>
> https://github.com/ing-bank/flink-deployer
>
> 希望能帮到你!
>
> Kaibo Zhou  于2019年3月29日周五 上午11:08写道:
>
> > 可以用 flink 提供的 Restful API 接口,upload 上传 jar 包然后 run。
> >
> > 参考:
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jars-upload
> > 和
> https://files.alicdn.com/tpsservice/a8d224d6a3b8b82d03aa84e370c008cc.pdf
> > 文档的介绍
> >
> > 文报 <1010467...@qq.com> 于2019年3月28日周四 下午9:06写道:
> >
> > > 各位好!
> > >
> > >
> >
> 请教一下各位,在本地开发完代码后,怎么样可以将编写好的代码直接提交到Flink集群上运行?(想做Flink任务的自动化,避免每次开发完成后都需要打jar包提交到web页面上)
> >


Re: HA切换

2019-04-01 文章 Biao Liu
Hi wuzhixin,
HA 切换时会重启 job,Flink 社区版目前的实现是这样的
可以了解下 Blink, 我们在社区版基础上优化了 master failover 的策略,可以避免重启 job

马 敬源  于2019年4月2日周二 上午9:45写道:

> Hi,wuzhixin:
> 尝试改一下flink-conf.yaml 这个配置:
>
> jobmanager.execution.failover-strategy: individual
>
>
> 来自 Outlook
>
> 
> 发件人: wuzhixin 
> 发送时间: 2019年4月1日 16:37
> 收件人: user-zh@flink.apache.org
> 主题: HA切换
>
> Hi all:
>
> 今天我们standalone的集群,使用zookeeper做了HA机制,但是今天因为zookeeper的一些原因,来了一次HA切换,然后
> 我们发现所有的job都重启了,请问这是标准处理么?
> flink的这种机制是不是不太好
>


Re: flink on yarn ha 高可用问题

2019-04-01 文章 Biao Liu
Hi,
这问题其实跟 Flink 无关,请了解下 HDFS nameservice。正确配置 HDFS 后,在 Flink 中填写路径时就不需要写死 name
node 地址了

天之痕 <575209...@qq.com> 于2019年4月2日周二 上午11:29写道:

> 请问该怎么处理,我目前在hadoop中配置了
>
> 
>
> fs.defaultFS
>
> hdfs://hacluster/
>  
>
>
>
> 
>
> dfs.client.failover.proxy.provider.hacluster
>
>
> org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
>  
>
>
> hadoop中是能模拟namenode切换的
>
>
> 1.请问现在flink中怎么配置?目前flink的环境都安装了hadoop,导出了hadoop环境变量
> 2.如果要求客户端也要包含hdfs ha的配置,那是不是flink扩展的同时也到在对应的服务器上配置hadoop,同时进行hadoop扩展?
>
>
>
>
>
>
>
>
> -- 原始邮件 --
> 发件人: "Lin Li";
> 发送时间: 2019年4月2日(星期二) 上午9:47
> 收件人: "user-zh";
>
> 主题: Re: flink on yarn ha 高可用问题
>
>
>
> hdfs ha mode 下配置 logical name (
>
> https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithNFS.html
> )
> flink 中访问配置的 hdfs logical name (同时需要确保 flink task 运行环境 hdfs-client 中也包含了
> hdfs ha 配置的 hdfs logical name 和实际指向的 namedone 映射关系)
>
>
> 天之痕 <575209...@qq.com> 于2019年4月2日周二 上午9:07写道:
>
> > flink on yarn ha模式下
> > state.checkpoints.dir:
> > hdfs://namenode-host:port/flink-checkpoints这个配置怎么配置能进行namenode能自动切换
> >
> >
> 现在只能指定namenode的hostname,假设这个namenode节点挂了,或者standby状态,flink任务checkpoint就报错,只能把节点起来,而且要将其切换为active节点
> > 目前hadoop namenode已经进行ha部署了,手动kill
> 一个namenode进程能自动切换;暂时理解flink中这个配置貌似只能配死一个
> >
> >
> > 请问如何解决这个问题,保证flink的高可用


Re: Flink Job 监控

2019-03-28 文章 Biao Liu
Hi, 可以了解下 RESTful API
https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/rest_api.html

cheng  于2019年3月28日周四 下午5:08写道:

> 我们目前是用standalone 模式部署的集群,请问这个job state 有关于job是否挂掉或者重启的指标?我看官方文档好像没找到。
>
> > 在 2019年3月28日,下午4:51,浪人 <1543332...@qq.com> 写道:
> >
> > 如果是使用flink集成cluster可以监控flink的job state,如果是yarn是超脱模式可以监控yarn的状态。
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人: "cheng";
> > 发送时间: 2019年3月28日(星期四) 下午4:38
> > 收件人: "user-zh";
> >
> > 主题: Flink Job 监控
> >
> >
> >
> > 各位好!
> >   请教下各位,Flink Job 在生产上运行时,关于job运行状态的监控和告警一般是采用什么方案处理的?
> 比如监控job是否在正常运行,如果发现job 挂掉了 或者重启了 就进行告警。我这边有将一些metric 推到prometheus
> 但是好像没有发现关于job是否挂掉的metric。
> >   希望有做过这种方案的朋友能赐教下,谢谢了!!
>
>


Re: RocksDB中指定nameNode 的高可用

2019-03-27 文章 Biao Liu
Hi,
  HDFS 本身可以解决该问题,可以搜一下 “HDFS HA nameservice”,可以避免写死 name node 地址

Yun Tang  于2019年3月26日周二 下午5:29写道:

> Hi
>
> Flink高可用相关配置的存储目录,当存储路径配置成HDFS时,相关namenode高可用性由HDFS支持,对上层完全透明。
>
> 祝好
> 唐云
> 
> From: 戴嘉诚 
> Sent: Tuesday, March 26, 2019 16:57
> To: user-zh@flink.apache.org
> Subject: RocksDB中指定nameNode 的高可用
>
> 嘿,我想询问一下,flink中的RocksDB位置
> 我指定了hdfs路径,但是,这里是强指定nameNode的地址,但是我的hdfs是有个两个nameNode地址的,这里能否有个功能,当active
> nameNode挂掉了,类似hdfs的HA那样,能无缝切换nameNode地址吗?不然,当nameNode挂掉了, 我指定的flink也会一并挂掉
>
>


Re: FlinkKafkaConsumer 为什么不支持制定分区消费

2019-03-20 文章 Biao Liu
Hi,
构造函数中的 topics, topicPattern 不能满足你的需求吗?

yuqingshui  于2019年3月18日周一 下午10:33写道:

> 如题:FlinkKafkaConsumer 为什么不支持制定分区消费?