回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 文章
我们想修改源码 实现任意任务提交实时平台,初始化DAG的时候获取到血缘信息,StreamExecutionEnvironment注册 这种只能写在任务里 
不满足需求




| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年03月8日 18:23,Zhanghao Chen 写道:
你可以看下 OpenLineage 和 Flink 的集成方法 [1],它是在 StreamExecutionEnvironment 里注册了一个 
JobListener(通过这个可以拿到 JobClient 进而拿到 job id)。然后从 execution environment 里可以抽取到 
transformation 信息处理 [2]。

[1] https://openlineage.io/docs/integrations/flink/
[2] 
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/app/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java


Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 16:48
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?



”JobGraph 可以获得 transformation 信息“, JobGraph可以直接获取transformation的信息吗?, 我们是在
SourceTransformation 和SinkTransformation反射拿到链接信息 ,但是这个地方拿不到flinkJobid,  
JobGraph可以拿到source和sink的链接信息和flinkJobid?
| |
阿华田
|
|
a15733178...@163.com
|
JobGraph 可以获得 transformation 信息transformation
签名由网易邮箱大师定制


在2024年03月8日 16:18,Zhanghao Chen 写道:
JobGraph 里有个字段就是 jobid。

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?

获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月26日 20:04,Feng Jin 写道:
通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris
Sink,之后再通过反射获取里面的 properties 信息进行提取。

可以参考 OpenLineage[1] 的实现.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

一个Flink DataStream 作业从mysql cdc消费处理后写入apache
doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink
connector信息,包括连接字符串、数据库名、表名等?


回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 文章


 ”JobGraph 可以获得 transformation 信息“, JobGraph可以直接获取transformation的信息吗?, 我们是在
SourceTransformation 和SinkTransformation反射拿到链接信息 ,但是这个地方拿不到flinkJobid,  
JobGraph可以拿到source和sink的链接信息和flinkJobid?
| |
阿华田
|
|
a15733178...@163.com
|
 JobGraph 可以获得 transformation 信息transformation
签名由网易邮箱大师定制


在2024年03月8日 16:18,Zhanghao Chen 写道:
JobGraph 里有个字段就是 jobid。

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?

获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月26日 20:04,Feng Jin 写道:
通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris
Sink,之后再通过反射获取里面的 properties 信息进行提取。

可以参考 OpenLineage[1] 的实现.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

一个Flink DataStream 作业从mysql cdc消费处理后写入apache
doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink
connector信息,包括连接字符串、数据库名、表名等?


回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-07 文章
 获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月26日 20:04,Feng Jin 写道:
通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris
Sink,之后再通过反射获取里面的 properties 信息进行提取。

可以参考 OpenLineage[1] 的实现.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

一个Flink DataStream 作业从mysql cdc消费处理后写入apache
doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink
connector信息,包括连接字符串、数据库名、表名等?


回复: Flink任务链接信息审计获取

2024-02-03 文章
看了一下 这样需要每个任务都配置listener,做不到系统级的控制,推动下游用户都去配置listener比较困难


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月2日 19:38,Feng Jin 写道:
hi,

可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析
Source 和 Sink 拿到血缘信息。

[1]
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java

Best,
Feng


On Fri, Feb 2, 2024 at 6:36 PM 阿华田  wrote:



打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase
,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制




回复: Flink任务链接信息审计获取

2024-02-03 文章
好的 感谢 我研究一下


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月2日 19:38,Feng Jin 写道:
hi,

可以参考下 OpenLineage[1] 的实现, 通过 Flink 配置JobListener 拿到 Transformation 信息,然后解析
Source 和 Sink 拿到血缘信息。

[1]
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java

Best,
Feng


On Fri, Feb 2, 2024 at 6:36 PM 阿华田  wrote:



打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase
,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制




Flink任务链接信息审计获取

2024-02-02 文章


打算做flink任务画像的事情,主要是用户跑的datastream作业,在我们的实时平台运行起来之后希望能审计到使用了哪些kafka的topic,写入了哪些中间件(mysql,hbase
 ,ck 等等),大佬们有什么好的方式吗,目前flinksql可以通过sql获取到,用户自己编译的flink任务包去执行datastream作业获取不到
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复: flink ui 算子数据展示一直loading...

