Re: Unable to Generate Flame Graphs in Flink UI

2024-03-25 Thread Feng Jin
Hi Nitin

You don't need any additional configuration. After the sampling is
completed, the flame graph will be displayed.

Best,
Feng

On Mon, Mar 25, 2024 at 5:55 PM Nitin Saini 
wrote:

> Hi Flink Community,
>
> I've set up a Flink cluster using Flink version 1.17.2 and enabled the
> option "rest.flamegraph.enabled: true" to generate flame graphs. However,
> upon accessing the Flink UI, I encountered the message: "We are waiting for
> the first samples to create a flame graph."
>
> Could you kindly advise if there's any configuration step I might have
> overlooked? I've attached a screenshot of the flink UI for your reference.
>
> Best regards,
> Nitin Saini
>


Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-20 Thread Feng Jin
Congratulations!


Best,
Feng


On Thu, Mar 21, 2024 at 11:37 AM Ron liu  wrote:

> Congratulations!
>
> Best,
> Ron
>
> Jark Wu wrote on Thu, Mar 21, 2024 at 10:46:
>
> > Congratulations and welcome!
> >
> > Best,
> > Jark
> >
> > On Thu, 21 Mar 2024 at 10:35, Rui Fan <1996fan...@gmail.com> wrote:
> >
> > > Congratulations!
> > >
> > > Best,
> > > Rui
> > >
> > > On Thu, Mar 21, 2024 at 10:25 AM Hang Ruan 
> > wrote:
> > >
> > > > > Congratulations!
> > > >
> > > > Best,
> > > > Hang
> > > >
> > > > > Lincoln Lee wrote on Thu, Mar 21, 2024 at 09:54:
> > > >
> > > >>
> > > >> Congrats, thanks for the great work!
> > > >>
> > > >>
> > > >> Best,
> > > >> Lincoln Lee
> > > >>
> > > >>
> > > > >> Peter Huang wrote on Wed, Mar 20, 2024 at 22:48:
> > > >>
> > > >>> Congratulations
> > > >>>
> > > >>>
> > > >>> Best Regards
> > > >>> Peter Huang
> > > >>>
> > > > >>> On Wed, Mar 20, 2024 at 6:56 AM Huajie Wang wrote:
> > > > >>>
> > > > >>>> Congratulations
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Huajie Wang
> > > > >>>>
> > > > >>>> Leonard Xu wrote on Wed, Mar 20, 2024 at 21:36:
> > > > >>>>
> > > > > Hi devs and users,
> > > > >
> > > > > We are thrilled to announce that the donation of Flink CDC as a
> > > > > sub-project of Apache Flink has completed. We invite you to explore
> > > > > the new resources available:
> > > > >
> > > > > - GitHub Repository: https://github.com/apache/flink-cdc
> > > > > - Flink CDC Documentation:
> > > > > https://nightlies.apache.org/flink/flink-cdc-docs-stable
> > > > >
> > > > > After the Flink community accepted this donation [1], we completed
> > > > > software copyright signing, code repo migration, code cleanup, website
> > > > > migration, CI migration, GitHub issues migration, etc.
> > > > > I am particularly grateful to Hang Ruan, Zhongqaing Gong,
> > > > > Qingsheng Ren, Jiabao Sun, LvYanquan, loserwang1024 and other
> > > > > contributors for their contributions and help during this process!
> > > > >
> > > > >
> > > > > For all previous contributors: The contribution process has slightly
> > > > > changed to align with the main Flink project. To report bugs or
> > > > > suggest new features, please open tickets in
> > > > > Apache Jira (https://issues.apache.org/jira). Note that we will no
> > > > > longer accept GitHub issues for these purposes.
> > > > >
> > > > >
> > > > > You are welcome to explore the new repository and documentation. Your
> > > > > feedback and contributions are invaluable as we continue to improve
> > > > > Flink CDC.
> > > > >
> > > > > Thanks everyone for your support and happy exploring Flink CDC!
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > > [1] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
> > > >
> > > >
> > >
> >
>


Re: Flink Batch Execution Mode

2024-03-12 Thread Feng Jin
Hi Irakli

What version of flink-connector-kafka are you using?
You may have encountered a bug [1] in the old version that prevents the
source task from entering the finished state.


[1]. https://issues.apache.org/jira/browse/FLINK-31319

Best,
Feng


On Tue, Mar 12, 2024 at 7:21 PM irakli.keshel...@sony.com <
irakli.keshel...@sony.com> wrote:

> Hello,
>
> I have a Flink job that is running in Batch mode. The source for the
> job is a Kafka topic which has a limited number of events. I can see that the
> job starts running fine and consumes the events, but it never makes it past
> the first task and becomes idle. The Kafka source is defined to be bounded
> by the following call:
> "KafkaSource.builder().setBounded(OffsetsInitializer.latest())".
> I expect the job to consume all the events that are in the Kafka topic and
> then move to the next task, but I'm not sure if
> "OffsetsInitializer.latest()" is the right OffsetsInitializer. Can anyone
> help me out here? Thanks!
>
> Cheers,
> Irakli
>


Re: FlinkSQL connector jdbc: can the user password be encrypted/hidden?

2024-03-10 Thread Feng Jin
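1. The JDBC connector itself does not currently support encryption. As I
understand it, you can encrypt and decrypt the SQL text at the point where the
SQL is submitted, or use variable substitution to hide the password.

2. You can also create the JDBC catalog in advance, which avoids exposing the password in hand-written DDL.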
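
The variable substitution mentioned in point 1 could look roughly like the sketch below. It is only an illustration under assumptions: SqlSecretResolver and the ${JDBC_PASSWORD} placeholder syntax are hypothetical and not part of Flink, and the secret would come from wherever the platform keeps it (an environment variable, a secret store, and so on).

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: replaces ${NAME} placeholders in SQL text before it is submitted. */
final class SqlSecretResolver {
    // Placeholder syntax is an assumption for this sketch, e.g. ${JDBC_PASSWORD}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([A-Za-z0-9_]+)\\}");

    private SqlSecretResolver() {}

    static String resolve(String sqlTemplate, Map<String, String> secrets) {
        Matcher matcher = PLACEHOLDER.matcher(sqlTemplate);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = secrets.get(name);
            if (value == null) {
                throw new IllegalArgumentException("No value supplied for placeholder: " + name);
            }
            // Double any single quote so the value stays inside the SQL string literal.
            String escaped = value.replace("'", "''");
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(escaped));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        String ddl = "CREATE TABLE wordcount_sink (...) WITH ('password' = '${JDBC_PASSWORD}')";
        // In a real deployment the map would be filled from a secret store or System.getenv().
        System.out.println(resolve(ddl, Map.of("JDBC_PASSWORD", "example")));
    }
}
```

The SQL kept under version control then only contains the placeholder; the resolved text exists only in the process that submits the job.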


Best,
Feng

On Sun, Mar 10, 2024 at 9:50 PM 杨东树  wrote:

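> Hi all,
> Considering the security of database usernames and passwords, how can the
> database user password be encrypted or hidden when using the FlinkSQL JDBC
> connector? For example, the password in the following commonly used SQL: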
> CREATE TABLE wordcount_sink (
>  word String,
>  cnt BIGINT,
>  primary key (word) not enforced
> ) WITH (
>  'connector' = 'jdbc',
>  'url' = 'jdbc:mysql://localhost:3306/flink',
>  'username' = 'root',
>  'password' = '123456',
>  'table-name' = 'wordcount_sink'
> );


Re: Re: Running Flink SQL in production

2024-03-07 Thread Feng Jin
Hi,

If you need to use Flink SQL in a production environment, I think it would
be better to use the Table API [1] and package it into a jar.
Then submit the jar to the cluster environment.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/common/#sql
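
If the SQL stays in a file under source control and the jar only executes it, the jar needs some way to turn that file into individual statements. Below is a minimal sketch of that step, under assumptions: SqlScriptSplitter is a hypothetical helper (not part of Flink), it only understands single-quoted string literals, and it does not handle SQL comments. Each returned statement would then be passed to the table environment in the job's main method.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits a SQL script into statements so a packaged job can run them one by one. */
final class SqlScriptSplitter {
    private SqlScriptSplitter() {}

    static List<String> split(String script) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inStringLiteral = false;
        for (int i = 0; i < script.length(); i++) {
            char c = script.charAt(i);
            if (c == '\'') {
                // An escaped quote ('') toggles twice, so the state ends up unchanged.
                inStringLiteral = !inStringLiteral;
            }
            if (c == ';' && !inStringLiteral) {
                addIfNotEmpty(statements, current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        // Keep a trailing statement that has no terminating semicolon.
        addIfNotEmpty(statements, current.toString());
        return statements;
    }

    private static void addIfNotEmpty(List<String> statements, String candidate) {
        String trimmed = candidate.trim();
        if (!trimmed.isEmpty()) {
            statements.add(trimmed);
        }
    }

    public static void main(String[] args) {
        String script = "CREATE TABLE t (a STRING) WITH ('k' = 'a;b');\nINSERT INTO s SELECT * FROM t;";
        for (String statement : split(script)) {
            // In the packaged job, each statement would be handed to the table environment here.
            System.out.println(statement);
        }
    }
}
```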

Best,
Feng

On Thu, Mar 7, 2024 at 9:56 PM Xuyang  wrote:

> Hi.
> Hmm, if I'm mistaken, please correct me. Using a SQL client might not be
> very convenient for those who need to verify the
> results of submissions, such as checking for exceptions related to
> submission failures, and so on.
>
>
> --
> Best!
> Xuyang
>
>
> At 2024-03-07 17:32:07, "Robin Moffatt" wrote:
>
> Thanks for the reply.
> In terms of production, my thinking is you'll have your SQL in a file
> under code control. Whether that SQL ends up getting submitted via an
> invocation of SQL Client with -f or via REST API seems moot. WDYT?
>
>
>
> On Thu, 7 Mar 2024 at 01:53, Xuyang  wrote:
>
>> Hi, IMO, both the SQL Client and the Restful API can provide connections
>> to the SQL Gateway service for submitting jobs. A slight difference is that
>> the SQL Client also offers a command-line visual interface for users to
>> view results.
>> In your production scenes, placing the SQL to be submitted into a file
>> and then using the '-f' command in SQL Client to submit the file sounds a
>> bit roundabout. You can just use the Restful API to submit them directly?
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2024-03-07 04:11:01, "Robin Moffatt via user" 
>> wrote:
>>
>> I'm reading the deployment guide[1] and wanted to check my understanding.
>> For deploying a SQL job into production, would the pattern be to write the
>> SQL in a file that's under source control, and pass that file as an
>> argument to SQL Client with -f argument (as in this docs example[2])?
>> Or script a call to the SQL Gateway's REST API?
>>
>> Are there pros and cons to each approach?
>>
>> thanks, Robin
>>
>> [1]:
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/
>> [2]:
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sqlclient/#execute-sql-files
>>
>>


Re: Handling late events with Table API / SQL

2024-03-05 Thread Feng Jin
You can use the CURRENT_WATERMARK(rowtime) function for some filtering;
please refer to [1] for details.


[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/
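
To make the filtering idea concrete, here is a small plain-Java simulation of the predicate that such a filter would express in SQL (CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)). It is a sketch under assumptions, not Flink code: LateEventFilter is a hypothetical class, and it advances the watermark on every event, whereas Flink emits watermarks periodically.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of splitting rows into on-time and late by comparing against a watermark. */
final class LateEventFilter {
    private final long maxOutOfOrdernessMillis;
    // null until the first event arrives, mirroring a NULL watermark.
    private Long currentWatermark = null;

    final List<Long> onTime = new ArrayList<>();
    final List<Long> late = new ArrayList<>();

    LateEventFilter(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    void accept(long eventTimeMillis) {
        // Same predicate as: CURRENT_WATERMARK(ts) IS NULL OR ts > CURRENT_WATERMARK(ts)
        if (currentWatermark == null || eventTimeMillis > currentWatermark) {
            onTime.add(eventTimeMillis);
        } else {
            late.add(eventTimeMillis);
        }
        // Watermark = highest event time seen so far minus the allowed out-of-orderness.
        long candidate = eventTimeMillis - maxOutOfOrdernessMillis;
        if (currentWatermark == null || candidate > currentWatermark) {
            currentWatermark = candidate;
        }
    }

    public static void main(String[] args) {
        LateEventFilter filter = new LateEventFilter(30_000L);
        for (long ts : new long[] {100_000L, 90_000L, 200_000L, 150_000L}) {
            filter.accept(ts);
        }
        System.out.println("on time: " + filter.onTime + ", late: " + filter.late);
    }
}
```

In a SQL job the same split would be two INSERT statements with opposite predicates, one writing on-time rows to the normal sink and one writing late rows to a separate table, which is the closest equivalent to a side output.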

Best,
Feng

On Wed, Mar 6, 2024 at 1:56 AM Sunny S  wrote:

> Hi,
>
> I am using Flink SQL to create a table something like this :
>
> CREATE TABLE `some-table` (
> ...,
> ...,
> ...,
> ...,
> event_timestamp AS TO_TIMESTAMP_LTZ(event_time*1000, 3),
> WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '30' SECOND
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'some-topic',
> 'properties.bootstrap.servers' = 'localhost:9092',
> 'properties.group.id' = 'testGroup',
> 'value.format' = 'csv'
> )
>
> I want to understand how can I deal with late events / out of order events
> when using Flink SQL / Table API? How can I collect the late / out of order
> events to a side output with Table API / SQL?
>
> Thanks
>


Re: mysql cdc: stream API and SQL API produce different output behavior

2024-03-01 Thread Feng Jin
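The two print calls are implemented differently.

dataStream().print() adds a PrintSinkFunction. That operator prints each
record as soon as it receives it, and the output is printed on the TM.

table.execute().print(), by contrast, collects the final results through
collect_sink and sends them back to the client, so the results are printed on
the client's stdout. Results are only sent back to the client when a
checkpoint is taken, so how soon they become visible is bounded by the
checkpoint interval.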


Best,
Feng Jin

On Fri, Mar 1, 2024 at 4:45 PM ha.fen...@aisino.com 
wrote:

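> The sink only prints.
>
> Stream API, checkpointing set to exactly-once:
> env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL
> Source").print();
> After data changes in the database, it is printed on the console immediately.
>
> SQL API, checkpointing set to exactly-once:
> Table custab = tEnv.sqlQuery("select * from orders ");
> custab.execute().print();
> Database changes are not printed immediately; the changed data is printed
> only when a checkpoint occurs. Also, right after the program starts, the
> historical data is printed only after a checkpoint.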
> 16:39:17,109 INFO
> com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader [] -
> Binlog offset on checkpoint 1: {ts_sec=0, file=mysql-bin.46, pos=11653,
> kind=SPECIFIC, gtids=, row=0, event=0}
> 16:39:17,231 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Completed
> checkpoint 1 for job 5bf08275f1992d1f7997fc8f7c32b6b1 (4268 bytes,
> checkpointDuration=218 ms, finalizationTime=6 ms).
> 16:39:17,241 INFO
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking
> checkpoint 1 as completed for source Source: orders[1].
>
> +----+-----+---------------------+---------+-------+--------+
> | op |  id |             addtime | cusname | price | status |
> +----+-----+---------------------+---------+-------+--------+
> | +I | 616 | 2024-02-22 16:23:11 |    name |  3.23 |      7 |
> | +I | 617 | 2024-03-01 11:42:03 |    name |  1.11 |      9 |
> | +I | 612 | 2024-01-31 13:53:49 |    name |  1.29 |      1 |
>
> What is the reason for this?
>


Re: How can a Flink DataStream job obtain its job lineage?

2024-02-26 Thread Feng Jin
From the JobGraph you can get the transformation information, and from that
the specific Source or Doris Sink. You can then use reflection to read the
properties inside them and extract what you need.

You can refer to the OpenLineage [1] implementation.


[1]
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java
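
The reflection step could be sketched as follows. This is an illustration under assumptions: ConnectorPropertiesReader and FakeSource are hypothetical, FakeSource only stands in for a connector class that keeps its configuration in a private field, and the field name "properties" has to be checked against the actual connector version.

```java
import java.lang.reflect.Field;
import java.util.Properties;

/** Hypothetical helper: reads a private Properties field from a connector object via reflection. */
final class ConnectorPropertiesReader {
    private ConnectorPropertiesReader() {}

    static Properties readProperties(Object connector, String fieldName) {
        // Walk up the class hierarchy, since the field may be declared in a superclass.
        Class<?> type = connector.getClass();
        while (type != null) {
            try {
                Field field = type.getDeclaredField(fieldName);
                field.setAccessible(true);
                return (Properties) field.get(connector);
            } catch (NoSuchFieldException e) {
                type = type.getSuperclass();
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + fieldName, e);
            }
        }
        throw new IllegalArgumentException("No field named " + fieldName + " on " + connector.getClass());
    }

    /** Stand-in for a connector class; this is not a Flink class. */
    static final class FakeSource {
        private final Properties properties = new Properties();

        FakeSource(String bootstrapServers) {
            properties.setProperty("bootstrap.servers", bootstrapServers);
        }
    }

    public static void main(String[] args) {
        Properties props = readProperties(new FakeSource("localhost:9092"), "properties");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

Reflection of this kind depends on connector internals, so it can break when a connector is upgraded; that is one reason to follow an existing implementation such as the OpenLineage wrappers.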


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

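> A Flink DataStream job consumes from MySQL CDC, processes the data and
> writes it to Apache Doris. Is there a way (from the JobGraph/StreamGraph) to
> get the source/sink connector information, including the connection string,
> database name, table name, etc.?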


Re: Temporal join on rolling aggregate

2024-02-23 Thread Feng Jin
+1 Support this feature. There are many limitations to using time window
aggregation currently, and if we can declare watermark and time attribute
on the view, it will make it easier for us to use time windows. Similarly,
it would be very useful if the primary key could be declared in the view.

Therefore, I believe we need a FLIP to detail the design of this feature.


Best,
Feng

On Fri, Feb 23, 2024 at 2:39 PM  wrote:

> +1 for supporting defining time attributes on views.
>
> I once encountered the same problem as yours. I did some regular joins and
> lost the time attribute, and hence I could no longer do window operations in
> subsequent logic. I had to output the joined view to Kafka, read from it
> again, and define a watermark on the new source: a cumbersome workaround.
>
> It would be more flexible if we could control time attribute / watermark
> on views, just as if it's some kind of special source.
>
> Thanks,
> Yaming
> On Feb 22, 2024, 7:46 PM +0800, Gyula Fóra wrote:
> > Posting this to dev as well as it potentially has some implications on
> > development effort.
> >
> > What seems to be the problem here is that we cannot control/override
> > Timestamps/Watermarks/Primary key on VIEWs. It's understandable that you
> > cannot create a PRIMARY KEY on the view but I think the temporal join also
> > should not require the PK, should we remove this limitation?
> >
> > The general problem is the inflexibility of the timestamp/watermark
> > handling on query outputs, which makes this again impossible.
> >
> > The workaround here can be to write the rolling aggregate to Kafka, read
> > it back again and join with that. The fact that this workaround is possible
> > actually highlights the need for more flexibility on the query/view side in
> > my opinion.
> >
> > Has anyone else run into this issue and considered the proper solution
> > to the problem? Feels like it must be pretty common :)
> >
> > Cheers,
> > Gyula
> >
> >
> >
> >
> > > > On Wed, Feb 21, 2024 at 10:29 PM Sébastien Chevalley wrote:
> > > > > Hi,
> > > > >
> > > > > I have been trying to write a temporal join in SQL done on a rolling
> > > > > aggregate view. However it does not work and throws:
> > > > >
> > > > > org.apache.flink.table.api.ValidationException: Event-Time Temporal
> > > > > Table Join requires both primary key and row time attribute in versioned
> > > > > table, but no row time attribute can be found.
> > > > >
> > > > > It seems that after the aggregation, the table loses the watermark
> > > > > and it's not possible to add one with the SQL API as it's a view.
> > > >
> > > > CREATE TABLE orders (
> > > > order_id INT,
> > > > price DECIMAL(6, 2),
> > > > currency_id INT,
> > > > order_time AS NOW(),
> > > > WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND
> > > > )
> > > > WITH (
> > > > 'connector' = 'datagen',
> > > > 'rows-per-second' = '10',
> > > > 'fields.order_id.kind' = 'sequence',
> > > > 'fields.order_id.start' = '1',
> > > > 'fields.order_id.end' = '10',
> > > > 'fields.currency_id.min' = '1',
> > > > 'fields.currency_id.max' = '20'
> > > > );
> > > >
> > > > CREATE TABLE currency_rates (
> > > > currency_id INT,
> > > > conversion_rate DECIMAL(4, 3),
> > > > PRIMARY KEY (currency_id) NOT ENFORCED
> > > > )
> > > > WITH (
> > > > 'connector' = 'datagen',
> > > > 'rows-per-second' = '10',
> > > > 'fields.currency_id.min' = '1',
> > > > 'fields.currency_id.max' = '20'
> > > > );
> > > >
> > > > CREATE TEMPORARY VIEW max_rates AS (
> > > > SELECT
> > > > currency_id,
> > > > MAX(conversion_rate) AS max_rate
> > > > FROM currency_rates
> > > > GROUP BY currency_id
> > > > );
> > > >
> > > > CREATE TEMPORARY VIEW temporal_join AS (
> > > > SELECT
> > > > order_id,
> > > > max_rates.max_rate
> > > > FROM orders
> > > >  LEFT JOIN max_rates FOR SYSTEM_TIME AS OF orders.order_time
> > > >  ON orders.currency_id = max_rates.currency_id
> > > > );
> > > >
> > > > SELECT * FROM temporal_join;
> > > >
> > > > > Am I missing something? What would be a good starting point to
> > > > > address this?
> > > >
> > > > Thanks in advance,
> > > > Sébastien Chevalley
>


Re: Flink Prometheus Connector question

2024-02-23 Thread Feng Jin
As I understand it, you can follow the design in the FLIP and build an initial
SinkFunction on top of the Prometheus Remote-Write API v1.0 to write to
Prometheus.


Best,
Feng

On Fri, Feb 23, 2024 at 7:36 PM 17610775726 <17610775...@163.com> wrote:

> Hi
> Please refer to the official documentation:
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/#prometheuspushgateway
>
>
> Best
> JasonLee
>
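>
>  Original message 
> | From | casel.chen |
> | Date | 2024-02-23 17:35 |
> | To | user-zh@flink.apache.org |
> | Subject | Flink Prometheus Connector question |
> Scenario: use Flink to generate metrics in real time and write them to
> Prometheus for monitoring and alerting.
> Searching online I found the https://github.com/apache/flink-connector-prometheus project, but it is empty.
> I also found FLIP-312, which is about a Flink Prometheus connector:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-312%3A+Prometheus+Sink+Connector
> Has the Flink project officially released a Flink Prometheus connector?
> If I want to write to Prometheus in real time now, what is the recommended approach? Thanks!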


Re: How can a custom sink connector in Flink SQL get the event time and watermark defined on the source table?

2024-02-20 Thread Feng Jin
As I understand it, you should not get them from rowData; you can get the watermark and event time from the Context.

Best,
Feng

On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:

> How can a custom sink connector in Flink SQL get the event time and
> watermark defined on the source table?
>
>
> public class XxxSinkFunction extends RichSinkFunction<RowData> implements
> CheckpointedFunction, CheckpointListener {
>
>
> @Override
> public synchronized void invoke(RowData rowData, Context context)
> throws IOException {
>//  Here I want to get the event time and watermark values from rowData. How can this be done?
> }
> }
>
>
> For example, the source table is defined as follows:
>
>
> CREATE TEMPORARY TABLE source_table(
>   username varchar,
>   click_url varchar,
>   eventtime varchar,
>
>   ts AS TO_TIMESTAMP(eventtime),
>   WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Define the watermark for the rowtime.
> ) with (
>   'connector'='kafka',
>   ...
>
> );
>
>
> CREATE TEMPORARY TABLE sink_table(
>   username varchar,
>   click_url varchar,
>   eventtime varchar
> ) with (
>   'connector'='xxx',
>   ...
> );
> insert into sink_table select username,click_url,eventtime from
> source_table;


Re: Obtaining connection information of Flink jobs for auditing

2024-02-03 Thread Feng Jin
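As I understand it, the platform can configure this centrally in flink-conf.yaml; users do not need to configure the related parameters individually.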


Best,
Feng

On Sun, Feb 4, 2024 at 11:19 AM 阿华田  wrote:

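> I took a look. This requires configuring a listener for every job, so it
> cannot be controlled at the system level, and it is hard to push downstream
> users to all configure the listener.
>
>
> 阿华田
> a15733178...@163.com
>
>
> On Feb 2, 2024 at 19:38, Feng Jin wrote:
> hi,
>
> You can refer to the OpenLineage [1] implementation: configure a JobListener
> in Flink to get the Transformation information, then parse the
> Source and Sink to obtain the lineage information.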
>
> [1]
>
> https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java
>
> Best,
> Feng
>
>
> On Fri, Feb 2, 2024 at 6:36 PM 阿华田  wrote:
>
>
>
>
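> We plan to build profiles of Flink jobs, mainly for the DataStream jobs that
> users run. Once they are running on our real-time platform, we would like to
> audit which Kafka topics they use and which middleware they write to (mysql,
> hbase, ck, and so on). Does anyone have a good approach? For Flink SQL this
> can currently be obtained from the SQL, but not for DataStream jobs run from
> Flink job jars that users compile themselves.
> 阿华田
> a15733178...@163.com
>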
>
>


Re: Re: Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-02 Thread Feng Jin
+1 for a FLIP on this topic.


Best,
Feng

On Fri, Feb 2, 2024 at 10:26 PM Martijn Visser 
wrote:

> Hi,
>
> I would definitely expect a FLIP on this topic before moving to
> implementation.
>
> Best regards,
>
> Martijn
>
> On Fri, Feb 2, 2024 at 12:47 PM Xuyang  wrote:
>
>> Hi, Prabhjot.
>>
>> IIUC, the main reasons why the community has so far only considered
>> supporting join hints in batch mode are as follows:
>> 1. In batch mode, multiple join algorithms were implemented quite
>> early on, and
>> 2. Stream processing represents a long-running scenario, and it is quite
>> difficult to determine whether a small table will become a large table
>> after a long period of operation.
>>
>> However, as you mentioned, join hints do indeed have their significance
>> in streaming. If you want to support the implementation of "join hints +
>> broadcast join" in streaming, the changes I can currently think of include:
>> 1. At optimizer, changing the exchange on the small table side to
>> broadcast instead of hash (InputProperty#BROADCAST).
>> 2. Unknown changes required at the table runtime level.
>>
>> You can also discuss it within the community through JIRA, FLIP, or the
>> dev mailing list.
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2024-02-02 00:46:01, "Prabhjot Bharaj via user" 
>> wrote:
>>
>> Hi Feng,
>>
>> Thanks for your prompt response.
>> If we were to solve this in Flink, my higher level viewpoint is:
>>
>> 1. First to implement Broadcast join in Flink Streaming SQL, that works
>> across Table api (e.g. via a `left.join(right, ,
>> join_type="broadcast")
>> 2. Then, support a Broadcast hint that would utilize this new join based
>> on the hint type
>>
>> What do you think about this ?
>> Would you have some pointers on how/where to start on the first part ?
>> (I'm thinking we'd have to extend the Broadcast state pattern for this
>> purpose)
>>
>> Thanks,
>> Prabhjot
>>
>> On Thu, Feb 1, 2024 at 11:40 AM Feng Jin  wrote:
>>
>>> Hi Prabhjot
>>>
>>> I think this is a reasonable scenario. If a large table is joined with a
>>> very small table using a regular join, then without broadcasting the small
>>> table, the join can easily suffer from data skew.
>>> We have encountered similar problems. Currently, the only workaround is to
>>> make multiple copies of the small table with UNION ALL and append
>>> random values to alleviate the data skew.
>>>
>>>
>>> Best,
>>> Feng
>>>
>>> On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
>>> user@flink.apache.org> wrote:
>>>
>>>> Hello folks,
>>>>
>>>>
>>>> We have a use case where we have a few stream-stream joins, requiring
>>>> us to join a very large table with a much smaller table, essentially
>>>> enriching the large table with a permutation on the smaller table (Consider
>>>> deriving all orders/sessions for a new location). Given the nature of the
>>>> dataset, if we use a typical join that uses Hash distribution to co-locate
>>>> the records for each join key, we end up with a very skewed join (a few
>>>> task slots getting all of the work, as against a good distribution).
>>>>
>>>>
>>>> We’ve internally implemented a Salting based solution where we salt the
>>>> smaller table and join it with the larger table. While this works in the
>>>> POC stage, we’d like to leverage flink as much as possible to do such a
>>>> join.
>>>>
>>>>
>>>> By the nature of the problem, a broadcast join seems theoretically
>>>> helpful. We’ve done an exploration on query hints supported in Flink,
>>>> starting with this FLIP
>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
>>>> and this FLIP
>>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>
>>>> .
>>>>
>>>>
>>>> Currently, the Optimizer doesn't consider the Broadcast hint in the
>>>> `Exchange` step of the join, when creating the physical plan (Possibly
>>>> because the hint would require the stream-stream join to also support
>>>> Broadcast join with SQL)
>>>>
>>>>
>>>> Notice that the Query

Re: Obtaining connection information of Flink jobs for auditing

2024-02-02 Thread Feng Jin
hi,

You can refer to the OpenLineage [1] implementation: configure a JobListener
in Flink to get the Transformation information, then parse the Source and
Sink to obtain the lineage information.

[1]
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java

Best,
Feng


On Fri, Feb 2, 2024 at 6:36 PM 阿华田  wrote:

>
>
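> We plan to build profiles of Flink jobs, mainly for the DataStream jobs that
> users run. Once they are running on our real-time platform, we would like to
> audit which Kafka topics they use and which middleware they write to (mysql,
> hbase, ck, and so on). Does anyone have a good approach? For Flink SQL this
> can currently be obtained from the SQL, but not for DataStream jobs run from
> Flink job jars that users compile themselves.
> 阿华田
> a15733178...@163.com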
>
>


Re: Case for a Broadcast join + hint in Flink Streaming SQL

2024-02-01 Thread Feng Jin
Hi Prabhjot

I think this is a reasonable scenario. If a large table is joined with a very
small table using a regular join, then without broadcasting the small table,
the join can easily suffer from data skew.
We have encountered similar problems. Currently, the only workaround is to
make multiple copies of the small table with UNION ALL and append random
values to alleviate the data skew.
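
For readers unfamiliar with this workaround, the idea can be sketched in plain Java. This is a simplified in-memory illustration under assumptions, not Flink code: SaltedJoinSketch, the "#" separator, and the bucket count are all hypothetical. The small table is replicated once per salt value, and each row of the large table gets a random salt appended to its key, so rows sharing one hot key are spread over several join keys.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical in-memory illustration of a salted join between a large and a small table. */
final class SaltedJoinSketch {
    private SaltedJoinSketch() {}

    /** Copies every small-table row once per salt value (the "union all" copies). */
    static Map<String, String> replicateSmallTable(Map<String, String> smallTable, int buckets) {
        Map<String, String> replicated = new HashMap<>();
        for (Map.Entry<String, String> row : smallTable.entrySet()) {
            for (int salt = 0; salt < buckets; salt++) {
                replicated.put(row.getKey() + "#" + salt, row.getValue());
            }
        }
        return replicated;
    }

    /** Appends a random salt so rows with the same key land in different partitions. */
    static String saltKey(String key, int buckets) {
        return key + "#" + ThreadLocalRandom.current().nextInt(buckets);
    }

    /** Inner join of the large table's keys against the replicated small table. */
    static List<String> join(List<String> largeTableKeys, Map<String, String> smallTable, int buckets) {
        Map<String, String> replicated = replicateSmallTable(smallTable, buckets);
        List<String> joined = new ArrayList<>();
        for (String key : largeTableKeys) {
            String value = replicated.get(saltKey(key, buckets));
            if (value != null) {
                joined.add(key + "=" + value);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> shops = Map.of("shop1", "Berlin");
        System.out.println(join(List.of("shop1", "shop1", "shop2"), shops, 4));
    }
}
```

The cost is that the small table is stored and shuffled once per bucket, which is why a real broadcast join would be preferable.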


Best,
Feng

On Fri, Feb 2, 2024 at 12:24 AM Prabhjot Bharaj via user <
user@flink.apache.org> wrote:

> Hello folks,
>
>
> We have a use case where we have a few stream-stream joins, requiring us
> to join a very large table with a much smaller table, essentially enriching
> the large table with a permutation on the smaller table (Consider deriving
> all orders/sessions for a new location). Given the nature of the dataset,
> if we use a typical join that uses Hash distribution to co-locate the
> records for each join key, we end up with a very skewed join (a few task
> slots getting all of the work, as against a good distribution).
>
>
> We’ve internally implemented a Salting based solution where we salt the
> smaller table and join it with the larger table. While this works in the
> POC stage, we’d like to leverage flink as much as possible to do such a
> join.
>
>
> By the nature of the problem, a broadcast join seems theoretically
> helpful. We’ve done an exploration on query hints supported in Flink,
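> starting with this FLIP
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-229%3A+Introduces+Join+Hint+for+Flink+SQL+Batch+Job>
> and this FLIP
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join>
> .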
>
>
> Currently, the Optimizer doesn't consider the Broadcast hint in the
> `Exchange` step of the join, when creating the physical plan (Possibly
> because the hint would require the stream-stream join to also support
> Broadcast join with SQL)
>
>
> Notice that the Query AST (Abstract Syntax Tree) has the broadcast hint
> parsed from the query:
>
>
> ```sql
>
> ...
>
> ...
>
> joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[gpla)
>
> ...
>
> ```
>
>
> However, the Flink optimizer ignores the hint and still represents the
> join as a regular `hash` join in the `Exchange` step:
>
>
> ```sql
>
> ...
>
> ...
>
> :- Exchange(distribution=[hash[shop_id, join_key]])
>
> ...
>
> ```
>
>
> In Flink `StreamExecExchange`, the translation happens only via the
> `HASH` distribution type
> .
> unlike in the Flink `BatchExecExchange`, the translation can happen via a
> multitude of options
> 
> (`HASH/BROADCAST`).
>
>
>
> Quoting this Flink mailing list discussion
>  for
> the FLIP that implemented the Broadcast join hint for batch sql:
>
>
> > But currently, only in batch the optimizer has different Join strategies
> > for Join and there is no choice of join strategies in the stream. The join
> > hints listed in the current flip should be ignored (maybe can be warned) in
> > streaming mode. When in the future the stream mode has the choice of join
> > strategies, I think that's a good time to discuss that the join hint can
> > affect the streaming SQL.
>
>
> What do you folks think about the possibility of a Broadcast join for
> Streaming Sql along with its corresponding Broadcast hint, that lets the
> user choose the kind of distribution they’d want with the dataset ?
>
> Happy to learn more about this and hopefully implement it, if it doesn’t
> sound like a terrible idea.
>
>
> Thanks,
>
> Prabhjot
>
>
>
>


Re: Re: failed when job graph change

2024-01-24 Thread Feng Jin
Hi nick

If you want to modify the sink operator, I think you can change the uid of
the operator so that it does not restore state that does not belong to it.


Best,
Feng


On Thu, Jan 25, 2024 at 1:19 AM nick toker  wrote:

> hi
>
> I didn't find anything in the log,
> but I found that it happens when I add a new sink operator.
> Because I work with checkpoints, Flink can't finish the
> transaction (the new Kafka topic was not part of the transaction before I
> added the new sink operator).
>
> so i must cancel the job to make it work
>
> How can I solve this issue?
>
>
> nick
>
> On Mon, Dec 4, 2023 at 10:27, Xuyang <xyzhong...@163.com> wrote:
>
>> Hi,
>> Can you attach the log about the exception when job failed?
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2023-12-04 15:56:04, "nick toker" wrote:
>>
>> Hi
>>
>> Restarting the job is OK and I do that, but I must cancel the job and
>> submit a new one, and I don't want the data from the state.
>> I forgot to mention that I use the parameter "-allowNonRestoredState".
>>
>>
>> My steps:
>> 1. Stop the job with a savepoint
>> 2. Run the updated job (updated job graph) from the savepoint
>>
>> I expect it to run.
>>
>> Currently the result is that the job fails.
>>
>> nick
>>
>>
>>
>> On Mon, Dec 4, 2023 at 8:41, Xuyang <xyzhong...@163.com> wrote:
>>
>>> Hi, nick.
>>>
>>> > using savepoint i must cancel the job to be able run the new graph
>>>
>>> Do you mean that you need cancel and start the job using the new flink
>>> job graph in 1.17.1,
>>> and in the past, it was able to make the changes to the new operator
>>> effective without restarting the job?
>>>
>>> I think in order for the new job graph to take effect, it is necessary
>>> to restart the job.
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>>
>>> At 2023-12-03 21:49:23, "nick toker"  wrote:
>>>
>>> Hi
>>>
>>> When I add or remove an operator in the job graph, using a savepoint, I
>>> must cancel the job to be able to run the new graph,
>>>
>>> e.g. by adding or removing an operator (like a new sink target).
>>> It was working in the past.
>>> I am using Flink 1.17.1.
>>>
>>> 1. Is it a known bug? If so, when is it planned to be fixed?
>>>
>>> 2. Do I need to do something to make it work?
>>>
>>>
>>> nick
>>>
>>>


Re: Flink UI operator data display keeps loading...

2024-01-23 Thread Feng Jin
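You can try the following to narrow down the cause:


   1. Open the browser's developer tools and check whether a request to some
   endpoint is hanging.
   2. Check the JobManager's GC behavior and whether Full GCs are frequent.
   3. Check the JobManager's logs for missing resource files or disk problems
   that make the web UI inaccessible.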


Best,
Feng


On Tue, Jan 23, 2024 at 6:16 PM 阿华田  wrote:

>
>
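> As shown in the figure below, the job processes data normally and the job
> status is normal too, but the Flink UI stays in the loading state. Only a few
> jobs behave like this; the others are fine. Could it be caused by a class
> conflict in a metrics package?
> 阿华田
> a15733178...@163.com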
>
>


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Feng Jin
Hi Elakiya,


You should use DML (INSERT statements) in the statement set instead of DQL
(SELECT queries).


Here is a simple example:

executeSql("CREATE TABLE source_table1 ..");

executeSql("CREATE TABLE source_table2 ..");

executeSql("CREATE TABLE sink_table1 ..");

executeSql("CREATE TABLE sink_table2 ..");


stmtSet.addInsertSql("INSERT INTO sink_table1 SELECT xxx from source_table1 join source_table2 ...");

stmtSet.addInsertSql("INSERT INTO sink_table2 SELECT xxx from source_table1 join source_table2 ...");

stmtSet.execute();


Best,
Feng


On Thu, Dec 7, 2023 at 12:48 AM elakiya udhayanan 
wrote:

> Hi Xuyang, Zhangao,
>
> Thanks for your response, I have attached sample job files that I tried
> with the Statementset and with two queries. Please let me know if you are
> able to point out where I am possibly going wrong.
>
> Thanks,
> Elakiya
>
> On Wed, Dec 6, 2023 at 4:51 PM Xuyang  wrote:
>
>> Hi, Elakiya.
>> Are you following the example here[1]? Could you attach a minimal,
>> reproducible SQL?
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/insert/
>>
>>
>>
>> --
>> Best!
>> Xuyang
>>
>>
>> At 2023-12-06 17:49:17, "elakiya udhayanan"  wrote:
>>
>> Hi Team,
>>  I would like to know the possibility of having two sinks in a
>> single Flink job. In my case I am using the Flink SQL based job where I try
>> to consume from two different Kafka topics using the create table (as
>> below) DDL and then use a join condition to correlate them and at present
>> write it to an external database (PostgreSQL - as a sink). I would like to
>> know if I can add another sink where I want to also write it to kafka topic
>> (as the second sink).
>> I tried using two sql scripts (two create and two insert for the same)
>> but was facing an exception* "Cannot have more than one execute() or
>> executeAsync() call in a single environment. at "*
>> Also tried to use the StatementSet functionality which again gave me an
>> exception *"org.apache.flink.table.api.TableException: Only insert
>> statement is supported now. at ".*
>> I am looking for some help in regards to this. TIA
>>
>> *Note:* I am using the Flink UI to submit my job.
>>
>> *Sample DDL statement used:*String statement = "CREATE TABLE Person
>> (\r\n" +
>> "  person ROW(id STRING, name STRING\r\n" +
>> "  ),\r\n" +
>> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
>> ") WITH (\r\n" +
>> "  'connector' = 'upsert-kafka',\r\n" +
>> "  'topic' = 'employee',\r\n" +
>> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
>> "  'key.format' = 'raw',\r\n" +
>> "  'value.format' = 'avro-confluent',\r\n" +
>> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
>> +
>> ")";
>>
>> Thanks,
>> Elakiya
>>
>>


Re: S3 bucket as a source

2023-12-05 Thread Feng Jin
Hi Matwey,

I think you can implement a custom format (a BulkFormat) to meet your needs
and pass it to FileSource::forBulkFileFormat to create a FileSource.

In the custom format, you can choose to read only the metadata of each file
without reading its content.


https://github.com/apache/flink/blob/1dac395967e5870833d67c6bf1103ba874fce601/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java#L171

public static <T> FileSourceBuilder<T> forBulkFileFormat(
        final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths) {
    checkNotNull(bulkFormat, "reader");
    checkNotNull(paths, "paths");
    checkArgument(paths.length > 0, "paths must not be empty");

    return new FileSourceBuilder<>(paths, bulkFormat);
}


Best,
Feng


On Tue, Dec 5, 2023 at 8:43 PM Matwey V. Kornilov 
wrote:

> Hello,
>
> I have an S3 bucket and I would like to process the objects metainfo
> (such as keys (filenames), metainfo, tags, etc.).
> I don't care about the objects content since it is irrelevant for my
> task. What I want is to construct a data stream where each instance is a
> metainfo attached to some object from the bucket.
>
> Is it anyhow possible to tune and reuse the FileSystem connector for my
> purposes? The connector is provided to read content of files, while I
> would like to read content of directory, or metainfo for every file.
>
>


Re: Re: inputQueueSize metric

2023-11-28 Thread Feng Jin
Hi Dimitris

Yes, the inputQueueLength metric can only represent the number of buffers,
and cannot obtain the actual size of the messages.

Regarding the inputQueueSize metric, it ignores the LocalChannel.
Therefore, when tasks are connected through the Forward partition strategy,
it will consistently display a value of 0.
You may consider altering the partition strategy to the rebalance strategy
for testing purposes.


Best,
Feng.


On Tue, Nov 28, 2023 at 2:32 AM Dimitris Banelas via user <
user@flink.apache.org> wrote:

> As per the docs, the `inputQueueLength` metric refers to the number of
> queued input buffers, and cannot be used on its own in order to
> determine buffered records.
>
> For instance, if I know that there are 5 queued input buffers, I cannot
> conclude anything regarding buffered records if the size of each queue
> is not known.
>
>
> On 2023/11/27 17:28:08 Feng Jin wrote:
>  > Hi Dimitris
>  >
>  > Maybe you can use the `inputQueueLength` metric.
>  >
>  >
>  > Best,
>  > Feng
>  >
>  > On Tue, Nov 28, 2023 at 12:07 AM Dimitris Mpanelas via user <
>  > user@flink.apache.org> wrote:
>  >
>  > > Hello,
>  > >
>  > > I am trying to determine the buffered records in the input buffers of a
>  > > task. I found the inputQueueSize metric. According to the docs it is "The
>  > > real size of queued input buffers in bytes". The docs also state that "The
>  > > size for local input channels is always 0 since the local channel take
>  > > records directly from the output queue".
>  > >
>  > > What does this metric truly represent? In which cases should it be 0 and
>  > > non 0? Is there a way to determine buffered records (or at least something
>  > > similar) by using this metric?
>  > >
>  > > Dimitris.
>  > >
>  > >
>  >
>


Re: How to call the open method of JdbcSink?

2023-11-28 Thread Feng Jin
Hi Sai

I think you can directly cast SinkFunction to GenericJdbcSinkFunction.


https://github.com/apache/flink-connector-jdbc/blob/b477d452ba3aac38d53d1f5d4c4820bdad3ad9cd/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java#L63C41-L63C41
```
public static <T> SinkFunction<T> sink(
        String sql,
        JdbcStatementBuilder<T> statementBuilder,
        JdbcExecutionOptions executionOptions,
        JdbcConnectionOptions connectionOptions) {
    return new GenericJdbcSinkFunction<>(
            new JdbcOutputFormat<>(
                    new SimpleJdbcConnectionProvider(connectionOptions),
                    executionOptions,
                    () -> JdbcBatchStatementExecutor.simple(sql, statementBuilder)));
}
```


Best,
Feng


On Tue, Nov 28, 2023 at 5:49 PM Sai Vishnu 
wrote:

> Hi team,
>
>
> I am using the JdbcSink from flink-connector-jdbc artifact, version
> 3.1.0-1.17. I am trying to write a Sink wrapper that will internally call
> the invoke method and open method of jdbc sink. While implementing, I see
> that JdbcSink.sink() returns a SinkFunction which only exposes the
> invoke method and not the open method.
>
>
> Would appreciate any suggestions on how I can implement this. To add to
> the requirement, the use case is to try and enclose the invoke operation in
> a try catch block so that any exception during the db write process can be
> caught and handled properly.
>
>
> Thanks,
>
> Sai Vishnu Soudri
>


Re: inputQueueSize metric

2023-11-27 Thread Feng Jin
Hi Dimitris

Maybe you can use the `inputQueueLength` metric.


Best,
Feng

On Tue, Nov 28, 2023 at 12:07 AM Dimitris Mpanelas via user <
user@flink.apache.org> wrote:

> Hello,
>
> I am trying to determine the buffered records in the input buffers of a
> task. I found the inputQueueSize metric. According to the docs it is "The
> real size of queued input buffers in bytes". The docs also state that "The
> size for local input channels is always 0 since the local channel take
> records directly from the output queue".
>
> What does this metric truly represent? In which cases should it be 0 and
> non 0? Is there a way to determine buffered records (or at least something
> similar) by using this metric?
>
> Dimitris.
>
>


Re: Metrics not available

2023-11-27 Thread Feng Jin
Hi Oscar

Did you set the following option? As far as I know, these latency metrics
are disabled by default.

state.backend.latency-track.keyed-state-enabled: true

https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/config/#state-backend-latency-track-keyed-state-enabled

Best,
Feng


On Mon, Nov 27, 2023 at 8:42 PM Oscar Perez via user 
wrote:

> Hi,
>
> We are using flink 1.16 and we would like to monitor the state metrics of a
> certain job. Looking at the documentation I see that there are some state
> access latencies:
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/metrics/
>
> Namely, I would like to access the following:
>
> *mapStateGetLatency*
>
> How could I achieve this? I would expect it to be accessible via
>
> flink.task/operator.mapStateGetLatency, but I don't see any state
> access latency metrics available. Do they need to be enabled or configured
> in some way?
>
> Thanks,
> Oscar
>


Re: [EXTERNAL] Re: flink s3[parquet] -> s3[iceberg]

2023-11-24 Thread Feng Jin
Hi Dan

I think using Flink SQL should be able to meet your needs.

You can write a Flink JAR program that accepts the different directories,
schemas, mappings, and sink tables as parameters and generates the DDL and
DML from them.

Assuming you have two directories:

directory1 -> f1, f2, f3, f4 -> iceberg1
directory2 -> f1, f2, f3 -> iceberg2

DDL can be generated roughly as follows.

CREATE TABLE s3_table1 (
  f1 varchar,
  f2 varchar,
  f3 varchar,
  f4 varchar
) with (
  'connector' = 'filesystem',
  'path' = 's3://dir1',
  'format' = 'parquet',
  ...
);

CREATE TABLE s3_table2 (
  f1 varchar,
  f2 varchar,
  f3 varchar
) with (
  'connector' = 'filesystem',
  'path' = 's3://dir2',
  'format' = 'parquet',
  ...
);


Then select the fields you need according to your mapping and generate the
DML.

INSERT INTO iceberg_catalog.iceberg_database.tb1 SELECT
f1, f2, f3 FROM s3_table1;

INSERT INTO iceberg_catalog.iceberg_database.tb2 SELECT
f1, f2 FROM s3_table2;


Of course, this is only my understanding of your requirements; I don't know
whether it fits your scenario.
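
To make the "generate DDL and DML" idea a bit more concrete, here is a
rough sketch in plain Java. It only builds the SQL strings; the class and
method names are invented for this example, and the connector options
('filesystem', 'path', 'format' = 'parquet') are my assumption about how
the S3 tables would be declared. In a real job the strings would be passed
to TableEnvironment.executeSql.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SqlGenerator {

    /** Renders a CREATE TABLE statement for a Parquet directory (assumed filesystem connector). */
    static String sourceDdl(String table, Map<String, String> columns, String path) {
        List<String> cols = new ArrayList<>();
        for (Map.Entry<String, String> column : columns.entrySet()) {
            cols.add("  " + column.getKey() + " " + column.getValue());
        }
        return "CREATE TABLE " + table + " (\n"
                + String.join(",\n", cols) + "\n"
                + ") WITH (\n"
                + "  'connector' = 'filesystem',\n"
                + "  'path' = '" + path + "',\n"
                + "  'format' = 'parquet'\n"
                + ")";
    }

    /** Renders the INSERT that copies the mapped fields into the target table. */
    static String insertDml(String source, String target, List<String> fields) {
        return "INSERT INTO " + target
                + " SELECT " + String.join(", ", fields)
                + " FROM " + source;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the column order stable in the generated DDL.
        Map<String, String> schema = new LinkedHashMap<>();
        schema.put("f1", "STRING");
        schema.put("f2", "STRING");
        schema.put("f3", "STRING");

        System.out.println(sourceDdl("s3_table1", schema, "s3://dir1"));
        System.out.println(insertDml("s3_table1",
                "iceberg_catalog.iceberg_database.tb1", Arrays.asList("f1", "f2")));
    }
}
```

Each job instance would then only need the schema, the directory, and the
target table as parameters, which seems to match what Dan described.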


Best regards,
Feng


On Fri, Nov 24, 2023 at 3:02 AM Oxlade, Dan 
wrote:

> Thanks Feng,
>
> I think my challenge (and why I expected I’d need to use Java) is that
> there will be parquet files with different schemas landing in the s3 bucket
> - so I don’t want to hard-code the schema in a sql table definition.
>
> I’m not sure if this is even possible? Maybe I would have to write a job
> that accepts the schema, directory and iceberg target table as params and
> start instances of the job through the job api.
>
> Unless reading the parquet to a temporary table  doesn’t need the schema
> definition? I couldn't really work things out from the links.
>
> Dan
> --
> *From:* Feng Jin 
> *Sent:* Thursday, November 23, 2023 6:49:11 PM
> *To:* Oxlade, Dan 
> *Cc:* user@flink.apache.org 
> *Subject:* [EXTERNAL] Re: flink s3[parquet] -> s3[iceberg]
>
> Hi Oxlade
>
> I think using Flink SQL can conveniently fulfill your requirements.
>
> For S3 Parquet files, you can create a temporary table using a filesystem
> connector[1] .
> For Iceberg tables, FlinkSQL can easily integrate with the Iceberg
> catalog[2].
>
> Therefore, you can use Flink SQL to export S3 files to Iceberg.
>
> If you only need field mapping or transformation, I believe using Flink
> SQL + UDF (User-Defined Functions) would be sufficient to meet your needs.
>
>
> [1].   
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching
> [2].  
> https://iceberg.apache.org/docs/latest/flink-connector/#table-managed-in-hadoop-catalog
>
>
> Best,
> Feng
>
>
> On Thu, Nov 23, 2023 at 11:23 PM Oxlade, Dan 
> wrote:
>
> Hi all,
>
>
>
> I’m attempting to create a POC in flink to create a pipeline to stream
> parquet to a data warehouse in iceberg format.
>
>
>
> Ideally – I’d like to watch a directory in s3 (minio locally) and stream
> those to iceberg, doing the appropriate schema mapping/translation.
>
>
>
> I guess first; does this sound like a crazy idea?
>
> Assuming not is anyone able to share examples that might get me going.
> I’ve found lots of iceberg and flink sql examples but I think I’ll need
> something in java to do the schema mapping. Also some examples reading
> parquet for s3 seem a little hard to come by.
>
>
>
> I’m aware I’ll need a catalog, I can use nessie for the prototype. I’m
> also trying to use minio to get this all working locally but this might
> just be adding complexity at the moment.
>
>
>
> TIA
>
> Dan
>
> T. Rowe Price International Ltd (registered number 3957748) is registered
> in England and Wales with its registered office at Warwick Court, 5
> Paternoster Square, London EC4M 7DX. T. Rowe Price International Ltd is
> authorised and regulated by the Financial Conduct Authority. The company
> has a branch in Dubai International Financial Centre (regulated by the DFSA
> as a Representative Office).
>

Re: flink s3[parquet] -> s3[iceberg]

2023-11-23 Thread Feng Jin
Hi Oxlade

I think using Flink SQL can conveniently fulfill your requirements.

For S3 Parquet files, you can create a temporary table using a filesystem
connector[1] .
For Iceberg tables, FlinkSQL can easily integrate with the Iceberg
catalog[2].

Therefore, you can use Flink SQL to export S3 files to Iceberg.

If you only need field mapping or transformation, I believe using Flink SQL
+ UDF (User-Defined Functions) would be sufficient to meet your needs.


[1].
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/filesystem/#directory-watching
[2].
https://iceberg.apache.org/docs/latest/flink-connector/#table-managed-in-hadoop-catalog


Best,
Feng


On Thu, Nov 23, 2023 at 11:23 PM Oxlade, Dan 
wrote:

> Hi all,
>
>
>
> I’m attempting to create a POC in flink to create a pipeline to stream
> parquet to a data warehouse in iceberg format.
>
>
>
> Ideally – I’d like to watch a directory in s3 (minio locally) and stream
> those to iceberg, doing the appropriate schema mapping/translation.
>
>
>
> I guess first; does this sound like a crazy idea?
>
> Assuming not is anyone able to share examples that might get me going.
> I’ve found lots of iceberg and flink sql examples but I think I’ll need
> something in java to do the schema mapping. Also some examples reading
> parquet for s3 seem a little hard to come by.
>
>
>
> I’m aware I’ll need a catalog, I can use nessie for the prototype. I’m
> also trying to use minio to get this all working locally but this might
> just be adding complexity at the moment.
>
>
>
> TIA
>
> Dan
>
> T. Rowe Price International Ltd (registered number 3957748) is registered
> in England and Wales with its registered office at Warwick Court, 5
> Paternoster Square, London EC4M 7DX. T. Rowe Price International Ltd is
> authorised and regulated by the Financial Conduct Authority. The company
> has a branch in Dubai International Financial Centre (regulated by the DFSA
> as a Representative Office).
>
> T. Rowe Price (including T. Rowe Price International Ltd and its
> affiliates) and its associates do not provide legal or tax advice. Any
> tax-related discussion contained in this e-mail, including any attachments,
> is not intended or written to be used, and cannot be used, for the purpose
> of (i) avoiding any tax penalties or (ii) promoting, marketing, or
> recommending to any other party any transaction or matter addressed herein.
> Please consult your independent legal counsel and/or professional tax
> advisor regarding any legal or tax issues raised in this e-mail.
>
> The contents of this e-mail and any attachments are intended solely for
> the use of the named addressee(s) and may contain confidential and/or
> privileged information. Any unauthorized use, copying, disclosure, or
> distribution of the contents of this e-mail is strictly prohibited by the
> sender and may be unlawful. If you are not the intended recipient, please
> notify the sender immediately and delete this e-mail.
>


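Re: Flink 1.15 version

2023-11-23 Thread Feng Jin
It looks similar, but this error was probably raised after the job had
already failed. Please check whether there are any other exception logs.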


Best,
Feng

On Sat, Nov 4, 2023 at 3:26 PM Ray  wrote:

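> Hi experts, I am currently running into the following problem:
> 1. Scenario: submitting a Flink job on YARN
> 2. Version: Flink 1.15.0
> 3. Logs: the logs on YARN show what looks like a version problem:
> 2023-11-04 15:04:42,313 ERROR org.apache.flink.util.FatalExitExceptionHandler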
> [] - FATAL: Thread 'flink-akka.actor.internal-dispatcher-3' produced an
> uncaught exception. Stopping the process...java.lang.NoClassDefFoundError:
> akka/actor/dungeon/FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1
> at
> akka.actor.dungeon.FaultHandling.handleNonFatalOrInterruptedException(FaultHandling.scala:334)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at
> akka.actor.dungeon.FaultHandling.handleNonFatalOrInterruptedException$(FaultHandling.scala:334)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at
> akka.actor.ActorCell.handleNonFatalOrInterruptedException(ActorCell.scala:411)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at akka.actor.ActorCell.invoke(ActorCell.scala:551)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> [flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> [?:1.8.0_181]
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> [?:1.8.0_181]
> at
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> [?:1.8.0_181]
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> [?:1.8.0_181]
> Caused by: java.lang.ClassNotFoundException:
> akka.actor.dungeon.FaultHandling$$anonfun$handleNonFatalOrInterruptedException$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> ~[?:1.8.0_181]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> ~[?:1.8.0_181]
> at
> org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:149)
> ~[flink-dist-1.15.0.jar:1.15.0]
> at
> org.apache.flink.core.classloading.ComponentClassLoader.loadClass(ComponentClassLoader.java:112)
> ~[flink-dist-1.15.0.jar:1.15.0]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ~[?:1.8.0_181]
> ... 11 more
> 2023-11-04 15:04:42,324 ERROR
> org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING:
> Thread 'flink-shutdown-hook-1' produced an uncaught exception. If you want
> to fail on uncaught exceptions, then configure
> cluster.uncaught-exception-handling accordingly
> java.lang.NoClassDefFoundError:
> scala/collection/convert/Wrappers$MutableSetWrapper
> at
> scala.collection.convert.AsScalaConverters.asScalaSet(AsScalaConverters.scala:126)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at
> scala.collection.convert.AsScalaConverters.asScalaSet$(AsScalaConverters.scala:124)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at
> akka.util.ccompat.package$JavaConverters$.asScalaSet(package.scala:86)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at
> scala.collection.convert.DecorateAsScala.$anonfun$asScalaSetConverter$1(DecorateAsScala.scala:59)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at
> scala.collection.convert.Decorators$AsScala.asScala(Decorators.scala:25)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at
> akka.actor.CoordinatedShutdown$tasks$.totalDuration(CoordinatedShutdown.scala:481)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at
> akka.actor.CoordinatedShutdown.totalTimeout(CoordinatedShutdown.scala:784)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at
> akka.actor.CoordinatedShutdown$.$anonfun$initJvmHook$1(CoordinatedShutdown.scala:271)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> at
> akka.actor.CoordinatedShutdown$$anon$3.run(CoordinatedShutdown.scala:814)
> ~[flink-rpc-akka_abb25197-9eee-499b-8c1f-f52d0afc4374.jar:1.15.0]
> Caused by: java.lang.ClassNotFoundException:
> scala.collection.convert.Wrappers$MutableSetWrapper
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> ~[?:1.8.0_181]
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> ~[?:1.8.0_181]
> at
> org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:149)
> ~[flink-dist-1.15.0.jar:1.15.0]
> at
> 

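Re: Garbled characters in canal-json format lead to unexpected results

2023-11-13 Thread Feng Jin
Hi,

This does not look like it is caused by the garbled characters.

You can try enabling deduplication to restore a correct CDC changelog and
then check the result again.

The steps are:
1. Define a primary key on the source.
2. Set table.exec.source.cdc-events-duplicate to true in the table config,
or run: set 'table.exec.source.cdc-events-duplicate'='true'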

Best,
Feng



On Mon, Nov 13, 2023 at 4:09 PM yawning  wrote:

> Column in MySQL:
>
> `encrypted_xx` blob
>
> Canal-json
> "encrypted_xx":"\u0003üUãcA\u0018\u001A}àh\u0013\u001F æÉ"
>
>
>
> The garbled value contains special characters such as } and ].
>
>
> Plain query:
> select * from tbl
>
> The result is as expected:
>
> -U[273426307, xxx, u°àÈ;óX«V, üUãcA}àh æÉ, 1699359473865, 2]
> +U[273426307, xxx,
> u°àÈ;óX«V, üUãcA}àh æÉ, 1699359473865, 2]
> -U[399385197, yyy, nì¡… ^³Ø )½ÍU¸, ÉîA6³gŽŽMõýNêfŠ], 1697648682595, 0]
> +U[399385197, yyy, nì¡… ^³Ø )½ÍU¸, ÉîA6³gŽŽMõýNêfŠ], 1699359694026, 0]
> -U[399385197, yyy, nì¡… ^³Ø )½ÍU¸, ÉîA6³gŽŽMõýNêfŠ], 1699359694026, 0]
> +U[399385197, yyy, nì¡… ^³Ø )½ÍU¸, ÉîA6³gŽŽMõýNêfŠ], 1699359694242, 0]
>
> Aggregate query:
> select count(*) from tbl
>
>
> The result is wrong:
> +I[1]
> -D[1]
> +I[1]
> -D[1]
> +I[1]


Re: Flink Job Failed With Kafka Exception

2023-11-08 Thread Feng Jin
Hi Madan,

Perhaps you can filter out inactive topics in the client first and then
pass the filtered list of topics to KafkaConsumer.
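
A rough sketch of that filtering step in plain Java is below. The helper
name is made up, and how you obtain the set of currently available topics
is left open; one option I would assume works is asking Kafka's AdminClient
for the topic names before building the source. Note that this only helps
at job start and does not cover a topic that disappears while the job is
running.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TopicFilter {

    /** Keeps only the wanted topics that the cluster currently reports, preserving their order. */
    static List<String> activeTopics(List<String> wanted, Set<String> available) {
        List<String> result = new ArrayList<>();
        for (String topic : wanted) {
            if (available.contains(topic)) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical topic names; "available" would come from the cluster metadata.
        List<String> wanted = Arrays.asList("orders", "payments", "legacy-events");
        Set<String> available = new HashSet<>(Arrays.asList("orders", "payments"));

        // Only the filtered list would be handed to the Kafka source.
        System.out.println(activeTopics(wanted, available));
    }
}
```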

Best,
Feng


On Tue, Nov 7, 2023 at 10:42 AM Madan D via user 
wrote:

> Hello Hang/Lee,
> Thanks!
> In my use case we listen to multiple topics, and occasionally one of them
> becomes inactive because the producer decides to shut it down while the
> other topics keep receiving data. What we observe is that when one topic
> becomes inactive, the entire Flink application fails with a timeout while
> fetching metadata. We would like the Flink job to keep consuming from the
> other source topics even if one topic has an issue, since failing the
> entire application because of a single topic doesn't make sense.
>
>
>
> Regards,
> Madan
>
> On Nov 5, 2023, at 11:29 PM, Hang Ruan  wrote:
>
> 
> Hi, Madan.
>
> This error suggests that the consumer runs into a problem when it tries to
> read the topic metadata. If you use the same source for all of these
> topics, the Kafka connector cannot skip one of them. As you say, you would
> need to modify the connector's default behavior.
> You could start by reading the code in KafkaSourceEnumerator to see where
> this error could be skipped.
>
> Best,
> Hang
>
> Junrui Lee  于2023年11月6日周一 14:30写道:
>
>> Hi Madan,
>>
>> Do you mean you want to restart only the failed tasks, rather than
>> restarting the entire pipeline region? As far as I know, currently Flink
>> does not support task-level restart, but requires restarting the pipeline
>> region.
>>
>> Best,
>> Junrui
>>
>> Madan D via user  于2023年10月11日周三 12:37写道:
>>
>>> Hello Team,
>>> We are running a Flink pipeline that consumes data from multiple
>>> topics, but we recently found that if one topic has issues
>>> with participation, etc., the whole Flink pipeline fails, which
>>> affects the other topics. Is there a way to keep the Flink pipeline running
>>> even after one of the topics has an issue? We tried to handle exceptions to
>>> make sure the job wouldn't fail, but it didn't help.
>>>
>>> Caused by: java.lang.RuntimeException: Failed to get metadata for topics
>>>
>>>
>>> Can you please provide any insights?
>>>
>>>
>>> Regards,
>>> Madan
>>>
>>


Re: Monitoring File Processing Progress in Flink Jobs

2023-11-01 Thread Feng Jin
Hi arjun

Flink saves the files currently being processed and their corresponding
offsets in Flink state [1]. You may need to use the State Processor API [2]
to access it.

However, I don't think this is a good approach. I suggest adding relevant
metrics to the FileSystem connector that report the current number of
pending files, so that file processing can be monitored.

For details on how processed files are tracked, you can refer to the
FLIP-27 [3] design document.


[1].
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/PendingSplitsCheckpoint.java
[2].
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/libs/state_processor_api/
[3].
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface


Best,

Feng

On Tue, Oct 31, 2023 at 12:16 AM arjun s  wrote:

> Hi team,
> I'm also interested in finding out if there is Java code available to
> determine the extent to which a Flink job has processed files within a
> directory. Additionally, I'm curious about where the details of the
> processed files are stored within Flink.
>
> Thanks and regards,
> Arjun S
>


Re: [ANNOUNCE] Apache Flink 1.18.0 released

2023-10-26 Thread Feng Jin
Thanks for the great work! Congratulations


Best,
Feng Jin

On Fri, Oct 27, 2023 at 12:36 AM Leonard Xu  wrote:

> Congratulations, Well done!
>
> Best,
> Leonard
>
> On Fri, Oct 27, 2023 at 12:23 AM Lincoln Lee 
> wrote:
>
> > Thanks for the great work! Congrats all!
> >
> > Best,
> > Lincoln Lee
> >
> >
> > Jing Ge  于2023年10月27日周五 00:16写道:
> >
> > > The Apache Flink community is very happy to announce the release of
> > > Apache Flink 1.18.0, which is the first release for the Apache Flink
> > > 1.18 series.
> > >
> > > Apache Flink® is an open-source unified stream and batch data
> > > processing framework for distributed, high-performing,
> > > always-available, and accurate data applications.
> > >
> > > The release is available for download at:
> > > https://flink.apache.org/downloads.html
> > >
> > > Please check out the release blog post for an overview of the
> > > improvements for this release:
> > > https://flink.apache.org/2023/10/24/announcing-the-release-of-apache-flink-1.18/
> > >
> > > The full release notes are available in Jira:
> > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12352885
> > >
> > > We would like to thank all contributors of the Apache Flink community
> > > who made this release possible!
> > >
> > > Best regards,
> > > Konstantin, Qingsheng, Sergey, and Jing
> > >
> >
>


Re: Flink SQL: MySQL to Elaticsearch soft delete

2023-10-22 Thread Feng Jin
Hi Hemi

Simply filtering out the soft-deleted rows is not enough: a plain filter
only stops forwarding changes, it never emits a delete.

To make Flink generate a delete record, keep the latest row per key and
filter on the flag afterwards, for example:

```
CREATE TABLE test_source (f1 xxx, f2 xxx, f3 xxx, deleted BOOLEAN) WITH
(...);

INSERT INTO es_sink
SELECT f1, f2, f3
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY f1, f2 ORDER BY proctime() DESC) AS rnk
  FROM test_source
) WHERE rnk = 1 AND deleted = false;
```
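
To illustrate why this produces deletes, here is a small plain-Java
simulation of the idea behind the query: keep only the latest version of
each key and drop the key from the sink once that version is flagged as
deleted. It is a simplified model (Flink itself would emit -U/+U pairs for
updates), and all names in it are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SoftDeleteChangelog {

    /** One incoming source row: the key and the soft-delete flag of that version. */
    static final class Row {
        final String key;
        final boolean deleted;

        Row(String key, boolean deleted) {
            this.key = key;
            this.deleted = deleted;
        }
    }

    /** Replays rows in arrival order and returns the events the sink would receive. */
    static List<String> replay(List<Row> rows) {
        Set<String> visible = new HashSet<>(); // keys currently present in the sink
        List<String> events = new ArrayList<>();
        for (Row row : rows) {
            if (!row.deleted) {
                // The latest version passes the filter: insert or update the document.
                events.add((visible.add(row.key) ? "+I " : "+U ") + row.key);
            } else if (visible.remove(row.key)) {
                // The latest version is filtered out: the earlier result is retracted.
                events.add("-D " + row.key);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("a", false), new Row("a", false), new Row("a", true));
        System.out.println(replay(rows));
    }
}
```

The `visible` set in this model corresponds to the per-key state that the
SQL query has to keep.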


Best,
Feng


On Sat, Oct 21, 2023 at 11:32 PM Hemi Grs  wrote:

> Hi Feng,
>
> thanks for the reply. I actually already tried that because we have a
> deletedAt column (if the record is deleted then it will update with the
> timestamp to that column).
>
> What I've tried is as follows:
>
> insert into es_sink_table
> select * from table_source where deletedAt is null
>
> What it actually does is stop forwarding further changes to Elasticsearch,
> because the row is no longer selected, but it does not actually delete the
> document in Elasticsearch.
> What I want to achieve is to actually delete the record/document in
> Elasticsearch ...
>
> What I'm thinking right now is actually just doing a job every few minutes
> to delete all records where the deletedAt is not null in elastic. But this
> approach is not elegant at all :) That's why I was hoping there'll be a
> better method to do this ...
>
>
>
> On Sat, Oct 21, 2023 at 5:21 PM Feng Jin  wrote:
>
>> Hi Hemi,
>>
>> One possible way, but it may generate many useless states.
>>
>> As shown below:
>> ```
>> CREATE TABLE test_source (f1 xxx, f2 xxx, f3 xxx, deleted BOOLEAN) WITH
>> (...);
>>
>> INSERT INTO es_sink
>> SELECT f1, f2, f3
>> FROM (
>>   SELECT *,
>>     ROW_NUMBER() OVER (PARTITION BY f1, f2 ORDER BY proctime() DESC) AS rnk
>>   FROM test_source
>> ) WHERE rnk = 1 AND deleted = false;
>> ```
>>
>> Best,
>> Feng
>>
>> On Fri, Oct 20, 2023 at 1:38 PM Hemi Grs  wrote:
>>
>>> hello everyone,
>>>
>>> right now I'm using flink to sync from mysql to elasticsearch and so far
>>> so good. If we insert, update, or delete it will sync from mysql to elastic
>>> without any problem.
>>>
>>> The problem I have right now is the application is not actually doing
>>> hard delete to the records in mysql, but doing soft delete (updating a
>>> deletedAt column).
>>>
>>> Because it's not actually doing a deletion, flink is not deleting the
>>> data in elastic. How do I make it so it will delete the data in elastic?
>>>
>>> Thanks
>>>
>>


Re: Flink SQL: MySQL to Elaticsearch soft delete

2023-10-21 Thread Feng Jin
Hi Hemi,

One possible approach is shown below. Note that it keeps state for every
key, so it may leave a lot of otherwise unused state behind.

The query keeps the latest row per key and filters on the flag afterwards:
```
CREATE TABLE test_source (f1 xxx, f2 xxx, f3 xxx, deleted BOOLEAN) WITH
(...);

INSERT INTO es_sink
SELECT f1, f2, f3
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY f1, f2 ORDER BY proctime() DESC) AS rnk
  FROM test_source
) WHERE rnk = 1 AND deleted = false;
```

Best,
Feng

On Fri, Oct 20, 2023 at 1:38 PM Hemi Grs  wrote:

> hello everyone,
>
> right now I'm using flink to sync from mysql to elasticsearch and so far
> so good. If we insert, update, or delete it will sync from mysql to elastic
> without any problem.
>
> The problem I have right now is the application is not actually doing hard
> delete to the records in mysql, but doing soft delete (updating a deletedAt
> column).
>
> Because it's not actually doing a deletion, flink is not deleting the data
> in elastic. How do I make it so it will delete the data in elastic?
>
> Thanks
>


Re: Too many connections when collecting Flink job logs with kafka_appender

2023-10-19 Thread Feng Jin
You could deploy a log service on each YARN machine that collects the local logs and ships them to Kafka:
yarn container -> per-machine log service -> kafka.



On Mon, Oct 16, 2023 at 3:58 PM 阿华田  wrote:

>
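> Our Flink cluster uses kafka_appender to collect the logs that Flink produces, but we now have more than 3,000 real-time jobs running, with over 200,000 YARN containers. This results in too many connections to the Kafka cluster that stores the logs and puts it under considerable pressure. Are there any better ways to collect Flink logs?
>
> 阿华田
> a15733178...@163.com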
>
>


Re: Doesn't Flink SQL support SHOW CREATE CATALOG?

2023-10-19 Thread Feng Jin
hi casel


Starting from 1.18, CatalogStore was introduced, which persists the catalog configuration, so it has indeed become feasible to support SHOW CREATE CATALOG.


Best,
Feng

On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:

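> I previously created a catalog in Flink SQL and now want to look up the original CREATE CATALOG statement
> so that I can copy it, modify it, and save it as a new catalog, but I found that Flink SQL does not support SHOW CREATE CATALOG.
> As far as I know, Doris supports the SHOW CREATE CATALOG statement. Could Flink SQL support it as well?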


Re: How to handle BatchUpdateException when using JdbcSink

2023-10-19 Thread Feng Jin
Hi Sai,

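If you use JdbcSink directly, you may not be able to catch this
exception.

However, you can write your own SinkFunction that wraps the JDBC sink function,
calls its `invoke` method inside a try/catch, and forwards the record to the
DLQ sink when an exception is thrown. Note that JdbcSink buffers records into
batches, so the exception may surface while a batch is flushed rather than for
the record currently being passed in.

As shown below,
```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch: wraps the JDBC sink function and routes failed records to a DLQ sink.
public class SinkWrapper<T> extends RichSinkFunction<T> {

    private final RichSinkFunction<T> jdbcSink; // e.g. the function built by JdbcSink.sink(...)
    private final RichSinkFunction<T> dlqSink;

    public SinkWrapper(RichSinkFunction<T> jdbcSink, RichSinkFunction<T> dlqSink) {
        this.jdbcSink = jdbcSink;
        this.dlqSink = dlqSink;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        jdbcSink.setRuntimeContext(getRuntimeContext());
        dlqSink.setRuntimeContext(getRuntimeContext());
        jdbcSink.open(parameters);
        dlqSink.open(parameters);
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        try {
            jdbcSink.invoke(value, context);
        } catch (Exception e) {
            // Writing to the database failed, so send the record to the DLQ instead.
            dlqSink.invoke(value, context);
        }
    }
}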

```
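
The wrap-and-fall-back idea can also be tried without a cluster. The following is a minimal, dependency-free sketch: `RecordSink`, `FallbackSink`, and the simulated length check are hypothetical stand-ins invented for illustration, not Flink or JDBC APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a sink; this is not a Flink interface. */
interface RecordSink<T> {
    void write(T value);
}

/** Delegates to a primary sink and routes records that fail to a dead-letter sink. */
final class FallbackSink<T> implements RecordSink<T> {
    private final RecordSink<T> primary;
    private final RecordSink<T> deadLetter;

    FallbackSink(RecordSink<T> primary, RecordSink<T> deadLetter) {
        this.primary = primary;
        this.deadLetter = deadLetter;
    }

    @Override
    public void write(T value) {
        try {
            primary.write(value);
        } catch (RuntimeException e) {
            // The primary sink rejected the record, so hand it to the dead-letter sink.
            deadLetter.write(value);
        }
    }
}

final class FallbackSinkDemo {
    /** Writes the inputs through a FallbackSink and returns [accepted, deadLettered]. */
    static List<List<String>> run(List<String> inputs, int maxLength) {
        List<String> accepted = new ArrayList<>();
        List<String> deadLettered = new ArrayList<>();
        // Simulates a column length limit: values that are too long are rejected.
        RecordSink<String> primary = value -> {
            if (value.length() > maxLength) {
                throw new IllegalArgumentException("value too long: " + value);
            }
            accepted.add(value);
        };
        RecordSink<String> sink = new FallbackSink<>(primary, deadLettered::add);
        for (String input : inputs) {
            sink.write(input);
        }
        return Arrays.asList(accepted, deadLettered);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("ok", "far-too-long-value", "fine"), 5));
    }
}
```

This per-record model only holds when each write fails immediately. With a batching sink, the failing record is not necessarily the one being written when the exception appears.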


Best,
Feng


On Thu, Oct 19, 2023 at 4:12 PM Sai Vishnu 
wrote:

> Hi team,
>
>
>
> We are using the JdbcSink from flink-connector-jdbc artifact, version
> 3.1.0-1.17.
>
> I want to know if it’s possible to catch BatchUpdateException thrown and
> put that message to DLQ.
>
>
>
> Below is the use case:
>
> Flink job reads a packet from Kafka and writes it to Postgres using the
> JdbcSink.
>
> For any missing field, we are catching it during the data transform layer
> and writing it a side output that writes the exception along with the
> original message to a dlq sink.
>
> In scenarios where a field has characters that is greater than what is
> defined in Postgres, we are currently receiving a BatchUpdateException
> during the update to the Postgres stage.
>
>
>
> Is it possible to catch this exception and write the message to a dlq sink?
>
>
>
> Thanks,
>
> Sai Vishnu Soudri
>


Re: Table API in process function

2023-10-15 Thread Feng Jin
Hi Yashoda,

I don't think this approach is reasonable, and it is not supported at the moment.

I suggest converting the DataStream produced by windowAll into a Table and
then using the Table API:

AllWindowProcess -> ConvertDataStreamToTable ->  ProcessUsingTableAPI


Best,
Feng

On Fri, Oct 13, 2023 at 9:31 PM Yashoda Krishna T 
wrote:

> Is it possible to use table API inside a processAll window function .
> Lets say, the use case is process function should enrich for each element
> by querying some SQL queries over the entire elements in the window using
> table API. Is this case supported in flink? If not what is the suggested way
>
> Thanks
>


Re: Flink with HDFS

2023-10-11 Thread Feng Jin
Hi Jose,

Sorry, my previous response may have been misleading.

I have confirmed here that Flink 1.15 only supports Hadoop 2.8.5 and above,
so you should use a Hadoop version of 2.8.5 or higher.
https://nightlies.apache.org/flink/flink-docs-release-1.15/release-notes/flink-1.15/#upgrade-the-minimal-supported-hadoop-version-to-285

Starting from Flink 1.11, Flink no longer provides shaded-Hadoop and
recommends configuring HADOOP_CLASSPATH instead.
https://nightlies.apache.org/flink/flink-docs-release-1.11/release-notes/flink-1.11.html#support-for-hadoop-300-and-higher-flink-11086


Best,
Feng

On Thu, Oct 12, 2023 at 4:26 AM Jose Henrry Matituy Manchabajoy <
henrrymati...@gmail.com> wrote:

> Thanks for the above reply.
>
> In that regard, I would note the following: in tests performed with
> flink-shaded-hadoop-2-uber-2.8.3-10.0.jar, Hadoop 2.8.3 and Flink 1.10.0
> everything goes fine. But, when I upgrade Flink to 1.15.0. I encounter
> errors. According to the documentation, Flink 1.15.0 only works with Hadoop
> versions higher than 2.8.5. Apparently, flink-shaded-hadoop-2-uber-2.8.3,
> only works with hadoop 2.8.3. Due to the above, I cannot connect Flink with
> HDFS. Could you please clarify this aspect for me?
>
> Thank you very much.
>
> On Mon, Oct 9, 2023 at 8:10 AM Jose Henrry Matituy Manchabajoy <
> henrrymati...@gmail.com> wrote:
>
>> Cordial greetings
>>
>> In tests performed with flink-shaded-hadoop-2-uber-2.8.3-10.0.jar, Hadoop
>> 2.8.3 and Flink 1.10.0 everything is fine. However when I upgrade Flink to
>> 1.15.0. I encounter errors. According to the documentation, Flink 1.15.0
>> only works with Hadoop versions higher than 2.8.5. Apparently,
>> flink-shaded-hadoop-2-uber-2.8.3, only works with hadoop 2.8.3. Because of
>> the above, I am unable to connect Flink with HDFS.
>>
>> I appreciate your valuable collaboration.
>>
>> Yours sincerely,
>>
>>
>> Jose Henrry Matituy
>>
>> On Wed, Sep 27, 2023 at 8:34 PM Feng Jin wrote:
>>
>>> Hi Jose
>>>
>>> This component is compatible with the latest version of Flink (1.17.1).
>>> The package in question is merely a shaded dependency for Hadoop and is not
>>> dependent on the version of Flink.
>>>
>>> Best,
>>> Feng
>>>
>>> On Thu, Sep 28, 2023 at 4:03 AM Jose Henrry Matituy Manchabajoy <
>>> henrrymati...@gmail.com> wrote:
>>>
>>>> Cordial greetings
>>>>
>>>>
>>>>
>>>> I am working on an application where I need to use Flink with HDFS. I
>>>> understand that to make the connection I need to use the
>>>> flink-shaded-hadoop-2-uber-2.8.3-10.0.jar extension. My question is. What
>>>> is the most up to date version of Flink that works with this extension or
>>>> is there another way to use a more up to date version of Flink with HDFS?
>>>>
>>>>
>>>>
>>>> Thank you very much for your attention.
>>>>
>>>>
>>>>
>>>> Yours sincerely,
>>>>
>>>>
>>>> Jose Henrry Matituy
>>>>
>>>


Re: Flink two-phase commit

2023-10-07 Thread Feng Jin
hi,

You can refer to this blog post, which describes it very clearly:
https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/


Best,
Feng

On Sun, Sep 24, 2023 at 9:54 PM 海风 <18751805...@163.com> wrote:

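> A question: in Flink's two-phase commit, at which stage of checkpointing is the pre-commit of a sink operator triggered? And what exactly does the pre-commit do?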
>
>
>


Re: Consuming Apache Paimon tables with Flink CDC

2023-10-07 Thread Feng Jin
hi casel

When Flink consumes Paimon in streaming mode, the default behavior is already full snapshot + incremental.

For details, see the scan.mode option at:
https://paimon.apache.org/docs/master/maintenance/configurations/


best,
Feng

On Fri, Sep 29, 2023 at 5:50 PM casel.chen  wrote:

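> Currently, consuming an Apache Paimon table with Flink in full + incremental fashion requires starting two separate jobs, one batch and one incremental, which is cumbersome and cannot hand over seamlessly.
> Is it possible to consume an Apache Paimon table in a way similar to how Flink CDC consumes MySQL tables?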


Re: Flink Kafka offset commit issues

2023-09-28 Thread Feng Jin
Hi Elakiya

1. Can you confirm that checkpoints for the job are being triggered
normally?

2. Also, if you stop the job, you need to use "STOP WITH SAVEPOINT" and
specify the savepoint path when restarting the Flink job so that it recovers
from it. This is necessary to resume consuming from the previous offsets
correctly.


Best,
Feng


On Thu, Sep 28, 2023 at 4:41 PM elakiya udhayanan 
wrote:

> Hi team,
>
> I have a Kafka topic named employee which uses confluent avro schema and
> will emit the payload as below:
> {
> "id": "emp_123456",
> "employee": {
> "id": "123456",
> "name": "sampleName"
> }
> }
> I am using the upsert-kafka connector to consume the events from the above
> Kafka topic as below using the Flink SQL DDL statement.The problem is the
> connector is not committing the offset. Everytime, I submit the job, it
> reads Kafka events from the beginning. Please let me know if we can commit
> the offset for the read Kafka events.
>
> DDL Statement:
> String statement = "CREATE TABLE Employee (\r\n" +
> "  id STRING,\r\n" +
> "  employee  ROW(id STRING, name STRING\r\n" +
> "  ),\r\n" +
> "  PRIMARY KEY (id) NOT ENFORCED\r\n" +
> ") WITH (\r\n" +
> "  'connector' = 'upsert-kafka',\r\n" +
> "  'topic' = 'employee',\r\n" +
> "  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
> "  'key.format' = 'raw',\r\n" +
> "  'value.format' = 'avro-confluent',\r\n" +
> "  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n"
> +
> ")";
> Any help is appreciated TIA
>
> Thanks,
> Elakiya
>


Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-28 Thread Feng Jin
hi Rui,

We are using OpenJDK: `openjdk version "1.8.0_265"`


Best,
Feng

On Thu, Sep 28, 2023 at 2:15 PM rui chen  wrote:

> hi Feng
>
> Are you using the open jdk or the oracle jdk?
>
> Best,
> rui
>
> On Wed, Sep 27, 2023 at 8:22 PM rui chen wrote:
>
>> hi Feng,
>>
>> Thanks for your reply. We are using JDK 8u192, which may be the problem; I found
>> a JDK issue: https://bugs.openjdk.org/browse/JDK-8215355.
>>
>> Best,
>> rui
>>
>> On Wed, Sep 27, 2023 at 8:09 PM Feng Jin wrote:
>>
>>> hi Rui,
>>>
>>> Which version of JDK are you using?
>>>
>>> This issue could potentially be a bug in the JDK version.
>>>
>>> If you are using JDK 8, you can try using OpenJDK 8u265 as a possible
>>> solution.
>>>
>>>
>>> Best,
>>> Feng
>>>
>>>
>>> On Wed, Sep 27, 2023 at 8:08 PM rui chen  wrote:
>>>
>>>>
>>>>
>>>> On Wed, Sep 27, 2023 at 7:32 PM rui chen wrote:
>>>>
>>>>> hi Feng,
>>>>>
>>>>> Thank you for your reply. We observed the GC behavior and saw no
>>>>> change before and after the replacement. Several of our production jobs
>>>>> using jemalloc have become stuck; after removing jemalloc, no job has
>>>>> become stuck.
>>>>>
>>>>> Best,
>>>>> rui
>>>>>
>>>>> On Wed, Sep 27, 2023 at 7:19 PM Feng Jin wrote:
>>>>>
>>>>>>
>>>>>> hi rui,
>>>>>>
>>>>>> In general, checkpoint timeouts are typically associated with the
>>>>>> job's processing performance. When using jemalloc, performance 
>>>>>> degradation
>>>>>> is generally not observed.
>>>>>>
>>>>>> It is advisable to analyze whether the job's garbage collection (GC)
>>>>>> has become more frequent.
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Feng
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 25, 2023 at 1:21 PM rui chen 
>>>>>> wrote:
>>>>>>
>>>>>>> After using the jemalloc memory allocator for a period of time,
>>>>>>> checkpoint timeout occurs and tasks are stuck. Who has encountered this?
>>>>>>> Flink version: 1.13.2, jemalloc version: 5.3.0
>>>>>>>
>>>>>>


Re: Flink with HDFS

2023-09-27 Thread Feng Jin
Hi Jose

This component is compatible with the latest version of Flink (1.17.1). The
package in question is merely a shaded dependency for Hadoop and is not
dependent on the version of Flink.

Best,
Feng

On Thu, Sep 28, 2023 at 4:03 AM Jose Henrry Matituy Manchabajoy <
henrrymati...@gmail.com> wrote:

> Cordial greetings
>
>
>
> I am working on an application where I need to use Flink with HDFS. I
> understand that to make the connection I need to use the
> flink-shaded-hadoop-2-uber-2.8.3-10.0.jar extension. My question is. What
> is the most up to date version of Flink that works with this extension or
> is there another way to use a more up to date version of Flink with HDFS?
>
>
>
> Thank you very much for your attention.
>
>
>
> Yours sincerely,
>
>
> Jose Henrry Matituy
>


Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-27 Thread Feng Jin
hi Rui,

Which version of JDK are you using?

This issue could potentially be a bug in the JDK version.

If you are using JDK 8, you can try using OpenJDK 8u265 as a possible
solution.


Best,
Feng


On Wed, Sep 27, 2023 at 8:08 PM rui chen  wrote:

>
>
> On Wed, Sep 27, 2023 at 7:32 PM rui chen wrote:
>
>> hi Feng,
>>
>> Thank you for your reply. We observed the GC behavior and saw no change
>> before and after the replacement. Several of our production jobs using jemalloc
>> have become stuck; after removing jemalloc, no job has become stuck.
>>
>> Best,
>> rui
>>
>> On Wed, Sep 27, 2023 at 7:19 PM Feng Jin wrote:
>>
>>>
>>> hi rui,
>>>
>>> In general, checkpoint timeouts are typically associated with the job's
>>> processing performance. When using jemalloc, performance degradation is
>>> generally not observed.
>>>
>>> It is advisable to analyze whether the job's garbage collection (GC) has
>>> become more frequent.
>>>
>>>
>>> Best,
>>> Feng
>>>
>>>
>>> On Mon, Sep 25, 2023 at 1:21 PM rui chen  wrote:
>>>
>>>> After using the jemalloc memory allocator for a period of time,
>>>> checkpoint timeout occurs and tasks are stuck. Who has encountered this?
>>>> Flink version: 1.13.2, jemalloc version: 5.3.0
>>>>
>>>


Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-27 Thread Feng Jin
hi rui,

In general, checkpoint timeouts are typically associated with the job's
processing performance. When using jemalloc, performance degradation is
generally not observed.

It is advisable to analyze whether the job's garbage collection (GC) has
become more frequent.


Best,
Feng


On Mon, Sep 25, 2023 at 1:21 PM rui chen  wrote:

> After using the jemalloc memory allocator for a period of time, checkpoint
> timeout occurs and tasks are stuck. Who has encountered this? flink
> version: 1.13.2, jemalloc version: 5.3.0
>


Re: Pending Metrics

2023-09-27 Thread Feng Jin
Hi Rania

Currently, there are no similar metrics available. Only the number of
memory segments in the queue between tasks
(inputQueueLength/outputQueueLength) and the buffer occupancy ratio
(inPoolUsage/outPoolUsage) are available.

For more detailed information, please refer to:
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#network


Best,
Feng

On Mon, Sep 25, 2023 at 8:19 PM rania duni  wrote:

> Hello!
>
> Is there any metric that refers to the queued records, for each operator?
>
> Thanks!
>


Re: About Flink parquet format

2023-09-24 Thread Feng Jin
Hi Kamal

Indeed, Flink does not handle this exception. When this exception occurs,
the Flink job will fail directly and internally keep restarting,
continuously creating new files.

Personally, I think this logic can be optimized. When this exception
occurs, the file with the exception should be deleted before the Flink job
exits, to avoid generating too many unnecessary files.


Best,
Feng

On Mon, Sep 25, 2023 at 10:27 AM Kamal Mittal 
wrote:

> Hello,
>
>
>
> Can you please share that why Flink is not able to handle exception and
> keeps on creating files continuously without closing?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Kamal Mittal via user 
> *Sent:* 21 September 2023 07:58 AM
> *To:* Feng Jin 
> *Cc:* user@flink.apache.org
> *Subject:* RE: About Flink parquet format
>
>
>
> Yes.
>
>
>
> Due to below error, Flink bulk writer never close the part file and keep
> on creating new part file continuously. Is flink not handling exceptions
> like below?
>
>
>
> *From:* Feng Jin 
> *Sent:* 20 September 2023 05:54 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi
>
>
>
> I tested it on my side and also got the same error. This should be a
> limitation of Parquet.
>
>
>
> ```
>
> java.lang.IllegalArgumentException: maxCapacityHint can't be less than initialSlabSize 64 1
>     at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:57) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
>     at org.apache.parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:153) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
>     at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder.<init>(RunLengthBitPackingHybridEncoder.jav
>
> ```
>
>
>
>
>
> So I think the current minimum page size that can be set for parquet is
> 64B.
>
>
>
> Best,
>
> Feng
>
>
>
>
>
> On Tue, Sep 19, 2023 at 6:06 PM Kamal Mittal 
> wrote:
>
> Hello,
>
>
>
> If given page size as 1 byte then encountered exception as  -
> ‘maxCapacityHint can't be less than initialSlabSize %d %d’.
>
>
>
> This is coming from class CapacityByteArrayOutputStream and contained in
> parquet-common library.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Feng Jin 
> *Sent:* 19 September 2023 01:01 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi Kamal
>
>
>
> What exception did you encounter? I have tested it locally and it works
> fine.
>
>
>
>
>
> Best,
>
> Feng
>
>
>
>
>
> On Mon, Sep 18, 2023 at 11:04 AM Kamal Mittal 
> wrote:
>
> Hello,
>
>
>
> Checkpointing is enabled and works fine if configured parquet page size is
> at least 64 bytes as otherwise there is exception thrown at back-end.
>
>
>
> Looks to be an issue which is not handled by file sink bulk writer?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Feng Jin 
> *Sent:* 15 September 2023 04:14 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi Kamal
>
>
>
> Check if the checkpoint of the task is enabled and triggered correctly. By
> default, write parquet files will roll a new file when checkpointing.
>
>
>
>
>
> Best,
>
> Feng
>
>
>
> On Thu, Sep 14, 2023 at 7:27 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> Tried parquet file creation with file sink bulk writer.
>
>
>
> If configured parquet page size as low as 1 byte (allowed configuration)
> then flink keeps on creating multiple ‘in-progress’ state files and with
> content only as ‘PAR1’ and never closed the file.
>
>
>
> I want to know what is the reason of not closing the file and creating
> multiple ‘in-progress’ part files or why no error is given if applicable?
>
>
>
> Rgds,
>
> Kamal
>
>


Re: About Flink parquet format

2023-09-20 Thread Feng Jin
Hi

I tested it on my side and also got the same error. This should be a
limitation of Parquet.

```
java.lang.IllegalArgumentException: maxCapacityHint can't be less than initialSlabSize 64 1
    at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:57) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.bytes.CapacityByteArrayOutputStream.<init>(CapacityByteArrayOutputStream.java:153) ~[flink-sql-parquet-1.17.1.jar:1.17.1]
    at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder.<init>(RunLengthBitPackingHybridEncoder.jav
```


So I think the current minimum page size that can be set for parquet is 64B.
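
One way to avoid the restart loop is to reject a too-small page size before the job is even submitted. The sketch below is only an illustration: `ParquetPageSizeGuard` is a hypothetical helper, and the 64-byte floor is an assumption taken from the error message in this thread (flink-sql-parquet 1.17.1), so it may differ in other versions.

```java
/** Hypothetical guard; the 64-byte floor is taken from the error message in this thread. */
final class ParquetPageSizeGuard {
    static final int MIN_PAGE_SIZE_BYTES = 64;

    /** Returns the page size unchanged, or fails fast if the writer would reject it. */
    static int requireValidPageSize(int pageSizeBytes) {
        if (pageSizeBytes < MIN_PAGE_SIZE_BYTES) {
            throw new IllegalArgumentException(
                    "Parquet page size must be at least " + MIN_PAGE_SIZE_BYTES
                            + " bytes, but was " + pageSizeBytes);
        }
        return pageSizeBytes;
    }

    public static void main(String[] args) {
        System.out.println(requireValidPageSize(1024 * 1024));
        try {
            requireValidPageSize(1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at configuration time gives a single clear error instead of a job that keeps restarting and leaving in-progress part files behind.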

Best,
Feng


On Tue, Sep 19, 2023 at 6:06 PM Kamal Mittal 
wrote:

> Hello,
>
>
>
> If given page size as 1 byte then encountered exception as  -
> ‘maxCapacityHint can't be less than initialSlabSize %d %d’.
>
>
>
> This is coming from class CapacityByteArrayOutputStream and contained in
> parquet-common library.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Feng Jin 
> *Sent:* 19 September 2023 01:01 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi Kamal
>
>
>
> What exception did you encounter? I have tested it locally and it works
> fine.
>
>
>
>
>
> Best,
>
> Feng
>
>
>
>
>
> On Mon, Sep 18, 2023 at 11:04 AM Kamal Mittal 
> wrote:
>
> Hello,
>
>
>
> Checkpointing is enabled and works fine if configured parquet page size is
> at least 64 bytes as otherwise there is exception thrown at back-end.
>
>
>
> Looks to be an issue which is not handled by file sink bulk writer?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Feng Jin 
> *Sent:* 15 September 2023 04:14 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi Kamal
>
>
>
> Check if the checkpoint of the task is enabled and triggered correctly. By
> default, write parquet files will roll a new file when checkpointing.
>
>
>
>
>
> Best,
>
> Feng
>
>
>
> On Thu, Sep 14, 2023 at 7:27 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> Tried parquet file creation with file sink bulk writer.
>
>
>
> If configured parquet page size as low as 1 byte (allowed configuration)
> then flink keeps on creating multiple ‘in-progress’ state files and with
> content only as ‘PAR1’ and never closed the file.
>
>
>
> I want to know what is the reason of not closing the file and creating
> multiple ‘in-progress’ part files or why no error is given if applicable?
>
>
>
> Rgds,
>
> Kamal
>
>


Re: About Flink parquet format

2023-09-19 Thread Feng Jin
Hi Kamal

What exception did you encounter? I have tested it locally and it works
fine.


Best,
Feng


On Mon, Sep 18, 2023 at 11:04 AM Kamal Mittal 
wrote:

> Hello,
>
>
>
> Checkpointing is enabled and works fine if configured parquet page size is
> at least 64 bytes as otherwise there is exception thrown at back-end.
>
>
>
> Looks to be an issue which is not handled by file sink bulk writer?
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Feng Jin 
> *Sent:* 15 September 2023 04:14 PM
> *To:* Kamal Mittal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: About Flink parquet format
>
>
>
> Hi Kamal
>
>
>
> Check if the checkpoint of the task is enabled and triggered correctly. By
> default, write parquet files will roll a new file when checkpointing.
>
>
>
>
>
> Best,
>
> Feng
>
>
>
> On Thu, Sep 14, 2023 at 7:27 PM Kamal Mittal via user <
> user@flink.apache.org> wrote:
>
> Hello,
>
>
>
> Tried parquet file creation with file sink bulk writer.
>
>
>
> If configured parquet page size as low as 1 byte (allowed configuration)
> then flink keeps on creating multiple ‘in-progress’ state files and with
> content only as ‘PAR1’ and never closed the file.
>
>
>
> I want to know what is the reason of not closing the file and creating
> multiple ‘in-progress’ part files or why no error is given if applicable?
>
>
>
> Rgds,
>
> Kamal
>
>


Re: About Flink parquet format

2023-09-15 Thread Feng Jin
Hi Kamal

Check whether checkpointing is enabled for the job and triggered correctly. By
default, the Parquet bulk writer rolls to a new file on every checkpoint.


Best,
Feng

On Thu, Sep 14, 2023 at 7:27 PM Kamal Mittal via user 
wrote:

> Hello,
>
>
>
> Tried parquet file creation with file sink bulk writer.
>
>
>
> If configured parquet page size as low as 1 byte (allowed configuration)
> then flink keeps on creating multiple ‘in-progress’ state files and with
> content only as ‘PAR1’ and never closed the file.
>
>
>
> I want to know what is the reason of not closing the file and creating
> multiple ‘in-progress’ part files or why no error is given if applicable?
>
>
>
> Rgds,
>
> Kamal
>


Re: e2e tests with flink

2023-09-11 Thread Feng Jin
Hi Oscar

You can refer to the unit tests in flink-connector-kafka.

https://github.com/apache/flink-connector-kafka/blob/d6525c1481fc2d2821f361d2d5ce48f97221dd74/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java#L152


Best,
Feng

On Mon, Sep 11, 2023 at 5:39 PM Oscar Perez via user 
wrote:

> Hi,
>
> I have a flink job which I want to test e2e.
>
> In the test I start flink minicluster and this reads from kafka topics in
> testcontainers. I m facing a problem that for some topics I have starting
> offset as latest and I want to publish these messages just after the job
> has been completely started, so that these messages can be read
>
> Is there a clean solution to send the payment event after the job has been
> started? Currently I m using Thread.sleep for that but I would like to
> await on something but dont know what would be the trigger for that
>
> Regards,
> Oscar
>
>
>


Re: [DISCUSS][FLINK-31788][FLINK-33015] Add back Support emitUpdateWithRetract for TableAggregateFunction

2023-09-10 Thread Feng Jin
Thanks Jane for following up on this issue!

+1 for adding it back first.

Supporting emitUpdateWithRetract for TableAggregateFunction is a good
feature, we should support it unless there are better alternatives.


Best,
Feng

On Thu, Sep 7, 2023 at 11:01 PM Lincoln Lee  wrote:

> Thanks to Jane for following up on this issue!  +1 for adding it back
> first.
>
> For the deprecation, considering that users aren't usually motivated to
> upgrade to a major version (1.14, from two years ago, wasn't that old,
> which may be
> part of the reason for not receiving more feedback), I'd recommend holding
> off on removing `TableAggregateFunction` until we have a replacement for
> it,
> e.g., user-defined-operator as Jark mentioned or something else.
>
> Best,
> Lincoln Lee
>
>
> Best,
> Lincoln Lee
>
>
> On Thu, Sep 7, 2023 at 9:30 PM Jark Wu wrote:
>
> > +1 to fix it first.
> >
> > I also agree to deprecate it if there are few people using it,
> > but this should be another discussion thread within dev+user ML.
> >
> > In the future, we are planning to introduce user-defined-operator
> > based on the TVF functionality which I think can fully subsume
> > the UDTAG, cc @Timo Walther .
> >
> > Best,
> > Jark
> >
> > On Thu, 7 Sept 2023 at 11:44, Jane Chan  wrote:
> >
> > > Hi devs,
> > >
> > > Recently, we noticed an issue regarding a feature regression related to
> > > Table API. `org.apache.flink.table.functions.TableAggregateFunction`
> > > provides an API `emitUpdateWithRetract` [1] to cope with updated
> values,
> > > but it's not being called in the code generator. As a result, even if
> > users
> > > override this method, it does not work as intended.
> > >
> > > This issue has been present since version 1.15 (when the old planner
> was
> > > deprecated), but surprisingly, only two users have raised concerns
> about
> > it
> > > [2][3].
> > >
> > > So, I would like to initiate a discussion to bring it back. Of course,
> if
> > > few users use it, we can also consider deprecating it.
> > >
> > > [1]
> > >
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#retraction-example
> > > [2] https://lists.apache.org/thread/rnvw8k3636dqhdttpmf1c9colbpw9svp
> > > [3]
> https://www.mail-archive.com/user-zh@flink.apache.org/msg15230.html
> > >
> > > Best,
> > > Jane
> > >
> >
>


Re: Watermarks

2023-09-09 Thread Feng Jin
hi Sid


1. You can implement a custom deserialization schema [1][2]; in its
`deserialize` method you can read the timestamp of each Kafka record.

2. I don't think you need to specify a watermark strategy explicitly.


[1].
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.html
[2].
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#the-deserializationschema


best,
Feng


On Sat, Sep 9, 2023 at 7:25 PM Sid  wrote:

> Hello experts,
>
> My source is Kafka and I am trying to generate records for which I have
> FlinkKafkaConsumer class.
>
> Now my first question is how to consume an event timestamp for the records
> generated.
> I know for a fact that for CLI, there is one property called
> *print.timestamp=true* which gives you epoch creation time but not sure
> how to do it programmatically.
>
> The second question is since my processing won't be based on the event
> timestamp (but there is a need to append it to the records) do I need to
> explicitly mention WatermarkStrategy.noWatermarks()?
>
> TIA,
> P
>


Re: Help needed on stack overflow query

2023-09-06 Thread Feng Jin
Hi Nihar,
Have you tried using the following configuration:

metrics.reporter.my_reporter.filter.includes: jobmanager:*:*;taskmanager:*:*

Please note that the default delimiter for the List parameter in Flink is
";".

Best regards,
Feng

On Thu, Aug 24, 2023 at 11:36 PM Nihar Rao  wrote:

> Hello,
> Creating a new question for this query
>  as I
> am not able to reply to the post.
>
> Can anyone help with the below query
>
>
> https://stackoverflow.com/questions/76949195/how-to-include-multiple-filters-in-filter-includes-parameter-for-my-flink-metric
>
> Thanks
>


Re: Send data asynchronously to a 3rd party via SinkFunction

2023-09-01 Thread Feng Jin
hi, patricia

I suggest using the generic asynchronous base sink.

https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/


Best,
Feng

On Fri, Sep 1, 2023 at 6:07 PM patricia lee  wrote:

>
> I'd like to ask if there is a way to send data to a vendor (SDK plugin,
> which is also an HTTP request) asynchronously in flink 1.17?
>
> After transformation on the data, I usually collate them as a List to my
> custom SinkFunction. I initialized a CompleteableFuture inside the invoke()
> method. However I read about the Async I/O from the documentation but I
> couldn't figure out how to use it in my use case.
>
>
> How can I close the resources initialized in SinkFunction properly? e.g.
> the job failed.
> Is using completableFuture inside SinkFunction a good approach?
>
>
> Regards,
> Pat
>
>


Re: How to get the application id in flink-metrics

2023-08-30 Thread Feng Jin
hi,

You can try reading the _APP_ID environment variable of the JVM process:
System.getenv(YarnConfigKeys.ENV_APP_ID);

https://github.com/apache/flink/blob/6c9bb3716a3a92f3b5326558c6238432c669556d/flink-yarn/src/main/java/org/apache/flink/yarn/YarnConfigKeys.java#L28
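
A small sketch of that lookup, written without any Flink dependency. The variable name `_APP_ID` comes from the message above; the `YarnAppId` class and the fallback value are hypothetical, added only so the example also behaves sensibly outside YARN.

```java
import java.util.Map;

/** Reads the YARN application id that the container exposes via the environment. */
final class YarnAppId {
    // Variable name as given in the message above (YarnConfigKeys.ENV_APP_ID).
    static final String ENV_APP_ID = "_APP_ID";

    /** Returns the application id from the given environment, or the fallback if absent. */
    static String fromEnv(Map<String, String> env, String fallback) {
        String appId = env.get(ENV_APP_ID);
        return (appId == null || appId.isEmpty()) ? fallback : appId;
    }

    public static void main(String[] args) {
        System.out.println(fromEnv(System.getenv(), "unknown"));
    }
}
```

Passing the environment in as a map keeps the lookup easy to test; in a reporter you would pass `System.getenv()`.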


Best,
Feng

On Wed, Aug 30, 2023 at 7:14 PM allanqinjy  wrote:

> hi,
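> A question for everyone: when reporting metrics to Prometheus, the job name gets a randomly generated suffix; looking at the source code, it is just new AbstractID().
> Is there a way to get the application id of the job being reported at that point?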


Re: Translating Flink SQL statements into the underlying processing functions

2023-08-28 Thread Feng Jin
After setting the log level to debug, you can see the actual code produced by codegen.

On Mon, Aug 28, 2023 at 1:25 PM 海风 <18751805...@163.com> wrote:

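> Yes, the execution plan does show some information, but I would also like to know whether there is a good way to see exactly which underlying functions and state are involved, so that performance issues are easier to analyze.
>
>
>
>  Original message 
> | From | Shammon FY |
> | Date | 2023-08-28 12:05 |
> | To | user-zh@flink.apache.org |
> | Cc | |
> | Subject | Re: Translating Flink SQL statements into the underlying processing functions |
> If you want to see which concrete execution steps a SQL statement is translated into, you can view the execution plan with the EXPLAIN syntax [1].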
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/explain/
>
> On Sun, Aug 27, 2023 at 5:23 PM 海风 <18751805...@163.com> wrote:
>
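> > A question: is it possible to find out what underlying processing functions Flink translates a Flink SQL statement into
> > once it has been submitted and is running? And if stateful computation is involved, which state variables does Flink define for this SQL?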
> >
> >
> >
>


Re: Flink AVRO to Parquet writer - Row group size/Page size

2023-08-19 Thread Feng Jin
Hi Kamal

I think it depends on the amount of data and the usage scenario. If your
checkpoint interval is very long, multiple row groups may be written within
one checkpoint cycle, so configuring the row group size makes sense.
However, if there are short intervals between checkpoints or if you have a
small amount of data, there is indeed not much need to configure the row
group size.
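
To make that trade-off concrete, here is a back-of-the-envelope estimate. It is only a rough sketch under stated assumptions: a steady ingest rate, row groups measured by buffered bytes, and no allowance for encoding or compression. `RowGroupEstimate` and its method names are hypothetical.

```java
/** Hypothetical estimate of how many row groups fill up between two checkpoints. */
final class RowGroupEstimate {
    static final long MB = 1024L * 1024L;

    /** Approximate bytes buffered between two checkpoints at a steady ingest rate. */
    static long bytesPerCheckpoint(long bytesPerSecond, long checkpointIntervalSeconds) {
        return bytesPerSecond * checkpointIntervalSeconds;
    }

    /** Number of row groups that fill up completely within one checkpoint interval. */
    static long fullRowGroupsPerCheckpoint(
            long bytesPerSecond, long checkpointIntervalSeconds, long rowGroupSizeBytes) {
        return bytesPerCheckpoint(bytesPerSecond, checkpointIntervalSeconds) / rowGroupSizeBytes;
    }

    public static void main(String[] args) {
        // Long interval, high volume: the row group size shapes the file layout.
        System.out.println(fullRowGroupsPerCheckpoint(10 * MB, 600, 128 * MB));
        // Short interval, low volume: the file rolls before one row group fills.
        System.out.println(fullRowGroupsPerCheckpoint(1 * MB, 60, 128 * MB));
    }
}
```

When the estimate is zero, each file is closed by the checkpoint before a single row group fills up, so the configured row group size has little practical effect.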


Best,
Feng

On Sat, Aug 19, 2023 at 7:00 PM Kamal Mittal via user 
wrote:

> Hello Community,
>
>
>
> Please help me to find out inputs for below query.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Kamal Mittal via user 
> *Sent:* 18 August 2023 08:04 AM
> *To:* user@flink.apache.org
> *Subject:* RE: Flink AVRO to Parquet writer - Row group size/Page size
>
>
>
> Hello Community,
>
>
>
> Please share views for below.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Kamal Mittal 
> *Sent:* 17 August 2023 08:01 AM
> *To:* Kamal Mittal ; user@flink.apache.org
> *Subject:* RE: Flink AVRO to Parquet writer - Row group size/Page size
>
>
>
> Hello Community,
>
>
>
> Please share views for below.
>
>
>
> Rgds,
>
> Kamal
>
>
>
> *From:* Kamal Mittal via user 
> *Sent:* 16 August 2023 04:35 PM
> *To:* user@flink.apache.org
> *Subject:* Flink AVRO to Parquet writer - Row group size/Page size
>
>
>
> Hello,
>
>
>
> For Parquet, default row group size is 128 MB and Page size is 1MB but
> Flink Bulk writer using file sink create the files based on checkpointing
> interval only.
>
>
>
> So is there any significance of configured row group size and page size
> for Flink parquet bulk writer? How Flink uses these two values with
> checkpointing interval?
>
>
>
> Rgds,
>
> Kamal
>


Re: Suggestion: expose Flink ROWKIND as metadata

2023-07-18 Thread Feng Jin
Hi casel

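There has been a similar discussion before. However, exposing ROWKIND may make the SQL semantics ambiguous. You could start a discussion on the dev mailing list to see what others think.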

https://issues.apache.org/jira/browse/FLINK-24547

Best,
Feng

On Wed, Jul 19, 2023 at 12:06 AM casel.chen  wrote:

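> No response from the community?
>
> On 2023-07-15 12:19:46, "casel.chen" wrote:
> >Could the Flink community consider exposing ROWKIND as metadata, similar to
> offset and partition in the Kafka connector? Users could then process data based on this ROWKIND
> metadata, for example filtering on a specific ROWKIND or adding a ROWKIND field to the output data.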
>


Re: Re: Flink SQL job metric names are too long and exhaust Prometheus memory

2023-06-14 Thread Feng Jin
org___AS_huifuSecOrg__CASE__huifu_thd_org_IS_NULL_OR__TRIM_FLAG_BOTHUTF_16LE_huifu_thd_org_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___OR__TRIM_FLAG_BOTHUTF_16LE_huifu_thd_org_UTF_16LE_null_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE__UTF_16LE_defalut_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___TRIM_FLAG_BOTHUTF_16LE_huifu_thd_org___AS_huifuThdOrg__CASE__huifu_for_org_IS_NULL_OR__TRIM_FLAG_BOTHUTF_16LE_huifu_for_org_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___OR__TRIM_FLAG_BOTHUTF_16LE_huifu_for_org_UTF_16LE_null_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE__UTF_16LE_defalut_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___TRIM_FLAG_BOTHUTF_16LE_huifu_for_org___AS_huifuForOrg__CASE__huifu_sales_sub_IS_NULL_OR__TRIM_FLAG_BOTHUTF_16LE_huifu_sales_sub_UTF_16LE__:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___OR__TRIM_FLAG_BOTHUTF_16LE_huifu_sales_sub_UTF_16LE_null_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE__UTF_16LE_defalut_:VARCHAR_2147483647__CHARACTER_SET__UTF_16LE___TRIM_FLAG_BOTHUTF_16LE_huifu_sales_sub___AS_huifuSales__DATE_FORMAT_CAST_LOCALTIMESTAMP__UTF_16LE_MMddHHmmssSSS___AS_synModifyTime__CAST_CURRENT_TIMESTAMPAS_synTtlDate__NotNullEnforcer_fields__serviceId__Sink:_Sink_table__hive_default_mongodb_active_channel_sink___fields__transDate__serviceId__huifuFstOrg__huifuSecOrg__huifuThdOrg__huifuForOrg__huifuSales__synModifyTime__synTtlDate__",task_attempt_num="1",job_name="tb_top_top_trans_order_binlog2mongo",tm_id="tb_top_top_trans_order_binlog2mongo_taskmanager_1_112",subtask_index="35",}
> 73144.0
>
>
>
>
>
> At 2023-06-13 10:13:17, "Feng Jin" wrote:
> >hi casel
> >
> >1. Consider using Flink 1.15 with simplified operator names:
> >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/#table-exec-simplify-operator-name-enabled
> >
> >2. Flink also provides a RESTful interface for fetching instantaneous
> >metrics directly, if you do not need historical metrics:
> >
> >
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobmanager-metrics
> >
> >
> >Best,
> >Feng
> >
> >On Tue, Jun 13, 2023 at 8:51 AM casel.chen  wrote:
> >
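> >> We run more than 200 Flink SQL jobs in production. After hooking them up
> >> to Prometheus monitoring (Prometheus periodically scrapes the job
> >> metrics), Prometheus ran out of memory within a short time (with 64 GB
> >> allocated). Investigation showed this was caused by overly long metric names.
> >> A Flink SQL job's metric name is generally composed of the job name plus
> >> the operator name, and the operator name is assembled from the SQL text,
> >> so with many selected fields or complex SQL the names easily become too long.
> >> Is there a good way to solve this problem?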
>


Re: Flink SQL job metric names are too long and exhaust Prometheus memory

2023-06-12 Thread Feng Jin
hi casel

1. Consider using Flink 1.15 with simplified operator names:

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/#table-exec-simplify-operator-name-enabled

2. Flink also provides a RESTful interface for fetching instantaneous metrics
directly, if you do not need historical metrics:

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobmanager-metrics
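
As a rough sketch of option 2, the following plain-Java snippet builds a request URI for the JobManager metrics endpoint and shows how it could be fetched with the JDK HTTP client. The base address http://localhost:8081 and the two metric names are assumptions for illustration; the class and method names are hypothetical. The main method only prints the URI, so it runs without a cluster.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

class FlinkMetricsUri {

    /** Builds <base>/jobmanager/metrics?get=<name1>,<name2>,... */
    static URI metricsUri(String baseUrl, List<String> metricNames) {
        String names = metricNames.stream()
                .map(n -> URLEncoder.encode(n, StandardCharsets.UTF_8))
                .collect(Collectors.joining(","));
        return URI.create(baseUrl + "/jobmanager/metrics?get=" + names);
    }

    /** Performs the GET and returns the raw response body. */
    static String fetch(URI uri) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }

    public static void main(String[] args) {
        // Assumed address and metric names; adjust to your cluster.
        URI uri = metricsUri("http://localhost:8081",
                List.of("Status.JVM.Memory.Heap.Used", "Status.JVM.Memory.Heap.Max"));
        System.out.println(uri);
        // Against a reachable cluster: System.out.println(fetch(uri));
    }
}
```

Polling only the handful of metrics you need this way avoids exporting every operator-scoped metric name to Prometheus, at the cost of keeping no history.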


Best,
Feng

On Tue, Jun 13, 2023 at 8:51 AM casel.chen  wrote:

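> We run more than 200 Flink SQL jobs in production. After hooking them up to
> Prometheus monitoring (Prometheus periodically scrapes the job metrics),
> Prometheus ran out of memory within a short time (with 64 GB allocated).
> Investigation showed this was caused by overly long metric names.
> A Flink SQL job's metric name is generally composed of the job name plus the
> operator name, and the operator name is assembled from the SQL text, so with
> many selected fields or complex SQL the names easily become too long.
> Is there a good way to solve this problem?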


Re: Flink RocketMQ Connector

2023-05-26 Thread Feng Jin
hi casel

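The Flink RocketMQ connector is maintained by the RocketMQ community. The
project is at:
https://github.com/apache/rocketmq-flink  The default message format in this
version is DELIMIT (the message is a String by default and is split by a
delimiter); you can only specify the column delimiter of the message.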


best,
feng


On Fri, May 26, 2023 at 7:44 PM casel.chen  wrote:

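> Is there an official Flink RocketMQ connector, or do we need to develop one
> ourselves? What is the URL of the Flink ecosystem components site (where
> users upload connectors, formats, and so on that they have developed)?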


Re: How to debug code written with the Flink DataStream API in IDEA

2023-04-22 Thread Feng Jin
If you want to debug a production job from your local IDEA, you need to enable
debugging in the TaskManager's JVM options.

When submitting the job, add the parameter:
env.java.opts.taskmanager="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"



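Then create a remote debug run configuration in IDEA and connect to the IP of
the machine hosting the production TaskManager. After that you can set
breakpoints in IDEA or capture the execution stack.
*(This assumes your local machine and the production machine can reach each
other over the network.)*

Reference: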
https://www.jetbrains.com/help/idea/tutorial-remote-debug.html#174f812f

---
Best,
Feng Jin

On Sat, Apr 22, 2023 at 10:04 PM m18751805115_1 <18751805...@163.com> wrote:

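> Sorry, I may not have described the problem clearly. I want to debug the code
> locally and observe variable values, the call stack, and similar information
> after each stream record arrives.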
>
>
>
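> ---- Original message ----
> | From | Feng Jin |
> | Date | 2023-04-22 21:53 |
> | To | user-zh@flink.apache.org |
> | Cc | |
> | Subject | Re: How to debug code written with the Flink DataStream API in IDEA |
> Yes, this is supported: just run the main function in IDEA. Before running,
> it is best to check *Include dependencies with "Provided" scope* in the IDEA
> run configuration; otherwise you may get a class not found error.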
>
>
> 
> Best,
> Feng Jin
>
> On Sat, Apr 22, 2023 at 9:28 PM m18751805115_1 <18751805...@163.com>
> wrote:
>
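> > A question: I have code written in IDEA with the Flink DataStream API,
> > and the source input is socket stream data arriving record by record.
> > How can I debug it locally in IDEA and observe how each input record is
> > processed? Does IDEA support this kind of debugging?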
> >
> >
> >
>


Re: How to debug code written with the Flink DataStream API in IDEA

2023-04-22 Thread Feng Jin
Yes, this is supported: just run the main function in IDEA. Before running, it
is best to check *Include dependencies with "Provided" scope* in the IDEA run
configuration; otherwise you may get a class not found error.


----
Best,
Feng Jin

On Sat, Apr 22, 2023 at 9:28 PM m18751805115_1 <18751805...@163.com> wrote:

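> A question: I have code written in IDEA with the Flink DataStream API, and
> the source input is socket stream data arriving record by record. How can I
> debug it locally in IDEA and observe how each input record is processed?
> Does IDEA support this kind of debugging?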
>
>
>


Re: emitValueWithRetract issue

2023-04-12 Thread Feng Jin
hi Adam

As far as I know, there is currently no similar API available,
but I believe that this feature was accidentally removed and we should add
it back.
I have created a Jira to track the progress of this feature.
https://issues.apache.org/jira/browse/FLINK-31788
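
To illustrate why the missing method matters, here is a plain-Java simulation of a Top-2 aggregation (it does not use the Flink API; the class and method names are hypothetical). emitFull mimics re-emitting the whole result on every input, which is the behaviour Adam observes with emitValue, while emitIncremental mimics what an emitUpdateWithRetract-style method could do by touching only the rows that changed. The -D/+I notation is borrowed from the print output mentioned in the thread.

```java
import java.util.ArrayList;
import java.util.List;

class Top2ChangelogSketch {

    static final int EMPTY = Integer.MIN_VALUE; // marks an unused slot

    /** Returns the new top-2 (descending) after accumulating one value. */
    static int[] accumulate(int[] top2, int value) {
        int[] next = top2.clone();
        if (value > next[0]) {
            next[1] = next[0];
            next[0] = value;
        } else if (value > next[1]) {
            next[1] = value;
        }
        return next;
    }

    /** Full re-emission: delete every old row, then insert every new row. */
    static List<String> emitFull(int[] before, int[] after) {
        List<String> out = new ArrayList<>();
        for (int v : before) if (v != EMPTY) out.add("-D(" + v + ")");
        for (int v : after) if (v != EMPTY) out.add("+I(" + v + ")");
        return out;
    }

    /** Incremental emission: only rows whose rank slot changed are touched. */
    static List<String> emitIncremental(int[] before, int[] after) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < after.length; i++) {
            if (before[i] == after[i]) continue;
            if (before[i] != EMPTY) out.add("-D(" + before[i] + ")");
            out.add("+I(" + after[i] + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        int[] top2 = {EMPTY, EMPTY};
        for (int v : new int[] {5, 3, 1, 4}) {
            int[] next = accumulate(top2, v);
            System.out.println("input " + v
                    + " full=" + emitFull(top2, next)
                    + " incremental=" + emitIncremental(top2, next));
            top2 = next;
        }
    }
}
```

For the input 1, which does not change the top 2, the full variant still produces two -D/+I pairs with identical values, while the incremental variant produces nothing.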



On Tue, Apr 11, 2023 at 12:10 AM Adam Augusta  wrote:

> Many thanks for the sanity check, Feng.
>
> It’s a shame this well-documented feature was silently removed.
> emitValue() creates an unreasonable amount of unnecessary and disruptive
> chatter on the changelog stream, as evidenced by putting a print table
> after the flatAggregate. Lots of -D/+I RowData pairs with identical fields.
>
> Is there any clean way to set up a stateful group aggregation in the 1.18
> Table API that doesn’t misbehave in this fashion?
>
> On Mon, Apr 10, 2023 at 11:43 AM Feng Jin  wrote:
>
>> hi Adam
>>
>> I have checked the code and indeed this feature is not available in the
>> latest version of Flink code.
>>
>> This feature was originally implemented in the old planner:
>> https://github.com/apache/flink/pull/8550/files
>>
>> However, this logic was never implemented in the new planner (the Blink
>> planner).
>>
>> When the old planner was removed in version 1.14
>> (https://github.com/apache/flink/pull/16080), this code was removed with it.
>>
>>
>>
>> Best
>>
>> Feng
>>
>> On Sat, Apr 8, 2023 at 4:17 AM Adam Augusta  wrote:
>>
>>> The TableAggregateFunction javadocs indicate that either "emitValue" or
>>> "emitUpdateWithRetract" is required.
>>>
>>> But if I implement my TableAggregateFunction with
>>> "emitUpdateWithRetract", I get a validation error. If I implement both
>>> methods it works, but emitUpdateWithRetract is not used.
>>>
>>> Peering into the Flink source code, I see that
>>> ImperativeAggCodeGen validates the presence of emitValue, but is agnostic
>>> to emitUpdateWithRetract.
>>> More curiously, Flink's source code doesn't have a single test with a
>>> TableAggregateFunction that uses emitUpdateWithRetract.
>>>
>>> Is this a ghost feature?
>>>
>>> Thanks,
>>> Adam
>>>
>>


Re: emitValueWithRetract issue

2023-04-10 Thread Feng Jin
hi Adam

I have checked the code and indeed this feature is not available in the
latest version of Flink code.

This feature was originally implemented in the old planner:

https://github.com/apache/flink/pull/8550/files

However, this logic was never implemented in the new planner (the Blink
planner).

When the old planner was removed in version 1.14
(https://github.com/apache/flink/pull/16080), this code was removed with it.



Best

Feng

On Sat, Apr 8, 2023 at 4:17 AM Adam Augusta  wrote:

> The TableAggregateFunction javadocs indicate that either "emitValue" or
> "emitUpdateWithRetract" is required.
>
> But if I implement my TableAggregateFunction with "emitUpdateWithRetract",
> I get a validation error. If I implement both methods it works, but
> emitUpdateWithRetract is not used.
>
> Peering into the Flink source code, I see that
> ImperativeAggCodeGen validates the presence of emitValue, but is agnostic
> to emitUpdateWithRetract.
> More curiously, Flink's source code doesn't have a single test with a
> TableAggregateFunction that uses emitUpdateWithRetract.
>
> Is this a ghost feature?
>
> Thanks,
> Adam
>


Re: Quick question about flink document.

2023-04-09 Thread Feng Jin
Hi Dongwoo


This can be quite confusing.
Before Flink 1.13, Flink's state backend was actually a hybrid concept that
covered both where working state lives and where checkpoints are stored. It
included three types of state backends:
*MemoryStateBackend*, *FsStateBackend*, and *RocksDBStateBackend*.

The default *MemoryStateBackend* keeps working state on the heap, and the
checkpointed state is stored in the JobManager's memory.


You can refer to this migration document for more information:
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#migrating-from-legacy-backends
.


Best
Feng

On Sun, Apr 9, 2023 at 10:23 PM Dongwoo Kim  wrote:

> Hi community, I’m new to Flink and am trying to learn its concepts to
> prepare for migrating a Heron application to Flink.
> I have a quick question about this Flink document.
> (
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/stateful-stream-processing/#snapshotting-operator-state
> )
>
> What I understood is that state is stored in the configured state backend,
> which can be either the task manager’s heap or RocksDB.
> And checkpoint snapshots are stored by default in the job manager’s heap,
> and in practice mostly in a distributed file system.
> But the document says the following, which is confusing to me. Isn’t
> the second line talking about checkpoint storage or a checkpoint backend,
> not the state backend? Thanks in advance, enjoy your weekend!
>
> *"Because the state of a snapshot may be large, it is stored in a
> configurable state backend.
> By default, this is the JobManager’s memory, but for production use a
> distributed reliable storage should be configured (such as HDFS)”*
>