回复: 如果用flink sql持续查询过去30分钟登录网站的人数?

2019-12-18 文章 Yuan,Youjun
不好意思,之前没看到这个问题。
Darwin-amd64就是mac上的可执行文件格式。信任他,直接可以执行的。

-邮件原件-
发件人: 陈帅  
发送时间: Saturday, December 7, 2019 10:48 PM
收件人: user-zh@flink.apache.org
主题: Re: 如果用flink sql持续查询过去30分钟登录网站的人数?

请问这个平台生成可执行文件creek是如何实现的?对应的Dawin-amd64环境下载下来的文件是什么格式的?

Yuan,Youjun  于2019年12月7日周六 下午8:32写道:

> 是的,与Flink完全一样的SQL接口,为边缘计算设计的流式计算框架。
>
>
> -邮件原件-
> 发件人: 陈帅 
> 发送时间: Saturday, December 7, 2019 11:36 AM
> 收件人: user-zh@flink.apache.org
> 主题: Re: 如果用flink sql持续查询过去30分钟登录网站的人数?
>
> 你们这个平台还挺方便快速验证的,是扩展了Flink SQL吗?
> 虽然没有完全解决我的问题,但还是要谢谢你。
>
> Yuan,Youjun  于2019年12月5日周四 上午10:41写道:
>
> > 可以用30分钟的range over窗口来处理,但是你提到两个0值得输出恐怕做不到,没有数据,没有产出。
> > 假设你得输入包含ts和userid两个字段,分别为时间戳和用户id,那么SQL应该这样:
> > INSERT INTO mysink
> > SELECT
> >ts, userid,
> >COUNT(userid)
> >OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN 
> > INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc
> >
> > 以如下输入为例:
> > "2019-12-05 12:02:00,user1",
> > "2019-12-05 12:13:00,user1",
> > "2019-12-05 12:15:00,user1",
> > "2019-12-05 12:31:00,user1",
> > "2019-12-05 12:40:00,user1",
> > "2019-12-05 12:45:00,user1"
> > 产出如下结果:
> > {"cnt":1,"ts":157554732,"userid":"user1"}
> > {"cnt":2,"ts":157554798,"userid":"user1"}
> > {"cnt":3,"ts":157554810,"userid":"user1"}
> > {"cnt":4,"ts":157554906,"userid":"user1"}
> > {"cnt":4,"ts":157554960,"userid":"user1"}
> > {"cnt":4,"ts":157554990,"userid":"user1"}
> >
> > 为了验证上述SQL,你可以将如下作业粘贴到http://creek.baidubce.com/
> > 的作业定义输入框中,点击生成可执行文件,运行下载到的可执行文件,就能看到结果:
> > {
> > "sources": [{
> > "schema": {
> > "format": "CSV",
> > "fields": [{
> > "name": "ts",
> > "type": "SQL_TIMESTAMP"
> > },
> > {
> > "name": "userid",
> > "type": "STRING"
> > }]
> > },
> > "watermark": 0,
> > "name": "mysrc",
> > "eventTime": "ts",
> > "type": "COLLECTION",
> > "attr": {
> > "input":[
> > "2019-12-05 12:02:00,user1",
> > "2019-12-05 12:13:00,user1",
> > "2019-12-05 12:15:00,user1",
> > "2019-12-05 12:31:00,user1",
> > "2019-12-05 12:40:00,user1",
> > "2019-12-05 12:45:00,user1"
> >   ]
> >   }
> > }],
> > "sink": {
> > "schema": {
> > "format": "JSON"
> > },
> > "name": "mysink",
> > "type": "STDOUT"
> > },
> > "name": "demojob",
> > "timeType": "EVENTTIME",
> > "sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid) 
> > OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30'
> > MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc"
> > }
> >
> >
> > 当然上面的例子是以事件时间,用处理时间也是可以的。为了验证,你可以把source.type从COLLECTION改成STDIN,把tim
> > eT
> > ype从EVENTTIME改成PROCESSTIME,重新生成、运行,从命令行下输入数据。
> >
> > 袁尤军
> >
> > -邮件原件-
> > 发件人: 陈帅 
> > 发送时间: Wednesday, December 4, 2019 11:40 PM
> > 收件人: user-zh@flink.apache.org
> > 主题: 如果用flink sql持续查询过去30分钟登录网站的人数?
> >
> > 例如,用户在以下时间点登录:无, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, 无
> > 那么我期望在以下时间点(实际查询可能在任意时间点)获取到的结果数为
> > 12:01 (0),  12:03:(1),  12:14 (2),  12:16(3), 12:30 (4), 12:35 (4),
> > 12:41 (5), 12:46 (4), 13:16 (0)
> > 即每个元素进来就会设一个30分钟过期时间,窗口状态是维护还当前未过期元素集合。
> >
> > 如果用sliding
> > window的话,步长需要设置成1秒,那么窗口个数会膨胀很多,而实际上我只需要统计其中一个窗口,多余的窗口浪费了。我也考虑过用
> > over window,但是不知道它是否支持处理时间,因为我的场景是需要根据处理时间推移而改变统计值的。我尝试用stream
> > api来实现,利用了timerService设置元素过期时间,但我测下来发现元素过期速度赶不上进入元素的速度,导致state大小一直增长.
> >
> > 所以想问一下:
> > 1. 针对这种case有没有标准做法?sql支持吗?
> > 2. 要怎么解决timerService的性能问题?timerService底层实现是不是单线程处理priority queue?
> >
> > 谢谢!
> > 陈帅
> >
>


