Re: How does the JobManager bring up TaskManagers in Application Mode or Session Mode?

2022-11-28 Thread Matthias Pohl via user
Hi Mark,
the JobManager is not necessarily in charge of spinning up TaskManager
instances. It depends on the resource provider configuration you choose.
Flink differentiates between active and passive Resource Management (see
the two available implementations of ResourceManager [1]).

Active Resource Management actually takes care of spinning up new
TaskManager instances if needed (i.e. Flink runs out of free task slots).
This is handled by the corresponding AbstractResourceManagerDriver
implementations [2].

In contrast, passive Resource Management (i.e. through the standalone
resource provider configurations [3]) doesn't do anything like that. Here,
Flink works with the TaskManagers that were instantiated by an external
process. Each TaskManager instance registers itself with the JobManager
specified in the Flink configuration provided to that
TaskManager instance.
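For illustration, a minimal flink-conf.yaml for such a standalone TaskManager
might look like the following sketch (the hostname is a placeholder); these are
the entries the TaskManager uses to locate and register with the JobManager:

```yaml
# Passive/standalone resource management: this TaskManager process is started
# externally and registers itself with the JobManager configured below.
jobmanager.rpc.address: jobmanager-host   # placeholder hostname
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 4
```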

I hope that helps. For future posts, please solely use the user mailing
list for questions around understanding Flink or troubleshooting. The dev
mailing list is reserved for development-related questions [4].

Matthias

[1]
https://github.com/apache/flink/blob/55a8d1a76067204e00839f1b6a2c09965434eaa4/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java#L117
[2]
https://github.com/apache/flink/blob/9815caad271a561640ffe0df7193c04270d53a25/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/AbstractResourceManagerDriver.java#L33
[3]
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/resource-providers/standalone/overview/
[4] https://flink.apache.org/community.html#mailing-lists

On Tue, Nov 29, 2022 at 5:23 AM 李  wrote:

> Hi,
>
> How does the JobManager bring up TaskManagers in Application Mode or Session
> Mode? I can't figure it out even after reading the source code of the Flink operator.
>
> Any help will be appreciated. Thank you.
>
>  Mark
>
>
>


Re: How to handle a lot of nested if-else logic elegantly with Flink SQL

2022-11-28 Thread macia kk
I would go with a UDF plus a configuration file: put the config file on HDFS and have the UDF read it. Each time the config file on HDFS is updated, restart the job.

casel.chen wrote on Thu, Nov 24, 2022 at 12:01:

> I have a Flink
> SQL job that needs to set one field's value based on conditions over other fields' values, plus some nested if-else logic. This logic is not fixed: the business side adjusts it every so often.
> How can nested if-else logic be handled elegantly with Flink SQL? I thought of using the Drools rule engine called through a UDF; is there a better approach?
>
>
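The UDF-plus-config-file approach above can be sketched in plain Java. This is a minimal, hypothetical sketch: the rule format ("field=value->result") and all names are made up, and in a real job this logic would sit inside a Flink ScalarFunction whose open() method loads the rules file from HDFS rather than from a String:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal rule-evaluation sketch: each rule is "field=value->result",
// evaluated in order; the first matching rule wins, reproducing the
// semantics of a nested if-else chain. In a real Flink job this class
// would back a ScalarFunction UDF whose open() loads the rules from HDFS.
public class RuleEval {
    static final class Rule {
        final String field, value, result;
        Rule(String field, String value, String result) {
            this.field = field; this.value = value; this.result = result;
        }
    }

    // Parse rules from the (hypothetical) config format, one rule per line;
    // blank lines and '#' comments are skipped.
    static List<Rule> parse(String config) {
        List<Rule> rules = new ArrayList<>();
        for (String line : config.split("\n")) {
            line = line.trim();
            if (line.isEmpty() || line.startsWith("#")) continue;
            String[] parts = line.split("->");      // "field=value" / "result"
            String[] cond = parts[0].split("=");    // "field" / "value"
            rules.add(new Rule(cond[0].trim(), cond[1].trim(), parts[1].trim()));
        }
        return rules;
    }

    // Evaluate the rules against one (field, value) pair; the default result
    // plays the role of the final else branch.
    static String evaluate(List<Rule> rules, String field, String value, String dflt) {
        for (Rule r : rules) {
            if (r.field.equals(field) && r.value.equals(value)) return r.result;
        }
        return dflt;
    }

    public static void main(String[] args) {
        String config = "status=NEW->pending\nstatus=PAID->done\n";
        List<Rule> rules = parse(config);
        System.out.println(evaluate(rules, "status", "PAID", "unknown")); // done
        System.out.println(evaluate(rules, "status", "XXX", "unknown"));  // unknown
    }
}
```

Updating the file on HDFS and restarting the job then replaces redeploying new SQL, as suggested above.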


How does the JobManager bring up TaskManagers in Application Mode or Session Mode?

2022-11-28 Thread
Hi,

   How does the JobManager bring up TaskManagers in Application Mode or Session Mode? I
can't figure it out even after reading the source code of the Flink operator.

Any help will be appreciated. Thank you.

 Mark




Support Stored procedures

2022-11-28 Thread melin li
Support operations like Hudi/Iceberg procedure calls, such as savepoint/checkpoint:
https://hudi.apache.org/docs/procedures/

CALL system.procedure_name(arg_1, arg_2, ... arg_n)

On a Flink development platform, completing such management operations directly
with CALL SQL would be very convenient. Hudi/Iceberg make it easy to
customize various table actions this way.


Re: How to generate a StreamGraph from a Flink execution plan JSON?

2022-11-28 Thread yidan zhao
There is no need to generate a StreamGraph from the execution plan JSON.
The StreamGraph is converted directly into a JobGraph before submission.

casel.chen wrote on Mon, Nov 28, 2022 at 08:53:
>
> In the source code I only found how to generate the execution plan JSON from a StreamGraph, but not how to parse that JSON back into a StreamGraph. Advice appreciated.


Is there a way to access the rowkind metadata field in Flink SQL?

2022-11-28 Thread casel.chen
Is there a way to access the rowkind metadata field in Flink SQL? For example, to apply CASE WHEN logic or filtering based on the rowkind.

Seamless upgrades for a Flink SQL job

2022-11-28 Thread casel.chen
A high-traffic Flink SQL job in production needs to be upgraded to add a business field. The job is a data-synchronization scenario: multi-table joins over Kafka (canal) written into MongoDB,
with no state other than the source offsets. How can the upgrade be made invisible to users?
The usual stop-then-restart takes at least tens of seconds and triggers message-backlog alerts. Could the new job be started first and the old job stopped once the new one runs stably? Can both jobs
use the same Kafka group, with the new job starting from group-offsets? Also, how would a job with large state be upgraded seamlessly?

Aggregating over the latest data from a CDC source with Flink SQL

2022-11-28 Thread casel.chen
The business requirement is to aggregate the transaction amount from a MySQL orders table in real time, per day and per supplier. Rows in the orders table may be updated or deleted. How should this be implemented with Flink
SQL? Open a window, take the latest record, then aggregate? When a DELETE record arrives, will the corresponding price be subtracted? I wrote the following Flink SQL but am not sure whether it is correct:


select 
  s.biddate, 
  s.supplier, 
  sum(s.price) 
from 
  (
select 
  * 
from 
  (
select 
  biddate, 
  supplier, 
  price, 
  ROW_NUMBER() OVER (
PARTITION BY biddate, 
supplier 
ORDER BY 
  bidtime DESC
  ) as rownum 
from 
  (
select 
  bidtime, 
  date_format(bidtime, 'yyyy-MM-dd-HH') as biddate, 
  supplier, 
  price 
from 
  orders
  )
  ) as t 
where 
  t.rownum = 1
  ) as s 
group by 
  s.biddate, 
  s.supplier
;



When does the Flink Kubernetes Operator keep the JobManager?

2022-11-28 Thread hjw
Environment:
Flink: 1.15
Operator: 1.2.0


Question:
I noticed that in the Operator source code the execution.shutdown-on-application-finish parameter is set to false.
When does a Flink job deployed by the Flink operator keep the JobManager pod? I would like to be able to fetch the job's logs after the job fails or stops.


Besides the normal Running state being kept, I tried other cases:
1. Setting the job state to suspended: the corresponding FlinkDeployment resource is kept, but all job-related Deployments and Services are deleted, including the JobManager pod.
2. Running kubectl delete flinkdeployment
my-deployment directly: all resources associated with the job are deleted, including HA resources, state-store information, Deployments, etc.


--

Best,
Hjw

Reply: How can I get job status from Flink metrics?

2022-11-28 Thread m17610775726_1
hi


Your image didn't come through; you could upload it to an image host and paste the link here. As for the question: implement a custom reporter that filters out the metrics you need and reports them.
 Original message 
| From | 陈佳豪 |
| Sent | 2022-11-28 00:54 |
| To | user-zh |
| Subject | How can I get job status from Flink metrics? |
I defined a custom Kafka metric reporter. How do I use the metrics above?
I want to obtain the job status through reported metrics; besides those metrics, any other approach is also welcome. The current Flink version is 15.2. Advice appreciated.

When does the Flink Kubernetes Operator keep the JobManager Pod?

2022-11-28 Thread hjw
Environment:

Flink: 1.15
Operator: 1.2.0


Question:
I noticed that in the Operator source code the execution.shutdown-on-application-finish parameter is set to false.
When does a Flink job deployed by the Flink operator keep the JobManager pod? I would like to be able to fetch the job's logs after the job fails or stops.


Besides the normal Running state being kept, I tried other cases:
1. Setting the job state to suspended: the corresponding FlinkDeployment resource is kept, but all job-related Deployments and Services are deleted, including the JobManager pod.
2. Running kubectl delete flinkdeployment
my-deployment directly: all resources associated with the job are deleted, including HA resources, state-store information, Deployments, etc.


--

Best,
Hjw

Can exceptions from Flink SQL jobs be caught?

2022-11-28 Thread 陈佳豪
hi
Can a try/catch in Java code capture exceptions from a job written in Flink SQL?

Re: Re: When does the Flink Kubernetes Operator keep the JobManager Pod?

2022-11-28 Thread hjw
hi. What about streaming jobs? I actually want to keep the logs when the job fails so I can inspect them for troubleshooting.


--

Best,
Hjw





On 2022-11-28 15:33:37, "Biao Geng" wrote:
>hi, it is mainly so that the job is also kept when it reaches FINISHED or FAILED. You could try running a batch job.
>Best,
>Biao Geng
>
>Get Outlook for iOS
>
>From: hjw
>Sent: Monday, November 28, 2022 3:13:56 PM
>To: user-zh@flink.apache.org
>Subject: When does the Flink Kubernetes Operator keep the JobManager Pod?
>
>Environment:
>
>Flink: 1.15
>Operator: 1.2.0
>
>
>Question:
>I noticed that in the Operator source code the execution.shutdown-on-application-finish parameter is set to false.
>When does a Flink job deployed by the Flink operator keep the JobManager pod? I would like to be able to fetch the job's logs after the job fails or stops.
>
>
>Besides the normal Running state being kept, I tried other cases:
>1. Setting the job state to suspended: the corresponding FlinkDeployment resource is kept, but all job-related Deployments and Services are deleted, including the JobManager pod.
>2. Running kubectl delete flinkdeployment
>my-deployment directly: all resources associated with the job are deleted, including HA resources, state-store information, Deployments, etc.
>
>
>
>
>
>
>
>--
>
>Best,
>Hjw


Re: Re: Reply: FlinkCDC can read MySQL change data, but cannot insert it into a new MySQL table

2022-11-28 Thread 左岩
The CDC connector is 2.3, built from source, for the Flink 1.14 version. There is another issue: the change data can be read, but slowly. For example, data written to MySQL at 11:30 takes 5-7 minutes before flink-cdc reads the newly written or changed data. And the second problem remains: the change data cannot be inserted into the other MySQL table.


On 2022-11-07 10:11:56, "Shengkai Fang" wrote:
>Which version of the CDC connector are you using? The incremental phase can only proceed after the full snapshot phase has finished.
>
>Best,
>Shengkai
>
左岩 <13520871...@163.com> wrote on Fri, Nov 4, 2022 at 17:58:
>
>>
>> Removing .print(); doesn't help either.
>> It is also unrelated to that (one tableEnv executing two DML statements). And there is now another
>> symptom: after I change the MySQL table data, flink-cdc takes a long time to read the changed data. What could cause this?
>>
>>
>>
>>
>>
>>
>>
>>
On 2022-11-04 16:52:08, "yinghua...@163.com" wrote:
>>
>> >Here you are executing two DML statements on one tableEnv. Use a StatementSet to hold the two DML statements and call execute on the StatementSet, like this:
>> >StatementSet statementSet = tenv.createStatementSet();
>> >statementSet.addInsertSql(sql1);
>> >statementSet.addInsertSql(sql2);
>> >TableResult result = statementSet.execute();
>> >result.getJobClient().get().getJobID().toString();
>> >
>> >
>> >Or remove the printing job and see whether the data can then be inserted into the target MySQL table:
>> >// query
>> >tenv.executeSql("select * from flink_t_stu").print();
>> remove this job
>> >
>> >
>> >
>> >yinghua...@163.com
>> >
>> >From: 左岩
>> >Sent: 2022-11-04 14:34
>> >To: user-zh
>> >Subject: FlinkCDC can read MySQL change data but cannot insert it into a new MySQL table
>> >With FlinkCDC I can read MySQL change data, but it cannot be inserted into the new MySQL table. Both tables have primary keys and the same schema.
>> >Code below; see the attachment for the console output:
>> >public static void main(String[] args) throws Exception {
>> >Configuration conf = new Configuration();
>> >conf.setInteger("rest.port", 10041);
>> >StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment(conf);
>> >StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
>> >
>> >env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);
>> >env.getCheckpointConfig().setCheckpointStorage("file:///d:/checkpoint3");
>> >
>> >env.setParallelism(1);
>> >// create the CDC source table
>> >tenv.executeSql("CREATE TABLE flink_t_stu ( " +
>> >"  userid INT, " +
>> >"  username string, " +
>> >"  age string, " +
>> >"  `partition` INT, " +
>> >" PRIMARY KEY(userid) NOT ENFORCED " +
>> >" ) WITH ( " +
>> >" 'connector' = 'mysql-cdc', " +
>> >" 'server-id' = '5401-5404', " +
>> >" 'scan.startup.mode' = 'latest-offset', " +
>> >//" 'scan.startup.mode' = 'earliest-offset', " +
>> >" 'hostname' = '192.168.0.220', " +
>> >" 'port' = '3306', " +
>> >" 'username' = 'root', " +
>> >" 'password' = 'root', " +
>> >" 'database-name' = 'zy', " +
>> >" 'table-name' = 't_stu' " +
>> >")");
>> >
>> >// query
>> >tenv.executeSql("select * from flink_t_stu").print();
>> >
>> >
>> >// create a target table to hold the query results
>> >tenv.executeSql(
>> >"CREATE TABLE flink_t_stu2 ( " +
>> >"  userid INT, " +
>> >"  username string, " +
>> >"  age string, " +
>> >"  `partition` INT, " +
>> >" PRIMARY KEY(userid) NOT ENFORCED " +
>> >" ) WITH ( " +
>> >"  'connector' = 'jdbc', " +
>> >"  'url' = 'jdbc:mysql://192.168.0.220:3306/zy', " +
>> >"  'table-name' = 't_stu2', " +
>> >"  'username' = 'root', " +
>> >"  'password' = 'root'  " +
>> >")"
>> >);
>> >
>> >tenv.executeSql("INSERT INTO flink_t_stu2 " +
>> >"SELECT * FROM flink_t_stu");
>> >env.execute();
>> >
>> >}
>>


If a Hive database has both Hudi and Parquet tables, do two catalogs need to be registered?

2022-11-28 Thread melin li
If a Hive database contains both Hudi and Parquet tables, it seems a Hive catalog and a Hudi
catalog have to be registered separately, and each kind of table must be accessed through its own catalog. That is not very elegant and confuses users:
select * from hudi_catalog.dbName.table1 left join hive_catalog.dbBane.table2

In Spark, if the Hudi catalog is registered under the name spark_catalog (spark_catalog is Spark's default catalog name), SQL
can access both Hudi and Parquet tables without adding a catalog name:
select * from dbName.table1 left join dbBane.table2


Re: Query about flink job manager dashboard

2022-11-28 Thread naga sudhakar
Hi,
We were able to disable the cancel and upload options in the UI,
but this is causing issues with the endpoints below:
the GET call to /jars that lists all uploaded jars and the POST call to
/jars/{jarid}/run are returning 404 after disabling the two flags.
Does the process of uploading jars and running a jar with a specific id change
after this change?

Please advise.

Thanks & Regards,
Nagasudhakar

On Thu, 24 Nov, 2022, 2:07 PM Martijn Visser, 
wrote:

> Hi,
>
> 1) No, that's currently not possible.
> 2) You could consider disallowing the upload of new JARs and/or the
> cancelling of jobs from the UI. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#advanced-options-for-flink-web-ui
>
> Best regards,
>
> Martijn
>
> On Thu, Nov 24, 2022 at 3:44 AM naga sudhakar 
> wrote:
>
>> Hi Team,
>>> Greetings!!!
>>> I am a software developer using Apache Flink and deploying Flink jobs
>>> with it. I have two queries about the Flink JobManager dashboard. Can
>>> you please help with the below?
>>>
>>> 1) Is it possible to add a login mechanism for the Flink JobManager
>>> dashboard, with role-based access for viewing running jobs, cancelling
>>> jobs, and adding jobs?
>>> 2) Is it possible to disable the dashboard display but still perform the
>>> same operations via the API?
>>>
>>>
>>> Thanks,
>>> Nagasudhakar.
>>>
>>


Re: Support for higher-than-millisecond resolution event-time timestamps

2022-11-28 Thread Salva Alcántara
Hi David,

Many thanks for your reply. Two things, then:
1. If there are any chances to contribute on this, let me know
2. In the meantime, process functions FTW!

Salva

On Fri, Nov 25, 2022 at 9:21 AM David Anderson  wrote:

> When it comes to event time processing and watermarks, I believe that if
> you stick to the lower level APIs, then the milliseconds assumption is
> indeed arbitrary, but at higher levels that assumption is baked in.
>
> In other words, that rules out using Flink SQL, or things
> like TumblingEventTimeWindows.of(Time.milliseconds(10)). It might not be
> difficult to build something to work around those assumptions, but I
> haven't given it much thought. But if you stick to KeyedProcessFunction, it
> should be fine.
>
> Best,
> David
>
> On Fri, Nov 25, 2022 at 5:32 AM Salva Alcántara 
> wrote:
>
>> As mentioned in the docs:
>>
>> > Attention: Both timestamps and watermarks are specified as milliseconds
>> since the Java epoch of 1970-01-01T00:00:00Z.
>>
>> Are there any plans for supporting higher time resolutions?
>>
>> Also, internally, Flink uses the `long` type for the timestamps, so maybe
>> the milliseconds assumption is arbitrary and things would actually work
>> just fine for higher resolutions provided that they fit into the long type
>> (???). I found this SO post:
>>
>>
>> https://stackoverflow.com/questions/54402759/streaming-data-processing-and-nano-second-time-resolution
>>
>> which touches upon this but it's a bit old already and there seems to be
>> no clear answer in the end. So maybe we could touch base on it.
>>
>> Regards,
>>
>> Salva
>>
>
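As a rough sanity check on the "fits into the long type" point above — this is a standalone arithmetic sketch, not Flink code — a signed 64-bit count of nanoseconds since the 1970 epoch lasts about 292 years, i.e. until roughly the year 2262 (the same bound as numpy's datetime64[ns]):

```java
public class NanoRange {
    // Number of whole seconds representable as signed 64-bit nanoseconds.
    static long maxSeconds() {
        return Long.MAX_VALUE / 1_000_000_000L;
    }

    // Approximate number of (365-day) years after the 1970 epoch before
    // a nanosecond-resolution long timestamp overflows.
    static long maxYears() {
        return maxSeconds() / (365L * 24 * 3600);
    }

    public static void main(String[] args) {
        System.out.println("seconds representable: " + maxSeconds()); // 9223372036
        System.out.println("years after 1970:      " + maxYears());  // 292 -> ~year 2262
    }
}
```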


How to make the Flink operator a cluster operator?

2022-11-28 Thread Mark Lee
Hi all,  

   How can the Flink operator be made a cluster operator? And how is it registered
with the CVO (Cluster Version Operator)?

   

   I didn't find any relevant code or configuration file in the flink-operator codebase.

   

   Thank you.

 

Mark