Re: flink sql platform multi-version support question

2021-06-12 Thread casel.chen



OK, I'll give it a try first. Thanks!

On 2021-06-13 10:43:12, "Jeff Zhang" wrote:
>If it's not native k8s, it is already supported; just use remote mode:
>https://www.yuque.com/jeffzhangjianfeng/gldg8w/engh3w
>For native k8s, the community is working on it; this is the PR: https://github.com/apache/zeppelin/pull/4116
>
>
>casel.chen wrote on Sunday, June 13, 2021 at 9:39 AM:
>
>> Right, flink on zeppelin is also a good option. It's just that flink on
>> zeppelin does not yet support running jobs on kubernetes, so we cannot use it directly for now; once that is supported we may consider adopting it.
>> Thanks for the suggestion.
>>
>> On 2021-06-13 07:21:46, "Jeff Zhang" wrote:
>> >Another option is flink on zeppelin: you can call the flink on zeppelin rest api and treat zeppelin as a flink
>> >job server. zeppelin natively supports all flink versions since 1.10. DingTalk group: 32803524
>> >
>> >casel.chen wrote on Saturday, June 12, 2021 at 5:56 PM:
>> >
>> >> Background:
>> >> Because the underlying SQL implementation differs across Flink versions, upgrading a stateful Flink
>> >> SQL job across versions is not yet supported by the community. So a real-time computing platform built on Flink
>> >> SQL has to support multiple Flink versions. Our early Flink SQL jobs were written against 1.11, and the newest ones are developed on 1.13.
>> >>
>> >>
>> >> To make the platform support multiple Flink versions, I can think of three approaches:
>> >>
>> >>
>> >> 1. The platform calls flink run or flink run-application directly to submit jobs.
>> >> Pros: simple to implement; every flink version ships these shell scripts.
>> >>
>> >>
>> >> Cons: limited to what the scripts provide. "Advanced" features such as syntax checking, SQL formatting, and fetching the SQL execution plan, SQL structure and lineage are not offered by the official scripts (could they be added?), so we would have to extend them ourselves (and again maintain multiple versions).
>> >>
>> >>
>> >> 2. The platform submits jobs through the restful api exposed by the flink sql gateway.
>> >> Pros: simple to implement; the gateway shields the upper-layer application from calling multiple versions of the flink api.
>> >> Cons: extra development and deployment of the gateway. It is also unclear how the community positions the gateway: is it only meant to be used with the flink jdbc
>> >> driver? Neither project is very active at the moment.
>> >>
>> >>
>> >> 3. Maintain multiple platform versions, the way flink-sql-connector-hive supports multiple hive versions.
>> >> Pros: the existing platform is already built against a specific flink api version, so the change is small; common interfaces need to be extracted.
>> >> Cons: the platform implementation has to be "copied" once per flink version, which is a lot of development and maintenance work.
>> >>
>> >>
>> >> For now I tentatively prefer option 2. I may be wrong, and there may be better options I haven't thought of; suggestions from everyone are welcome.
>> >>
>> >>
>> >
>> >--
>> >Best Regards
>> >
>> >Jeff Zhang
>>
>
>
>-- 
>Best Regards
>
>Jeff Zhang


Re: Re: flink sql platform multi-version support question

2021-06-12 Thread Jeff Zhang
If it's not native k8s, it is already supported; just use remote mode:
https://www.yuque.com/jeffzhangjianfeng/gldg8w/engh3w
For native k8s, the community is working on it; this is the PR: https://github.com/apache/zeppelin/pull/4116


casel.chen wrote on Sunday, June 13, 2021 at 9:39 AM:

> Right, flink on zeppelin is also a good option. It's just that flink on
> zeppelin does not yet support running jobs on kubernetes, so we cannot use it directly for now; once that is supported we may consider adopting it.
> Thanks for the suggestion.
>
> On 2021-06-13 07:21:46, "Jeff Zhang" wrote:
> >Another option is flink on zeppelin: you can call the flink on zeppelin rest api and treat zeppelin as a flink
> >job server. zeppelin natively supports all flink versions since 1.10. DingTalk group: 32803524
> >
> >casel.chen wrote on Saturday, June 12, 2021 at 5:56 PM:
> >
> >> Background:
> >> Because the underlying SQL implementation differs across Flink versions, upgrading a stateful Flink
> >> SQL job across versions is not yet supported by the community. So a real-time computing platform built on Flink
> >> SQL has to support multiple Flink versions. Our early Flink SQL jobs were written against 1.11, and the newest ones are developed on 1.13.
> >>
> >>
> >> To make the platform support multiple Flink versions, I can think of three approaches:
> >>
> >>
> >> 1. The platform calls flink run or flink run-application directly to submit jobs.
> >> Pros: simple to implement; every flink version ships these shell scripts.
> >>
> >> Cons: limited to what the scripts provide. "Advanced" features such as syntax checking, SQL formatting, and fetching the SQL execution plan, SQL structure and lineage are not offered by the official scripts (could they be added?), so we would have to extend them ourselves (and again maintain multiple versions).
> >>
> >>
> >> 2. The platform submits jobs through the restful api exposed by the flink sql gateway.
> >> Pros: simple to implement; the gateway shields the upper-layer application from calling multiple versions of the flink api.
> >> Cons: extra development and deployment of the gateway. It is also unclear how the community positions the gateway: is it only meant to be used with the flink jdbc
> >> driver? Neither project is very active at the moment.
> >>
> >>
> >> 3. Maintain multiple platform versions, the way flink-sql-connector-hive supports multiple hive versions.
> >> Pros: the existing platform is already built against a specific flink api version, so the change is small; common interfaces need to be extracted.
> >> Cons: the platform implementation has to be "copied" once per flink version, which is a lot of development and maintenance work.
> >>
> >>
> >> For now I tentatively prefer option 2. I may be wrong, and there may be better options I haven't thought of; suggestions from everyone are welcome.
> >>
> >>
> >
> >--
> >Best Regards
> >
> >Jeff Zhang
>


-- 
Best Regards

Jeff Zhang


Re:Re: flink sql cdc data synchronization to mysql

2021-06-12 Thread casel.chen
In a flink sql cdc scenario, how can I increase the parallelism of the downstream sink?
I tried setting default.parallism=2 and setting the operator chain parameter to false, but it had no effect.
Then I split the job into two steps: first sink from the source mysql cdc into upsert kafka, then sink from
upsert kafka into the target mysql, hoping to increase the sink parallelism through kafka partitions.
Initial tests look promising: the kafka topic has 3 partitions, each partition receives data hashed by primary key, and the downstream parallelism matches the number of partitions.
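
For reference, roughly how those two knobs can be set when the job is driven from a TableEnvironment (a sketch only; the option name table.exec.resource.default-parallelism is what I found in the docs and may not be the relevant one here):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SyncJobSettingsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Turn operator chaining off so the sink is not chained to the cdc source.
        env.disableOperatorChaining();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // Default parallelism for operators generated by the SQL planner.
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.resource.default-parallelism", "2");

        // The DDL and INSERT statements below are then submitted via tEnv.executeSql(...).
    }
}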


The job definitions are as follows:


-- source
CREATE TABLE mysql_old_order_table
(
order_number BIGINT,
price        DECIMAL,
order_time   TIMESTAMP(3),
PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'flink-test',
'table-name' = 'old_order'
);

-- sink
CREATE TABLE kafka_order_table
(
order_number BIGINT,
price        DECIMAL,
order_time   TIMESTAMP(3),
PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'order',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);

-- insert
INSERT INTO kafka_order_table SELECT * FROM mysql_old_order_table;







-- source
CREATE TABLE kafka_order_table
(
order_number BIGINT,
price        DECIMAL,
order_time   TIMESTAMP(3),
PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'order',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);

-- sink
CREATE TABLE mysql_order_table
(
order_number BIGINT,
price        DECIMAL,
order_time   TIMESTAMP(3),
PRIMARY KEY (order_number) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/flink-test',
'table-name' = 'order',
'username' = 'root',
'password' = 'root',
'sink.buffer-flush.max-rows' = '3',
'sink.buffer-flush.interval' = '1s'
);

-- insert
INSERT INTO mysql_order_table SELECT * FROM kafka_order_table;





On 2021-06-08 19:49:40, "Leonard Xu" wrote:
>Let me try to answer these two questions.
>
>> The jdbc connector in flink 1.12 does not support the sink.parallelism option. Does that mean it can only run with the same parallelism of 1 as the upstream cdc
>> connector? If so, the downstream sink becomes a bottleneck: the table holds a large amount of historical data, syncing that existing data to the target table takes quite a while, and new business data keeps arriving, so the sink cannot catch up. How can this be solved?
>Yes. The key problem is that the cdc connector can only run with a single parallelism in order to guarantee data consistency, so the job can only run with a single parallelism as well. Once the cdc
>connector supports parallel reads, the downstream sink follows naturally.
>
>
>> The jdbc connector in flink 1.13 adds the sink.parallelism
>> option. If I increase the parallelism, can it ensure that records with the same primary key are sent to the same sink task? What is the correct way to write the SQL?
>
>Not only in the synchronization scenario; the sink.parallelism option needs care in other scenarios too. Currently the framework does not guarantee that records with the same pk are sent to the same task; the user has to make sure that
>the pk defined on the sink is consistent with the key the upstream data is shuffled on (for example the group key or join key),
>otherwise the data may arrive out of order. The community is also working on solving this through plan derivation, see
>https://issues.apache.org/jira/browse/FLINK-20374
>https://issues.apache.org/jira/browse/FLINK-22901
>
>Best regards,
>Leonard
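
For reference, a minimal sketch of the 1.13-style jdbc sink with 'sink.parallelism' mentioned above (the option name is my reading of the 1.13 jdbc connector; as Leonard notes, the pk declared on the sink has to line up with the key the upstream data is shuffled on):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcSinkParallelismSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // jdbc sink with an explicit sink parallelism of 2; the declared PRIMARY KEY
        // (order_number) is also the key the upstream changelog is hashed on.
        tEnv.executeSql(
                "CREATE TABLE mysql_order_table (" +
                "  order_number BIGINT," +
                "  price DECIMAL," +
                "  order_time TIMESTAMP(3)," +
                "  PRIMARY KEY (order_number) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/flink-test'," +
                "  'table-name' = 'order'," +
                "  'username' = 'root'," +
                "  'password' = 'root'," +
                "  'sink.parallelism' = '2'" +
                ")");

        // kafka_order_table is assumed to be defined as in the job above.
        tEnv.executeSql("INSERT INTO mysql_order_table SELECT * FROM kafka_order_table");
    }
}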


Re: Looking for online live training courses

2021-06-12 Thread Xia(Nate) Qu
Thanks Jing, I really appreciate your sharing.

Thanks a lot, buddy! :)

Best,

*Xia(Nate) Qu*



On Thu, Jun 10, 2021 at 10:07 PM JING ZHANG  wrote:

> Hi XiaQu,
> Welcome to the Flink community!
> I don't know yet whether there are online interactive training courses; I will
> add that to this thread later after I ask around offline.
>
> I would like to list the most popular resources I know of as follows; I hope
> they help.
> Training Course
> 1. https://flink.apache.org/training.html
> 2.
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/learn-flink/overview/
>
> Training projects in github
> 1. https://github.com/apache/flink-playgrounds
> 2. https://github.com/apache/flink-training
>
> History Flink Forward Conference video
> 1. https://www.flink-forward.org/
>
> If you can read Chinese, the following resources could also help
> you:
> 1. https://flink-learning.org.cn/
> 2. https://github.com/flink-china/flink-training-course
> 3.
> https://space.bilibili.com/33807709?spm_id_from=333.788.b_765f7570696e666f.2
> 4.
> https://developer.aliyun.com/group/sc?spm=a2c6h.12883283.1377930.10.5f1f201crBKprD#/?_k=3ahyn2
>
> Best regards,
> JING ZHANG
>
> Xia(Nate) Qu wrote on Friday, June 11, 2021 at 9:33 AM:
>
>> Hi all,
>>
>> My team is planning to start our journey with Apache Flink, and I was wondering
>> if there are any professional training courses (online and interactive at this
>> moment) you would recommend? Thanks
>>
>>
>> Best,
>>
>> *Xia(Nate) Qu*
>>
>>


Re: Looking for online live training courses

2021-06-12 Thread Xia(Nate) Qu
Hi BB,

Thanks for your suggestion, I will take a look.

I was hoping there are customized training services for Flink, like those for Kafka or
Spark, since our team already has some video courses and would like to dive
into some specific topics and ask some questions.


Best,

*Xia(Nate) Qu*



On Fri, Jun 11, 2021 at 9:39 AM B.B.  wrote:

> There are some beginner courses on Pluralsight. Just look for those with
> newer dates.
>
> BR,
> BB
>
> On Fri, 11 Jun 2021 at 03:33, Xia(Nate) Qu  wrote:
>
>> Hi all,
>>
>> My team is planning to start our journey with Apache Flink, and I was wondering
>> if there are any professional training courses (online and interactive at this
>> moment) you would recommend? Thanks
>>
>>
>> Best,
>>
>> *Xia(Nate) Qu*
>>
>> --
> Everybody wants to be a winner
> Nobody wants to lose their game
> Its insane for me
> Its insane for you
> Its insane
>


Re:Re: flink sql platform multi-version support question

2021-06-12 Thread casel.chen
Right, flink on zeppelin is also a good option. It's just that flink on
zeppelin does not yet support running jobs on kubernetes, so we cannot use it directly for now; once that is supported we may consider adopting it.
Thanks for the suggestion.


On 2021-06-13 07:21:46, "Jeff Zhang" wrote:
>Another option is flink on zeppelin: you can call the flink on zeppelin rest api and treat zeppelin as a flink
>job server. zeppelin natively supports all flink versions since 1.10. DingTalk group: 32803524
>
>casel.chen wrote on Saturday, June 12, 2021 at 5:56 PM:
>
>> Background:
>> Because the underlying SQL implementation differs across Flink versions, upgrading a stateful Flink
>> SQL job across versions is not yet supported by the community. So a real-time computing platform built on Flink
>> SQL has to support multiple Flink versions. Our early Flink SQL jobs were written against 1.11, and the newest ones are developed on 1.13.
>>
>>
>> To make the platform support multiple Flink versions, I can think of three approaches:
>>
>>
>> 1. The platform calls flink run or flink run-application directly to submit jobs.
>> Pros: simple to implement; every flink version ships these shell scripts.
>>
>> Cons: limited to what the scripts provide. "Advanced" features such as syntax checking, SQL formatting, and fetching the SQL execution plan, SQL structure and lineage are not offered by the official scripts (could they be added?), so we would have to extend them ourselves (and again maintain multiple versions).
>>
>>
>> 2. The platform submits jobs through the restful api exposed by the flink sql gateway.
>> Pros: simple to implement; the gateway shields the upper-layer application from calling multiple versions of the flink api.
>> Cons: extra development and deployment of the gateway. It is also unclear how the community positions the gateway: is it only meant to be used with the flink jdbc
>> driver? Neither project is very active at the moment.
>>
>>
>> 3. Maintain multiple platform versions, the way flink-sql-connector-hive supports multiple hive versions.
>> Pros: the existing platform is already built against a specific flink api version, so the change is small; common interfaces need to be extracted.
>> Cons: the platform implementation has to be "copied" once per flink version, which is a lot of development and maintenance work.
>>
>>
>> For now I tentatively prefer option 2. I may be wrong, and there may be better options I haven't thought of; suggestions from everyone are welcome.
>>
>>
>
>-- 
>Best Regards
>
>Jeff Zhang


Re:flink sql cdc data synchronization to mysql

2021-06-12 Thread casel.chen
Even if the downstream sink parallelism can be increased, there is still no guarantee that upstream records with the same PK flow into the same task, so the order of operations on the same record cannot be replayed correctly. Isn't that right?


On 2021-06-11 19:30:39, "东东" wrote:
>
>
>
>1. Upgrade to 1.13.
>2. Whether it can keep up depends on how large the write volume is and on the downstream's processing capacity; even MySQL's own primary/replica replication cannot always keep up. You will know once you try it in practice.
>3. You can try setting default.parallism; if operators end up chained together, try turning operator chaining off.
>
>
>On 2021-06-11 18:57:36, "casel.chen" wrote:
>>My scenario is plain data synchronization, with no join and no group by, syncing from one mysql database to another.
>>The upstream writes data very quickly; with flink sql the sync runs with a single parallel writer by default and cannot keep up. How should this case be handled?
>>I'm on flink 1.12.1, whose jdbc connector does not yet support the sink.parallelism option.
>>
>>On 2021-06-11 16:32:00, "东东" wrote:
>>>
>>>The cases listed there are ones where a shuffle on a non-pk key happens before the sink, for example a join whose join condition is not the primary key. What does your scenario look like?
>>>
>>>Also, https://issues.apache.org/jira/browse/FLINK-20374 is about sinking to es, and the es
>>>connector does not support specifying sink.parallelism, so the sink parallelism is necessarily the upstream parallelism. The jdbc
>>>connector, however, does allow specifying sink.parallelism; as long as it differs from the upstream parallelism, the runtime will hash
>>>shuffle by pk and make sure records with the same pk go to the same sink task.
>>>
>>>
>>>On 2021-06-11 15:57:29, "casel.chen" wrote:
Quoting Leonard Xu's earlier answer:

> The jdbc connector in flink 1.13 adds the sink.parallelism
> option. If I increase the parallelism, can it ensure that records with the same primary key are sent to the same sink task? What is the correct way to write the SQL?

Not only in the synchronization scenario; the sink.parallelism option needs care
in other scenarios too. Currently the framework does not guarantee that records with the same pk are sent to the same task; the user has to make sure that the pk defined on the sink
is consistent with the key the upstream data is shuffled on (for example the group key or join key),
otherwise the data may arrive out of order. The community is also working on solving this through plan derivation, see
https://issues.apache.org/jira/browse/FLINK-20374

https://issues.apache.org/jira/browse/FLINK-22901


Which shows that just adding sink.parallelism is not enough.


On 2021-06-11 15:44:51, "JasonLee" <17610775...@163.com> wrote:
>hi
>
>The sink side can be set via sink.parallelism.
>
>
>
>-
>Best Wishes
>JasonLee
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql platform multi-version support question

2021-06-12 Thread Jeff Zhang
Another option is flink on zeppelin: you can call the flink on zeppelin rest api and treat zeppelin as a flink
job server. zeppelin natively supports all flink versions since 1.10. DingTalk group: 32803524
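
A rough sketch of what calling it could look like (the endpoint path and the note id are placeholders from memory; check the Zeppelin REST API docs for your version):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ZeppelinJobClient {
    public static void main(String[] args) throws Exception {
        // Assumed endpoint: run all paragraphs of a note asynchronously.
        // "2ABCDEFGH" stands in for a real note id.
        String url = "http://zeppelin-host:8080/api/notebook/job/2ABCDEFGH";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}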

casel.chen wrote on Saturday, June 12, 2021 at 5:56 PM:

> Background:
> Because the underlying SQL implementation differs across Flink versions, upgrading a stateful Flink
> SQL job across versions is not yet supported by the community. So a real-time computing platform built on Flink
> SQL has to support multiple Flink versions. Our early Flink SQL jobs were written against 1.11, and the newest ones are developed on 1.13.
>
>
> To make the platform support multiple Flink versions, I can think of three approaches:
>
>
> 1. The platform calls flink run or flink run-application directly to submit jobs.
> Pros: simple to implement; every flink version ships these shell scripts.
>
> Cons: limited to what the scripts provide. "Advanced" features such as syntax checking, SQL formatting, and fetching the SQL execution plan, SQL structure and lineage are not offered by the official scripts (could they be added?), so we would have to extend them ourselves (and again maintain multiple versions).
>
>
> 2. The platform submits jobs through the restful api exposed by the flink sql gateway.
> Pros: simple to implement; the gateway shields the upper-layer application from calling multiple versions of the flink api.
> Cons: extra development and deployment of the gateway. It is also unclear how the community positions the gateway: is it only meant to be used with the flink jdbc
> driver? Neither project is very active at the moment.
>
>
> 3. Maintain multiple platform versions, the way flink-sql-connector-hive supports multiple hive versions.
> Pros: the existing platform is already built against a specific flink api version, so the change is small; common interfaces need to be extracted.
> Cons: the platform implementation has to be "copied" once per flink version, which is a lot of development and maintenance work.
>
>
> For now I tentatively prefer option 2. I may be wrong, and there may be better options I haven't thought of; suggestions from everyone are welcome.
>
>

-- 
Best Regards

Jeff Zhang


Re: Flink PrometheusReporter support for HTTPS

2021-06-12 Thread Austin Cawley-Edwards
Hi Ashutosh,

How are you deploying your Flink apps? Would running a reverse proxy like
Nginx or Envoy that handles the HTTPS connection work for you?

Best,
Austin

On Sat, Jun 12, 2021 at 1:11 PM Ashutosh Uttam 
wrote:

> Hi All,
>
> Does the PrometheusReporter provide support for HTTPS? I couldn't find any
> information in the Flink documentation.
>
> Is there any way we can achieve the same?
>
> Thanks & Regards,
> Ashutosh
>
>
>


Re: Error with extracted type from custom partitioner key

2021-06-12 Thread Ken Krugler
Hi Timo,

Thanks, I’ll give the ResultTypeQueryable interface a try - my previous 
experience registering custom Kryo serializers wasn’t so positive.
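
In case it helps anyone searching the archives later, this is the shape I understood the suggestion to take (a sketch only; the key construction and the returned TypeInformation are placeholders for my real classes):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

// A key selector that tells Flink explicitly which type it produces, so the key
// class is not analyzed as a single-field POJO key.
public class MyGroupingKeySelector<T> implements KeySelector<T, MyGroupingKey>,
        ResultTypeQueryable<MyGroupingKey> {

    @Override
    public MyGroupingKey getKey(T record) {
        // Placeholder: build the grouping key from the record.
        return new MyGroupingKey(record.toString());
    }

    @Override
    public TypeInformation<MyGroupingKey> getProducedType() {
        // GenericTypeInfo falls back to Kryo; a fully custom TypeInformation with its
        // own serializer could be returned here instead.
        return new GenericTypeInfo<>(MyGroupingKey.class);
    }
}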

Though I’m still curious as to whether the java.lang.ClassCastException I got was
indicative of a bug in Flink, or of my doing something wrong.

But with the ongoing deprecation of DataSet support, I imagine that’s a low 
priority issue in any case.

Regards,

— Ken


> On Jun 4, 2021, at 7:05 AM, Timo Walther  wrote:
> 
> Hi Ken,
> 
> non-POJOs are serialized with Kryo. This might not give you optimal 
> performance. You can register a custom Kryo serializer in ExecutionConfig to 
> speed up the serialization.
> 
> Alternatively, you can implement `ResultTypeQueryable` to provide custom type
> information with a custom serializer.
> 
> I hope this helps. Otherwise, can you share a little example of how you would
> like to call partitionCustom()?
> 
> Regards,
> Timo
> 
> On 04.06.21 15:38, Ken Krugler wrote:
>> Hi all,
>> I'm using Flink 1.12 and a custom partitioner/partitioning key (batch mode, 
>> with a DataSet) to do a better job of distributing data to tasks. The 
>> classes look like:
>> public class MyPartitioner implements Partitioner
>> {
>> ...
>> }
>> public class MyGroupingKey implements Comparable
>> {
>> ...
>> }
>> This worked fine, but I noticed a warning logged by Flink about 
>> MyGroupingKey not having an empty constructor, and thus not being treated as 
>> a POJO.
>> I added that empty constructor, and then I got an error because 
>> partitionCustom() only works on a single field key.
>> So I changed MyGroupingKey to have a single field (a string), with transient 
>> cached values for the pieces of the key that I need while partitioning. Now 
>> I get an odd error:
>> java.lang.RuntimeException: Error while calling custom partitioner
>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to 
>> MyGroupingKey
>> at MyPartitioner.partition(AdsPinotFilePartitioner.java:11)
>> at 
>> org.apache.flink.runtime.operators.shipping.OutputEmitter.customPartition(OutputEmitter.java:235)
>> ... 19 more
>> So I've got two questions…
>> • Should I just get rid of the empty constructor, and have Flink treat it as 
>> a non-POJO? This seemed to be working fine.
>> • Is it a bug in Flink that the extracted field from the key is being used 
>> as the expected type for partitioning?
>> Thanks!
>> — Ken
>> --
>> Ken Krugler
>> http://www.scaleunlimited.com 
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
> 

--
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch





should i expect POJO serialization warnings when dealing w/ kryo protobuf serialization?

2021-06-12 Thread Jin Yi
I'm currently using protobufs, and registering the serializers with kryo
protobuf via the following snippet of code:

static void optionalRegisterProtobufSerializer(ExecutionConfig config, Class clazz) {
    if (clazz != null) {
        config.registerTypeWithKryoSerializer(clazz, ProtobufSerializer.class);
    }
}

static void configureExecutionConfig(ExecutionConfig config) {
    optionalRegisterProtobufSerializer(config, User.class);
    optionalRegisterProtobufSerializer(config, View.class);
    optionalRegisterProtobufSerializer(config, Request.class);
    optionalRegisterProtobufSerializer(config, Insertion.class);
    optionalRegisterProtobufSerializer(config, Impression.class);
    optionalRegisterProtobufSerializer(config, Action.class);
    optionalRegisterProtobufSerializer(config, FlatEvent.class);
    optionalRegisterProtobufSerializer(config, LatestImpression.class);
}

// TODO - reuse with batch.
void configureStreamExecutionEnvironment(StreamExecutionEnvironment env) {
    configureExecutionConfig(env.getConfig());
    if (checkpointInterval > 0) {
        env.enableCheckpointing(checkpointInterval);
    }
    env.getCheckpointConfig().setCheckpointingMode(checkpointingMode);
    // TODO - evaluate if we want setMinPauseBetweenCheckpoints.
    if (minPauseBetweenCheckpoints > 0) {
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints);
    }
    if (unalignedCheckpoints) {
        env.getCheckpointConfig().enableUnalignedCheckpoints();
    }
    if (checkpointTimeout > 0) {
        env.getCheckpointConfig().setCheckpointTimeout(checkpointTimeout);
    }
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}

The concerning thing I have a question about is that I'm seeing these sorts of
info logs in the taskmanager logs:

org.apache.flink.api.java.typeutils.TypeExtractor[] - Class
class ai.promoted.proto.event.FlatEvent cannot be used as a POJO type
because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor[] - class
ai.promoted.proto.event.LatestImpression does not contain a getter for
field impressionId_

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor[] - class
ai.promoted.proto.event.LatestImpression does not contain a setter for
field impressionId_

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor[] - Class
class ai.promoted.proto.event.LatestImpression cannot be used as a POJO
type because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor[] - class
ai.promoted.proto.event.LatestImpression does not contain a getter for
field impressionId_

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor[] - class
ai.promoted.proto.event.LatestImpression does not contain a setter for
field impressionId_

2021-06-12 17:47:03,230 INFO
org.apache.flink.api.java.typeutils.TypeExtractor[] - Class
class ai.promoted.proto.event.LatestImpression cannot be used as a POJO
type because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.

Can I safely ignore these? Is it telling me that it's doing the right
thing, since kryo should kick in for GenericType?


Flink PrometheusReporter support for HTTPS

2021-06-12 Thread Ashutosh Uttam
Hi All,

Does the PrometheusReporter provide support for HTTPS? I couldn't find any
information in the Flink documentation.

Is there any way we can achieve the same?

Thanks & Regards,
Ashutosh


Does flink on native kubernetes support batch processing?

2021-06-12 Thread casel.chen
We know that flink on native kubernetes currently runs a streaming job as a k8s deployment. Will it use a k8s job to run a batch job?

Re: NPE when aggregate window.

2021-06-12 Thread HaochengWang
Hi,
I met the same exception and found your suggestion here. I'm confused about
the term 'grouping key': does it refer to the key of the accumulating hash
map, or to the key that separates the stream by some information?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Does Flink 1.10 SQL support consuming multiple Kafka topics?

2021-06-12 Thread Jason Lee
Hi community folks,


I have a question for everyone: in Flink 1.10, can a Flink SQL job specify multiple Kafka
topics when creating a table? I found that in Flink 1.12 a Flink
SQL job can consume multiple topics via 'topic'='topic-1;topic-2', but does anyone know whether this is supported in Flink
1.10? Can multiple topics be consumed via regex matching?


Of course there is an alternative: create multiple tables, one per topic, and then union them, but that is rather cumbersome (see the sketch below).
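
For context, a rough sketch of the two variants I mean (the 1.12 multi-topic option is shown as a comment; for 1.10 the DataStream Kafka consumer can subscribe by topic regex outside of SQL; exact option names should be double-checked against the docs):

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MultiTopicSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Flink 1.12+ SQL: several topics in one table definition, e.g.
        //   'connector' = 'kafka', 'topic' = 'topic-1;topic-2', ...

        // Flink 1.10 workaround outside SQL: subscribe to topics by regex pattern.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                Pattern.compile("topic-.*"), new SimpleStringSchema(), props);

        env.addSource(consumer).print();
        env.execute("multi-topic sketch");
    }
}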


Best,
JasonLee1781
李闯
jasonlee1...@163.com



Web UI shows my AssignTImestamp is in high back pressure but in/outPoolUsage are both 0.

2021-06-12 Thread Haocheng Wang
Hi, I have a job like 'Source -> assignmentTimestamp -> flatmap -> Window
-> Sink' and I see back pressure from 'Source' through the 'FlatMap' operators
on the 'BackPressure' tab in the Web UI.
When trying to find which operator is the source of the back pressure, I use the
metrics provided by the Web UI, specifically 'inPoolUsage' and
'outPoolUsage'.
Firstly, as far as I know, when both of these metrics are 0, the operator
should not be considered 'back pressured'. But when I check the
'AssignmentTimestamp' operator, which runs 8 subtasks, I find that 1 or 2 of
them have a back pressure ratio of 0 while the others are above 0.80, and all of
them are marked as 'HIGH'.
However, the two metrics, 'in/outPoolUsage', are always 0. So maybe the
operator is not actually back pressured? Or is there a problem with my
Flink Web UI?
My second question: from my experience, I think the source of the back
pressure should be the Window operator, because the outPoolUsage of the
'FlatMap' is 1 and 'Window' is the first downstream operator of the
'FlatMap'; but its inPoolUsage and outPoolUsage are also 0. So could the
cause of the back pressure be a network bottleneck between the Window
and the FlatMap? Am I right?
Thanks for reading, and I'm looking forward to your ideas.

Haocheng


flink sql platform multi-version support question

2021-06-12 Thread casel.chen
Background:
Because the underlying SQL implementation differs across Flink versions, upgrading a stateful Flink
SQL job across versions is not yet supported by the community. So a real-time computing platform built on Flink SQL has to support multiple Flink versions. Our early Flink SQL jobs were written against 1.11, and the newest ones are developed on 1.13.


To make the platform support multiple Flink versions, I can think of three approaches:


1. The platform calls flink run or flink run-application directly to submit jobs (see the sketch at the end of this message).
Pros: simple to implement; every flink version ships these shell scripts.
Cons: limited to what the scripts provide. "Advanced" features such as syntax checking, SQL formatting, and fetching the SQL execution plan, SQL structure and lineage are not offered by the official scripts (could they be added?), so we would have to extend them ourselves (and again maintain multiple versions).


2. The platform submits jobs through the restful api exposed by the flink sql gateway.
Pros: simple to implement; the gateway shields the upper-layer application from calling multiple versions of the flink api.
Cons: extra development and deployment of the gateway. It is also unclear how the community positions the gateway: is it only meant to be used with the flink jdbc
driver? Neither project is very active at the moment.


3. Maintain multiple platform versions, the way flink-sql-connector-hive supports multiple hive versions.
Pros: the existing platform is already built against a specific flink api version, so the change is small; common interfaces need to be extracted.
Cons: the platform implementation has to be "copied" once per flink version, which is a lot of development and maintenance work.


For now I tentatively prefer option 2. I may be wrong, and there may be better options I haven't thought of; suggestions from everyone are welcome.
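
To make option 1 concrete, here is a minimal sketch of how the platform could shell out to a version-specific Flink distribution (the directory layout, deployment target and arguments are made-up placeholders):

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class FlinkRunSubmitter {
    // Hypothetical layout: one Flink distribution per supported version.
    private static final String FLINK_DISTS = "/opt/flink-dists";

    public static int submit(String flinkVersion, String jobJar, String... jobArgs) throws Exception {
        String flinkBin = FLINK_DISTS + "/flink-" + flinkVersion + "/bin/flink";

        List<String> cmd = new ArrayList<>();
        cmd.add(flinkBin);
        cmd.add("run-application");          // or "run" for session / per-job modes
        cmd.add("-t");
        cmd.add("kubernetes-application");   // the target depends on the deployment
        cmd.add(jobJar);
        for (String arg : jobArgs) {
            cmd.add(arg);
        }

        Process process = new ProcessBuilder(cmd)
                .redirectErrorStream(true)
                .directory(new File(FLINK_DISTS))
                .start();
        return process.waitFor();
    }
}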