2024-01-25 文章
这个维度都排查了  都正常


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年01月23日 21:57,Feng Jin 写道:
可以尝试着下面几种方式确认下原因:


1.

打开浏览器开发者模式,看是否因为请求某个接口卡住
2.

查看下 JobManager 的 GC 情况,是否频繁 FullGC
3.

查看下 JobManager 的日志,是否存在某些资源文件丢失或者磁盘异常情况导致 web UI 无法访问.


Best,
Feng


On Tue, Jan 23, 2024 at 6:16 PM 阿华田  wrote:



如下图,任务处理数据正常,任务状态也正常,但是flink_ui一致处于loading中,只有个别任务这样,其他正常,有可能是metirc包的某个类冲突导致的吗?
阿华田
a15733178...@163.com

<https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=%E9%98%BF%E5%8D%8E%E7%94%B0=a15733178518%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22a15733178518%40163.com%22%5D>
签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制




flink ui 算子数据展示一直loading...

2024-01-23 文章


如下图,任务处理数据正常,任务状态也正常,但是flink_ui一致处于loading中,只有个别任务这样,其他正常,有可能是metirc包的某个类冲突导致的吗?
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



kafka_appender收集flink任务日志连接数过多问题

2023-10-16 文章
Flink集群使用kafka_appender收集flink产生的日志,但是现在实时运行的任务超过了三千个,运行的yarn-container有20万+。导致存储日志的kafka集群连接数过多,kafka集群压力有点大,请教各位大佬flink日志采集这块还有什么好的方式吗


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



如何获取flink任务 source 和sink的所链接的中间件的ip信息

2023-09-22 文章
本人目前再做实时计算平台,想审计平台上所有运行flink任务的中间件的连接信息。比如job1 
是kafka写入hbase的flink任务,希望可以自动审计此任务所连接的kafka的集群ip地址和topic以及hbase的集群ip地址和表信息
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复: flink-job-history 任务太多页面卡死

2023-07-28 文章
这个解决不了根本问题 主要是我们的任务比较多,业务上就需要保留几千个任务


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2023年07月28日 11:28,Shammon FY 写道:
Hi,

可以通过配置`jobstore.max-capacity`和`jobstore.expiration-time`控制保存的任务数,具体参数可以参考[1]

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-jobmanager-options

Best,
Shammon FY

On Fri, Jul 28, 2023 at 10:17 AM 阿华田  wrote:

目前flink-job-history
已经收录5000+任务,当点击全部任务查看时,job-history就会卡死无法访问,各位大佬有什么好的解决方式?
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制




flink-job-history 任务太多页面卡死

2023-07-27 文章
目前flink-job-history 已经收录5000+任务,当点击全部任务查看时,job-history就会卡死无法访问,各位大佬有什么好的解决方式?
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复:flink on k8s 任务状态监控问题

2023-07-16 文章


history server是不是有延迟性 做不到实时获取任务的状态

| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2023年07月15日 12:14,casel.chen 写道:
可以查看history server














在 2023-07-14 18:36:42,"阿华田"  写道:


hello  各位大佬, flink on K8S  ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了 
无法判断flink任务是正常Finished  还是异常失败了,各位大佬有什么建议吗
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



flink on k8s 任务状态监控问题

2023-07-14 文章


hello  各位大佬, flink on K8S  ,运行模式是native 模式,任务状态实时监控遇到一个问题: 就是pod退出了 
无法判断flink任务是正常Finished  还是异常失败了,各位大佬有什么建议吗
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



flinksql如何设置es 字段属性

2021-07-17 文章


使用flinksql写入es是 
index是动态的,所有想使用flinksql直接设置字段的属性,比如not_analyzed。目前发现好像只能设置字段的类型,大佬们有知道怎么设置字段的属性吗
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



自定义带有状态的udf

2021-06-01 文章
自定义UDF 实现CheckpointedFunction 
伪代码如下 发现并没有执行initializeState






