回复: 如果用flink sql持续查询过去30分钟登录网站的人数?
不好意思,之前没看到这个问题。 Darwin-amd64就是mac上的可执行文件格式。信任他,直接可以执行的。 -邮件原件- 发件人: 陈帅 发送时间: Saturday, December 7, 2019 10:48 PM 收件人: user-zh@flink.apache.org 主题: Re: 如果用flink sql持续查询过去30分钟登录网站的人数? 请问这个平台生成可执行文件creek是如何实现的?对应的Dawin-amd64环境下载下来的文件是什么格式的? Yuan,Youjun 于2019年12月7日周六 下午8:32写道: > 是的,与Flink完全一样的SQL接口,为边缘计算设计的流式计算框架。 > > > -邮件原件- > 发件人: 陈帅 > 发送时间: Saturday, December 7, 2019 11:36 AM > 收件人: user-zh@flink.apache.org > 主题: Re: 如果用flink sql持续查询过去30分钟登录网站的人数? > > 你们这个平台还挺方便快速验证的,是扩展了Flink SQL吗? > 虽然没有完全解决我的问题,但还是要谢谢你。 > > Yuan,Youjun 于2019年12月5日周四 上午10:41写道: > > > 可以用30分钟的range over窗口来处理,但是你提到两个0值得输出恐怕做不到,没有数据,没有产出。 > > 假设你得输入包含ts和userid两个字段,分别为时间戳和用户id,那么SQL应该这样: > > INSERT INTO mysink > > SELECT > >ts, userid, > >COUNT(userid) > >OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN > > INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc > > > > 以如下输入为例: > > "2019-12-05 12:02:00,user1", > > "2019-12-05 12:13:00,user1", > > "2019-12-05 12:15:00,user1", > > "2019-12-05 12:31:00,user1", > > "2019-12-05 12:40:00,user1", > > "2019-12-05 12:45:00,user1" > > 产出如下结果: > > {"cnt":1,"ts":157554732,"userid":"user1"} > > {"cnt":2,"ts":157554798,"userid":"user1"} > > {"cnt":3,"ts":157554810,"userid":"user1"} > > {"cnt":4,"ts":157554906,"userid":"user1"} > > {"cnt":4,"ts":157554960,"userid":"user1"} > > {"cnt":4,"ts":157554990,"userid":"user1"} > > > > 为了验证上述SQL,你可以将如下作业粘贴到http://creek.baidubce.com/ > > 的作业定义输入框中,点击生成可执行文件,运行下载到的可执行文件,就能看到结果: > > { > > "sources": [{ > > "schema": { > > "format": "CSV", > > "fields": [{ > > "name": "ts", > > "type": "SQL_TIMESTAMP" > > }, > > { > > "name": "userid", > > "type": "STRING" > > }] > > }, > > "watermark": 0, > > "name": "mysrc", > > "eventTime": "ts", > > "type": "COLLECTION", > > "attr": { > > "input":[ > > "2019-12-05 12:02:00,user1", > > "2019-12-05 12:13:00,user1", > > "2019-12-05 12:15:00,user1", > > "2019-12-05 12:31:00,user1", > > "2019-12-05 12:40:00,user1", > > "2019-12-05 12:45:00,user1" > > ] > > } > > }], > > "sink": { > > "schema": { > > "format": "JSON" > > }, > > "name": "mysink", > > "type": "STDOUT" > > }, > > "name": "demojob", > > "timeType": "EVENTTIME", > > "sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid) > > OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30' > > MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc" > > } > > > > > > 当然上面的例子是以事件时间,用处理时间也是可以的。为了验证,你可以把source.type从COLLECTION改成STDIN,把tim > > eT > > ype从EVENTTIME改成PROCESSTIME,重新生成、运行,从命令行下输入数据。 > > > > 袁尤军 > > > > -邮件原件- > > 发件人: 陈帅 > > 发送时间: Wednesday, December 4, 2019 11:40 PM > > 收件人: user-zh@flink.apache.org > > 主题: 如果用flink sql持续查询过去30分钟登录网站的人数? > > > > 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无 > > 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为 > > 12:01 (0), 12:03:(1), 12:14 (2), 12:16(3), 12:30 (4), 12:35 (4), > > 12:41 (5), 12:46 (4), 13:16 (0) > > 即每个元素进来就会设一个30分钟过期时间,窗口状态是维护还当前未过期元素集合。 > > > > 如果用sliding > > window的话,步长需要设置成1秒,那么窗口个数会膨胀很多,而实际上我只需要统计其中一个窗口,多余的窗口浪费了。我也考虑过用 > > over window,但是不知道它是否支持处理时间,因为我的场景是需要根据处理时间推移而改变统计值的。我尝试用stream > > api来实现,利用了timerService设置元素过期时间,但我测下来发现元素过期速度赶不上进入元素的速度,导致state大小一直增长. > > > > 所以想问一下: > > 1. 针对这种case有没有标准做法?sql支持吗? > > 2. 要怎么解决timerService的性能问题?timerService底层实现是不是单线程处理priority queue? > > > > 谢谢! > > 陈帅 > > >
Re: [DISCUSS] What parts of the Python API should we focus on next ?
Also CC user-zh. Best, Jincheng jincheng sun 于2019年12月19日周四 上午10:20写道: > Hi folks, > > As release-1.10 is under feature-freeze(The stateless Python UDF is > already supported), it is time for us to plan the features of PyFlink for > the next release. > > To make sure the features supported in PyFlink are the mostly demanded for > the community, we'd like to get more people involved, i.e., it would be > better if all of the devs and users join in the discussion of which kind of > features are more important and urgent. > > We have already listed some features from different aspects which you can > find below, however it is not the ultimate plan. We appreciate any > suggestions from the community, either on the functionalities or > performance improvements, etc. Would be great to have the following > information if you want to suggest to add some features: > > - > - Feature description: > - Benefits of the feature: > - Use cases (optional): > -- > > Features in my mind > > 1. Integration with most popular Python libraries > - fromPandas/toPandas API >Description: > Support to convert between Table and pandas.DataFrame. >Benefits: > Users could switch between Flink and Pandas API, for example, do > some analysis using Flink and then perform analysis using the Pandas API if > the result data is small and could fit into the memory, and vice versa. > > - Support Scalar Pandas UDF >Description: > Support scalar Pandas UDF in Python Table API & SQL. Both the > input and output of the UDF is pandas.Series. >Benefits: > 1) Scalar Pandas UDF performs better than row-at-a-time UDF, > ranging from 3x to over 100x (from pyspark) > 2) Users could use Pandas/Numpy API in the Python UDF > implementation if the input/output data type is pandas.Series > > - Support Pandas UDAF in batch GroupBy aggregation >Description: >Support Pandas UDAF in batch GroupBy aggregation of Python > Table API & SQL. Both the input and output of the UDF is pandas.DataFrame. >Benefits: > 1) Pandas UDAF performs better than row-at-a-time UDAF more than > 10x in certain scenarios > 2) Users could use Pandas/Numpy API in the Python UDAF > implementation if the input/output data type is pandas.DataFrame > > 2. Fully support all kinds of Python UDF > - Support Python UDAF(stateful) in GroupBy aggregation (NOTE: Please > give us some use case if you want this feature to be contained in the next > release) > Description: > Support UDAF in GroupBy aggregation. > Benefits: > Users could define and use Python UDAF and use it in GroupBy > aggregation. Without it, users have to use Java/Scala UDAF. > > - Support Python UDTF > Description: >Support Python UDTF in Python Table API & SQL > Benefits: > Users could define and use Python UDTF in Python Table API & SQL. > Without it, users have to use Java/Scala UDTF. > > 3. Debugging and Monitoring of Python UDF >- Support User-Defined Metrics > Description: >Allow users to define user-defined metrics and global job > parameters with Python UDFs. > Benefits: >UDF needs metrics to monitor some business or technical indicators, > which is also a requirement for UDFs. > >- Make the log level configurable > Description: >Allow users to config the log level of Python UDF. > Benefits: >Users could configure different log levels when debugging and > deploying. > > 4. Enrich the Python execution environment >- Docker Mode Support > Description: > Support running python UDF in docker workers. > Benefits: > Support various of deployments to meet more users' requirements. > > 5. Expand the usage scope of Python UDF >- Support to use Python UDF via SQL client > Description: > Support to register and use Python UDF via SQL client > Benefits: > SQL client is a very important interface for SQL users. This > feature allows SQL users to use Python UDFs via SQL client. > >- Integrate Python UDF with Notebooks > Description: > Such as Zeppelin, etc (Especially Python dependencies) > >- Support to register Python UDF into catalog > Description: > Support to register Python UDF into catalog > Benefits: > 1)Catalog is the centralized place to manage metadata such as > tables, UDFs, etc. With it, users could register the UDFs once and use it > anywhere. > 2) It's an important part of the SQL functionality. If Python > UDFs are not supported to be registered and used in catalog, Python UDFs > could not be shared between jobs. > > 6. Performance Improvements of Python UDF >- Cython improvements > Description: > Cython Improvements in coder & operations > Benefits: > Initial
Re: flink跨集群kerberos认证问题
是配置了:keytab和principal两个配置吗 Leslie Yuen 于2019年12月19日 周四08:35写道: > > 遇到过同样的问题,最后的解决办法就是直接在应用的集群中配置好flink-conf.yaml文件,代码中不需要额外加入认证代码就可以了。我测试通过的是消费写入带kerberos认证的Kafka > > > 在 2019年12月18日,23:05,李现 写道: > > > > 各位好, > >flink集群跨集群无法通过kerberos认证。 > >集群1:flink集群,无kerberos认证 > >集群2:hadoop2.6.0集群,有kerberos认证 > > > 集群1数据到集群2hdfs,用的是bucketingsink,无法通过kerberos认证。按照官方文档的配置,如果是在一个集群是能通过验证的。 > > 现在没有什么思路,希望有经验的朋友提点下,不胜感激。 >
Re: flink跨集群kerberos认证问题
这个方法我试过,如果是在应用集群的flink配置好conf文件,flinkjob初始化以及checkpoint存储需要和本地的hdfs交互,这样也是会遇到问题。就是client是security,server(本地hdfs)是simple,这样也是会抛异常。 Leslie Yuen 于2019年12月19日 周四08:35写道: > > 遇到过同样的问题,最后的解决办法就是直接在应用的集群中配置好flink-conf.yaml文件,代码中不需要额外加入认证代码就可以了。我测试通过的是消费写入带kerberos认证的Kafka > > > 在 2019年12月18日,23:05,李现 写道: > > > > 各位好, > >flink集群跨集群无法通过kerberos认证。 > >集群1:flink集群,无kerberos认证 > >集群2:hadoop2.6.0集群,有kerberos认证 > > > 集群1数据到集群2hdfs,用的是bucketingsink,无法通过kerberos认证。按照官方文档的配置,如果是在一个集群是能通过验证的。 > > 现在没有什么思路,希望有经验的朋友提点下,不胜感激。 >
Re: flink跨集群kerberos认证问题
遇到过同样的问题,最后的解决办法就是直接在应用的集群中配置好flink-conf.yaml文件,代码中不需要额外加入认证代码就可以了。我测试通过的是消费写入带kerberos认证的Kafka > 在 2019年12月18日,23:05,李现 写道: > > 各位好, >flink集群跨集群无法通过kerberos认证。 >集群1:flink集群,无kerberos认证 >集群2:hadoop2.6.0集群,有kerberos认证 >集群1数据到集群2hdfs,用的是bucketingsink,无法通过kerberos认证。按照官方文档的配置,如果是在一个集群是能通过验证的。 > 现在没有什么思路,希望有经验的朋友提点下,不胜感激。
Re: Re: flink如何动态修改窗口大小和类型?
有例子吗?复杂点的是有一个控制窗口大小的控制流connect实现 王双利 发件人: 陈帅 发送时间: 2019-12-18 22:51 收件人: user-zh@flink.apache.org 主题: Re: flink如何动态修改窗口大小和类型? 现实中确实有这类需求,简单点的是元素本身带有窗口大小设置,复杂点的是有一个控制窗口大小的控制流connect实现。 LakeShen 于2019年12月18日周三 下午2:12写道: > 使用自定义 Trigger 试试,在 Trigger 中自定义业务触发逻辑。 > > 陈帅 于2019年12月14日周六 下午6:44写道: > > > flink目前支不支持动态修改窗口大小和类型呢?例如先是统计5分钟窗口,而后修改成统计10分钟窗口. > > >
flink跨集群kerberos认证问题
各位好, flink集群跨集群无法通过kerberos认证。 集群1:flink集群,无kerberos认证 集群2:hadoop2.6.0集群,有kerberos认证 集群1数据到集群2hdfs,用的是bucketingsink,无法通过kerberos认证。按照官方文档的配置,如果是在一个集群是能通过验证的。 现在没有什么思路,希望有经验的朋友提点下,不胜感激。
Re: flink如何动态修改窗口大小和类型?
现实中确实有这类需求,简单点的是元素本身带有窗口大小设置,复杂点的是有一个控制窗口大小的控制流connect实现。 LakeShen 于2019年12月18日周三 下午2:12写道: > 使用自定义 Trigger 试试,在 Trigger 中自定义业务触发逻辑。 > > 陈帅 于2019年12月14日周六 下午6:44写道: > > > flink目前支不支持动态修改窗口大小和类型呢?例如先是统计5分钟窗口,而后修改成统计10分钟窗口. > > >
Re: flink sql confluent schema avro topic注册成表
谢谢回复,有了schema registry url为何还需要填subject和avroSchemaStr呢? 朱广彬 于2019年12月18日周三 上午10:30写道: > Hi 陈帅, > > 目前社区确实不支持confluent schema registry的avro格式,我们内部也是依赖schema registry来做avro > schema的管理,所以,我们改动了flink-avro 的源码来支持。 > > 主要涉及到这些地方: > > org.apache.flink.formats.avro.{AvroRowFormatFactory,AvroRowDeserializationSchema,AvroRowSerializationSchema} > 和org.apache.flink.table.descriptors.{Avro,AvroValidator} > > 使用时在构建Avro时指定以下三个参数即可(见标红部分): > > tableEnv.connect( > new Kafka() > .version("universal") > .topic(topic) > .properties(props) > ).withFormat( > new Avro() > .useRegistry(true) > .registryUrl(KAFKA_SCHEMA_REGISTRY_URL_ADDRESS) > .registrySubject(subject) > .avroSchema(avroSchemaStr) > ) > > > 陈帅 于2019年12月18日周三 上午8:26写道: > > > > flink sql是否能够支持将confluent schema registry注册的一个avro数据格式 的topic注册成一张table? >
Re: flink如何动态修改窗口大小和类型?
不是有 Dynamic gap 吗? Best regards Utopia 在 2019年12月18日 +0800 22:34,jingjing bai ,写道: > 目前一个任务中,仅支持一种窗口。 > 动态修改本身应该是一个伪需求 > 如果你仅仅是为了方便快速开发,建议你用sql方式去开发,比如自己扩展sql client。 > > > LakeShen 于2019年12月18日周三 下午2:12写道: > > > 使用自定义 Trigger 试试,在 Trigger 中自定义业务触发逻辑。 > > > > 陈帅 于2019年12月14日周六 下午6:44写道: > > > > > flink目前支不支持动态修改窗口大小和类型呢?例如先是统计5分钟窗口,而后修改成统计10分钟窗口. > > > > >
Re: flink如何动态修改窗口大小和类型?
目前一个任务中,仅支持一种窗口。 动态修改本身应该是一个伪需求 如果你仅仅是为了方便快速开发,建议你用sql方式去开发,比如自己扩展sql client。 LakeShen 于2019年12月18日周三 下午2:12写道: > 使用自定义 Trigger 试试,在 Trigger 中自定义业务触发逻辑。 > > 陈帅 于2019年12月14日周六 下午6:44写道: > > > flink目前支不支持动态修改窗口大小和类型呢?例如先是统计5分钟窗口,而后修改成统计10分钟窗口. > > >
Re: Re: FLINK 1.9 + YARN+ SessionWindows + 大数据量 + 运行一段时间后 OOM
- "TaskManager分配用于排序,hash表和缓存中间结果的内存位于JVM堆外" 这个是针对 batch (dataset / blink sql) 作业的,我看你跑的应该是 streaming 作业,把 taskmanager.memory.off-heap 设成 true 只是单纯为了减小 jvm heap size,留出空间给 rocksdb。 - 有一个 flink-examples 的目录,里面有一些示例作业,不过主要是展示 api 用法的。部署、资源调优方面的示例暂时还没有。 - 另外,我在上一封邮件里描述的解决方案,是针对 flink 1.9 及以前版本的。最新尚未发布的 flink 1.10 中资源配置部分做了比较大的改动,如果有兴趣的话可以等到发布之后关注一下相关的文档。 Thank you~ Xintong Song On Wed, Dec 18, 2019 at 4:49 PM USERNAME wrote: > @tonysong...@gmail.com 感谢回复 > 看了下参数的含义, > taskmanager.memory.off-heap: > 如果设置为true,TaskManager分配用于排序,hash表和缓存中间结果的内存位于JVM堆外。对于具有较大内存的设置,这可以提高在内存上执行的操作的效率(默认为false)。 > JVM堆内使用的内存是受YARN限制的,JVM堆外不受YARN限制,如果这样确实能 说通现在我的问题, > 已经修改并且在测试了,非常感谢tonysong...@gmail.com > 咱们FLINK有没有一些最佳实践的项目样例,能体现一些细节上的东西,能让大家用的更简单一些,体现FLINK的强大。 > > > > 在 2019-12-17 18:16:02,"Xintong Song" 写道: > >你这个不是OOM,是 container 内存超用被 yarn 杀掉了。 > >JVM 的内存是不可能超用的,否则会报 OOM。所以比较可能是 RocksDB 的内存够用量增加导致了超用。 > > > >建议: > > > >1. 增加如下配置 > >taskmanager.memory.off-heap: true > >taskmanager.memory.preallocate: false > > > >2. 若果已经采用了如下配置,或者改了配置之后仍存在问题,可以尝试调大下面这个配置,未配置时默认值是0.25 > >containerized.heap-cutoff-ratio > > > >Thank you~ > > > >Xintong Song > > > > > > > >On Tue, Dec 17, 2019 at 5:49 PM USERNAME wrote: > > > >> 版本:flink 1.9.1 > >> --运行命令 > >> flink run -d -m yarn-cluster -yn 40 -ys 2 > >> > >> > >> --部分代码 > >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > >> RocksDBStateBackend backend = new RocksDBStateBackend(CHECKPOINT_PATH, > >> true); > >> > >> > >> .keyBy("imei") //10W+ > >> .window(EventTimeSessionWindows.withGap(Time.hours(1))) //设备超过1小时没有点就算离线 > >> .trigger(new Trigger()) > >> .aggregate(new AggregateFunction(), new ProcessWindowFunction()) > >> > >> > >> --数据 > >> 总共10W+设备,每个设备每30秒一条数据,一分钟数据量20W左右。 > >> > >> > >> --错误现象 > >> 运行一段时间(几天)之后,taskmanager就会挂掉。 > >> > >> > >> --求教 > >> 1. flink 内存不断增加? > >> 数据量是挺大的,并且窗口保留期可能会很长,但是实际数据运算一次就可以不用了,也做了StateTtlConfig 不知道 > >> 哪里?什么?导致的内存一直占用,可能用法有问题,希望大神能够指点一下迷津。 > >> 2. flink / yarn 参数配置能优化吗? > >> 有flink on yarn 的配置最佳实践吗? > >> > >> > >> 问题困扰很久了 从1.7 - 1.8 - 1.9 ,希望有熟悉内部机制和有过类似问题的大神指点一下。 > >> > >> > >> > >> > >> --错误信息 --> nodemanager .log > >> > >> > >> 2019-12-17 16:55:16,545 WARN > >> > org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: > >> Process tree for container: container_e16_1575354121024_0050_01_08 > has > >> processes older than 1 iteration running over the configured limit. > >> Limit=3221225472, current usage = 3222126592 > >> 2019-12-17 16:55:16,546 WARN > >> > org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: > >> Container > >> [pid=184523,containerID=container_e16_1575354121024_0050_01_08] is > >> running 901120B beyond the 'PHYSICAL' memory limit. Current usage: 3.0 > GB > >> of 3 GB physical memory used; 4.9 GB of 30 GB virtual memory used. > Killing > >> container. > >> Dump of the process-tree for container_e16_1575354121024_0050_01_08 > : > >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) > >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE > >> |- 184701 184523 184523 184523 (java) 21977 4845 5166649344 786279 > >> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m > >> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC > >> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly > >> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError > >> > -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log > >> -Dlogback.configurationFile=file:./logback.xml > >> -Dlog4j.configuration=file:./log4j.properties > >> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . > >> |- 184523 184521 184523 184523 (bash) 2 3 118067200 373 /bin/bash -c > >> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m > >> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC > >> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly > >> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError > >> > -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log > >> -Dlogback.configurationFile=file:./logback.xml > >> -Dlog4j.configuration=file:./log4j.properties > >> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> > >> > /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.out > >> 2> > >> > /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.err > >> > >> > >> > >> 2019-12-17 16:55:16,546 INFO > >> > org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: > >> Removed ProcessTree with root 184523 > >> 2019-12-17 16:55:16,547 INFO > >> > org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl: > >>
Re: jobmanager异常日志
这个只能说明超时,具体原因要看taskmanager日志 On Mon, Dec 16, 2019 at 2:49 PM pengchenglin wrote: > 各位: > > 大家好,在standalone的jobmanager节点的日志中看到的,这个153的taskmanager挂掉了,不知道是不是下面的报错导致的,有知道下面的Error是啥意思不 > > 2019-12-15 17:15:21.999 [flink-metrics-379] ERROR akka.remote.Remoting > flink-metrics-akka.remote.default-remote-dispatcher-20 - Association to > [akka.tcp://flink-metr...@xx.xx.xx.153:35929] with UID [1 > 617823256] irrecoverably failed. Quarantining address. > java.util.concurrent.TimeoutException: Remote system has been silent for > too long. (more than 48.0 hours) > at > akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at > akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > > -- Regards, DinoZhang
Re:Re: FLINK 1.9 + YARN+ SessionWindows + 大数据量 + 运行一段时间后 OOM
@tonysong...@gmail.com 感谢回复 看了下参数的含义, taskmanager.memory.off-heap: 如果设置为true,TaskManager分配用于排序,hash表和缓存中间结果的内存位于JVM堆外。对于具有较大内存的设置,这可以提高在内存上执行的操作的效率(默认为false)。 JVM堆内使用的内存是受YARN限制的,JVM堆外不受YARN限制,如果这样确实能 说通现在我的问题, 已经修改并且在测试了,非常感谢tonysong...@gmail.com 咱们FLINK有没有一些最佳实践的项目样例,能体现一些细节上的东西,能让大家用的更简单一些,体现FLINK的强大。 在 2019-12-17 18:16:02,"Xintong Song" 写道: >你这个不是OOM,是 container 内存超用被 yarn 杀掉了。 >JVM 的内存是不可能超用的,否则会报 OOM。所以比较可能是 RocksDB 的内存够用量增加导致了超用。 > >建议: > >1. 增加如下配置 >taskmanager.memory.off-heap: true >taskmanager.memory.preallocate: false > >2. 若果已经采用了如下配置,或者改了配置之后仍存在问题,可以尝试调大下面这个配置,未配置时默认值是0.25 >containerized.heap-cutoff-ratio > >Thank you~ > >Xintong Song > > > >On Tue, Dec 17, 2019 at 5:49 PM USERNAME wrote: > >> 版本:flink 1.9.1 >> --运行命令 >> flink run -d -m yarn-cluster -yn 40 -ys 2 >> >> >> --部分代码 >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); >> RocksDBStateBackend backend = new RocksDBStateBackend(CHECKPOINT_PATH, >> true); >> >> >> .keyBy("imei") //10W+ >> .window(EventTimeSessionWindows.withGap(Time.hours(1))) //设备超过1小时没有点就算离线 >> .trigger(new Trigger()) >> .aggregate(new AggregateFunction(), new ProcessWindowFunction()) >> >> >> --数据 >> 总共10W+设备,每个设备每30秒一条数据,一分钟数据量20W左右。 >> >> >> --错误现象 >> 运行一段时间(几天)之后,taskmanager就会挂掉。 >> >> >> --求教 >> 1. flink 内存不断增加? >> 数据量是挺大的,并且窗口保留期可能会很长,但是实际数据运算一次就可以不用了,也做了StateTtlConfig 不知道 >> 哪里?什么?导致的内存一直占用,可能用法有问题,希望大神能够指点一下迷津。 >> 2. flink / yarn 参数配置能优化吗? >> 有flink on yarn 的配置最佳实践吗? >> >> >> 问题困扰很久了 从1.7 - 1.8 - 1.9 ,希望有熟悉内部机制和有过类似问题的大神指点一下。 >> >> >> >> >> --错误信息 --> nodemanager .log >> >> >> 2019-12-17 16:55:16,545 WARN >> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: >> Process tree for container: container_e16_1575354121024_0050_01_08 has >> processes older than 1 iteration running over the configured limit. >> Limit=3221225472, current usage = 3222126592 >> 2019-12-17 16:55:16,546 WARN >> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: >> Container >> [pid=184523,containerID=container_e16_1575354121024_0050_01_08] is >> running 901120B beyond the 'PHYSICAL' memory limit. Current usage: 3.0 GB >> of 3 GB physical memory used; 4.9 GB of 30 GB virtual memory used. Killing >> container. >> Dump of the process-tree for container_e16_1575354121024_0050_01_08 : >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE >> |- 184701 184523 184523 184523 (java) 21977 4845 5166649344 786279 >> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m >> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC >> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly >> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError >> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log >> -Dlogback.configurationFile=file:./logback.xml >> -Dlog4j.configuration=file:./log4j.properties >> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . >> |- 184523 184521 184523 184523 (bash) 2 3 118067200 373 /bin/bash -c >> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m >> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC >> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly >> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError >> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log >> -Dlogback.configurationFile=file:./logback.xml >> -Dlog4j.configuration=file:./log4j.properties >> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> >> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.out >> 2> >> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.err >> >> >> >> 2019-12-17 16:55:16,546 INFO >> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: >> Removed ProcessTree with root 184523 >> 2019-12-17 16:55:16,547 INFO >> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl: >> Container container_e16_1575354121024_0050_01_08 transitioned from >> RUNNING to KILLING >> 2019-12-17 16:55:16,549 INFO >> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: >> Cleaning up container container_e16_1575354121024_0050_01_08 >> 2019-12-17 16:55:16,579 WARN >> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit >> code from container container_e16_1575354121024_0050_01_08 is : 143
Re: (补充图片链接) 关于直接设置Watermark和flatmap后再设置的疑问
kafka的exactly once是通过checkpoint机制保存消费位点来保证的,和event time没关系。在进入时间窗口前提取event time和设定watermark即可。 On Wed, Dec 18, 2019 at 4:12 PM 猫猫 <16770...@qq.com> wrote: > 图片不能粘贴,放到github上面了。 > https://github.com/maobuji/Blog/blob/master/image/changeEventTime.jpg > > > > > --原始邮件-- > 发件人:"猫猫"<16770...@qq.com; > 发送时间:2019年12月18日(星期三) 下午4:03 > 收件人:"user-zh" > 主题:回复: 关于直接设置Watermark和flatmap后再设置的疑问 > > > > 可能是我说的不太明确,我理解事件时间在整个流系统下只有一个。 > 但是读kafka时,我无法直接拿到业务数据中的event-time。因为数据可能会被拆分为多条。 > 我只能当做字符串取出,并设置事件时间为kafka的时间。 > > > 在进行一次消息转换后,才能获取到数据中的时间。所以我需要在flatmap以后才能正确的设置event-time > 但我又需要kafka的精确一次特性。 > > 所以问题转变为如下的问题,我能在流中重设eventTime吗?但对最初的流(kafka提交)不影响。 > 所以也就是之前提到的问题。 > env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...) > > env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...); > > > 更进一步,我能向下图一样,在不同的流过程中,设置不同的eventTime吗?并且他们能够实现整体提交。 > > > > > > -- 原始邮件 -- > 发件人:"LakeShen" 发送时间:2019年12月18日(星期三) 下午2:10 > 收件人:"user-zh" > 主题:Re: 关于直接设置Watermark和flatmap后再设置的疑问 > > > > flatmap 逻辑中,你是否对消息记录的时间处理了吗,watermark 的更新的逻辑是比前一次 watermark 的时间截要大同时非空。 > > 猫猫 <16770...@qq.com 于2019年12月18日周三 上午9:27写道: > > env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...) > > > env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...); > > 使用kafka进行消费,直接设置Watermark和经过flatMap()以后再设置,会产生什么样的区别和影响。 > flatMap可能会将数据处理为1-N条。那么在这种情况下,还能够保证kafka的精确一次吗? -- Regards, DinoZhang
?????????????????????? ????????????Watermark??flatmap??????????????
??github https://github.com/maobuji/Blog/blob/master/image/changeEventTime.jpg ---- ??:""<16770...@qq.com; :2019??12??18??(??) 4:03 ??:"user-zh"
?????? ????????????Watermark??flatmap??????????????
??kafka??event-time?? ??kafka ??flatmap??event-time ??kafka eventTime??kafka?? ?? env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...) env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...); ??eventTime?? ---- ??:"LakeShen"