Re:咨询Flink 1.19文档中关于iterate操作

2024-05-20 文章 Xuyang
Hi, 

目前Iterate api在1.19版本上废弃了,不再支持,具体可以参考[1][2]。Flip[1]中提供了另一种替代的办法[3]




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-357%3A+Deprecate+Iteration+API+of+DataStream

[2] https://issues.apache.org/jira/browse/FLINK-33144

[3] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300




--

Best!
Xuyang





在 2024-05-20 22:39:37,""  写道:
>尊敬的Flink开发团队:
>
>您好!
>
>我目前正在学习如何使用Apache Flink的DataStream API实现迭代算法,例如图的单源最短路径。在Flink 
>1.18版本的文档中,我注意到有关于iterate操作的介绍,具体请见:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/overview/#iterations
>
>但是,我发现Flink 
>1.19版本的文档中不再提及iterate操作。这让我有些困惑。不知道在最新版本中,这是否意味着iterate操作不再被支持?如果是这样的话,请问我该如何在数据流上进行迭代计算?
>
>非常感谢您的时间和帮助,期待您的回复。
>
>谢谢!
>
>李智诚


Re:Re: use flink 1.19 JDBC Driver can find jdbc connector

2024-05-15 文章 Xuyang
Hi, 

> 现在可以用中文了?

我看你发的是中文答疑邮箱




> 就是opt目录里面的gateway.jar直接编辑Factory文件把connector注册就行了

你的意思是,之前报错类似"找不到一个jdbc 
connector",然后直接在gateway的jar包里的META-INF/services内的Factory文件(SPI文件)内加入jdbc 
connector的Factory实现类就好了吗?




如果是这个问题就有点奇怪,因为本身flink-connector-jdbc的spi文件就已经将相关的类写进去了[1],按理说放到lib目录下,就会spi发现的




[1] 
https://github.com/apache/flink-connector-jdbc/blob/bde28e6a92ffa75ae45bc8df6be55d299ff995a2/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory#L16




--

Best!
Xuyang





在 2024-05-15 15:51:49,abc15...@163.com 写道:
>现在可以用中文了?就是opt目录里面的gateway.jar直接编辑Factory文件把connector注册就行了
>
>
>> 在 2024年5月15日,15:36,Xuyang  写道:
>> 
>> Hi, 看起来你之前的问题是jdbc driver找不到,可以简单描述下你的解决的方法吗?“注册connection数的数量”有点不太好理解。
>> 
>> 
>> 
>> 
>> 如果确实有类似的问题、并且通过这种手段解决了的话,可以建一个improvement的jira issue[1]来帮助社区跟踪、改善这个问题,感谢!
>> 
>> 
>> 
>> 
>> [1] https://issues.apache.org/jira/projects/FLINK/summary
>> 
>> 
>> 
>> 
>> --
>> 
>>Best!
>>Xuyang
>> 
>> 
>> 
>> 
>> 
>>> 在 2024-05-10 12:26:22,abc15...@163.com 写道:
>>> I've solved it. You need to register the number of connections in the jar 
>>> of gateway. But this is inconvenient, and I still hope to improve it.
>>> 发自我的 iPhone
>>> 
>>>>> 在 2024年5月10日,11:56,Xuyang  写道:
>>>> 
>>>> Hi, can you print the classloader and verify if the jdbc connector exists 
>>>> in it?
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> 
>>>>   Best!
>>>>   Xuyang
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> At 2024-05-09 17:48:33, "McClone"  wrote:
>>>>> I put flink-connector-jdbc into flink\lib.use flink 1.19 JDBC Driver can 
>>>>> not  find jdbc connector,but use sql-client is normal.


Re:请问如何贡献Flink Hologres连接器?

2024-05-15 文章 Xuyang
Hi, 

我觉得如果只是从贡献的角度来说,支持flink hologres 
connector是没问题的,hologres目前作为比较热门的数据库,肯定是有很多需求的,并且现在aliyun 
github官方也基于此提供了开源的flink hologres connector[1]。





但是涉及到aliyun等公司商业化的ververica-connector-hologres包,如果想直接开源的话,在我的角度最好事先确认下面几点,不然可能会隐含一些法律风险

  1. jar包的提供方(aliyun等公司)是否知情、且愿意开源,不然直接拿着商业化的东西给出来有点不太好

2. jar包内的协议是否满足开源的协议,而不是商业化的协议




我推荐如果真要开源,可以基于开源github仓库的flink hologres connector[1]来贡献(比如现在我看目前它最高支持flink 
1.17,可以试试贡献支持到1.18、1.19等等)




[1] https://github.com/aliyun/alibabacloud-hologres-connectors




--

Best!
Xuyang





在 2024-05-14 11:24:37,"casel.chen"  写道:
>我们有使用阿里云商业版Hologres数据库,同时我们有自研的Flink实时计算平台,为了实现在Hologres上实时建仓,我们基于开源Apache 
>Flink 1.17.1结合阿里云maven仓库的ververica-connector-hologres包[1]和开源的holo 
>client[2]开发了hologres 
>connector,修复了一些jar依赖问题。目前我们已经在生产环境使用了一段时间,暂时没有发现问题,现在想将它贡献给社区。
>
>
>请问:
>1. 贡献Flink Hologres连接器是否合规?
>2. 如果合规的话,PR应该提到哪个项目代码仓库?
>3. 还是说要像 https://flink-packages.org/categories/connectors 
>这样链接到自己的github仓库?如果是的话要怎么在flink-packages.org上面注册呢?
>
>
>[1] 
>https://repo1.maven.org/maven2/com/alibaba/ververica/ververica-connector-hologres/1.17-vvr-8.0.4-1/
>[2] 
>https://github.com/aliyun/alibabacloud-hologres-connectors/tree/master/holo-client


Re:Re: use flink 1.19 JDBC Driver can find jdbc connector

2024-05-15 文章 Xuyang
Hi, 看起来你之前的问题是jdbc driver找不到,可以简单描述下你的解决的方法吗?“注册connection数的数量”有点不太好理解。




如果确实有类似的问题、并且通过这种手段解决了的话,可以建一个improvement的jira issue[1]来帮助社区跟踪、改善这个问题,感谢!




[1] https://issues.apache.org/jira/projects/FLINK/summary




--

Best!
Xuyang





在 2024-05-10 12:26:22,abc15...@163.com 写道:
>I've solved it. You need to register the number of connections in the jar of 
>gateway. But this is inconvenient, and I still hope to improve it.
>发自我的 iPhone
>
>> 在 2024年5月10日,11:56,Xuyang  写道:
>> 
>> Hi, can you print the classloader and verify if the jdbc connector exists 
>> in it?
>> 
>> 
>> 
>> 
>> --
>> 
>>Best!
>>Xuyang
>> 
>> 
>> 
>> 
>> 
>> At 2024-05-09 17:48:33, "McClone"  wrote:
>>> I put flink-connector-jdbc into flink\lib.use flink 1.19 JDBC Driver can 
>>> not  find jdbc connector,but use sql-client is normal.


Re:use flink 1.19 JDBC Driver can find jdbc connector

2024-05-09 文章 Xuyang
Hi, can you print the classloader and verify if the jdbc connector exists in it?




--

Best!
Xuyang





At 2024-05-09 17:48:33, "McClone"  wrote:
>I put flink-connector-jdbc into flink\lib.use flink 1.19 JDBC Driver can not  
>find jdbc connector,but use sql-client is normal.


Re:Flink 截止到1.18,是否有办法在Table API上添加uid?

2024-04-24 文章 Xuyang
Hi, 如果在中间添加了op,或者修改了处理逻辑,那么代表拓扑图会变,那么基于拓扑序所确定的uid也会变,从状态恢复就可能失败。具体可以参考[1]


目前table api应该是没有开放自定义uid的能力,可以在jira[2]上新建一个feature的jira,然后在dev邮件里发起讨论下。




[1] 
https://github.com/apache/flink/blob/92eef24d4cc531d6474252ef909fc6d431285dd9/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java#L243C38-L243C62
[2] https://issues.apache.org/jira/projects/FLINK/issues/



--

Best!
Xuyang





在 2024-04-25 01:18:55,"Guanlin Zhang"  写道:
>Hi Team,
>
>我们这边的业务使用 Flink MySQL CDC到 OpenSearch并且使用TABLE API: INSERT INTO t1 SELECT * 
>FROM t2 这种方式。
>
>由于我们这边可能会在运行过程中添加额外的Operator,我们有办法在使用snapshot 恢复后保留之前src和sink 
>operator的状态么?我看到在DataStream API可以通过设定uid。Table API有同样的方法吗?我看到Flink 
>jira:https://issues.apache.org/jira/browse/FLINK-28861 
>可以设置table.exec.uid.generation=PLAN_ONLY。请问默认配置下,中间添加transformation 
>operator或者其他变更后从snapshot恢复会保留之前的状态么?
>
>


Re:处理时间的滚动窗口提前触发

2024-04-23 文章 Xuyang
Hi, 我看你使用了System.currentTimeMillis(),有可能是分布式的情况下,多台TM上的机器时间不一致导致的吗?




--

Best!
Xuyang





在 2024-04-20 19:04:14,"hhq" <424028...@qq.com.INVALID> 写道:
>我使用了一个基于处理时间的滚动窗口,窗口大小设置为60s,但是我在窗口的处理函数中比较窗口的结束时间和系统时间,偶尔会发现获取到的系统时间早于窗口结束时间(这里的提前量不大,只有几毫秒,但是我不清楚,这是flink窗口本身的原因还是我代码的问题)我没有找到原因,请求帮助
>
>public static void main(String[] args) throws Exception {
>
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>DataStreamSource integerDataStreamSource = env.addSource(new 
> IntegerSource());
>
>integerDataStreamSource
>.keyBy(Integer::intValue)
>.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
>.process(new IntegerProcessFunction())
>.setParallelism(1);
>
>env.execute();
>}
>
>
>public class IntegerProcessFunction extends ProcessWindowFunctionObject, Integer, TimeWindow> {
>private Logger log;
>@Override
>public void open(Configuration parameters) throws Exception {
>super.open(parameters);
>this.log = Logger.getLogger(IntegerProcessFunction.class);
>}
>
>@Override
>public void process(Integer integer, ProcessWindowFunction Object, Integer, TimeWindow>.Context context, Iterable elements, 
> Collector out) throws Exception {
>long currentTimeMillis = System.currentTimeMillis();
>long end = context.window().getEnd();
>
>if (currentTimeMillis < end) {
>log <http://log.info/>.info <http://log.info/>("bad");
>} else {
>log <http://log.info/>.info <http://log.info/>("good");
>}
>}
>}
>


Re:Unable to use Table API in AWS Managed Flink 1.18

2024-04-10 文章 Xuyang
Hi, Perez.
Flink use SPI to find the jdbc connector in the classloader and when starting, 
the dir '${FLINK_ROOT}/lib' will be added 
into the classpath. That is why in AWS the exception throws. IMO there are two 
ways to solve this question.


1. upload the connector jar to AWS to let the classloader keep this jar. As for 
how to upload connector jars, you need to check 
the relevant documents of AWS.
2. package the jdbc connector jar into your job jar and submit it again.




--

Best!
Xuyang




At 2024-04-10 17:32:19, "Enrique Alberto Perez Delgado" 
 wrote:

Hi all,


I am using AWS Managed Flink 1.18, where I am getting this error when trying to 
submit my job:


```
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a 
connector using option: 'connector'='jdbc' at 
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:798)
 at 
org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(FactoryUtil.java:772)
 at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:317)
 ... 32 more Caused by: org.apache.flink.table.api.ValidationException: Could 
not find any factory for identifier 'jdbc' that implements 
'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
```


I used to get this error when testing locally until I added the 
`flink-connector-jdbc-3.1.2-1.18`.jar to `/opt/flink/lib` in my local docker 
image, which I thought would be provided by AWS. apparently, it isn’t. Has 
anyone encountered this error before?


I highly appreciate any help you could give me,


Best regards, 


Enrique Perez
Data Engineer
HelloFresh SE | Prinzenstraße 89 | 10969 Berlin, Germany
Phone:  +4917625622422











| |
HelloFresh SE, Berlin (Sitz der Gesellschaft) | Vorstände: Dominik S. Richter 
(Vorsitzender), Thomas W. Griesel, Christian Gärtner, Edward Boyes | 
Vorsitzender des Aufsichtsrats: John H. Rittenhouse | Eingetragen beim 
Amtsgericht Charlottenburg, HRB 182382 B | USt-Id Nr.: DE 302210417

CONFIDENTIALITY NOTICE: This message (including any attachments) is 
confidential and may be privileged. It may be read, copied and used only by the 
intended recipient. If you have received it in error please contact the sender 
(by return e-mail) immediately and delete this message. Any unauthorized use or 
dissemination of this message in whole or in parts is strictly prohibited.

Re:Re: [ANNOUNCE] Donation Flink CDC into Apache Flink has Completed

2024-03-20 文章 Xuyang
Cheers!




--

Best!
Xuyang

在 2024-03-21 10:28:45,"Rui Fan" <1996fan...@gmail.com> 写道:
>Congratulations!
>
>Best,
>Rui
>
>On Thu, Mar 21, 2024 at 10:25 AM Hang Ruan  wrote:
>
>> Congrattulations!
>>
>> Best,
>> Hang
>>
>> Lincoln Lee  于2024年3月21日周四 09:54写道:
>>
>>>
>>> Congrats, thanks for the great work!
>>>
>>>
>>> Best,
>>> Lincoln Lee
>>>
>>>
>>> Peter Huang  于2024年3月20日周三 22:48写道:
>>>
>>>> Congratulations
>>>>
>>>>
>>>> Best Regards
>>>> Peter Huang
>>>>
>>>> On Wed, Mar 20, 2024 at 6:56 AM Huajie Wang  wrote:
>>>>
>>>>>
>>>>> Congratulations
>>>>>
>>>>>
>>>>>
>>>>> Best,
>>>>> Huajie Wang
>>>>>
>>>>>
>>>>>
>>>>> Leonard Xu  于2024年3月20日周三 21:36写道:
>>>>>
>>>>>> Hi devs and users,
>>>>>>
>>>>>> We are thrilled to announce that the donation of Flink CDC as a
>>>>>> sub-project of Apache Flink has completed. We invite you to explore the 
>>>>>> new
>>>>>> resources available:
>>>>>>
>>>>>> - GitHub Repository: https://github.com/apache/flink-cdc
>>>>>> - Flink CDC Documentation:
>>>>>> https://nightlies.apache.org/flink/flink-cdc-docs-stable
>>>>>>
>>>>>> After Flink community accepted this donation[1], we have completed
>>>>>> software copyright signing, code repo migration, code cleanup, website
>>>>>> migration, CI migration and github issues migration etc.
>>>>>> Here I am particularly grateful to Hang Ruan, Zhongqaing Gong,
>>>>>> Qingsheng Ren, Jiabao Sun, LvYanquan, loserwang1024 and other 
>>>>>> contributors
>>>>>> for their contributions and help during this process!
>>>>>>
>>>>>>
>>>>>> For all previous contributors: The contribution process has slightly
>>>>>> changed to align with the main Flink project. To report bugs or suggest 
>>>>>> new
>>>>>> features, please open tickets
>>>>>> Apache Jira (https://issues.apache.org/jira).  Note that we will no
>>>>>> longer accept GitHub issues for these purposes.
>>>>>>
>>>>>>
>>>>>> Welcome to explore the new repository and documentation. Your feedback
>>>>>> and contributions are invaluable as we continue to improve Flink CDC.
>>>>>>
>>>>>> Thanks everyone for your support and happy exploring Flink CDC!
>>>>>>
>>>>>> Best,
>>>>>> Leonard
>>>>>> [1] https://lists.apache.org/thread/cw29fhsp99243yfo95xrkw82s5s418ob
>>>>>>
>>>>>>


Re:一次执行单条insert语句和一次执行多条insert语句有什么区别

2024-03-13 文章 Xuyang
Hi, fengqi.
上面那种statement的方式,最终将只会产生一个作业,这个作业有机会复用这个source(拓扑图1 个source  -> 2 
个calc_sink),因此只需要读一次source就行了。
下面那种execute sql两次的方式,将产生两个作业,两个作业完全独立。



--

Best!
    Xuyang





At 2024-03-13 12:26:05, "ha.fen...@aisino.com"  wrote:
>StatementSet stmtSet = tEnv.createStatementSet();
>stmtSet.addInsertSql(
>  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product 
> LIKE '%Rubber%'");
>stmtSet.addInsertSql(
>  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product 
> LIKE '%Glass%'");
>TableResult tableResult2 = stmtSet.execute();
>与下面有什么区别?
>tEnv.executeSql(
>  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product 
> LIKE '%Rubber%'");
>tEnv.executeSql(
>  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product 
> LIKE '%Glass%'");


Re:flink sql关联维表在lookup执行计划中的关联条件问题

2024-03-07 文章 Xuyang
Hi, 你的图挂了,可以用图床或者直接贴SQL




--

Best!
Xuyang




在 2024-03-08 10:54:19,"iasiuide"  写道:





下面的sql片段中
ods_ymfz_prod_sys_divide_order  为kafka source表
dim_ymfz_prod_sys_trans_log   为mysql为表
dim_ptfz_ymfz_merchant_info   为mysql为表



flink web ui界面的执行计划片段如下:

 [1]:TableSourceScan(table=[[default_catalog, default_database, 
ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 
1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 
5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, 
order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
+- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, IF(SEARCH(row_kind, 
Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * divide_fee_amt), divide_fee_amt) AS 
div_fee_amt, Reinterpret(CASE(create_time IS NULL, 1970-01-01 00:00:00, 
CAST(create_time AS TIMESTAMP(3 AS ts], where=[((order_state = '2') AND 
(divide_fee_amt  0) AND (sys_date = DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS 
TIMESTAMP(9)), '-MM-dd')))])
   +- 
[3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
 joinType=[LeftOuterJoin], async=[false], 
lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = 
DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'MMdd'))], 
select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, 
bg_rel_trans_id, pay_type, member_id, mer_name])
  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
member_id, mer_name], where=[(CHAR_LENGTH(member_id)  1)])
 +- 
[5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'merch', 
pk_id=member_id], where=[(data_source = 'merch')], select=[sys_date, 
create_time, div_fee_amt, ts, pay_type, member_id, mer_name, pk_id, agent_id, 
bagent_id])
+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
pay_type, member_id, mer_name, agent_id, bagent_id])
   +- 
[7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], 
where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], 
select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, mer_name, 
agent_id, bagent_id, pk_id, bagent_id, fagent_id])
  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS fagent_id0])
 +- 
[9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
 joinType=[LeftOuterJoin], async=[false], lookup=[data_source=_UTF-16LE'agent', 
pk_id=fagent_id0], where=[(data_source = 'agent')], select=[sys_date, 
create_time, div_fee_amt, ts, pay_type, member_id, mer_name, bagent_id, 
bagent_id0, fagent_id0, pk_id, agent_name, bagent_name])
  


为什么关联第一张维表dim_ymfz_prod_sys_trans_log的限制条件AND b.trans_date = DATE_FORMAT 
(CURRENT_TIMESTAMP, 'MMdd') 在执行计划中,不作为 lookup的条件 ==> 
lookup=[bg_rel_trans_id=bg_rel_trans_id],
关联第二张维表 dim_ptfz_ymfz_merchant_info 的限制条件ON b.member_id = c.pk_id AND 
c.data_source = 'merch' 在执行计划中,都是作为lookup的条件 ==> 
lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
关联第三张维表dim_ptfz_ymfz_merchant_info的限制条件 ON c.agent_id = d.pk_id AND 
(d.data_source = 'ex_agent' OR d.data_source = 'agent') 
中关于data_source的条件,在执行计划中不是lookup的条件 ==> lookup=[pk_id=agent_id],
关联维表的限制条件有的会作为关联条件,有的不作为关联条件吗? 这种有什么规律吗? 因为这个会关乎维表的索引字段的设置。







