flink sql ddl CREATE TABLE kafka011 sink ????????????exactly-once??

2020-06-30 文章 ????????
 flink sql CREATE TABLE kafka sinkcheckpointsql 
sinkexactly-once?? ??
??
Consistency guarantees: By default, a Kafka sink ingests data with 
at-least-once guarantees into a Kafka topic if the query is executed with 
checkpointing enabled.??  
CREATE TABLE ?? at-least-once

Re: 关于local cluster的问题

2020-06-30 文章 Yang Wang
start-cluster.sh每次就是会启动一个Standalone集群的,由于都是一个flink conf,所以新起的JM
肯定会因为端口冲突起不来,TM会注册在之前已经running的JM上。
如果你只是测试,用完以后,需要stop-cluster.sh停掉

如果是想在一个JVM里面进行测试,那可以用MiniCluster,所有的组件都会以线程模式启动


Best,
Yang

naisili Yuan  于2020年6月30日周二 下午7:09写道:

> 不好意思没说清楚,跟提交任务没关系,只是执行start-cluster.sh后taskmanager就自动加一
>
> 发自我的iPhone
>
> > 在 2020年6月30日,18:54,"17610775...@163.com" <17610775...@163.com> 写道:
> >
> > hi
> > 你这个问题没有描述清楚啊 是提交一个任务jm就会自动启动一个?
> >
> >
> >
> > Best
> > JasonLee
> >
> > 发件人: naisili Yuan
> > 发送时间: 2020-06-30 18:29
> > 收件人: user-zh
> > 主题: 关于local cluster的问题
> > Hi all
> > 我这边有写一个java服务去自动拉起本地flink
> cluster(单机模式)用来调试使用。我是直接调用的bin/start-cluster.sh脚本。
> >
> 现在问题是每次重新发布服务之后,发现这个启动的会话jobmanager会自动增加一个,导致slots总数越来越高。研究半天始终没找到原因,希望获得帮助!
> > flink版本1.10.0
>


?????? flink sql if ????????????

2020-06-30 文章 kcz
tks 




--  --
??: "Benchao Li"

flink1.10 用flinksql 写hbase,报错:UpsertStreamTableSink requires that Table has a full primary keys if it is updated.

2020-06-30 文章 tiantingting5...@163.com
你好,
flink1.10,用flinkSQL写hbase,报错:UpsertStreamTableSink requires that Table has a 
full primary keys if it is updated.
看到网上的资料说是,upsertSink的primary 
key是通过query来推断的,而我的query无法推断出PK,所以报错。说是需要1.10的临时解决方法是加一层group by,使得query可以推断出 
primary key。
但是,我添加group by以后还是报错,这个问题该怎么解决呢??到底query是如何推断PK的??

