Re:关于flink升级

2020-08-13 Thread USERNAME
官网有升级建议

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/ops/upgrading.html#compatibility-table














在 2020-08-14 09:15:53,"引领"  写道:
>我们的flink是在1.7版本的,所以这次想对flink进行升级,但升级建议直接升级flink1.11.1么?有木有大佬在生产环境部署的么?
>
>
>| |
>引领
>|
>|
>yrx73...@163.com
>|
>签名由网易邮箱大师定制
>


Re:Re: FLINK1.11.1 对OGG数据入HIVE的问题咨询

2020-08-13 Thread USERNAME


感谢您的回复,您说的这个方法类似于 “采用通过表结构”如下结构,屏蔽掉 
用table分区,每个表的data部分弱化到一个字段中,使用的时候再通过解析json方式来从 "before"或者"after"中获取对应表的字段及数据,
这种方式确实拓展跟灵活性强很多,牺牲掉部分易用性。
看到很多大公司 美团 字节等 都有基于flink的实时数仓,不知道他们是怎么解决这种大量表入仓的 拓展灵活易用性的
create table TABLENAME
 (
 table   STRING,
 op_type  STRING,
 op_ts  STRING,
 current_ts   STRING,
 pos STRING,
 "before"  STRING,
 "after" STRING

 ) partitioned by (pt_d table)
。





在 2020-08-13 16:35:19,"Rui Li"  写道:
>你提到的这三个难点现在的hive
>connector确实是支持不了的。前两个也许可以通过把写不同的表变成写同一个表的不同分区来解决。第三个可能可以通过检查数据跟目标schema是不是匹配,来判断是不是需要去跟HMS同步新的schema。
>
>On Thu, Aug 13, 2020 at 3:27 PM USERNAME  wrote:
>
>>
>>
>> 任务流程:
>> OGG->KAFKA->FLINK->HIVE
>>
>>
>> KAFKA数据样例:
>> 其中会有多个
>> "table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
>> {
>> "table": "SCOOT.TABLENAME",
>> "op_type": "U",
>> "op_ts": "2020-08-11 07:53:40.008001",
>> "current_ts": "2020-08-11T15:56:41.233000",
>> "pos": "980119769930",
>> "before": {
>> "C1": 4499000,
>> "C2": null,
>> "C3": null,
>> "C4": null,
>> "C5": null
>> },
>> "after": {
>> "C1": 4499000,
>> "C2": null,
>> "C3": "",
>> "C4": "",
>> "C5": "通过"
>> }
>> }
>> 问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
>> 看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?
>>
>>
>> 例如 样例数据在hive中建表
>> create table TABLENAME
>> (
>> op_type  STRING,
>> op_ts  STRING,
>> current_ts   STRING,
>> pos STRING,
>> "C1" STRING,
>> "C2" STRING,
>> "C3" STRING,
>> "C4" STRING,
>> "C5" STRING
>> )
>> 理解的难点,
>> 1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
>> 2.同一FLINK任务会有新增的表,需自动适配
>> 3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等
>>
>>
>> 或者只能采用通过表结构
>> create table TABLENAME
>> (
>> table   STRING,
>> op_type  STRING,
>> op_ts  STRING,
>> current_ts   STRING,
>> pos STRING,
>> "before"  STRING,
>> "after" STRING
>> )
>> 然后剩下的在HIVE中解决。
>>
>>
>> 或者有其他更好的方案?
>>
>>
>
>-- 
>Best regards!
>Rui Li


FLINK1.11.1 对OGG数据入HIVE的问题咨询

2020-08-13 Thread USERNAME


任务流程:
OGG->KAFKA->FLINK->HIVE


KAFKA数据样例:
其中会有多个 
"table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
{
"table": "SCOOT.TABLENAME",
"op_type": "U",
"op_ts": "2020-08-11 07:53:40.008001",
"current_ts": "2020-08-11T15:56:41.233000",
"pos": "980119769930",
"before": {
"C1": 4499000,
"C2": null,
"C3": null,
"C4": null,
"C5": null
},
"after": {
"C1": 4499000,
"C2": null,
"C3": "",
"C4": "",
"C5": "通过"
}
}
问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?