Re: [DISCUSS] What parts of the Python API should we focus on next ?

2019-12-18 文章 jincheng sun
Also CC user-zh.

Best,
Jincheng


jincheng sun  于2019年12月19日周四 上午10:20写道:

> Hi folks,
>
> As release-1.10 is under feature-freeze(The stateless Python UDF is
> already supported), it is time for us to plan the features of PyFlink for
> the next release.
>
> To make sure the features supported in PyFlink are the mostly demanded for
> the community, we'd like to get more people involved, i.e., it would be
> better if all of the devs and users join in the discussion of which kind of
> features are more important and urgent.
>
> We have already listed some features from different aspects which you can
> find below, however it is not the ultimate plan. We appreciate any
> suggestions from the community, either on the functionalities or
> performance improvements, etc. Would be great to have the following
> information if you want to suggest to add some features:
>
> -
> - Feature description: 
> - Benefits of the feature: 
> - Use cases (optional): 
> --
>
> Features in my mind
>
> 1. Integration with most popular Python libraries
> - fromPandas/toPandas API
>Description:
>   Support to convert between Table and pandas.DataFrame.
>Benefits:
>   Users could switch between Flink and Pandas API, for example, do
> some analysis using Flink and then perform analysis using the Pandas API if
> the result data is small and could fit into the memory, and vice versa.
>
> - Support Scalar Pandas UDF
>Description:
>   Support scalar Pandas UDF in Python Table API & SQL. Both the
> input and output of the UDF is pandas.Series.
>Benefits:
>   1) Scalar Pandas UDF performs better than row-at-a-time UDF,
> ranging from 3x to over 100x (from pyspark)
>   2) Users could use Pandas/Numpy API in the Python UDF
> implementation if the input/output data type is pandas.Series
>
> - Support Pandas UDAF in batch GroupBy aggregation
>Description:
>Support Pandas UDAF in batch GroupBy aggregation of Python
> Table API & SQL. Both the input and output of the UDF is pandas.DataFrame.
>Benefits:
>   1) Pandas UDAF performs better than row-at-a-time UDAF more than
> 10x in certain scenarios
>   2) Users could use Pandas/Numpy API in the Python UDAF
> implementation if the input/output data type is pandas.DataFrame
>
> 2. Fully support  all kinds of Python UDF
> - Support Python UDAF(stateful) in GroupBy aggregation (NOTE: Please
> give us some use case if you want this feature to be contained in the next
> release)
>   Description:
> Support UDAF in GroupBy aggregation.
>   Benefits:
> Users could define and use Python UDAF and use it in GroupBy
> aggregation. Without it, users have to use Java/Scala UDAF.
>
> - Support Python UDTF
>   Description:
>Support  Python UDTF in Python Table API & SQL
>   Benefits:
> Users could define and use Python UDTF in Python Table API & SQL.
> Without it, users have to use Java/Scala UDTF.
>
> 3. Debugging and Monitoring of Python UDF
>- Support User-Defined Metrics
>  Description:
>Allow users to define user-defined metrics and global job
> parameters with Python UDFs.
>  Benefits:
>UDF needs metrics to monitor some business or technical indicators,
> which is also a requirement for UDFs.
>
>- Make the log level configurable
>  Description:
>Allow users to config the log level of Python UDF.
>  Benefits:
>Users could configure different log levels when debugging and
> deploying.
>
> 4. Enrich the Python execution environment
>- Docker Mode Support
>  Description:
>  Support running python UDF in docker workers.
>  Benefits:
>  Support various of deployments to meet more users' requirements.
>
> 5. Expand the usage scope of Python UDF
>- Support to use Python UDF via SQL client
>  Description:
>  Support to register and use Python UDF via SQL client
>  Benefits:
>  SQL client is a very important interface for SQL users. This
> feature allows SQL users to use Python UDFs via SQL client.
>
>- Integrate Python UDF with Notebooks
>  Description:
>  Such as Zeppelin, etc (Especially Python dependencies)
>
>- Support to register Python UDF into catalog
>   Description:
>   Support to register Python UDF into catalog
>   Benefits:
>   1)Catalog is the centralized place to manage metadata such as
> tables, UDFs, etc. With it, users could register the UDFs once and use it
> anywhere.
>   2) It's an important part of the SQL functionality. If Python
> UDFs are not supported to be registered and used in catalog, Python UDFs
> could not be shared between jobs.
>
> 6. Performance Improvements of Python UDF
>- Cython improvements
>   Description:
>   Cython Improvements in coder & operations
>   Benefits:
>   Initial 