Re:Window properties can only be used on windowed tables

2024-03-07 文章 Xuyang
Hi, fengqi.
这看起来像是select语句中,不能直接使用非来源于window 
agg的proctime或者event函数。目前不确定这是不是预期行为,方便的话可以在社区jira[1]上提一个bug看看。
快速绕过的话,可以试试下面的代码:



DataStream flintstones = env.fromCollection(list); // Table select = 
table.select($("name"), $("age"), $("addtime").proctime()); Table table = 
tEnv.fromDataStream( flintstones, Schema.newBuilder() .column("name", "string") 
.column("age", "int") .columnByExpression("addtime", "proctime()") .build()); 
Table select1 = // select.xxx 
table.window(Tumble.over(lit(10).second()).on($("addtime")).as("w")) 
.groupBy($("name"), $("w")) .select($("name"), $("age").sum()); 
select1.execute().print();


[1] https://issues.apache.org/jira/projects/FLINK/issues

--

Best!
Xuyang





At 2024-03-08 09:28:10, "ha.fen...@aisino.com"  wrote:
>public static void main(String[] args) {
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>List list = new ArrayList<>();
>list.add(new Persion("Fred",35));
>list.add(new Persion("wilma",35));
>list.add(new Persion("Pebbles",2));
>DataStream flintstones = env.fromCollection(list);
>Table table = tEnv.fromDataStream(flintstones);
>Table select = table.select($("name"), $("age"), 
> $("addtime").proctime());
>Table select1 = select.window(
>Tumble.over(lit(10).second())
>.on($("addtime"))
>.as("w"))
>.groupBy($("name"), $("w"))
>.select($("name"), $("age").sum());
>select1.execute().print();
>
>}
>
>public static class Persion{
>public String name;
>public Integer age;
>public Persion(){}
>public Persion(String name,Integer age){
>this.name = name;
>this.age = age;
>}
>public String toString(){
>return this.name.toString()+":age "+this.age.toString();
>}
>}
>
>提示Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Window properties can only be used on windowed tables
>是哪里错了?


Re:Table中的java.util.Date类型对应sql中的什么类型

2024-03-04 文章 Xuyang
Hi, 
java.util.Date没有sql中的常规类型和它对应,因此使用的兜底的Raw类型(结构化类型)。实际上java.sql.Date 
对应的是sql中的Date。
具体可以参考下这张表:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/#data-type-extraction




--

Best!
Xuyang





在 2024-03-05 09:23:38,"ha.fen...@aisino.com"  写道:
>从流转换成Table
>DataStream streamSource = env.addSource(new OrdersSourceObject());
>Table table = 
>tEnv.fromDataStream(streamSource).select($("addtime"),$("cusname"),$("price"),$("status"));
>tEnv.createTemporaryView("itemtable",table);
>
>Orders定义
>private Date addtime;
>private String cusname;
>private BigDecimal price;
>private int status;
>
>输出到kafka
>String creatDDL = "CREATE TABLE kafka_sink (\n" +
>"  `addtime` TIMESTAMP(0),\n" +
>"  `cusname` STRING,\n" +
>"  `price` DECIMAL(15, 2),\n" +
>"  `status` INT\n" +
>") WITH (\n" +
>"  'connector' = 'kafka',\n" +
>"  'format' = 'json'\n" +
>.
>")";
>
>String query = "INSERT INTO kafka_sink SELECT * FROM itemtable;";
>tEnv.executeSql(query);
>
>报错
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Column types of query result and sink for 
>'default_catalog.default_database.kafka_sink' do not match.
>Cause: Incompatible types for sink column 'addtime' at position 0.
>
>Query schema: [addtime: RAW('java.util.Date', '...'), cusname: STRING, price: 
>DECIMAL(38, 18), status: INT NOT NULL]
>Sink schema:  [addtime: TIMESTAMP(0), cusname: STRING, price: DECIMAL(15, 2), 
>status: INT]


Re:Re:Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-21 文章 Xuyang
Hi, 
> 那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
对,具体可以参考下这个内部实现的算子[1]


> 新的sink 
> v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
>  - context.timestamp()得到sink延迟呢?
应该是可以的,就是可能因为各tm的机器时间会有略微差异的情况,不会特别准,但是应该也够用了。


[1] 
https://github.com/apache/flink/blob/e7e973e212d0ca04855af3036fc5b73888b8e0e5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java#L314




--

    Best!
Xuyang





在 2024-02-21 15:17:49,"casel.chen"  写道:
>感谢!那是不是要计算sink延迟的话只需用 context.currentProcessingTime() - context.timestamp() 即可?
>我看新的sink 
>v2中的SinkWriter.Context已经没有currentProcessingTime()方法了,是不是可以直接使用System.currentTimeMillis()
> - context.timestamp()得到sink延迟呢?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2024-02-21 09:41:37,"Xuyang"  写道:
>>Hi, chen. 
>>可以试一下在sink function的invoke函数中使用:
>>
>>
>>@Override
>>public void invoke(RowData row, Context context) throws Exception {
>>context.currentProcessingTime(); 
>>context.currentWatermark(); 
>>...
>>}
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>Best!
>>Xuyang
>>
>>
>>
>>
>>
>>在 2024-02-20 19:38:44,"Feng Jin"  写道:
>>>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>>>
>>>Best,
>>>Feng
>>>
>>>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>>>
>>>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>>>
>>>>
>>>> public class XxxSinkFunction extends RichSinkFunction implements
>>>> CheckpointedFunction, CheckpointListener {
>>>>
>>>>
>>>> @Override
>>>> public synchronized void invoke(RowData rowData, Context context)
>>>> throws IOException {
>>>>//  这里想从rowData中获取event time和watermark值,如何实现呢?
>>>> }
>>>> }
>>>>
>>>>
>>>> 例如source table如下定义
>>>>
>>>>
>>>> CREATE TEMPORARY TABLE source_table(
>>>>   username varchar,
>>>>   click_url varchar,
>>>>   eventtime varchar,
>>>>
>>>>   ts AS TO_TIMESTAMP(eventtime),
>>>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>>>> ) with (
>>>>   'connector'='kafka',
>>>>   ...
>>>>
>>>> );
>>>>
>>>>
>>>> CREATE TEMPORARY TABLE sink_table(
>>>>   username varchar,
>>>>   click_url varchar,
>>>>   eventtime varchar
>>>> ) with (
>>>>   'connector'='xxx',
>>>>   ...
>>>> );
>>>> insert into sink_table select username,click_url,eventtime from
>>>> source_table;


Re:Re: flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?

2024-02-20 文章 Xuyang
Hi, chen. 
可以试一下在sink function的invoke函数中使用:


@Override
public void invoke(RowData row, Context context) throws Exception {
context.currentProcessingTime(); 
context.currentWatermark(); 
...
}







--

Best!
Xuyang





在 2024-02-20 19:38:44,"Feng Jin"  写道:
>我理解不应该通过 rowData 获取, 可以通过 Context 获得 watermark 和 eventTime.
>
>Best,
>Feng
>
>On Tue, Feb 20, 2024 at 4:35 PM casel.chen  wrote:
>
>> 请问flink sql中的自定义sink connector如何获取到source table中定义的event time和watermark?
>>
>>
>> public class XxxSinkFunction extends RichSinkFunction implements
>> CheckpointedFunction, CheckpointListener {
>>
>>
>> @Override
>> public synchronized void invoke(RowData rowData, Context context)
>> throws IOException {
>>//  这里想从rowData中获取event time和watermark值,如何实现呢?
>> }
>> }
>>
>>
>> 例如source table如下定义
>>
>>
>> CREATE TEMPORARY TABLE source_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar,
>>
>>   ts AS TO_TIMESTAMP(eventtime),
>>   WATERMARK FOR ts AS ts -INTERVAL'2'SECOND--为Rowtime定义Watermark。
>> ) with (
>>   'connector'='kafka',
>>   ...
>>
>> );
>>
>>
>> CREATE TEMPORARY TABLE sink_table(
>>   username varchar,
>>   click_url varchar,
>>   eventtime varchar
>> ) with (
>>   'connector'='xxx',
>>   ...
>> );
>> insert into sink_table select username,click_url,eventtime from
>> source_table;


Re:回复: flink ui 算子数据展示一直loading...

2024-01-25 文章 Xuyang
Hi, 
手动curl 有问题的metric的接口,出来的数据正常吗? JM log里有发现什么异常么?



--

Best!
Xuyang





在 2024-01-26 11:51:53,"阿华田"  写道:
>这个维度都排查了  都正常
>
>
>| |
>阿华田
>|
>|
>a15733178...@163.com
>|
>签名由网易邮箱大师定制
>
>
>在2024年01月23日 21:57,Feng Jin 写道:
>可以尝试着下面几种方式确认下原因:
>
>
>1.
>
>打开浏览器开发者模式,看是否因为请求某个接口卡住
>2.
>
>查看下 JobManager 的 GC 情况,是否频繁 FullGC
>3.
>
>查看下 JobManager 的日志,是否存在某些资源文件丢失或者磁盘异常情况导致 web UI 无法访问.
>
>
>Best,
>Feng
>
>
>On Tue, Jan 23, 2024 at 6:16 PM 阿华田  wrote:
>
>
>
>如下图,任务处理数据正常,任务状态也正常,但是flink_ui一致处于loading中,只有个别任务这样,其他正常,有可能是metirc包的某个类冲突导致的吗?
>阿华田
>a15733178...@163.com
>
><https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=%E9%98%BF%E5%8D%8E%E7%94%B0=a15733178518%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22a15733178518%40163.com%22%5D>
>签名由 网易邮箱大师 <https://mail.163.com/dashi/dlpro.html?from=mail81> 定制
>
>


Re:flink cdc 读取数据后面可以跟窗口函数吗

2024-01-17 文章 Xuyang
Hi, 
Flink SQL中可以用Group Window[1]的方式来读完cdc数据后加窗口。
可以具体描述一下“一直不生效”的现象和SQL么?