例如 样例数据在hive中建表
create table TABLENAME
(
op_type  STRING,
op_ts  STRING,
current_ts   STRING,
pos STRING,
"C1" STRING,
"C2" STRING,
"C3" STRING,
"C4" STRING,
"C5" STRING
)
理解的难点,
1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
2.同一FLINK任务会有新增的表,需自动适配
3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等


或者只能采用通过表结构
create table TABLENAME
(
table   STRING,
op_type  STRING,
op_ts  STRING,
current_ts   STRING,
pos STRING,
"before"  STRING,
"after" STRING
)
然后剩下的在HIVE中解决。


或者有其他更好的方案?



Re:Re: CountEvictor 与 TriggerResult.FIRE_AND_PURGE 清理窗口数据有区别吗?

2020-01-21 Thread USERNAME
evict 丢弃掉的数据,在内存或者RocksDB中也会同步删除吗?






在 2020-01-21 17:27:38,"tison"  写道:
>正好看到这一部分,还是有的,你考虑下滑动的计数窗口
>
>[1] 会在 fire 之后把整个 windowState 丢掉,[2] 其实会重新计算 evict 之后的 windowState
>
>Best,
>tison.
>
>
>USERNAME  于2020年1月21日周二 下午5:21写道:
>
>> 大家,新年快乐~
>>
>>
>> [1] TriggerResult.FIRE_AND_PURGE
>>
>> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74
>> [2] CountEvictor
>>
>> https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
>>
>>


CountEvictor 与 TriggerResult.FIRE_AND_PURGE 清理窗口数据有区别吗?

2020-01-21 Thread USERNAME
大家,新年快乐~


[1] TriggerResult.FIRE_AND_PURGE
https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java#L74
[2] CountEvictor
https://github.com/apache/flink/blob/1662d5d0cda6a813e5c59014acfd7615b153119f/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377



Re:FLINK 1.9.1 TriggerResult.FIRE_AND_PURGE 与 ctx.registerEventTimeTimer()

2020-01-18 Thread USERNAME


看到源码了,这里如果contents == null,就不触发计算了
evictingWindowState.clear();会导致contents 变成null,然后如果窗口没有数据就不会触发窗口计算




if (triggerResult.isFire()) {
   Iterable> contents = evictingWindowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
   }
   emitWindowContents(actualWindow, contents, evictingWindowState);
}
-- 这里会吧 evictingWindowState给clear了
if (triggerResult.isPurge()) {
evictingWindowState.clear();
}








在 2020-01-19 13:27:17,"USERNAME"  写道:
>大家新年快乐!
>
>
>版本:FLINK 1.9.1
>
>
>部分代码
>.keyBy("key")
>.window(EventTimeSessionWindows.withGap(Time.hours(1)))
>.trigger(new NewTrigger())
>.process(new NewProcess())
>
>
>--NewTrigger()
>@Override
>public TriggerResult onElement(Bean bean, long l, TimeWindow timeWindow, 
>TriggerContext ctx) throws Exception {
>..
>ctx.registerEventTimeTimer(l + INTERVAL);
>return TriggerResult.FIRE_AND_PURGE;
>}
>
>
>问题:
>通过触发器,定义在窗口内,每隔固定时间计算一次并且清空计算数据,正常如果固定时间内没有数据也要输出一次。
>但是实际用的时候只有窗口有数据的时候才会触发process计算,如果没有数据不会触发,TriggerResult.FIRE_AND_PURGE改为TriggerResult.FIRE之后就可以正常的固定每隔一定时间输出一次。
>TriggerResult.FIRE_AND_PURGE会有哪些操作?好像不止清理了窗口数据,还对触发时间有操作呀?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>