Re: flink跨集群kerberos认证问题

2019-12-18 文章 李现
是配置了:keytab和principal两个配置吗

Leslie Yuen 于2019年12月19日 周四08:35写道:

>
> 遇到过同样的问题,最后的解决办法就是直接在应用的集群中配置好flink-conf.yaml文件,代码中不需要额外加入认证代码就可以了。我测试通过的是消费写入带kerberos认证的Kafka
>
> > 在 2019年12月18日,23:05,李现  写道:
> >
> > 各位好,
> >flink集群跨集群无法通过kerberos认证。
> >集群1:flink集群,无kerberos认证
> >集群2:hadoop2.6.0集群,有kerberos认证
> >
> 集群1数据到集群2hdfs,用的是bucketingsink,无法通过kerberos认证。按照官方文档的配置,如果是在一个集群是能通过验证的。
> > 现在没有什么思路,希望有经验的朋友提点下,不胜感激。
>


Re: flink跨集群kerberos认证问题

2019-12-18 文章 李现
这个方法我试过,如果是在应用集群的flink配置好conf文件,flinkjob初始化以及checkpoint存储需要和本地的hdfs交互,这样也是会遇到问题。就是client是security,server(本地hdfs)是simple,这样也是会抛异常。

Leslie Yuen 于2019年12月19日 周四08:35写道:

>
> 遇到过同样的问题,最后的解决办法就是直接在应用的集群中配置好flink-conf.yaml文件,代码中不需要额外加入认证代码就可以了。我测试通过的是消费写入带kerberos认证的Kafka
>
> > 在 2019年12月18日,23:05,李现  写道:
> >
> > 各位好,
> >flink集群跨集群无法通过kerberos认证。
> >集群1:flink集群,无kerberos认证
> >集群2:hadoop2.6.0集群,有kerberos认证
> >
> 集群1数据到集群2hdfs,用的是bucketingsink,无法通过kerberos认证。按照官方文档的配置,如果是在一个集群是能通过验证的。
> > 现在没有什么思路,希望有经验的朋友提点下,不胜感激。
>


Re: flink跨集群kerberos认证问题

