Re: [DISCUSS] Change some default config values of blocking shuffle

2022-01-04 文章
>>> documentation for the sort shuffle [1] to include a tuning guide? I am
>>> thinking of a more in depth description of what things you might observe
>>> and how to influence them with the configuration options.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/batch/blocking_shuffle/#sort-shuffle
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Dec 14, 2021 at 8:43 AM Jingsong Li 
>>> wrote:
>>>
>>>> Hi Yingjie,
>>>>
>>>> Thanks for your explanation. I have no more questions. +1
>>>>
>>>> On Tue, Dec 14, 2021 at 3:31 PM Yingjie Cao 
>>>> wrote:
>>>> >
>>>> > Hi Jingsong,
>>>> >
>>>> > Thanks for your feedback.
>>>> >
>>>> > >>> My question is, what is the maximum parallelism a job can have
>>>> with the default configuration? (Does this break out of the box)
>>>> >
>>>> > Yes, you are right, these two options are related to network memory
>>>> and framework off-heap memory. Generally, these changes will not break out
>>>> of the box experience, but for some extreme cases, for example, there are
>>>> too many ResultPartitions per task, users may need to increase network
>>>> memory to avoid "insufficient network buffer" error. For framework
>>>> off-head, I believe that user do not need to change the default value.
>>>> >
>>>> > In fact, I have a basic goal when changing these config values: when
>>>> running TPCDS of medium parallelism with the default value, all queries
>>>> must pass without any error and at the same time, the performance can be
>>>> improved. I think if we achieve this goal, most common use cases can be
>>>> covered.
>>>> >
>>>> > Currently, for the default configuration, the exclusive buffers
>>>> required at input gate side is still parallelism relevant (though since
>>>> 1.14, we can decouple the network buffer consumption from parallelism by
>>>> setting a config value, it has slight performance influence on streaming
>>>> jobs), which means that no large parallelism can be supported by the
>>>> default configuration. Roughly, I would say the default value can support
>>>> jobs of several hundreds of parallelism.
>>>> >
>>>> > >>> I do feel that this correspondence is a bit difficult to control
>>>> at the moment, and it would be best if a rough table could be provided.
>>>> >
>>>> > I think this is a good suggestion, we can provide those suggestions
>>>> in the document.
>>>> >
>>>> > Best,
>>>> > Yingjie
>>>> >
>>>> > Jingsong Li  于2021年12月14日周二 14:39写道:
>>>> >>
>>>> >> Hi  Yingjie,
>>>> >>
>>>> >> +1 for this FLIP. I'm pretty sure this will greatly improve the ease
>>>> >> of batch jobs.
>>>> >>
>>>> >> Looks like "taskmanager.memory.framework.off-heap.batch-shuffle.size"
>>>> >> and "taskmanager.network.sort-shuffle.min-buffers" are related to
>>>> >> network memory and framework.off-heap.size.
>>>> >>
>>>> >> My question is, what is the maximum parallelism a job can have with
>>>> >> the default configuration? (Does this break out of the box)
>>>> >>
>>>> >> How much network memory and framework.off-heap.size are required for
>>>> >> how much parallelism in the default configuration?
>>>> >>
>>>> >> I do feel that this correspondence is a bit difficult to control at
>>>> >> the moment, and it would be best if a rough table could be provided.
>>>> >>
>>>> >> Best,
>>>> >> Jingsong
>>>> >>
>>>> >> On Tue, Dec 14, 2021 at 2:16 PM Yingjie Cao 
>>>> wrote:
>>>> >> >
>>>> >> > Hi Jiangang,
>>>> >> >
>>>> >> > Thanks for your suggestion.
>>>> >> >
>>>> >> > >>> The config can affect the memory usage. Will the related
>>>> memory configs be changed?
>>>> >> >
>>>> >> > I think we will not change 

Re: Re: Re:Re: batch模式下任务plan的状态一直为CREATED

2021-12-22 文章
创建任务之后,生成2个plan。。。
你这里的表述是两个相连接的JobVertex吧!第二个依赖第一个,需要等第一个执行完才会执行第二个。如果是流模式的话,二者会同时执行。

RS  于2021年12月23日周四 10:30写道:

> 只有一条SQL,只是数据量比较大,使用的BATCH模式。
> SELECT price
>
> FROM hive.data.data1
>
> ORDER BY price DESC
>
>
>
> 在 2021-12-22 18:35:13,"刘建刚"  写道:
> >你的SQL是怎么写的?两个独立的SQL吗?Flink中有个参数table.dml-sync
> >,决定是否多条SQL语句顺序执行,默认是false,也就是多条语句是同时执行的。
> >
> >RS  于2021年12月22日周三 09:25写道:
> >
> >> 跑了10几个小时终于跑完了,测试发现BATCH模式下,只有Source把所有数据消费完,后面的SortLimit
> plan才会创建,和流模式不太一样
> >>
> >>
> >>
> >>
> >> 在 2021-12-21 20:06:08,"RS"  写道:
> >> >slot资源是绝对充足的,你提到的资源还涉及到其他资源吗?
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >在 2021-12-21 17:57:21,"刘建刚"  写道:
> >>
> >>
> >>固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕后才能运行SortLimit。
> >> >>
> >> >>RS  于2021年12月21日周二 16:53写道:
> >> >>
> >> >>> hi,
> >> >>>
> >> >>> 版本:flink1.14
> >> >>>
> >> >>> 模式:batch
> >> >>>
> >> >>> 测试场景:消费hive大量数据,计算某个字段的 top 10
> >> >>>
> >> >>>
> >> >>>
> >>
> 使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。
> >> >>>
> >> >>> 请问下,SortLimit状态一直为CREATED是正常现象吗?
> >> >>>
> >> >>> 数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?
> >> >>>
> >> >>>
> >> >>>
> >> >>>
> >> >>> 测试SQL:
> >> >>>
> >> >>> SELECT price
> >> >>>
> >> >>> FROM hive.data.data1
> >> >>>
> >> >>> ORDER BY price DESC
> >> >>>
> >> >>> LIMIT 10;
> >>
>


Re: 请教flink sql作业链路延迟监控如何实现

2021-12-22 文章
1、端到端的延迟可以通过latencyMarker来监控,但是可能会对性能有一定的影响。具体参考
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#end-to-end-latency-tracking
2、kafka本身的延迟,直接使用kafka的groupId的lag即可。
3、Flink处理的延迟,这个好像没有原生的,可以通过反压来查看是否有有性能问题。另外,通过1、2也可以反映延迟情况。

RS  于2021年12月23日周四 10:37写道:

> 我是直接监控kafka的lag,如果lag数值较大或持续上升,肯定就有延迟了。收到告警后,再查看下plan,有个busy指标,红色的节点就是有问题的
>
>
>
>
>
>
>
>
> 在 2021-12-23 08:36:33,"casel.chen"  写道:
> >想问一下flink sql作业链路延迟监控如何实现?
> >我们的flink
> sql作业基本上都是上游接kafka,下游sink到es/hbase/kafka/mongodb/redis/clickhouse/doris这些存储
> >想监控如下三种延迟,目前有什么办法实现吗?会有相应的metrics暴露出来吗?目前我们在用的flink版本是1.13.2
> >1. 端到端的延迟
> >2. kafka本身的延迟
> >3. flink处理的延迟
>


Re: Re:Re: batch模式下任务plan的状态一直为CREATED

2021-12-22 文章
你的SQL是怎么写的?两个独立的SQL吗?Flink中有个参数table.dml-sync
,决定是否多条SQL语句顺序执行,默认是false,也就是多条语句是同时执行的。

RS  于2021年12月22日周三 09:25写道:

> 跑了10几个小时终于跑完了,测试发现BATCH模式下,只有Source把所有数据消费完,后面的SortLimit plan才会创建,和流模式不太一样
>
>
>
>
> 在 2021-12-21 20:06:08,"RS"  写道:
> >slot资源是绝对充足的,你提到的资源还涉及到其他资源吗?
> >
> >
> >
> >
> >
> >在 2021-12-21 17:57:21,"刘建刚"  写道:
>
> >>固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕后才能运行SortLimit。
> >>
> >>RS  于2021年12月21日周二 16:53写道:
> >>
> >>> hi,
> >>>
> >>> 版本:flink1.14
> >>>
> >>> 模式:batch
> >>>
> >>> 测试场景:消费hive大量数据,计算某个字段的 top 10
> >>>
> >>>
> >>>
> 使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。
> >>>
> >>> 请问下,SortLimit状态一直为CREATED是正常现象吗?
> >>>
> >>> 数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?
> >>>
> >>>
> >>>
> >>>
> >>> 测试SQL:
> >>>
> >>> SELECT price
> >>>
> >>> FROM hive.data.data1
> >>>
> >>> ORDER BY price DESC
> >>>
> >>> LIMIT 10;
>


Re: batch模式下任务plan的状态一直为CREATED