[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-agg/#selecting-group-window-start-and-end-timestamps-1




--

Best!
Xuyang





在 2024-01-17 19:24:03,"2813732510" <2813732...@qq.com.INVALID> 写道:
>flinkcdc读取binlog数据后面可以开窗吗,测试滑动窗口,聚合,一直不生效,是有什么特别的用法嘛


Re:Flink1.16版本java.lang.OutOfMemoryError: Metaspace

2024-01-12 文章 Xuyang
Hi, 你的图挂了,贴一下代码吧。



--

Best!
Xuyang




在 2024-01-12 16:23:13,"Summer_Gu"  写道:

版本号:1.16
部署方式: Standalone Cluster集群部署
问题:通过webUi提交任务后,立马使任务报错,重复提交,会导致Metaspace内存溢出,(红框部分)




我看见官网有这部分说明:https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/debugging/debugging_classloading/#unloading-of-dynamically-loaded-classes-in-user-code
但是我在代码中填入:
并不起作用,所以想问一下是否使用方式有问题?或者还有其他解决方案吗?



Re:回复: flink-checkpoint 问题

2024-01-10 文章 Xuyang
Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。




--

Best!
Xuyang




在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道:

JM中chk失败时间点日志,没有25548的触发记录:


自动recovery失败:


TM日志:


checkpoint文件路径,25548里面空的:


| |
吴先生
|
|
15951914...@163.com
|
 回复的原邮件 
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月10日 18:20 |
| 收件人 |  |
| 主题 | Re: flink-checkpoint 问题 |
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索


On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:

Flink版本: 1.12
checkpoint配置:hdfs

现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的




Re:Flink1.12读取不同topic, 认证信息不一样

2024-01-09 文章 Xuyang
Hi,
按照现在通用的设计应该是不行的。要么用两个comsumer读取后union;要么魔改下comsumer的代码,在真正数据拉取时用不同的aksk去读。




--

Best!
Xuyang





在 2024-01-09 14:49:35,"somebody someone" <1107807...@qq.com.INVALID> 写道:
>问题:目前使用Flink版本1.12
>需要接入01和02两个topic,属于同一集群,但是数据方给的两个topic的 
>jass的用户名username和密码password不一样,其他认证信息都一样,不想用两个Consumer去分别读取,
>怎么用同一个source 方式对接这种配置文件不一样的。
>
>这个上面也有人提出过,也没有想要的。
>https://stackoverflow.com/questions/38989443/flink-how-to-read-from-multiple-kafka-cluster-using-same-streamexecutionenviron
>
>
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>Properties properties = new Properties();
>properties.setProperty("bootstrap.servers", 
>"101.xxx.156.xxx:9097");
>properties.setProperty("group.id", "test_zj");
>properties.setProperty("security.protocol", 
>"SASL_SSL");
>properties.setProperty("sasl.mechanism", "PLAIN");
>properties.setProperty("sasl.jaas.config", 
>"org.apache.kafka.common.security.plain.PlainLoginModule required 
>username=\"xxx-9dc9-xxx\" password=\"-4CdNkCo$5b=xxx";");
>properties.setProperty("ssl.truststore.location", 
>"jks/client.truststore.jks");
>properties.setProperty("ssl.truststore.password", 
>"dmxxx");
>
>
>FlinkKafkaConsumerstringFlinkKafkaConsumer = new FlinkKafkaConsumer<(
>
>Arrays.asList("Topic01","topic02"),
>new 
>SimpleStringSchema(),
>properties
>);
>
>
>DataStreamSourceenv.addSource(stringFlinkKafkaConsumer);
>source.print();
>env.execute();
>
>
>
>
>
>
>
>
>
>
>somebodysomeone
>1107807...@qq.com
>
>
>
>


Re:flink1.18 regular join 支持两侧流各自配置state ttl

2024-01-05 文章 Xuyang
Hi,
文档中“The current TTL value for both left and right side is "0 ms", which 
means the state retention is not enabled.”,指的其实是并没有开启state 
ttl的意思,也就是并不会清理state、永久保留state,对应的是public involving 
api中的StateTTLConfig#UpdateType.Disabled[1],文档上的表述确实可以更加清晰一些,方便的话可以提一个jira 
improve一下文档。
另外,有点好奇为什么希望stateful的节点不保留state,可以举一个实际遇到的场景么?


[1] 
https://github.com/apache/flink/blob/4a852fee28f2d87529dc05f5ba2e79202a0e00b6/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java#L65C14-L65C14



--

Best!
Xuyang





在 2024-01-05 17:17:42,"Thomas Yang"  写道:
>本地测试发现 默认生成0ms,实际测试是两侧保留了永久状态,但是官方文档意思是0ms表示两侧都不保存状态. 是不是文档有错误?
>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#idle-state-retention-time
>
>另外咨询下:  如果0表示永久保留state  那么想不保存state应该使用什么值?
>*谢谢!*
>
>
>*杨勇*


Re:CUMULATE 窗口状态过大导致CK超时

2024-01-04 文章 Xuyang
Hi,

一般来说,业务上如果坚持要使用大state,可以尝试下尽可能的给多并发(让每个并发都持有一部分key的state,摊平大state)和内存(尽可能减少访问落盘的数据,减少IO)来提高性能。
对于你这个case来说,CUMULATE Window 
TVF在实现层面已经尽可能将小窗口的数据进行merge了[1]。可以dump下来看下具体是哪里的问题,是不是有进一步优化的空间。


[1] 
https://github.com/apache/flink/blob/b25dfaee80727d6662a5fd445fe51cc139a8b9eb/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/SliceAssigners.java#L340C15-L340C15

--

Best!
Xuyang





在 2024-01-05 09:41:01,"jiaot...@mail.jj.cn"  写道:
>Hi All,
> 我使用了CUMULATE( STEP => INTERVAL '1' MINUTES, SIZE => INTERVAL '1' 
> DAYS) 
> 累积窗口,导致太多数据保存在状态中,即使开启了增量式RocksDB,但是当程序运行一段时间后,CK依然超时从而导致任务失败。因此想咨询对于这种大窗口大状态应该如何优化和使用。非常感谢
>注:Flink版本 1.14.0


Re:Re: 滑动窗口按照处理时间触发的问题

2024-01-02 文章 Xuyang
Hi,
基本思路和Jinsui说的差不多,我怀疑也是watermark没有推进导致窗口没有开窗。具体可以debug一下EventTimeTrigger里的‘onElement’方法和‘onEventTime’方法。




--

Best!
Xuyang





在 2024-01-02 23:31:54,"Jinsui Chen"  写道:
>Hi,
>
>请问是否可以将所有代码贴出来,尤其是水位线相关的。因为事件时间的推进和水位线策略紧密相关。
>
>假设这样一种情况,将时间戳作为事件时间,假设你的水位线容错间隔设置为10min,就会出现上述情况,原因如下:
>1. 首先是时间窗口的对齐逻辑。窗口是根据 Epoch 时间(1970-01-01 00:00:00
>UTC)来对齐的。例如,如果窗口大小为5分钟,那么窗口的开始时间会是00:00、00:05、00:10等等很整的值,而不是事件时间。这也是为什么你的第一条数据会落在
>00:20 - 01:20 这个时间窗口上。
>2. 对于事件时间窗口,触发窗口计算的时机是‘水位线大于窗口结束时间’,也就是需要一条事件时间在 01:30 后的数据才会触发 00:20 -
>01:20 这个窗口。如果想要触发 1704129661000 对应的 00:25 - 01:25 窗口,需要一条事件时间大于 01:35
>的数据,而现实是没有这样的一条数据。
>
>我猜测你的水位线容错间隔是10min-14min 的值,导致了上述情况。
>
>Best regards,
>Jinsui
>
>ha.fen...@aisino.com  于2024年1月2日周二 20:17写道:
>
>>
>> 程序是一个滑动窗口SlidingEventTimeWindows.of(Time.minutes(60),Time.minutes(5)),
>> 处理函数aggregate(new AggregateFunctionMethod(), new MyProcess())。
>> 我使用socket方式进行录入数据
>> 2024-01-02 01:19:01  1704129541000
>> 2024-01-02 01:21:01  1704129661000
>> 2024-01-02 01:26:01  1704129961000
>> 2024-01-02 01:29:01  1704130141000
>> 2024-01-02 01:34:01  1704130441000
>> 前面是对应的时间,后面是我录入系统的时间
>> MyProcess类触发的时间是最后一次录入1704130441000的时候,输出窗口时间为
>> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000
>> 我认为应该录入1704129661000的时候就应该触发窗口函数了,但是并没有,所以我想问5分钟触发窗口的时间到底是怎么回事?
>>


Re:Re:RE: lock up表过滤条件下推导致的bug

2023-12-25 文章 Xuyang
Hi, 
可以贴一下flink的版本么?如果方便的话,也可以贴一下plan和最小可复现数据集。




--

Best!
Xuyang





在 2023-12-26 09:25:30,"杨光跃"  写道:
>
>
>
>
>
>
>CompiledPlan plan = env.compilePlanSql("insert into out_console " +
>" select r.apply_id from t_purch_apply_sent_route r " +
>" left join t_purch_apply_sent_route_goods FOR SYSTEM_TIME AS OF r.pt as  t " +
>"ON t.apply_id = r.apply_id and t.isdel = r.isdel" +
>" where r.apply_id = 61558439941351 and  t.route_goods_id is not null and 
>t.is_change = 2 " );
>
>
>
>
>
>
>
>
>
>
>
>在 2023-12-25 20:46:36,"Jiabao Sun"  写道:
>>Hi,
>>
>>邮件中的图片没显示出来,麻烦把 SQL 贴出来一下。
>>
>>Best,
>>Jiabao
>>
>>
>>On 2023/12/25 12:22:41 杨光跃 wrote:
>>> 我的sql如下:
>>> 、
>>> 
>>> 
>>> t_purch_apply_sent_route 是通过flink cdc创建的
>>> t_purch_apply_sent_route_goods 是普通的jdbc
>>> 我期望的结果是返回符合过滤条件的;但现在执行的结果,会返回t_purch_apply_sent_route表所有数据
>>> 这显然不符合我的预期,原因应该是因为过滤条件进行了过早的下推
>>> 这应该算是bug吧,或者要满足我的预期,该怎么写sql?
>>> 
>>> 
>>> 
>>> 


Re:关于文档中基于Table API 实现实时报表的问题

2023-12-13 文章 Xuyang
Hi, 
你可以试一下用TO_TIMESTAMP(FROM_UNIXTIME(transaction_time)) 将long转为timestamp




--

Best!
Xuyang





在 2023-12-13 15:36:50,"ha.fen...@aisino.com"  写道:
>文档中数据来源于kafka
>tEnv.executeSql("CREATE TABLE transactions (\n" +
>"account_id  BIGINT,\n" +
>"amount  BIGINT,\n" +
>"transaction_time TIMESTAMP(3),\n" +
>"WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' 
> SECOND\n" +
>") WITH (\n" +
>"'connector' = 'kafka',\n" +
>"'topic' = 'transactions',\n" +
>"'properties.bootstrap.servers' = 'kafka:9092',\n" +
>"'format'= 'csv'\n" +
>")");
>
>怎么对应kafka的transaction_time中的TIMESTAMP(3)类型?
>
>我是用实体类型为
>private long account_id;
>private int amount;
>private long transaction_time;
>
>通过下面插入kafka
>
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>DataStream streamSource = env.addSource(new 
> TransactionSource());
>tEnv.createTemporaryView("tran",streamSource, 
> $("account_id"),$("amount"),$("transaction_time"));
>tEnv.executeSql("CREATE TABLE transactions (\n" +
>"account_id  BIGINT,\n" +
>"amount  INT,\n" +
>"transaction_time TIMESTAMP(3)\n" +
>") WITH (\n" +
>"'connector' = 'kafka',\n" +
>"'topic' = 'kafka_mysql_transactions',\n" +
>"'properties.bootstrap.servers' = '172.24.6.109:9092',\n" +
>"'format'= 'json'\n" +
>")");
>tEnv.executeSql("insert into transactions select * from tran ");
>会报类似类型不匹配的错误
>Query schema: [account_id: BIGINT NOT NULL, amount: INT NOT NULL, 
>transaction_time: BIGINT NOT NULL]
>Sink schema:  [account_id: BIGINT, amount: INT, transaction_time: TIMESTAMP(3)]


Re:Flink脏数据处理

2023-12-06 文章 Xuyang
Hi, 
目前flink sql主动收集脏数据的行为。有下面两种可行的办法:
1. 如果知道脏数据是什么格式,那么将脏数据打个标,不走正常的处理逻辑,只收集,然后由一个UDAF来负责在达到一定的量的时候cancen。
2. 如果不知道脏数据是什么格式,可以在处理数据的那一个节点上使用UDX来处理正常的数据和脏数据,同时统计脏数据的数量,在达到一定上限的时候抛异常。


但是这里在udx里抛异常应该只会导致作业fo,无法让作业达到失败的状态。


要想让作业达到失败的状态,如果在source端就可以识别到脏数据的话,需要魔改下source 
connector,在识别到遇到多少脏数据的时候,不往后发数据就可以了。具体可以参考下[1]


[1] 
https://stackoverflow.com/questions/1153/how-to-stop-a-flink-streaming-job-from-program



--

Best!
Xuyang





在 2023-12-06 15:26:56,"刘建"  写道:
>Hi:我想使用flinkSQL 进行数据同步,如将MySQL数据读取并写入到MySQL中, 如果中途存在脏数据, 下游就会写不进去, 
>我如何收集这个脏数据呢, 当脏数据到达一定量的时候, 让该任务失败等等


Re:Re:Flink SQL作业配置'table.exec.sink.upsert-materialize'参数会影响TIMESTAMP类型精度?

2023-12-03 文章 Xuyang
Hi, 
可以提供一下最小能复现的query么?
我在本地尝试了下貌似并未复现,source数据为:


1,1,1970-01-01 00:00:00.001001
2,2,1970-01-01 00:00:00.002002
3,3,1970-01-01 00:00:00.003003


query为:


//tEnv.getConfig.getConfiguration
//  .set(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE, 
UpsertMaterialize.FORCE)


tEnv.executeSql(s"""
   |create temporary table t1(
   |  a int primary key not enforced,
   |  b int,
   |  c timestamp(3)
   |) with (
   |'connector' = 'filesystem',
   |'path' = '/Users/zhongxuyang/test/sinkm/a',
   |'format'='csv'
   |)
   |""".stripMargin)


tEnv.executeSql(s"""
   |create temporary table t2(
   |  a int,
   |  b int primary key not enforced,
   |  c varchar
// |  c varchar(3)
   |) with (
   |'connector' = 'filesystem',
   |'path' = '/Users/zhongxuyang/test/sinkm/e',
   |'format'='csv'
   |)
   |""".stripMargin)


tEnv
  .executeSql(s"""
 | insert into t2 select a,b,cast(c as varchar) from t1;
     |""".stripMargin)
  .await()










--

Best!
Xuyang





在 2023-12-01 15:50:40,"casel.chen"  写道:
>补充一下,flink版本是 1.17.1
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2023-12-01 15:49:48,"casel.chen"  写道:
>>线上有一个flink sql作业,创建和更新时间列使用的是 TIMESTAMP(3) 
>>类型,没有配置'table.exec.sink.upsert-materialize'参数时是正常时间写入的`-MM-dd 
>>HH:mm:ss.SSS`格式,
>>然后添加了'table.exec.sink.upsert-materialize'='NONE'参数后,输出的时间格式变成了 `-MM-dd 
>>HH:mm:ss.SS`。数据类型变成了TIMESTAMP(6),请问这是已知的issue么?
>>
>>
>>-U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
>>YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
>>23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>>+U[2023-11-30T12:43:04.676821, 2023-11-30, 000143554006, 20231130, 
>>23113012430450887882, F, 000143718775, 10.00, 44, 07, 2, 
>>23113012430450887895, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>>+I[2023-11-29T17:37:01.556478, 2023-11-29, 000141180318, 20231129, 
>>2f1edf1e3337642d, P, 000141538175, 246.00, 999, 01, 2, 
>>23112917370147645164, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>>-U[2023-11-25T16:02:45.145392, 2023-11-25, 000141288683, 20231125, 
>>2023112516024553495256400285, P, , 1200.00, 81, 02, 1, 23112516024525664244, 
>>2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>>+U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
>>YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
>>23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>>+U[2023-11-25T16:02:45.145392, 2023-11-25, 000141288683, 20231125, 
>>2023112516024553495256400285, F, 000141586078, 1200.00, 81, 02, 1, 
>>23112516024525664244, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>>-U[2023-11-29T21:11:02.327, 2023-11-29, 17332097, 20231129, 
>>YYHK6509100016607, S, 17332097, 1006.50, 30, 04, 1, 
>>23112921110248786000, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]
>>-U[2023-11-28T14:53:21.349043, 2023-11-28, 000137842973, 20231128, 
>>HFPWALLET23112814532140921335, P, 000142774221, 62.98, 86, 06, 4, 
>>538014532140921373, 2023-12-01T15:01:51.683, 2023-12-01T15:01:51.683]


Re:关于Flink SQL语句内的函数名和内建函数名称对应不上的问题

2023-11-24 文章 Xuyang
Hi, 
关于你举的例子,如果编译了源码的话,可以在FlinkSqlParserImpl这个被动态生成的词法解析器类中找到PostfixRowOperator方法,大致是通过识别到IS
 NOT NULL这三个关键字,转化为Calcite的这个内置函数SqlStdOperatorTable.IS_NOT_NULL




--

Best!
Xuyang





在 2023-11-24 15:15:04,"jinzhuguang"  写道:
>flink 1.18.0
>
>
>例如我写下一条SQL:
> select * from KafkaTable where id is not null;
>
>IS NOT NULL应该属于系统内建函数,于是我找到相关代码:
>
>public static final BuiltInFunctionDefinition IS_NOT_NULL =
>BuiltInFunctionDefinition.newBuilder()
>.name("isNotNull")
>.kind(SCALAR)
>
> .inputTypeStrategy(wildcardWithCount(ConstantArgumentCount.of(1)))
>
> .outputTypeStrategy(explicit(DataTypes.BOOLEAN().notNull()))
>.build();
>
>发现他的name是“ isNotNull”,和“is not null”对应不上。并且经过实际测验,确实证实了我的猜想:
>
>DEBUG org.apache.flink.table.module.ModuleManager  [] - Cannot 
>find FunctionDefinition 'is not null' from any loaded modules.
>
>
>所以我很疑惑,SQL到底是在哪里找到了”is not null”这个函数的呢?
>
>以下是调用栈:
>@org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads()
>at 
> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
>at 
> org.apache.calcite.sql.SqlUtil.lookupSubjectRoutinesByName(SqlUtil.java:609)
>at 
> org.apache.calcite.sql.SqlUtil.lookupSubjectRoutines(SqlUtil.java:535)
>at org.apache.calcite.sql.SqlUtil.lookupRoutine(SqlUtil.java:486)
>at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:595)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6302)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:6287)
>at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:161)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1869)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1860)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereOrOn(SqlValidatorImpl.java:4341)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause(SqlValidatorImpl.java:4333)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3606)
>at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64)
>at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1050)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1025)
>at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:248)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1000)
>at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)
>at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
>at 
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
>at 
> org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
>at 
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
>at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:187)
>at 
> org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
>at 
> org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119)
>at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258)
>at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>at java.lang.Thread.run(Thread.java:750)


Re:Re:Re:flink sql支持批量lookup join

2023-11-22 文章 Xuyang
Hi, casel.
这种现在应该是没支持,好像也没有issue说要支持,可以去jira里建一个feature看看大家的反响。


目前同样可以实现的方式:
1. 三次look up join + union + udaf。
2. udf,row by row自己查,搞一个缓存提高性能。
3. 将社区的connector魔改一下,重新打包使用。
4. ..



--

Best!
Xuyang





在 2023-11-22 20:44:47,"casel.chen"  写道:
>有一张维表 user,包含id和name字段
>id  | name
>-
>1 | zhangsan
>2 | lisi
>3 | wangwu
>
>
>现在实时来了一条交易数据 
>id  | creator_id  | approver_id  | deployer_id
>-
>1   | 1| 2   | 3
>
>
>希望lookup维表user返回各用户名称
>id   |  creator_name   |  approver_name  |  deployer_name
>
>1| zhangsan  |  lisi|. wangwu
>
>
>
>以上场景用flink sql要如何实现?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2023-11-22 12:37:10,"Xuyang"  写道:
>>Hi, casel.
>>可以对“批量lookup join”再描述详细一点么?看上去是符合一个lookup join里直接带上k1=v1 and k2=v2 and 
>>k3=v3的用法的。
>>
>>
>>
>>
>>--
>>
>>Best!
>>Xuyang
>>
>>
>>
>>
>>在 2023-11-22 11:55:11,"casel.chen"  写道:
>>>一行数据带了三个待lookup查询的key,分别是key1,key2和key3
>>>
>>>
>>>id key1 key2 key3
>>>想实现批量lookup查询返回一行数据 id value1 value2 value3
>>>
>>>
>>>查了下目前包括jdbc connector在内的lookup都不支持批量查询,所以只能先将多列转成多行分别lookup再将多行转成多列,如下所示
>>>id key1 key2 key3
>>>先将多列转成多行
>>>id key1
>>>id key2
>>>id key3
>>>
>>>分别进行lookup join后得到
>>>id value1
>>>id value2
>>>id value3
>>>最后多行转多列返回一行数据
>>>
>>>id value1 value2 value3
>>>
>>>
>>>上述方案目前我能想到的是通过udtf + udaf来实现,但缺点是不具备通用性。Flink社区打算原生支持么?


Re:flink sql支持批量lookup join

2023-11-21 文章 Xuyang
Hi, casel.
可以对“批量lookup join”再描述详细一点么?看上去是符合一个lookup join里直接带上k1=v1 and k2=v2 and 
k3=v3的用法的。




--

Best!
Xuyang




在 2023-11-22 11:55:11,"casel.chen"  写道:
>一行数据带了三个待lookup查询的key,分别是key1,key2和key3
>
>
>id key1 key2 key3
>想实现批量lookup查询返回一行数据 id value1 value2 value3
>
>
>查了下目前包括jdbc connector在内的lookup都不支持批量查询,所以只能先将多列转成多行分别lookup再将多行转成多列,如下所示
>id key1 key2 key3
>先将多列转成多行
>id key1
>id key2
>id key3
>
>分别进行lookup join后得到
>id value1
>id value2
>id value3
>最后多行转多列返回一行数据
>
>id value1 value2 value3
>
>
>上述方案目前我能想到的是通过udtf + udaf来实现,但缺点是不具备通用性。Flink社区打算原生支持么?


Re:flink sql作业如何支持配置流?

2023-11-20 文章 Xuyang
Hi, 
是否可以将这个”配置维表“换成流表,利用flink cdc,改动这个配置表的时候,监听字段cdc变化,同时下游上流join呢?




--

Best!
Xuyang





在 2023-11-20 19:24:47,"casel.chen"  写道:
>我有一个flink 
>sql作业过滤条件customer_id是需要根据用户配置来定的,类似于Apollo配置中心,是否可以通过定义一张配置维表来实现呢?设置TTL定期去获取最新配置。
>
>
>create table customer_conf_tbl (
>  customer_id STRING
>) with (
>  'connector' = 'apollo',
>  '其他属性' 
>);
>select * from biz_table where customer_id in (select string_split(customer_id, 
>',') from customer_conf_tbl)
>
>
>如果要做成配置实时更新作用于sql作业的话又该如何实现呢?


Re:Flink sql 1.17.1 字段类型 DECIMAL(10, 0) 无法执行sql

2023-11-14 文章 Xuyang
Hi, 你的图挂了,可以贴一下图床链接或者直接贴一下代码。




--

Best!
Xuyang




在 2023-11-15 09:39:22,"刘聪聪"  写道:

Flink 1.17.1 遇到  DECIMAL(10, 0)类型字段,直接无法运行,我用强转都不行,还是报数组越界,去除 DECIMAL(10, 
0)类型字段,sql运行都正常。













Re:Re: Re: 关于 flink connect jar release 使用问题

2023-11-09 文章 Xuyang
另外,我确认了一下,你贴的kafka connector官方文档下载链接和maven依赖确实都是有问题的,社区已经有issue了[1]。
具体connector版本和flink版本对应关系就参考链接[2]吧,实际上kafka connector针对flink 
1.18的jar包已经有了,对应的版本是maven仓库[3]里的‘3.0.1-1.18’。





[1] https://issues.apache.org/jira/browse/FLINK-33512

[2] https://flink.apache.org/downloads/
[3] 
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/




--

Best!
Xuyang





在 2023-11-10 14:43:27,"zhhui yan"  写道:
>谢谢了,这些文档我都看到了,看起来 要使用1.18 还需要再等等了
>
>Xuyang  于2023年11月10日周五 14:32写道:
>
>> Hi,
>>   可以关注下这个讨论[1],1.18的connector还没有release出来。
>>   在flink 1.17的时候,flink connector基本上都从主仓库移出去了,参考kafka connector [1]。
>>   connector的下载和兼容的flink版本可以看下这个界面[3]。
>>
>>
>>
>>
>> [1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2
>>
>> [2] https://issues.apache.org/jira/browse/FLINK-30859
>> [3] https://flink.apache.org/downloads/
>>
>> --
>>
>> Best!
>> Xuyang
>>
>>
>>
>>
>> 在 2023-11-10 11:52:18,"zhhui yan"  写道:
>>
>> 所有的指向1.18 的 几乎都是,另外是 对应的 connector 以后是不是不要依赖 flink的具体版本了?,不然这个升级难道老大了
>> 文档:
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/
>>
>>
>>
>>
>>
>>
>> Xuyang  于2023年11月10日周五 11:13写道:
>>
>> Hi, 可以贴一下release文档的地址吗?应该不需要自己编译才对。
>>
>>
>>
>>
>> --
>>
>> Best!
>> Xuyang
>>
>>
>>
>>
>>
>> 在 2023-11-09 15:46:06,"zhhui yan"  写道:
>> >我们这边想测试一下 升级flink 到1.18 但是我们的release 文档里面给的 connect 包都是404 ,这个是用1.17
>> >的包,还是需要自己编译?
>>
>>
>>
>>
>>
>> --
>>
>> All with you!
>>
>>From:  zhhuiyan
>>
>>  E-Mail:yzh...@gmail.com
>>
>>  QQ:451722401
>>
>>  Phone:13146724775
>>
>> 公司:九瑞网络科技有限公司
>
>
>
>-- 
>All with you!
>
> From:  zhhuiyan
>
> E-Mail:yzh...@gmail.com
>
>   QQ:451722401
>
> Phone:13146724775
>
>  公司:九瑞网络科技有限公司


Re:Re: 关于 flink connect jar release 使用问题

2023-11-09 文章 Xuyang
Hi, 
  可以关注下这个讨论[1],1.18的connector还没有release出来。
  在flink 1.17的时候,flink connector基本上都从主仓库移出去了,参考kafka connector [1]。
  connector的下载和兼容的flink版本可以看下这个界面[3]。




[1] https://lists.apache.org/thread/r31f988m57rtjy4s75030pzwrlqybpq2

[2] https://issues.apache.org/jira/browse/FLINK-30859
[3] https://flink.apache.org/downloads/

--

Best!
Xuyang




在 2023-11-10 11:52:18,"zhhui yan"  写道:

所有的指向1.18 的 几乎都是,另外是 对应的 connector 以后是不是不要依赖 flink的具体版本了?,不然这个升级难道老大了
文档: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/datastream/kafka/
 






Xuyang  于2023年11月10日周五 11:13写道:

Hi, 可以贴一下release文档的地址吗?应该不需要自己编译才对。




--

Best!
Xuyang





在 2023-11-09 15:46:06,"zhhui yan"  写道:
>我们这边想测试一下 升级flink 到1.18 但是我们的release 文档里面给的 connect 包都是404 ,这个是用1.17
>的包,还是需要自己编译?





--

All with you!
 
From:  zhhuiyan
   
E-Mail:yzh...@gmail.com
   
QQ:451722401
   
Phone:13146724775
  
公司:九瑞网络科技有限公司

Re:关于 flink connect jar release 使用问题

2023-11-09 文章 Xuyang
Hi, 可以贴一下release文档的地址吗?应该不需要自己编译才对。




--

Best!
Xuyang





在 2023-11-09 15:46:06,"zhhui yan"  写道:
>我们这边想测试一下 升级flink 到1.18 但是我们的release 文档里面给的 connect 包都是404 ,这个是用1.17
>的包,还是需要自己编译?


Re:FLINK-33365 - Missing filter condition in execution plan containing lookup join with mysql jdbc connector

2023-11-07 文章 Xuyang
Hi, 
看了下发现这个jira下面已经有人在尝试复现但是没有成功。
如果可以的话,可以在jira下面留言回复一起多提供一些可以复现的case,帮助assigner复现这个问题,从而更快的定位+修复。




--

Best!
Xuyang





在 2023-11-07 15:59:53,"casel.chen"  写道:
>这个critical issue有人fix吗?我们线上使用flink 1.17.1版本有使用jdbc维表查询on带and过滤条件,发现and过滤条件不起作用
>
>
>例如
>select xxx from a left join b on a.id = b.id and b.type = 'xxx'
>发现b.type='xxx'这个过滤条件不起作用


Re:疑似BUG: 在滑动窗口中使用reduce()聚合时数据被多次处理

2023-11-03 文章 Xuyang
Hi,
   验证了下,问题疑似出现在reduce函数中,复用了下wordCount1这个对象。我试了下new一个新的WordCount作为输出应该就可以了。
猜测这可能和基于Heap的state backend有关,多个窗口的heap state可能直接使用的是一个对象的地址。


```
.reduce(
(wordCount1, wordCount2) -> {
WordCount newWC =
new WordCount(
wordCount1.word, wordCount1.count + wordCount2.count);
System.err.println(newWC);
return newWC;
})
```

--

Best!
    Xuyang





在 2023-11-03 10:53:37,"tao zhang"  写道:
>reduce()方法的状态在窗口间未被隔离,多个窗口聚合时使用的是同一对象.一个数据进入时,被重复累加
>是reduce的特性吗? 还是reduce中的窗口间隔离出现问题? 希望得到回复
>
>测试输入如下:
>1001,/home,1000
>1002,/home,2000
>
>输出如下:
>input> test.Event(user=1001, page=/home, ts=1000)
>input> test.Event(user=1002, page=/home, ts=2000)
>test.WordCount(word=/home, count=2)
>test.WordCount(word=/home, count=3)
>
>代码如下:
>
>import lombok.AllArgsConstructor;
>import lombok.Data;
>import lombok.NoArgsConstructor;
>import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import 
>org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
>import org.apache.flink.streaming.api.windowing.time.Time;
>import java.io.Serializable;
>import java.time.Duration;
>
>public class test {
>public static void main(String[] args) {
>//准备环境
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>env.setParallelism(1);
>
>//从端口读数据
>SingleOutputStreamOperator ds1 = 
> env.socketTextStream("hadoop102", 5).map(
>value->{
>String[] strings = value.split(",");
>return new 
> Event(strings[0].trim(),strings[1].trim(),Long.valueOf(strings[2].trim()) );
>}
>
>).assignTimestampsAndWatermarks(
>//增加水位线策略
>
> WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner((Event,
>  l) -> Event.getTs())
>);
>//检查输入流
>ds1.print("input");
>
>
>ds1.map(event -> new WordCount(event.getPage(), 1)
>).keyBy(WordCount::getWord
>//按键分组
>).window(
>//TumblingEventTimeWindows.of(Time.seconds(10))
>SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))
>//size为10步长为5的滑动窗口
>).reduce(
>//先增量聚合.将多个数据处理为一个中间结果
>
>(wordCount1, wordCount2) -> {
>
>Integer count = wordCount1.getCount();
>
>wordCount1.setCount(count + 1);
>
>System.out.println(wordCount1);
>
>return wordCount1;
>}
>
>
>);
>
>try {
>env.execute();
>} catch (Exception e) {
>throw new RuntimeException(e);
>}
>}
>
>@Data
>@AllArgsConstructor
>@NoArgsConstructor
>public static class Event {
>private String user;
>private String page;
>private Long ts;
>
>}
>
>@Data
>@AllArgsConstructor
>@NoArgsConstructor
>
>public static class WordCount implements Serializable {
>private String word;
>private Integer count;
>
>}
>
>
>
>}
>