2019-12-18 文章 Leslie Yuen
遇到过同样的问题,最后的解决办法就是直接在应用的集群中配置好flink-conf.yaml文件,代码中不需要额外加入认证代码就可以了。我测试通过的是消费写入带kerberos认证的Kafka

> 在 2019年12月18日,23:05,李现  写道:
> 
> 各位好,
>flink集群跨集群无法通过kerberos认证。
>集群1:flink集群,无kerberos认证
>集群2:hadoop2.6.0集群,有kerberos认证
>集群1数据到集群2hdfs,用的是bucketingsink,无法通过kerberos认证。按照官方文档的配置,如果是在一个集群是能通过验证的。
> 现在没有什么思路,希望有经验的朋友提点下,不胜感激。


Re: Re: flink如何动态修改窗口大小和类型?

2019-12-18 文章 王双利
有例子吗?复杂点的是有一个控制窗口大小的控制流connect实现



王双利
 
发件人: 陈帅
发送时间: 2019-12-18 22:51
收件人: user-zh@flink.apache.org
主题: Re: flink如何动态修改窗口大小和类型?
现实中确实有这类需求,简单点的是元素本身带有窗口大小设置,复杂点的是有一个控制窗口大小的控制流connect实现。
 
LakeShen  于2019年12月18日周三 下午2:12写道:
 
> 使用自定义 Trigger 试试,在 Trigger 中自定义业务触发逻辑。
>
> 陈帅  于2019年12月14日周六 下午6:44写道:
>
> > flink目前支不支持动态修改窗口大小和类型呢?例如先是统计5分钟窗口,而后修改成统计10分钟窗口.
> >
>


flink跨集群kerberos认证问题

2019-12-18 文章 李现
各位好,
flink集群跨集群无法通过kerberos认证。
集群1:flink集群,无kerberos认证
集群2:hadoop2.6.0集群,有kerberos认证
集群1数据到集群2hdfs,用的是bucketingsink,无法通过kerberos认证。按照官方文档的配置,如果是在一个集群是能通过验证的。
现在没有什么思路,希望有经验的朋友提点下,不胜感激。


Re: flink如何动态修改窗口大小和类型?

2019-12-18 文章 陈帅
现实中确实有这类需求,简单点的是元素本身带有窗口大小设置,复杂点的是有一个控制窗口大小的控制流connect实现。

LakeShen  于2019年12月18日周三 下午2:12写道:

> 使用自定义 Trigger 试试,在 Trigger 中自定义业务触发逻辑。
>
> 陈帅  于2019年12月14日周六 下午6:44写道:
>
> > flink目前支不支持动态修改窗口大小和类型呢?例如先是统计5分钟窗口,而后修改成统计10分钟窗口.
> >
>


Re: flink sql confluent schema avro topic注册成表

2019-12-18 文章 陈帅
谢谢回复,有了schema registry url为何还需要填subject和avroSchemaStr呢?

朱广彬  于2019年12月18日周三 上午10:30写道:

> Hi 陈帅,
>
> 目前社区确实不支持confluent schema registry的avro格式,我们内部也是依赖schema registry来做avro
> schema的管理,所以,我们改动了flink-avro 的源码来支持。
>
> 主要涉及到这些地方:
>
> org.apache.flink.formats.avro.{AvroRowFormatFactory,AvroRowDeserializationSchema,AvroRowSerializationSchema}
> 和org.apache.flink.table.descriptors.{Avro,AvroValidator}
>
> 使用时在构建Avro时指定以下三个参数即可(见标红部分):
>
> tableEnv.connect(
> new Kafka()
> .version("universal")
> .topic(topic)
> .properties(props)
> ).withFormat(
> new Avro()
>   .useRegistry(true)
>   .registryUrl(KAFKA_SCHEMA_REGISTRY_URL_ADDRESS)
>   .registrySubject(subject)
>   .avroSchema(avroSchemaStr)
> )
>
>
> 陈帅  于2019年12月18日周三 上午8:26写道:
> >
> > flink sql是否能够支持将confluent schema registry注册的一个avro数据格式 的topic注册成一张table?
>