2021-12-21 文章
固定资源的情况下,batch的调度会按照拓扑顺序执行算子。如果你的资源只够运行一个source,那么等source运行完毕后才能运行SortLimit。

RS  于2021年12月21日周二 16:53写道:

> hi,
>
> 版本:flink1.14
>
> 模式:batch
>
> 测试场景:消费hive大量数据,计算某个字段的 top 10
>
>
> 使用sql-client测试,创建任务之后,生成2个plan,一个Source,一个SortLimit。Source状态为RUNNING,SortLimit状态一直为CREATED。
>
> 请问下,SortLimit状态一直为CREATED是正常现象吗?
>
> 数据量比较大,全部消费完的话,估计得好几天时间,BATCH模式下,SortLimit的状态需要等所有数据全部消费完才改变吗?
>
>
>
>
> 测试SQL:
>
> SELECT price
>
> FROM hive.data.data1
>
> ORDER BY price DESC
>
> LIMIT 10;


Re: [DISCUSS] Change some default config values of blocking shuffle

2021-12-10 文章
Glad to see the suggestion. In our test, we found that small jobs with the
changing configs can not improve the performance much just as your test. I
have some suggestions:

   - The config can affect the memory usage. Will the related memory
   configs be changed?
   - Can you share the tpcds results for different configs? Although we
   change the default values, it is helpful to change them for different
   users. In this case, the experience can help a lot.

Best,
Liu Jiangang

Yun Gao  于2021年12月10日周五 17:20写道:

> Hi Yingjie,
>
> Very thanks for drafting the FLIP and initiating the discussion!
>
> May I have a double confirmation for
> taskmanager.network.sort-shuffle.min-parallelism that
> since other frameworks like Spark have used sort-based shuffle for all the
> cases, does our
> current circumstance still have difference with them?
>
> Best,
> Yun
>
>
>
>
> --
> From:Yingjie Cao 
> Send Time:2021 Dec. 10 (Fri.) 16:17
> To:dev ; user ; user-zh <
> user-zh@flink.apache.org>
> Subject:Re: [DISCUSS] Change some default config values of blocking shuffle
>
> Hi dev & users:
>
> I have created a FLIP [1] for it, feedbacks are highly appreciated.
>
> Best,
> Yingjie
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-199%3A+Change+some+default+config+values+of+blocking+shuffle+for+better+usability
> Yingjie Cao  于2021年12月3日周五 17:02写道:
>
> Hi dev & users,
>
> We propose to change some default values of blocking shuffle to improve
> the user out-of-box experience (not influence streaming). The default
> values we want to change are as follows:
>
> 1. Data compression
> (taskmanager.network.blocking-shuffle.compression.enabled): Currently, the
> default value is 'false'.  Usually, data compression can reduce both disk
> and network IO which is good for performance. At the same time, it can save
> storage space. We propose to change the default value to true.
>
> 2. Default shuffle implementation
> (taskmanager.network.sort-shuffle.min-parallelism): Currently, the default
> value is 'Integer.MAX', which means by default, Flink jobs will always use
> hash-shuffle. In fact, for high parallelism, sort-shuffle is better for
> both stability and performance. So we propose to reduce the default value
> to a proper smaller one, for example, 128. (We tested 128, 256, 512 and
> 1024 with a tpc-ds and 128 is the best one.)
>
> 3. Read buffer of sort-shuffle
> (taskmanager.memory.framework.off-heap.batch-shuffle.size): Currently, the
> default value is '32M'. Previously, when choosing the default value, both
> ‘32M' and '64M' are OK for tests and we chose the smaller one in a cautious
> way. However, recently, it is reported in the mailing list that the default
> value is not enough which caused a buffer request timeout issue. We already
> created a ticket to improve the behavior. At the same time, we propose to
> increase this default value to '64M' which can also help.
>
> 4. Sort buffer size of sort-shuffle
> (taskmanager.network.sort-shuffle.min-buffers): Currently, the default
> value is '64' which means '64' network buffers (32k per buffer by default).
> This default value is quite modest and the performance can be influenced.
> We propose to increase this value to a larger one, for example, 512 (the
> default TM and network buffer configuration can serve more than 10 result
> partitions concurrently).
>
> We already tested these default values together with tpc-ds benchmark in a
> cluster and both the performance and stability improved a lot. These
> changes can help to improve the out-of-box experience of blocking shuffle.
> What do you think about these changes? Is there any concern? If there are
> no objections, I will make these changes soon.
>
> Best,
> Yingjie
>
>


Re: [ANNOUNCE] Open source of remote shuffle project for Flink batch data processing

2021-11-30 文章
Good work for flink's batch processing!
Remote shuffle service can resolve the container lost problem and reduce
the running time for batch jobs once failover. We have investigated the
component a lot and welcome Flink's native solution. We will try it and
help improve it.

Thanks,
Liu Jiangang

Yingjie Cao  于2021年11月30日周二 下午9:33写道:

> Hi dev & users,
>
> We are happy to announce the open source of remote shuffle project [1] for
> Flink. The project is originated in Alibaba and the main motivation is to
> improve batch data processing for both performance & stability and further
> embrace cloud native. For more features about the project, please refer to
> [1].
>
> Before going open source, the project has been used widely in production
> and it behaves well on both stability and performance. We hope you enjoy
> it. Collaborations and feedbacks are highly appreciated.
>
> Best,
> Yingjie on behalf of all contributors
>
> [1] https://github.com/flink-extended/flink-remote-shuffle
>


Re: flink on yarn 的pre_job提交失败,但是session模式可以成功

2021-11-04 文章
通过你上面的信息是看不出来的,里头的链接你可以看下详细日志
http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028

陈卓宇 <2572805...@qq.com.invalid> 于2021年11月4日周四 下午6:29写道:

> yarn的错误日志:
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint YarnJobClusterEntrypoint.   at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
>  at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
>  at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:91)
> Caused by: java.lang.VerifyError: class
> org.apache.flink.yarn.YarnResourceManager overrides final method
> onStop.()Ljava/util/concurrent/CompletableFuture;  at
> java.lang.ClassLoader.defineClass1(Native Method)at
> java.lang.ClassLoader.defineClass(ClassLoader.java:763)  at
> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
>   at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
>  at java.net.URLClassLoader.access$100(URLClassLoader.java:73)   at
> java.net.URLClassLoader$1.run(URLClassLoader.java:368)   at
> java.net.URLClassLoader$1.run(URLClassLoader.java:362)   at
> java.security.AccessController.doPrivileged(Native Method)   at
> java.net.URLClassLoader.findClass(URLClassLoader.java:361)   at
> java.lang.ClassLoader.loadClass(ClassLoader.java:424)at
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)at
> java.lang.ClassLoader.loadClass(ClassLoader.java:357)at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:54)
> at
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:38)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:231)
>at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
> at java.security.AccessController.doPrivileged(Native Method)   at
> javax.security.auth.Subject.doAs(Subject.java:422)   at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
>at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
>  ... 2 common frames omitted
>
> Diagnostics:
> Application application_1635998548270_0028 failed 1  times
> (global limit =2; local limit is =1) due to AM Container for
> appattempt_1635998548270_0028_01 exited with  exitCode: 1
>
> For more detailed output, check
> the application  tracking page:
> http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028
> Then click on links to logs of each attempt.
> Diagnostics: Exception from
> container-launch.
> Container id:
> container_e391_1635998548270_0028_01_01
> Exit code: 1
> Stack trace: ExitCodeException
> exitCode=1:
> at
> org.apache.hadoop.util.Shell.runCommand(Shell.java:944)
> at
> org.apache.hadoop.util.Shell.run(Shell.java:848)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1142)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
>
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
>
> at
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at
> java.lang.Thread.run(Thread.java:748)
>
>
> Container exited with a non-zero
> exit code 1
> Failing this attempt. Failing the
> application.
>
> 陈
>
>
> 


Re: How to execute multi SQL in one job

2021-10-25 文章
Thanks very much. Forgive me for the simple question. I have found the doc
in the latest code. My inner code is too old...

Jake  于2021年10月26日周二 上午11:39写道:

>
> Hi, you can do like this:
>
> ```java
>
> val statementSet = tableEnv.createStatementSet()
> val insertSqlBuffer = ListBuffer.empty[String]
>
>
> val calciteParser = new CalciteParser(SqlUtil.getSqlParserConfig
> (tableEnv.getConfig))
> sqlArr
> .foreach(item => {
> println(item)
> val itemNode = calciteParser.parse(item)
>
> itemNode match {
> case sqlSet: SqlSet => {
> configuration.setString(sqlSet.getKeyString, sqlSet.getValueString)
> }
> case _: RichSqlInsert => insertSqlBuffer += item
> case _ => {
> println(item)
> val itemResult = tableEnv.executeSql(item)
> itemResult.print()
> }
> }
> })
>
> // execute batch inserts
> if (insertSqlBuffer.size > 0) {
> insertSqlBuffer.foreach(item => {
> println("insert sql: " + item)
> statementSet.addInsertSql(item)
> })
> val explain = statementSet.explain()
> println(explain)
> statementSet.execute()
> }
>
>
> ```
>
>
> On Oct 26, 2021, at 11:27, 刘建刚  wrote:
>
> I have multi batch SQL commands separated by semicolon(;). The SQL
> commands need to be executed in order(In other cases, the SQL command may
> share sources or sinks). I want to execute them in one job. When I
> use tableEnv.executeSql(multiSQL), it will throw errors.  How can I execute
> them in one job? Thanks.
>
>
>