Re:flink的sql gateway支持自定义的UDF吗?

2023-11-01 文章 Xuyang
Hi, 
你指的是sql gateway上 ADD JAR这种方式来上传自定义UDF的jar包[1]么?




[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/jar/

--

Best!
Xuyang





在 2023-11-01 14:21:04,"RS"  写道:
>Hi
>flink的sql gateway支持自定义的UDF吗?包括java和python的,有示例可以参考下吗?


Re:Re:回复: flink sql不支持show create catalog 吗?

2023-10-29 文章 Xuyang
Hi, CatalogStore 的引入我理解是为了Catalog能被更好地管理、注册和元数据存储,具体motivation可以参考Flip295[1].
我的理解是倒不是说“引入CatalogStore后才可以提供show create 
catalog语法支持”,而是之前没有直接存储catalog配置的地方和能力,在CatalogStore之后,天然支持了对catalog配置的存储,因此这个feat就可以直接快速的支持了。




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-295%3A+Support+lazy+initialization+of+catalogs+and+persistence+of+catalog+configurations




--

Best!
Xuyang





在 2023-10-29 20:34:52,"casel.chen"  写道:
>请问flink 1.18引入的CatalogStore是为了解决什么问题呢?为什么引入CatalogStore后才可以提供show create 
>catalog语法支持?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2023-10-20 17:03:46,"李宇彬"  写道:
>>Hi Feng
>>
>>
>>我之前建过一个jira(https://issues.apache.org/jira/browse/FLINK-24939),引入CatalogStore后,实现这个特性的时机应该已经成熟了,我们这边业务场景里用到了很多catalog,管理起来很麻烦,有这个特性会好很多。
>>| |
>> 回复的原邮件 
>>| 发件人 | Feng Jin |
>>| 发送日期 | 2023年10月20日 13:18 |
>>| 收件人 |  |
>>| 主题 | Re: flink sql不支持show create catalog 吗? |
>>hi casel
>>
>>
>>从 1.18 开始,引入了 CatalogStore,持久化了 Catalog 的配置,确实可以支持 show create catalog 了。
>>
>>
>>Best,
>>Feng
>>
>>On Fri, Oct 20, 2023 at 11:55 AM casel.chen  wrote:
>>
>>之前在flink sql中创建过一个catalog,现在想查看当初创建catalog的语句复制并修改一下另存为一个新的catalog,发现flink
>>sql不支持show create catalog 。
>>而据我所知doris是支持show create catalog语句的。flink sql能否支持一下呢?


Re:flink sql如何处理脏数据问题?

2023-10-29 文章 Xuyang
Flink SQL目前对于脏数据没有类似side output的机制来输出,这个需求用自定义connector应该可以实现。







--

Best!
Xuyang





在 2023-10-29 10:23:38,"casel.chen"  写道:
>场景:使用flink 
>sql将数据写入下游OLAP系统,如doris,遇到一些异常情况,比如字段值超长或者分区字段值为当前doris表不存在的分区(需要先人为创建)等等,当前写入这些脏数据会使得作业写入报错,进而导致作业失败。我们是希望能够将这些“脏”数据单独发到一个kafka
> topic或者写入一个文件便于事后审查。这个目前有办法做到吗?


Re:如何在Flink Connector Source退出时清理资源

2023-10-23 文章 Xuyang
Hi, 
看一下你的DynamicTableSource实现的类,如果你用的是InputFormat的旧source(用的是类似InputFormatProvider.of),可以使用InputFormat里的close方法;
如果用的是flip-27的source(用的是类似SourceProvider.of),SplitReader里也有一个close方法










--

Best!
Xuyang





在 2023-10-24 11:54:36,"jinzhuguang"  写道:
>版本:Flink 1.16.0
>
>需求:在某个source结束退出时清理相关的资源。
>
>问题:目前没有找到Source退出时相关的hook函数,不知道在哪里编写清理资源的代码。
>
>恳请大佬们指教。


Re:flink-sql读取kafka Source并行度问题

2022-11-06 文章 Xuyang
Hi, 目前已经有相关的Flip来尝试在Flink SQL中单独设置source和sink的并行度[1],但是目前source 
并没有实现。如果实在需要的话,可能需要现在自己的本地poc一下自行build
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-146%3A+Improve+new+TableSource+and+TableSink+interfaces



在 2022-11-04 14:10:24,"杨扬"  写道:

各位好!
目前使用flinkSQL连接kafka不能单独指定source算子并行度,所以大于kafka-partition个数的并行度都没数据了(如下图所示),kafka-partition个数是3使用flinkSQL开发的作业即使设置了5个并行度,但也只有3个并行度中是有数据处理的。
这个问题如何解决呢?
--
杨扬
银联数据服务有限公司 研究院
电话:021-60269751
邮箱:yangya...@cupdata.com









Re:flink exactly once 写kafka,如果checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?

2022-11-01 文章 Xuyang
Hi, 应该会等到下一次做cp的时候再提交
在 2022-11-01 17:13:22,"郑 致远"  写道:
>大佬们好.
>flink exactly once 写kafka,如果flink 
>checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?
>kafka的transaction因为超时,abort后, 会导致abort之前写kafka的数据,丢失吗?


Re:????????????????????????????

2022-11-01 文章 Xuyang
Himap
?? 2022-10-31 10:47:42??"" <289108...@qq.com.INVALID> ??
>??pojo??mapuion??
>
>
>
>piao289108...@vip.qq.com
>
>
>
>


Re:1.16什么时候release?

2022-10-19 文章 Xuyang
Hi, 
1.16的动向可以订阅下dev邮箱,最新的动态参考[1],官方docker版本会在release的时候同一时间发布[1]https://lists.apache.org/thread/b594swf6owhvl3thnfm1v8covwfxjj04
在 2022-10-18 12:12:39,"谭家良"  写道:
>hello,all
>如题,目前我对1.16的新功能比较感兴趣,想问一下flink1.16什么时候可以完成release?什么时候会有一个可用的官方docker镜像?
>
>
>| |
>谭家良
>|
>|
>tanjl_w...@126.com
>|


Re:flink cdc什么时候支持flink 1.15.x?

2022-10-12 文章 Xuyang
Hi,你可以参考cdc社区中支持flink 
1.15的issue[1]和pr[2],着急的话,可以尝试先cp这个pr到本地分支[1]https://github.com/ververica/flink-cdc-connectors/issues/1363[2]https://github.com/ververica/flink-cdc-connectors/pull/1504
在 2022-10-11 11:01:25,"casel.chen"  写道:
>当前flinlk cdc master分支的snapshot版本最高支持到flink 1.14.4,尝试使用flink 
>1.15.2编译会出错,请问flink cdc什么时候支持flink 1.15.x?


Re:控制流方式能否改变作业ExecutionGraph?

2022-10-10 文章 Xuyang
Hi,不重启作业的情况下,修改配置,实时改变ExecutionGraph目前是不支持的。







--

Best!
Xuyang





在 2022-09-27 08:36:53,"casel.chen"  写道:
>我有一个数据同步场景是希望通过修改配置来实时动态修改数据同步的目标,例如使用flink 
>cdc将mysql中的变更数据实时同步进kafka,如果后来业务又要求同一份数据再同步进mongodb的话,我是否可以通过修改同步配置来达到不停止原来作业来动态修改数据同步的目标(由一个变多个)?又或者是flink
> cdc整库同步mysql变更数据到kafka一个topic,后来业务又要求按表划分topic,这种能否同样通过修改配置来实现呢?


Re:flink cdc能否同步DDL语句?

2022-10-10 文章 Xuyang
Hi, 目前应该是不行的
在 2022-09-26 23:27:05,"casel.chen"  写道:
>flink cdc能否同步DDL语句以实现schema同步? 例如create table , create index, truncate table等


Re:咨询多条flink cdc作业消费同一个库下不同表优化方案

2022-09-25 文章 Xuyang
Hi,我理解你的业务可能需要的是,在全量阶段直接读取mysql数据,在增量阶段切换读取kafka的source?
如果是上述的需求的话,可以尝试下使用Hybrid 
source[1],从而在运行时实现不同源的切换。只不过可能需要参考现有connector的逻辑,copy出一个自定义的source




[1]https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/hybridsource/







--

Best!
Xuyang





在 2022-09-25 18:18:52,"casel.chen"  写道:
>目前业内针对多条flink 
>cdc作业消费同一个库下不同表为了防止对数据库方产生很大查询压力,一般都是发到kafka,但这样的话下游作业只能获取到实时增量数据进行处理,如果下游作业需要获取全量数据处理的话,还得再回过头来使用cdc
> connector,但这样会产生上述副作用。我在想作业是否能够在获取到全量数据之后做一个checkpoint,接下来就可以改使用kafka 
>connector? 续接的点是binlog offset,即cdc connector消费到的binlog offset要续接上kafka 
>connector某个消息带的binlog offset。不知道这种想法是否可行?


Re:Flink-1.15 HiveDynamicTableFactory 取Catalog使用

2022-09-21 文章 Xuyang
Hi,你可以看下HiveDynamicTableFactory的实现,作为connector需要实现factoryIdentifier来表明在with参数中使用的‘connector’=‘xxx’,但这个类由于仅支持在hive
 catalog中使用,所以没有实现(还有一些其他的方法,如options可以透传with中的其他参数)。
如果你需要HiveDynamicTableFactory的功能,我感觉可以通过copy出一个新的connector类,参照其他正常connector的方式裁剪和实现一些必要的方法,然后mvn打包一下




--

Best!
Xuyang





在 2022-09-19 17:04:00,"yanggang_it_job"  写道:
>当前使用HiveDynamicTableFactory需要先声明HiveCatalog才可以使用。
>请问能否把HiveDynamicTableFactory直接作为一个connector使用,或者说需要怎么调整才可以直接使用?


Re:native k8s部署模式下使用HA架构的HDFS集群无法正常连接

2022-09-21 文章 Xuyang
Hi,我对HA的HDFS部署不是很熟悉,但是看错误栈是由于无法识别hostname引起的:
Caused by: java.lang.IllegalArgumentException:
java.net.UnknownHostException: datacluster
我猜测是不是可以修改为以下两种之一:
1. hdfs://datacluster: port (类似hdfs://datacluster:8080)

2.  hdfs:///datacluster (三个斜杠)




希望可以帮到你

--

Best!
Xuyang





在 2022-09-21 18:24:46,"Tino Hean"  写道:
>*大家好, *
>*我正在测试在k8s集群部署模式下使用HA架构的HDFS集群, 以下是我的提交命令参数*
>./bin/flink run-application \
>--detached \
>--target kubernetes-application \
>-Dkubernetes.cluster-id=test \
>-Dkubernetes.container.image=flink-java11 \
>-Dfs.default-scheme=hdfs://datacluster \
>-Dkubernetes.rest-service.exposed.type=LoadBalancer \
>
>-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>\
>-Dhigh-availability.storageDir=hdfs://datacluster/flink/recovery \
>-Dkubernetes.namespace=flink \
>-Dkubernetes.service-account=flink-sa \
>-Denv.hadoop.conf.dir=/opt/flink/conf \
>-Dkubernetes.container.image.pull-policy=Always \
>local:///opt/flink/usrlib/test.jar
>
>*我已经复制了core-site.xml 和hdfs-site.xml到$FLINK_HOME/conf下,  目录结构如下*
>flink@e3187a41a139:~$ ls conf
>core-site.xml hdfs-site.xml log4j-console.properties
>log4j-session.properties logback-session.xml masters zoo.cfg
>flink-conf.yaml log4j-cli.properties log4j.properties logback-console.xml
>logback.xml workers
>
>*但是遇到了下面的报错:*
>
>2022-09-21 10:17:40,156 ERROR
>org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not
>start cluster entrypoint KubernetesApplicationClusterEntrypoint.
>org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
>initialize the cluster entrypoint KubernetesApplicationClusterEntrypoint.
>at
>org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:250)
>~[flink-dist-1.15.2.jar:1.15.2]
>at
>org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:711)
>[flink-dist-1.15.2.jar:1.15.2]
>at
>org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:86)
>[flink-dist-1.15.2.jar:1.15.2]
>Caused by: org.apache.flink.util.FlinkException: Could not create the ha
>services from the instantiated HighAvailabilityServicesFactory
>org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
>at
>org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:287)
>~[flink-dist-1.15.2.jar:1.15.2]
>at
>org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:143)
>~[flink-dist-1.15.2.jar:1.15.2]
>at
>org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:427)
>~[flink-dist-1.15.2.jar:1.15.2]
>at
>org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:376)
>~[flink-dist-1.15.2.jar:1.15.2]
>at
>org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:277)
>~[flink-dist-1.15.2.jar:1.15.2]
>at
>org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:227)
>~[flink-dist-1.15.2.jar:1.15.2]
>at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
>at javax.security.auth.Subject.doAs(Unknown Source) ~[?:?]
>at
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1898)
>~[flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar:3.1.1.7.2.9.0-173-9.0]
>at
>org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>~[flink-dist-1.15.2.jar:1.15.2]
>at
>org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:224)
>~[flink-dist-1.15.2.jar:1.15.2]
>... 2 more
>Caused by: java.io.IOException: Could not create FileSystem for highly
>available storage path (hdfs://datacluster/flink/recovery/cruiser)
>at
>org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:102)
>~[flink-dist-1.15.2.jar:1.15.2]
>at
>org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:86)
>~[flink-dist-1.15.2.jar:1.15.2]
>at
>org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.createHAServices(KubernetesHaServicesFactory.java:53)
>~[flink-dist-1.15.2.jar:1.15.2]
>at
>org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createCustomHAServices(HighAvailabilityServicesUtils.java:284)
>~[flink-dist-1.15.2.jar:1.1

Re:Re: 某作业计算算子处于busy状态

2022-09-21 文章 Xuyang
Hi, 可以尝试下使用Arthas+jmap的方式定位可能出现内存泄露的原因







--

Best!
Xuyang





在 2022-09-21 13:40:32,"杨扬"  写道:
>flink内存泄漏有什么排查的指标或者工具吗?
>比如大致定位泄漏的位置之类的。
>
>
>
>
>
>> 在 2022年9月19日,下午5:41,yidan zhao  写道:
>> 
>> 那你代码检查下有没有内存泄露呢。
>> 
>> 杨扬  于2022年9月19日周一 11:21写道:
>>> 
>>> 还有一个现象,观察到 
>>> taskHeap内存占用在逐步升高,作业刚启动的时候占用在10%左右,一周后增加至25%左右,两周后增加至50%左右,上述指的是GC后观察到的内存占用值。两周后计算算子几乎一直100%busy状态,端到端延迟已经达到了10s左右,作业已经不可用需要重启了。
>>> 
>>> 
>>> 
>>> 
>>>> 在 2022年9月15日,下午8:58,yidan zhao  写道:
>>>> 
>>>> 本身低延迟一定程度上就是靠“资源低利用率”实现的。资源高利用率情况,就是尽可能满负荷够用就行的意思。
>>>> 
>>>> yidan zhao  于2022年9月15日周四 20:57写道:
>>>>> 
>>>>> 资源足够,busy 50%+,延迟如果也可接受的话,其实就不算问题。2s延迟不算高。
>>>>> 
>>>>> 杨扬  于2022年9月15日周四 20:02写道:
>>>>>> 
>>>>>> 目前并发度已经设定为25,每个slot内存为4G,已经使用100G内存,峰值流量1TPS左右,资源是足够的吧?
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> 在 2022年9月15日,下午7:27,yidan zhao  写道:
>>>>>>> 
>>>>>>> busy那就提升并发度看看效果?
>>>>>>> 
>>>>>>> 杨扬 mailto:yangya...@cupdata.com>> 于2022年9月15日周四 
>>>>>>> 14:51写道:
>>>>>>> 各位好!
>>>>>>> 目前有一flink作业,大致分为3个阶段:
>>>>>>> 读取kafka中数据(1个source,并行度3)-> 进行数据筛选和条件判断(没有窗口操作,并行度25)-> 
>>>>>>> 结果写入kafka(20多个sink,每个sink并行度3)。可参考附件图片。
>>>>>>> 
>>>>>>> 目前存在的问题是:作业在运行一段时间后,中间25并行度的一系列计算算子会变为busy状态(会达到50%以上),端到端的信息延迟增加,偶尔延迟会达到2秒以上。此时作业日志并没有报错、异常、告警等信息。
>>>>>>> 
>>>>>>> 
>>>>>>> 上述问题因为没有日志异常告警信息,本人有些无从下手解决。猜测是否因为sink数据量太多且每个sink并行度都是3会导致中间25个并行度的一系列算子和sink之间的交互产生大量shuffle引起?望各位大佬帮忙分析一下这个问题
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> ===
>>>>>>> 此邮件已由 Deep Discovery Email Inspector 进行了分析。
>>>>>> 
>>>> 
>>>> ===
>>>> 此邮件已由 Deep Discovery Email Inspector 进行了分析。
>>> 
>> 
>> === 
>> 此邮件已由 Deep Discovery Email Inspector 进行了分析。
>


Re:flink实时双流驱动join问题

2022-09-19 文章 Xuyang
Hi, 看上去这种情况只能使用inner join来实现,state很大的话有考虑过用FsStateBackend或者RocksDB 
StateBackend来存储state么?




--

Best!
Xuyang





在 2022-09-17 10:59:16,"casel.chen"  写道:
>请教一个flink实现实时双流驱动join问题:
>
>
>order cdc流字段:order_id, order_status, order_time, user_id (order_id是主键)
>user cdc流字段:user_id, user_name, user_phone, user_address(user_id是主键)
>关联结果流字段:order_id, order_status, order_time, user_name, user_phone, 
>user_address(order_id是主键)
>期望当order流数据更新或user流数据更新时,关联结果流数据都会得到更新。inner join不满足是因为两条流distinct 
>id都很大,状态会很大,且不能TTL,因为user流更新时间不定,短的几小时,长达上月。
>
>
>请问这种场景下要如何使用flink实现实时双流驱动join?


Re:flink-sql-connector-hbase-2.2模块在shade打包时遗漏了commons-io依赖

2022-09-19 文章 Xuyang
Hi,可以向社区jira[1]报告一下这个bug,详细记录下flink版本、错误信息等。
可以通过手动修改flink源码下hbase connector pom文件,并重新编译打包的方式来快速fix这个bug。




[1] https://issues.apache.org/jira/projects/FLINK/issues




--

Best!
Xuyang





在 2022-09-16 09:34:02,"junjie.m...@goupwith.com"  写道:
>flink-sql-connector-hbase-2.2模块在shade打包时遗漏了commons-io依赖,导致当使用hbase 
>connector时开启lookup.async=true后,当执行结束时调用HBaseRowDataAsyncLookupFunction类的close()方法时调用asyncConnection.close()内部报错java.lang.ClassNotFoundException:
> org.apache.flink.hbase.shaded.org.apache.commons.io.IOUtils。


Re:Flink+Hudi:java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

2022-09-19 文章 Xuyang
Hi, 
看起来像是这几个项目中的版本并不适配,导致com.google.common.base.Preconditions这个类版本冲突导致的,可以尝试下将这个包在flink和hudi中shade一下试试













--

Best!
Xuyang





At 2022-09-14 09:27:45, "Summer"  wrote:
>
>版本:Flink1.13.3、Hudi0.10.1、Hive3.1.2、Hadoop3.2.1
>
>
>编译:Hudi:mvn clean package -DskipITs  -Dmaven.test.skip=true 
>-Dhadoop.version=3.2.1  -Pflink-bundle-shade-hive3
>
>Flink-SQL-HIVE:mvn clean install -Dfast -Dhadoop.version=3.2.1   -Dscala-2.11 
>-DskipTests  -Dfast -T 4 -Dmaven.compile.fork=true -Dmaven.javadoc.skip=true 
>-Dcheckstyle.skip=true
>
>
>
>启动:./sql-client.sh embedded -j ../lib/hudi-flink-bundle_2.11-0.10.1-rc1.jar
>
>Lib目录:
>
>
>[root@rhy-t-bd-java lib]# ll
>total 271504
>-rw-r--r-- 1 root  root  92313 Oct 12  2021 flink-csv-1.13.3.jar
>-rw-r--r-- 1 root  root  106535831 Oct 12  2021 flink-dist_2.12-1.13.3.jar
>-rw-r--r-- 1 root  root 148127 Oct 12  2021 flink-json-1.13.3.jar
>-rwxrwxrwx 1 root  root7709740 Jun  8  2021 
>flink-shaded-zookeeper-3.4.14.jar
>-rw-r--r-- 1 stack wheel  48845196 Sep 13 18:43 
>flink-sql-connector-hive-3.1.2_2.11-1.13.2.jar
>-rw-r--r-- 1 root  root   35051553 Oct 12  2021 flink-table_2.12-1.13.3.jar
>-rw-r--r-- 1 root  root   38613339 Oct 12  2021 
>flink-table-blink_2.12-1.13.3.jar
>-rw-r--r-- 1 root  root   38955252 Sep 13 17:20 
>hudi-flink-bundle_2.11-0.10.1-rc1.jar
>-rwxrwxrwx 1 root  root  67114 Mar 31  2021 log4j-1.2-api-2.12.1.jar
>-rwxrwxrwx 1 root  root 276771 Mar 31  2021 log4j-api-2.12.1.jar
>-rwxrwxrwx 1 root  root1674433 Mar 31  2021 log4j-core-2.12.1.jar
>-rwxrwxrwx 1 root  root  23518 Mar 31  2021 log4j-slf4j-impl-2.12.1.jar
>
>
>Flink-SQL:CREATE TABLE paat_hudi_flink_test(
>
>id bigint ,
>
>name string,
>
>birthday TIMESTAMP(3),
>
>tsTIMESTAMP(3),
>
>partition VARCHAR(20),
>
>primary key(id) not enforced -- the uuid primary key must be specified
>
>)
>
>PARTITIONED BY (partition)
>
>with(
>
>'connector'='hudi',
>
>'path' = 
>'hdfs://emr-cluster/user/hive/hudi/warehouse/ods_hudi.hudi_flink_test/'
>
>, 'hoodie.datasource.write.recordkey.field' = 'id'
>
>, 'write.precombine.field' = 'ts'
>
>, 'write.tasks' = '1'
>
>, 'compaction.tasks' = '1'
>
>, 'write.rate.limit' = '2000'
>
>, 'table.type' = 'MERGE_ON_READ'
>
>, 'compaction.async.enable' = 'true'
>
>, 'compaction.trigger.strategy' = 'num_commits'
>
>, 'compaction.max_memory' = '1024'
>
>, 'changelog.enable' = 'true'
>
>, 'read.streaming.enable' = 'true'
>
>, 'read.streaming.check-interval' = '4'
>
>, 'hive_sync.enable' = 'true'
>
>, 'hive_sync.mode'= 'hms'
>
>, 'hive_sync.metastore.uris' = 'thrift://:9083'
>, 'hive_sync.jdbc_url' = 'jdbc:hive2://
>
>, 'hive_sync.jdbc_url' = 'jdbc:hive2://:1'
>, 'hive_sync.table' = 'hudi_flink_test'
>, 'hive_sync.db' = 'ods_hudi'
>, 'hive_sync.username' = '
>
>, 'hive_sync.table' = 'hudi_flink_test'
>
>, 'hive_sync.db' = 'ods_hudi'
>
>, 'hive_sync.username' = ''
>, 'hive_sync.password' = '
>
>, 'hive_sync.password' = '*^'
>
>, 'hive_sync.support_timestamp' = 'true'
>
>);
>
>Query: select * from paat_hudi_flink_test;
>
>
>
>
>出现错误:2022-09-13 18:45:29,203 INFO  
>org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor [] - The 
>RpcEndpoint jobmanager_2 failed.
>org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: Could not start 
>RpcEndpoint jobmanager_2.
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:610)
>  ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:180)
>  ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>at akka.actor.Actor.aroundReceive(Actor.scala:517) 
> ~[flink-dist_2.12-1.13.3.jar:1.13.3]
>at akka.actor.Actor.aroundReceive$(Acto

Re:Re: 关于 UDAF 里面 ListView 的疑问

2022-09-09 文章 Xuyang
Hi,可以尝试下在createAccumulator中打个断点,然后一步步看为啥在getValue的时候acc变成null了。


我理解如果是“使用 ListView 时,无法正常获得 TypeInference”的话,应该报错,而不应该正确执行但是后面突然null了。 
如果确定是某个地方发生了问题的话,可以在jira里贴一个issue[1]向社区反馈这个问题 ;)