Re: flink如何动态修改窗口大小和类型?

2019-12-18 文章 Utopia
不是有 Dynamic gap 吗?

Best  regards
Utopia
在 2019年12月18日 +0800 22:34,jingjing bai ,写道:
> 目前一个任务中,仅支持一种窗口。
> 动态修改本身应该是一个伪需求
> 如果你仅仅是为了方便快速开发,建议你用sql方式去开发,比如自己扩展sql client。
>
>
> LakeShen  于2019年12月18日周三 下午2:12写道:
>
> > 使用自定义 Trigger 试试,在 Trigger 中自定义业务触发逻辑。
> >
> > 陈帅  于2019年12月14日周六 下午6:44写道:
> >
> > > flink目前支不支持动态修改窗口大小和类型呢?例如先是统计5分钟窗口,而后修改成统计10分钟窗口.
> > >
> >


Re: flink如何动态修改窗口大小和类型?

2019-12-18 文章 jingjing bai
目前一个任务中,仅支持一种窗口。
动态修改本身应该是一个伪需求
如果你仅仅是为了方便快速开发,建议你用sql方式去开发,比如自己扩展sql client。


LakeShen  于2019年12月18日周三 下午2:12写道:

> 使用自定义 Trigger 试试,在 Trigger 中自定义业务触发逻辑。
>
> 陈帅  于2019年12月14日周六 下午6:44写道:
>
> > flink目前支不支持动态修改窗口大小和类型呢?例如先是统计5分钟窗口,而后修改成统计10分钟窗口.
> >
>


Re: Re: FLINK 1.9 + YARN+ SessionWindows + 大数据量 + 运行一段时间后 OOM

2019-12-18 文章 Xintong Song
   - "TaskManager分配用于排序,hash表和缓存中间结果的内存位于JVM堆外" 这个是针对 batch (dataset /
   blink sql)  作业的,我看你跑的应该是 streaming 作业,把 taskmanager.memory.off-heap 设成 true
   只是单纯为了减小 jvm heap size,留出空间给 rocksdb。
   - 有一个 flink-examples 的目录,里面有一些示例作业,不过主要是展示 api 用法的。部署、资源调优方面的示例暂时还没有。
   - 另外,我在上一封邮件里描述的解决方案,是针对 flink 1.9 及以前版本的。最新尚未发布的 flink 1.10
   中资源配置部分做了比较大的改动,如果有兴趣的话可以等到发布之后关注一下相关的文档。

Thank you~

Xintong Song



On Wed, Dec 18, 2019 at 4:49 PM USERNAME  wrote:

> @tonysong...@gmail.com 感谢回复
> 看了下参数的含义,
> taskmanager.memory.off-heap:
> 如果设置为true,TaskManager分配用于排序,hash表和缓存中间结果的内存位于JVM堆外。对于具有较大内存的设置,这可以提高在内存上执行的操作的效率(默认为false)。
> JVM堆内使用的内存是受YARN限制的,JVM堆外不受YARN限制,如果这样确实能 说通现在我的问题,
> 已经修改并且在测试了,非常感谢tonysong...@gmail.com
> 咱们FLINK有没有一些最佳实践的项目样例,能体现一些细节上的东西,能让大家用的更简单一些,体现FLINK的强大。
>
>
>
> 在 2019-12-17 18:16:02,"Xintong Song"  写道:
> >你这个不是OOM,是 container 内存超用被 yarn 杀掉了。
> >JVM 的内存是不可能超用的,否则会报 OOM。所以比较可能是 RocksDB 的内存够用量增加导致了超用。
> >
> >建议:
> >
> >1.  增加如下配置
> >taskmanager.memory.off-heap: true
> >taskmanager.memory.preallocate: false
> >
> >2. 若果已经采用了如下配置,或者改了配置之后仍存在问题,可以尝试调大下面这个配置,未配置时默认值是0.25
> >containerized.heap-cutoff-ratio
> >
> >Thank you~
> >
> >Xintong Song
> >
> >
> >
> >On Tue, Dec 17, 2019 at 5:49 PM USERNAME  wrote:
> >
> >> 版本:flink 1.9.1
> >> --运行命令
> >> flink run -d -m yarn-cluster -yn 40 -ys 2 
> >>
> >>
> >> --部分代码
> >> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >> RocksDBStateBackend backend = new RocksDBStateBackend(CHECKPOINT_PATH,
> >> true);
> >>
> >>
> >> .keyBy("imei")  //10W+
> >> .window(EventTimeSessionWindows.withGap(Time.hours(1))) //设备超过1小时没有点就算离线
> >> .trigger(new Trigger())
> >> .aggregate(new AggregateFunction(), new ProcessWindowFunction())
> >>
> >>
> >> --数据
> >> 总共10W+设备,每个设备每30秒一条数据,一分钟数据量20W左右。
> >>
> >>
> >> --错误现象
> >> 运行一段时间(几天)之后,taskmanager就会挂掉。
> >>
> >>
> >> --求教
> >> 1. flink 内存不断增加?
> >> 数据量是挺大的,并且窗口保留期可能会很长,但是实际数据运算一次就可以不用了,也做了StateTtlConfig 不知道
> >> 哪里?什么?导致的内存一直占用,可能用法有问题,希望大神能够指点一下迷津。
> >> 2. flink / yarn 参数配置能优化吗?
> >> 有flink on yarn 的配置最佳实践吗?
> >>
> >>
> >> 问题困扰很久了 从1.7 - 1.8 - 1.9 ,希望有熟悉内部机制和有过类似问题的大神指点一下。
> >>
> >>
> >>
> >>
> >> --错误信息 --> nodemanager .log
> >>
> >>
> >> 2019-12-17 16:55:16,545 WARN
> >>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> >> Process tree for container: container_e16_1575354121024_0050_01_08
> has
> >> processes older than 1 iteration running over the configured limit.
> >> Limit=3221225472, current usage = 3222126592
> >> 2019-12-17 16:55:16,546 WARN
> >>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> >> Container
> >> [pid=184523,containerID=container_e16_1575354121024_0050_01_08] is
> >> running 901120B beyond the 'PHYSICAL' memory limit. Current usage: 3.0
> GB
> >> of 3 GB physical memory used; 4.9 GB of 30 GB virtual memory used.
> Killing
> >> container.
> >> Dump of the process-tree for container_e16_1575354121024_0050_01_08
> :
> >> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> >> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> >> |- 184701 184523 184523 184523 (java) 21977 4845 5166649344 786279
> >> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
> >> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
> >> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
> >> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
> >>
> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
> >> -Dlogback.configurationFile=file:./logback.xml
> >> -Dlog4j.configuration=file:./log4j.properties
> >> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
> >> |- 184523 184521 184523 184523 (bash) 2 3 118067200 373 /bin/bash -c
> >> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
> >> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
> >> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
> >> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
> >>
> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
> >> -Dlogback.configurationFile=file:./logback.xml
> >> -Dlog4j.configuration=file:./log4j.properties
> >> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1>
> >>
> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.out
> >> 2>
> >>
> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.err
> >>
> >>
> >>
> >> 2019-12-17 16:55:16,546 INFO
> >>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
> >> Removed ProcessTree with root 184523
> >> 2019-12-17 16:55:16,547 INFO
> >>
> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
> >> 

Re: jobmanager异常日志

2019-12-18 文章 Dino Zhang
这个只能说明超时,具体原因要看taskmanager日志

On Mon, Dec 16, 2019 at 2:49 PM pengchenglin  wrote:

> 各位:
>
> 大家好,在standalone的jobmanager节点的日志中看到的,这个153的taskmanager挂掉了,不知道是不是下面的报错导致的,有知道下面的Error是啥意思不
>
> 2019-12-15 17:15:21.999 [flink-metrics-379] ERROR akka.remote.Remoting
> flink-metrics-akka.remote.default-remote-dispatcher-20 - Association to
> [akka.tcp://flink-metr...@xx.xx.xx.153:35929] with UID [1
> 617823256] irrecoverably failed. Quarantining address.
> java.util.concurrent.TimeoutException: Remote system has been silent for
> too long. (more than 48.0 hours)
> at
> akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:386)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> at
> akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>