How to execute multi SQL in one job

2021-10-25 文章
I have multi batch SQL commands separated by semicolon(;). The SQL commands
need to be executed in order(In other cases, the SQL command may share
sources or sinks). I want to execute them in one job. When I
use tableEnv.executeSql(multiSQL), it will throw errors.  How can I execute
them in one job? Thanks.


Re: 回复:Flink的api停止方式

2021-10-11 文章
可以尝试以下两种方法:
1、达到停止条件时,通过一定方式通知外界工具,外界工具来帮忙停止作业。
2、现在RichFunction里可以拿到jobId,但是拿不到applicationId,可以看看能否修改代码获取它,比如通过环境变量。然后再调用restful
接口停止作业。

lei-tian  于2021年10月11日周一 上午9:11写道:

> 因为要在代码里面判断是否停止的条件,停止的时候还是要在代码里面停止吧。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-10-11 09:06:17,"995626544" <995626...@qq.com.INVALID> 写道:
> >hi:
> >这个借助外部脚本应该比较容易实现吧。stop后再启动。
> >
> >
> >
> >---原始邮件---
> >发件人: "lei-tian" >发送时间: 2021年10月11日(周一) 上午9:02
> >收件人: "user-zh" >主题: Flink的api停止方式
> >
> >
> >您好:
> >
> 我这边现在有个flink读hbase的程序需要每天在固定的时间段内调用接口,然后如果在指定时间以外或者处理的数据量达到一个阈值的时候停止,在第二天的重复第一天的操作。现在有个问题,就是如何在代码里面可以像UI界面那样将jobcancel掉,而且第二天可以接着第一天的处理进度接着处理剩下的数据。有个savepoint的方案,需要jobid和applicationid但是目前好像在代码里面获取不到,有人有解决思路吗?
>


Re: flink on native k8s 资源弹性扩容问题

2021-09-26 文章
这个不支持,你可以通过外部的工具来做到。比如,检测cpu到了一定程度就自动的重启作业来扩容。

赵旭晨  于2021年9月23日周四 下午9:14写道:

> 目前生产上环境作业参数0.2(cpu),5G
> 平常增量跑的时候cpu占用率不到5%,上游数据全量初始化时经常会把CPU打满
>
> 想问下:flink能否做到弹性扩容?当pod的request cpu打满时自动增加cpu,当高峰期过后处于增量阶段时再收回部分pod资源?
>
>
>
>
>


Re: Flink SQL是否支持Count Window函数?

2021-09-23 文章
这个目前还不支持,但是可以基于TVF来实现,现在已经建了一个jira了:
https://issues.apache.org/jira/browse/FLINK-24002

Caizhi Weng  于2021年9月22日周三 上午11:17写道:

> Hi!
>
> 据我所知目前暂时没有增加 count window 的打算,以后可能会在最新的 Window TVF 里添加 count window tvf。
>
> 不建议在 SQL 中自行实现 count window,因为 SQL 添加 window 较为复杂。但可以考虑先将 SQL 转为
> datastream,用 datastream 的 count window 之后再将 datastream 转回 SQL。
>
> EnvironmentSettings settings = EnvironmentSettings.newInstance().
> inStreamingMode().build();
> StreamTableEnvironment tEnv =
> StreamTableEnvironment.create(
> StreamExecutionEnvironment.getExecutionEnvironment(), settings);
> tEnv.executeSql(
> "CREATE TABLE T ( a INT, b INT, key AS abs(a) % 3, val AS abs(b) % 3 ) WITH
> ( 'connector' = 'datagen' )");
> Table table = tEnv.sqlQuery("SELECT key, val FROM T");
> DataStream dataStream = tEnv.toDataStream(table);
> DataStream> summedStream =
> dataStream
> .keyBy(row -> (int) row.getField(0))
> .countWindow(100)
> .apply(
> (WindowFunction<
> Row,
> Tuple2,
> Integer,
> GlobalWindow>)
> (key, window, input, out) -> {
> int sum = 0;
> for (Row row : input) {
> Integer field = (Integer) row.getField(1);
> if (field != null) {
> sum += field;
> }
> }
> out.collect(Tuple2.of(key, sum));
> })
> .returns(
> new TupleTypeInfo<>(
> BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
> Table summedTable = tEnv.fromDataStream(summedStream);
> tEnv.registerTable("S", summedTable);
> tEnv.executeSql("SELECT f0, f1 FROM S").print();
>
> casel.chen  于2021年9月17日周五 下午6:05写道:
>
> > 今天遇到一个业务场景用到count window,查了下Flink官网目前Flink SQL只支持time
> > window,问一下官方是否打算sql支持count window呢?
> > 如果没有计划的话,自己要如何实现?是否可以像flink 1.13引入的cumulate window写一个自定义窗口函数呢?谢谢!
>


Re: 通过savepoint重启任务,link消费kafka,有重复消息

2021-08-02 文章
cancel with savepoint是之前的接口了,会造成kafka数据的重复。新的stop with
savepoint则会在做savepoint的时候,不再发送数据,从而避免了重复数据,哭啼可以参考
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/savepoints/

Jim Chen  于2021年8月2日周一 下午2:33写道:

> 我是通过savepoint的方式重启的,命令如下:
>
> cancel command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink cancel \
> -yid application_1625497885855_698371 \
> -s
>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint
> \
> 59cf6ccc83aa163bd1e0cd3304dfe06a
>
> print savepoint:
>
>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
>
>
> restart command:
>
> /home/datadev/flink-1.12.2/flink-1.12.2/bin/flink run \
> -m yarn-cluster \
> -yjm 4096 -ytm 4096 \
> -ynm User_Click_Log_Split_All \
> -yqu syh_offline \
> -ys 2 \
> -d \
> -p 64 \
> -s
>
> hdfs://ztcluster/flink_realtime_warehouse/checkpoint/UserClickLogAll/savepoint/savepoint-59cf6c-f82cb4317494
> \
> -n \
> -c com.datacenter.etl.ods.common.mobile.UserClickLogAll \
>
> /opt/case/app/realtime/v1.0/batch/buryingpoint/paiping/all/realtime_etl-1.0-SNAPSHOT.jar
>
> Jim Chen  于2021年8月2日周一 下午2:01写道:
>
> > 大家好,我有一个flink job, 消费kafka topic A, 然后写到kafka  topic B.
> > 当我通过savepoint的方式,重启任务之后,发现topic B中有重复消费的数据。有人可以帮我解答一下吗?谢谢!
> >
> > My Versions
> > Flink 1.12.4
> > Kafka 2.0.1
> > Java 1.8
> >
> > Core code:
> >
> > env.enableCheckpointing(30);
> >
> >
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >
> >
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >
> > DataStream dataDS = env.addSource(kafkaConsumer).map(xxx);
> >
> > tableEnv.createTemporaryView("data_table",dataDS);
> > String sql = "select * from data_table a inner join
> > hive_catalog.dim.dim.project for system_time as of a.proctime as b on
> a.id
> > = b.id"
> > Table table = tableEnv.sqlQuery(sql);
> > DataStream resultDS = tableEnv.toAppendStream(table, Row.class).map(xx);
> >
> > // Kafka producer parameter
> > Properties producerProps = new Properties();
> > producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> > bootstrapServers);
> > producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
> > producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG,
> kafkaBufferMemory);
> > producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaBatchSize);
> > producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaLingerMs);
> > producerProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 30);
> > producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
> > "1");
> > producerProps.put(ProducerConfig.RETRIES_CONFIG, "5");
> > producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
> > producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> >
> > resultDS.addSink(new FlinkKafkaProducer(sinkTopic, new
> > JSONSchema(), producerProps, new FlinkFixedPartitioner<>(),
> > FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 5))
> > .setParallelism(sinkParallelism);
> >
>


Re: 退订

2021-07-30 文章
Send anything to  user-zh-unsubscr...@flink.apache.org

hihl  于2021年7月27日周二 下午5:50写道:

> 退订


Re: (无主题)

2021-06-21 文章
嗨,你使用的是session还是perJob的作业?哪个flink版本?有详细的日志吗?一般不退户,可能是master卡在了哪里,比如我们遇到过卡在handler或者异步执行有问题。

田磊  于2021年6月20日周日 上午11:14写道:

>
> 我用flink跑hbase的数据,flink的界面显示任务已经finished,正在running的任务为0。而yarn的界面显示正在running的状态,一直都结束不了,需要手动kill,是什么情况啊。
>
>
> | |
> totorobabyfans
> |
> |
> 邮箱:totorobabyf...@163.com
> |
>
> 签名由 网易邮箱大师 定制


Re: 流与流 left join

2021-06-14 文章
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/#interval-joins

chenchencc <1353637...@qq.com> 于2021年6月15日周二 上午9:46写道:

> 你好,谢谢哈,想问下有相关的资料或者案例能发下吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: rocksdb状态后端最多保留checkpoints问题

2021-05-28 文章
增量快照的原理是sst文件共享,系统会自动帮助你管理sst文件的引用,类似java的引用,并不会因为一个快照删除了就会把实际的数据删除掉。
也就不会发生你说的情况

tison  于2021年5月28日周五 上午1:47写道:

> rocksdb 增量 checkpoint 不是你这么理解的,总的不会恢复不了。原因可以参考下面的材料
>
> -
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
> 官方 blog 介绍
> - https://www.bilibili.com/video/BV1db411e7x2 施博士的介绍,大概 24 分钟开始讲
>
> Best,
> tison.
>
>
> casel.chen  于2021年5月27日周四 下午11:35写道:
>
> > 作业中使用了增量式的rocksdb状态后端,请问如果再设置了最多保留checkpoints个数的话,会不会造成rocksdb
> >
> state恢复失败?例如,假设完整的state恢复需要最近10个chk,但因为设置了最多保留checkpoints个数为5的话,状态是不是恢复不了了?
>


Re: Re: flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复

2021-05-28 文章
那应该是master failover后把快照信息丢失了,ha应该能解决这个问题。

董建 <62...@163.com> 于2021年5月28日周五 下午6:24写道:

> 稳定复现
> checkpoint 正常生成,在web ui和hdfs目录里边都可以确认。
> 我们jobmanager没有做ha,不知道是否是这个原因导致的?
> 日志里边能看到是从指定的-s恢复的,没有指定-s的时候,重启的时候也并没有使用最新的checkpoint文件。
> 目前这个问题困扰了我很久,也没有一个好的思路,下一步先把ha搞起来再试试。
> >> org.apache.flink.configuration.GlobalConfiguration   [] -
> Loading
> >> configuration property: execution.savepoint.path,
> >>
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-05-28 18:15:38,"刘建刚"  写道:
> >这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗?
> >1、从savepoint恢复;
> >2、作业开始定期做savepoint;
> >3、作业failover。
> >如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。
> >如果还是有问题,需要通过日志来排查了。
> >
> >董建 <62...@163.com> 于2021年5月28日周五 下午5:37写道:
> >
> >> 我遇到的问题现象是这样的
> >>
> >>
> >>
> >>
> >> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。
> >>
> >>
> >>
> >>
> >> flink run -d -s
> >>
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >> -t yarn-per-job -m yarn-cluser -D yarn.application.name=
> >> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
> >>
> >>
> >>
> >>
> >> 2、flink-conf.xml
> >>
> >>
> >>
> >>
> >> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
> >>
> >>
> >>
> >>
> >> 3、代码checkpoint设置
> >>
> >>
> >>
> >>
> >>StreamExecutionEnvironment env =
> >> StreamExecutionEnvironment.getExecutionEnvironment();
> >>
> >>
> >>
> >>
> >>env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100,
> >> 10));
> >>
> >>
> >>
> >>
> >>CheckpointConfig checkpointConfig = env.getCheckpointConfig();
> >>
> >>
> >>
> >>
> >>
> >>
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >>
> >>
> >>
> >>
> >>env.enableCheckpointing(1 * 60 * 1000);
> >>
> >>
> >>
> >>
> >>
> >>  checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >>
> >>
> >>
> >>
> >>checkpointConfig.setTolerableCheckpointFailureNumber(100);
> >>
> >>
> >>
> >>
> >>checkpointConfig.setCheckpointTimeout(60 * 1000);
> >>
> >>
> >>
> >>
> >>checkpointConfig.setMaxConcurrentCheckpoints(1);
> >>
> >>
> >>
> >>
> >> 4、问题现象
> >>
> >>
> >>
> >>
> >> a)运维同事切换yarn
> >>
> resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器
> >>
> >>
> >>
> >>
> >>
> >> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200
> >>
> >>
> >>
> >>
> >> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200
> >> restore,从日志中看还是从chk-100 restore的。
> >>
> >>
> >>
> >>
> >> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction
> >> sourceMilApplysLogStream = MySQLSource.builder()
> >>
> >>
> >>
> >>
> >>   重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费
> >>
> >>
> >>
> >>
> >> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?
> >>
> >>
> >>
> >>
> >> 2021-05-24 16:49:50,398 INFO
> >> org.apache.flink.configuration.GlobalConfiguration   [] -
> Loading
> >> configuration property: execution.savepoint.path,
> >>
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> >>
> >>
> >>
> >> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费
> >>
> >>
> >>
> >>
> >> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。
>


Re: flink on yarn 模式下,yarn集群的resource-manager切换导致flink应用程序重启,并且未从最后一次checkpoint恢复

2021-05-28 文章
这种情况是不符合预期的。请问通过以下步骤可以稳定复现吗?
1、从savepoint恢复;
2、作业开始定期做savepoint;
3、作业failover。
如果是的话,可能需要排查下checkpoint 文件是否存在,zookeeper上是否更新。
如果还是有问题,需要通过日志来排查了。

董建 <62...@163.com> 于2021年5月28日周五 下午5:37写道:

> 我遇到的问题现象是这样的
>
>
>
>
> 1、flink版本flink-1.12.2,启动命令如下,指定-s是因为job有做过cancel,这里重启。
>
>
>
>
> flink run -d -s
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
> -t yarn-per-job -m yarn-cluser -D yarn.application.name=
> /tmp/flink-1.0-SNAPSHOT.jar  -c com.test.myStream  --profile prod
>
>
>
>
> 2、flink-conf.xml
>
>
>
>
> state.checkpoints.dir: hdfs:///user/flink/checkpoints/default
>
>
>
>
> 3、代码checkpoint设置
>
>
>
>
>StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
>
>
>
>env.setRestartStrategy(RestartStrategies.fixedDelayRestart(100,
> 10));
>
>
>
>
>CheckpointConfig checkpointConfig = env.getCheckpointConfig();
>
>
>
>
>
>  
> checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
>
>
>env.enableCheckpointing(1 * 60 * 1000);
>
>
>
>
>
>  checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
>
>
>
>checkpointConfig.setTolerableCheckpointFailureNumber(100);
>
>
>
>
>checkpointConfig.setCheckpointTimeout(60 * 1000);
>
>
>
>
>checkpointConfig.setMaxConcurrentCheckpoints(1);
>
>
>
>
> 4、问题现象
>
>
>
>
> a)运维同事切换yarn
> resourc-manager,我的flink任务也会重启(重启后application-id和job-id并没有改变),但是jobmanager和taskmanager更换了机器
>
>
>
>
>
> b)我的flink任务每隔1分钟做一个checkpoint,假如任务在上次重启后又运行了一段时间,checkpoint已经到了chk-200
>
>
>
>
> c)集群的同事再次切换resource-manager,这个时候预期是flink任务自动从chk-200
> restore,从日志中看还是从chk-100 restore的。
>
>
>
>
> d)任务大致逻辑是通过flink-mysql-cdc消费binlog,DebeziumSourceFunction
> sourceMilApplysLogStream = MySQLSource.builder()
>
>
>
>
>   重启导致了再次从chk-100的binlog pos开始消费,即了chk-100~chk-200之间的binlog重复消费
>
>
>
>
> e)日志中有打印如下,难道是因为上次我启动flink任务的时候用了-s,所以不会去自动找最新的checkpoint重启吗?
>
>
>
>
> 2021-05-24 16:49:50,398 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: execution.savepoint.path,
> hdfs:///user/flink/checkpoints/default/f9b85edbc6ca779b6e60414f3e3964f2/chk-100
>
>
>
> 预期是任务重启后自动从最新的checkpoint开始继续消费,即从chk-200开始消费
>
>
>
>
> 现在的问题是任务重启后总是从flink -s指定的checkpoint恢复的。


Re: 消息队列量级特别如何优化消费

2021-03-05 文章
本质原因是作业资源不足无法处理大量数据,好像只有扩大并发来解决了。


allanqinjy  于2021年3月5日周五 上午10:48写道:

>
>
> hi,
>   由于消息队列量特别大,在flink接入以后被压严重。除了修改并行度以外还有没有其他的优化方案!!!
>
>
> | |
> allanqinjy
> |
> |
> allanqi...@163.com
> |
> 签名由网易邮箱大师定制
>
>


Re: group agg 开启了mini batch之后,state ttl不生效的问题

2020-09-30 文章
修复方案参考https://github.com/apache/flink/pull/11830

kandy.wang  于2020年9月30日周三 下午2:19写道:

> group agg 开启了mini batch之后,state ttl不生效的问题:
>
>
> 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink
> 算子的数据量,降低hbase的写入tps,减少hbase压力。不然每来一条数据就处理一次的话,单个任务 就可以把hbase 的tps 干到 十几万。
>
>
> sql-client-defaults.yaml对应的参数应该是这2个吧:
> # minimum idle state retention in ms
> min-idle-state-retention: 0
> # maximum idle state retention in ms
> max-idle-state-retention: 0
> 这个现在进展如何了,这个社区打算什么时候支持
>
>
>
>


Re: 回复: BLinkPlanner sql join状态清理

2020-09-29 文章
miniBatch下是无法ttl的,这个是修复方案:https://github.com/apache/flink/pull/11830

Benchao Li  于2020年9月29日周二 下午5:18写道:

> Hi Ericliuk,
>
> 这应该是实现的bug,你可以去社区建一个issue描述下这个问题。
> 有时间的话也可以帮忙修复一下,没有时间社区也会有其他小伙伴帮忙来修复的~
>
> Ericliuk  于2020年9月29日周二 下午4:59写道:
>
> > 我最近也遇到了这个问题,看了下,blink,并且配置minibatch优化后就会不使用IdleStateRetention相关配置了。
> > <
> >
> http://apache-flink.147419.n8.nabble.com/file/t491/Xnip2020-09-29_16-55-32.png
> >
> >
> >
> > 不太清楚为什么用了mini batch就没读取这个配置。
> > 一个属于状态清理,一个属于agg优化,优化配置要影响到原来的状态query状态清理么?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>
>
> --
>
> Best,
> Benchao Li
>


Re: FlinkSQL是否支持设置窗口trigger实现continuious trigger呢

2020-09-28 文章
提供另外一种思路:内层是10s的翻滚窗口,外层接一个按5分钟为key的group by。为防止状态过大,可以设置ttl。简单demo如下:
SELECT *
FROM (SELECT TUMBLE_START(proctime, INTERVAL '10' SECOND) AS st, *
 FROM *
 GROUP BY TUMBLE(proctime, INTERVAL '10' SECOND)
   )
GROUP BY st / (5 * 60 * 1000)

赵一旦  于2020年9月27日周日 下午5:45写道:

> Benchao Li那个我会考虑下,主要是了解下,从datastream转过来,想结合业务看看有多少任务是sql也能实现的。
> silence这个不清楚你表达啥意思,统计需求是五分钟粒度的,不通过窗口咋搞。
> 难道是指基于ts人工计算所属窗口w,然后group by w, 其他key  这样嘛。
>
>
>
> silence  于2020年9月27日周日 下午5:37写道:
>
> > 也可以通过普通的非窗口聚合进行实现吧,minibatch设大点
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>


Re: Re:HistoryServer完成任务丢失的问题

2020-09-28 文章
修复方案为:https://issues.apache.org/jira/browse/FLINK-18959

xiao cai  于2020年9月27日周日 下午6:42写道:

> 貌似是个bug,我的版本是1.11.0
>
>
>
> https://issues.apache.org/jira/browse/FLINK-18959?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Bug%20AND%20text%20~%20%22history%20server%22
>
>
>  原始邮件
> 发件人: xiao cai
> 收件人: user-zh
> 发送时间: 2020年9月27日(周日) 18:41
> 主题: Re:Re:HistoryServer完成任务丢失的问题
>
>
> 貌似是个bug
>
>
>  原始邮件
> 发件人: xiao cai
> 收件人: user-zh
> 发送时间: 2020年9月27日(周日) 18:31
> 主题: Re:Re:HistoryServer完成任务丢失的问题
>
>
> 是在history server中没有,但是yarn
> logs还是可以看到的,我理解是任务结束前,jobManager没有upload文件到指定目录,所以history
> server没有拉取到。但是为何没有upload,我通过jobManager的日志也没看出来,没有任何报错。 原始邮件 发件人: Michael
> Ran 收件人: user-zh 发送时间:
> 2020年9月27日(周日) 17:06 主题: Re:Re:HistoryServer完成任务丢失的问题 你的意思是,日志彻底消失了?完全找不到?
> 不会是你任务有问题,压根就没成功,没产生吧。 理论上日志不可能平白无故消失的 在 2020-09-27 17:03:45,"xiao
> cai"  写道: >是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。
> >问题是cancel的那次job,并没有上传日志信息到归档目录里。 > > > 原始邮件 >发件人: Michael Ran<
> greemqq...@163.com> >收件人: user-zh >发送时间:
> 2020年9月27日(周日) 16:45 >主题: Re:HistoryServer完成任务丢失的问题 > > >history
> 记得是定时拉取的,有延迟过去 在 2020-09-27 16:40:27,"xiao cai"  写道:
> >Hi: >flink 1.11.0
> >我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history
> server中却找不到这个任务。同时我尝试了再yarn中kill
> application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history
> server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.


Re: 执行mvn构建错误

2020-09-25 文章
看着是mvn无法下载到某些包,你有使用过其他版本吗?如果都是相同的问题,那么应该是你本地环境或者网络环境的问题。

迟成  于2020年9月25日周五 下午1:45写道:

> 环境:
>
> tag release-1.11.2
>
> commit fe361357
>
> Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
>
> Java version: 1.8.0_251, vendor: Oracle Corporation, runtime:
> /Library/Java/JavaVirtualMachines/jdk1.8.0_251.jdk/Contents/Home/jre
>
> Default locale: zh_CN, platform encoding: UTF-8
>
> OS name: "mac os x", version: "10.15.6", arch: "x86_64", family: "mac"
>
>
>
> mvn clean
>
> [ERROR] Failed to execute goal on project flink-dist_2.11: Could not
> resolve dependencies for project
> org.apache.flink:flink-dist_2.11:jar:1.11.2: The following artifacts could
> not be resolved:
> org.apache.flink:flink-examples-streaming-state-machine_2.11:jar:1.11.2,
> org.apache.flink:flink-examples-streaming-twitter_2.11:jar:1.11.2,
> org.apache.flink:flink-fs-hadoop-shaded:jar:1.11.2,
> org.apache.flink:flink-s3-fs-base:jar:1.11.2: Could not find artifact
> org.apache.flink:flink-examples-streaming-state-machine_2.11:jar:1.11.2 in
> central (https://repo.maven.apache.org/maven2) -> [Help 1]
>
>
>
> mvn clean package -DskipTests
>
> [ERROR]
> ~/flink/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ArrayItem.java:[317,31]
> 找不到符号
>
> [ERROR]   符号:   类 AvroMissingFieldException
>
> [ERROR]   位置: 程序包 org.apache.avro
>
>
>
> 各位大佬,我看文档上说maven用3.2.5最好,是不是mvn的问题,还是其他啥配置不太对
>
>
>
>


Re: RocksDBStateBackend 问题

2020-09-07 文章
直接存在rocksdb数据库。rocksdb会首先将数据写到内存buffer中(不会太大),等buffer满了再刷到磁盘。相比filesystem的statebackend,rocksdb会因为序列化和反序列化导致处理速度慢一些,但是优势是可以利用磁盘的超大空间来存储更大的状态。

zilong xiao  于2020年9月7日周一 下午5:51写道:

> 可以看下这个文档:
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/state_backends.html#the-rocksdbstatebackend
>
> guaishushu1...@163.com  于2020年9月7日周一 下午5:47写道:
>
> > 想问下关于RocksDBStateBackend
> > 是直接把状态存在rocksdb数据库,还是等内存满了再存到RocksDB数据库。如果直接存在RocksDB数据库,那岂不是很影响数据处理速度。
> >
> >
> >
> > guaishushu1...@163.com
> >
>


Re: 请教:用flink实现实时告警的功能

2020-09-06 文章
针对规则改变,要想实时生效,有两种建议:

   1. 利用维表join的功能来join数据库中的规则,flink内部可以配置一定的缓存策略。可以查看下Lookup的实现。
   2. 也可以把规则打到kafka的表里,然后通过broadcast来广播最新的规则。


李军  于2020年9月4日周五 下午5:46写道:

> 您好!
>
>
>可以使用Flink+drools 做。drools可以实时更新规则
> 2020-9-4
> | |
> 李军
> |
> |
> hold_li...@163.com
> |
> 签名由网易邮箱大师定制
> 在2020年8月6日 10:26,samuel@ubtrobot.com 写道:
> 由于需要实时告警功能,经调研,用flink 来实现是比较合适,但有几个问题没搞清楚,请大神们指教,感谢!
>
> 告警有分两部分:
> 一是告警规则的设置,数据存放在mysql,存储的格式是json
> {"times":5}  ---就是事件发生大于5次就发出告警;
> {"temperature": 80} ---就是温度大于80就告警;
> 二是告警实现
> 1)上报的数据写入到kafka
> 2)flink读取kafka的数据,然后通过翻滚窗口进行计算,如果满足规则就生产告警。
>
>
> 现在遇到的问题是:
> 1. 当规则变更时,如何及时生效?
> 2.如果用flink CEP来是实现,对于同一数据源,能否在一个作业里让多个规则同时生效?
> 3.这一功能有最佳实践吗?
>
> 希望哪位解答一下,谢谢!
>
>
>
>


