我们想修改源码 实现任意任务提交实时平台,初始化DAG的时候获取到血缘信息,StreamExecutionEnvironment注册 这种只能写在任务里
不满足需求
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2024年03月8日 18:23,Zhanghao Chen 写道:
你可以看下 OpenLineage 和 Flink 的集成方法 [1],它是在 StreamExecutionEnvironment 里注册了一个
JobListener(通过这个可以拿到 JobClient 进而拿到 job id)。然后从
”JobGraph 可以获得 transformation 信息“, JobGraph可以直接获取transformation的信息吗?, 我们是在
SourceTransformation 和SinkTransformation反射拿到链接信息 ,但是这个地方拿不到flinkJobid,
JobGraph可以拿到source和sink的链接信息和flinkJobid?
| |
阿华田
|
|
a15733178...@163.com
|
JobGraph 可以获得 transformation 信息transformation
签名由网易邮箱大师定制
在2024年03月8
获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2024年02月26日 20:04,Feng Jin 写道:
通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris
Sink,之后再通过反射获取里面的 properties 信息进行提取。
可以参考 OpenLineage[1] 的实现.
1.
https://github.com
看了一下 这样需要每个任务都配置listener,做不到系统级的控制,推动下游用户都去配置listener比较困难
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2024年02月2日 19:38,Feng Jin 写道:
hi,
可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析
Source 和 Sink 拿到血缘信息。
[1]
https://github.com/OpenLineage/OpenLineage/blob
好的 感谢 我研究一下
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2024年02月2日 19:38,Feng Jin 写道:
hi,
可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析
Source 和 Sink 拿到血缘信息。
[1]
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io
打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase
,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
这个维度都排查了 都正常
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2024年01月23日 21:57,Feng Jin 写道:
可以尝试着下面几种方式确认下原因:
1.
打开浏览器开发者模式,看是否因为请求某个接口卡住
2.
查看下 JobManager 的 GC 情况,是否频繁 FullGC
3.
查看下 JobManager 的日志,是否存在某些资源文件丢失或者磁盘异常情况导致 web UI 无法访问.
Best,
Feng
On Tue, Jan 23, 2024 at 6:16 PM 阿华田
如下图,任务处理数据正常,任务状态也正常,但是flink_ui一致处于loading中,只有个别任务这样,其他正常,有可能是metirc包的某个类冲突导致的吗?
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
Flink集群使用kafka_appender收集flink产生的日志,但是现在实时运行的任务超过了三千个,运行的yarn-container有20万+。导致存储日志的kafka集群连接数过多,kafka集群压力有点大,请教各位大佬flink日志采集这块还有什么好的方式吗
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
本人目前再做实时计算平台,想审计平台上所有运行flink任务的中间件的连接信息。比如job1
是kafka写入hbase的flink任务,希望可以自动审计此任务所连接的kafka的集群ip地址和topic以及hbase的集群ip地址和表信息
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
这个解决不了根本问题 主要是我们的任务比较多,业务上就需要保留几千个任务
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2023年07月28日 11:28,Shammon FY 写道:
Hi,
可以通过配置`jobstore.max-capacity`和`jobstore.expiration-time`控制保存的任务数,具体参数可以参考[1]
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full
目前flink-job-history 已经收录5000+任务,当点击全部任务查看时,job-history就会卡死无法访问,各位大佬有什么好的解决方式?
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
history server是不是有延迟性 做不到实时获取任务的状态
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2023年07月15日 12:14,casel.chen 写道:
可以查看history server
在 2023-07-14 18:36:42,"阿华田" 写道:
hello 各位大佬, flink on K8S ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了
无法判断flink任务是正常Finished 还是异常失败
hello 各位大佬, flink on K8S ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了
无法判断flink任务是正常Finished 还是异常失败了,各位大佬有什么建议吗
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
使用flinksql写入es是
index是动态的,所有想使用flinksql直接设置字段的属性,比如not_analyzed。目前发现好像只能设置字段的类型,大佬们有知道怎么设置字段的属性吗
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
initializeStateis started ");
}
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
ublic void initializeState(FunctionInitializationContext context) throws
Exception {
mapStateDescriptor = new MapStateDescriptor<>(
"app-status-map",
String.class,
Integer.class);
mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
LOGGER.info("the initializeState
已经可以了 flink1.11 在jobManger的ui页面配置信息那块能看到这个参数是否配置成功
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2021年03月5日 11:44,Xintong Song 写道:
你的 flink 是什么版本?
部署模式是 per-job 还是 session?
“看到任务配置参数也生效了”具体是在哪里看到的?
Thank you~
Xintong Song
On Thu, Mar 4, 2021 at 4:35 PM 阿华田 wrote:
使用-yD
使用-yD yarn.containers.vcores=4
区设置flink任务的总的cpu核数,看到任务配置参数也生效了 但是实际申请核数还是按照 cpu slot一比一申请的
各位大佬使用yarn.containers.vcores是不是还需要开启yarn的cpu 调度
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
Ok 感谢
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2021年02月26日 15:29,Rui Li 写道:
你好,
目前hive connector还没有支持ranger,只支持HMS端基于storage的权限控制。
On Thu, Feb 25, 2021 at 8:49 PM 阿华田 wrote:
目前在做基于flinksql搭建实时计算平台,使用hive的catalog管理flinksql的元数据。hive本身使用Ranger进行权限管理。但是flinksql使用hive的catalog管理元数据,用户的权
目前在做基于flinksql搭建实时计算平台,使用hive的catalog管理flinksql的元数据。hive本身使用Ranger进行权限管理。但是flinksql使用hive的catalog管理元数据,用户的权限Ranger好像无法管理。各位大佬知道有什么比较好的方式解决吗?
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
.
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
-> STREAM_PHYSICAL]
There is 1 empty subset: rel#280:Subset#14.STREAM_PHYSICAL.any.None:
0.[NONE].[NONE], the relevant part of the original plan is as follows
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
各位大佬,Flink-sql-gateway 提交flink sql任务 ,如何使用flink自定义的udf
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
"'update-mode' = 'append')";
tableEnv.executeSql(create_sql);
Table table = tableEnv.sqlQuery("select name from test ");
TableSchema schema = table.getSchema();
System.out.println(schema);
DataStream> tuple2DataStream =
tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print();
tableEnv.execute("test");
//bsEnv.execute("fff");
}
}
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2020年08月7日 13:49,阿华田 写道:
代码如下
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
代码如下
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
broker造成了很大的负载导致请求超时?
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
解决了
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2020年06月18日 17:56,john<506269...@qq.com> 写道:
嗨,找到问题了吗?我也遇到了
2020年6月1日 下午2:48,阿华田 写道:
//初始化任务参数
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
FileInputFormat fileInputFormat = new TextInputFormat(new
Path
使用flink读取递归读取hdfs文件,报.tmp结尾的文件不存在异常,正常这些tmp文件flink应该不用读取吧?
File does not exist:
/recommender/success_fid_flow/ds=2020-06-16/hour=14/2020-06-16_14.success_fid_jarvis-01.1592287200578.tmp
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
建议使用缓存,因为b流会延迟20分钟到,所以将a流的数据缓存20分钟,时间到了在和b流进行关联,缓存推荐使用谷歌的cache,
com.google.common.cache;
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2020年06月15日 14:41,steven chen 写道:
hi:
1.项目中我们会汇集不同来源的消息的,然和合并进行统计并输出结果。
2. 有topic a 是所有的用户pv日志, topic b
是所有用户uv日志,现在1个job同时消费a,b2个消息,并将pv,uv的结果同时输出到下一级的
对 获取flink任务的metric信息 主要是任务状态
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2020年06月2日 14:21,air23 写道:
分钟级别定时去获取metrics?
这样吗
在 2020-06-02 14:05:39,"阿华田" 写道:
这种情况需要对flink任务进行监控 获取flink的任务状态
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2020年06月2日 14:03,air2
这种情况需要对flink任务进行监控 获取flink的任务状态
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2020年06月2日 14:03,air23 写道:
今天发现taskmanagers报json解析失败 他一起在重启
但是我们这边是监控yarn 任务级别的。像这种task 里面报错。yarn任务又不会挂掉。应该怎么去做监控。才能得知 程序后台有问题
谢谢
使用FileInputFormat
递归读取hdfs文件,并添加过滤器。程序执行没有报错但是很快就执行完成也没有读取到数据,本地测试可以过滤并读取到数据,yarn集群上执行出现上述情况。
代码:
//初始化任务参数
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
FileInputFormat fileInputFormat = new TextInputFormat(new
Path("hdfs://arc/success_fid_flow"));
说明一下 读取的数据还没有到今天的数据 也就是提示文件不存在的目录xxx/ds=2020-05-28
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2020年05月28日 16:36,阿华田 写道:
使用recursive.file.enumeration开启递归读取hdfs的目录文件,但是每次数据没有读完就会报如下错误:
java.io.IOException: Error opening the InputSplit
hdfs://xxx/ds=2020-05-28/hour=15/2020-05-28_15
使用recursive.file.enumeration开启递归读取hdfs的目录文件,但是每次数据没有读完就会报如下错误:
java.io.IOException: Error opening the InputSplit
hdfs://xxx/ds=2020-05-28/hour=15/2020-05-28_15.log.flume2_idcfeature_kafkamq.tmp
[0,134217728]: File does not exist: /xxx/ds=2020-05-28/hour=15/
| |
王志华
|
|
a15733178...@163.com
|
input_data = "hdfs://localhost:9002/tmp/match_bak/%s*[0-9]" % ('2018-07-16’)
result = sc.textFile(input_data)
flink可以像spark一样正则读取hdfs目录下的文件吗?目前测试好像不行,如果不支持,最早什么版本会支持呢?
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
onnectors/streamfile_sink.html
Best,
Jingsong Lee
On Thu, May 21, 2020 at 10:59 AM jimandlice wrote:
flink 写入hive 使用api 思路是怎么的呢
| |
jimandlice
|
|
邮箱:jimandl...@163.com
|
Signature is customized by Netease Mail Master
在2020年05月21日 10:57,阿华田 写道:
flink如何支持正则读取一个目录下的文件,比如读取文件名(时间格式命名)满足一定时间范围的文件
flink如何支持正则读取一个目录下的文件,比如读取文件名(时间格式命名)满足一定时间范围的文件
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
好的 感谢
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2020年04月29日 10:29,Junzhong Qin 写道:
可以试一下Jsoniter, https://jsoniter.com/index.cn.html
阿华田 于2020年4月29日周三 上午10:07写道:
这个确实排查到了,主要是json解析那块耗时,老版本用的gson,现在改成fastjson了,解析速度提升了不少。看来大数据量的json解析还得是fastjson
| |
王志华
|
|
a15733178...@163.com
|
签名由
这个确实排查到了,主要是json解析那块耗时,老版本用的gson,现在改成fastjson了,解析速度提升了不少。看来大数据量的json解析还得是fastjson
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在2020年04月29日 10:02,LakeShen 写道:
Hi 阿华,
数据延迟有可能是逻辑中某个环节比较耗时,比如查询 mysql,或者某处逻辑较复杂等等。
可以看看自己代码中,有么有比较耗时的逻辑。同时可以将自己认为比较耗时的地方,加上日志,看下处理时间。
Best,
LakeShen
阿华田 于2020年
、对于数据延迟建议还是得对 Kafka 消费的 Topic 进行消费组的监控,加上 Lag 告警,这样可以及时知道数据延迟情况
Best !
zhisheng
阿华田 于2020年4月28日周二 上午9:37写道:
线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗?
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗?
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
在代码中给算子添加了名称,但是flink的ui上还是显示原始的名称,这种情况大佬们遇见过吗?
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
谢谢
| |
王志华
|
|
邮箱:a15733178...@163.com
|
签名由 网易邮箱大师 定制
在2020年04月15日 11:44,zhang...@lakala.com 写道:
昨天晚上看到一篇微信公众号文章,分享给你,希望对你有帮助。
“Flink 端到端 Exactly-once 机制剖析”
https://mp.weixin.qq.com/s/fhUNuCOVFQUjRB-fo4Rl2g
发件人: 阿华田
发送时间: 2020-04-15 11:00
收件人: user-zh@flink.apache.org
主题: 自定义具有Exactly
如果自定义具有Exactly-Once语义的sink,继承TwoPhaseCommitSinkFunction,怎么确保真正的Exactly-Once,大佬们有已经上线使用的案例demo吗?
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制
45 matches
Mail list logo