FLINK 1.9.1 TriggerResult.FIRE_AND_PURGE 与 ctx.registerEventTimeTimer()

2020-01-18 Thread USERNAME
大家新年快乐!


版本:FLINK 1.9.1


部分代码
.keyBy("key")
.window(EventTimeSessionWindows.withGap(Time.hours(1)))
.trigger(new NewTrigger())
.process(new NewProcess())


--NewTrigger()
@Override
public TriggerResult onElement(Bean bean, long l, TimeWindow timeWindow, 
TriggerContext ctx) throws Exception {
..
ctx.registerEventTimeTimer(l + INTERVAL);
return TriggerResult.FIRE_AND_PURGE;
}


问题:
通过触发器,定义在窗口内,每隔固定时间计算一次并且清空计算数据,正常如果固定时间内没有数据也要输出一次。
但是实际用的时候只有窗口有数据的时候才会触发process计算,如果没有数据不会触发,TriggerResult.FIRE_AND_PURGE改为TriggerResult.FIRE之后就可以正常的固定每隔一定时间输出一次。
TriggerResult.FIRE_AND_PURGE会有哪些操作?好像不止清理了窗口数据,还对触发时间有操作呀?



















Re:Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

2020-01-07 Thread USERNAME
感谢 唐老师 解答!

在 2020-01-07 19:46:06,"Yun Tang"  写道:
>Hi
>
>使用iterator.remove() 去除state中已经计算过的数据不是一个标准做法,标准的做法应该是 clear掉相应的state [1]
>至于为什么使用MemoryStateBackend会去除数据是因为 get 返回的结果是backend中on heap直接存储的对象[2],存在修改的副作用。
>而RocksDB state backend get返回的结果是反序列化的list,而不是RocksDB自身存储的数据 [3],也就不存在修改的副作用了。
>
>
>[1] 
>https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L377
>[2] 
>https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/AbstractHeapAppendingState.java#L57
>[3] 
>https://github.com/apache/flink/blob/b195383b6b792ea1363ae340ffcfb6ef45c84677/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java#L119
>
>祝好
>唐云
>
>
>From: USERNAME 
>Sent: Tuesday, January 7, 2020 17:54
>To: user-zh@flink.apache.org 
>Subject: FLINK 不同 StateBackend ProcessWindowFunction的差别
>
>各位好!
>祝大家新年快乐!
>
>
>
>
>--版本
>FLINK 1.9.1 ON YARN
>
>
>--过程
>1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
>2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
>3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
>--问题
>new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
>使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
>使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
>这种计算场景有更好的计算方法吗?
>
>
>--部分代码
>final StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
>new ProcessWindowFunction{
>public void process(Tuple tuple, Context context, Iterable 
>elements, Collector out) throws Exception {
>for (Iterator iter = elements.iterator(); iter.hasNext(); ) {
>
>iter.remove();
>}
>}
>
>}
>
>
>
>
>
>
>


Re:Re:Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

2020-01-07 Thread USERNAME
TTL 好像不支持 TimeCharacteristic.EventTime 方式