public class ClusterInfoCollectUdf   extends ScalarFunction implements 
CheckpointedFunction {
private static final LoggerLOGGER = 
LoggerFactory.getLogger(ClusterInfoCollectUdf.class);
private transient MapState  mapState;
private   MapStateDescriptor mapStateDescriptor;
   。



@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {


LOGGER.info("the snapshotStateis  started ");


}

@Override
public void initializeState(FunctionInitializationContext context) throws 
Exception {
mapStateDescriptor = new MapStateDescriptor<>(
"app-status-map",
String.class,
Integer.class);

mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
LOGGER.info("the initializeStateis  started ");




}





| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



如何自定义带有状态的UDF

2021-06-01 文章
自定义UDF 实现CheckpointedFunction 
伪代码如下 发现并没有执行initializeState






public class ClusterInfoCollectUdf   extends ScalarFunction implements 
CheckpointedFunction {
private static final LoggerLOGGER = 
LoggerFactory.getLogger(ClusterInfoCollectUdf.class);
private transient MapState  mapState;
private   MapStateDescriptor mapStateDescriptor;
   。



@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {


LOGGER.info("the snapshotStateis  started ");


}

@Override
public void initializeState(FunctionInitializationContext context) throws 
Exception {
mapStateDescriptor = new MapStateDescriptor<>(
"app-status-map",
String.class,
Integer.class);

mapState = context.getKeyedStateStore().getMapState(mapStateDescriptor);
LOGGER.info("the initializeState    is  started ");




}



| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复: yarn.containers.vcores使用问题

2021-03-04 文章
已经可以了  flink1.11   在jobManger的ui页面配置信息那块能看到这个参数是否配置成功


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2021年03月5日 11:44,Xintong Song 写道:
你的 flink 是什么版本?
部署模式是 per-job 还是 session?
“看到任务配置参数也生效了”具体是在哪里看到的?

Thank you~

Xintong Song



On Thu, Mar 4, 2021 at 4:35 PM 阿华田  wrote:

使用-yD yarn.containers.vcores=4
区设置flink任务的总的cpu核数,看到任务配置参数也生效了 但是实际申请核数还是按照 cpu  slot一比一申请的
各位大佬使用yarn.containers.vcores是不是还需要开启yarn的cpu 调度
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制




yarn.containers.vcores使用问题

2021-03-04 文章
使用-yD yarn.containers.vcores=4 
区设置flink任务的总的cpu核数,看到任务配置参数也生效了 但是实际申请核数还是按照 cpu  slot一比一申请的
各位大佬使用yarn.containers.vcores是不是还需要开启yarn的cpu 调度
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复: flinksql集成hive权限管理

2021-02-26 文章
Ok  感谢


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2021年02月26日 15:29,Rui Li 写道:
你好,

目前hive connector还没有支持ranger,只支持HMS端基于storage的权限控制。

On Thu, Feb 25, 2021 at 8:49 PM 阿华田  wrote:


目前在做基于flinksql搭建实时计算平台,使用hive的catalog管理flinksql的元数据。hive本身使用Ranger进行权限管理。但是flinksql使用hive的catalog管理元数据,用户的权限Ranger好像无法管理。各位大佬知道有什么比较好的方式解决吗?


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



--
Best regards!
Rui Li


flinksql集成hive权限管理

2021-02-25 文章
目前在做基于flinksql搭建实时计算平台,使用hive的catalog管理flinksql的元数据。hive本身使用Ranger进行权限管理。但是flinksql使用hive的catalog管理元数据,用户的权限Ranger好像无法管理。各位大佬知道有什么比较好的方式解决吗?


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



flink clickhouse connector

2021-02-02 文章
使用阿里的clickhouse connector  报找不到factory  大佬们遇到吗  
现在flnk sql写入clickhouse 除了使用阿里的  还有别的方式吗?




org.apache.flink.table.api.ValidationException: Could not find any factory for 
identifier 'clickhouse' that implements 
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



FOR SYSTEM_TIME AS OF 维表关联 报错

2021-01-29 文章


各位大佬 在flink sql客户端执行维度关联报错
sql语句:insert into  sink_a select a.user_id, b.user_name  from source_a as a 
left join source_b  FOR SYSTEM_TIME AS OF a.proc_time  b on a.user_id = 
b.user_id; 
报错信息
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough 
rules to produce a node with desired properties: convention=STREAM_PHYSICAL, 
FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, 
ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
Missing conversion is FlinkLogicalJoin[convention: LOGICAL -> STREAM_PHYSICAL]
There is 1 empty subset: rel#280:Subset#14.STREAM_PHYSICAL.any.None: 
0.[NONE].[NONE], the relevant part of the original plan is as follows
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



flink-sql-gateway如何使用flink自定义的udf

2021-01-26 文章
各位大佬,Flink-sql-gateway 提交flink sql任务 ,如何使用flink自定义的udf


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复:flink1.11 DDL定义kafka source报错

2020-08-07 文章
错误信息:
Exception in thread "main" java.lang.IllegalStateException: No operators 
defined in streaming topology. Cannot generate StreamGraph.
at 
org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
at 
org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at com.huahui.sqldemo.DDLSource.main(DDLSource.java:49)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.uiDesigner.snapShooter.SnapShooter.main(SnapShooter.java:59)




代码:
public class DDLSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment bsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(bsEnv, 
bsSettings);
String  create_sql=
"create table test\n" +
"(\n" +
"name varchar,\n" +
"city varchar\n" +
")with (\n" +
"'connector.type' = 'kafka', \n" +
"'connector.version' = 'universal',\n" +
"'connector.topic' = 'test',\n" +
"'connector.properties.0.key' = 'group.id',\n" +
"'connector.properties.0.value' = 'test_gd',\n" +
"'connector.properties.1.key' = 'bootstrap.servers',\n" +
"'connector.properties.1.value' = '127.0.0.1:9092',\n" +
"'connector.property-version' = '1',\n" +
"'connector.startup-mode' = 'latest-offset',\n" +
"'format.type' = 'json',\n" +
"'format.property-version' = '1',\n" +
"'format.derive-schema' = 'true',\n" +
"'update-mode' = 'append')";

tableEnv.executeSql(create_sql);
Table table = tableEnv.sqlQuery("select name from test ");
TableSchema schema = table.getSchema();
System.out.println(schema);
DataStream> tuple2DataStream = 
tableEnv.toRetractStream(table, Row.class);
tuple2DataStream.print();
tableEnv.execute("test");
//bsEnv.execute("fff");
}
}



| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年08月7日 13:49,阿华田 写道:
代码如下


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



flink1.11 DDL定义kafka source报错

2020-08-06 文章
代码如下


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



flink读取kafka超时问题

2020-06-28 文章
Caused by: java.lang.Exception: 
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
before the position for partition dercd_seeme-3 could be determined  
大佬们flink读取kafka遇到过这个错误没?现在情况是 
每次重启任务都会出现这个错,但是奇怪的是多试几次任务才能运行起来。这个任务的特点读取得topic较多(6个),数据量比较大。难道是读取得数据量太大给kafka集群的broker造成了很大的负载导致请求超时?


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复: FileInputFormat 使用问题

2020-06-20 文章
解决了  
| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年06月18日 17:56,john<506269...@qq.com> 写道:
嗨,找到问题了吗?我也遇到了

2020年6月1日 下午2:48,阿华田  写道:

//初始化任务参数
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
FileInputFormat fileInputFormat = new TextInputFormat(new 
Path("hdfs://arc/success_fid_flow "));
fileInputFormat.setNestedFileEnumeration(true);
//过滤掉条件为true
fileInputFormat.setFilesFilter(new 
RegexExcludePathAndTimeFilter("2020-05-24","2020-05-24"));
DataSet source =env.createInput(fileInputFormat);
source.output(new HdfsTrainSinktest());



env.readFile 递归读取hdfs,临时文件不存在问题

2020-06-17 文章
使用flink读取递归读取hdfs文件,报.tmp结尾的文件不存在异常,正常这些tmp文件flink应该不用读取吧?
File does not exist: 
/recommender/success_fid_flow/ds=2020-06-16/hour=14/2020-06-16_14.success_fid_jarvis-01.1592287200578.tmp


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复:关于多个来源,如何保证数据对齐