[1]https://issues.apache.org/jira/projects/FLINK/summary




--

Best!
Xuyang





在 2022-09-08 10:48:10,"Zhiwen Sun"  写道:
>hi,
>
>感谢你的回复。
>
>报错是在 getValue 的时候。
>
>   at GroupAggsHandler$439.getValue(Unknown Source)
>   at 
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:146)
>
>
>
>我的疑问是使用一个 Class 包装下 ListView 就能正常工作,而直接使用 ListView 是会报错。
>
>比如使用  AggregateFunction  就正常,而使用
>AggregateFunctionListView>  就会 NPE。
>
>
>我怀疑使用 ListView 时,无法正常获得 TypeInference。
>
>
>Zhiwen Sun
>
>
>
>On Wed, Sep 7, 2022 at 11:46 PM Xuyang  wrote:
>
>> Hi,
>> 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。
>>
>>
>>
>>
>> 实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象
>>
>>
>>
>>
>> --
>>
>> Best!
>> Xuyang
>>
>>
>>
>>
>>
>> 在 2022-09-07 16:23:25,"Zhiwen Sun"  写道:
>>
>> Hi,
>> 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象


Re:Re: 关于flink table store的疑问

2022-09-09 文章 Xuyang
Hi,我理解Flink table store主要有以下几个优势:
1、减少架构复杂性,不需要额外引入多余的组件
2、支持Flink计算中直接使用Flink table store的存储
3、毫秒级流式查询和olap能力




--

Best!
Xuyang





在 2022-09-08 16:09:39,"r pp"  写道:
>应该是为了 流批一体 。不丢数据
>
>Kyle Zhang  于2022年9月8日周四 08:37写道:
>
>> Hi all,
>>   看table
>> store的介绍也是关于数据湖存储以及用于实时流式读取的,那在定位上与iceberg、hudi等项目有什么不一样么,为什么要再开发一个项目?
>>
>> Best.
>>
>
>
>-- 
>Best,
>  pp


Re:这里为什么会报null指针错误,和源表数据有关系吗?

2022-09-09 文章 Xuyang
Hi,看上去是遇到了一条脏数据,问一下是在运行了一段时间之后突然报错的嘛?







--

Best!
Xuyang





At 2022-09-09 11:46:47, "Asahi Lee"  wrote:
>2022-09-09 11:36:42,866 INFO 
>org.apache.flink.runtime.executiongraph.ExecutionGraph   
>[] - Source: HiveSource-ods_jt_hrs.ods_hrmis_HR_EMPL_Education (1/1) 
>(2a68412dab3602a1eeda5a750b308e23) switched from RUNNING to FAILED on 
>container_1658144991761_106260_01_02 @ hhny-cdh05 (dataPort=45015).
>java.lang.RuntimeException: null
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:97)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.api.operators.source.NoOpTimestampsAndWatermarks$TimestampsOnlyOutput.collect(NoOpTimestampsAndWatermarks.java:91)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:45)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.connector.file.src.impl.FileSourceRecordEmitter.emitRecord(FileSourceRecordEmitter.java:35)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) 
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) 
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
> ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_181]
>Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.table.data.writer.BinaryWriter.write(BinaryWriter.java:118) 
> ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.toBinaryRow(RowDataSerializer.java:204)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:103)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.serialize(RowDataSerializer.java:48)
>  ~[flink-table_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:168)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>  ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>   at 
> or

Re:flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游, 设计的考虑是啥?

2022-09-09 文章 Xuyang
Hi,主要是pull模型:下游主动拉取上游的数据。可以在下游的消费能力达到极限时,通过反压机制,让上游减少生产的数据。







--

Best!
Xuyang





在 2022-09-09 19:04:27,"郑 致远"  写道:
>各位大佬好
>请教下,
>flink 的数据传输,是上游算子推给下游, 还是下游算子拉取上游,  这么设计的考虑是啥呢?


Re:关于 UDAF 里面 ListView 的疑问

2022-09-07 文章 Xuyang
Hi,  
理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。




实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象




--

Best!
Xuyang





在 2022-09-07 16:23:25,"Zhiwen Sun"  写道:

Hi,  
理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象

Re:flink table API使用

2022-09-05 文章 Xuyang
Hi, 可以类似这样写 “.filter($("a").isGreater(10)) "。 更多的使用方法可以参考[1]




[1] 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java




--

Best!
Xuyang





在 2022-09-05 20:53:03,"小昌同学"  写道:
>
>
>Table result = kafka_item.groupBy($("trans_number"))   
>.select($("trans_number"),$("sales_amount").sum().as("sum_amount"))
>   .filter($("sum_amount "));
>各位大佬  请教一个问题  我这边想通过flink table API 达到这样一个效果:
>根据trans_number进行分组  然后对另一个字段进行sum计算  然后我想最后进行过滤的时候 过滤出来这个sum值大于100的
>我这个后续怎么使用API啊  这个filter算子咋用呀
>| |
>小昌
>|
>|
>ccc0606fight...@163.com
>|


Re:Flink1.15.1读取Tidb6,sql-client执行select count异常 java.sql.SQLSyntaxErrorException

2022-09-01 文章 Xuyang
Hi,可以贴一下log目录下的sql 
client日志吗?另外问一下,你是用的jdbc的connector嘛?你select的表名是`origin_object_data_61`?
在 2022-09-01 20:18:05,"RS"  写道:
>Hi,
>环境:
>flink-1.15.1
>TiDB-v6.1.0
>
>
>现象:
>Flink SQL> select count(*) from t1;
>
>[ERROR] Could not execute SQL statement. Reason:
>
>java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check 
>the manual that corresponds to your TiDB version for the right syntax to use 
>line 1 column 12 near "FROM `origin_object_data_61`" 
>
>执行失败了
>
>
>Flink SQL> select * from t1 limit 3;
>执行成功了,有结果返回
>
>
>请教下各位,为什么count不能执行,select字段就可以执行??
>
>
>Thanks
>
>
>


Re:【flink native k8s】HA配置 taskmanager pod一直重启

2022-08-30 文章 Xuyang
Hi, 能贴一下TM的日志吗,看Warn的日志貌似是TM一直起不来
在 2022-08-30 03:45:43,"Wu,Zhiheng"  写道:
>【问题描述】
>启用HA配置之后,taskmanager pod一直处于创建-停止-创建的过程,无法启动任务
>
>1. 任务配置和启动过程
>
>a)  修改conf/flink.yaml配置文件,增加HA配置
>kubernetes.cluster-id: realtime-monitor
>high-availability: 
>org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
>high-availability.storageDir: file:///opt/flink/checkpoint/recovery/monitor
>// 这是一个NFS路径,以pvc挂载到pod
>
>b)  先通过以下命令创建一个无状态部署,建立一个session集群
>
>./bin/kubernetes-session.sh \
>
>-Dkubernetes.secrets=cdn-res-bd-keystore:/opt/flink/kafka/res/keystore/bd,cdn-res-bd-truststore:/opt/flink/kafka/res/truststore/bd,cdn-res-bj-keystore://opt/flink/kafka/res/keystore/bj,cdn-res-bj-truststore:/opt/flink/kafka/res/truststore/bj
> \
>
>-Dkubernetes.pod-template-file=./conf/pod-template.yaml \
>
>-Dkubernetes.cluster-id=realtime-monitor \
>
>-Dkubernetes.jobmanager.service-account=wuzhiheng \
>
>-Dkubernetes.namespace=monitor \
>
>-Dtaskmanager.numberOfTaskSlots=6 \
>
>-Dtaskmanager.memory.process.size=8192m \
>
>-Djobmanager.memory.process.size=2048m
>
>c)  最后通过web ui提交一个jar包任务,jobmanager 出现如下日志
>
>2022-08-29 23:49:04,150 INFO  
>org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod 
>realtime-monitor-taskmanager-1-13 is created.
>
>2022-08-29 23:49:04,152 INFO  
>org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Pod 
>realtime-monitor-taskmanager-1-12 is created.
>
>2022-08-29 23:49:04,161 INFO  
>org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received new 
>TaskManager pod: realtime-monitor-taskmanager-1-12
>
>2022-08-29 23:49:04,162 INFO  
>org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
>Requested worker realtime-monitor-taskmanager-1-12 with resource spec 
>WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), 
>taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), 
>managedMemSize=0 bytes, numSlots=6}.
>
>2022-08-29 23:49:04,162 INFO  
>org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Received new 
>TaskManager pod: realtime-monitor-taskmanager-1-13
>
>2022-08-29 23:49:04,162 INFO  
>org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
>Requested worker realtime-monitor-taskmanager-1-13 with resource spec 
>WorkerResourceSpec {cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), 
>taskOffHeapSize=0 bytes, networkMemSize=711.680mb (746250577 bytes), 
>managedMemSize=0 bytes, numSlots=6}.
>
>2022-08-29 23:49:07,176 WARN  
>org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
>Reaching max start worker failure rate: 12 events detected in the recent 
>interval, reaching the threshold 10.00.
>
>2022-08-29 23:49:07,176 INFO  
>org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
>Will not retry creating worker in 3000 ms.
>
>2022-08-29 23:49:07,176 INFO  
>org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
>Worker realtime-monitor-taskmanager-1-12 with resource spec WorkerResourceSpec 
>{cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 
>bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
>numSlots=6} was requested in current attempt and has not registered. Current 
>pending count after removing: 1.
>
>2022-08-29 23:49:07,176 INFO  
>org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
>Worker realtime-monitor-taskmanager-1-12 is terminated. Diagnostics: Pod 
>terminated, container termination statuses: [flink-main-container(exitCode=1, 
>reason=Error, message=null)], pod status: Failed(reason=null, message=null)
>
>2022-08-29 23:49:07,176 INFO  
>org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
>Requesting new worker with resource spec WorkerResourceSpec {cpuCores=6.0, 
>taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 bytes, 
>networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
>numSlots=6}, current pending count: 2.
>
>2022-08-29 23:49:07,514 WARN  
>org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
>Reaching max start worker failure rate: 13 events detected in the recent 
>interval, reaching the threshold 10.00.
>
>2022-08-29 23:49:07,514 INFO  
>org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
>Worker realtime-monitor-taskmanager-1-13 with resource spec WorkerResourceSpec 
>{cpuCores=6.0, taskHeapSize=6.005gb (6447819631 bytes), taskOffHeapSize=0 
>bytes, networkMemSize=711.680mb (746250577 bytes), managedMemSize=0 bytes, 
>numSlots=6} was requested in current attempt and has not registered. Current 
>pending count after removing: 1.
>
>2022-08-29 23:49:07,514 INFO  
>org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - 
>Worker realtime-monitor-taskmanager-1-13 is terminated. Diagnostics: Pod 
>terminated, container termination statuses: 

Re:flink作业生成保存点失败

2022-08-30 文章 Xuyang
Hi,看起来这个报错是用于输出信息的文件找不到了,可以尝试加一下这个配置再试一下“taskmanager.log.path”,找一下导致tasks超时的根本原因。
还可以试一下用火焰图或jstack查看一下那几个tasks超时的时候是卡在哪个方法上。










--

Best!
Xuyang





