Re: flinksql join

2022-11-16 文章 Zhiwen Sun
dob_dim_account 维表如果使用 jdbc 的 connector, flink 会在初始化的时候一次性读取所有的数据,
后续数据库中更新并不会触发 flink 计算。

要解决这个问题, dob_dim_account 需要变成流表。


Zhiwen Sun



On Thu, Nov 17, 2022 at 1:56 PM Jason_H  wrote:

> hi,你好
> 这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |
>  Replied Message 
> | From | 任召金 |
> | Date | 11/15/2022 09:52 |
> | To | user-zh |
> | Subject | Re: flinksql join |
> hello,你可以试下,将mysql的数据通过CDC变成流数据,然后跟主流inner join,注意状态的TTL
> 
> 
> --Original--
> From: "Jason_H" Date: Tue, Nov 15, 2022 09:46 AM
> To: "flink中文邮件组"
> Subject: Re: flinksql join
>
> 
>
> hi,你好
> 我想基于现有的flinksql的join实现这种情况,当维表更新慢的时候,事实数据会放在状态中等待。
>
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |
>  Replied Message 
> | From | RS | Date | 11/15/2022 09:07 |
> | To | user-zh@flink.apache.org | Subject | Re:flinksql join |
> Hi,
> 我的理解是后插入的维表数据,关联不到是正常现象,
> 如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据,
>
>
> Thanks
>
>
>
>
>
>
> 在 2022-11-11 11:10:03,"Jason_H" 
>
> hi,大家好
>
> 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下:
> kakfa输入:
> 账号 金额 笔数
>  100 1 - 未匹配
>  100 1 - 未匹配
>  100 1 - 匹配上
>
> 维表
> 账号 企业
>  
>   - 后插入的账号信息
> 实际输出结果
> 企业 金额 笔数
>  100 1
>
>
> 我想要的结果:
> 企业 金额 笔数
>  300 3
>
>
>
>
>
> sql如下:
> String sql2 = "insert into dws_b2b_trade_year_index\n" +
> "WITH temp AS (\n" +
> "select \n" +
> " ta.gmtStatistical as gmtStatistical,\n" +
> " ta.paymentMethod as paymentMethod,\n" +
> " tb.CORP_ID as outCorpId,\n" +
> " tc.CORP_ID as inCorpId,\n" +
> " sum(ta.tradeAmt) as tranAmount,\n" +
> " sum(ta.tradeCnt) as tranNum \n" +
> "from dws_a2a_trade_year_index ta \n" +
> "left join dob_dim_account for system_time as of ta.proc as tb on
> ta.outAcctCode = tb.ACCT_CODE \n" +
> "left join dob_dim_account for system_time as of ta.proc as tc on
> ta.inAcctCode = tc.ACCT_CODE \n" +
> "group by \n" +
> " ta.gmtStatistical, \n" +
> " ta.paymentMethod, \n" +
> " tb.CORP_ID, \n" +
> " tc.CORP_ID \n" +
> ") \n" +
> "SELECT \n" +
> " DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as
> gmtUpdate, \n" +
> " gmtStatistical, \n" +
> " paymentMethod, \n" +
> " outCorpId, \n" +
> " inCorpId, \n" +
> " tranAmount, \n" +
> " tranNum \n" +
> "FROM temp";
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |


Re: flinksql join

2022-11-10 文章 Zhiwen Sun
用普通的 join, 不要用 lookup join

Zhiwen Sun



On Fri, Nov 11, 2022 at 11:10 AM Jason_H  wrote:

>
>
> hi,大家好
>
> 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下:
> kakfa输入:
> 账号 金额 笔数
>  100 1  -> 未匹配
>  100 1  -> 未匹配
>  100 1  -> 匹配上
>
> 维表
> 账号  企业
>   
>      -> 后插入的账号信息
> 实际输出结果
> 企业  金额  笔数
>  100   1
>
>
> 我想要的结果:
> 企业  金额  笔数
>  300   3
>
>
>
>
>
> sql如下:
> String sql2 =  "insert into dws_b2b_trade_year_index\n" +
>"WITH temp AS (\n" +
>"select \n" +
>"  ta.gmtStatistical as gmtStatistical,\n" +
>"  ta.paymentMethod as paymentMethod,\n" +
>"  tb.CORP_ID as outCorpId,\n" +
>"  tc.CORP_ID as inCorpId,\n" +
>"  sum(ta.tradeAmt) as tranAmount,\n" +
>"  sum(ta.tradeCnt) as tranNum \n" +
>"from dws_a2a_trade_year_index ta \n" +
>"left join dob_dim_account for system_time as of ta.proc as
> tb on ta.outAcctCode = tb.ACCT_CODE \n" +
>"left join dob_dim_account for system_time as of ta.proc as
> tc on ta.inAcctCode = tc.ACCT_CODE \n" +
>"group by \n" +
>" ta.gmtStatistical, \n" +
>" ta.paymentMethod, \n" +
>" tb.CORP_ID, \n" +
>" tc.CORP_ID \n" +
>") \n" +
>"SELECT \n" +
>"   DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as
> gmtUpdate, \n" +
>"   gmtStatistical, \n" +
>"   paymentMethod, \n" +
>"   outCorpId, \n" +
>"   inCorpId, \n" +
>"   tranAmount, \n" +
>"   tranNum \n" +
>"FROM temp";
>
> | |
> Jason_H
> |
> |
> hyb_he...@163.com
> |


Re: Flink SQL 中同时写入多个 sink 时,是否能够保证先后次序

2022-10-19 文章 Zhiwen Sun
谢谢,有具体的思路嘛?

比如我需要先写入 jdbc 后再发送消息

是自定义一个 DynamicTableSink , 里面有 JdbcDynamicTableSink KafkaDynamicSink ,
还是说继承 JdbcDynamicTableSink , 自定义的类里面再去 new KafkaDynamicSink?


初看起来没办法知道什么时候 db 写入了。要知道什么时候写入,要去自定义 TableInsertOrUpdateStatementExecutor

Zhiwen Sun



On Tue, Oct 18, 2022 at 5:56 PM 悟空  wrote:

> Hi Zhiwen Sun:
>
>  自定义Sink 这个思路没问题的,我这边目前就是通过这种方式实现的,只需要自定义一个connector 融合多个sink
> connector ,相关options 沿用flink 官方connector的 ,这样方便后续升级。
>  如果有具体相关问题,欢迎讨论。
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> pens...@gmail.com;
> 发送时间:2022年10月14日(星期五) 中午11:55
> 收件人:"user-zh"
> 主题:Re: Flink SQL 中同时写入多个 sink 时,是否能够保证先后次序
>
>
>
> 好的,谢谢大家,之前也想过这个方案,复用/继承 JdbcDynamicTableSink 相关代码自定义 connector 。
>
> Zhiwen Sun
>
>
>
> On Fri, Oct 14, 2022 at 10:08 AM yidan zhao  wrote:
>
>  在一个自定义sink中实现先写database,再发消息。
> 
> 
> 或者2个都是自定义的,但是不能通过sink,因为sink后就没数据了。通过process,第一个process完成写入database后,后续process发送消息。
> 
>  Shuo Cheng   
>   Flink SQL 自身机制无法保证同一个作业多个 sink 的写入次序。 是否可以考虑从业务逻辑上动手脚,比如写入消息队列
> sink 前加个
>  udf
>   filter, udf 查询 database,满足条件才写入消息队列,当然这种方式对性能可能有影响。
>  
>   On Wed, Oct 12, 2022 at 2:41 PM Zhiwen Sun  wrote:
>  
>hi all:
>   
>    我们有个场景,需要 Flink SQL 同时写入消息和 database, 后续实时任务消费消息,再次读取
> database, 如果消息先于
>database 写入,这就可能导致读取的数据不正确。
>   
>是否有办法保证 database 写入后,再发送消息?
>   
>Zhiwen Sun
>   
> 


Re: Flink 1.15 Deduplicate之后Interval Join出错

2022-10-17 文章 Zhiwen Sun
不用 interval join 用普通的流 join 。时间只要不是 proctime 或者 eventtime 就行。

Zhiwen Sun



On Sat, Oct 15, 2022 at 9:46 AM 余列冰  wrote:

> Hi!
>
> 我在使用Deduplicate之后进行Interval Join出现问题。我使用的Flink版本是1.15
>
> 我希望使用Flink的Interval Join进行双流关联,并且我的第一个表需要去重。以下是我的示例代码。
> ```sql
> CREATE TEMPORARY TABLE `source` (
>   id INT,
>   name STRING,
>   event_time TIMESTAMP(3),
>   WATERMARK FOR event_time AS event_time
> ) WITH (
>   'connector' = 'datagen'
> );
>
>
> CREATE TEMPORARY TABLE B (
>   id INT,
>   `start` INT,
>   `end` INT,
>   event_time TIMESTAMP(3),
>   WATERMARK FOR event_time AS event_time
> ) WITH (
>   'connector' = 'datagen'
> );
>
> create TEMPORARY view A as
> select id, name, event_time from (
>   select id, name, event_time,
>   row_number() over(partition by id, name, event_time order by event_time
> asc) as rn
>   from source
> )
> where rn = 1;
>
> SELECT *
> FROM A, B
> WHERE
> A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND
> A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND
> B.event_time + INTERVAL '10' SECOND;
> ```
>
> 在去重时我采用了保留第一行数据,这时view A应该只会产生insert的行,但是运行上述SQL会出现如下错误。
> ```
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin
> doesn't support consuming update and delete changes which is produced by
> node Deduplicate(keep=[FirstRow], key=[id, name, event_time],
> order=[ROWTIME])
> ```
>
> 请问如何在使用Deduplicate之后进行Interval Join?
>
>
> > -原始邮件-
> > 发件人: LB 
> > 发送时间: 2022-10-15 09:39:31 (星期六)
> > 收件人: user-zh 
> > 抄送:
> > 主题: Flink 1.15 Deduplicate之后Interval Join出错
> >
> > 抱歉上一封邮件格式有问题,以此为准。Hi! 我在使用Deduplicate之后进行Interval
> Join出现问题。我使用的Flink版本是1.15 我希望使用Flink的Interval
> Join进行双流关联,并且我的第一个表需要去重。以下是我的示例代码。 ```sql CREATE TEMPORARY TABLE `source`
> (   id INT,   name STRING,   event_time TIMESTAMP(3),   WATERMARK FOR
> event_time AS event_time ) WITH (   'connector' = 'datagen' ); CREATE
> TEMPORARY TABLE B (   id INT,   `start` INT,   `end` INT,   event_time
> TIMESTAMP(3),   WATERMARK FOR event_time AS event_time ) WITH (
>  'connector' = 'datagen' ); create TEMPORARY view A as select id, name,
> event_time from (   select id, name, event_time,   row_number()
> over(partition by id, name, event_time order by event_time asc) as rn
>  from source ) where rn = 1; SELECT * FROM A, B WHEREA.id = B.id AND
> A.id = B.`start` AND A.id <= B.`end` ANDA.event_time BETWEEN
> B.event_time - INTERVAL '10' SECOND ANDB.event_time + INTERVAL '10'
> SECOND; ``` 在去重时我采用了保留第一行数据,这时view A应该只会产生insert的行,但是运行上述SQL会出现如下错误。 ```
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin
> doesn't support consuming update and delete changes which is produced by
> node Deduplicate(keep=[FirstRow], key=[id, name, event_time],
> order=[ROWTIME]) ``` 请问如何在使用Deduplicate之后进行Interval Join?
>


Re: Flink SQL 中同时写入多个 sink 时,是否能够保证先后次序

2022-10-13 文章 Zhiwen Sun
好的,谢谢大家,之前也想过这个方案,复用/继承 JdbcDynamicTableSink 相关代码自定义 connector 。

Zhiwen Sun



On Fri, Oct 14, 2022 at 10:08 AM yidan zhao  wrote:

> 在一个自定义sink中实现先写database,再发消息。
>
> 或者2个都是自定义的,但是不能通过sink,因为sink后就没数据了。通过process,第一个process完成写入database后,后续process发送消息。
>
> Shuo Cheng  于2022年10月12日周三 16:59写道:
> >
> > Flink SQL 自身机制无法保证同一个作业多个 sink 的写入次序。 是否可以考虑从业务逻辑上动手脚,比如写入消息队列 sink 前加个
> udf
> > filter, udf 查询 database,满足条件才写入消息队列,当然这种方式对性能可能有影响。
> >
> > On Wed, Oct 12, 2022 at 2:41 PM Zhiwen Sun  wrote:
> >
> > > hi all:
> > >
> > > 我们有个场景,需要 Flink SQL 同时写入消息和 database, 后续实时任务消费消息,再次读取 database, 如果消息先于
> > > database 写入,这就可能导致读取的数据不正确。
> > >
> > > 是否有办法保证 database 写入后,再发送消息?
> > >
> > > Zhiwen Sun
> > >
>