How to visit outer service in batch for sql

2020-08-26 文章
  For API, we can visit outer service in batch through countWindow,
such as the following. We can visit outer service every 1000 records. If we
visit outer service every record, it will be very slow for our job.

source.keyBy(new KeySelector())
.countWindow(1000)
.apply((WindowFunction)
(s, globalWindow, values, collector) -> {
List resultList = service.visit(values);
for (MyType result: resultList) {
if (result.ok) {
collector.collect(result);
}
}
});

  But how can I write SQL to implement the batch logic? I can use udf
to visit outer service. Currently, Flink only support time window but not
count window. I also check the udf wiki but find it hard to batch records.
  Any suggestion is welcome. Thank you.


Re: flinksql如何控制结果输出的频率

2020-04-15 文章
感谢 Benchao,问题应解决了!

> 2020年4月15日 下午3:38,Benchao Li  写道:
> 
> Hi 建刚,
> 
> 现在Emit的原理是这样子的:
> - 当某个key下面来了第一条数据的时候,注册一个emit delay之后的处理时间定时器;
> - 当定时器到了的时候,
>   - 检查当前的key下的聚合结果跟上次输出的结果是否有变化,
>  - 如果有变化,就发送-[old], +[new] 两条结果到下游;
>  - 如果是没有变化,则不做任何处理;
>   - 再次注册一个新的emit delay之后的处理时间定时器。
> 
> 你可以根据这个原理,再对照下你的数据,看看是否符合预期。
> 
> 刘建刚 mailto:liujiangangp...@gmail.com>> 
> 于2020年4月15日周三 下午3:32写道:
> 我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:
> 
> public class EarlyEmitter {
>public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>   env.setParallelism(1);
> 
>   EnvironmentSettings settings = 
> EnvironmentSettings.newInstance().useBlink
>   StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
> settings);
> 
>   
> tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled",
>  true);
>   
> tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay",
>  "1000 ms");
> 
>   Table table = tEnv.fromDataStream(
> env.addSource(new SourceData()), "generate_time, name, city, id, 
> event_time.proctime");
>   tEnv.createTemporaryView("person", table);
> 
>   String emit =
> "SELECT name, COUNT(DISTINCT id)" +
>   "FROM person " +
>   "GROUP BY TUMBLE(event_time, interval '10' second), name";
> 
>   Table result = tEnv.sqlQuery(emit);
>   tEnv.toRetractStream(result, Row.class).print();
> 
>   env.execute("IncrementalGrouping");
>}
> 
>private static final class SourceData implements 
> SourceFunction> {
>   @Override
>   public void run(SourceContext> ctx) 
> throws Exception {
>  while (true) {
> long time = System.currentTimeMillis();
> ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
>  }
>   }
> 
>   @Override
>   public void cancel() {
> 
>   }
>}
> }
> 
> 
> 
> 
>> 2020年3月27日 下午3:23,Benchao Li > <mailto:libenc...@gmail.com>> 写道:
>> 
>> Hi,
>> 
>> 对于第二个场景,可以尝试一下fast emit:
>> table.exec.emit.early-fire.enabled = true
>> table.exec.emit.early-fire.delay = 5min
>> 
>> PS:
>> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
>> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
>> 
>> Jingsong Li mailto:jingsongl...@gmail.com>> 
>> 于2020年3月27日周五 下午2:51写道:
>> 
>>> Hi,
>>> 
>>> For #1:
>>> 创建级联的两级window:
>>> - 1分钟窗口
>>> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>>> 
>>> Best,
>>> Jingsong Lee
>>> 
>> 
>> 
>> -- 
>> 
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com <mailto:libenc...@gmail.com>; 
>> libenc...@pku.edu.cn <mailto:libenc...@pku.edu.cn>
> 
> 
> 
> -- 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com <mailto:libenc...@gmail.com>; libenc...@pku.edu.cn 
> <mailto:libenc...@pku.edu.cn>


Re: flinksql如何控制结果输出的频率

2020-04-15 文章
我们也经常有固定窗口定期触发的需求,但是添加以下参数并没有得到预期的效果(10秒的窗口,期待每秒都输出结果),是我的使用方法不对还是其他问题呢?多谢各位,下面是伪代码:

public class EarlyEmitter {
   public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(1);

  EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlink
  StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, 
settings);

  
tEnv.getConfig().getConfiguration().setBoolean("table.exec.emit.early-fire.enabled",
 true);
  
tEnv.getConfig().getConfiguration().setString("table.exec.emit.early-fire.delay",
 "1000 ms");

  Table table = tEnv.fromDataStream(
env.addSource(new SourceData()), "generate_time, name, city, id, 
event_time.proctime");
  tEnv.createTemporaryView("person", table);

  String emit =
"SELECT name, COUNT(DISTINCT id)" +
  "FROM person " +
  "GROUP BY TUMBLE(event_time, interval '10' second), name";

  Table result = tEnv.sqlQuery(emit);
  tEnv.toRetractStream(result, Row.class).print();

  env.execute("IncrementalGrouping");
   }

   private static final class SourceData implements SourceFunction> {
  @Override
  public void run(SourceContext> ctx) 
throws Exception {
 while (true) {
long time = System.currentTimeMillis();
ctx.collect(Tuple4.of(time, "flink", "bj", 1L));
 }
  }

  @Override
  public void cancel() {

  }
   }
}




> 2020年3月27日 下午3:23,Benchao Li  写道:
> 
> Hi,
> 
> 对于第二个场景,可以尝试一下fast emit:
> table.exec.emit.early-fire.enabled = true
> table.exec.emit.early-fire.delay = 5min
> 
> PS:
> 1. 这个feature并没有在官方文档里面写出来,目前应该是一个实验性质的feature
> 2. window加了emit之后,会由原来输出append结果变成输出retract结果
> 
> Jingsong Li  于2020年3月27日周五 下午2:51写道:
> 
>> Hi,
>> 
>> For #1:
>> 创建级联的两级window:
>> - 1分钟窗口
>> - 5分钟窗口,计算只是保存数据,发送明细数据结果
>> 
>> Best,
>> Jingsong Lee
>> 
> 
> 
> -- 
> 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn



Re: java.lang.StackOverflowError

2020-01-22 文章
多谢,已经找到解决的issue了:https://issues.apache.org/jira/browse/FLINK-10367 
<https://issues.apache.org/jira/browse/FLINK-10367>

> 2020年1月22日 下午4:48,zhisheng  写道:
> 
> 1、建议问题别同时发到三个邮件去
> 2、找找还有没有更加明显的异常日志
> 
> 刘建刚  于2020年1月22日周三 上午10:25写道:
> 
>> I am using flink 1.6.2 on yarn. State backend is rocksdb.
>> 
>>> 2020年1月22日 上午10:15,刘建刚  写道:
>>> 
>>>  I have a flink job which fails occasionally. I am eager to avoid
>> this problem. Can anyone help me? The error stacktrace is as following:
>>> java.io.IOException: java.lang.StackOverflowError
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.InputChannel.checkError(InputChannel.java:191)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:194)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:589)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:546)
>>>  at
>> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:175)
>>>  at org.apache.flink.streaming.runtime.io
>> .StreamInputProcessor.processInput(StreamInputProcessor.java:236)
>>>  at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>>>  at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335)
>>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:754)
>>>  at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.StackOverflowError
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.SingleInputGate.notifyChannelNonEmpty(SingleInputGate.java:656)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.InputChannel.notifyChannelNonEmpty(InputChannel.java:125)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.InputChannel.setError(InputChannel.java:203)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:403)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>>>  at
>> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>>>  at org.apache.flink.runtime.io
>> .network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>>>  at org.apache.flink.runtime.io
>> .network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputCha

Re: java.lang.StackOverflowError

2020-01-21 文章
I am using flink 1.6.2 on yarn. State backend is rocksdb. 

> 2020年1月22日 上午10:15,刘建刚  写道:
> 
>   I have a flink job which fails occasionally. I am eager to avoid this 
> problem. Can anyone help me? The error stacktrace is as following:
> java.io.IOException: java.lang.StackOverflowError
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:191)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:194)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:589)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:546)
>   at 
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:175)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:236)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:754)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.StackOverflowError
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.notifyChannelNonEmpty(SingleInputGate.java:656)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.InputChannel.notifyChannelNonEmpty(InputChannel.java:125)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.InputChannel.setError(InputChannel.java:203)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:403)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>   at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
>   at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
>   at 
> org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
>  

java.lang.StackOverflowError

2020-01-21 文章
  I have a flink job which fails occasionally. I am eager to avoid this
problem. Can anyone help me? The error stacktrace is as following:

java.io.IOException: java.lang.StackOverflowError
at 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:191)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.getNextBuffer(RemoteInputChannel.java:194)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:589)
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:546)
at 
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:175)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:236)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:335)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:754)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.StackOverflowError
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.notifyChannelNonEmpty(SingleInputGate.java:656)
at 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.notifyChannelNonEmpty(InputChannel.java:125)
at 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.setError(InputChannel.java:203)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:403)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
at 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.notifyBufferAvailable(RemoteInputChannel.java:380)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:282)
at 
org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
at 

Re: Fail to deploy flink on k8s in minikube

2020-01-12 文章
Thank you for your help.

Yang Wang  于2020年1月13日周一 下午12:53写道:

> Hi, Jiangang
>
> Glad to hear that you are looking to run Flink on Kubernetes.
>
> It just because you are using the new Kubernetes version.The
> extensions/v1beta1
> has been removed since v1.16. Please use apps/v1 instead. The apps/v1 is
> introduced
> from v1.9.0. I will create a ticket fix the documentation.
>
> Before release-1.10, you could use standalone per-job[1] or standalone
> session[2] cluster on
> K8s. There are some existing K8s operators to manage the application
> lifecycle(e.g. google flink-on-k8s-operator[3],
> lyft flink-k8s-operator[4]).
>
> Running Flink native on K8s is supported from 1.10. You could find it here
> [5]. It aims at to make
> Flink users more convenient to deploy Flink workloads on K8s cluster.
> However, we only support
> session cluster now. The per-job mode is in development.
>
> [1]
> https://github.com/apache/flink/blob/release-1.9/flink-container/kubernetes/README.md#deploy-flink-job-cluster
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html#flink-session-cluster-on-kubernetes
>
> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
> [3] https://github.com/GoogleCloudPlatform/flink-on-k8s-operator
> [4] https://github.com/lyft/flinkk8soperator
> [5]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/native_kubernetes.html
>
> Best,
> Yang
>
> 刘建刚  于2020年1月13日周一 下午12:14写道:
>
>>   I fail to deploy flink on k8s referring to
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html
>>   When I run the command 'kubectl create -f
>> jobmanager-deployment.yaml', following error is reported:
>> [image: image.png]
>>   I am new to k8s. Our team want to deploy flink on k8s. Can anyone
>> help me resolve this issue? Can anyone give me some tutorial about k8s and
>> flink in product? Thank you very much.
>>
>


Fail to deploy flink on k8s in minikube

2020-01-12 文章
  I fail to deploy flink on k8s referring to
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/kubernetes.html
  When I run the command 'kubectl create -f jobmanager-deployment.yaml',
following error is reported:
[image: image.png]
  I am new to k8s. Our team want to deploy flink on k8s. Can anyone
help me resolve this issue? Can anyone give me some tutorial about k8s and
flink in product? Thank you very much.


How to get kafka record's timestamp in job

2019-12-31 文章
  In kafka010, ConsumerRecord has a field named timestamp. It is
encapsulated
in Kafka010Fetcher. How can I get the timestamp when I write a flink job?
Thank you very much.


Re: Error:java: 无效的标记: --add-exports=java.base/sun.net.util=ALL-UNNAMED

2019-11-21 文章
Thank you very much. It works for me.

> 在 2019年11月14日,下午1:06,Biao Liu  写道:
> 
> Hi,
> 
> I have encountered the same issue when setting up a dev environment.
> 
> It seems that the my Intellij (2019.2.1) unexpectedly activates java11
> profile of maven. It doesn't match the Java compiler (JDK8). I'm not sure
> why it happened silently.
> 
> So for me, the solution is "Intellij" -> "View" -> "Tool Windows" ->
> "Maven" -> "Profiles" -> uncheck the "java11" -> reimport maven project.
> 
> Thanks,
> Biao /'bɪ.aʊ/
> 
> 
> 
> On Mon, 4 Nov 2019 at 18:01, OpenInx  wrote:
> 
>> Hi
>> I met the same problem before. After some digging,  I find that the idea
>> will detect the JDK version
>> and choose whether to use the jdk11 option to run the flink maven building.
>> if you are in jdk11 env,  then
>> it will add the option --add-exports when maven building in IDEA.
>> 
>> For my case,  I was in IntelliJIdea2019.2 which depends on the jdk11, and
>> once I re-import the flink
>> modules then the IDEA will add the --add-exports flag even if  I removed
>> all the flags in .idea/compile.xml
>> explicitly.  I noticed that the Intellij's JDK affected the flink maven
>> building, so I turned to use the Intellij with JDK8
>> bundled,  then the problem was gone.
>> 
>> You can verify it, and if  it's really the same. can just replace your IDEA
>> with the pkg suffix with "with bundled JBR 8" in
>> here [1].
>> Say if you are using MacOS, then should download the package "2019.2.4 for
>> macOS with bundled JBR 8 (dmg)"
>> 
>> Hope it works for you
>> Thanks.
>> 
>> [1]. https://www.jetbrains.com/idea/download/other.html
>> 
>> 
>> On Mon, Nov 4, 2019 at 5:44 PM Till Rohrmann  wrote:
>> 
>>> Try to reimport that maven project. This should resolve this issue.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Mon, Nov 4, 2019 at 10:34 AM 刘建刚  wrote:
>>> 
>>>>  Hi, I am using flink 1.9 in idea. But when I run a unit test in
>>> idea.
>>>> The idea reports the following error:"Error:java: 无效的标记:
>>>> --add-exports=java.base/sun.net.util=ALL-UNNAMED".
>>>>  Everything is ok when I use flink 1.6. I am using jdk 1.8. Is it
>>>> related to the java version?
>>>> 
>>> 
>> 



Re: How to estimate the memory size of flink state

2019-11-20 文章
  Thank you. Your suggestion is good and I benefit a lot. For my case, I 
want to know the state memory size for other reasons. 
  When the the gc pressure is bigger, I need to limit the source or discard 
some data from the source to ensure job’s running. If the state size is bigger, 
I need to discard data. If the state size is not bigger, I need to limit the 
source.  The state size shows the resident memory. For event time, discarding 
data can reduce memory usage.
  Could you please give me some suggestions? 

> 在 2019年11月20日,下午3:16,sysukelee  写道:
> 
> Hi Liu,
> We monitor the jvm used/max heap memory to determine whether to rescale the 
> job.
> To avoid problems caused by oom, you don't need to know exactly how much 
> memory exactly used by state. 
> Focusing on jvm memory use is more reasonable.
>  
> <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=sysukelee=sysukelee%40gmail.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22sysukelee%40gmail.com%22%5D>
> On 11/20/2019 15:08,刘建刚 
> <mailto:liujiangangp...@gmail.com> wrote: 
> We are using flink 1.6.2. For filesystem backend, we want to monitor
> the state size in memory. Once the state size becomes bigger, we can get
> noticed and take measures such as rescaling the job, or the job may fail
> because of the memory.
> We have tried to get the memory usage for the jvm, like gc throughput.
> For our case, state can vary greatly at the peak. So maybe I can refer to
> the state memory size.
> I checked the metrics and code, but didn't find any information about
> the state memory size. I can get the checkpoint size, but they are
> serialized result that can not reflect the running state in memory.  Can
> anyone give me some suggestions? Thank you very much.



How to estimate the memory size of flink state

2019-11-19 文章
  We are using flink 1.6.2. For filesystem backend, we want to monitor
the state size in memory. Once the state size becomes bigger, we can get
noticed and take measures such as rescaling the job, or the job may fail
because of the memory.
  We have tried to get the memory usage for the jvm, like gc throughput.
For our case, state can vary greatly at the peak. So maybe I can refer to
the state memory size.
  I checked the metrics and code, but didn't find any information about
the state memory size. I can get the checkpoint size, but they are
serialized result that can not reflect the running state in memory.  Can
anyone give me some suggestions? Thank you very much.


Error:java: 无效的标记: --add-exports=java.base/sun.net.util=ALL-UNNAMED

2019-11-04 文章
  Hi, I am using flink 1.9 in idea. But when I run a unit test in idea.
The idea reports the following error:"Error:java: 无效的标记:
--add-exports=java.base/sun.net.util=ALL-UNNAMED".
  Everything is ok when I use flink 1.6. I am using jdk 1.8. Is it
related to the java version?


How to use two continuously window with EventTime in sql

2019-10-29 文章
  For one sql window, I can register table with event time and use time
field in the tumble window. But if I want to use the result for the first
window and use another window to process it, how can I do it? Thank you.


Uncertain result when using group by in stream sql

2019-09-13 文章
  I use flink stream sql to write a demo about "group by".  The records
are [(bj, 1), (bj, 3), (bj, 5)]. I group by the first element and sum the
second element.
  Every time I run the program, the result is different. It seems that
the records are out of order. Even sometimes record is lost. I am confused
about that.
  The code is as below:

public class Test {
   public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
  StreamTableEnvironment tEnv =
StreamTableEnvironment.getTableEnvironment(env);