Hi,看起来这个报错是用于输出信息的文件找不到了,可以尝试加一下这个配置再试一下“taskmanager.log.path”,找一下导致tasks超时的根本原因。还可以试一下用火焰图或jstack查看一下那几个tasks超时的时候是卡在哪个方法上。
在 2022-08-29 16:19:15,"casel.chen"  写道:
>有一个线上flink作业在人为主动创建保存点时失败,作业有两个算子:从kafka读取数据和写到mongodb,都是48个并行度,出错后查看到写mongodb算子一共48个task,完成了45个,还有3个tasks超时(超时时长设为3分钟),正常情况下完成一次checkpoint要4秒,状态大小只有23.7kb。出错后,查看作业日志如下。在创建保存点失败后作业周期性的检查点生成也都失败了(每个算子各有3个tasks超时)。使用的是FileStateBackend,DFS用的是阿里云oss。请问出错会是因为什么原因造成的?
>
>
>+5
>[2022-08-29 15:38:32]
>content: 
>2022-08-29 15:38:32,617 ERROR 
>org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler 
>[] - Failed to transfer file from TaskExecutor 
>sqrc-session-prod-taskmanager-1-30.
>+6
>[2022-08-29 15:38:32]
>content: 
>java.util.concurrent.CompletionException: 
>org.apache.flink.util.FlinkException: The file STDOUT does not exist on the 
>TaskExecutor.
>+7
>[2022-08-29 15:38:32]
>content: 
>at 
>org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>+8
>[2022-08-29 15:38:32]
>content: 
>at 
>java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ~[?:1.8.0_312]
>+9
>[2022-08-29 15:38:32]
>content: 
>at 
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_312]
>+10
>[2022-08-29 15:38:32]
>content: 
>at 
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_312]
>+11
>[2022-08-29 15:38:32]
>content: 
>at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
>+12
>[2022-08-29 15:38:32]
>content: 
>Caused by: org.apache.flink.util.FlinkException: The file STDOUT does not 
>exist on the TaskExecutor.
>+13
>[2022-08-29 15:38:32]
>content: 
>... 5 more
>+14
>[2022-08-29 15:38:32]
>content: 
>2022-08-29 15:38:32,617 ERROR 
>org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler 
>[] - Unhandled exception.
>+15
>[2022-08-29 15:38:32]
>content: 
>org.apache.flink.util.FlinkException: The file STDOUT does not exist on the 
>TaskExecutor.
>+16
>[2022-08-29 15:38:32]
>content: 
>at 
>org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$24(TaskExecutor.java:2064)
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>+17
>[2022-08-29 15:38:32]
>content: 
>at 
>java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ~[?:1.8.0_312]
>+18
>[2022-08-29 15:38:32]
>content: 
>at 
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> ~[?:1.8.0_312]
>+19
>[2022-08-29 15:38:32]
>content: 
>at 
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ~[?:1.8.0_312]
>+20
>[2022-08-29 15:38:32]
>content: 
>at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]


Re:Re: flink1.15.1 stop 任务失败

2022-08-23 文章 Xuyang
Hi, TM上有报错信息嘛?有的话可以贴出来看一下是什么导致cp失败的







--

Best!
Xuyang





Hi, TM上有报错信息嘛?有的话可以贴出来看一下是什么导致cp失败的
在 2022-08-23 20:41:59,"yidan zhao"  写道:
>补充部分信息:
>看日志,如果是 flink savepoint xxx 这样触发检查点,JM的日志很简单:
>2022-08-23 20:33:22,307 INFO
>org.apache.flink.runtime.jobmaster.JobMaster [] -
>Triggering savepoint for job 8d231de75b8227a1b
>715b1aa665caa91.
>
>2022-08-23 20:33:22,318 INFO
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
>Triggering checkpoint 5 (type=SavepointType{na
>me='Savepoint', postCheckpointAction=NONE, formatType=CANONICAL}) @
>1661258002307 for job 8d231de75b8227a1b715b1aa665caa91.
>
>2022-08-23 20:33:23,701 INFO
>org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream
>[] - Cannot create recoverable writer
> due to Recoverable writers on Hadoop are only supported for HDFS,
>will use the ordinary writer.
>
>2022-08-23 20:33:23,908 INFO
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
>Completed checkpoint 5 for job 8d231de75b8227a1b715b1aa665caa91
>(1638207 bytes, checkpointDuration=1600 ms, finalizationTime=1 ms).
>
>
>如果是 stop xxx 这样停止任务,则JM日志(错误)如下:
>
>2022-08-23 20:35:01,834 INFO
>org.apache.flink.runtime.jobmaster.JobMaster [] -
>Triggering stop-with-savepoint for job
>8d231de75b8227a1b715b1aa665caa91.
>
>2022-08-23 20:35:01,842 INFO
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
>Triggering checkpoint 6 (type=SavepointType{name='Suspend Savepoint',
>postCheckpointAction=SUSPEND, formatType=CANONICAL}) @ 1661258101834
>for job 8d231de75b8227a1b715b1aa665caa91.
>
>2022-08-23 20:35:02,083 INFO
>org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] -
>Decline checkpoint 6 by task a65383dad01bc15f654c4afe4aa63b6d of job
>8d231de75b8227a1b715b1aa665caa91 at 10.35.95.150:13151-3dfdc5 @
>xxx.xxx.com (dataPort=13156).
>(此处看起来是被decline了,原因是 task failed?)
>org.apache.flink.util.SerializedThrowable: Task name with subtask :
>Source: XXX_Kafka(startTs:latest) ->... ->... ->... (10/10)#2 Failure
>reason: Task has failed.
>at 
> org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1388)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1331)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>~[?:1.8.0_251]
>at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:343)
>~[flink-dist-1.15.1.jar:1.15.1]
>Caused by: org.apache.flink.util.SerializedThrowable:
>org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
>at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
>~[?:1.8.0_251]
>at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>~[?:1.8.0_251]
>... 3 more
>Caused by: org.apache.flink.util.SerializedThrowable
>at 
> org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
>~[?:?]
>at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
>~[?:?]
>at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:1002)
>~[?:?]
>at 
> org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:305)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:285)
>~[flink-dist-1.15.1.jar:1.15.1]
>at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
&g

Re:Re:Re:flink cdc如何将捕获变更记录转成canal json格式输出到下游kafka?

2022-08-22 文章 Xuyang
Hi, 文档[1] 记录的是类似mysql (cancel json) -> kafka -> flink -> other db 
的行为,主要还是侧重于flink 读canal format。


中间的转换需要自己实现下,可以在udf中通过open方法连一下mongodb拿一下,因为目前udf是感知不到catalog。





[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/formats/canal/#%e5%a6%82%e4%bd%95%e4%bd%bf%e7%94%a8-canal-format




--

Best!
    Xuyang





在 2022-08-23 08:55:44,"casel.chen"  写道:
>数据流图是 mongodb --> flink cdc --> kafka (canal json)
>看了flink cdc解析出的mongodb oplog转成json字符串是下面这样子[1],而下游需要从kafka消费canal 
>json格式的消息,中间的格式转换得自己实现是么?
>但mongodb oplog是不带schema信息的,而且没有canal中的old字段信息,这块信息要怎么转换呢?
>
>
>另,我用flink sql如下往kafka发送canal json格式数据是不完整的[2],并不是一个标准的canal 
>json数据[3]。这是已知的issue么?
>
>
>CREATETABLE 
>mongo_customers(_idSTRING,customer_idINT,nameSTRING,addressSTRING,PRIMARYKEY(_id)NOTENFORCED)WITH('connector'='mongodb-cdc','hosts'='localhost:27017','username'='mongouser','password'='mongopw','database'='mgdb','collection'='customers');
>CREATETABLE 
>kafka_customers(_idSTRING,customer_idINT,nameSTRING,addressSTRING,PRIMARYKEY(_id)NOTENFORCED)WITH('connector'='upsert-kafka','topic'='customers','properties.bootstrap.servers'='localhost:9092',
>'format'='canal-json');
>INSERT INTO kafka_customers SELECT * FROM mongo_customers;
>
>
>[1]
>{
>"_id": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, \"copyingData\": 
>true}",
>"operationType": "insert",
>"fullDocument": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}, 
>\"_class\": \"com.huifu.uqp.dal.mongo.model.TopTransOrder\", \"reqSeqId\": 
>\"123\", \"ordId\": \"1440509760709632000\", \"outTransId\": \"123\", 
>\"merOrdId\": \"123\", \"hfSeqId\": \"123\", \"partyOrderId\": \"123\", 
>\"bankSeqId\": \"123\", \"orgOrdId\": \"123\", \"orgTermOrdId\": \"123\", 
>\"orgHuifuSeqId\": \"123\", \"transDate\": \"20210913\", \"productId\": 
>\"app8\", \"serviceId\": \"6767639\", \"topAgentId\": \"123\", 
>\"belongAgentId\": \"123\", \"chainsId\": \"123\", \"huifuId\": 
>\"84552350\", \"transMajorCategory\": \"123\", 
>\"consoleActualPayChannel\": \"123\", \"consolePayType\": \"123\", 
>\"consolePreAuthFlag\": \"123\", \"consoleSubsidyFlag\": \"123\", 
>\"consoleDcType\": \"123\", \"consoleIsFq\": \"123\", \"consoleAcctDivFlag\": 
>\"123\", \"actualPayChannel\": \"123\", \"payChannel\": \"123\", 
>\"transType\": \"123\", \"payType\": \"123\", \"dcType\": \"123\", 
>\"isAcctDiv\": \"123\", \"isDelayAcct\": \"123\", \"creditType\": \"123\", 
>\"devsId\": \"123\", \"ordAmt\": 123.32, \"feeAmt\": 123.0, \"actOrdAmt\": 
>123.0, \"actualRefAmt\": 123.0, \"refAmt\": 123.0, \"refFeeAmt\": 123.0, 
>\"subsidyAmt\": 123.0, \"subsidyRefAmt\": 123.0, \"payCardId\": \"123\", 
>\"feeRecType\": \"123\", \"feeFlag\": \"123\", \"transStat\": \"S\", 
>\"createTime\": {\"$date\": 1632279264987}, \"transFinishTime\": \"123\", 
>\"kafkaTime\": \"123\", \"tableName\": \"123\", \"offset\": \"123\", 
>\"recordVersion\": \"123\", \"sign\": \"123\"}",
>"source": {
>"ts_ms": 0,
>"snapshot": "true"
>},
>"ns": {
>"db": "amp_test",
>"coll": "TopTransOrder"
>},
>"to": null,
>"documentKey": "{\"_id\": {\"$oid\": \"614a9ae069736f5fcc48fe59\"}}",
>"updateDescription": null,
>"clusterTime": null,
>"txnNumber": null,
>"lsid": null
>}
>
>
>
>
>[2]
>{"data":[{"_id":"614a9b3769736f5fcc492613","id":null,"reqSeqId":"123","ordId":"1440510124339011584","outTransId":"123",&qu

Re:flink cdc如何将捕获变更记录转成canal json格式输出到下游kafka?

2022-08-22 文章 Xuyang
Hi, 请问你的需求是 “debezium数据”- flink -“canal ”么? 
如果是这样的话,可以用UDF[1]来尝试下。[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/
在 2022-08-21 10:49:29,"casel.chen"  写道:
>flink cdc如何将捕获变更记录转成canal json格式输出到下游kafka?
>flink cdc获取的是debezium格式记录(用的是 JsonDebeziumDeserializationSchema),要如何转换成canal 
>json格式输出呢?有没有例子或关键代码展示?谢谢!


Re:flink sql 实现 查询phoenix count 语法

2022-08-15 文章 Xuyang
Hi, 
现在好像没有count相关的下沉逻辑,目前应该只实现了关于filter、limit、partition、projection等的source下沉。具体可以参考下[1]等等




[1] 
https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java













--

Best!
Xuyang





在 2022-08-09 16:55:47,"悟空"  写道:
>大家好, 我目前在使用的flink 是1.12.3版本,
>目前场景是这样的, source table 一共两张, 一张kafka 表 source_table_kafka 一张 phoenix表 
>source_table_phoenix ,我想通过kafka流表 join phoenix 然后count 之后的结果 sink 
>到kafka表中,其中count 查询 想下探到phoenix 完成 。请问这种可以实现吗?
>
>
>select stp.number_no, count(*) as stat_number from source_table_kafka 
>stk join source_table_phoenixFOR SYSTEM_TIME AS OF stk.`PROCTIME` 
>as stp on stk.id = stp.id group by stp.number_no
>
>
>
>
>请问 如何实现这种逻辑,想kafka里的消息每来一条 都下沉到phoenix中查询 去做count, 目前我已经实现了 phoenix 
>相关的connector 支持 SQL 语法


Re:Does flink sql support UDTAGG

2022-08-07 文章 Xuyang
Hi, what you want is UDAF? Please check whether this[1] is meet your 
requirement.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#aggregate-functions



在 2022-08-07 22:06:29,"wang" <24248...@163.com> 写道:

Hi dear engineers,


One small question:  does flink sql support UDTAGG? (user-defined table 
aggregate function), seems only supported in flink table api? If not supported 
in flink sql, how can I define an aggregated udf which could output multiple 
rows to kafka.


Thanks for your help!




Regards,
Hunk

Re:flink sql解析kafka数据

2022-07-04 文章 Xuyang
Hi, 目前我在flink 
master上没找到这个参数'json.infer-schema.flatten-nested-columns.enable'='true'。

你可以试一下在source读完整数据,然后通过UDF手动展开潜逃类型。



在 2022-06-30 15:02:55,"小昌同学"  写道:

各位大佬  请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型  我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'  但是我在客户端执行的时候  
发现识别不到这个字段  
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(  
   trans_number   STRING,
   end_timestamp  STRING,
   return_flagSTRING,
   commodity_type STRING

   
)
   COMMENT '中台交易流水小票头' WITH (
   'connector' = 'kafka',
   'topic' = 'yh_rme_soc_stream_prod-tlog_header',
   'properties.bootstrap.servers' = '',
   'properties.group.id' = 'ccc_test_20220630_2',
   'properties.request.timeout.ms' = '6',
   'format' = 'json',
   'scan.startup.mode' = 'group-offsets',
   -- 'scan.startup.mode' = 'timestamp',
   -- 'scan.startup.timestamp-millis' = '165373920',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);


| |
小昌
|
|
ccc0606fight...@163.com
|

Re:Re:what time support java17 ?

2022-07-04 文章 Xuyang
Hi,社区已经有一个issue[1]在尝试推进支持java17了,可以关注下。[1] 
https://issues.apache.org/jira/browse/FLINK-15736
在 2022-07-04 17:04:52,"jiangjiguang719"  写道:
>这个问题没人解答吗? spark现在已经支持java17了
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>At 2022-07-02 18:15:10, "jiangjiguang719"  wrote:
>>hi guys: 
>>what time can flink support java17 ? I urgently want to use java17 new 
>>features.
>>
>>
>>I have try to upgrade to java17,but failed.  Has anyone succeeded?
>>
>>
>>thanks,
>>jiguang


Re:使用lombok生成的pojo对象是否支持State Schema Evolution

2022-06-29 文章 Xuyang
Hi,请问下是修改了作业的逻辑之后,根据savepoint重启吗?如果是这样,是状态不兼容的原因
在 2022-06-29 17:57:54,"Howie Yang"  写道:
>flink版本:1.9.0
>
>问题:使用lombok生成的pojo对象,在数据流进行传输,中途终止任务做savepoint,state中保存应该都是这个对象;
>从savepoint重启任务后,报这个error:StateMigrationException: The new state serializer 
>cannot be incompatible. ... Heap state backend
>
>
>
>
>
>
>
>
>
>
>
>
>
>--
>
>Best,
>Howie


Re:flink 1.10.1 flinkui ???????? ????????????cancalling?? ????????????????

2022-06-16 文章 Xuyang
Hi?? 
??JMTM??logdatastream??closedelay
?? 2022-06-16 15:10:07??"??" <757434...@qq.com.INVALID> ??
>flink 1.10.1 flinkui  cancalling?? 


Re:??????????

2022-06-13 文章 Xuyang
Hi??  user-zh-unsubscr...@flink.apache.org
?? 2022-06-14 07:43:29??"?I ?? ?? ?I" <877144...@qq.com.INVALID> ??
>
>
>
>
>
>----
>??:
>"user-zh"  
>  
>:2022??6??9??(??) 9:55
>??:"user-zh"
>:
>
>
>
>


Re:Re: Re: Flink 使用interval join数据丢失疑问

2022-06-13 文章 Xuyang
Hi,
  1、理论上来说inner join关联的数据量应该比interval 
join更大吧。关于左右两边流速度不一致的情况,理论上应该问题不大,因为需要等到两边的watermark都到齐之后才会触发状态里过期数据的清除。
  2、inner 
join没有水印的情况下,就是到了就发,完全根据这条数据进入这个算子的时间来算,也就是“处理时间”。默认数据是不会过期的,会存全量数据。如果定义了ttl,得看join两侧的表的pk和join
 
key,分别用不同的state数据格式来存数据(ListState、MapState等),原则上是多长时间没更新数据之后,就清空过期数据,是按照的“处理时间”来处理的。


如果我有不对的地方,请指正我哈。




--

Best!
Xuyang





在 2022-06-12 14:39:39,"lxk7...@163.com"  写道:
>非常感谢回复
>1.针对watermark,我会再去进行测试。同时还会测试使用处理时间,interval join会不会丢失数据 
>2.针对interval jon,我个人的理解是它能关联到的数据范围要比inner 
>join大,所以数据应该更准确,但是从结果上看却是数据丢失,当时非常震惊,有点颠覆我的认知了。同时我自己还有一个新的猜测,就是两个流的数据量不一样,可能也会造成数据丢失。目前左流是订单粒度数据,右流是订单-商品粒度数据,数据量要大很多。我个人理解,在处理右流的时候,应该会慢一点,所以导致两边的时间进展可能不一致。但是这又引发了一个新的疑问?inner
> join应该也会受这样的影响
>3.还有一个问题可能是我没有阐述清楚,我在sql里使用inner 
>join,没有注册水印,那么两个流的join应该是以处理时间来定义的?那么表的state的过期是否也是以处理时间来定义?
>
>
>
>lxk7...@163.com
> 
>发件人: Shengkai Fang
>发送时间: 2022-06-11 20:35
>收件人: user-zh
>主题: Re: Re: Flink 使用interval join数据丢失疑问
>hi,
> 
>对于第一点,丢数据的情况有很多。首先,要确认是不是 JOIN 算子丢数据(SINK 的使用不当也会丢数据)。如果明确了是 join
>算子丢的数据,建议明确下丢的数据是咋样的,是不是 watermark 设置不合理,导致数据被误认为是晚到数据从而被丢了。例如,这里的是 `event
>time` = `rowtime` - 2s,是不是不合适,我咋记得一般都是 +2 s 呢?
> 
>对于第二点,interval join 我个人初步的理解是 state 的清理是根据两边的 event time,也就是说,如果右流的 event
>time 的更新会影响左流的数据清理。比如说右流的时间点到了 12:00,join 条件要求左流的时间不会晚于右流的时间 1h,那么左流
>11:00之前的数据都可以被清理了。
> 
>对于第三点,我觉得是不能的。目前的 inner join +  state 清理无法覆盖 event time 的window join 的。
> 
>best,
>Shengkai
> 
>lxk7...@163.com  于2022年6月10日周五 23:03写道:
> 
>> 对于这个问题,我还是有很大的疑问,再把我这个场景描述一下:
>>
>> 目前是使用flink进行双流join,两个流的数据,一个流是订单主表,另一个流是订单明细表。我们探查了离线的数据,订单明细表一般会在订单主表生成后晚几秒内生成,这个差异在秒级别。
>> 我们做了以下几轮测试,并对比了另一个实时落的表数据量。(这个表就是基准参照数据,只是简单落表,没做任何处理,两边的数据源一致,对比的口径一致。)
>> 1.使用datastream api,使用kafka自带的时间戳做水印,使用interval join。对比完结果,数据少。
>> 2.使用流转表,sql inner join,没有设置watermark。对比完结果数据正常。
>> 3.使用流转表,sql interval join,从数据中的事件时间提取水印,对比完结果数据,数据少。
>>  从结果上看,我不太明白为什么sql里inner join能保证数据准确,而interval
>> join不行?有什么好的方式或者思路能让我更好的去尝试了解这个问题产生的原因
>>
>> 针对第二种方式,我的疑问是,sql里没有设置水印,那么表的state过期是以处理时间来计算吗?针对这种设置了表state过期时间的join,我能理解为这个inner
>> join其实是一个window join吗?
>>
>>
>>
>> lxk7...@163.com
>>
>> 发件人: lxk
>> 发送时间: 2022-06-10 18:18
>> 收件人: user-zh
>> 主题: Re:Re:Re:Re:Re:Re:Re: Flink 使用interval join数据丢失疑问
>>
>>
>>
>> 现在改成了sql interval join,代码和执行计划如下,其他配置没变,数据量还是少,使用inner join就没问题
>>
>>
>>
>>
>> Table headerTable =
>> streamTableEnvironment.fromDataStream(headerFilterStream,
>>  Schema.newBuilder()
>> .columnByExpression("rowtime",
>> "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
>> .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
>> .build());
>> Table itemTable =
>> streamTableEnvironment.fromDataStream(filterItemStream, Schema.newBuilder()
>> .columnByExpression("rowtime",
>> "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
>> .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
>> .build());
>>
>>
>>
>>
>> streamTableEnvironment.createTemporaryView("header",headerTable);
>> streamTableEnvironment.createTemporaryView("item",itemTable);
>>
>>
>>
>>
>>
>>
>> Table result = streamTableEnvironment.sqlQuery("select
>> header.customer_id" +
>> ",item.goods_id" +
>> ",header.id" +
>> ",header.order_status" +
>> ",header.shop_id" +
>> ",header.parent_order_id" +
>> ",header.order_at" +
>> ",header.pay_at" +
>> ",header.channel_id" +
>> ",header.root_order_id" +
>> ",item.id" +
>> ",item.row_num" +
>> ",item.p_sp_sub_amt" +
>> ",item.display_qty" +
>> ",item.qty" +
>> ",item.bom_type" +
>> " from header JOIN item on header.id = item.order_id and
>> item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND AND
>> header.rowtime + INTERVAL '20' SECOND");
>>
>>
>>
>>
>> String intervalJoin = streamTableEnvironment.explainSql("select
>> header.customer_id" +
>> ",item.goods_id

Re:12/04/2022 11:01:43 自动保存草稿

2022-06-10 文章 Xuyang
Hi, 退订请发送任意消息至 user-zh-unsubscr...@flink.apache.org
在 2022-06-10 15:37:32,"郑林怡"  写道:
>退订


Re:Re:Re:Re:Re: Flink 使用interval join数据丢失疑问

2022-06-10 文章 Xuyang
Hi, datastream的这个interval join的api应该对标的是sql中的interval 
join。但是你目前写的这个sql,是普通join。普通join和interval join在业务含义和实现上都是有区别的。所以你直接拿datastream 
api的interval join和sql上的普通join结果对比,其实是有问题的。所以我之前的建议是让你试下让sql也使用interval 
join,这样双方才有可比性。


另外sql中设置的table.exec.state.ttl这个参数,只是代表的state会20s清空过期数据,但我看你要比较的时间窗口是-10s和20s,貌似也不大一样。




--

Best!
Xuyang





在 2022-06-10 14:33:37,"lxk"  写道:
>
>
>
>我不理解的点在于,我interval join开的时间窗口比我sql中设置的状态时间都要长,窗口的上下界别是-10s 和 20s,为什么会丢数据?
>
>sql中我设置这个table.exec.state.ttl参数 
>为20s,照理来说两个流应该也是保留20s的数据在状态中进行join。不知道我的理解是否有问题,希望能够得到解答。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2022-06-10 14:15:29,"Xuyang"  写道:
>>Hi, 你的这条SQL 并不是interval join,是普通join。
>>interval join的使用文档可以参考文档[1]。可以试下使用SQL interval 
>>join会不会丢数据(注意设置state的ttl),从而判断是数据的问题还是datastream api的问题。
>>
>>
>>
>>
>>[1] 
>>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>Best!
>>Xuyang
>>
>>
>>
>>
>>
>>在 2022-06-10 11:26:33,"lxk"  写道:
>>>我用的是以下代码:
>>>String s = streamTableEnvironment.explainSql("select header.customer_id" +
>>>",item.goods_id" +
>>>",header.id" +
>>>",header.order_status" +
>>>",header.shop_id" +
>>>",header.parent_order_id" +
>>>",header.order_at" +
>>>",header.pay_at" +
>>>",header.channel_id" +
>>>",header.root_order_id" +
>>>",item.id" +
>>>",item.row_num" +
>>>",item.p_sp_sub_amt" +
>>>",item.display_qty" +
>>>",item.qty" +
>>>",item.bom_type" +
>>>" from header JOIN item on header.id = item.order_id");
>>>
>>>System.out.println("explain:" + s);
>>>
>>>
>>>
>>>
>>>plan信息为:
>>>explain:== Abstract Syntax Tree ==
>>>LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1], 
>>>shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], 
>>>channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], 
>>>p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20])
>>>+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
>>>   :- LogicalTableScan(table=[[default_catalog, default_database, 
>>> Unregistered_DataStream_Source_5]])
>>>   +- LogicalTableScan(table=[[default_catalog, default_database, 
>>> Unregistered_DataStream_Source_8]])
>>>
>>>
>>>== Optimized Physical Plan ==
>>>Calc(select=[customer_id, goods_id, id, order_status, shop_id, 
>>>parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, 
>>>p_sp_sub_amt, display_qty, qty, bom_type])
>>>+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, 
>>>order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, 
>>>channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, 
>>>p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], 
>>>rightInputSpec=[NoUniqueKey])
>>>   :- Exchange(distribution=[hash[id]])
>>>   :  +- Calc(select=[id, order_status, customer_id, shop_id, 
>>> parent_order_id, order_at, pay_at, channel_id, root_order_id])
>>>   : +- TableSourceScan(table=[[default_catalog, default_database, 
>>> Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, 
>>> shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, 
>>> last_updated_at, business_flag, mysql_op_type])
>>>   +- Exchange(distribution=[hash[order_id]])
>>>  +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
>>> bom_type, display_qty])
>>> +- TableSourceScan(table=[[default_catalog, default_database, 
>>> Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, 
>>> goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, 
>>> last_updated_at, display_qty, is_first_flag])
>>>
>>>
>>>== Optimized Execution Plan ==
>>>Calc(select=[customer_id, goods_id, id, order_status, shop_id, 
>>>parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, 
>>>p_sp_sub_amt, display_qty, qty, bom_type])
>>>+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id, 
>>>order_status, customer_id, shop_

Re:flinkcdc

2022-06-09 文章 Xuyang
Hi?? ??DDL
At 2022-06-09 17:48:01, "1223681919" <1223681...@qq.com.INVALID> wrote:
>
>
>
>[flink-akka.actor.default-dispatcher-6] ERROR 
>org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread 
>'flink-akka.actor.default-dispatcher-6' produced an uncaught exception. 
>Stopping the process...
>java.util.concurrent.CompletionException: 
>org.apache.flink.util.FlinkRuntimeException: Failed to start the operator 
>coordinators
>   at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>   at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:708)
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the 
>operator coordinators
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
>   ... 27 more
>Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover 
>captured tables for enumerator
>   at 
> com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170)
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:119)
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:308)
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:72)
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:182)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1094)
>   ... 30 more
>Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, 
>please check your configured database-name: [hand] and table-name: [hand]
>   at 
> com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167)
>   at 
> com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:161)
>   ... 35 more


