flink sql ddl CREATE TABLE kafka011 sink ????????????exactly-once??
flink sql CREATE TABLE kafka sinkcheckpointsql sinkexactly-once?? ?? ?? Consistency guarantees: By default, a Kafka sink ingests data with at-least-once guarantees into a Kafka topic if the query is executed with checkpointing enabled.?? CREATE TABLE ?? at-least-once
Re: 关于local cluster的问题
start-cluster.sh每次就是会启动一个Standalone集群的,由于都是一个flink conf,所以新起的JM 肯定会因为端口冲突起不来,TM会注册在之前已经running的JM上。 如果你只是测试,用完以后,需要stop-cluster.sh停掉 如果是想在一个JVM里面进行测试,那可以用MiniCluster,所有的组件都会以线程模式启动 Best, Yang naisili Yuan 于2020年6月30日周二 下午7:09写道: > 不好意思没说清楚,跟提交任务没关系,只是执行start-cluster.sh后taskmanager就自动加一 > > 发自我的iPhone > > > 在 2020年6月30日,18:54,"17610775...@163.com" <17610775...@163.com> 写道: > > > > hi > > 你这个问题没有描述清楚啊 是提交一个任务jm就会自动启动一个? > > > > > > > > Best > > JasonLee > > > > 发件人: naisili Yuan > > 发送时间: 2020-06-30 18:29 > > 收件人: user-zh > > 主题: 关于local cluster的问题 > > Hi all > > 我这边有写一个java服务去自动拉起本地flink > cluster(单机模式)用来调试使用。我是直接调用的bin/start-cluster.sh脚本。 > > > 现在问题是每次重新发布服务之后,发现这个启动的会话jobmanager会自动增加一个,导致slots总数越来越高。研究半天始终没找到原因,希望获得帮助! > > flink版本1.10.0 >
?????? flink sql if ????????????
tks -- -- ??: "Benchao Li"
flink1.10 用flinksql 写hbase,报错:UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
你好, flink1.10,用flinkSQL写hbase,报错:UpsertStreamTableSink requires that Table has a full primary keys if it is updated. 看到网上的资料说是,upsertSink的primary key是通过query来推断的,而我的query无法推断出PK,所以报错。说是需要1.10的临时解决方法是加一层group by,使得query可以推断出 primary key。 但是,我添加group by以后还是报错,这个问题该怎么解决呢??到底query是如何推断PK的?? 以下是我的sql语句: 创建表的语句(字段有点多,还请见谅) CREATE TABLE `AssetInfoRiskResultSinkTable` ( rowkey string, d ROW(`id` bigint, `user_id` string, `income_no` string , `nation` string, `card_auth_expiry_time` string , `user_name` string , `login_phone` string , `card_no` string , `address` string , `highest_eduction` string , `is_married` string , `resident_address` string , `resident_province` string , `resident_city` string , `resident_town` string , `profession` string , `job_salary` string , `company_name` string , `company_province` string , `company_city` string , `company_town` string , `company_address` string , `family_name` string , `family_phone` string , `workmate_name` string , `workmate_phone` string , `bank_card_no` string , `bank_phone` string , `device_address` string , `device_ip` string , `contacts` string , `create_user` string , `create_time` string , `update_user` string , `update_time` string, `scene_id` string , `access_type` string , `status` string , `label_id` string , `label_name` string , `ocr_real_name` string , `ocr_id_card` string , `ocr_id_card_address` string, `longitude` string , `latitude` string , `imei` string , `imsi` string , `mac` string , `resident_province2` string , `loan_use` string , `channel_code` string , `channel_name` string , `credit_card_number` string , `product_type_code` string , `product_type_name` string , `apply_amount` string , `income_time` string , `credit_card_phone` string , `credit_card_amount` string , `period` string , `start_work_time` string , `register_channel` string , `register_channel_name` string , `bank_name` string , `company_call` string , `system_tag` string , `is_trans` string , `is_dial_confirm` string , `is_dial_type` string , `is_dial_typeM` string , `is_dial_typeHuman` string , `user_risk_score` string , `upload_imgs` string , `upload_status` string , `famliy_relationship` string , `work_relationship` string , `request_time` string , `ac_record` string , `profession_station` string , `mobile_os` string , `mobile_type` string , `mobile_brand` string , `networktype` string , `jail_break` string , `open_udid` string , `simulator` string , `idfa` string , `idfv` string , `device_type` string , `credit_card_id_card_no` string , `credit_card_username` string , `sign_issue_org` string , `user_level` string , `channel_request_time` string , `real_income_no` string , `org_channel_code` string , `qq` string , `mail` string , `pre_grant_credit_amount` string , `pre_grant_credit_term` string , `pre_grant_credit_term_unit` string , `monthly_repay_amount` string , `total_repay_amount` string , `xhd_white_list_flag` string , `white_list_flag` string , `white_list_level` string , `white_list_type` string , `bus_type` string , `housing_fund_status` string , `operator_auth_status` string , `credit_card_status` string , `pboc_credit_status` string , `bh_url_flag` string, `zmxy_auth_expiry_time` string , `operator_auth_expiry_time` string , `housing_fund_status_time` string , `credit_card_expiry_time` string , `pboc_credit_status_time` string , `credit_card_bank_name` string , `due_limit_unit` string , `due_limit` string , `loan_amount` string , `risk_lead_flag` string , `period_unit` string , `face_score` string , `notify_url` string , `org_id` string , `contract_id` string , `birthday` string , `zmxy_status` string , `is_root` string , `is_virtualmachine` string , `appnum` string , `wifi_ip` string , `blue_mac` string , `wifi_mac` string , `vpn_ip` string , `cell_ip` string , `true_ip` string , `is_helical_accelerator` string , `bussiness` string , `manual_check` string , `has_contacts` string , `length_of_residence_year` string , `length_of_residence_month` string , `gps_province` string , `gps_city` string , `gps_area` string , `gps_detail_address` string , `positional_titles` string , `work_years` string , `work_months` string , `max_acceptable_monthly_payment` string , `profession_code` string , `occupation` string , `career_status` string , `work_position` string , `work_time` string , `end_result` string ) ) with ( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = 'rtest:borrower_related_asset_info_real_time', 'connector.zookeeper.quorum' = 'fdw6.fengjr.inc,fdw4.fengjr.inc,fdw5.fengjr.inc,fjr-yz-204-11,fj
Re: 关于flink sql问题
我理解你只需要把这同一个Mysql表再做一个维表即可。可以写两次DDL,一个给维表用,一个给sink用。 如果你就觉得它是实时变化的,你可以把维表的cache关掉,保证每次都是获取Mysql中最新的数据就可以了吧? 当然了,在DDL的时候并没有区分这个表是维表还是sink表,具体它是什么类型,只是根据你在SQL里面怎么使用来决定的。 理论上来讲,你一个DDL可以同时做维表也可以做Sink。(只是它们可能有些配置会不同,分开写两个DDL应该是更清晰一些) zya 于2020年6月30日周二 下午11:26写道: > 请问下,sink写出的表能做维表吗,因为sink会一直写入,做维表的话会一直动态变化 > > > > > > > > > > > -- 原始邮件 -- > 发件人: "Benchao Li" 发送时间: 2020年6月30日(星期二) 晚上11:14 > 收件人: "user-zh" > 主题: Re: 关于flink sql问题 > > > > 应该做一个维表Join就可以了。 > > > zya > > Hi 各位,有个问题想请教一下: > > 目前我有一个功能想使用flink sql来完成,source是kafka,sink是mysql, > > > > > 在写入mysql的时候,我希望能先根据key获取mysql中的数据进行判断,然后决定如何写入数据,请问flink1.10目前能实现这种功能吗? > > > > -- > > Best, > Benchao Li -- Best, Benchao Li
Re: flink sql if 函数使用问题
看报错,应该是你的IF的后面两个参数的类型不同吧。这里应该让后面两个参数的类型也相同的,要不然IF函数的返回值类型就不好确定了。 kcz <573693...@qq.com> 于2020年7月1日周三 上午11:03写道: > flink-1.10.1 blink_planner > if使用时候限制了返回的数据类型吗? > Cannot apply 'IF' to arguments of type 'IF( 'IF( 我想创建DDL时候,因为字段可能有空,所以如果为空了我想设置一个默认值,但是报错提示是只支持返回数据类型。 -- Best, Benchao Li
??????????????????restart????????OOM
oommetaspace ??os kill?? | | a511955993 | | ??a511955...@163.com | ?? ??2020??07??01?? 11:32??kcz ?? 1.10.0??1.11.0classloader?? OK??OOMmetaspaceOOM?? -- -- ??: ""https://issues.apache.org/jira/browse/FLINK-11205 SmileSmile
?????? ????????????restart????????OOM
1.10.0??1.11.0classloader?? OK??OOMmetaspaceOOM?? -- -- ??: ""https://issues.apache.org/jira/browse/FLINK-11205 SmileSmile
Re: 作业因为异常restart后,频繁OOM
很早以前遇到这个问题, standalone 模式下 metaspace 释放不掉, 感觉是一个比较严重的 bug https://issues.apache.org/jira/browse/FLINK-11205 这边有过讨论 SmileSmile 于2020年6月30日周二 下午11:45写道: > 作业如果正常运行,堆外内存是足够的。在restart后才会出现频繁重启的情况,重构集群才能恢复正常 > > > | | > a511955993 > | > | > 邮箱:a511955...@163.com > | > > 签名由 网易邮箱大师 定制 > > 在2020年06月30日 23:39,LakeShen 写道: > 我在较低版本,Flink on k8s ,也遇到 OOM 被 kill 了。 > > 我感觉可能是 TaskManager 堆外内存不足了,我目前是 Flink 1.6 版本,Flink on k8s , standalone per > job 模式,堆外内存默认没有限制~。 > > 我的解决方法增加了一个参数:taskmanager.memory.off-heap: true. > > 目前来看,OOM被 kill 掉的问题没有在出现了。希望能帮到你。 > > Best, > LakeShen > > SmileSmile 于2020年6月30日周二 下午11:19写道: > > > > > 补充一下,内核版本为 3.10.x,是否会是堆外内存cache没被回收而导致的内存超用? > > > > > > | | > > a511955993 > > | > > | > > 邮箱:a511955...@163.com > > | > > > > 签名由 网易邮箱大师 定制 > > > > 在2020年06月30日 23:00,GuoSmileSmil 写道: > > hi all, > > > > > > > > > 我使用的Flink版本为1.10.1,使用的backend是rocksdb,没有开启checkpoint,运行在kubernetes平台上,模式是standalone。 > > > > > > > 目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os > > kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。 > > > > > > 如果单纯heap的状态后台,作业restart不会出现这样的问题。 > > > > > > 有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job > > restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。 > > > > > > 请问大家是否有什么好的提议或者解决方法? > > > > > > 其中一次系统内核日志如下: > > > > > > Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit > > 28672000kB, failcnt 11225 > > Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit > > 9007199254740988kB, failcnt 0 > > Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit > > 9007199254740988kB, failcnt 0 > > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for > > /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice: > > cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K > > B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB > > unevictable:0KB > > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for > > > /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1 > > 7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB > > swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB > > active_file:0KB unevictable:0KB > > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for > > > /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope: > > cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB > > inactive_anon:0KB active_anon:28671760KB inactive_file:4KB > active_file:4KB > > unevictable:0KB > > Jun 30 21:59:15 flink-tm-1 kernel: [ pid ] uid tgid total_vm rss > > nr_ptes swapents oom_score_adj name > > Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875 2531 > > 40 -998 pause > > Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369 421 > > 70 -998 bash > > Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832 7174316 > > 145000 -998 java > > Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017 196 > > 60 -998 tail > > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill > > process 26824 (Window(Tumbling) score 4 or sacrifice child > > Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java) > > total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB > > > > > > > > > > > > > > Looking forward to your reply and help. > > > > Best >
flink sql if ????????????
flink-1.10.1 blink_planner if Cannot apply 'IF' to arguments of type 'IF(
回复:【Flink的transformations】
hi, 除了source、sink、union之类有特有的Transformation,大部分算子都属于OneInputTransformation 原始邮件 发件人: 忝忝向仧<153488...@qq.com> 收件人: user-zh 发送时间: 2020年6月29日(周一) 22:29 主题: 【Flink的transformations】 Hi,all: 请教下,Flink的应用程序首先都会转为逻辑映射也就是transformations,我看org.apache.flink.streaming.api.transformations包下面目前有17种Transformation类(SourceTransformation,SplitTransformation,TwoInputTransformation等),有没有一个映射关系列表,也就是说应用程序里面哪些算子或者操作(比如map,flatmap,filter,connect,select等)会对应到哪一个Transformation类. 谢谢.
Flink Training - why cannot keyBy hour?
Hi experts, I am going through Ververica flink training, and when doing the lab with window (https://training.ververica.com/exercises/windows), basically it requires to compute within an hour which driver earns the most tip. The logic is to 0. keyBy driverId 1. create 1 hour window based on eventTime 2. sum up all the tips for this driver within this 1 hour window 3. create an 1 hour globalWindow for all drivers 4. find the max tips sample code shown as below. SingleOutputStreamOperator> aggregatedTipsPerDriver = fares.keyBy(rides -> rides.driverId) .window(TumblingEventTimeWindows.of(Time.hours(1))) .process(new SumTipsFunction()); // Tuple3: reporting the timestamp for the end of the hour, the driverId, and the total of that driver's tips for that hour SingleOutputStreamOperator> hourlyMax = aggregatedTipsPerDriver.windowAll(TumblingEventTimeWindows.of(Time.hours(1))) .maxBy(2); The question is shown as 4th slide: why we cannot keyed by the hour? If I change the implementation to keyBy hour and run the HourlyTipsTest, the test of testMaxAcrossDrivers will fail: // (94668840,1,6.0) -> for timestamp window: 94668840, driverId: 1, earns most tip: 6.0 Expected :[(94668840,1,6.0), (94669200,2,20.0)] Actual :[(94668840,1,6.0), (94669200,2,20.0), (94669200,2,20.0)] [image: image.png] Thanks a lot! Eleanore
回复:作业因为异常restart后,频繁OOM
作业如果正常运行,堆外内存是足够的。在restart后才会出现频繁重启的情况,重构集群才能恢复正常 | | a511955993 | | 邮箱:a511955...@163.com | 签名由 网易邮箱大师 定制 在2020年06月30日 23:39,LakeShen 写道: 我在较低版本,Flink on k8s ,也遇到 OOM 被 kill 了。 我感觉可能是 TaskManager 堆外内存不足了,我目前是 Flink 1.6 版本,Flink on k8s , standalone per job 模式,堆外内存默认没有限制~。 我的解决方法增加了一个参数:taskmanager.memory.off-heap: true. 目前来看,OOM被 kill 掉的问题没有在出现了。希望能帮到你。 Best, LakeShen SmileSmile 于2020年6月30日周二 下午11:19写道: > > 补充一下,内核版本为 3.10.x,是否会是堆外内存cache没被回收而导致的内存超用? > > > | | > a511955993 > | > | > 邮箱:a511955...@163.com > | > > 签名由 网易邮箱大师 定制 > > 在2020年06月30日 23:00,GuoSmileSmil 写道: > hi all, > > > > 我使用的Flink版本为1.10.1,使用的backend是rocksdb,没有开启checkpoint,运行在kubernetes平台上,模式是standalone。 > > > 目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os > kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。 > > > 如果单纯heap的状态后台,作业restart不会出现这样的问题。 > > > 有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job > restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。 > > > 请问大家是否有什么好的提议或者解决方法? > > > 其中一次系统内核日志如下: > > > Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit > 28672000kB, failcnt 11225 > Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit > 9007199254740988kB, failcnt 0 > Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit > 9007199254740988kB, failcnt 0 > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for > /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice: > cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K > B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB > unevictable:0KB > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for > /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1 > 7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB > swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB > active_file:0KB unevictable:0KB > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for > /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope: > cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB > inactive_anon:0KB active_anon:28671760KB inactive_file:4KB active_file:4KB > unevictable:0KB > Jun 30 21:59:15 flink-tm-1 kernel: [ pid ] uid tgid total_vm rss > nr_ptes swapents oom_score_adj name > Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875 2531 > 40 -998 pause > Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369 421 > 70 -998 bash > Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832 7174316 > 145000 -998 java > Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017 196 > 60 -998 tail > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill > process 26824 (Window(Tumbling) score 4 or sacrifice child > Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java) > total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB > > > > > > > Looking forward to your reply and help. > > Best
Re: 作业因为异常restart后,频繁OOM
我在较低版本,Flink on k8s ,也遇到 OOM 被 kill 了。 我感觉可能是 TaskManager 堆外内存不足了,我目前是 Flink 1.6 版本,Flink on k8s , standalone per job 模式,堆外内存默认没有限制~。 我的解决方法增加了一个参数:taskmanager.memory.off-heap: true. 目前来看,OOM被 kill 掉的问题没有在出现了。希望能帮到你。 Best, LakeShen SmileSmile 于2020年6月30日周二 下午11:19写道: > > 补充一下,内核版本为 3.10.x,是否会是堆外内存cache没被回收而导致的内存超用? > > > | | > a511955993 > | > | > 邮箱:a511955...@163.com > | > > 签名由 网易邮箱大师 定制 > > 在2020年06月30日 23:00,GuoSmileSmil 写道: > hi all, > > > > 我使用的Flink版本为1.10.1,使用的backend是rocksdb,没有开启checkpoint,运行在kubernetes平台上,模式是standalone。 > > > 目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os > kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。 > > > 如果单纯heap的状态后台,作业restart不会出现这样的问题。 > > > 有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job > restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。 > > > 请问大家是否有什么好的提议或者解决方法? > > > 其中一次系统内核日志如下: > > > Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit > 28672000kB, failcnt 11225 > Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit > 9007199254740988kB, failcnt 0 > Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit > 9007199254740988kB, failcnt 0 > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for > /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice: > cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K > B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB > unevictable:0KB > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for > /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1 > 7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB > swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB > active_file:0KB unevictable:0KB > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for > /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope: > cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB > inactive_anon:0KB active_anon:28671760KB inactive_file:4KB active_file:4KB > unevictable:0KB > Jun 30 21:59:15 flink-tm-1 kernel: [ pid ] uid tgid total_vm rss > nr_ptes swapents oom_score_adj name > Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875 2531 > 40 -998 pause > Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369 421 > 70 -998 bash > Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832 7174316 > 145000 -998 java > Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017 196 > 60 -998 tail > Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill > process 26824 (Window(Tumbling) score 4 or sacrifice child > Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java) > total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB > > > > > > > Looking forward to your reply and help. > > Best
?????? ????flink sql????
sinksink -- -- ??: "Benchao Li"
回复:作业因为异常restart后,频繁OOM
补充一下,内核版本为 3.10.x,是否会是堆外内存cache没被回收而导致的内存超用? | | a511955993 | | 邮箱:a511955...@163.com | 签名由 网易邮箱大师 定制 在2020年06月30日 23:00,GuoSmileSmil 写道: hi all, 我使用的Flink版本为1.10.1,使用的backend是rocksdb,没有开启checkpoint,运行在kubernetes平台上,模式是standalone。 目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。 如果单纯heap的状态后台,作业restart不会出现这样的问题。 有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。 请问大家是否有什么好的提议或者解决方法? 其中一次系统内核日志如下: Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit 28672000kB, failcnt 11225 Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit 9007199254740988kB, failcnt 0 Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit 9007199254740988kB, failcnt 0 Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice: cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB unevictable:0KB Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1 7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB active_file:0KB unevictable:0KB Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope: cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB inactive_anon:0KB active_anon:28671760KB inactive_file:4KB active_file:4KB unevictable:0KB Jun 30 21:59:15 flink-tm-1 kernel: [ pid ] uid tgid total_vm rss nr_ptes swapents oom_score_adj name Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875 2531 40 -998 pause Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369 421 70 -998 bash Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832 7174316 145000 -998 java Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017 196 60 -998 tail Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill process 26824 (Window(Tumbling) score 4 or sacrifice child Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java) total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB Looking forward to your reply and help. Best
Re: 关于flink sql问题
应该做一个维表Join就可以了。 zya 于2020年6月30日周二 下午9:02写道: > Hi 各位,有个问题想请教一下: > 目前我有一个功能想使用flink sql来完成,source是kafka,sink是mysql, > > 在写入mysql的时候,我希望能先根据key获取mysql中的数据进行判断,然后决定如何写入数据,请问flink1.10目前能实现这种功能吗? -- Best, Benchao Li
作业因为异常restart后,频繁OOM
hi all, 我使用的Flink版本为1.10.1,使用的backend是rocksdb,没有开启checkpoint,运行在kubernetes平台上,模式是standalone。 目前遇到的问题是作业如果因为网络抖动或者硬件故障导致的pod被失联而fail,在pod重生后,作业自动restart,作业运行一段时间(半小时到1小时不等)很容易出现其他pod因为oom被os kill的现象,然后反复循环,pod 被kill越来越频繁。目前的解决方法是手动销毁这个集群,重新构建一个集群后重启作业,就恢复正常。 如果单纯heap的状态后台,作业restart不会出现这样的问题。 有一些不成熟的猜测,作业在fail后,native memory没有释放干净,pod的limit假设为10G,那么job restart后只有8G,TM还是按照10G的标准运行,pod使用的内存就会超过10G而被os kill(纯属猜测)。 请问大家是否有什么好的提议或者解决方法? 其中一次系统内核日志如下: Jun 30 21:59:15 flink-tm-1 kernel: memory: usage 28672000kB, limit 28672000kB, failcnt 11225 Jun 30 21:59:15 flink-tm-1 kernel: memory+swap: usage 28672000kB, limit 9007199254740988kB, failcnt 0 Jun 30 21:59:15 flink-tm-1 kernel: kmem: usage 0kB, limit 9007199254740988kB, failcnt 0 Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice: cache:0KB rss:0KB rss_huge:0KB mapped_file:0KB swap:0K B inactive_anon:0KB active_anon:0KB inactive_file:0KB active_file:0KB unevictable:0KB Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-fe101418a3b2a7c534e89b4ac73d29b04070eb923220a5b1 7338850bbdb3817a.scope: cache:0KB rss:44KB rss_huge:0KB mapped_file:0KB swap:0KB inactive_anon:0KB active_anon:44KB inactive_file:0KB active_file:0KB unevictable:0KB Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup stats for /kubepods.slice/kubepods-pod5ad5d2ea_5faa_4a11_96b4_39271ab76e99.slice/docker-a2295e812a828738810a8f1ae69cd48e99ef98b9e1038158a6e33f81524cc02a.scope: cache:180KB rss:28671776KB rss_huge:26437632KB mapped_file:144KB swap:0KB inactive_anon:0KB active_anon:28671760KB inactive_file:4KB active_file:4KB unevictable:0KB Jun 30 21:59:15 flink-tm-1 kernel: [ pid ] uid tgid total_vm rss nr_ptes swapents oom_score_adj name Jun 30 21:59:15 flink-tm-1 kernel: [16875] 0 16875 2531 40 -998 pause Jun 30 21:59:15 flink-tm-1 kernel: [17274] 0 17274 1369 421 70 -998 bash Jun 30 21:59:15 flink-tm-1 kernel: [18089] 0 18089 10824832 7174316 145000 -998 java Jun 30 21:59:15 flink-tm-1 kernel: [18348] 0 18348 1017 196 60 -998 tail Jun 30 21:59:15 flink-tm-1 kernel: Memory cgroup out of memory: Kill process 26824 (Window(Tumbling) score 4 or sacrifice child Jun 30 21:59:15 flink-tm-1 kernel: Killed process 18089 (java) total-vm:43299328kB, anon-rss:28669084kB, file-rss:28180kB, shmem-rss:0kB Looking forward to your reply and help. Best
????flink sql????
Hi ?? ??flink sqlsource??kafka??sink??mysql?? ??mysql??keymysqlflink1.10??
Re: 关于local cluster的问题
不好意思没说清楚,跟提交任务没关系,只是执行start-cluster.sh后taskmanager就自动加一 发自我的iPhone > 在 2020年6月30日,18:54,"17610775...@163.com" <17610775...@163.com> 写道: > > hi > 你这个问题没有描述清楚啊 是提交一个任务jm就会自动启动一个? > > > > Best > JasonLee > > 发件人: naisili Yuan > 发送时间: 2020-06-30 18:29 > 收件人: user-zh > 主题: 关于local cluster的问题 > Hi all > 我这边有写一个java服务去自动拉起本地flink cluster(单机模式)用来调试使用。我是直接调用的bin/start-cluster.sh脚本。 > 现在问题是每次重新发布服务之后,发现这个启动的会话jobmanager会自动增加一个,导致slots总数越来越高。研究半天始终没找到原因,希望获得帮助! > flink版本1.10.0
Re: flink SQL如何将秒转换为timestamp
hi t as TO_TIMESTAMP(FROM_UNIXTIME(ts/1000,'-MM-dd HH:mm:ss')) 这样设置就可以了. Best JasonLee 发件人: zilong xiao 发送时间: 2020-06-30 16:29 收件人: user-zh 主题: flink SQL如何将秒转换为timestamp 有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导 TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd hh:mm:ss'))
Re: 关于local cluster的问题
hi 你这个问题没有描述清楚啊 是提交一个任务jm就会自动启动一个? Best JasonLee 发件人: naisili Yuan 发送时间: 2020-06-30 18:29 收件人: user-zh 主题: 关于local cluster的问题 Hi all 我这边有写一个java服务去自动拉起本地flink cluster(单机模式)用来调试使用。我是直接调用的bin/start-cluster.sh脚本。 现在问题是每次重新发布服务之后,发现这个启动的会话jobmanager会自动增加一个,导致slots总数越来越高。研究半天始终没找到原因,希望获得帮助! flink版本1.10.0
关于local cluster的问题
Hi all 我这边有写一个java服务去自动拉起本地flink cluster(单机模式)用来调试使用。我是直接调用的bin/start-cluster.sh脚本。 现在问题是每次重新发布服务之后,发现这个启动的会话jobmanager会自动增加一个,导致slots总数越来越高。研究半天始终没找到原因,希望获得帮助! flink版本1.10.0
Re: flink SQL如何将秒转换为timestamp
好的,我试试~ 王松 于2020年6月30日周二 下午5:35写道: > 可以试试这样写: > TO_TIMESTAMP(FROM_UNIXTIME(itime, '-MM-dd HH:mm:ss')) > > zilong xiao 于2020年6月30日周二 下午4:30写道: > > > > 有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导 > > > > TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd > > hh:mm:ss')) > > >
Re: flink SQL如何将秒转换为timestamp
可以试试这样写: TO_TIMESTAMP(FROM_UNIXTIME(itime, '-MM-dd HH:mm:ss')) zilong xiao 于2020年6月30日周二 下午4:30写道: > 有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导 > > TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd > hh:mm:ss')) >
??????????RichReduceFunction??RichAggregateFunction
?? uid keybyreduce??aggregatestateBloomFilterWindowFunction??stateprocess | | BenChen | | haibin...@163.com | ?? ??2020??06??30?? 15:55??Yichao Yang<1048262...@qq.com> ?? Hi ??stateuid keybyBloomFilter Best, Yichao Yang -- -- ??: "BenChen"
flink SQL如何将秒转换为timestamp
有一个字段itime,类型为int,意为当前时间的秒值,如何将该字段转换成timestamp?以下是我的想法,不知是否正确,求遇到过类似问题的大佬指导 TO_TIMESTAMP(DATE_FORMAT(CAST(itime * 1000 as TIMESTAMP(3)), '-MM-dd hh:mm:ss'))
??????????RichReduceFunction??RichAggregateFunction
Hi ??stateuid keybyBloomFilter Best, Yichao Yang -- -- ??: "BenChen"
关于RichReduceFunction和RichAggregateFunction
Hi all, 在flink里面尝试使用RichReduceFunction和RichAggregateFunction,但是收到了UnsupportedOperationException。 看了下源代码,在reduce和aggregate方法里面会检测到是RichFunction的话就会直接抛异常。同时ReduceFunction和AggregateFunction是作为ReducingState和AggregatingState的属性,作为函数的聚合方法,真正让用户使用state是要放到reduce和aggregate对应的WindowFunction参数里面去。 目前我的问题是 1. 是否从Flink设计角度来说,就不支持在reduce和aggregate使用RichFunction?还是说以后会实现? 2. Flink自带的RichReduceFunction和RichAggregateFunction是用在什么场景? 3. 在使用reduce和aggregate聚合的过程中,如果我需要一些全局的state,比如使用BloomFilter判断用户是否参与过这个活动,有什么建议吗? 感谢。 | | BenChen | | haibin...@163.com | 签名由网易邮箱大师定制