Re: Re: flink实时双流驱动join问题

2022-09-23 文章 Zhiwen Sun
实际业务的确是这样的。
state 永不过期, 要全量的数据计算,全量的数据放到 state 里面。

目前看来只有等 flink table store 了。

Zhiwen Sun



On Fri, Sep 23, 2022 at 8:29 AM casel.chen  wrote:

>
> 我这里只是举了一个例子表示Flink用于OLAP实时关联场景会遇到的一个问题,实际业务中确实会出现两张关联表都需要更新情况,不管哪一边更新数据业务都想获取到最新关联结果,而不是旧的关联状态。引出我想问的另一个问题是如果查询模式固定,Flink实时关联是否能取代OLAP系统例如Doris呢?
> 1. 一般双流join为避免state无限膨胀,都会设置ttl,你这边的业务场景ttl需要保留n个月?
> 确切地说不应该设置ttl,业务数据有长尾效应,大多数都在当天更新完毕,短的几秒种,长的甚至会在半年后还发生更新
>
>
> 2. order流和user流在业务场景上要求的state ttl时长是不是不一样?
> 同上
>
>
> 3. order流和user流的数据规模/state size规模大概可以到什么级别?
> TB级别
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-09-20 10:28:49,"Jinzhong Li"  写道:
> >hi,casel, 关于你们的业务场景,我有几个问题, 希望可以交流一下。
> >1. 一般双流join为避免state无限膨胀,都会设置ttl,你这边的业务场景ttl需要保留n个月?
> >2. order流和user流在业务场景上要求的state ttl时长是不是不一样?
> >(从你描述上来看,user流的ttl需要几个月,order流可以比较短些?)
> >3. order流和user流的数据规模/state size规模大概可以到什么级别?
> >
> >casel.chen  于2022年9月17日周六 10:59写道:
> >
> >> 请教一个flink实现实时双流驱动join问题:
> >>
> >>
> >> order cdc流字段:order_id, order_status, order_time, user_id (order_id是主键)
> >> user cdc流字段:user_id, user_name, user_phone, user_address(user_id是主键)
> >> 关联结果流字段:order_id, order_status, order_time, user_name, user_phone,
> >> user_address(order_id是主键)
> >> 期望当order流数据更新或user流数据更新时,关联结果流数据都会得到更新。inner join不满足是因为两条流distinct
> >> id都很大,状态会很大,且不能TTL,因为user流更新时间不定,短的几小时,长达上月。
> >>
> >>
> >> 请问这种场景下要如何使用flink实现实时双流驱动join?
>


Re: 关于 UDAF 里面 ListView 的疑问

2022-09-07 文章 Zhiwen Sun
hi,

感谢你的回复。

报错是在 getValue 的时候。

at GroupAggsHandler$439.getValue(Unknown Source)
at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:146)



我的疑问是使用一个 Class 包装下 ListView 就能正常工作,而直接使用 ListView 是会报错。

比如使用  AggregateFunction  就正常,而使用
AggregateFunction>  就会 NPE。


我怀疑使用 ListView 时,无法正常获得 TypeInference。


Zhiwen Sun



On Wed, Sep 7, 2022 at 11:46 PM Xuyang  wrote:

> Hi,
> 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。
>
>
>
>
> 实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象
>
>
>
>
> --
>
> Best!
> Xuyang
>
>
>
>
>
> 在 2022-09-07 16:23:25,"Zhiwen Sun"  写道:
>
> Hi,
> 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象


关于 UDAF 里面 ListView 的疑问

2022-09-07 文章 Zhiwen Sun
Hello all,

我看 ListView 使用的时候,有以下示例

 public class MyAccumulator {

public ListView list = new ListView<>();

// or explicit:
// {@literal @}DataTypeHint("ARRAY")
// public ListView list = new ListView<>();

public long count = 0L;
  }

public class MyAggregateFunction extends AggregateFunction

我想请教下大家,为什么需要在外层包裹一个 MyAccumulator 呢, 我实际测下来, 直接时用
AggregateFunction> 在 getValue 的时候会报空指针异常

Flink 版本: 1.13.1

谢谢。


Zhiwen Sun


Re: Re: 关于Flink state初始化的问题

2022-08-29 文章 Zhiwen Sun
你应该没有正确理解 state 的使用

我们一般在程序里面是用的是 KeyedState , 也就是和 key 伴随的。

基于上面,所以 open() 里面只能对 state 进行初始化, 但是没有办法设置 state 的 value,因为这时候没有 key ;
另外一方面,也不会在 map() 的时候去 new state (可以认为 state 是一个大的 Map,你 map 的时候只是操作其中的一个
key)。

回到你的需求,你应该在 open() 的时候保存相关信息到类变量里面,当 map() 的时候再去 update state。






Zhiwen Sun



On Fri, Aug 26, 2022 at 1:55 PM 曲洋  wrote:

>
> 对的,是后者,statAccumulator.value()是null,就是map方法中取值就成null了,但是open中命名初始化了,这个是因为map太快了吗,state没初始化完就开始拿了吗.
> 嗯嗯,我现在改成先进行判断
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2022-08-26 11:22:43,"Hangxiang Yu"  写道:
> >open确实是初始化的时候就会调用的;
>
> >第一次调用是null是说statAccumulator是null还是statAccumulator.value()是null,后者的话是正常可能会出现的;
> >这里的写法看起来有点问题,一般用value方法取出来可以先判断下,然后对value state的更新用update方法;
> >
> >On Fri, Aug 26, 2022 at 10:25 AM 曲洋  wrote:
> >
> >> 各位好,
> >>
> >>
> 我想请教一个问题,我的Flink应用中会在state里边存储一些指标,比如一年的总数,然后当任务各种原因断掉的时候,我希望可以通过入参的方式直接调节这个state,但是遇到了一个问题,如下:
> >> 我重写了RichMapFunction,yearTotal
> >>
> 这个指标是通过命令行传进来的,然后我希望初始化到state里边,但是我发现,open方法第一次调用的时候state都是null,然后这个参数就进不来
> >> 所以我想问下这个场景怎么办,还有open方法的生命周期是怎么样的,我本以为是map第一次打开的时候就会调用,结果好像不是
> >> public static class AccumulateAmounts extends RichMapFunction >> BlueAccumulaterInitState> {
> >> private transient ValueState
> >> statAccumulator;
> >>
> >>
> >> @Override
> >> public BlueAccumulaterInitState map(v2bean currentAccumulator)
> >> throws Exception {
> >>
> >>
> >> BlueAccumulaterInitState stat = (statAccumulator.value() !=
> >> null) ? statAccumulator.value() : new BlueAccumulaterInitState();
> >> Long yearIncrement = year.equals(stat.getYear()) ?
> >> stat.getYearMetric() + 1L : 1L;
> >> stat.setYearMetric(yearIncrement);
> >>
> >>
> >> statAccumulator.update(stat);
> >> return stat;
> >> }
> >>
> >>
> >> @Override
> >> public void open(Configuration config) {
> >> ValueStateDescriptor descriptor =
> >> new ValueStateDescriptor<>(
> >> "total",
> >> TypeInformation.of(new
> >> TypeHint() {
> >> }));
> >> statAccumulator = getRuntimeContext().getState(descriptor);
> >> ExecutionConfig.GlobalJobParameters globalParams =
> >> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
> >> Configuration globConf = (Configuration) globalParams;
> >> long yearTotal =
> >> globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue());
> >> statAccumulator.value().setYearMetric(yearTotal);
> >>
> >>
> >>
> >> }
> >> }
> >
> >
> >
> >--
> >Best,
> >Hangxiang.
>


Re: Re:Re: Re: Flink 使用interval join数据丢失疑问

2022-06-14 文章 Zhiwen Sun
我猜测是 watermark 的问题, 看楼主的设置, watermark 是 -2s ,也就是说, order header 流,有数据晚了 2s
,就会被丢弃。

楼主之前看的也是 订单明细比订单主表晚几秒, 这只是同一个订单的数据生成时间差异。 如果是这样的话,使用一般的 inner join + ttl
就可以满足需求了。

BTW: watermark 我觉得很难使用好,实际使用场景非常有限。



Zhiwen Sun



On Wed, Jun 15, 2022 at 11:43 AM Shengkai Fang  wrote:

> > 我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval join开分钟级别的数据还要准确
>
> 不合理的 watermark 设置在 interval join 就会导致丢数据。设置 ttl 情况下,如果某个 key
> 的数据频繁访问情况下,那么这个数据就不会过期。
>
> > 我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志。
>
> 我记得日志是会打印相关的日志。能提一些相关的日志吗?
>
> best,
> Shengkai
>
> lxk  于2022年6月14日周二 20:04写道:
>
> > Hi,
> >   我目前使用sql interval join,窗口的上下界增加到分钟级别,分别是-2 minute 和 +4 minute
> > 目前来看数据量和使用inner join要差不多了。以下是代码
> > Table headerTable =
> > streamTableEnvironment.fromDataStream(headerFilterStream,
> >  Schema.newBuilder()
> > .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS
> > TIMESTAMP_LTZ(3))")
> > .watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
> > .build());
> > Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream,
> > Schema.newBuilder()
> > .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS
> > TIMESTAMP_LTZ(3))")
> > .watermark("rowtime", "rowtime  + INTERVAL '2' SECOND")
> > .build());
> >
> >
> > streamTableEnvironment.createTemporaryView("header",headerTable);
> > streamTableEnvironment.createTemporaryView("item",itemTable);
> > Table result = streamTableEnvironment.sqlQuery("select
> header.customer_id"
> > +
> > ",item.goods_id" +
> > ",header.id" +
> > ",header.order_status" +
> > ",header.shop_id" +
> > ",header.parent_order_id" +
> > ",header.order_at" +
> > ",header.pay_at" +
> > ",header.channel_id" +
> > ",header.root_order_id" +
> > ",item.id" +
> > ",item.row_num" +
> > ",item.p_sp_sub_amt" +
> > ",item.display_qty" +
> > ",item.qty" +
> > ",item.bom_type" +
> > " from item JOIN header on header.id = item.order_id and item.rowtime
> > BETWEEN header.rowtime - INTERVAL '2' MINUTE AND header.rowtime +
> INTERVAL
> > '4' MINUTE");
> >
> >   对此,我又有新的疑问了,我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval
> > join开分钟级别的数据还要准确?针对这个问题,不知道大家有什么看法和思路?
> >   我的一个猜测是我设置的表的ttl没有生效,inner join一直使用的是全量的数据,所以结果准确度要比interval
> > join高,我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志,我的配置如下:
> > Configuration conf = new Configuration();
> > conf.setString("table.exec.mini-batch.enabled","true");
> > conf.setString("table.exec.mini-batch.allow-latency","15 s");
> > conf.setString("table.exec.mini-batch.size","100");
> > conf.setString("table.exec.state.ttl","20 s");
> > env.configure(conf);
> > StreamTableEnvironment streamTableEnvironment =
> > StreamTableEnvironment.create(env,
> > EnvironmentSettings.fromConfiguration(conf));
> >
> >
> > 我想了解下,从tm和jm日志是否能正确反应我的配置生效?如果不行,那我要使用什么方法才能知道我的这个配置是否生效?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2022-06-13 21:12:48,"Xuyang"  写道:
> > >Hi,
> > >  1、理论上来说inner join关联的数据量应该比interval
> > join更大吧。关于左右两边流速度不一致的情况,理论上应该问题不大,因为需要等到两边的watermark都到齐之后才会触发状态里过期数据的清除。
> > >  2、inner
> >
> join没有水印的情况下,就是到了就发,完全根据这条数据进入这个算子的时间来算,也就是“处理时间”。默认数据是不会过期的,会存全量数据。如果定义了ttl,得看join两侧的表的pk和join
> >
> key,分别用不同的state数据格式来存数据(ListState、MapState等),原则上是多长时间没更新数据之后,就清空过期数据,是按照的“处理时间”来处理的。
> > >
> > >
> > >如果我有不对的地方,请指正我哈。
> > >
> > >
> > >
> > >
> > >--
> > >
> > >Best!
> > >Xuyang
> > >
> > >
> > >
> > >
> > >
> > >在 2022-06-12 14:39:39,"lxk7...@163.com"  写道:
> > >>非常感谢回复
> > >>1.针对watermark,我会再去进行测试。同时还会测试使用处理时间,interval join会不会丢失数据
> > >>2.针对interval jon,我个人的理解是它能关联到的数据范围要比inner
> >
> join大,所以数据应该更准确,但是从结果上看却是数据丢失,当时非常震惊,有点颠覆我的认知了。同时我自己还有一个新的猜测,就是两个流的数据量不一样,可能也会造成数据丢失。目前左流是订单粒度数据,右流是订单-商品粒度数据,数据量要大很多。我个人理解,在处理右流的时候,应该会慢一点,所以导致两边的时间进展可能不一致。但是这又引发了一个新的疑问?inner
> > join应该也会受这样的影响
> > >>3.还有一个问题可能是我没有阐述清楚,我在sql里使用inner
> > join,没有注册水印,那么两个流的join应该是以处理时间来定义的?那么表的state的过期是否也是以处理

Re: flink-connector-jdbc是否支持多个values问题

2022-06-14 文章 Zhiwen Sun
支持同时写入多个 values ,这个是 jdbcurl 控制,设置 *rewriteBatchedStatements=true*

生成的 SQL 类似:

INSERT INTO `order_summary`(`order_id`, `proctime`, `order_status`,
> `order_name`, `total`)
>  VALUES
>   (3, '2022-06-14 22:31:24.699', 'OK', 'order-name-1', 20) ,
>   (2, '2022-06-14 22:31:21.496', 'OK', 'order-name-1', 131)
> ON DUPLICATE KEY UPDATE `order_id`=VALUES(`order_id`),
> `proctime`=VALUES(`proctime`), `order_status`=VALUES(`order_status`),
> `order_name`=VALUES(`order_name`), `total`=VALUES(`total`)



Zhiwen Sun



On Mon, Mar 7, 2022 at 5:07 PM 黑色  wrote:

> 你看一下底层的源码实现全知道了,它insert into x() values() ON duplicate
> Key实现Insert update,所以不会的
>
>
>
>
> --原始邮件--
> 发件人: "payne_z" 发送时间: 2022年3月7日(星期一) 下午3:49
> 收件人: "user-zh" 主题: flink-connector-jdbc是否支持多个values问题
>
>
>
> 请问flink-connector-jdbc是否支持同时写入多个values的用法?


Re: flink sql回撤流sink优化问题

2021-12-25 文章 Zhiwen Sun
不用那么复杂,正常的 insert select group by 即可, 一分钟写一次 mysql 就行。

参考 JDBC sink [1] 中的 sink.buffer-flush.interval 和 sink.buffer-flush.max-rows
参数

[1] :
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/


Zhiwen Sun



On Thu, Dec 23, 2021 at 8:15 AM casel.chen  wrote:

> flink sql中aggregate without
> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql
> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游?
> 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗?
>
>
> 例如有下面binlog cdc购买数据(订单购买金额会更新):
>
> orderid.   categorydt  amt
>
> 订单id 商品类型   购买时间(MMddHH)  购买金额
>
>
>
>
> 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时
>
>
>
> INSERT INTO mysql_sink_table
>
> SELECT category, dt, LAST_VALUE(total)
>
> OVER (
>
>   PARTITION BY category
>
>   ORDER BY PROCTIME()
>
>   RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW
>
> ) AS var1
>
> FROM (
>
>   SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt
>
> );


Re: Re: Flink任务每运行20天均会发生内部异常

2021-10-27 文章 Zhiwen Sun
看看 task manager 的 jvm 内存, jstack 情况 ?

Zhiwen Sun



On Tue, Oct 26, 2021 at 7:22 PM mayifan  wrote:

> 非常感谢大佬的答复:
>
> 目前从任务来看的话总共存在三个任务,其中两个异常任务分别使用了1到2个MapState,过期时间均为1天或3天。
>
> 正常运行的任务使用了MapState及ListState各4个,过期时间为60min-120min。
>
> 异常任务在产生异常后从checkpoint重启又会恢复正常。
>
>
> > -- 原始邮件 --
> > 发 件 人:"Caizhi Weng" 
> > 发送时间:2021-10-26 18:45:44
> > 收 件 人:"flink中文邮件组" 
> > 抄 送:
> > 主 题:Re: Flink任务每运行20天均会发生内部异常
> >
> > Hi!
> >
> > 听起来和 state 过期时间非常有关。你配置了哪些和 state 过期相关的参数?是否有 20 天过期的 state?
> >
> > mayifan 于2021年10月26日周二 下午4:43写道:
> >
> > > Hi!
> > >
> > > 麻烦请教大家一个问题。
> > >
> > >
> > >
> 有三个Flink任务以yarn-per-job模式运行在Flink-1.11.2版本的集群上,均使用RocksDB作为状态后端,数据以增量的方式写入RocksDB,且均配置了状态过期时间。
> > >
> > >
> > >
> 任务逻辑大致都是通过状态与历史数据进行自关联或双流join,每输入一条数据都会产出等量、1/2或多倍的数据到下游,当数据无法通过状态关联,任务则无法向下游产出数据。
> > >
> > >
> > >
> 奇怪的是三个任务中有两个任务存在异常,异常现象是每次当任务启动运行至第20个工作日,都会非常准时的产生下游数据输出骤降的现象,输出与输入的数据量级差数十倍,并且此时任务中没有任何异常日志。
> > >
> > >
> > >
> > >
> > > 问题:目前怀疑是集群配置或RocksDB状态的问题,但是没有任何思路或排查线索,请问这种现象是怎样产生的?应该怎样排查?
>
>
>
>
>


Re: 如何将canal json格式数据按操作类型过滤

2021-07-07 文章 Zhiwen Sun
先通过 json 或者 raw format 消费原始 canal kafka , 过滤掉 delete 的数据写入到一个新的 kafka
,然后你再基于新的 kafka 建一个 canal-json 的表来落地。

Zhiwen Sun



On Wed, Jul 7, 2021 at 10:51 PM JasonLee <17610775...@163.com> wrote:

> hi
>
>
> 最后一个字段 type 就是操作的类型, 过滤掉 DELETE 就行了.
>
>
> Best
> JasonLee
> 在2021年7月7日 22:43,casel.chen 写道:
> 使用场景:我们使用canal将mysql binlog输出到kafka,然后想通过flink消费kafka数据过滤掉 delete
> 操作的数据插入到文件系统,因为要做历史数据存档用。
> 查了下官网
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/canal/#available-metadata
> {"data":[{"id":"111","name":"scooter","description":"Big 2-wheel
> scooter","weight":"5.18"}],"database":"inventory","es":158937356,"id":9,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"5.15"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products","ts":1589373560798,"type":"UPDATE"}
>
> CREATETABLEKafkaTable(origin_databaseSTRINGMETADATAFROM'value.database'VIRTUAL,origin_tableSTRINGMETADATAFROM'value.table'VIRTUAL,origin_sql_typeMAPMETADATAFROM'value.sql-type'VIRTUAL,origin_pk_namesARRAYMETADATAFROM'value.pk-names'VIRTUAL,origin_tsTIMESTAMP(3)METADATAFROM'value.ingestion-timestamp'VIRTUAL,user_idBIGINT,item_idBIGINT,behaviorSTRING)WITH('connector'='kafka','topic'='user_behavior','properties.bootstrap.servers'='localhost:9092','
> properties.group.id
> '='testGroup','scan.startup.mode'='earliest-offset','value.format'='canal-json');
> 只能获取到原始 database, table, sql-type, pk-names, ingestion-timestamp
> 字段,而拿不到代表操作类型的 type 字段。请问有什么别的办法么?
>
>
>
>
>


Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Zhiwen Sun
parquet 相关依赖增加了吗?

Zhiwen Sun



On Sun, Jun 27, 2021 at 3:57 PM Wei JI10 季伟 
wrote:

> Hi:
>在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any format factory for identifier 'parquet' in the classpath.
>at
> org.apache.flink.table.filesystem.FileSystemTableSource.(FileSystemTableSource.java:97)
>at
> org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:72)
>at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:119)
>... 41 more
>
> Sql语句如下:
> CREATE TABLE user_info (
> `user_id` bigint,
> `user_name` string
> ) PARTITIONED BY (user_id) WITH (
> 'connector' = 'filesystem',
> 'path' = '',
> 'format' = 'parquet'
> );
>
> CREATE TABLE sink_table (
> `user_id` bigint,
> `user_name` string
> ) PARTITIONED BY (datetime) WITH (
> 'connector'='filesystem',
> 'path'='',
> 'format'='parquet',
> 'sink.partition-commit.delay'='1h',
> 'sink.partition-commit.policy.kind'='success-file'
> );
>
> insert OVERWRITE sink_table select *, '2021062600' as  datetime from
> user_info;
>


Re: Flink 提交到yarn失败

2021-06-18 文章 Zhiwen Sun
HADOOP_CLASSPATH 设置了吗?

Zhiwen Sun



On Fri, Jun 18, 2021 at 9:47 AM yangpengyi <963087...@qq.com.invalid> wrote:

> 环境: FLINK 1.12 & CDH6.1.1
> 问题:
>
> 利用yarn-per-job提交时,在初始化hdfs客户端时出错。看起来应该是hadoop版本的兼容问题,不过从堆栈看应该使用到了正确的客户端jar包。
> Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
> Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in
> the classpath, or some classes are missing from the classpath.
> at
>
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:117)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:309)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:272)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:212)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:173)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at java.security.AccessController.doPrivileged(Native Method)
> ~[?:1.8.0_181]
> at javax.security.auth.Subject.doAs(Subject.java:422)
> ~[?:1.8.0_181]
> at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
> ~[cloud-flinkAppCrashAnalysis-1.0.0-encodetest-RELEASE.jar:?]
> at
>
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> at
>
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:172)
> ~[flink-dist_2.11-1.12.0.jar:1.12.0]
> ... 2 more
> Caused by: java.lang.VerifyError: Bad return type
> Exception Details:
>   Location:
>
>
> org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage;
> @157: areturn
>   Reason:
> Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0]) is
> not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method signature)
>   Current Frame:
> bci: @157
> flags: { }
> locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String',
> 'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' }
> stack: { 'org/apache/hadoop/fs/ContentSummary' }
>   Bytecode:
> 0x000: 2ab6 00b5 2a13 01f4 2bb6 00b7 4d01 4e2a
> 0x010: b400 422b b901 f502 003a 042c c600 1d2d
> 0x020: c600 152c b600 b9a7 0012 3a05 2d19 05b6
> 0x030: 00bb a700 072c b600 b919 04b0 3a04 1904
> 0x040: 4e19 04bf 3a06 2cc6 001d 2dc6 0015 2cb6
> 0x050: 00b9 a700 123a 072d 1907 b600 bba7 0007
> 0x060: 2cb6 00b9 1906 bf4d 2c07 bd00 d459 0312
> 0x070: d653 5904 12e0 5359 0512 e153 5906 1301
> 0x080: f653 b600 d74e 2dc1 01f6 9900 14b2 0023
> 0x090: 1301 f7b9 002b 0200 2a2b b601 f8b0 2dbf
> 0x0a0:
>   Exception Handler Table:
> bci [35, 39] => handler: 42
> bci [15, 27] => handler: 60
> bci [15, 27] => handler: 68
> bci [78, 82] => handler: 85
> bci [60, 70] => handler: 68
> bci [4, 57] => handler: 103
> bci [60, 103] => handler: 103
>   Stackmap Table:
>
>
> full_frame(@42,{Object[#751],Object[#774],Object[#829],Object[#799],Object[#1221]},{Object[#799]})
> same_frame(@53)
> same_frame(@57)
>
>
> full_frame(@60,{Object[#751],Object[#774],Object[#829],Object[#799]},{Object[#799]})
> same_locals_1_stack_item_frame(@68,Object[#799])
>
>
> full_frame(@85,{Object[#751],Object[#774],Obje

Re: 邮件退订

2021-06-08 文章 Zhiwen Sun
退订是发邮件到 user-zh-unsubscr...@flink.apache.org

Zhiwen Sun



On Tue, Jun 8, 2021 at 10:21 PM happiless  wrote:

> 您好,麻烦邮件退订一下
>
>
> 发自我的iPhone


Re: flink1.11.2 yarn-session 部分类路径未加载

2021-06-01 文章 Zhiwen Sun
不需要 mapreduce 相关库吧。

我看我的 job 里加载到 classpath 的也没有 mapreduce。

Zhiwen Sun



On Wed, Jun 2, 2021 at 11:56 AM datayangl  wrote:

> flink1.11.2 启动yarn-session之后发现,有部分类路径始终没有加载到class_path中去
> 环境变量配置如下:
> <
> http://apache-flink.147419.n8.nabble.com/file/t919/66604010-2A08-4A68-8478-70A27D61224B.png>
>
>
> 其中tm的日志如下:
> tm.log <http://apache-flink.147419.n8.nabble.com/file/t919/tm.log>
>
> 其中hadoop-mapreduce-client相关的类路径一直没有加载到class_path中,求指教
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink 1.12.2 编译报错

2021-05-28 文章 Zhiwen Sun
看了下依赖树,似乎是因为 org.powermock 引入了 hamcrest-core 导致 junit 无法引入,但 junit
在前面,按道理应该它应该将 hamcrest-core 引入到 compile scope 。

[INFO]

[INFO] Building Flink : Test utils : Junit 1.12.2
[INFO]

[INFO]
[INFO] --- maven-dependency-plugin:3.1.1:tree (default-cli) @
flink-test-utils-junit ---
[INFO] org.apache.flink:flink-test-utils-junit:jar:1.12.2
[INFO] +- junit:junit:jar:4.12:compile
[INFO] +- org.apache.logging.log4j:log4j-slf4j-impl:jar:2.12.1:compile
[INFO] +- org.apache.logging.log4j:log4j-api:jar:2.12.1:compile
[INFO] +- org.apache.logging.log4j:log4j-core:jar:2.12.1:compile
[INFO] +- org.apache.flink:force-shading:jar:1.12.2:compile
[INFO] +- org.slf4j:slf4j-api:jar:1.7.15:compile
[INFO] +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] +- org.mockito:mockito-core:jar:2.21.0:test
[INFO] |  +- net.bytebuddy:byte-buddy:jar:1.8.15:test
[INFO] |  +- net.bytebuddy:byte-buddy-agent:jar:1.8.15:test
[INFO] |  \- org.objenesis:objenesis:jar:2.1:test
[INFO] +- org.powermock:powermock-module-junit4:jar:2.0.4:test
[INFO] |  +- org.powermock:powermock-module-junit4-common:jar:2.0.4:test
[INFO] |  |  +- org.powermock:powermock-reflect:jar:2.0.4:test
[INFO] |  |  \- org.powermock:powermock-core:jar:2.0.4:test
[INFO] |  | \- org.javassist:javassist:jar:3.24.0-GA:test
[INFO] |  \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] +- org.powermock:powermock-api-mockito2:jar:2.0.4:test
[INFO] |  \- org.powermock:powermock-api-support:jar:2.0.4:test
[INFO] +- org.hamcrest:hamcrest-all:jar:1.3:test
[INFO] \- org.apache.logging.log4j:log4j-1.2-api:jar:2.12.1:test
[INFO]

[INFO] BUILD SUCCESS
[INFO]



Zhiwen Sun



On Fri, May 28, 2021 at 11:12 AM Zhiwen Sun  wrote:

> 谢谢,看了下,junit 的确依赖 org.hamcrest ,而且相关版本都没问题。
>
> 那这个报错的原因是什么呢? 什么地方导致 hamcrest 被 exclude 了?然后手动增加了dependency 就好了? 代码拉下来没修改过。
>
> Zhiwen Sun
>
>
>
> On Fri, May 28, 2021 at 10:58 AM Shuo Cheng  wrote:
>
>> Hi, org.hamcrest 是 junit 的依赖
>>
>> On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun  wrote:
>>
>> > 才编译到 Test utils : Junit 模块,就报错了
>> >
>> > maven 版本: 3.2.5
>> > jdk 版本:1.8.0_251
>> > flink 版本: flink 1.12.2
>> > 执行的命令:mvn clean install -DskipTests -Dfast
>> >
>> > 错误信息:
>> >
>> > [ERROR] COMPILATION ERROR :
>> > [INFO] -
>> > [ERROR]
>> >
>> >
>> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27]
>> > package org.hamcrest does not exist
>> > [ERROR]
>> >
>> >
>> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1]
>> > static import only from classes and interfaces
>> > [ERROR]
>> >
>> >
>> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27]
>> > package org.hamcrest does not exist
>> >
>> > [INFO] Reactor Summary:
>> > [INFO]
>> > [INFO] Flink : Tools : Force Shading .. SUCCESS [
>> >  1.042 s]
>> > [INFO] Flink :  SUCCESS [
>> >  1.404 s]
>> > [INFO] Flink : Annotations  SUCCESS [
>> >  0.735 s]
>> > [INFO] Flink : Test utils : ... SUCCESS [
>> >  0.042 s]
>> > [INFO] Flink : Test utils : Junit . FAILURE [
>> >  0.283 s]
>> >
>> >
>> > 看起来是缺少 org.hamcrest  相关依赖
>> > 我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml
>> > 的确没加 org.hamcrest 相关依赖, 不知道这个是怎么工作的。
>> >
>> > 请问大家下,原因是什么呢?
>> >
>> >
>> > Zhiwen Sun
>> >
>>
>


Re: flink 1.12.2 编译报错

2021-05-27 文章 Zhiwen Sun
谢谢,看了下,junit 的确依赖 org.hamcrest ,而且相关版本都没问题。

那这个报错的原因是什么呢? 什么地方导致 hamcrest 被 exclude 了?然后手动增加了dependency 就好了? 代码拉下来没修改过。

Zhiwen Sun



On Fri, May 28, 2021 at 10:58 AM Shuo Cheng  wrote:

> Hi, org.hamcrest 是 junit 的依赖
>
> On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun  wrote:
>
> > 才编译到 Test utils : Junit 模块,就报错了
> >
> > maven 版本: 3.2.5
> > jdk 版本:1.8.0_251
> > flink 版本: flink 1.12.2
> > 执行的命令:mvn clean install -DskipTests -Dfast
> >
> > 错误信息:
> >
> > [ERROR] COMPILATION ERROR :
> > [INFO] -
> > [ERROR]
> >
> >
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27]
> > package org.hamcrest does not exist
> > [ERROR]
> >
> >
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1]
> > static import only from classes and interfaces
> > [ERROR]
> >
> >
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27]
> > package org.hamcrest does not exist
> >
> > [INFO] Reactor Summary:
> > [INFO]
> > [INFO] Flink : Tools : Force Shading .. SUCCESS [
> >  1.042 s]
> > [INFO] Flink :  SUCCESS [
> >  1.404 s]
> > [INFO] Flink : Annotations  SUCCESS [
> >  0.735 s]
> > [INFO] Flink : Test utils : ... SUCCESS [
> >  0.042 s]
> > [INFO] Flink : Test utils : Junit ......... FAILURE [
> >  0.283 s]
> >
> >
> > 看起来是缺少 org.hamcrest  相关依赖
> > 我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml
> > 的确没加 org.hamcrest 相关依赖, 不知道这个是怎么工作的。
> >
> > 请问大家下,原因是什么呢?
> >
> >
> > Zhiwen Sun
> >
>


Re: flink 1.12.2 编译报错

2021-05-27 文章 Zhiwen Sun
在我手动加上依赖后,这个模块,能编译通过了,但 runtime 又失败了。

INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @
flink-runtime_2.11 ---
[INFO] /data/flink-release-1.12.2/flink-runtime/src/main/java:-1: info:
compiling
[INFO] /data/flink-release-1.12.2/flink-runtime/src/main/scala:-1: info:
compiling
[INFO] Compiling 1958 source files to
/data/flink-release-1.12.2/flink-runtime/target/classes at 1622169188312
[ERROR] java.lang.NoClassDefFoundError: scala/reflect/internal/Trees
[INFO] at java.lang.ClassLoader.defineClass1(Native Method)
[INFO] at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
[INFO] at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
[INFO] at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
[INFO] at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
[INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
[INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
[INFO] at java.security.AccessController.doPrivileged(Native Method)
[INFO] at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
[INFO] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
[INFO] at java.lang.ClassLoader.defineClass1(Native Method)
[INFO] at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
[INFO] at
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
[INFO] at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
[INFO] at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
[INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
[INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
[INFO] at java.security.AccessController.doPrivileged(Native Method)
[INFO] at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
[INFO] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
[INFO] at java.lang.Class.getDeclaredMethods0(Native Method)
[INFO] at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
[INFO] at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
[INFO] at java.lang.Class.getMethod0(Class.java:3018)
[INFO] at java.lang.Class.getMethod(Class.java:1784)
[INFO] at scala_maven_executions.MainHelper.runMain(MainHelper.java:155)
[INFO] at
scala_maven_executions.MainWithArgsInFile.main(MainWithArgsInFile.java:26)
[INFO] Caused by: java.lang.ClassNotFoundException:
scala.reflect.internal.Trees
[INFO] at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
[INFO] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
[INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
[INFO] ... 31 more
[INFO]

[INFO] Reactor Summary:
[INFO]
[INFO] Flink : Tools : Force Shading .. SUCCESS [
 1.093 s]
[INFO] Flink :  SUCCESS [
 1.168 s]
[INFO] Flink : Annotations  SUCCESS [
 1.218 s]
[INFO] Flink : Test utils : ... SUCCESS [
 0.047 s]
[INFO] Flink : Test utils : Junit . SUCCESS [
 0.795 s]
[INFO] Flink : Metrics : .. SUCCESS [
 0.037 s]
[INFO] Flink : Metrics : Core . SUCCESS [
 0.501 s]
[INFO] Flink : Core ... SUCCESS [
17.510 s]
[INFO] Flink : Java ... SUCCESS [
 2.467 s]
[INFO] Flink : Queryable state : .. SUCCESS [
 0.029 s]
[INFO] Flink : Queryable state : Client Java .. SUCCESS [
 3.106 s]
[INFO] Flink : FileSystems : .. SUCCESS [
 0.031 s]
[INFO] Flink : FileSystems : Hadoop FS  SUCCESS [
 3.457 s]
[INFO] Flink : Runtime  FAILURE [
18.086 s]


然后我修改 scala 的版本为 2.12
相关命令: mvn clean install -DskipTests -Dfast -Dscala-2.12
目前能够正常编译了,我看 release 版本支持 scala-2.11 的。是我的环境有问题吗?

Zhiwen Sun



On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun  wrote:

> 才编译到 Test utils : Junit 模块,就报错了
>
> maven 版本: 3.2.5
> jdk 版本:1.8.0_251
> flink 版本: flink 1.12.2
> 执行的命令:mvn clean install -DskipTests -Dfast
>
> 错误信息:
>
> [ERROR] COMPILATION ERROR :
> [INFO] -
> [ERROR]
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27]
> package org.hamcrest does not exist
> [ERROR]
> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testu

flink 1.12.2 编译报错

2021-05-27 文章 Zhiwen Sun
编译到 Test utils : Junit 模块,就报错了

maven 版本: 3.2.5
jdk 版本:1.8.0_251
flink 版本: flink 1.12.2
执行的命令:mvn clean install -DskipTests -Dfast

错误信息:

[ERROR] COMPILATION ERROR :
[INFO] -
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27]
package org.hamcrest does not exist
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1]
static import only from classes and interfaces
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27]
package org.hamcrest does not exist

[INFO] Reactor Summary:
[INFO]
[INFO] Flink : Tools : Force Shading .. SUCCESS [
 1.042 s]
[INFO] Flink :  SUCCESS [
 1.404 s]
[INFO] Flink : Annotations  SUCCESS [
 0.735 s]
[INFO] Flink : Test utils : ... SUCCESS [
 0.042 s]
[INFO] Flink : Test utils : Junit . FAILURE [
 0.283 s]


而且我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml
的确没加 org.hamcrest 相关依赖啊。

请问大家下,原因是什么呢?


Zhiwen Sun


flink 1.12.2 编译报错

2021-05-27 文章 Zhiwen Sun
才编译到 Test utils : Junit 模块,就报错了

maven 版本: 3.2.5
jdk 版本:1.8.0_251
flink 版本: flink 1.12.2
执行的命令:mvn clean install -DskipTests -Dfast

错误信息:

[ERROR] COMPILATION ERROR :
[INFO] -
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27]
package org.hamcrest does not exist
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1]
static import only from classes and interfaces
[ERROR]
/data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27]
package org.hamcrest does not exist

[INFO] Reactor Summary:
[INFO]
[INFO] Flink : Tools : Force Shading .. SUCCESS [
 1.042 s]
[INFO] Flink :  SUCCESS [
 1.404 s]
[INFO] Flink : Annotations  SUCCESS [
 0.735 s]
[INFO] Flink : Test utils : ... SUCCESS [
 0.042 s]
[INFO] Flink : Test utils : Junit . FAILURE [
 0.283 s]


看起来是缺少 org.hamcrest  相关依赖
我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml
的确没加 org.hamcrest 相关依赖, 不知道这个是怎么工作的。

请问大家下,原因是什么呢?


Zhiwen Sun