在 2020-01-08 14:17:11,"USERNAME"  写道:
>我这例子需要通过 在触发器中 TriggerResult.FIRE_AND_PURGE 来清理当前计算窗口的数据,实现增量计算,跟TTL有点区别吧。
>
>
>
>
>
>在 2020-01-07 19:51:57,"huoguo"  写道:
>>
>>
>>过期数据能通过TTL 设置过期吗?
>>
>>> 在 2020年1月7日,17:54,USERNAME  写道:
>>> 
>>> 各位好!
>>> 祝大家新年快乐!
>>> 
>>> 
>>> 
>>> 
>>> --版本
>>> FLINK 1.9.1 ON YARN
>>> 
>>> 
>>> --过程
>>> 1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
>>> 2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
>>> 3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
>>> --问题
>>> new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
>>> 使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
>>> 使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
>>> 这种计算场景有更好的计算方法吗?
>>> 
>>> 
>>> --部分代码
>>> final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>> 
>>> 
>>> new ProcessWindowFunction{
>>> public void process(Tuple tuple, Context context, Iterable 
>>> elements, Collector out) throws Exception {
>>> for (Iterator iter = elements.iterator(); iter.hasNext(); ) {
>>> 
>>> iter.remove();
>>> }
>>> }
>>> 
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>


FLINK 不同 StateBackend ProcessWindowFunction的差别

2020-01-07 Thread USERNAME
各位好!
祝大家新年快乐!




--版本
FLINK 1.9.1 ON YARN


--过程
1.定义一个 EventTimeSessionWindows.withGap(Time.hours(1))窗口
2.定义一个new Trigger(),每隔固定间隔计算一次,并且输出
3.定义一个new ProcessWindowFunction(),每隔固定时间计算一次,并且输出,并且不保留已经计算的数据
--问题
new ProcessWindowFunction()中通过iter.remove();来将计算过的数据去掉,
使用MemoryStateBackend可以达到预期的目标,在固定间隔的计算周期内,都不会包含上一次的值。
使用RocksDBStateBackend该iter.remove();步骤会不生效,下次计算还会有历史数据。
这种计算场景有更好的计算方法吗?


--部分代码
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


new ProcessWindowFunction{
public void process(Tuple tuple, Context context, Iterable 
elements, Collector out) throws Exception {
for (Iterator iter = elements.iterator(); iter.hasNext(); ) {

iter.remove();
}
}

}









Re:Re: FLINK 1.9.1 StreamingFileSink 压缩问题

2020-01-02 Thread USERNAME
非常感谢帮助!
祝腊八快乐,祝大家腊八愉快!!

在 2020-01-02 15:00:25,"JingsongLee"  写道:
>Hi,
>
>看起来你只能改下connector代码才能支持压缩了:
>ParquetAvroWriters.createAvroParquetWriter里:设置AvroParquetWriter.Builder的压缩格式。
>
>Best,
>Jingsong Lee
>
>
>--
>From:USERNAME 
>Send Time:2020年1月2日(星期四) 13:36
>To:user-zh 
>Subject:FLINK 1.9.1 StreamingFileSink 压缩问题
>
>各位好,FLINK 1.9.1 使用 StreamingFileSink 写Parquet到HDFS,能启用压缩吗?
>
>--代码
>StreamingFileSink sink = StreamingFileSink
>.forBulkFormat(new Path(FILE_HDFS_PATH), 
> ParquetAvroWriters.forReflectRecord(HDFSBean.class))
>.withBucketAssigner(new DateTimeBucketAssigner<>(FILE_HDFS_FORMAT))
>
>.build();
>


Re:Re: FLINK 1.9 + YARN+ SessionWindows + 大数据量 + 运行一段时间后 OOM

2019-12-18 Thread USERNAME
@tonysong...@gmail.com 感谢回复
看了下参数的含义,
taskmanager.memory.off-heap: 
如果设置为true,TaskManager分配用于排序,hash表和缓存中间结果的内存位于JVM堆外。对于具有较大内存的设置,这可以提高在内存上执行的操作的效率(默认为false)。
JVM堆内使用的内存是受YARN限制的,JVM堆外不受YARN限制,如果这样确实能 说通现在我的问题,
已经修改并且在测试了,非常感谢tonysong...@gmail.com
咱们FLINK有没有一些最佳实践的项目样例,能体现一些细节上的东西,能让大家用的更简单一些,体现FLINK的强大。



在 2019-12-17 18:16:02,"Xintong Song"  写道:
>你这个不是OOM,是 container 内存超用被 yarn 杀掉了。
>JVM 的内存是不可能超用的,否则会报 OOM。所以比较可能是 RocksDB 的内存够用量增加导致了超用。
>
>建议:
>
>1.  增加如下配置
>taskmanager.memory.off-heap: true
>taskmanager.memory.preallocate: false
>
>2. 若果已经采用了如下配置,或者改了配置之后仍存在问题,可以尝试调大下面这个配置,未配置时默认值是0.25
>containerized.heap-cutoff-ratio
>
>Thank you~
>
>Xintong Song
>
>
>
>On Tue, Dec 17, 2019 at 5:49 PM USERNAME  wrote:
>
>> 版本:flink 1.9.1
>> --运行命令
>> flink run -d -m yarn-cluster -yn 40 -ys 2 
>>
>>
>> --部分代码
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> RocksDBStateBackend backend = new RocksDBStateBackend(CHECKPOINT_PATH,
>> true);
>>
>>
>> .keyBy("imei")  //10W+
>> .window(EventTimeSessionWindows.withGap(Time.hours(1))) //设备超过1小时没有点就算离线
>> .trigger(new Trigger())
>> .aggregate(new AggregateFunction(), new ProcessWindowFunction())
>>
>>
>> --数据
>> 总共10W+设备,每个设备每30秒一条数据,一分钟数据量20W左右。
>>
>>
>> --错误现象
>> 运行一段时间(几天)之后,taskmanager就会挂掉。
>>
>>
>> --求教
>> 1. flink 内存不断增加?
>> 数据量是挺大的,并且窗口保留期可能会很长,但是实际数据运算一次就可以不用了,也做了StateTtlConfig 不知道
>> 哪里?什么?导致的内存一直占用,可能用法有问题,希望大神能够指点一下迷津。
>> 2. flink / yarn 参数配置能优化吗?
>> 有flink on yarn 的配置最佳实践吗?
>>
>>
>> 问题困扰很久了 从1.7 - 1.8 - 1.9 ,希望有熟悉内部机制和有过类似问题的大神指点一下。
>>
>>
>>
>>
>> --错误信息 --> nodemanager .log
>>
>>
>> 2019-12-17 16:55:16,545 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Process tree for container: container_e16_1575354121024_0050_01_08 has
>> processes older than 1 iteration running over the configured limit.
>> Limit=3221225472, current usage = 3222126592
>> 2019-12-17 16:55:16,546 WARN
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Container
>> [pid=184523,containerID=container_e16_1575354121024_0050_01_08] is
>> running 901120B beyond the 'PHYSICAL' memory limit. Current usage: 3.0 GB
>> of 3 GB physical memory used; 4.9 GB of 30 GB virtual memory used. Killing
>> container.
>> Dump of the process-tree for container_e16_1575354121024_0050_01_08 :
>> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>> |- 184701 184523 184523 184523 (java) 21977 4845 5166649344 786279
>> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
>> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
>> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
>> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
>> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
>> -Dlogback.configurationFile=file:./logback.xml
>> -Dlog4j.configuration=file:./log4j.properties
>> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
>> |- 184523 184521 184523 184523 (bash) 2 3 118067200 373 /bin/bash -c
>> /usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m
>> -XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC
>> -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
>> -XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError
>> -Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
>> -Dlogback.configurationFile=file:./logback.xml
>> -Dlog4j.configuration=file:./log4j.properties
>> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1>
>> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.out
>> 2>
>> /opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.err
>>
>>
>>
>> 2019-12-17 16:55:16,546 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
>> Removed ProcessTree with root 184523
>> 2019-12-17 16:55:16,547 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
>> Container container_e16_1575354121024_0050_01_08 transitioned from
>> RUNNING to KILLING
>> 2019-12-17 16:55:16,549 INFO
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
>> Cleaning up container container_e16_1575354121024_0050_01_08
>> 2019-12-17 16:55:16,579 WARN
>> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
>> code from container container_e16_1575354121024_0050_01_08 is : 143


FLINK 1.9 + YARN+ SessionWindows + 大数据量 + 运行一段时间后 OOM

2019-12-17 Thread USERNAME
版本:flink 1.9.1
--运行命令
flink run -d -m yarn-cluster -yn 40 -ys 2 


--部分代码
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
RocksDBStateBackend backend = new RocksDBStateBackend(CHECKPOINT_PATH, true);


.keyBy("imei")  //10W+
.window(EventTimeSessionWindows.withGap(Time.hours(1))) //设备超过1小时没有点就算离线
.trigger(new Trigger())
.aggregate(new AggregateFunction(), new ProcessWindowFunction())


--数据
总共10W+设备,每个设备每30秒一条数据,一分钟数据量20W左右。


--错误现象
运行一段时间(几天)之后,taskmanager就会挂掉。


--求教
1. flink 内存不断增加?
数据量是挺大的,并且窗口保留期可能会很长,但是实际数据运算一次就可以不用了,也做了StateTtlConfig 不知道 
哪里?什么?导致的内存一直占用,可能用法有问题,希望大神能够指点一下迷津。
2. flink / yarn 参数配置能优化吗?
有flink on yarn 的配置最佳实践吗?


问题困扰很久了 从1.7 - 1.8 - 1.9 ,希望有熟悉内部机制和有过类似问题的大神指点一下。




--错误信息 --> nodemanager .log


2019-12-17 16:55:16,545 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Process tree for container: container_e16_1575354121024_0050_01_08 has 
processes older than 1 iteration running over the configured limit. 
Limit=3221225472, current usage = 3222126592
2019-12-17 16:55:16,546 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Container [pid=184523,containerID=container_e16_1575354121024_0050_01_08] 
is running 901120B beyond the 'PHYSICAL' memory limit. Current usage: 3.0 GB of 
3 GB physical memory used; 4.9 GB of 30 GB virtual memory used. Killing 
container.
Dump of the process-tree for container_e16_1575354121024_0050_01_08 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) 
VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 184701 184523 184523 184523 (java) 21977 4845 5166649344 786279 
/usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m 
-XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC 
-XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly 
-XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError 
-Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
 -Dlogback.configurationFile=file:./logback.xml 
-Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 
|- 184523 184521 184523 184523 (bash) 2 3 118067200 373 /bin/bash -c 
/usr/local/jdk1.8.0_171/bin/java -Xms2224m -Xmx2224m 
-XX:MaxDirectMemorySize=848m -XX:NewRatio=2 -XX:+UseConcMarkSweepGC 
-XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly 
-XX:+AlwaysPreTouch -server -XX:+HeapDumpOnOutOfMemoryError 
-Dlog.file=/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.log
 -Dlogback.configurationFile=file:./logback.xml 
-Dlog4j.configuration=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.out
 2> 
/opt/hadoop/logs/userlogs/application_1575354121024_0050/container_e16_1575354121024_0050_01_08/taskmanager.err
 


2019-12-17 16:55:16,546 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Removed ProcessTree with root 184523
2019-12-17 16:55:16,547 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl:
 Container container_e16_1575354121024_0050_01_08 transitioned from RUNNING 
to KILLING
2019-12-17 16:55:16,549 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
 Cleaning up container container_e16_1575354121024_0050_01_08
2019-12-17 16:55:16,579 WARN 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code 
from container container_e16_1575354121024_0050_01_08 is : 143

Re:Re:FLINK Checkpoint 问题咨询

2019-07-16 Thread USERNAME
1.是采用增量方式 RocksDBStateBackend backend = new 
RocksDBStateBackend("hdfs:/user/flink", true)
2.时间间隔 20 分钟
谢谢!