-- 
Regards,
DinoZhang


Re:Re: FLINK 1.9 + YARN+ SessionWindows + 大数据量 + 运行一段时间后 OOM

2019-12-18 文章 USERNAME
@tonysong...@gmail.com 感谢回复
看了下参数的含义,
taskmanager.memory.off-heap: 
如果设置为true,TaskManager分配用于排序,hash表和缓存中间结果的内存位于JVM堆外。对于具有较大内存的设置,这可以提高在内存上执行的操作的效率(默认为false)。
JVM堆内使用的内存是受YARN限制的,JVM堆外不受YARN限制,如果这样确实能 说通现在我的问题,
已经修改并且在测试了,非常感谢tonysong...@gmail.com
咱们FLINK有没有一些最佳实践的项目样例,能体现一些细节上的东西,能让大家用的更简单一些,体现FLINK的强大。



在 2019-12-17 18:16:02,"Xintong Song"  写道:
>你这个不是OOM,是 container 内存超用被 yarn 杀掉了。
>JVM 的内存是不可能超用的,否则会报 OOM。所以比较可能是 RocksDB 的内存够用量增加导致了超用。
>
>建议:
>
>1.  增加如下配置
>taskmanager.memory.off-heap: true
>taskmanager.memory.preallocate: false
>
>2. 若果已经采用了如下配置,或者改了配置之后仍存在问题,可以尝试调大下面这个配置,未配置时默认值是0.25
>containerized.heap-cutoff-ratio
>
>Thank you~
>
>Xintong Song
>
>
>
>On Tue, Dec 17, 2019 at 5:49 PM USERNAME  wrote:
>
>> 版本:flink 1.9.1
>> --运行命令
>> flink run -d -m yarn-cluster -yn 40 -ys 2 
>>
>>
>> --部分代码
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> RocksDBStateBackend backend = new RocksDBStateBackend(CHECKPOINT_PATH,
>> true);
>>
>>
>> .keyBy("imei")  //10W+
>> .window(EventTimeSessionWindows.withGap(Time.hours(1))) //设备超过1小时没有点就算离线
>> .trigger(new Trigger())
>> .aggregate(new AggregateFunction(), new ProcessWindowFunction())
>>
>>
>> --数据
>> 总共10W+设备,每个设备每30秒一条数据,一分钟数据量20W左右。
>>
>>
>> --错误现象
>> 运行一段时间(几天)之后,taskmanager就会挂掉。
>>
>>
>> --求教
>> 1. flink 内存不断增加?
>> 数据量是挺大的,并且窗口保留期可能会很长,但是实际数据运算一次就可以不用了,也做了StateTtlConfig 不知道
>> 哪里?什么?导致的内存一直占用,可能用法有问题,希望大神能够指点一下迷津。
>> 2. flink / yarn 参数配置能优化吗?
>> 有flink on yarn 的配置最佳实践吗?
>>
>>
>> 问题困扰很久了 从1.7 - 1.8 - 1.9 ,希望有熟悉内部机制和有过类似问题的大神指点一下。
>>
>>
>>
>>
>> --错误信息 --> nodemanager .log
>>
>>
>> 2019-12-17 16:55:16,545 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Process tree for container: container_e16_1575354121024_0050_01_08 has
>> processes older than 1 iteration running over the configured limit.
>> Limit=3221225472, current usage = 3222126592
>> 2019-12-17 16:55:16,546 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container
>> [pid=184523,containerID=container_e16_1575354121024_0050_01_08] is
>> running 901120B beyond the 'PHYSICAL' memory limit. Current usage: 3.0 GB
>> of 3 GB physical memory used; 4.9 GB of 30 GB virtual memory used. Killing
>> container.
>> Dump of the process-tree for container_e16_1575354121024_0050_01_08 :
>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>> |- 184701 184523 184523 184523 (java) 21977 4845 5166649344 786279
>> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
>> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
>> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
>> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
>> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
>> -Dlogback.configurationFile=file:./logback.xml
>> -Dlog4j.configuration=file:./log4j.properties
>> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
>> |- 184523 184521 184523 184523 (bash) 2 3 118067200 373 /bin/bash -c
>> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
>> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
>> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
>> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
>> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
>> -Dlogback.configurationFile=file:./logback.xml
>> -Dlog4j.configuration=file:./log4j.properties
>> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1>
>> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.out
>> 2>
>> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.err
>>
>>
>>
>> 2019-12-17 16:55:16,546 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Removed ProcessTree with root 184523
>> 2019-12-17 16:55:16,547 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
>> Container container_e16_1575354121024_0050_01_08 transitioned from
>> RUNNING to KILLING
>> 2019-12-17 16:55:16,549 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
>> Cleaning up container container_e16_1575354121024_0050_01_08
>> 2019-12-17 16:55:16,579 WARN
>> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
>> code from container container_e16_1575354121024_0050_01_08 is : 143


