flink本身不提供cancel job后清理zookeeper上残留znode的功能或机制,包括hdfs上的部分数据,如果想清除的话,可手动操作或者自实现。
在 2020-06-28 09:12:41,"林恬" 写道:
>各位好:
> 目前我使用的是Flink 1.9.2, HA使用ZK, 使用过程中发现ZK上的/leader/${job_id}
>节点即使作业被Cancel了也不会被清理,导致运行久了之后,/leader/下有大量job_id的空ZNode,请问这块清理时机是什么时候呢?或者说这个没被清理的行为是否是1.9.2的bug呢?
>
>
Hi kevin,
If you mean to add annotations for Flink native K8s session pods, you could
use "kubernetes.jobmanager.annotations"
and "kubernetes.taskmanager.annotations"[1]. However, they are only
supported from release-1.11. Maybe you could
wait for a little bit more time, 1.11 will be released
问题1
./bin/flink run -m
yarn-cluster每次提交是都会产生一个新的APP-ID如:application_1567067657620_0254
当yarn application -kill application_1567067657620_0254后,
在提交./bin/flink run -m yarn-cluster如何不让这个appid递增?
问题2
./bin/flink run -m yarn-cluster提交任务后。cancel掉job,如何在提交到这个appid上?
Flink CLI是把-C的参数apply到了client端生成的JobGraph里,然后提交JobGraph来运行的
使用Rest方式提交,目前确实不支持针对单个Job设置classpath,我觉得这是一个合理的需求,可以提个JIRA
目前work around的办法只能是配置到cluster的configuration里面,在启动session的时候使用-C/--classpath
或者-D pipeline.classpaths=xxx,yyy,这样所有的job都会把它们增加到classpath里了
Best,
Yang
chenxuying 于2020年6月24日周三
flink??1.8.0
??flink table
??
tEnv.registerDataStream("t_data",dataStream,"f1-1");
??
org.apache.flink.table.api.TableException:
Field reference expression expected.
Hi,
能具体说明下你的场景和需求么? 数据源是什么,源数据中是否原先就有c4这一列呢,还是新增加了 c4 这一列呢?
其次,这个问题要看你的 connector 是什么。 有的 connector 是根据列名来映射的(如
JSON,各种数据库),有的是根据列名顺序来映射的(如 CSV)。
如果是按列名来映射的,那么在 Flink SQL DDL 中,新增 c4 一列就能读取到 c4 的值,不管c4 在哪个字段之后。
Best,
Jark
On Sat, 27 Jun 2020 at 17:26, 忝忝向仧 <153488...@qq.com> wrote:
>
我补充一下,checkpoint的UI截图如下:
https://imgchr.com/i/NgCUgS
https://imgchr.com/i/NgChDJ
https://imgchr.com/i/NgCT4x
>
--
**
tivanli
**
Hi Tianwang Li,
偶尔一两天出现 Checkpoint 超时,看下你的任务中,是否可能存在某类 key 在这一两天会突然增多的情况。
Best,
LakeShen
zhisheng 于2020年6月28日周日 上午10:27写道:
> hi, Tianwang Li
>
> 看到有三个图片挂了,可以试着把图片上传到第三方的图床,然后贴个链接过来,另外:
>
> > 任务经常会出现反压(特别是在窗口输出的时候)
>
> 这个检查一下窗口下游算子的情况,比如是不是窗口输出的数据过多,而 sink 的并发还和之前的保持一致,导致处理速度跟不上,从而导致的反压。
>
>
>
hi,guanyq
你这种提交方式属于 Flink On YARN 的 per job 模式,机制是这样的,当新提一个作业的时候,AppID 是会变化的。
Best!
zhisheng
Yangze Guo 于2020年6月28日周日 上午9:59写道:
> 我理解你需要使用session模式,即./bin/yarn-session.sh [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session
>
>
Hi, guanyq.
关于问题1:在提交./bin/flink run -m yarn-cluster如何不让这个appid递增?
这个appid的自增策略并不是根据Flink负责生成,如果有必要,你可以对hadoop-yarn进行调研,并做出你的结论。
关于问题2 ./bin/flink run -m yarn-cluster提交任务后。cancel掉job,如何在提交到这个appid上?
我是否可以理解为,flink
yarn-session模式的集群更适合你的作业需求呢?因为在问题中提到的提交方式为per-job,job关闭后,Flink即关闭集群。
可参考:
Hi Aissa,
Flink CEP is an api that processes multi-event matching with a pattern,
like (START MIDDLE+ END).
If you can calculate the "sensor_status" by one record, I think Flink
DataStream API / Table & SQL API
could satisfy your requirement already.
Aissa Elaffani 于2020年6月25日周四 下午11:35写道:
>
欢迎使用后来反馈一下~
On Thu, 25 Jun 2020 at 14:55, gaolanf...@hotmail.com
wrote:
> Hello
>
> 非常感谢!
>
> 查了您给的资料
> Canal支持MySQL,
> Debezuim支持 MySQL, PostgreSQL, Oracle等,
>
>不过Debezuim没用过,用的人也相对少,自己去试试看~
>
>
>
>
> gaolanf...@hotmail.com
>
> 发件人: Leonard Xu
> 发送时间:
hi,立志:
从你的描述(能跑 10 几天且使用的是 FsStateBackend),可以提供一下 JobManager 和 TaskManager 的 GC
时间和次数的监控信息吗?怀疑是不是因为 Full GC 导致的问题。
Best!
zhisheng
张立志 于2020年6月28日周日 上午10:13写道:
> 从监控后台看back presure 是正常的,flatMap 这个任务是存在的,但只是连了一下mysql,没有别的任何操作,而且另一个job
> 没有flatmap ,单纯的map reduce
>
hi, Tianwang Li
看到有三个图片挂了,可以试着把图片上传到第三方的图床,然后贴个链接过来,另外:
> 任务经常会出现反压(特别是在窗口输出的时候)
这个检查一下窗口下游算子的情况,比如是不是窗口输出的数据过多,而 sink 的并发还和之前的保持一致,导致处理速度跟不上,从而导致的反压。
> 大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)
这种也可以看看是不是 HDFS 有时候压力比较大导致的出现毛刺现象
另外建议补一下 UI 上 chekcpoint
Hi Dimitris,
Regarding your questions.
a) For standalone clusters, the recommended way is to use `.flink.size`
rather than `.process.size`. `.process.size` includes JVM metaspace and
overhead in addition to `.flink.size`, which usually do not really matter
for standalone clusters.
b) In case of
是的。
Best,
Roc Marshal.
在 2020-06-28 10:10:20,"林恬" 写道:
>您的意思是,这些因为Cancel Job的遗留的空的leader/${job_id} ZNode是需要使用者自己定期清理么?
>
>
>
>
>
>
>
>--Original--
>From: "Roc Marshal"Date: Sun, Jun 28, 2020 10:07 AM
>To: "FLINK中国"
>Subject: Re:Flink
关于Flink checkpoint偶尔会比较长时间的问题。
*环境与背景:*
版本:flink1.10.0
数据量:每秒约10万左右的记录,数据源是kafka
计算逻辑:滑动窗口统计,每个窗口输出的规模大概1~2千万记录。
是否有反压:任务经常会出现反压(特别是在窗口输出的时候)。
*问题:*
大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)。
source的checkpoint消耗的时间比较长。Trigger checkpoint 到 Starting checkpoint消耗时间比较长。
好的 感谢两位我试试
Sun.Zhu <17626017...@163.com> 于2020年6月25日周四 下午11:19写道:
> 虽然chain在一起,但是可以通过metrics中看出来各个算子的各项指标的
> | |
> Sun.Zhu
> |
> |
> 17626017...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年06月25日 00:51,徐骁 写道:
> 两个方法确实可以, 但是要追踪起来很废时间, 对小白太不友好啊
>
Hi,
我好像没有看到哪里说不可以group by ROW类型呀,可以说下你使用的是哪个Flink版本,以及哪个planner么?
能附上异常栈就更好啦。
sunfulin 于2020年6月25日周四 下午4:35写道:
> Hi,
> 请教大家一个问题,我在一个sink table里定义了一个column A,类型是Row。
> 在通过Flink SQL 做sink的时候,尝试group by A,报错说ROW类型无法group by。这种应该如何处理哈?
--
Best,
Benchao Li
从监控后台看back presure 是正常的,flatMap 这个任务是存在的,但只是连了一下mysql,没有别的任何操作,而且另一个job
没有flatmap ,单纯的map reduce
统计,能跑10几天,到1个多G的时侯就明显变慢,然后超时10分钟就报错了,从后台的错误日志里,没有明显的异常信息,都是checkpoint 超时后的信息.
在 2020-06-28 09:58:00,"LakeShen" 写道:
>Hi 张立志,
>
>一般 Checkpoint 超时,可以先看看你的任务中,是否存在反压,比如 Sink 阶段,又或者是某个地方有 flatMap操作导致。
>
您的意思是,这些因为Cancel Job的遗留的空的leader/${job_id} ZNode是需要使用者自己定期清理么?
--Original--
From: "Roc Marshal"
Hi, 林恬.
首先,感谢你的反馈。
关于zk对应路径下的信息清理问题,你可以简单理解为,Flink对zk组件的依赖,仅在依赖其功能的范围内。并不会提供整个集群或者某个路径下和Flink
job信息一致性的维护,即不会对其进行无效的信息清理,因为在HA的场景下,对无效路径的判定条件要复杂很多。
Best,
Roc Marshal.
在 2020-06-28 09:12:41,"林恬" 写道:
>各位好:
> 目前我使用的是Flink 1.9.2, HA使用ZK, 使用过程中发现ZK上的/leader/${job_id}
Hi Ori,
Here are some suggestions from my side.
- Probably the most straightforward way is to try increasing the timeout
to see if that helps. You can leverage the configuration option
`heartbeat.timeout`[1]. The default is 50s.
- It might be helpful to share your configuration
我理解你需要使用session模式,即./bin/yarn-session.sh [1]
[1]
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html#flink-yarn-session
Best,
Yangze Guo
On Sun, Jun 28, 2020 at 9:10 AM guanyq wrote:
>
> 问题1
>
> ./bin/flink run -m
>
Hi, 立志。
能不能提供一下更多的信息,比如异常信息等,方便对这个case背景做更进一步的了解呢?
谢谢。
Best,
Roc Marshal
在 2020-06-28 09:52:10,"张立志" 写道:
>flink 版本1.8
>部署集群yarn
>
>
>配置代码:
>StreamExecutionEnvironment.stateBackend(new
Hi 张立志,
一般 Checkpoint 超时,可以先看看你的任务中,是否存在反压,比如 Sink 阶段,又或者是某个地方有 flatMap操作导致。
然后看下自己任务中,是否存在热点问题等。如果一切都是正常的话,可以尝试使用 RocksDB 的增量 Checkpoint ,具体参考[1]。
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/state_backends.html#rocksdb-state-backend-details
Best,
LakeShen
张立志
Hi guanyq,
你为什么希望 app id 不变呢?
Best,
LakeShen
guanyq 于2020年6月28日周日 上午9:10写道:
> 问题1
>
> ./bin/flink run -m
> yarn-cluster每次提交是都会产生一个新的APP-ID如:application_1567067657620_0254
>
> 当yarn application -kill application_1567067657620_0254后,
>
> 在提交./bin/flink run -m yarn-cluster如何不让这个appid递增?
>
> 问题2
flink 版本1.8
部署集群yarn
配置代码:
StreamExecutionEnvironment.stateBackend(new
FsStateBackend("hdfs://nsstreaming/streaming/flink_checkpoint/state").checkpointingInterval(1000*60*10).checkpointTimeout(1000*60*10).timeCharacteristic(TimeCharacteristic.IngestionTime).build();
业务代码相对比较简单,内存占用较大
各位好:
目前我使用的是Flink 1.9.2, HA使用ZK, 使用过程中发现ZK上的/leader/${job_id}
节点即使作业被Cancel了也不会被清理,导致运行久了之后,/leader/下有大量job_id的空ZNode,请问这块清理时机是什么时候呢?或者说这个没被清理的行为是否是1.9.2的bug呢?
问题1
./bin/flink run -m
yarn-cluster每次提交是都会产生一个新的APP-ID如:application_1567067657620_0254
当yarn application -kill application_1567067657620_0254后,
在提交./bin/flink run -m yarn-cluster如何不让这个appid递增?
问题2
./bin/flink run -m yarn-cluster提交任务后。cancel掉job,如何在提交到这个appid上?
On Thu 25 Jun 2020 at 12:48, ivo.kn...@t-online.de
wrote:
> Whats up guys,
>
>
>
> I'm trying to run an Apache Flink Application with the GraalVM Native
> Image but I get the following error: (check attached file)
>
>
>
> I suppose this happens, because Flink uses a lot of low-level-code and is
With an AscendingTimestampExtractor, watermarks are not created for every
event, and as your job starts up, some events will be processed before the
first watermark is generated.
The impossible value you see is an initial value that's in place until the
first real watermark is available. On the
Hello,
I'm having a bit of trouble understanding the memory configuration on
flink.
I'm using flink10.0.0 to read some datasets of edges and extract features.
I run this on a cluster consisting of 4 nodes , with 32cores and 252GB Ram
each, and hopefully I could expand this as long as I can add
Hi,all:
Flink sql
,??A??c1,c2,c3c1c4c1,c4,c2,c3.
.
Hi All,
I have recently been exploring MIDAS: an algorithm for Streaming Anomaly
Detection. A production level parallel and distributed implementation of
MIDAS should be quite useful to the industry. I feel that Flink is very
well-suited for the same as MIDAS deals with streaming data. If anyone
Hi Community ,
Could anyone let me know if Flink is used in US healthcare tech space ?
Thanks,
Prasanna.
36 matches
Mail list logo