ID  Status  Acknowledged  Trigger Time  Latest Acknowledgement  
End to End Duration  State Size  Buffered During Alignment  
50459/59  16:51:30  16:51:34  4s  80.1 MB  0 B  

50359/59  16:31:30  16:31:33  3s  80.1 MB  0 B  

50259/59  16:11:30  16:11:34  4s  80.4 MB  0 B  

50159/59  15:51:30  15:52:23  53s  3.56 GB  0 B 
 
50059/59  15:31:51  15:31:56  5s  76.8 MB  0 B  

49959/59  15:14:16  15:14:19  3s  93.8 MB  0 B  

49859/59  14:54:16  14:54:20  3s  93.9 MB  0 B  

49759/59  14:34:16  14:35:04  47s  3.54 GB  0 B 
 
49659/59  14:14:16  14:14:20  3s  92.9 MB  0 B  

49559/59  13:54:16  13:54:19  3s  92.8 MB  0 B  


在 2019-07-16 20:46:02,"唐军亮"  写道:
>确定两个问题:
>1、使用的是rocksdb 增量state?
>2、checkpoint的时间间隔设置的多少?
> 
> 
>-- Original --
>From:  "USERNAME";
>Date:  Tue, Jul 16, 2019 05:36 PM
>To:  "user-zh"; 
>
>Subject:  FLINK Checkpoint 问题咨询
>
> 
>
>先谢谢各位大佬!
>
>
>1.环境
>FLINK 版本 :1.7.2
>运行模式:flink on yarn (yarn single job)
>
>
>2.配置
>状态保存方式:RocksDBStateBackend backend = new 
>RocksDBStateBackend("hdfs:/user/flink", true)
>窗口方式:EventTimeSessionWindows.withGap(Time.hours(1))
>计算方式:.aggregate(new MyAggregate(), new MyProcess())
>
>
>3.数据
>数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右
>
>
>4.需求
>监测设备超过1小时没有数据,离线报警
>设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间)
>
>
>5.现象
>该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据)
>
>
>6.咨询问题
>a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB
>b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样?
>c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算?
>问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢!
>
>
>祝大家 头发越来越多,代码BUG越来越少!
>
>
>
>
>--样例数据
>IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState 
>SizeBuffered During Alignment
>50459/5916:51:3016:51:344s80.1 MB0 B
>50359/5916:31:3016:31:333s80.1 MB0 B
>50259/5916:11:3016:11:344s80.4 MB0 B
>50159/5915:51:3015:52:2353s3.56 GB0 B
>50059/5915:31:5115:31:565s76.8 MB0 B
>49959/5915:14:1615:14:193s93.8 MB0 B
>49859/5914:54:1614:54:203s93.9 MB0 B
>49759/5914:34:1614:35:0447s3.54 GB0 B
>49659/5914:14:1614:14:203s92.9 MB0 B
>49559/5913:54:1613:54:193s92.8 MB0 B