Re: (补充图片链接) 关于直接设置Watermark和flatmap后再设置的疑问

2019-12-18 文章 Dino Zhang
kafka的exactly once是通过checkpoint机制保存消费位点来保证的,和event time没关系。在进入时间窗口前提取event
time和设定watermark即可。

On Wed, Dec 18, 2019 at 4:12 PM 猫猫 <16770...@qq.com> wrote:

> 图片不能粘贴,放到github上面了。
> https://github.com/maobuji/Blog/blob/master/image/changeEventTime.jpg
>
>
>
>
> --原始邮件--
> 发件人:"猫猫"<16770...@qq.com;
> 发送时间:2019年12月18日(星期三) 下午4:03
> 收件人:"user-zh"
> 主题:回复: 关于直接设置Watermark和flatmap后再设置的疑问
>
>
>
> 可能是我说的不太明确,我理解事件时间在整个流系统下只有一个。
> 但是读kafka时,我无法直接拿到业务数据中的event-time。因为数据可能会被拆分为多条。
> 我只能当做字符串取出,并设置事件时间为kafka的时间。
>
>
> 在进行一次消息转换后,才能获取到数据中的时间。所以我需要在flatmap以后才能正确的设置event-time
> 但我又需要kafka的精确一次特性。
>
> 所以问题转变为如下的问题,我能在流中重设eventTime吗?但对最初的流(kafka提交)不影响。
> 所以也就是之前提到的问题。
> env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...)
>
> env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...);
>
>
> 更进一步,我能向下图一样,在不同的流过程中,设置不同的eventTime吗?并且他们能够实现整体提交。
>
>
>
>
>
> -- 原始邮件 --
> 发件人:"LakeShen" 发送时间:2019年12月18日(星期三) 下午2:10
> 收件人:"user-zh"
> 主题:Re: 关于直接设置Watermark和flatmap后再设置的疑问
>
>
>
> flatmap 逻辑中,你是否对消息记录的时间处理了吗,watermark 的更新的逻辑是比前一次 watermark 的时间截要大同时非空。
>
> 猫猫 <16770...@qq.com 于2019年12月18日周三 上午9:27写道:
>
>  env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...)
> 
> 
> env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...);
> 
>  使用kafka进行消费,直接设置Watermark和经过flatMap()以后再设置,会产生什么样的区别和影响。
>  flatMap可能会将数据处理为1-N条。那么在这种情况下,还能够保证kafka的精确一次吗?



-- 
Regards,
DinoZhang


?????????????????????? ????????????Watermark??flatmap??????????????

2019-12-18 文章 ????
??github
https://github.com/maobuji/Blog/blob/master/image/changeEventTime.jpg




----
??:""<16770...@qq.com;
:2019??12??18??(??) 4:03
??:"user-zh"

?????? ????????????Watermark??flatmap??????????????

2019-12-18 文章 ????

??kafka??event-time??
??kafka


??flatmap??event-time
??kafka

eventTime??kafka??
??
env.addSource(flinkKafkaConsumer).assignTimestampsAndWatermarks(...)
env.addSource(flinkKafkaConsumer).flatMap(...).assignTimestampsAndWatermarks(...);


??eventTime??





----
??:"LakeShen"