以下是我的sql语句:
创建表的语句(字段有点多,还请见谅)
CREATE TABLE `AssetInfoRiskResultSinkTable` (
  rowkey string,
  d ROW(`id` bigint,
  `user_id` string,
  `income_no` string ,
  `nation` string,
  `card_auth_expiry_time` string ,
  `user_name` string ,
  `login_phone` string ,
  `card_no` string ,
  `address` string ,
  `highest_eduction` string ,
  `is_married` string ,
  `resident_address` string ,
  `resident_province` string ,
  `resident_city` string ,
  `resident_town` string ,
  `profession` string ,
  `job_salary` string ,
  `company_name` string ,
  `company_province` string ,
  `company_city` string ,
  `company_town` string ,
  `company_address` string ,
  `family_name` string ,
  `family_phone` string ,
  `workmate_name` string ,
  `workmate_phone` string ,
  `bank_card_no` string ,
  `bank_phone` string ,
  `device_address` string ,
  `device_ip` string ,
  `contacts` string ,
  `create_user` string ,
  `create_time` string ,
  `update_user` string ,
  `update_time` string,
  `scene_id` string  ,
  `access_type` string  ,
  `status` string  ,
  `label_id` string  ,
  `label_name` string  ,
  `ocr_real_name` string ,
  `ocr_id_card` string ,
  `ocr_id_card_address` string,
  `longitude` string ,
  `latitude` string ,
  `imei` string ,
  `imsi` string ,
  `mac` string ,
  `resident_province2` string ,
  `loan_use` string ,
  `channel_code` string  ,
  `channel_name` string  ,
  `credit_card_number` string  ,
  `product_type_code` string  ,
  `product_type_name` string  ,
  `apply_amount` string  ,
  `income_time` string  ,
  `credit_card_phone` string  ,
  `credit_card_amount` string  ,
  `period` string  ,
  `start_work_time` string  ,
  `register_channel` string  ,
  `register_channel_name` string  ,
  `bank_name` string  ,
  `company_call` string  ,
  `system_tag` string  ,
  `is_trans` string  ,
  `is_dial_confirm` string  ,
  `is_dial_type` string  ,
  `is_dial_typeM` string  ,
  `is_dial_typeHuman` string  ,
  `user_risk_score` string  ,
  `upload_imgs` string ,
  `upload_status` string ,
  `famliy_relationship` string  ,
  `work_relationship` string  ,
  `request_time` string  ,
  `ac_record` string ,
  `profession_station` string  ,
  `mobile_os` string  ,
  `mobile_type` string  ,
  `mobile_brand` string  ,
  `networktype` string  ,
  `jail_break` string  ,
  `open_udid` string  ,
  `simulator` string  ,
  `idfa` string  ,
  `idfv` string  ,
  `device_type` string  ,
  `credit_card_id_card_no` string  ,
  `credit_card_username` string  ,
  `sign_issue_org` string  ,
  `user_level` string  ,
  `channel_request_time` string  ,
  `real_income_no` string ,
  `org_channel_code` string  ,
  `qq` string  ,
  `mail` string  ,
  `pre_grant_credit_amount` string  ,
  `pre_grant_credit_term` string  ,
  `pre_grant_credit_term_unit` string  ,
  `monthly_repay_amount` string  ,
  `total_repay_amount` string  ,
  `xhd_white_list_flag` string  ,
  `white_list_flag` string  ,
  `white_list_level` string ,
  `white_list_type` string ,
  `bus_type` string  ,
  `housing_fund_status` string  ,
  `operator_auth_status` string ,
  `credit_card_status` string  ,
  `pboc_credit_status` string ,
  `bh_url_flag` string,
  `zmxy_auth_expiry_time` string  ,
  `operator_auth_expiry_time` string  ,
  `housing_fund_status_time` string  ,
  `credit_card_expiry_time` string  ,
  `pboc_credit_status_time` string  ,
  `credit_card_bank_name` string  ,
  `due_limit_unit` string  ,
  `due_limit` string  ,
  `loan_amount` string  ,
  `risk_lead_flag` string ,
  `period_unit` string  ,
  `face_score` string  ,
  `notify_url` string  ,
  `org_id` string  ,
  `contract_id` string  ,
  `birthday` string  ,
  `zmxy_status` string  ,
  `is_root` string  ,
  `is_virtualmachine` string  ,
  `appnum` string  ,
  `wifi_ip` string ,
  `blue_mac` string  ,
  `wifi_mac` string ,
  `vpn_ip` string  ,
  `cell_ip` string ,
  `true_ip` string  ,
  `is_helical_accelerator` string  ,
  `bussiness` string  ,
  `manual_check` string  ,
  `has_contacts` string  ,
  `length_of_residence_year` string  ,
  `length_of_residence_month` string  ,
  `gps_province` string  ,
  `gps_city` string  ,
  `gps_area` string  ,
  `gps_detail_address` string  ,
  `positional_titles` string  ,
  `work_years` string  ,
  `work_months` string  ,
  `max_acceptable_monthly_payment` string  ,
  `profession_code` string  ,
  `occupation` string  ,
  `career_status` string  ,
  `work_position` string  ,
  `work_time` string  ,
  `end_result` string  ) 
  ) with (
   'connector.type' = 'hbase',
   'connector.version' = '1.4.3', 
   'connector.table-name' = 'rtest:borrower_related_asset_info_real_time', 
   'connector.zookeeper.quorum' = 
'fdw6.fengjr.inc,fdw4.fengjr.inc,fdw5.fengjr.inc,fjr-yz-204-11,fj

Re: 关于flink sql问题

2020-06-30 文章 Benchao Li
我理解你只需要把这同一个Mysql表再做一个维表即可。可以写两次DDL,一个给维表用,一个给sink用。
如果你就觉得它是实时变化的,你可以把维表的cache关掉,保证每次都是获取Mysql中最新的数据就可以了吧?

当然了,在DDL的时候并没有区分这个表是维表还是sink表,具体它是什么类型,只是根据你在SQL里面怎么使用来决定的。
理论上来讲,你一个DDL可以同时做维表也可以做Sink。(只是它们可能有些配置会不同,分开写两个DDL应该是更清晰一些)

zya  于2020年6月30日周二 下午11:26写道:

> 请问下,sink写出的表能做维表吗,因为sink会一直写入,做维表的话会一直动态变化
>
>
>
>
>
>  
>
>
>
>
> -- 原始邮件 --
> 发件人: "Benchao Li" 发送时间: 2020年6月30日(星期二) 晚上11:14
> 收件人: "user-zh"
> 主题: Re: 关于flink sql问题
>
>
>
> 应该做一个维表Join就可以了。
>
>
> zya 
> > Hi 各位,有个问题想请教一下:
> >     目前我有一个功能想使用flink sql来完成,source是kafka,sink是mysql,
> >
> >
>     在写入mysql的时候,我希望能先根据key获取mysql中的数据进行判断,然后决定如何写入数据,请问flink1.10目前能实现这种功能吗?
>
>
>
> --
>
> Best,
> Benchao Li



-- 

Best,
Benchao Li


Re: flink sql if 函数使用问题

2020-06-30 文章 Benchao Li
看报错,应该是你的IF的后面两个参数的类型不同吧。这里应该让后面两个参数的类型也相同的,要不然IF函数的返回值类型就不好确定了。

kcz <573693...@qq.com> 于2020年7月1日周三 上午11:03写道:

> flink-1.10.1 blink_planner
> if使用时候限制了返回的数据类型吗?
> Cannot apply 'IF' to arguments of type 'IF(  'IF( 我想创建DDL时候,因为字段可能有空,所以如果为空了我想设置一个默认值,但是报错提示是只支持返回数据类型。



-- 

Best,
Benchao Li


??????????????????restart????????OOM

2020-06-30 文章 SmileSmile
oommetaspace ??os kill??




| |
a511955993
|
|
??a511955...@163.com
|

??  

??2020??07??01?? 11:32??kcz ??
1.10.0??1.11.0classloader??
OK??OOMmetaspaceOOM??




--  --
??: ""https://issues.apache.org/jira/browse/FLINK-11205 

SmileSmile 

?????? ????????????restart????????OOM

2020-06-30 文章 kcz
1.10.0??1.11.0classloader??
OK??OOMmetaspaceOOM??




--  --
??: ""https://issues.apache.org/jira/browse/FLINK-11205 

SmileSmile 

Re: 作业因为异常restart后,频繁OOM

2020-06-30 文章 徐骁
很早以前遇到这个问题, standalone 模式下 metaspace 释放不掉, 感觉是一个比较严重的 bug
https://issues.apache.org/jira/browse/FLINK-11205 这边有过讨论

SmileSmile  于2020年6月30日周二 下午11:45写道:

> 作业如果正常运行,堆外内存是足够的。在restart后才会出现频繁重启的情况,重构集群才能恢复正常
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年06月30日 23:39,LakeShen 写道:
> 我在较低版本,Flink on k8s ,也遇到 OOM 被 kill 了。
>
> 我感觉可能是 TaskManager 堆外内存不足了,我目前是 Flink 1.6 版本,Flink on k8s , standalone per
> job 模式,堆外内存默认没有限制~。
>
> 我的解决方法增加了一个参数:taskmanager.memory.off-heap: true.
>
> 目前来看,OOM被 kill 掉的问题没有在出现了。希望能帮到你。
>
> Best,
> LakeShen
>
> SmileSmile  于2020年6月30日周二 下午11:19写道:
>
> >
> > 补充一下,内核版本为 3.10.x,是否会是堆外内存cache没被回收而导致的内存超用?
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > 在2020年06月30日 23:00,GuoSmileSmil 写道:
> > hi all,
> >
> >
> >
> >
> 我使用的Flink版本为1.10.1,使用的backend是rocksdb,没有开启checkpoint,运行在kubernetes平台上,模式是standalone。
> >
> >
> >
> 目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os
> > kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。
> >
> >
> > 如果单纯heap的状态后台,作业restart不会出现这样的问题。
> >
> >
> > 有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job
> > restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。
> >
> >
> > 请问大家是否有什么好的提议或者解决方法?
> >
> >
> > 其中一次系统内核日志如下:
> >
> >
> > Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit
> > 28672000kB, failcnt 11225
> > Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit
> > 9007199254740988kB, failcnt 0
> > Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit
> > 9007199254740988kB, failcnt 0
> > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> > /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice:
> > cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K
> > B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB
> > unevictable:0KB
> > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> >
> /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1
> > 7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB
> > swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB
> > active_file:0KB unevictable:0KB
> > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> >
> /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope:
> > cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB
> > inactive_anon:0KB active_anon:28671760KB inactive_file:4KB
> active_file:4KB
> > unevictable:0KB
> > Jun 30 21:59:15 flink-tm-1 kernel: [ pid ]   uid  tgid total_vm  rss
> > nr_ptes swapents oom_score_adj name
> > Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875  2531
> >  40  -998 pause
> > Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369  421
> >  70  -998 bash
> > Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832  7174316
> >  145000  -998 java
> > Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017  196
> >  60  -998 tail
> > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill
> > process 26824 (Window(Tumbling) score 4 or sacrifice child
> > Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java)
> > total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB
> >
> >
> >
> >
> >
> >
> > Looking forward to your reply and help.
> >
> > Best
>


flink sql if ????????????

2020-06-30 文章 kcz
flink-1.10.1 blink_planner
if 
Cannot apply 'IF' to arguments of type 'IF(

回复:【Flink的transformations】

2020-06-30 文章 17626017841
hi,
除了source、sink、union之类有特有的Transformation,大部分算子都属于OneInputTransformation
 原始邮件 
发件人: 忝忝向仧<153488...@qq.com>
收件人: user-zh
发送时间: 2020年6月29日(周一) 22:29
主题: 【Flink的transformations】


Hi,all: 
请教下,Flink的应用程序首先都会转为逻辑映射也就是transformations,我看org.apache.flink.streaming.api.transformations包下面目前有17种Transformation类(SourceTransformation,SplitTransformation,TwoInputTransformation等),有没有一个映射关系列表,也就是说应用程序里面哪些算子或者操作(比如map,flatmap,filter,connect,select等)会对应到哪一个Transformation类.
 谢谢.

Flink Training - why cannot keyBy hour?

2020-06-30 文章 Eleanore Jin
Hi experts,

I am going through Ververica flink training, and when doing the lab with
window (https://training.ververica.com/exercises/windows), basically it
requires to compute within an hour which driver earns the most tip.

The logic is to
0. keyBy driverId
1. create 1 hour window based on eventTime
2. sum up all the tips for this driver within this 1 hour window
3. create an 1 hour globalWindow for all drivers
4. find the max tips

sample code shown as below.

SingleOutputStreamOperator>
aggregatedTipsPerDriver = fares.keyBy(rides -> rides.driverId)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
 .process(new SumTipsFunction());

// Tuple3: reporting the timestamp for the end of the hour, the
driverId, and the total of that driver's tips for that hour
SingleOutputStreamOperator> hourlyMax =
 aggregatedTipsPerDriver.windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
 .maxBy(2);


The question is shown as 4th slide: why we cannot keyed by the hour?

If I change the implementation to keyBy hour and run the HourlyTipsTest,

the test of testMaxAcrossDrivers will fail:

// (94668840,1,6.0) -> for timestamp window: 94668840,
driverId: 1, earns most tip: 6.0

Expected :[(94668840,1,6.0), (94669200,2,20.0)]
Actual   :[(94668840,1,6.0), (94669200,2,20.0), (94669200,2,20.0)]


[image: image.png]

Thanks a lot!
Eleanore


回复:作业因为异常restart后,频繁OOM

2020-06-30 文章 SmileSmile
作业如果正常运行,堆外内存是足够的。在restart后才会出现频繁重启的情况,重构集群才能恢复正常


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年06月30日 23:39,LakeShen 写道:
我在较低版本,Flink on k8s ,也遇到 OOM 被 kill 了。

我感觉可能是 TaskManager 堆外内存不足了,我目前是 Flink 1.6 版本,Flink on k8s , standalone per
job 模式,堆外内存默认没有限制~。

我的解决方法增加了一个参数:taskmanager.memory.off-heap: true.

目前来看,OOM被 kill 掉的问题没有在出现了。希望能帮到你。

Best,
LakeShen

SmileSmile  于2020年6月30日周二 下午11:19写道:

>
> 补充一下,内核版本为 3.10.x,是否会是堆外内存cache没被回收而导致的内存超用?
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年06月30日 23:00,GuoSmileSmil 写道:
> hi all,
>
>
>
> 我使用的Flink版本为1.10.1,使用的backend是rocksdb,没有开启checkpoint,运行在kubernetes平台上,模式是standalone。
>
>
> 目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os
> kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。
>
>
> 如果单纯heap的状态后台,作业restart不会出现这样的问题。
>
>
> 有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job
> restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。
>
>
> 请问大家是否有什么好的提议或者解决方法?
>
>
> 其中一次系统内核日志如下:
>
>
> Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit
> 28672000kB, failcnt 11225
> Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit
> 9007199254740988kB, failcnt 0
> Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit
> 9007199254740988kB, failcnt 0
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice:
> cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K
> B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB
> unevictable:0KB
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1
> 7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB
> swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB
> active_file:0KB unevictable:0KB
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope:
> cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB
> inactive_anon:0KB active_anon:28671760KB inactive_file:4KB active_file:4KB
> unevictable:0KB
> Jun 30 21:59:15 flink-tm-1 kernel: [ pid ]   uid  tgid total_vm  rss
> nr_ptes swapents oom_score_adj name
> Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875  2531
>  40  -998 pause
> Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369  421
>  70  -998 bash
> Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832  7174316
>  145000  -998 java
> Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017  196
>  60  -998 tail
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill
> process 26824 (Window(Tumbling) score 4 or sacrifice child
> Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java)
> total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB
>
>
>
>
>
>
> Looking forward to your reply and help.
>
> Best


Re: 作业因为异常restart后,频繁OOM

2020-06-30 文章 LakeShen
我在较低版本,Flink on k8s ,也遇到 OOM 被 kill 了。

我感觉可能是 TaskManager 堆外内存不足了,我目前是 Flink 1.6 版本,Flink on k8s , standalone per
job 模式,堆外内存默认没有限制~。

我的解决方法增加了一个参数:taskmanager.memory.off-heap: true.

目前来看,OOM被 kill 掉的问题没有在出现了。希望能帮到你。

Best,
LakeShen

SmileSmile  于2020年6月30日周二 下午11:19写道:

>
> 补充一下,内核版本为 3.10.x,是否会是堆外内存cache没被回收而导致的内存超用?
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年06月30日 23:00,GuoSmileSmil 写道:
> hi all,
>
>
>
> 我使用的Flink版本为1.10.1,使用的backend是rocksdb,没有开启checkpoint,运行在kubernetes平台上,模式是standalone。
>
>
> 目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os
> kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。
>
>
> 如果单纯heap的状态后台,作业restart不会出现这样的问题。
>
>
> 有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job
> restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。
>
>
> 请问大家是否有什么好的提议或者解决方法?
>
>
> 其中一次系统内核日志如下:
>
>
> Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit
> 28672000kB, failcnt 11225
> Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit
> 9007199254740988kB, failcnt 0
> Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit
> 9007199254740988kB, failcnt 0
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice:
> cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K
> B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB
> unevictable:0KB
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1
> 7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB
> swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB
> active_file:0KB unevictable:0KB
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for
> /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope:
> cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB
> inactive_anon:0KB active_anon:28671760KB inactive_file:4KB active_file:4KB
> unevictable:0KB
> Jun 30 21:59:15 flink-tm-1 kernel: [ pid ]   uid  tgid total_vm  rss
> nr_ptes swapents oom_score_adj name
> Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875  2531
>  40  -998 pause
> Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369  421
>  70  -998 bash
> Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832  7174316
>  145000  -998 java
> Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017  196
>  60  -998 tail
> Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill
> process 26824 (Window(Tumbling) score 4 or sacrifice child
> Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java)
> total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB
>
>
>
>
>
>
> Looking forward to your reply and help.
>
> Best


?????? ????flink sql????

2020-06-30 文章 zya
sinksink





 




--  --
??: "Benchao Li"

回复:作业因为异常restart后,频繁OOM

2020-06-30 文章 SmileSmile

补充一下,内核版本为 3.10.x,是否会是堆外内存cache没被回收而导致的内存超用?


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年06月30日 23:00,GuoSmileSmil 写道:
hi all,


我使用的Flink版本为1.10.1,使用的backend是rocksdb,没有开启checkpoint,运行在kubernetes平台上,模式是standalone。


目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os
 kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。


如果单纯heap的状态后台,作业restart不会出现这样的问题。


有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job 
restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。


请问大家是否有什么好的提议或者解决方法?


其中一次系统内核日志如下:


Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit 28672000kB, 
failcnt 11225
Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit 
9007199254740988kB, failcnt 0
Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit 9007199254740988kB, 
failcnt 0
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for 
/kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice: 
cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K
B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB 
unevictable:0KB
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for 
/kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1
7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB 
swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB active_file:0KB 
unevictable:0KB
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for 
/kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope:
 cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB 
inactive_anon:0KB active_anon:28671760KB inactive_file:4KB active_file:4KB 
unevictable:0KB
Jun 30 21:59:15 flink-tm-1 kernel: [ pid ]   uid  tgid total_vm  rss 
nr_ptes swapents oom_score_adj name
Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875  2531   
40  -998 pause
Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369  421   
70  -998 bash
Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832  7174316   
145000  -998 java
Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017  196   
60  -998 tail
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill process 
26824 (Window(Tumbling) score 4 or sacrifice child
Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java) 
total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB






Looking forward to your reply and help.

Best

Re: 关于flink sql问题

2020-06-30 文章 Benchao Li
应该做一个维表Join就可以了。


zya  于2020年6月30日周二 下午9:02写道:

> Hi 各位,有个问题想请教一下:
>     目前我有一个功能想使用flink sql来完成,source是kafka,sink是mysql,
>
> 在写入mysql的时候,我希望能先根据key获取mysql中的数据进行判断,然后决定如何写入数据,请问flink1.10目前能实现这种功能吗?



-- 

Best,
Benchao Li


作业因为异常restart后,频繁OOM

2020-06-30 文章 GuoSmileSmil
hi all,


我使用的Flink版本为1.10.1,使用的backend是rocksdb,没有开启checkpoint,运行在kubernetes平台上,模式是standalone。


目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os
 kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。


 如果单纯heap的状态后台,作业restart不会出现这样的问题。


有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job 
restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。


请问大家是否有什么好的提议或者解决方法?


其中一次系统内核日志如下:


Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit 28672000kB, 
failcnt 11225
Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit 
9007199254740988kB, failcnt 0
Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit 9007199254740988kB, 
failcnt 0
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for 
/kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice: 
cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K
B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB 
unevictable:0KB
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for 
/kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1
7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB 
swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB active_file:0KB 
unevictable:0KB
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for 
/kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope:
 cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB 
inactive_anon:0KB active_anon:28671760KB inactive_file:4KB active_file:4KB 
unevictable:0KB
Jun 30 21:59:15 flink-tm-1 kernel: [ pid ]   uid  tgid total_vm  rss 
nr_ptes swapents oom_score_adj name
Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875  2531   
40  -998 pause
Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369  421   
70  -998 bash
Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832  7174316   
145000  -998 java
Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017  196   
60  -998 tail
Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill process 
26824 (Window(Tumbling) score 4 or sacrifice child
Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java) 
total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB






Looking forward to your reply and help.

Best

????flink sql????

2020-06-30 文章 zya
Hi ??
    ??flink 
sqlsource??kafka??sink??mysql??
??mysql??keymysqlflink1.10??

Re: 关于local cluster的问题

2020-06-30 文章 naisili Yuan
不好意思没说清楚,跟提交任务没关系,只是执行start-cluster.sh后taskmanager就自动加一

发自我的iPhone

> 在 2020年6月30日,18:54,"17610775...@163.com" <17610775...@163.com> 写道:
> 
> hi 
> 你这个问题没有描述清楚啊 是提交一个任务jm就会自动启动一个?
> 
> 
> 
> Best
> JasonLee
> 
> 发件人: naisili Yuan
> 发送时间: 2020-06-30 18:29
> 收件人: user-zh
> 主题: 关于local cluster的问题
> Hi all
> 我这边有写一个java服务去自动拉起本地flink cluster(单机模式)用来调试使用。我是直接调用的bin/start-cluster.sh脚本。
> 现在问题是每次重新发布服务之后,发现这个启动的会话jobmanager会自动增加一个,导致slots总数越来越高。研究半天始终没找到原因,希望获得帮助!
> flink版本1.10.0


Re: flink SQL如何将秒转换为timestamp

2020-06-30 文章 17610775...@163.com

hi

t as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'-MM-dd HH:mm:ss')) 这样设置就可以了. 
Best
JasonLee
 
发件人: zilong xiao
发送时间: 2020-06-30 16:29
收件人: user-zh
主题: flink SQL如何将秒转换为timestamp
有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导
 
TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd
hh:mm:ss'))


Re: 关于local cluster的问题

2020-06-30 文章 17610775...@163.com
hi 
你这个问题没有描述清楚啊 是提交一个任务jm就会自动启动一个?



Best
JasonLee
 
发件人: naisili Yuan
发送时间: 2020-06-30 18:29
收件人: user-zh
主题: 关于local cluster的问题
Hi all
我这边有写一个java服务去自动拉起本地flink cluster(单机模式)用来调试使用。我是直接调用的bin/start-cluster.sh脚本。
现在问题是每次重新发布服务之后,发现这个启动的会话jobmanager会自动增加一个,导致slots总数越来越高。研究半天始终没找到原因,希望获得帮助!
flink版本1.10.0


关于local cluster的问题

2020-06-30 文章 naisili Yuan
Hi all
我这边有写一个java服务去自动拉起本地flink cluster(单机模式)用来调试使用。我是直接调用的bin/start-cluster.sh脚本。
现在问题是每次重新发布服务之后,发现这个启动的会话jobmanager会自动增加一个,导致slots总数越来越高。研究半天始终没找到原因,希望获得帮助!
flink版本1.10.0


Re: flink SQL如何将秒转换为timestamp

2020-06-30 文章 zilong xiao
好的,我试试~

王松  于2020年6月30日周二 下午5:35写道:

> 可以试试这样写:
> TO_TIMESTAMP(FROM_UNIXTIME(itime, '-MM-dd HH:mm:ss'))
>
> zilong xiao  于2020年6月30日周二 下午4:30写道:
>
> >
> 有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导
> >
> > TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd
> > hh:mm:ss'))
> >
>


Re: flink SQL如何将秒转换为timestamp

2020-06-30 文章 王松
可以试试这样写:
TO_TIMESTAMP(FROM_UNIXTIME(itime, '-MM-dd HH:mm:ss'))

zilong xiao  于2020年6月30日周二 下午4:30写道:

> 有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导
>
> TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd
> hh:mm:ss'))
>


??????????RichReduceFunction??RichAggregateFunction

2020-06-30 文章 BenChen
??


uid 
keybyreduce??aggregatestateBloomFilterWindowFunction??stateprocess


| |
BenChen
|
|
haibin...@163.com
|
??


??2020??06??30?? 15:55??Yichao Yang<1048262...@qq.com> ??
Hi


??stateuid 
keybyBloomFilter


Best,
Yichao Yang




--  --
??: "BenChen"

flink SQL如何将秒转换为timestamp

2020-06-30 文章 zilong xiao
有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导

TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd
hh:mm:ss'))


??????????RichReduceFunction??RichAggregateFunction

2020-06-30 文章 Yichao Yang
Hi


??stateuid 
keybyBloomFilter


Best,
Yichao Yang




--  --
??: "BenChen"

关于RichReduceFunction和RichAggregateFunction

2020-06-30 文章 BenChen


Hi all,
在flink里面尝试使用RichReduceFunction和RichAggregateFunction,但是收到了UnsupportedOperationException。


看了下源代码,在reduce和aggregate方法里面会检测到是RichFunction的话就会直接抛异常。同时ReduceFunction和AggregateFunction是作为ReducingState和AggregatingState的属性,作为函数的聚合方法,真正让用户使用state是要放到reduce和aggregate对应的WindowFunction参数里面去。


目前我的问题是
1. 是否从Flink设计角度来说,就不支持在reduce和aggregate使用RichFunction?还是说以后会实现?
2. Flink自带的RichReduceFunction和RichAggregateFunction是用在什么场景?
3. 
在使用reduce和aggregate聚合的过程中,如果我需要一些全局的state,比如使用BloomFilter判断用户是否参与过这个活动,有什么建议吗?


感谢。
| |
BenChen
|
|
haibin...@163.com
|
签名由网易邮箱大师定制