FLINK Checkpoint 问题咨询

2019-07-16 Thread USERNAME
先谢谢各位大佬!


1.环境
FLINK 版本 :1.7.2
运行模式:flink on yarn (yarn single job)


2.配置
状态保存方式:RocksDBStateBackend backend = new 
RocksDBStateBackend("hdfs:/user/flink", true)
窗口方式:EventTimeSessionWindows.withGap(Time.hours(1))
计算方式:.aggregate(new MyAggregate(), new MyProcess())


3.数据
数据为设备信息,正常每30秒一条数据,有10万+设备,每秒的数据1.3K左右


4.需求
监测设备超过1小时没有数据,离线报警
设备在线期间在一定规则的情况下每隔30秒输出一次计算(设备基本都在线,设备会在线很长时间)


5.现象
该job运行一段时间(15天左右)就会频繁挂掉,并且Checkpoint增长很快(参见样例数据)


6.咨询问题
a. 超长时间窗口会将整个在线窗口的数据都保存吗?时间窗口的状态是保存在哪里?内存还是RocksDB
b.样例数据中每个3次就会有一次增长很快的备份,这个为什么会这样?
c.如果时间窗口的数据会一直保留在内存中,是否有办法可以清空数据(超过多长时间的清理或者超过多少条清理)并且保留窗口状态,能够继续进行后续计算?
问题可能会比较低级,如果理解有误,麻烦能给纠正一下,谢谢!


祝大家 头发越来越多,代码BUG越来越少!




