Re: 回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 文章 Zhanghao Chen
事实上是可行的。你可以直接修改 StreamExecutionEnvironment 的源码,默认给作业作业注册上一个你们定制的 
listener,然后通过某种那个方式把这个信息透出来。在 FLIP-314 [1] 中,我们计划直接在 Flink 里原生提供一个这样的接口让你去注册自己的 
listener 获取血缘信息,不过还没发布,可以先自己做。

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-314:+Support+Customized+Job+Lineage+Listener

From: 阿华田 
Sent: Friday, March 8, 2024 18:47
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?

我们想修改源码 实现任意任务提交实时平台,初始化DAG的时候获取到血缘信息,StreamExecutionEnvironment注册 这种只能写在任务里 
不满足需求




| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年03月8日 18:23,Zhanghao Chen 写道:
你可以看下 OpenLineage 和 Flink 的集成方法 [1],它是在 StreamExecutionEnvironment 里注册了一个 
JobListener(通过这个可以拿到 JobClient 进而拿到 job id)。然后从 execution environment 里可以抽取到 
transformation 信息处理 [2]。

[1] https://openlineage.io/docs/integrations/flink/
[2] 
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/app/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java


Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 16:48
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?



”JobGraph 可以获得 transformation 信息“, JobGraph可以直接获取transformation的信息吗?, 我们是在
SourceTransformation 和SinkTransformation反射拿到链接信息 ,但是这个地方拿不到flinkJobid,  
JobGraph可以拿到source和sink的链接信息和flinkJobid?
| |
阿华田
|
|
a15733178...@163.com
|
JobGraph 可以获得 transformation 信息transformation
签名由网易邮箱大师定制


在2024年03月8日 16:18,Zhanghao Chen 写道:
JobGraph 里有个字段就是 jobid。

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?

获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月26日 20:04,Feng Jin 写道:
通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris
Sink,之后再通过反射获取里面的 properties 信息进行提取。

可以参考 OpenLineage[1] 的实现.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

一个Flink DataStream 作业从mysql cdc消费处理后写入apache
doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink
connector信息,包括连接字符串、数据库名、表名等?


回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 文章 阿华田
我们想修改源码 实现任意任务提交实时平台,初始化DAG的时候获取到血缘信息,StreamExecutionEnvironment注册 这种只能写在任务里 
不满足需求




| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年03月8日 18:23,Zhanghao Chen 写道:
你可以看下 OpenLineage 和 Flink 的集成方法 [1],它是在 StreamExecutionEnvironment 里注册了一个 
JobListener(通过这个可以拿到 JobClient 进而拿到 job id)。然后从 execution environment 里可以抽取到 
transformation 信息处理 [2]。

[1] https://openlineage.io/docs/integrations/flink/
[2] 
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/app/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java


Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 16:48
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?



”JobGraph 可以获得 transformation 信息“, JobGraph可以直接获取transformation的信息吗?, 我们是在
SourceTransformation 和SinkTransformation反射拿到链接信息 ,但是这个地方拿不到flinkJobid,  
JobGraph可以拿到source和sink的链接信息和flinkJobid?
| |
阿华田
|
|
a15733178...@163.com
|
JobGraph 可以获得 transformation 信息transformation
签名由网易邮箱大师定制


在2024年03月8日 16:18,Zhanghao Chen 写道:
JobGraph 里有个字段就是 jobid。

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?

获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月26日 20:04,Feng Jin 写道:
通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris
Sink,之后再通过反射获取里面的 properties 信息进行提取。

可以参考 OpenLineage[1] 的实现.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

一个Flink DataStream 作业从mysql cdc消费处理后写入apache
doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink
connector信息,包括连接字符串、数据库名、表名等?


Re: 回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 文章 Zhanghao Chen
你可以看下 OpenLineage 和 Flink 的集成方法 [1],它是在 StreamExecutionEnvironment 里注册了一个 
JobListener(通过这个可以拿到 JobClient 进而拿到 job id)。然后从 execution environment 里可以抽取到 
transformation 信息处理 [2]。

[1] https://openlineage.io/docs/integrations/flink/
[2] 
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/app/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java


Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 16:48
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?



 ”JobGraph 可以获得 transformation 信息“, JobGraph可以直接获取transformation的信息吗?, 我们是在
SourceTransformation 和SinkTransformation反射拿到链接信息 ,但是这个地方拿不到flinkJobid,  
JobGraph可以拿到source和sink的链接信息和flinkJobid?
| |
阿华田
|
|
a15733178...@163.com
|
 JobGraph 可以获得 transformation 信息transformation
签名由网易邮箱大师定制


在2024年03月8日 16:18,Zhanghao Chen 写道:
JobGraph 里有个字段就是 jobid。

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?

获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月26日 20:04,Feng Jin 写道:
通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris
Sink,之后再通过反射获取里面的 properties 信息进行提取。

可以参考 OpenLineage[1] 的实现.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

一个Flink DataStream 作业从mysql cdc消费处理后写入apache
doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink
connector信息,包括连接字符串、数据库名、表名等?


回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 文章 阿华田


 ”JobGraph 可以获得 transformation 信息“, JobGraph可以直接获取transformation的信息吗?, 我们是在
SourceTransformation 和SinkTransformation反射拿到链接信息 ,但是这个地方拿不到flinkJobid,  
JobGraph可以拿到source和sink的链接信息和flinkJobid?
| |
阿华田
|
|
a15733178...@163.com
|
 JobGraph 可以获得 transformation 信息transformation
签名由网易邮箱大师定制


在2024年03月8日 16:18,Zhanghao Chen 写道:
JobGraph 里有个字段就是 jobid。

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?

获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月26日 20:04,Feng Jin 写道:
通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris
Sink,之后再通过反射获取里面的 properties 信息进行提取。

可以参考 OpenLineage[1] 的实现.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

一个Flink DataStream 作业从mysql cdc消费处理后写入apache
doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink
connector信息,包括连接字符串、数据库名、表名等?


Re: 回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-08 文章 Zhanghao Chen
JobGraph 里有个字段就是 jobid。

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: 回复: Flink DataStream 作业如何获取到作业血缘?

获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月26日 20:04,Feng Jin 写道:
通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris
Sink,之后再通过反射获取里面的 properties 信息进行提取。

可以参考 OpenLineage[1] 的实现.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

一个Flink DataStream 作业从mysql cdc消费处理后写入apache
doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink
connector信息,包括连接字符串、数据库名、表名等?


回复: Flink DataStream 作业如何获取到作业血缘?

2024-03-07 文章 阿华田
 获取到Source 或者 DorisSink信息之后, 如何知道来自那个flink任务,好像不能获取到flinkJobId


| |
阿华田
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2024年02月26日 20:04,Feng Jin 写道:
通过 JobGraph 可以获得 transformation 信息,可以获得具体的 Source 或者 Doris
Sink,之后再通过反射获取里面的 properties 信息进行提取。

可以参考 OpenLineage[1] 的实现.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

一个Flink DataStream 作业从mysql cdc消费处理后写入apache
doris,请问有没有办法(从JobGraph/StreamGraph)获取到source/sink
connector信息,包括连接字符串、数据库名、表名等?