回复: Flink DataStream 作业如何获取到作业血缘?
我们想修改源码 实现任意任务提交实时平台,初始化DAG的时候获取到血缘信息,StreamExecutionEnvironment注册 这种只能写在任务里 不满足需求 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年03月8日 18:23,Zhanghao Chen 写道: 你可以看下 OpenLineage 和 Flink 的集成方法 [1],它是在 StreamExecutionEnvironment 里注册了一个 JobListener(通过这个可以拿到 JobClient 进而拿到 job id)。然后从 execution environment 里可以抽取到 transformation 信息处理 [2]。 [1] https://openlineage.io/docs/integrations/flink/ [2] https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/app/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java Best, Zhanghao Chen From: 阿华田 Sent: Friday, March 8, 2024 16:48 To: user-zh@flink.apache.org Subject: 回复: Flink DataStream 作业如何获取到作业血缘? ”JobGraph 可以获得 transformation 信息“, JobGraph可以直接获取transformation的信息吗?, 我们是在 SourceTransformation 和SinkTransformation反射拿到链接信息 ,但是这个地方拿不到flinkJobid, JobGraph可以拿到source和sink的链接信息和flinkJobid? | | 阿华田 | | a15733178...@163.com | JobGraph 可以获得 transformation 信息transformation 签名由网易邮箱大师定制 在2024年03月8日 16:18,Zhanghao Chen 写道: JobGraph 里有个字段就是 jobid。 Best, Zhanghao Chen From: 阿华田 Sent: Friday, March 8, 2024 14:14 To: user-zh@flink.apache.org Subject: 回复: Flink DataStream 作业如何获取到作业血缘? 获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年02月26日 20:04,Feng Jin 写道: 通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris Sink,之后再通过反射获取里面的 properties 信息进行提取。 可以参考 OpenLineage[1] 的实现. 1. https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java Best, Feng On Mon, Feb 26, 2024 at 6:20 PM casel.chen wrote: 一个Flink DataStream 作业从mysql cdc消费处理后写入apache doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink connector信息,包括连接字符串、数据库名、表名等?
回复: Flink DataStream 作业如何获取到作业血缘?
”JobGraph 可以获得 transformation 信息“, JobGraph可以直接获取transformation的信息吗?, 我们是在 SourceTransformation 和SinkTransformation反射拿到链接信息 ,但是这个地方拿不到flinkJobid, JobGraph可以拿到source和sink的链接信息和flinkJobid? | | 阿华田 | | a15733178...@163.com | JobGraph 可以获得 transformation 信息transformation 签名由网易邮箱大师定制 在2024年03月8日 16:18,Zhanghao Chen 写道: JobGraph 里有个字段就是 jobid。 Best, Zhanghao Chen From: 阿华田 Sent: Friday, March 8, 2024 14:14 To: user-zh@flink.apache.org Subject: 回复: Flink DataStream 作业如何获取到作业血缘? 获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年02月26日 20:04,Feng Jin 写道: 通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris Sink,之后再通过反射获取里面的 properties 信息进行提取。 可以参考 OpenLineage[1] 的实现. 1. https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java Best, Feng On Mon, Feb 26, 2024 at 6:20 PM casel.chen wrote: 一个Flink DataStream 作业从mysql cdc消费处理后写入apache doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink connector信息,包括连接字符串、数据库名、表名等?
回复: Flink DataStream 作业如何获取到作业血缘?
获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年02月26日 20:04,Feng Jin 写道: 通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris Sink,之后再通过反射获取里面的 properties 信息进行提取。 可以参考 OpenLineage[1] 的实现. 1. https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java Best, Feng On Mon, Feb 26, 2024 at 6:20 PM casel.chen wrote: 一个Flink DataStream 作业从mysql cdc消费处理后写入apache doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink connector信息,包括连接字符串、数据库名、表名等?
回复: Flink任务链接信息审计获取
看了一下 这样需要每个任务都配置listener,做不到系统级的控制,推动下游用户都去配置listener比较困难 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年02月2日 19:38,Feng Jin 写道: hi, 可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析 Source 和 Sink 拿到血缘信息。 [1] https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java Best, Feng On Fri, Feb 2, 2024 at 6:36 PM 阿华田 wrote: 打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase ,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复: Flink任务链接信息审计获取
好的 感谢 我研究一下 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年02月2日 19:38,Feng Jin 写道: hi, 可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析 Source 和 Sink 拿到血缘信息。 [1] https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java Best, Feng On Fri, Feb 2, 2024 at 6:36 PM 阿华田 wrote: 打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase ,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
Flink任务链接信息审计获取
打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase ,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复: flink ui 算子数据展示一直loading...
这个维度都排查了 都正常 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2024年01月23日 21:57,Feng Jin 写道: 可以尝试着下面几种方式确认下原因: 1. 打开浏览器开发者模式,看是否因为请求某个接口卡住 2. 查看下 JobManager 的 GC 情况,是否频繁 FullGC 3. 查看下 JobManager 的日志,是否存在某些资源文件丢失或者磁盘异常情况导致 web UI 无法访问. Best, Feng On Tue, Jan 23, 2024 at 6:16 PM 阿华田 wrote: 如下图,任务处理数据正常,任务状态也正常,但是flink_ui一致处于loading中,只有个别任务这样,其他正常,有可能是metirc包的某个类冲突导致的吗? 阿华田 a15733178...@163.com <https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=%E9%98%BF%E5%8D%8E%E7%94%B0=a15733178518%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22a15733178518%40163.com%22%5D> 签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制
flink ui 算子数据展示一直loading...
如下图,任务处理数据正常,任务状态也正常,但是flink_ui一致处于loading中,只有个别任务这样,其他正常,有可能是metirc包的某个类冲突导致的吗? | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
kafka_appender收集flink任务日志连接数过多问题
Flink集群使用kafka_appender收集flink产生的日志,但是现在实时运行的任务超过了三千个,运行的yarn-container有20万+。导致存储日志的kafka集群连接数过多,kafka集群压力有点大,请教各位大佬flink日志采集这块还有什么好的方式吗 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
如何获取flink任务 source 和sink的所链接的中间件的ip信息
本人目前再做实时计算平台,想审计平台上所有运行flink任务的中间件的连接信息。比如job1 是kafka写入hbase的flink任务,希望可以自动审计此任务所连接的kafka的集群ip地址和topic以及hbase的集群ip地址和表信息 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复: flink-job-history 任务太多页面卡死
这个解决不了根本问题 主要是我们的任务比较多,业务上就需要保留几千个任务 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2023年07月28日 11:28,Shammon FY 写道: Hi, 可以通过配置`jobstore.max-capacity`和`jobstore.expiration-time`控制保存的任务数,具体参数可以参考[1] [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-jobmanager-options Best, Shammon FY On Fri, Jul 28, 2023 at 10:17 AM 阿华田 wrote: 目前flink-job-history 已经收录5000+任务,当点击全部任务查看时,job-history就会卡死无法访问,各位大佬有什么好的解决方式? | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
flink-job-history 任务太多页面卡死
目前flink-job-history 已经收录5000+任务,当点击全部任务查看时,job-history就会卡死无法访问,各位大佬有什么好的解决方式? | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复:flink on k8s 任务状态监控问题
history server是不是有延迟性 做不到实时获取任务的状态 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2023年07月15日 12:14,casel.chen 写道: 可以查看history server 在 2023-07-14 18:36:42,"阿华田" 写道: hello 各位大佬, flink on K8S ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了 无法判断flink任务是正常Finished 还是异常失败了,各位大佬有什么建议吗 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
flink on k8s 任务状态监控问题
hello 各位大佬, flink on K8S ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了 无法判断flink任务是正常Finished 还是异常失败了,各位大佬有什么建议吗 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
flinksql如何设置es 字段属性
使用flinksql写入es是 index是动态的,所有想使用flinksql直接设置字段的属性,比如not_analyzed。目前发现好像只能设置字段的类型,大佬们有知道怎么设置字段的属性吗 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
自定义带有状态的udf
自定义UDF 实现CheckpointedFunction 伪代码如下 发现并没有执行initializeState public class ClusterInfoCollectUdf extends ScalarFunction implements CheckpointedFunction { private static final LoggerLOGGER = LoggerFactory.getLogger(ClusterInfoCollectUdf.class); private transient MapState mapState; private MapStateDescriptor mapStateDescriptor; 。 @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { LOGGER.info("the snapshotStateis started "); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { mapStateDescriptor = new MapStateDescriptor<>( "app-status-map", String.class, Integer.class); mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor); LOGGER.info("the initializeStateis started "); } | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
如何自定义带有状态的UDF
自定义UDF 实现CheckpointedFunction 伪代码如下 发现并没有执行initializeState public class ClusterInfoCollectUdf extends ScalarFunction implements CheckpointedFunction { private static final LoggerLOGGER = LoggerFactory.getLogger(ClusterInfoCollectUdf.class); private transient MapState mapState; private MapStateDescriptor mapStateDescriptor; 。 @Override public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { LOGGER.info("the snapshotStateis started "); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { mapStateDescriptor = new MapStateDescriptor<>( "app-status-map", String.class, Integer.class); mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor); LOGGER.info("the initializeState is started "); } | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复: yarn.containers.vcores使用问题
已经可以了 flink1.11 在jobManger的ui页面配置信息那块能看到这个参数是否配置成功 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2021年03月5日 11:44,Xintong Song 写道: 你的 flink 是什么版本? 部署模式是 per-job 还是 session? “看到任务配置参数也生效了”具体是在哪里看到的? Thank you~ Xintong Song On Thu, Mar 4, 2021 at 4:35 PM 阿华田 wrote: 使用-yD yarn.containers.vcores=4 区设置flink任务的总的cpu核数,看到任务配置参数也生效了 但是实际申请核数还是按照 cpu slot一比一申请的 各位大佬使用yarn.containers.vcores是不是还需要开启yarn的cpu 调度 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
yarn.containers.vcores使用问题
使用-yD yarn.containers.vcores=4 区设置flink任务的总的cpu核数,看到任务配置参数也生效了 但是实际申请核数还是按照 cpu slot一比一申请的 各位大佬使用yarn.containers.vcores是不是还需要开启yarn的cpu 调度 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复: flinksql集成hive权限管理
Ok 感谢 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2021年02月26日 15:29,Rui Li 写道: 你好, 目前hive connector还没有支持ranger,只支持HMS端基于storage的权限控制。 On Thu, Feb 25, 2021 at 8:49 PM 阿华田 wrote: 目前在做基于flinksql搭建实时计算平台,使用hive的catalog管理flinksql的元数据。hive本身使用Ranger进行权限管理。但是flinksql使用hive的catalog管理元数据,用户的权限Ranger好像无法管理。各位大佬知道有什么比较好的方式解决吗? | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 -- Best regards! Rui Li
flinksql集成hive权限管理
目前在做基于flinksql搭建实时计算平台,使用hive的catalog管理flinksql的元数据。hive本身使用Ranger进行权限管理。但是flinksql使用hive的catalog管理元数据,用户的权限Ranger好像无法管理。各位大佬知道有什么比较好的方式解决吗? | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
flink clickhouse connector
使用阿里的clickhouse connector 报找不到factory 大佬们遇到吗 现在flnk sql写入clickhouse 除了使用阿里的 还有别的方式吗? org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'clickhouse' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
FOR SYSTEM_TIME AS OF 维表关联 报错
各位大佬 在flink sql客户端执行维度关联报错 sql语句:insert into sink_a select a.user_id, b.user_name from source_a as a left join source_b FOR SYSTEM_TIME AS OF a.proc_time b on a.user_id = b.user_id; 报错信息 [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE]. Missing conversion is FlinkLogicalJoin[convention: LOGICAL -> STREAM_PHYSICAL] There is 1 empty subset: rel#280:Subset#14.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
flink-sql-gateway如何使用flink自定义的udf
各位大佬,Flink-sql-gateway 提交flink sql任务 ,如何使用flink自定义的udf | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复:flink1.11 DDL定义kafka source报错
错误信息: Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph. at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47) at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59) 代码: public class DDLSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String create_sql= "create table test\n" + "(\n" + "name varchar,\n" + "city varchar\n" + ")with (\n" + "'connector.type' = 'kafka', \n" + "'connector.version' = 'universal',\n" + "'connector.topic' = 'test',\n" + "'connector.properties.0.key' = 'group.id',\n" + "'connector.properties.0.value' = 'test_gd',\n" + "'connector.properties.1.key' = 'bootstrap.servers',\n" + "'connector.properties.1.value' = '127.0.0.1:9092',\n" + "'connector.property-version' = '1',\n" + "'connector.startup-mode' = 'latest-offset',\n" + "'format.type' = 'json',\n" + "'format.property-version' = '1',\n" + "'format.derive-schema' = 'true',\n" + "'update-mode' = 'append')"; tableEnv.executeSql(create_sql); Table table = tableEnv.sqlQuery("select name from test "); TableSchema schema = table.getSchema(); System.out.println(schema); DataStream> tuple2DataStream = tableEnv.toRetractStream(table, Row.class); tuple2DataStream.print(); tableEnv.execute("test"); //bsEnv.execute("fff"); } } | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年08月7日 13:49,阿华田 写道: 代码如下 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
flink1.11 DDL定义kafka source报错
代码如下 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
flink读取kafka超时问题
Caused by: java.lang.Exception: org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired before the position for partition dercd_seeme-3 could be determined 大佬们flink读取kafka遇到过这个错误没?现在情况是 每次重启任务都会出现这个错,但是奇怪的是多试几次任务才能运行起来。这个任务的特点读取得topic较多(6个),数据量比较大。难道是读取得数据量太大给kafka集群的broker造成了很大的负载导致请求超时? | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复: FileInputFormat 使用问题
解决了 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月18日 17:56,john<506269...@qq.com> 写道: 嗨,找到问题了吗?我也遇到了 2020年6月1日 下午2:48,阿华田 写道: //初始化任务参数 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs://arc/success_fid_flow ")); fileInputFormat.setNestedFileEnumeration(true); //过滤掉条件为true fileInputFormat.setFilesFilter(new RegexExcludePathAndTimeFilter("2020-05-24","2020-05-24")); DataSet source =env.createInput(fileInputFormat); source.output(new HdfsTrainSinktest());
env.readFile 递归读取hdfs,临时文件不存在问题
使用flink读取递归读取hdfs文件,报.tmp结尾的文件不存在异常,正常这些tmp文件flink应该不用读取吧? File does not exist: /recommender/success_fid_flow/ds=2020-06-16/hour=14/2020-06-16_14.success_fid_jarvis-01.1592287200578.tmp | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复:关于多个来源,如何保证数据对齐
建议使用缓存,因为b流会延迟20分钟到,所以将a流的数据缓存20分钟,时间到了在和b流进行关联,缓存推荐使用谷歌的cache, com.google.common.cache; | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月15日 14:41,steven chen 写道: hi: 1.项目中我们会汇集不同来源的消息的,然和合并进行统计并输出结果。 2. 有topic a 是所有的用户pv日志, topic b 是所有用户uv日志,现在1个job同时消费a,b2个消息,并将pv,uv的结果同时输出到下一级的kafka topic c中, 问题:当a 消息 提前到达,b 消息晚20分钟到达,job 在工作时如何保证2个topic 数据对齐,或者说2边数据进行关联整合? 相当于2条消息处理后合并成1条往下游sink ,如何保证数据数据a和b对应的上?
回复:flink on yarn报错 怎么获取
对 获取flink任务的metric信息 主要是任务状态 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月2日 14:21,air23 写道: 分钟级别定时去获取metrics? 这样吗 在 2020-06-02 14:05:39,"阿华田" 写道: 这种情况需要对flink任务进行监控 获取flink的任务状态 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月2日 14:03,air23 写道: 今天发现taskmanagers报json解析失败 他一起在重启 但是我们这边是监控yarn 任务级别的。像这种task 里面报错。yarn任务又不会挂掉。应该怎么去做监控。才能得知 程序后台有问题 谢谢
回复:flink on yarn报错 怎么获取
这种情况需要对flink任务进行监控 获取flink的任务状态 | | 阿华田 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年06月2日 14:03,air23 写道: 今天发现taskmanagers报json解析失败 他一起在重启 但是我们这边是监控yarn 任务级别的。像这种task 里面报错。yarn任务又不会挂掉。应该怎么去做监控。才能得知 程序后台有问题 谢谢
FileInputFormat 使用问题
使用FileInputFormat 递归读取hdfs文件,并添加过滤器。程序执行没有报错但是很快就执行完成也没有读取到数据,本地测试可以过滤并读取到数据,yarn集群上执行出现上述情况。 代码: //初始化任务参数 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); FileInputFormat fileInputFormat = new TextInputFormat(new Path("hdfs://arc/success_fid_flow")); fileInputFormat.setNestedFileEnumeration(true); //过滤掉条件为true fileInputFormat.setFilesFilter(new RegexExcludePathAndTimeFilter("2020-05-24","2020-05-24")); DataSet source =env.createInput(fileInputFormat); source.output(new HdfsTrainSinktest()); 打印的日志: 2020-06-01 14:43:41,848 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.address, localhost 2020-06-01 14:43:41,848 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.port, 6123 2020-06-01 14:43:41,848 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.heap.size, 1024m 2020-06-01 14:43:41,848 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.heap.size, 1024m 2020-06-01 14:43:41,849 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.numberOfTaskSlots, 1 2020-06-01 14:43:41,849 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: parallelism.default, 1 2020-06-01 14:43:41,849 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.execution.failover-strategy, region 2020-06-01 14:43:41,849 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: historyserver.web.address, 0.0.0.0 2020-06-01 14:43:41,849 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: historyserver.web.port, 8082 2020-06-01 14:43:41,890 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers 2020-06-01 14:43:41,993 INFO org.apache.flink.optimizer.Optimizer - Compiler could not determine the size of input 'TextInputFormat ([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates. 2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.address, localhost 2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.rpc.port, 6123 2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.heap.size, 1024m 2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.heap.size, 1024m 2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: taskmanager.numberOfTaskSlots, 1 2020-06-01 14:43:42,022 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: parallelism.default, 1 2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.execution.failover-strategy, region 2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: jobmanager.archive.fs.dir, hdfs://dap/tmp/completed-jobs/ 2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: historyserver.web.address, 0.0.0.0 2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: historyserver.web.port, 8082 2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: historyserver.archive.fs.dir, hdfs://dap/tmp/completed-jobs/ 2020-06-01 14:43:42,023 INFO org.apache.flink.configuration.GlobalConfiguration- Loading configuration property: historyserver.archive.fs.refresh-interval, 1 2020-06-01 14:43:42,069 INFO org.apache.flink.client.program.rest.RestClusterClient- Submitting job 410508f08b0775c0529e84b221dd909d (detached: false). 2020-06-01 14:43:52,134 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers 2020-06-01 14:43:52,167 INFO org.apache.flink.optimizer.Optimizer -Compiler could not determine the size of input 'TextInputFormat ([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates. 2020-06-01 14:43:52,171 INFO org.apache.flink.client.program.rest.RestClusterClient-
回复:recursive.file.enumeration使用问题
说明一下 读取的数据还没有到今天的数据 也就是提示文件不存在的目录xxx/ds=2020-05-28 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年05月28日 16:36,阿华田 写道: 使用recursive.file.enumeration开启递归读取hdfs的目录文件,但是每次数据没有读完就会报如下错误: java.io.IOException: Error opening the InputSplit hdfs://xxx/ds=2020-05-28/hour=15/2020-05-28_15.log.flume2_idcfeature_kafkamq.tmp [0,134217728]: File does not exist: /xxx/ds=2020-05-28/hour=15/ | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
recursive.file.enumeration使用问题
使用recursive.file.enumeration开启递归读取hdfs的目录文件,但是每次数据没有读完就会报如下错误: java.io.IOException: Error opening the InputSplit hdfs://xxx/ds=2020-05-28/hour=15/2020-05-28_15.log.flume2_idcfeature_kafkamq.tmp [0,134217728]: File does not exist: /xxx/ds=2020-05-28/hour=15/ | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
flink正则读取hdfs目录下的文件
input_data = "hdfs://localhost:9002/tmp/match_bak/%s*[0-9]" % ('2018-07-16’) result = sc.textFile(input_data) flink可以像spark一样正则读取hdfs目录下的文件吗?目前测试好像不行,如果不支持,最早什么版本会支持呢? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复: flink如何正则读取hdfs下的文件
,Jingsong Li 写道: Hi, 志华, 如果在Datastream层,你可以使用FiIenputFormat.setFilesFilter来设置文件的过滤器。 目前Table层并不原生支持filter,你可以考虑自己写一个table connector。 但是更推荐的是你能把这个事情换成partition来处理,这个支持的会更自然些。 jimandlice, - 如果是1.10或以前,你需要写一个Datastream作业加上StreamingFileSink来写入Hive,并且列存格式只有parquet的支持。[1] - 如果是1.11(正在测试发布中),Table/SQL层原生支持streaming file sink,相关文档正在编写中。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html Best, Jingsong Lee On Thu, May 21, 2020 at 10:59 AM jimandlice wrote: flink 写入hive 使用api 思路是怎么的呢 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月21日 10:57,阿华田 写道: flink如何支持正则读取一个目录下的文件,比如读取文件名(时间格式命名)满足一定时间范围的文件 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制 -- Best, Jingsong Lee
flink如何正则读取hdfs下的文件
flink如何支持正则读取一个目录下的文件,比如读取文件名(时间格式命名)满足一定时间范围的文件 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复: flink背压问题
好的 感谢 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年04月29日 10:29,Junzhong Qin 写道: 可以试一下Jsoniter, https://jsoniter.com/index.cn.html 阿华田 于2020年4月29日周三 上午10:07写道: 这个确实排查到了,主要是json解析那块耗时,老版本用的gson,现在改成fastjson了,解析速度提升了不少。看来大数据量的json解析还得是fastjson | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年04月29日 10:02,LakeShen 写道: Hi 阿华, 数据延迟有可能是逻辑中某个环节比较耗时,比如查询 mysql,或者某处逻辑较复杂等等。 可以看看自己代码中,有么有比较耗时的逻辑。同时可以将自己认为比较耗时的地方,加上日志,看下处理时间。 Best, LakeShen 阿华田 于2020年4月29日周三 上午9:21写道: 好的 感谢大佬 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年04月29日 09:08,zhisheng 写道: hi, 数据延迟不一定会产生背压,举个例子,Flink 写 HBase 的作业,Source 并行度为 5,Sink 并行度 10,这种情况下游写入速度很快的,可能写入速度超过 Flink 消费 Kafka 的速度,这种情况就不会出现背压的问题。 1、建议排查一下作业的并行度(可以设置和 Kafka 分区数一样); 2、背压监控是通过 Flink Web UI 监控查看的,还是通过指标来判断的? 3、对于数据延迟建议还是得对 Kafka 消费的 Topic 进行消费组的监控,加上 Lag 告警,这样可以及时知道数据延迟情况 Best ! zhisheng 阿华田 于2020年4月28日周二 上午9:37写道: 线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复: flink背压问题
这个确实排查到了,主要是json解析那块耗时,老版本用的gson,现在改成fastjson了,解析速度提升了不少。看来大数据量的json解析还得是fastjson | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年04月29日 10:02,LakeShen 写道: Hi 阿华, 数据延迟有可能是逻辑中某个环节比较耗时,比如查询 mysql,或者某处逻辑较复杂等等。 可以看看自己代码中,有么有比较耗时的逻辑。同时可以将自己认为比较耗时的地方,加上日志,看下处理时间。 Best, LakeShen 阿华田 于2020年4月29日周三 上午9:21写道: 好的 感谢大佬 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年04月29日 09:08,zhisheng 写道: hi, 数据延迟不一定会产生背压,举个例子,Flink 写 HBase 的作业,Source 并行度为 5,Sink 并行度 10,这种情况下游写入速度很快的,可能写入速度超过 Flink 消费 Kafka 的速度,这种情况就不会出现背压的问题。 1、建议排查一下作业的并行度(可以设置和 Kafka 分区数一样); 2、背压监控是通过 Flink Web UI 监控查看的,还是通过指标来判断的? 3、对于数据延迟建议还是得对 Kafka 消费的 Topic 进行消费组的监控,加上 Lag 告警,这样可以及时知道数据延迟情况 Best ! zhisheng 阿华田 于2020年4月28日周二 上午9:37写道: 线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复: flink背压问题
好的 感谢大佬 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制 在2020年04月29日 09:08,zhisheng 写道: hi, 数据延迟不一定会产生背压,举个例子,Flink 写 HBase 的作业,Source 并行度为 5,Sink 并行度 10,这种情况下游写入速度很快的,可能写入速度超过 Flink 消费 Kafka 的速度,这种情况就不会出现背压的问题。 1、建议排查一下作业的并行度(可以设置和 Kafka 分区数一样); 2、背压监控是通过 Flink Web UI 监控查看的,还是通过指标来判断的? 3、对于数据延迟建议还是得对 Kafka 消费的 Topic 进行消费组的监控,加上 Lag 告警,这样可以及时知道数据延迟情况 Best ! zhisheng 阿华田 于2020年4月28日周二 上午9:37写道: 线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
flink背压问题
线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
WechatIMG169
在代码中给算子添加了名称,但是flink的ui上还是显示原始的名称,这种情况大佬们遇见过吗? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
回复:自定义具有Exactly-Once语义的sink
谢谢 | | 王志华 | | 邮箱:a15733178...@163.com | 签名由 网易邮箱大师 定制 在2020年04月15日 11:44,zhang...@lakala.com 写道: 昨天晚上看到一篇微信公众号文章,分享给你,希望对你有帮助。 “Flink 端到端 Exactly-once 机制剖析” https://mp.weixin.qq.com/s/fhUNuCOVFQUjRB-fo4Rl2g 发件人: 阿华田 发送时间: 2020-04-15 11:00 收件人: user-zh@flink.apache.org 主题: 自定义具有Exactly-Once语义的sink 如果自定义具有Exactly-Once语义的sink,继承TwoPhaseCommitSinkFunction,怎么确保真正的Exactly-Once,大佬们有已经上线使用的案例demo吗? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制
自定义具有Exactly-Once语义的sink
如果自定义具有Exactly-Once语义的sink,继承TwoPhaseCommitSinkFunction,怎么确保真正的Exactly-Once,大佬们有已经上线使用的案例demo吗? | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制