2020-06-15 文章
建议使用缓存,因为b流会延迟20分钟到,所以将a流的数据缓存20分钟,时间到了在和b流进行关联,缓存推荐使用谷歌的cache, 
com.google.common.cache;


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年06月15日 14:41,steven chen 写道:
hi:
1.项目中我们会汇集不同来源的消息的,然和合并进行统计并输出结果。
2. 有topic a 是所有的用户pv日志, topic b 
是所有用户uv日志,现在1个job同时消费a,b2个消息,并将pv,uv的结果同时输出到下一级的kafka topic c中,
问题:当a 消息 提前到达,b 消息晚20分钟到达,job 在工作时如何保证2个topic 数据对齐,或者说2边数据进行关联整合?
相当于2条消息处理后合并成1条往下游sink ,如何保证数据数据a和b对应的上?






回复:flink on yarn报错 怎么获取

2020-06-02 文章
对 获取flink任务的metric信息 主要是任务状态


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年06月2日 14:21,air23 写道:
分钟级别定时去获取metrics?
这样吗



















在 2020-06-02 14:05:39,"阿华田"  写道:
这种情况需要对flink任务进行监控 获取flink的任务状态


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年06月2日 14:03,air23 写道:
今天发现taskmanagers报json解析失败 他一起在重启
但是我们这边是监控yarn 任务级别的。像这种task 里面报错。yarn任务又不会挂掉。应该怎么去做监控。才能得知 程序后台有问题
谢谢


回复:flink on yarn报错 怎么获取

2020-06-02 文章
这种情况需要对flink任务进行监控 获取flink的任务状态


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年06月2日 14:03,air23 写道:
今天发现taskmanagers报json解析失败 他一起在重启
但是我们这边是监控yarn 任务级别的。像这种task 里面报错。yarn任务又不会挂掉。应该怎么去做监控。才能得知 程序后台有问题
谢谢

FileInputFormat 使用问题

2020-06-01 文章
使用FileInputFormat 
递归读取hdfs文件,并添加过滤器。程序执行没有报错但是很快就执行完成也没有读取到数据,本地测试可以过滤并读取到数据,yarn集群上执行出现上述情况。
代码:
//初始化任务参数
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
FileInputFormat fileInputFormat = new TextInputFormat(new 
Path("hdfs://arc/success_fid_flow"));
fileInputFormat.setNestedFileEnumeration(true);
//过滤掉条件为true
fileInputFormat.setFilesFilter(new 
RegexExcludePathAndTimeFilter("2020-05-24","2020-05-24"));
DataSet source =env.createInput(fileInputFormat);
source.output(new HdfsTrainSinktest());
打印的日志:


2020-06-01 14:43:41,848 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:41,848 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:41,848 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:41,848 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:41,849 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:41,849 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: parallelism.default, 1
2020-06-01 14:43:41,849 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:41,849 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:41,849 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: historyserver.web.port, 8082
2020-06-01 14:43:41,890 INFO  org.apache.flink.api.java.ExecutionEnvironment
- The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:41,993 INFO  org.apache.flink.optimizer.Optimizer  
- Compiler could not determine the size of input 'TextInputFormat 
([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:42,022 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.address, localhost
2020-06-01 14:43:42,022 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.rpc.port, 6123
2020-06-01 14:43:42,022 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.heap.size, 1024m
2020-06-01 14:43:42,022 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: taskmanager.numberOfTaskSlots, 1
2020-06-01 14:43:42,022 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: parallelism.default, 1
2020-06-01 14:43:42,023 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.execution.failover-strategy, region
2020-06-01 14:43:42,023 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: jobmanager.archive.fs.dir, 
hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: historyserver.web.address, 0.0.0.0
2020-06-01 14:43:42,023 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: historyserver.web.port, 8082
2020-06-01 14:43:42,023 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: historyserver.archive.fs.dir, 
hdfs://dap/tmp/completed-jobs/
2020-06-01 14:43:42,023 INFO  
org.apache.flink.configuration.GlobalConfiguration- Loading 
configuration property: historyserver.archive.fs.refresh-interval, 1
2020-06-01 14:43:42,069 INFO  
org.apache.flink.client.program.rest.RestClusterClient- Submitting job 
410508f08b0775c0529e84b221dd909d (detached: false).
2020-06-01 14:43:52,134 INFO  org.apache.flink.api.java.ExecutionEnvironment
- The job has 0 registered types and 0 default Kryo serializers
2020-06-01 14:43:52,167 INFO  org.apache.flink.optimizer.Optimizer  
-Compiler could not determine the size of input 'TextInputFormat 
([hdfs://arc/success_fid_flow]) - UTF-8'. Using default estimates.
2020-06-01 14:43:52,171 INFO  
org.apache.flink.client.program.rest.RestClusterClient- 

回复:recursive.file.enumeration使用问题

2020-05-28 文章
说明一下 读取的数据还没有到今天的数据 也就是提示文件不存在的目录xxx/ds=2020-05-28


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年05月28日 16:36,阿华田 写道:
使用recursive.file.enumeration开启递归读取hdfs的目录文件,但是每次数据没有读完就会报如下错误:


java.io.IOException: Error opening the InputSplit 
hdfs://xxx/ds=2020-05-28/hour=15/2020-05-28_15.log.flume2_idcfeature_kafkamq.tmp
 [0,134217728]: File does not exist: /xxx/ds=2020-05-28/hour=15/
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



recursive.file.enumeration使用问题

2020-05-28 文章
使用recursive.file.enumeration开启递归读取hdfs的目录文件,但是每次数据没有读完就会报如下错误:


java.io.IOException: Error opening the InputSplit 
hdfs://xxx/ds=2020-05-28/hour=15/2020-05-28_15.log.flume2_idcfeature_kafkamq.tmp
 [0,134217728]: File does not exist: /xxx/ds=2020-05-28/hour=15/
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



flink正则读取hdfs目录下的文件

2020-05-21 文章
input_data = "hdfs://localhost:9002/tmp/match_bak/%s*[0-9]" % ('2018-07-16’)
result = sc.textFile(input_data)
flink可以像spark一样正则读取hdfs目录下的文件吗?目前测试好像不行,如果不支持,最早什么版本会支持呢?


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复: flink如何正则读取hdfs下的文件

2020-05-21 文章
,Jingsong Li 写道:
Hi,

志华,
如果在Datastream层,你可以使用FiIenputFormat.setFilesFilter来设置文件的过滤器。
目前Table层并不原生支持filter,你可以考虑自己写一个table connector。
但是更推荐的是你能把这个事情换成partition来处理,这个支持的会更自然些。

jimandlice,
-
如果是1.10或以前,你需要写一个Datastream作业加上StreamingFileSink来写入Hive,并且列存格式只有parquet的支持。[1]
- 如果是1.11(正在测试发布中),Table/SQL层原生支持streaming file sink,相关文档正在编写中。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/streamfile_sink.html

Best,
Jingsong Lee

On Thu, May 21, 2020 at 10:59 AM jimandlice  wrote:

flink 写入hive 使用api 思路是怎么的呢


| |
jimandlice
|
|
邮箱:jimandl...@163.com
|

Signature is customized by Netease Mail Master

在2020年05月21日 10:57,阿华田 写道:
flink如何支持正则读取一个目录下的文件,比如读取文件名(时间格式命名)满足一定时间范围的文件


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



--
Best, Jingsong Lee


flink如何正则读取hdfs下的文件

2020-05-20 文章
flink如何支持正则读取一个目录下的文件,比如读取文件名(时间格式命名)满足一定时间范围的文件


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复: flink背压问题

2020-04-28 文章
好的 感谢


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年04月29日 10:29,Junzhong Qin 写道:
可以试一下Jsoniter, https://jsoniter.com/index.cn.html

阿华田  于2020年4月29日周三 上午10:07写道:


这个确实排查到了,主要是json解析那块耗时,老版本用的gson,现在改成fastjson了,解析速度提升了不少。看来大数据量的json解析还得是fastjson


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年04月29日 10:02,LakeShen 写道:
Hi 阿华,

数据延迟有可能是逻辑中某个环节比较耗时,比如查询 mysql,或者某处逻辑较复杂等等。

可以看看自己代码中,有么有比较耗时的逻辑。同时可以将自己认为比较耗时的地方,加上日志,看下处理时间。

Best,
LakeShen


阿华田  于2020年4月29日周三 上午9:21写道:

好的 感谢大佬



| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年04月29日 09:08,zhisheng 写道:
hi,

数据延迟不一定会产生背压,举个例子,Flink 写 HBase 的作业,Source 并行度为 5,Sink 并行度
10,这种情况下游写入速度很快的,可能写入速度超过 Flink 消费 Kafka 的速度,这种情况就不会出现背压的问题。

1、建议排查一下作业的并行度(可以设置和 Kafka 分区数一样);

2、背压监控是通过 Flink Web UI 监控查看的,还是通过指标来判断的?

3、对于数据延迟建议还是得对 Kafka 消费的 Topic 进行消费组的监控,加上 Lag 告警,这样可以及时知道数据延迟情况

Best !

zhisheng

阿华田  于2020年4月28日周二 上午9:37写道:

线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗?


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制






回复: flink背压问题

2020-04-28 文章
这个确实排查到了,主要是json解析那块耗时,老版本用的gson,现在改成fastjson了,解析速度提升了不少。看来大数据量的json解析还得是fastjson


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年04月29日 10:02,LakeShen 写道:
Hi 阿华,

数据延迟有可能是逻辑中某个环节比较耗时,比如查询 mysql,或者某处逻辑较复杂等等。

可以看看自己代码中,有么有比较耗时的逻辑。同时可以将自己认为比较耗时的地方,加上日志,看下处理时间。

Best,
LakeShen


阿华田  于2020年4月29日周三 上午9:21写道:

好的 感谢大佬



| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年04月29日 09:08,zhisheng 写道:
hi,

数据延迟不一定会产生背压,举个例子,Flink 写 HBase 的作业,Source 并行度为 5,Sink 并行度
10,这种情况下游写入速度很快的,可能写入速度超过 Flink 消费 Kafka 的速度,这种情况就不会出现背压的问题。

1、建议排查一下作业的并行度(可以设置和 Kafka 分区数一样);

2、背压监控是通过 Flink Web UI 监控查看的,还是通过指标来判断的?

3、对于数据延迟建议还是得对 Kafka 消费的 Topic 进行消费组的监控,加上 Lag 告警,这样可以及时知道数据延迟情况

Best !

zhisheng

阿华田  于2020年4月28日周二 上午9:37写道:

线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗?


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制





回复: flink背压问题

2020-04-28 文章
好的 感谢大佬



| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年04月29日 09:08,zhisheng 写道:
hi,

数据延迟不一定会产生背压,举个例子,Flink 写 HBase 的作业,Source 并行度为 5,Sink 并行度
10,这种情况下游写入速度很快的,可能写入速度超过 Flink 消费 Kafka 的速度,这种情况就不会出现背压的问题。

1、建议排查一下作业的并行度(可以设置和 Kafka 分区数一样);

2、背压监控是通过 Flink Web UI 监控查看的,还是通过指标来判断的?

3、对于数据延迟建议还是得对 Kafka 消费的 Topic 进行消费组的监控,加上 Lag 告警,这样可以及时知道数据延迟情况

Best !

zhisheng

阿华田  于2020年4月28日周二 上午9:37写道:

线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗?


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制




flink背压问题

2020-04-27 文章
线上任务对背压进行了监控,背压一直正常,任务却出现了大量的数据延迟,数据延迟不会产生背压吗?


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



WechatIMG169

2020-04-16 文章
在代码中给算子添加了名称,但是flink的ui上还是显示原始的名称,这种情况大佬们遇见过吗? 


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复:自定义具有Exactly-Once语义的sink

2020-04-14 文章
 谢谢




| |
王志华
|
|
邮箱:a15733178...@163.com
|

签名由 网易邮箱大师 定制

在2020年04月15日 11:44,zhang...@lakala.com 写道:
昨天晚上看到一篇微信公众号文章,分享给你,希望对你有帮助。
“Flink 端到端 Exactly-once 机制剖析”
https://mp.weixin.qq.com/s/fhUNuCOVFQUjRB-fo4Rl2g


发件人: 阿华田
发送时间: 2020-04-15 11:00
收件人: user-zh@flink.apache.org
主题: 自定义具有Exactly-Once语义的sink
如果自定义具有Exactly-Once语义的sink,继承TwoPhaseCommitSinkFunction,怎么确保真正的Exactly-Once,大佬们有已经上线使用的案例demo吗?
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



自定义具有Exactly-Once语义的sink

2020-04-14 文章
如果自定义具有Exactly-Once语义的sink,继承TwoPhaseCommitSinkFunction,怎么确保真正的Exactly-Once,大佬们有已经上线使用的案例demo吗?
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制