Re:(无主题)

2022-06-09 文章 Xuyang



Hi, 退订请发送任意消息至 user-zh-unsubscr...@flink.apache.org










--

Best!
Xuyang





在 2022-06-09 16:12:11,"zehir.tong"  写道:
>退订


Re:退订

2022-06-09 文章 Xuyang
Hi, 退订请发送任意消息至 user-zh-unsubscr...@flink.apache.org







--

Best!
Xuyang





在 2022-06-09 13:18:55,"高亮"  写道:
>退订


Re:flink webui stdout疑惑

2022-06-08 文章 Xuyang
Hi, 
请问下你找的.out文件是tm的还是jm的?tm的out文件应该是有内容的才对。




--

Best!
Xuyang





在 2022-06-08 16:49:20,"陈卓宇" <2572805...@qq.com.INVALID> 写道:
>您好:
>向flink集群提交的sql:
>CREATE TABLE datagen (
>f_sequence INT,
>f_random INT,
>f_random_str STRING,
>ts AS localtimestamp,
>WATERMARK FOR ts AS ts
>  ) WITH (
>'connector' = 'datagen',
>-- optional options --
>'rows-per-second'='5',
>'fields.f_sequence.kind'='sequence',
>'fields.f_sequence.start'='1',
>'fields.f_sequence.end'='500',
>'fields.f_random.min'='1',
>'fields.f_random.max'='500',
>'fields.f_random_str.length'='10'
>  );
>
>  CREATE TABLE print_table (
>f_sequence INT,
>f_random INT,
>f_random_str STRING
>) WITH (
>'connector' = 'print'
>  );
>
>  INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
>
>
>想请问一下在flink web ui的stdout上发现没有打印一片空白,进入日志是有打印的,这是什么原因导致的? 
>我如何解决,让ui的stdout能把内容打印出来


Re:Re: 实现SupportsFilterPushDown接口过程中遇到的问题

2022-06-07 文章 Xuyang
Hi, 非常欢迎你一起参与社区的建设中来。

社区有一套完整的贡献流程,大体可以参照文档[1]。 

总的来说可以细分为:

1、发现问题,并在jira[2]中提出一个issue

2、提一个pr并在pr中写明相关的issue号码+模块名+简单描述,具体可以参考下其他的pr来写

3、热心的同学会帮你review的,你也可以在你新建的issue下面ping我下(xuyang)




[1] https://flink.apache.org/contributing/how-to-contribute.html

[2] 
https://issues.apache.org/jira/projects/FLINK/issues/FLINK-27084?filter=allopenissues




--

Best!
Xuyang





在 2022-06-07 18:08:10,"朱育锋"  写道:
>Hi
>
>很抱歉这么晚回复
>
>1. Hi 
>Xuyang老师,确实如你所说,调用applyFilters方法之后,又调用了copy方法生成了新的JdbcDynamicTableSource对象,在新的JdbcDynamicTableSource对象中再次调用了getScanRuntimeProvider方法。现在已经成功实现了谓词下推,十分感谢
>2. Hi Shengkai老师,我十分愿意贡献到社区,不过我之前从未参与过GitHub开源项目,这对我既是机会又是挑战,我愿意挑战下自己。
>为此,我通读了一遍Flink官网的贡献指南,当我阅读到这一节时[1],我点击了文本[2]上的链接,显示"The requested URL was not 
>found on this 
>server",发现是md文档中链接的URL拼写错误造成的。我提了一个hotfix[3],正好借助这个PR熟悉下贡献流程,辛苦老师帮忙Review下
>
>[1] 
>https://flink.apache.org/contributing/code-style-and-quality-common.html#nullability-of-the-mutable-parts
>[2] "usage of Java Optional" 
>https://flink.apache.org/contributing/code-style-and-quality-java.md#java-optional
>[3] https://github.com/apache/flink-web/pull/544
>
>Best regards
>YuFeng
>
>> 2022年6月2日 10:28,Shengkai Fang  写道:
>> 
>> Hi.
>> 
>> 我记得 Jdbc Connector 实现了 ProjectionPushDown。你可以参考着实现。
>> 
>> xuyang 老师说的对,getScanRuntimeProvider 发生在 push down
>> 之后。应该不会有你说的问题。另外,可以考虑贡献到社区[1],我们也可以帮忙一起 review 下,帮忙解决你的问题?
>> 
>> Best,
>> Shengkai
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-19651
>> 
>> Xuyang  于2022年6月1日周三 23:47写道:
>> 
>>> 
>>> Hi,在SQL优化的末尾,StreamPhysicalTableSourceScan转化为StreamExecTableSourceScan的过程中[1],会执行一次getScanRuntimeProvider,此时的getScanRuntimeProvider一定是在applyFilters方法之后触发的。
>>> 
>>> 
>>> 
>>> 
>>> 你可以尝试将filterFields记录在JdbcDynamicTableSource
>>> 这个类中,如果该值为空,则getScanRuntimeProvider
>>> 时无需拼接(在applyFilters执行之前一定是空的);当该值不为空的时候,在getScanRuntimeProvider
>>> 进行拼接(最后一次physical node转exec node时,一定执行过applyFilters方法)。
>>> 
>>> 
>>> 
>>> 
>>> [1]
>>> https://github.com/apache/flink/blob/98997ea37ba08eae0f9aa6dd34823238097d8e0d/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java#L83
>>> 
>>> 
>>> 
>>> 
>>> --
>>> 
>>>Best!
>>>Xuyang
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2022-06-01 20:03:58,"朱育锋"  写道:
>>> 
>>> Hi,在SQL优化的末尾,StreamPhysicalTableSourceScan转化为StreamExecTableSourceScan的过程中[1],会执行一次getScanRuntimeProvider,此时的getScanRuntimeProvider一定是在applyFilters方法之后触发的。你可以尝试将filterFields记录在JdbcDynamicTableSource
>>> 这个类中,如果该值为空,则getScanRuntimeProvider
>>> 时无需拼接(在applyFilters执行之前一定是空的);当该值不为空的时候,在getScanRuntimeProvider
>>> 进行拼接(最后一次physical node转exec node时,一定执行过applyFilters方法)。[1]
>>> https://github.com/apache/flink/blob/98997ea37ba08eae0f9aa6dd34823238097d8e0d/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java#L83


Re:Re: Re: [Internet]Re: Re: Some question with Flink state

2022-06-02 文章 Xuyang
Hi, 理论上来说这句话是不是有问题?


> “是因为如果使用value-state,一个task会存在多个key,不同的key的内容会进行替换”


因为ValueState也是keyedState的一种,所以也是每个key各自维护一个valuestate,不同的key之间是隔离的。
其实一般情况下ValueState里面存Map,和直接MapState没啥区别,只不过在不同的状态存储上和状态的TTL策略有略微不同,所以不太推荐ValueState里面存Map。
所以其实还是看具体的业务场景,假如只是算一个累加的值的话,用valuestate就够了。




--

Best!
    Xuyang