--样例数据
IDStatusAcknowledgedTrigger TimeLatest AcknowledgementEnd to End DurationState 
SizeBuffered During Alignment
50459/5916:51:3016:51:344s80.1 MB0 B
50359/5916:31:3016:31:333s80.1 MB0 B
50259/5916:11:3016:11:344s80.4 MB0 B
50159/5915:51:3015:52:2353s3.56 GB0 B
50059/5915:31:5115:31:565s76.8 MB0 B
49959/5915:14:1615:14:193s93.8 MB0 B
49859/5914:54:1614:54:203s93.9 MB0 B
49759/5914:34:1614:35:0447s3.54 GB0 B
49659/5914:14:1614:14:203s92.9 MB0 B
49559/5913:54:1613:54:193s92.8 MB0 B

Re:Re: Re:Flink1.8+Hadoop3.1.2 编译问题

2019-06-28 Thread USERNAME
您好,


非常感谢 唐老师 您的回复,问题已解决。谢谢!



在 2019-06-28 15:59:48,"Yun Tang"  写道:
>你好
>
>因为从Flink-1.8 开始,flink的默认编译选项里面就不再带上hadoop依赖了。可以参考[1] 
>了解更多信息。实际上从官方的下载链接[2]里面也说明了从Flink-1.8开始shaded-hadoop的相关jar包需要单独下载并放置在lib目录下。
>
>如果需要shaded-hadoop jar包,可以单独去编译好的 flink-shaded-hadoop 子项目目录下找到相关的jar包。
>
>[1] https://issues.apache.org/jira/browse/FLINK-11266
>[2] https://flink.apache.org/downloads.html
>
>祝好
>唐云
>
>
>
>From: USERNAME 
>Sent: Friday, June 28, 2019 15:41
>To: user-zh@flink.apache.org
>Subject: Re:Flink1.8+Hadoop3.1.2 编译问题
>
>修正图片内容
>
>
>
>在 2019-06-28 15:26:57,"USERNAME"  写道:
>
>1.软件版本
>Flink 1.8
>Hadoop 3.1.2
>Apache Maven 3.0.5
>
>
>2.操作方式
>>git clone -b release-1.8.0 https://github.com/apache/flink
>>cd flink
>>mvn clean install -DskipTests -Dhadoop.version=3.1.2
>
>
>3.问题
>编译成功之后 .flink/build-target/lib 目录只有三个文件(↓)
>-rw-r--r-- 1 flink flink 96049496 Jun 28 15:17 flink-dist_2.11-1.8.0.jar
>-rw-rw-r-- 1 flink flink   489884 Jun 19 13:35 log4j-1.2.17.jar
>-rw-rw-r-- 1 flink flink 9931 Jun 19 13:35 slf4j-log4j12-1.7.15.jar
>
>
>正常的Flink1.7.2的编译结果(↓)
>-rw-r--r-- 1 flink flink 93445603 Mar 27 22:46 flink-dist_2.11-1.7.2.jar
>-rw-r--r-- 1 flink flink   141881 Mar 27 22:44 flink-python_2.11-1.7.2.jar
>-rw-r--r-- 1 flink flink 53380671 Mar 27 22:19 
>flink-shaded-hadoop2-uber-1.7.2.jar
>-rw-rw-r-- 1 flink flink   489884 Mar 27 22:16 log4j-1.2.17.jar
>-rw-rw-r-- 1 flink flink 9931 Mar 27 22:16 slf4j-log4j12-1.7.15.jar
>
>
>有没有遇到过此问题的??
>
>
>
>
>
>
>
>
>


Flink1.8+Hadoop3.1.2 编译问题

2019-06-28 Thread USERNAME
1.软件版本
Flink 1.8
Hadoop 3.1.2
Apache Maven 3.0.5


2.操作方式
>git clone -b release-1.8.0 https://github.com/apache/flink
>cd flink
>mvn clean install -DskipTests -Dhadoop.version=3.1.2


3.问题
编译成功之后 .flink/build-target/lib 目录只有三个文件(↓)
正常的Flink1.7.2的编译结果(↓)
有没有遇到过此问题的??