  DataStream> dataStream = env.fromElements(
Tuple2.of("bj", 1L),
Tuple2.of("bj", 3L),
Tuple2.of("bj", 5L));
  tEnv.registerDataStream("person", dataStream);

  String sql = "select f0, sum(f1) from person group by f0";
  Table table = tEnv.sqlQuery(sql);
  tEnv.toRetractStream(table, Row.class).print();

  env.execute();
   }
}

  The results may be as below:
1> (true,bj,1)
1> (false,bj,1)
1> (true,bj,4)
1> (false,bj,4)
1> (true,bj,9)

1> (true,bj,5)
1> (false,bj,5)
1> (true,bj,8)
1> (false,bj,8)
1> (true,bj,9)


How to implement grouping set in stream

2019-09-10 文章
  I want to implement grouping set in stream. I am new to flink sql. I
want to find a example to teach me how to self define rule and
implement corresponding operator. Can anyone give me any suggestion?


How to calculate one day's uv every minute by SQL

2019-09-04 文章
  We want to calculate one day's uv and show the result every minute .
We have implemented this by java code:

  dataStream.keyBy(dimension)
.incrementWindow(Time.days(1), Time.minutes(1))
.uv(userId)

  The input data is big. So we use ValueState to store all the
distinct userIds from 00:00:00 to last minute. For current minute, we union
the minute's data with ValueState to obtain a new
ValueState and output the current uv.
  The problem is how to translate the java code to sql? We expect the
sql to be like this:

   select incrementWindow_end, dimension, distinct(userId) from table
group by incrementWindow(Time.days(1), Time.minutes(1)), dimension

  Anyone can give me some suggestions? Thank you very much.


Re: How to load udf jars in flink program

2019-08-15 文章
已经能够运行了,非常感谢。能解释下原理吗?我发现只用-yt,也是可以成功运行的。

苏 欣  于2019年8月15日周四 下午5:54写道:

> 我们是这么做的,你们可以试一下。用-yt指定jar所在的目录,-C将jar添加进classpath
>
> 例如:flink run -m yarn-cluster -yt /external/libs  -C
> file:///external/libs/func1.jar -C file:///external/libs/func2.jar
>
>
>
> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>
>
> ____
> 发件人: 刘建刚 
> 发送时间: Thursday, August 15, 2019 5:31:33 PM
> 收件人: user-zh@flink.apache.org ;
> u...@flink.apache.org ; d...@flink.apache.org <
> d...@flink.apache.org>
> 主题: How to load udf jars in flink program
>
>   We are using per-job to load udf jar when start job. Our jar file is
> in another path but not flink's lib path. In the main function, we use
> classLoader to load the jar file by the jar path. But it reports the
> following error when job starts running.
>   If the jar file is in lib, everything is ok. But our udf jar file is
> managed in a special path. How can I load udf jars in flink program with
> only giving the jar path?
>
> org.apache.flink.api.common.InvalidProgramException: Table program
> cannot be compiled. This is a bug. Please file an issue.
> at
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
> at
> org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
> at
> org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:723)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 5,
> Column 1: Cannot determine simple type name "com"
> at
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11877)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6758)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6519)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
> at
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6498)
> at
> org.codehaus.janino.UnitCompiler.access$14000(UnitCompiler.java:218)
> at
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6405)
> at
> org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6400)
> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3983)
> at
> org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6400)
> at
> org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6393)
> at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3982)
> at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6393)
> at
> org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:218)
> at
> org.codehaus.janino.UnitCompiler$25.getType(UnitCompiler.java:8206)
> at
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6798)
> at
> org.codehaus.janino.UnitCompiler.access$14500(UnitCompiler.java:218)
> at
> org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6423)
> at
> org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6418)
> at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4365)
> at
> org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6418)
> at
> org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6414)
> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4203)
> at
> org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6414)
> at
> org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6393)
> at org.codehaus.janino.Java$Rvalue.accept(Java.ja

Re: How to load udf jars in flink program

2019-08-15 文章
/external/libs 是指启动flink程序所在机器的目录吗?你们在程序里也是通过classLoader加载的jar包然后注册udf吗?

 我刚才简单试了一下,将jar包放在/tmp下,会报下面的问题:
org.apache.flink.client.deployment.ClusterDeploymentException: Could not
deploy Yarn job cluster.
at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:241)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: java.nio.file.AccessDeniedException:
/tmp/1cc4b2d5-9f17-4d75-b598-a0f819906b9e_resources/web_server
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:84)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
at
sun.nio.fs.UnixFileSystemProvider.newDirectoryStream(UnixFileSystemProvider.java:427)
at java.nio.file.Files.newDirectoryStream(Files.java:457)
at java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:300)
at java.nio.file.FileTreeWalker.next(FileTreeWalker.java:372)
at java.nio.file.Files.walkFileTree(Files.java:2706)
at java.nio.file.Files.walkFileTree(Files.java:2742)
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.uploadAndRegisterFiles(AbstractYarnClusterDescriptor.java:1218)
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:815)
at
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:553)
at
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
... 9 more

苏 欣  于2019年8月15日周四 下午5:54写道:

> 我们是这么做的,你们可以试一下。用-yt指定jar所在的目录,-C将jar添加进classpath
>
> 例如:flink run -m yarn-cluster -yt /external/libs  -C
> file:///external/libs/func1.jar -C file:///external/libs/func2.jar
>
>
>
> 发送自 Windows 10 版邮件<https://go.microsoft.com/fwlink/?LinkId=550986>应用
>
>
>
> ____
> 发件人: 刘建刚 
> 发送时间: Thursday, August 15, 2019 5:31:33 PM
> 收件人: user-zh@flink.apache.org ;
> u...@flink.apache.org ; d...@flink.apache.org <
> d...@flink.apache.org>
> 主题: How to load udf jars in flink program
>
>   We are using per-job to load udf jar when start job. Our jar file is
> in another path but not flink's lib path. In the main function, we use
> classLoader to load the jar file by the jar path. But it reports the
> following error when job starts running.
>   If the jar file is in lib, everything is ok. But our udf jar file is
> managed in a special path. How can I load udf jars in flink program with
> only giving the jar path?
>
> org.apache.flink.api.common.InvalidProgramException: Table program
> cannot be compiled. This is a bug. Please file an issue.
> at
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
> at
> org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
> at
> org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> at
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:723)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 5,
> Column 1: Cannot determine simple type name "com"
> at
> org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11877)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6758)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6519)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
> at
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
> at
> org.codehaus.j

How to load udf jars in flink program

2019-08-15 文章
  We are using per-job to load udf jar when start job. Our jar file is
in another path but not flink's lib path. In the main function, we use
classLoader to load the jar file by the jar path. But it reports the
following error when job starts running.
  If the jar file is in lib, everything is ok. But our udf jar file is
managed in a special path. How can I load udf jars in flink program with
only giving the jar path?

org.apache.flink.api.common.InvalidProgramException: Table program
cannot be compiled. This is a bug. Please file an issue.
at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
at 
org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
at 
org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:723)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.codehaus.commons.compiler.CompileException: Line 5,
Column 1: Cannot determine simple type name "com"
at 
org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11877)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6758)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6519)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
at 
org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6532)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6498)
at org.codehaus.janino.UnitCompiler.access$14000(UnitCompiler.java:218)
at 
org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6405)
at 
org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6400)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3983)
at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6400)
at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6393)
at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3982)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6393)
at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:218)
at org.codehaus.janino.UnitCompiler$25.getType(UnitCompiler.java:8206)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6798)
at org.codehaus.janino.UnitCompiler.access$14500(UnitCompiler.java:218)
at 
org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6423)
at 
org.codehaus.janino.UnitCompiler$22$2$1.visitFieldAccess(UnitCompiler.java:6418)
at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4365)
at 
org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6418)
at 
org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6414)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4203)
at 
org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6414)
at 
org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6393)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:4171)
at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6393)
at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6780)
at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:218)
at 
org.codehaus.janino.UnitCompiler$22$2$1.visitAmbiguousName(UnitCompiler.java:6421)
at 
org.codehaus.janino.UnitCompiler$22$2$1.visitAmbiguousName(UnitCompiler.java:6418)
at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4279)
at 
org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6418)
at 
org.codehaus.janino.UnitCompiler$22$2.visitLvalue(UnitCompiler.java:6414)
at org.codehaus.janino.Java$Lvalue.accept(Java.java:4203)
at 
org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6414)
at 
org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6393)
at org.codehaus.janino.Java$Rvalue.accept(Java.java:4171)
at 

Re: 维表如何实现动态查询

2019-07-07 文章
可以设置多长时间加载一次最新的维表,比如每1分钟加载一次。

> 在 2019年7月3日,下午12:12,雒正林  写道:
> 
>   维表(mysql) 是动态变化的,与流表join 时,维表一直是第一次查询到的数据,后面维表变化的数据,在join时,查询不到。
>