在 2022-05-25 13:38:52,"lxk7...@163.com"  写道:
>
>刚看了下keygroup的原理,前面的内容大致能理解了,对于下面这段话
>"map-state的话相当于某些固定的key group里面的key都可以通过map-state的user-key去分别存储"
>我理解   
>是因为如果使用value-state,一个task会存在多个key,不同的key的内容会进行替换,而使用map的话,就算同一个task有多个key,根据用户自定义的key还是可以匹配到的。
>这样的话,大部分场景其实都适合使用map-state。
>
>
>lxk7...@163.com
> 
>From: jurluo(罗凯)
>Date: 2022-05-25 11:05
>To: user-zh@flink.apache.org
>Subject: Re: [Internet]Re: Re: Some question with Flink state
>老哥,看起来好像没什么问题,相同的key都分配在了同个task,每个task会存在多种key是正常的。key会按最大并行度分成多个key 
>group,然后固定的key 
>group分配到各个task里。只能保证相同的key会到同一个task,不能保证一个task只有一个key。你这个需求用map-state才合适。map-state的话相当于某些固定的key
> group里面的key都可以通过map-state的user-key去分别存储。
> 
>> 2022年5月25日 上午10:45,lxk7...@163.com 写道:
>> 
>> 图片好像又挂了  我重发下
>> hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准
>> 
>> 
>> 
>>下面是我的代码及测试结果
>> 
>> 
>> 
>> 一.使用int类型
>> 
>> 
>> 
>>public class KeyByTest {
>> 
>> 
>> 
>> public static void main(String[] args) throws Exception {
>> 
>> 
>> 
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>> 
>> 
>> env.setParallelism(10);
>> 
>> 
>> 
>> 
>> 
>> DataStreamSource dataDataStreamSource = 
>> env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
>> 
>> 
>> 
>> new data(1, "123", "分类页"),
>> 
>> 
>> 
>> new data(2, "r-123", "搜索结果页"),
>> 
>> 
>> 
>> new data(1, "r-123", "我的页"),
>> 
>> 
>> 
>> new data(3, "r-4567", "搜索结果页")));
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> SingleOutputStreamOperator map = 
>> dataDataStreamSource.keyBy(new MyKeySelector())
>> 
>> 
>> 
>> .map(new RichMapFunction() {
>> 
>> 
>> 
>> 
>> 
>> @Override
>> 
>> 
>> 
>> public String map(data data) throws Exception {
>> 
>> 
>> 
>> System.out.println(data.toString() + "的subtask为:" + 
>> getRuntimeContext().getIndexOfThisSubtask() );
>> 
>> 
>> 
>> return data.toString();
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> });
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> env.execute("test");
>> 
>> 
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> class data{
>> 
>> 
>> 
>> private int id;
>> 
>> 
>> 
>> private String goods;
>> 
>> 
>> 
>> private String pageName;
>> 
>> 
>> 
>> 
>> 
>> public data(int id, String goods, String pageName) {
>> 
>> 
>> 
>> this.id = id;
>> 
>> 
>> 
>> this.goods = goods;
>> 
>> 
>> 
>> this.pageName = pageName;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> public data() {
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public int getId() {
>> 
>> 
>> 
>> return id;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public void setId(int id) {
>> 
>> 
>> 
>> this.id = id;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public String getGoods() {
>> 
>> 
>> 
>> return goods;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public void setGoods(String goods) {
>> 
>> 
>> 
>> this.goods = goods;

Re:实现SupportsFilterPushDown接口过程中遇到的问题

2022-06-01 文章 Xuyang
Hi,在SQL优化的末尾,StreamPhysicalTableSourceScan转化为StreamExecTableSourceScan的过程中[1],会执行一次getScanRuntimeProvider,此时的getScanRuntimeProvider一定是在applyFilters方法之后触发的。




你可以尝试将filterFields记录在JdbcDynamicTableSource 这个类中,如果该值为空,则getScanRuntimeProvider 
时无需拼接(在applyFilters执行之前一定是空的);当该值不为空的时候,在getScanRuntimeProvider 
进行拼接(最后一次physical node转exec node时,一定执行过applyFilters方法)。




[1] 
https://github.com/apache/flink/blob/98997ea37ba08eae0f9aa6dd34823238097d8e0d/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java#L83




--

Best!
Xuyang





在 2022-06-01 20:03:58,"朱育锋"  写道:

Hi,在SQL优化的末尾,StreamPhysicalTableSourceScan转化为StreamExecTableSourceScan的过程中[1],会执行一次getScanRuntimeProvider,此时的getScanRuntimeProvider一定是在applyFilters方法之后触发的。你可以尝试将filterFields记录在JdbcDynamicTableSource
 这个类中,如果该值为空,则getScanRuntimeProvider 
时无需拼接(在applyFilters执行之前一定是空的);当该值不为空的时候,在getScanRuntimeProvider 
进行拼接(最后一次physical node转exec node时,一定执行过applyFilters方法)。[1] 
https://github.com/apache/flink/blob/98997ea37ba08eae0f9aa6dd34823238097d8e0d/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java#L83

Re:关于flinksql聚合函数实现的学习疑问

2022-06-01 文章 Xuyang
Hi, 代码的话可以参考[1],由于agg的相关代码走的是codegen,推荐通过debug相关的测试类到附近。然后观察生成的代码。[1] 
https://github.com/apache/flink/blob/95e378e6565eea9b6b83702645e99733c33a957a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecGroupWindowAggregateBase.scala#L171
在 2022-06-01 15:41:05,"hdxg1101300...@163.com"  写道:
>您好:
>   最近再使用flinksql的过程中突然有这样一个疑问:(目前flink1.12.4)
>   比如这样一条sql语句:
> select 
>dim,
>count(*) as pv,
>sum(price) as sum_price,
>max(price) as max_price,
>min(price) as min_price,
>-- 计算 uv 数
>count(distinct user_id) as uv,
>UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS 
> STRING)) * 1000  as window_start
>from source_table
>group by
>dim,
>tumble(row_time, interval '1' minute);
>在指定窗口大小和维度上做聚合计算,可以灵活指定count(*),sum(price), max(price)等聚合函数;
>如果使用datastream api来做这种聚合,怎么实现多个聚合计算;目前api的aggregate函数 
>aggregate(AggregateFunction aggFunction, WindowFunction 
>windowFunction) 
>是传入一个聚合函数和一个窗口函数,这种怎么实现灵活的组合;或者flinksql是怎么样实现的,想了解这一块的代码?但是不知道从哪里入手。希望有朋友可以指定下。
>谢谢!
>
>
>hdxg1101300...@163.com


Re:Re: Re: Some question with Flink state

2022-05-24 文章 Xuyang
我不确定但大概率是两次keyby只以后面那个为准,所以可能会导致你前面的keyby其实是无用的(可以试验下)。可以按你说的方式将数据中这两个key拼成一个string当作shuffle的key。
在 2022-05-24 21:06:58,"lxk7...@163.com"  写道:
>如果是两次keyby的问题,我可以直接在一次keyby里将两个数据给拼接成字符串,这样的方式是跟两次keyby效果一样吗?
>
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:51
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
>在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>>
>>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>>
>>这样呢
>>
>>
>>lxk7...@163.com
>> 
>>From: Xuyang
>>Date: 2022-05-24 20:17
>>To: user-zh
>>Subject: Re:Re: Re: Some question with Flink state
>>Hi, 你的图还是挂了,可以使用图床工具试一下
>> 
>> 
>> 
>>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
>> 
>>图片好像有点问题,重新上传一下
>>lxk7...@163.com
>>From: Hangxiang Yu
>>Date: 2022-05-24 12:09
>>To: user-zh
>>Subject: Re: Re: Some question with Flink state
>>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>>
>>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>>
>>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>>
>>>
>>>
>>> lxk7...@163.com
>>>
>>> From: Hangxiang Yu
>>> Date: 2022-05-23 23:09
>>> To: user-zh; lxk7491
>>> Subject: Re: Some question with Flink state
>>> Hello,
>>> All states will not be shared in different parallelisms.
>>> BTW, English questions could be sent to u...@flink.apache.org.
>>>
>>> Best,
>>> Hangxiang.
>>>
>>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>>
>>> >
>>> > Hi everyone
>>> >I was used Flink keyed-state in my Project.But I found some questions
>>> > that make me confused.
>>> >when I used value-state in multi parallelism  the value is not I
>>> wanted.
>>> >So I guess that value-state is in every parallelism. every parallelism
>>> > saved their only value  which means the value is Thread-Level
>>> >But when I used map-state,the value is correctly. I mean the map-state
>>> > was shared by every parallelism.
>>> >   looking forward to your reply
>>> >
>>> >
>>> > lxk7...@163.com
>>> >
>>>


Re:Re:Re:Re:在自定义表聚合函数 TableAggregateFunction 使用 emitUpdateWithRetract 异常

2022-05-24 文章 Xuyang
reFirst = 0;public long beforeSecond = 
>>0;public long afterFirst = 0;public long 
>>afterSecond = 0;}// 创建 Top2Accumulator 累加器并做初始化   
>> @Overridepublic Top2RetractAccumulator createAccumulator() {   
>> LOG.info("[INFO] createAccumulator ...");   
>> Top2RetractAccumulator acc = new Top2RetractAccumulator();
>>acc.beforeFirst = Integer.MIN_VALUE;acc.beforeSecond = 
>>Integer.MIN_VALUE;acc.afterFirst = Integer.MIN_VALUE;   
>> acc.afterSecond = Integer.MIN_VALUE;return acc;
>>}// 接收输入元素并累加到 Accumulator 数据结构public void 
>>accumulate(Top2RetractAccumulator acc, Long value) {
>>LOG.info("[INFO] accumulate ...");if 
>>(value  acc.afterFirst) {acc.afterSecond = 
>>acc.afterFirst;acc.afterFirst = value;} else if 
>>(value  acc.afterSecond) {acc.afterSecond = value;  
>>  }}// 带撤回的输出public void 
>>emitUpdateWithRetract(Top2RetractAccumulator acc, 
>>RetractableCollectorTuple2Long, Integer out) {
>>LOG.info("[INFO] emitUpdateWithRetract ...");
>>if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {
>>// 撤回旧记录if (acc.beforeFirst != Integer.MIN_VALUE) { 
>>   out.retract(Tuple2.of(acc.beforeFirst, 1));} 
>>   // 输出新记录out.collect(Tuple2.of(acc.afterFirst, 
>>1));acc.beforeFirst = acc.afterFirst;} 
>>   if (!Objects.equals(acc.afterSecond, acc.beforeSecond)) {
>>// 撤回旧记录if (acc.beforeSecond != Integer.MIN_VALUE) {
>>out.retract(Tuple2.of(acc.beforeSecond, 2));
>>}// 输出新记录
>>out.collect(Tuple2.of(acc.afterSecond, 2));acc.beforeSecond 
>>= acc.afterSecond;}
>>}}```完整调用代码:```// 
>>执行环境StreamExecutionEnvironment env = 
>>StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);EnvironmentSettings
>> settings = EnvironmentSettings.newInstance()
>>.useOldPlanner() // Blink Planner 异常 Old Planner 可以
>>.inStreamingMode().build();StreamTableEnvironment tEnv = 
>>StreamTableEnvironment.create(env, settings);DataStreamRow 
>>sourceStream = env.fromElements(Row.of("李雷", "语文", 78), 
>>   Row.of("韩梅梅", "语文", 50),Row.of("李雷", "语文", 99),
>>Row.of("韩梅梅", "语文", 80),Row.of("李雷", "英语", 90),
>>Row.of("韩梅梅", "英语", 40),Row.of("李雷", "英语", 98),
>>Row.of("韩梅梅", "英语", 88));// 
>>注册虚拟表tEnv.createTemporaryView("stu_score", sourceStream, $("name"), 
>>$("course"), $("score"));// 
>>注册临时i系统函数tEnv.createTemporarySystemFunction("Top2", new 
>>Top2RetractTableAggregateFunction());// 
>>调用函数tEnv.from("stu_score").groupBy($("course"))
>>.flatAggregate(call("Top2", $("score")).as("score", "rank"))
>>.select($("course"), $("score"), $("rank")).execute()   
>> .print();```Flink 版本:1.13.5
>>在 2022-05-23 09:55:40,"Xuyang"  写道:
>>>Hi, 可以将你的taf具体代码发出来吗?还有你的版本,也贴一下。
>>>
>>>
>>>
>>>
>>>--
>>>
>>>Best!
>>>Xuyang
>>>
>>>
>>>
>>>
>>>
>>>在 2022-05-22 22:35:46,"赢峰"  写道:
>>>>
>>>>
>>>>在自定义表聚合函数 TableAggregateFunction 时使用的是 emitUpdateWithRetract 
>>>>输出数据。在调用的时候参考文档的使用方式:
>>>>```
>>>>tEnv.from("stu_score")
>>>>.groupBy($("course"))
>>>>.flatAggregate(call(Top2RetractTableAggregateFunction.class, 
>>>> $("score")))
>>>>.select($("course"), $("f0"), $("f1"))
>>>>```
>>>>使用默认 blink Planner,会抛出如下异常:
>>>>```
>>>>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>>>Could not find an implementation method 'emitValue' in class 
>>>>'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction' 
>>>>for function 'Top2' that matches the following signature:
>>>>void 
>>>>emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator,
>>>> org.apache.flink.util.Collector)
>>>>```
>>>>但是使用 Old Planner,则会正常输出:
>>>>```
>>>>StreamExecutionEnvironment env = 
>>>>StreamExecutionEnvironment.getExecutionEnvironment();
>>>>env.setParallelism(1);
>>>>EnvironmentSettings settings = EnvironmentSettings
>>>>.newInstance()
>>>>.useOldPlanner()
>>>>.inStreamingMode()
>>>>.build();
>>>>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>>>```
>>>>这是什么地方使用有问题?
>>>>
>>>>
>>>>
>>>>
>>>> 


Re:Re: Re: Some question with Flink state

2022-05-24 文章 Xuyang
看起来你keyby了两次,可以自定义一个keyselector来替代这两个。另外如果担心相同key没有被分到同一个并行度时,可以在某个并行度的算子下将数据和该subtask的并行度index打出来,debug调查下
在 2022-05-24 20:43:19,"lxk7...@163.com"  写道:
>
>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>
>这样呢
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:17
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>Hi, 你的图还是挂了,可以使用图床工具试一下
> 
> 
> 
>在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:
> 
>图片好像有点问题,重新上传一下
>lxk7...@163.com
>From: Hangxiang Yu
>Date: 2022-05-24 12:09
>To: user-zh
>Subject: Re: Re: Some question with Flink state
>你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
>selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
>或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
>> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>>
>> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>>
>> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>>
>>
>>
>> lxk7...@163.com
>>
>> From: Hangxiang Yu
>> Date: 2022-05-23 23:09
>> To: user-zh; lxk7491
>> Subject: Re: Some question with Flink state
>> Hello,
>> All states will not be shared in different parallelisms.
>> BTW, English questions could be sent to u...@flink.apache.org.
>>
>> Best,
>> Hangxiang.
>>
>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>>
>> >
>> > Hi everyone
>> >I was used Flink keyed-state in my Project.But I found some questions
>> > that make me confused.
>> >when I used value-state in multi parallelism  the value is not I
>> wanted.
>> >So I guess that value-state is in every parallelism. every parallelism
>> > saved their only value  which means the value is Thread-Level
>> >But when I used map-state,the value is correctly. I mean the map-state
>> > was shared by every parallelism.
>> >   looking forward to your reply
>> >
>> >
>> > lxk7...@163.com
>> >
>>


Re:Re: Re: Some question with Flink state

2022-05-24 文章 Xuyang
Hi, 你的图还是挂了,可以使用图床工具试一下



在 2022-05-24 13:50:34,"lxk7...@163.com"  写道:

图片好像有点问题,重新上传一下
lxk7...@163.com
 
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
你是用data stream作业吗,相同key如果分配到了不同的并行度,有可能是和你写的key
selector相关(你可以参照下KeySelector的comments去看是否符合它的规范);
或者方便的话你可以分享下你的key selector相关的逻辑和使用state的逻辑;
 
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
 
> 好的,我看这里面邮件都是英文,所以用英文问了个问题。
>
> 我再描述一下我的问题,我使用键控状态,用的value-state。按理来说,相同的key应该会被分到同一个并行度处理。但是但我使用多个并行度的时候,我发现好像相同的key并没有分配到同一个并行度处理。具体现象在于,我的程序是对同一个用户点击的商品进行累加,在数据里这个商品已经是第二个了,但是程序里这个商品的状态是空,所以导致最终累加的结果是1,而正确结果应该是2。所以我猜测是不是算子对于value-state都是独有的。
>
> 但是当我使用mapstate的时候,这个问题好像就没有再出现了。所以我想了解这里面的原因是啥?或者有什么方法能确保同一个key的数据都会被同一个task处理。
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone
> >I was used Flink keyed-state in my Project.But I found some questions
> > that make me confused.
> >when I used value-state in multi parallelism  the value is not I
> wanted.
> >So I guess that value-state is in every parallelism. every parallelism
> > saved their only value  which means the value is Thread-Level
> >But when I used map-state,the value is correctly. I mean the map-state
> > was shared by every parallelism.
> >   looking forward to your reply
> >
> >
> > lxk7...@163.com
> >
>

Re:退订

2022-05-22 文章 Xuyang
Hi,退订请发送至邮箱user-zh-unsubscr...@flink.apache.org
在 2022-05-22 15:32:02,"xudongjun123...@163.com"  写道:
>退订
>
>
>
>xudongjun123...@163.com


Re:在自定义表聚合函数 TableAggregateFunction 使用 emitUpdateWithRetract 异常

2022-05-22 文章 Xuyang
Hi, 可以将你的taf具体代码发出来吗?还有你的版本,也贴一下。




--

Best!
Xuyang





在 2022-05-22 22:35:46,"赢峰"  写道:
>
>
>在自定义表聚合函数 TableAggregateFunction 时使用的是 emitUpdateWithRetract 
>输出数据。在调用的时候参考文档的使用方式:
>```
>tEnv.from("stu_score")
>.groupBy($("course"))
>.flatAggregate(call(Top2RetractTableAggregateFunction.class, $("score")))
>.select($("course"), $("f0"), $("f1"))
>```
>使用默认 blink Planner,会抛出如下异常:
>```
>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Could not find an implementation method 'emitValue' in class 
>'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction' 
>for function 'Top2' that matches the following signature:
>void 
>emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator,
> org.apache.flink.util.Collector)
>```
>但是使用 Old Planner,则会正常输出:
>```
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>env.setParallelism(1);
>EnvironmentSettings settings = EnvironmentSettings
>.newInstance()
>.useOldPlanner()
>.inStreamingMode()
>.build();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>```
>这是什么地方使用有问题?
>
>
>
>
> 


Re:flink rest 接口部分数据不返回,返回空,状态200正常。

2022-05-19 文章 Xuyang
Hi,请问在UI界面这些数据都是空的吗?可以贴一下具体的代码和UI界面截图吗?会不会是由于算子chain在一起了导致输入/输出数据是0呢?




--

Best!
Xuyang





在 2022-05-19 17:52:20,"yidan zhao"  写道:
>如题,主要表现是web 
>ui部分监控,比如watermark,每个节点的数据之类不展示。看chrome的network视图可以发现请求返回状态码都是200,但是数据是空的。
>
>以watermarks请求为例:
>.../jobs/f5600ae6822108629f26b492ed7a1f96/vertices/95d07d760a7164acf2417ef057dca790/watermarks
>结果是 [] 。 不清楚啥情况,有人清楚吗。
>
>目前看了下jobmanger日志,有部分报错,但和这些请求没关系都。报的都是org.apache.flink.runtime.checkpoint.CheckpointsCleaner
>  - Could not properly discard completed checkpoint 32748 这种错误。


Re:如何更新中文文档?

2022-05-17 文章 Xuyang
Hi, 文档的贡献可以参考[1]。
1、一般来说,发现一个问题,就可以直接开issue,修文档bug,issue的component贴好文档和对应的模块就行。
2、关于你说的讨论区,因为开发者涉及国内外,因此现在基本都是在dev邮箱[2]讨论比较重要的决定,社区现在也在尝试建一个slack来更高效的讨论内容,详见[3]。pr会有热心同学给你review,如果有翻译错误的情况,会直接在你开的issue或pr下面讨论的。


ps: 你的图挂了,下次可以贴图床的链接。




[1] https://flink.apache.org/contributing/contribute-documentation.html
[2] https://flink.apache.org/community.html#mailing-lists
[3] https://lists.apache.org/thread/n43r4qmwprhdmzrj494dbbwr9w7bbdcv



--

Best!
Xuyang




在 2022-05-17 11:13:34,"z y xing"  写道:

各位好:
  最近准备对照中英文系统的学习一下Flink,注意到部分中文翻译可能有问题,以及只有英文,所以想要咨询如下两个问题:
1. 修复的粒度一般是什么?
例如我最近发现的一个问题如下,Flink应该是在1.11版本开始就去除了ingest 
time,至少英文版本是去除了这个属性,但是这里几个版本的时间属性翻译的都是有问题的
同时也参考了https://flink.apache.org/zh/contributing/contribute-documentation.html#section-3
 中的中文文档翻译手册,但是不是很清楚这种小改动应该是怎么处理了?也是要jira上提issue,走PR吗?还是是有一个大的issue的,直接patch即可?








https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/concepts/time_attributes/



2. 有单独的讨论区吗
后续可能会有技术上以及翻译上的(因为英语也不是很好)问题,会有专门做这块的前辈们先带一下吗?当然钉钉群已经加了


谢谢!
振宇





Re:BigDecimal数据转为指定精度的DecimalData,为什么要用

2022-05-16 文章 Xuyang
Hi, 
我说说我的看法。第一个问题在我看来,用RoundingMode.HALF_UP更多的是和其他主流db的行为保持一致。我在mysql中试了,如果cast一个double类型的1.5到SIGNED
 int,会变成2。cast 1.4 as 
SIGNED,结果是1.第二个问题,你看下Flink函数TRUNCATE满足你的要求吗?不行的话你可以试下用udf自定义行为。
在 2022-05-13 11:03:27,"happygoing"  写道:
>Hi,All:
>
>
>使用tableEnv.createTemporaryView(String path, DataStream dataStream, 
> Schema 
> schema)创建临时表后使用SQL查询(schema中有个字段为decimal,定义为DataTypes.DECIMAL(10,2))时。当我发送的数据小数位超过2,结果会被四舍五入,如,10.136
>  -> 10.14。 后面debug时发现在数据转换时,会被四舍五入。
>
>
>   想咨询下:
>   1、为什么要用RoundingMode.HALF_UP,是出于什么考虑?
>   2、可以调整成RoundingMode.ROUND_DOWN只取固定的小数位吗?
>
>
>附录:
>DecimalData转换源码
>/**
> * Creates an instance of {@link DecimalData} from a {@link BigDecimal} and 
> the given precision
> * and scale.
> *
> * The returned decimal value may be rounded to have the desired scale. The 
> precision will be
> * checked. If the precision overflows, null will be returned.
> */
>public static @Nullable DecimalData fromBigDecimal(BigDecimal bd, int 
>precision, int scale) {
>bd = bd.setScale(scale, RoundingMode.HALF_UP);
>if (bd.precision() > precision) {
>return null;
>}
>
>long longVal = -1;
>if (precision <= MAX_COMPACT_PRECISION) {
>longVal = bd.movePointRight(scale).longValueExact();
>}
>return new DecimalData(precision, scale, longVal, bd);
>}


Re:Pull request 的 CI build 不能自动触发

2022-05-04 文章 Xuyang
Hi, 我看你的pr里,目前已经正常重跑了azure?一般来说,@flinkbot run azure之后,需要稍等一会才会触发。
在 2022-05-03 13:33:06,"mefor sy"  写道:
>大家好,我第一次提交 Flink PR https://github.com/apache/flink/pull/19617,构建失败修复后,CI
>build 不能自动触发,  @flinkbot  run
>azure 也不能,请问应该怎么处理?


Re:Flink SQL??????Java code????debug

2022-04-28 文章 Xuyang
Hi??FlinkjaninoJanino[1]??org.codehaus.janino.source_debugging.enable=true??org.codehaus.janino.source_debugging.dir=mypathdebug[1]
 http://janino-compiler.github.io/janino/#debugging
?? 2022-04-25 17:04:30??"zhiyezou" <1530130...@qq.com.INVALID> ??
>
>Ideadebug??


Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-29 文章 Xuyang
可以使用case when试一下
在 2021-12-29 16:40:39,"RS"  写道:
>Hi,
>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢?
>
>
>比如:源数据有3个字段,a,b,c
>insert into table2
>select
>a,b,c
>from table1
>当b=null的时候,只希望写入a和c
>当c=null的时候,只希望写入a和b
>