Re: flinksql join
dob_dim_account 维表如果使用 jdbc 的 connector, flink 会在初始化的时候一次性读取所有的数据, 后续数据库中更新并不会触发 flink 计算。 要解决这个问题, dob_dim_account 需要变成流表。 Zhiwen Sun On Thu, Nov 17, 2022 at 1:56 PM Jason_H wrote: > hi,你好 > 这种方式,需要使用cdc,但是我们的现在方案里领导不考虑使用cdc,只想用flinksql去解决这个问题 > > > | | > Jason_H > | > | > hyb_he...@163.com > | > Replied Message > | From | 任召金 | > | Date | 11/15/2022 09:52 | > | To | user-zh | > | Subject | Re: flinksql join | > hello,你可以试下,将mysql的数据通过CDC变成流数据,然后跟主流inner join,注意状态的TTL > > > --Original-- > From: "Jason_H" Date: Tue, Nov 15, 2022 09:46 AM > To: "flink中文邮件组" > Subject: Re: flinksql join > > > > hi,你好 > 我想基于现有的flinksql的join实现这种情况,当维表更新慢的时候,事实数据会放在状态中等待。 > > > | | > Jason_H > | > | > hyb_he...@163.com > | > Replied Message > | From | RS | Date | 11/15/2022 09:07 | > | To | user-zh@flink.apache.org | Subject | Re:flinksql join | > Hi, > 我的理解是后插入的维表数据,关联不到是正常现象, > 如果要实现=3的话,应该要手动重新跑历史数据,然后更新现有数据, > > > Thanks > > > > > > > 在 2022-11-11 11:10:03,"Jason_H" > > hi,大家好 > > 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下: > kakfa输入: > 账号 金额 笔数 > 100 1 - 未匹配 > 100 1 - 未匹配 > 100 1 - 匹配上 > > 维表 > 账号 企业 > > - 后插入的账号信息 > 实际输出结果 > 企业 金额 笔数 > 100 1 > > > 我想要的结果: > 企业 金额 笔数 > 300 3 > > > > > > sql如下: > String sql2 = "insert into dws_b2b_trade_year_index\n" + > "WITH temp AS (\n" + > "select \n" + > " ta.gmtStatistical as gmtStatistical,\n" + > " ta.paymentMethod as paymentMethod,\n" + > " tb.CORP_ID as outCorpId,\n" + > " tc.CORP_ID as inCorpId,\n" + > " sum(ta.tradeAmt) as tranAmount,\n" + > " sum(ta.tradeCnt) as tranNum \n" + > "from dws_a2a_trade_year_index ta \n" + > "left join dob_dim_account for system_time as of ta.proc as tb on > ta.outAcctCode = tb.ACCT_CODE \n" + > "left join dob_dim_account for system_time as of ta.proc as tc on > ta.inAcctCode = tc.ACCT_CODE \n" + > "group by \n" + > " ta.gmtStatistical, \n" + > " ta.paymentMethod, \n" + > " tb.CORP_ID, \n" + > " tc.CORP_ID \n" + > ") \n" + > "SELECT \n" + > " DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as > gmtUpdate, \n" + > " gmtStatistical, \n" + > " paymentMethod, \n" + > " outCorpId, \n" + > " inCorpId, \n" + > " tranAmount, \n" + > " tranNum \n" + > "FROM temp"; > > | | > Jason_H > | > | > hyb_he...@163.com > |
Re: flinksql join
用普通的 join, 不要用 lookup join Zhiwen Sun On Fri, Nov 11, 2022 at 11:10 AM Jason_H wrote: > > > hi,大家好 > > 我正在使用flink的sql实现一个维表join的逻辑,数据源为kafka(交易数据),维表为mysql(账号),现在遇到一个问题:当kafka有数据进来时,没有在维表中找到账号,这时我手动插入该账号,在下一条数据进来时可以匹配上对应的账号信息,但是,输出的累计结果就会缺失没有匹配上的那条数据,举例如下: > kakfa输入: > 账号 金额 笔数 > 100 1 -> 未匹配 > 100 1 -> 未匹配 > 100 1 -> 匹配上 > > 维表 > 账号 企业 > > -> 后插入的账号信息 > 实际输出结果 > 企业 金额 笔数 > 100 1 > > > 我想要的结果: > 企业 金额 笔数 > 300 3 > > > > > > sql如下: > String sql2 = "insert into dws_b2b_trade_year_index\n" + >"WITH temp AS (\n" + >"select \n" + >" ta.gmtStatistical as gmtStatistical,\n" + >" ta.paymentMethod as paymentMethod,\n" + >" tb.CORP_ID as outCorpId,\n" + >" tc.CORP_ID as inCorpId,\n" + >" sum(ta.tradeAmt) as tranAmount,\n" + >" sum(ta.tradeCnt) as tranNum \n" + >"from dws_a2a_trade_year_index ta \n" + >"left join dob_dim_account for system_time as of ta.proc as > tb on ta.outAcctCode = tb.ACCT_CODE \n" + >"left join dob_dim_account for system_time as of ta.proc as > tc on ta.inAcctCode = tc.ACCT_CODE \n" + >"group by \n" + >" ta.gmtStatistical, \n" + >" ta.paymentMethod, \n" + >" tb.CORP_ID, \n" + >" tc.CORP_ID \n" + >") \n" + >"SELECT \n" + >" DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd HH:mm:ss') as > gmtUpdate, \n" + >" gmtStatistical, \n" + >" paymentMethod, \n" + >" outCorpId, \n" + >" inCorpId, \n" + >" tranAmount, \n" + >" tranNum \n" + >"FROM temp"; > > | | > Jason_H > | > | > hyb_he...@163.com > |
Re: Flink SQL 中同时写入多个 sink 时,是否能够保证先后次序
谢谢,有具体的思路嘛? 比如我需要先写入 jdbc 后再发送消息 是自定义一个 DynamicTableSink , 里面有 JdbcDynamicTableSink KafkaDynamicSink , 还是说继承 JdbcDynamicTableSink , 自定义的类里面再去 new KafkaDynamicSink? 初看起来没办法知道什么时候 db 写入了。要知道什么时候写入,要去自定义 TableInsertOrUpdateStatementExecutor Zhiwen Sun On Tue, Oct 18, 2022 at 5:56 PM 悟空 wrote: > Hi Zhiwen Sun: > > 自定义Sink 这个思路没问题的,我这边目前就是通过这种方式实现的,只需要自定义一个connector 融合多个sink > connector ,相关options 沿用flink 官方connector的 ,这样方便后续升级。 > 如果有具体相关问题,欢迎讨论。 > > > > > --原始邮件-- > 发件人: > "user-zh" > < > pens...@gmail.com; > 发送时间:2022年10月14日(星期五) 中午11:55 > 收件人:"user-zh" > 主题:Re: Flink SQL 中同时写入多个 sink 时,是否能够保证先后次序 > > > > 好的,谢谢大家,之前也想过这个方案,复用/继承 JdbcDynamicTableSink 相关代码自定义 connector 。 > > Zhiwen Sun > > > > On Fri, Oct 14, 2022 at 10:08 AM yidan zhao wrote: > > 在一个自定义sink中实现先写database,再发消息。 > > > 或者2个都是自定义的,但是不能通过sink,因为sink后就没数据了。通过process,第一个process完成写入database后,后续process发送消息。 > > Shuo Cheng > Flink SQL 自身机制无法保证同一个作业多个 sink 的写入次序。 是否可以考虑从业务逻辑上动手脚,比如写入消息队列 > sink 前加个 > udf > filter, udf 查询 database,满足条件才写入消息队列,当然这种方式对性能可能有影响。 > > On Wed, Oct 12, 2022 at 2:41 PM Zhiwen Sun wrote: > >hi all: > > 我们有个场景,需要 Flink SQL 同时写入消息和 database, 后续实时任务消费消息,再次读取 > database, 如果消息先于 >database 写入,这就可能导致读取的数据不正确。 > >是否有办法保证 database 写入后,再发送消息? > >Zhiwen Sun > >
Re: Flink 1.15 Deduplicate之后Interval Join出错
不用 interval join 用普通的流 join 。时间只要不是 proctime 或者 eventtime 就行。 Zhiwen Sun On Sat, Oct 15, 2022 at 9:46 AM 余列冰 wrote: > Hi! > > 我在使用Deduplicate之后进行Interval Join出现问题。我使用的Flink版本是1.15 > > 我希望使用Flink的Interval Join进行双流关联,并且我的第一个表需要去重。以下是我的示例代码。 > ```sql > CREATE TEMPORARY TABLE `source` ( > id INT, > name STRING, > event_time TIMESTAMP(3), > WATERMARK FOR event_time AS event_time > ) WITH ( > 'connector' = 'datagen' > ); > > > CREATE TEMPORARY TABLE B ( > id INT, > `start` INT, > `end` INT, > event_time TIMESTAMP(3), > WATERMARK FOR event_time AS event_time > ) WITH ( > 'connector' = 'datagen' > ); > > create TEMPORARY view A as > select id, name, event_time from ( > select id, name, event_time, > row_number() over(partition by id, name, event_time order by event_time > asc) as rn > from source > ) > where rn = 1; > > SELECT * > FROM A, B > WHERE > A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND > A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND > B.event_time + INTERVAL '10' SECOND; > ``` > > 在去重时我采用了保留第一行数据,这时view A应该只会产生insert的行,但是运行上述SQL会出现如下错误。 > ``` > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin > doesn't support consuming update and delete changes which is produced by > node Deduplicate(keep=[FirstRow], key=[id, name, event_time], > order=[ROWTIME]) > ``` > > 请问如何在使用Deduplicate之后进行Interval Join? > > > > -原始邮件- > > 发件人: LB > > 发送时间: 2022-10-15 09:39:31 (星期六) > > 收件人: user-zh > > 抄送: > > 主题: Flink 1.15 Deduplicate之后Interval Join出错 > > > > 抱歉上一封邮件格式有问题,以此为准。Hi! 我在使用Deduplicate之后进行Interval > Join出现问题。我使用的Flink版本是1.15 我希望使用Flink的Interval > Join进行双流关联,并且我的第一个表需要去重。以下是我的示例代码。 ```sql CREATE TEMPORARY TABLE `source` > ( id INT, name STRING, event_time TIMESTAMP(3), WATERMARK FOR > event_time AS event_time ) WITH ( 'connector' = 'datagen' ); CREATE > TEMPORARY TABLE B ( id INT, `start` INT, `end` INT, event_time > TIMESTAMP(3), WATERMARK FOR event_time AS event_time ) WITH ( > 'connector' = 'datagen' ); create TEMPORARY view A as select id, name, > event_time from ( select id, name, event_time, row_number() > over(partition by id, name, event_time order by event_time asc) as rn > from source ) where rn = 1; SELECT * FROM A, B WHEREA.id = B.id AND > A.id = B.`start` AND A.id <= B.`end` ANDA.event_time BETWEEN > B.event_time - INTERVAL '10' SECOND ANDB.event_time + INTERVAL '10' > SECOND; ``` 在去重时我采用了保留第一行数据,这时view A应该只会产生insert的行,但是运行上述SQL会出现如下错误。 ``` > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin > doesn't support consuming update and delete changes which is produced by > node Deduplicate(keep=[FirstRow], key=[id, name, event_time], > order=[ROWTIME]) ``` 请问如何在使用Deduplicate之后进行Interval Join? >
Re: Flink SQL 中同时写入多个 sink 时,是否能够保证先后次序
好的,谢谢大家,之前也想过这个方案,复用/继承 JdbcDynamicTableSink 相关代码自定义 connector 。 Zhiwen Sun On Fri, Oct 14, 2022 at 10:08 AM yidan zhao wrote: > 在一个自定义sink中实现先写database,再发消息。 > > 或者2个都是自定义的,但是不能通过sink,因为sink后就没数据了。通过process,第一个process完成写入database后,后续process发送消息。 > > Shuo Cheng 于2022年10月12日周三 16:59写道: > > > > Flink SQL 自身机制无法保证同一个作业多个 sink 的写入次序。 是否可以考虑从业务逻辑上动手脚,比如写入消息队列 sink 前加个 > udf > > filter, udf 查询 database,满足条件才写入消息队列,当然这种方式对性能可能有影响。 > > > > On Wed, Oct 12, 2022 at 2:41 PM Zhiwen Sun wrote: > > > > > hi all: > > > > > > 我们有个场景,需要 Flink SQL 同时写入消息和 database, 后续实时任务消费消息,再次读取 database, 如果消息先于 > > > database 写入,这就可能导致读取的数据不正确。 > > > > > > 是否有办法保证 database 写入后,再发送消息? > > > > > > Zhiwen Sun > > > >
Re: Re: flink实时双流驱动join问题
实际业务的确是这样的。 state 永不过期, 要全量的数据计算,全量的数据放到 state 里面。 目前看来只有等 flink table store 了。 Zhiwen Sun On Fri, Sep 23, 2022 at 8:29 AM casel.chen wrote: > > 我这里只是举了一个例子表示Flink用于OLAP实时关联场景会遇到的一个问题,实际业务中确实会出现两张关联表都需要更新情况,不管哪一边更新数据业务都想获取到最新关联结果,而不是旧的关联状态。引出我想问的另一个问题是如果查询模式固定,Flink实时关联是否能取代OLAP系统例如Doris呢? > 1. 一般双流join为避免state无限膨胀,都会设置ttl,你这边的业务场景ttl需要保留n个月? > 确切地说不应该设置ttl,业务数据有长尾效应,大多数都在当天更新完毕,短的几秒种,长的甚至会在半年后还发生更新 > > > 2. order流和user流在业务场景上要求的state ttl时长是不是不一样? > 同上 > > > 3. order流和user流的数据规模/state size规模大概可以到什么级别? > TB级别 > > > > > > > > > > > > > > > 在 2022-09-20 10:28:49,"Jinzhong Li" 写道: > >hi,casel, 关于你们的业务场景,我有几个问题, 希望可以交流一下。 > >1. 一般双流join为避免state无限膨胀,都会设置ttl,你这边的业务场景ttl需要保留n个月? > >2. order流和user流在业务场景上要求的state ttl时长是不是不一样? > >(从你描述上来看,user流的ttl需要几个月,order流可以比较短些?) > >3. order流和user流的数据规模/state size规模大概可以到什么级别? > > > >casel.chen 于2022年9月17日周六 10:59写道: > > > >> 请教一个flink实现实时双流驱动join问题: > >> > >> > >> order cdc流字段:order_id, order_status, order_time, user_id (order_id是主键) > >> user cdc流字段:user_id, user_name, user_phone, user_address(user_id是主键) > >> 关联结果流字段:order_id, order_status, order_time, user_name, user_phone, > >> user_address(order_id是主键) > >> 期望当order流数据更新或user流数据更新时,关联结果流数据都会得到更新。inner join不满足是因为两条流distinct > >> id都很大,状态会很大,且不能TTL,因为user流更新时间不定,短的几小时,长达上月。 > >> > >> > >> 请问这种场景下要如何使用flink实现实时双流驱动join? >
Re: 关于 UDAF 里面 ListView 的疑问
hi, 感谢你的回复。 报错是在 getValue 的时候。 at GroupAggsHandler$439.getValue(Unknown Source) at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:146) 我的疑问是使用一个 Class 包装下 ListView 就能正常工作,而直接使用 ListView 是会报错。 比如使用 AggregateFunction 就正常,而使用 AggregateFunction> 就会 NPE。 我怀疑使用 ListView 时,无法正常获得 TypeInference。 Zhiwen Sun On Wed, Sep 7, 2022 at 11:46 PM Xuyang wrote: > Hi, > 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。 > > > > > 实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象 > > > > > -- > > Best! > Xuyang > > > > > > 在 2022-09-07 16:23:25,"Zhiwen Sun" 写道: > > Hi, > 理论上来说,在你的case中,会先通过createAccumulator方法创建一个ListView作为acc,然后,每一个输入的row都会触发accumulate方法,将数据更新到刚才的acc中,最终通过getValue方法拿到当前acc的值。实际测试中的NPE发生在更新acc的时候还是getValue的时候呢?可以通过在这三个阶段设一下断点,分别看一下当前持有的acc是不是同一个对象
关于 UDAF 里面 ListView 的疑问
Hello all, 我看 ListView 使用的时候,有以下示例 public class MyAccumulator { public ListView list = new ListView<>(); // or explicit: // {@literal @}DataTypeHint("ARRAY") // public ListView list = new ListView<>(); public long count = 0L; } public class MyAggregateFunction extends AggregateFunction 我想请教下大家,为什么需要在外层包裹一个 MyAccumulator 呢, 我实际测下来, 直接时用 AggregateFunction> 在 getValue 的时候会报空指针异常 Flink 版本: 1.13.1 谢谢。 Zhiwen Sun
Re: Re: 关于Flink state初始化的问题
你应该没有正确理解 state 的使用 我们一般在程序里面是用的是 KeyedState , 也就是和 key 伴随的。 基于上面,所以 open() 里面只能对 state 进行初始化, 但是没有办法设置 state 的 value,因为这时候没有 key ; 另外一方面,也不会在 map() 的时候去 new state (可以认为 state 是一个大的 Map,你 map 的时候只是操作其中的一个 key)。 回到你的需求,你应该在 open() 的时候保存相关信息到类变量里面,当 map() 的时候再去 update state。 Zhiwen Sun On Fri, Aug 26, 2022 at 1:55 PM 曲洋 wrote: > > 对的,是后者,statAccumulator.value()是null,就是map方法中取值就成null了,但是open中命名初始化了,这个是因为map太快了吗,state没初始化完就开始拿了吗. > 嗯嗯,我现在改成先进行判断 > > > > > > > > > > > > > > > > > > 在 2022-08-26 11:22:43,"Hangxiang Yu" 写道: > >open确实是初始化的时候就会调用的; > > >第一次调用是null是说statAccumulator是null还是statAccumulator.value()是null,后者的话是正常可能会出现的; > >这里的写法看起来有点问题,一般用value方法取出来可以先判断下,然后对value state的更新用update方法; > > > >On Fri, Aug 26, 2022 at 10:25 AM 曲洋 wrote: > > > >> 各位好, > >> > >> > 我想请教一个问题,我的Flink应用中会在state里边存储一些指标,比如一年的总数,然后当任务各种原因断掉的时候,我希望可以通过入参的方式直接调节这个state,但是遇到了一个问题,如下: > >> 我重写了RichMapFunction,yearTotal > >> > 这个指标是通过命令行传进来的,然后我希望初始化到state里边,但是我发现,open方法第一次调用的时候state都是null,然后这个参数就进不来 > >> 所以我想问下这个场景怎么办,还有open方法的生命周期是怎么样的,我本以为是map第一次打开的时候就会调用,结果好像不是 > >> public static class AccumulateAmounts extends RichMapFunction >> BlueAccumulaterInitState> { > >> private transient ValueState > >> statAccumulator; > >> > >> > >> @Override > >> public BlueAccumulaterInitState map(v2bean currentAccumulator) > >> throws Exception { > >> > >> > >> BlueAccumulaterInitState stat = (statAccumulator.value() != > >> null) ? statAccumulator.value() : new BlueAccumulaterInitState(); > >> Long yearIncrement = year.equals(stat.getYear()) ? > >> stat.getYearMetric() + 1L : 1L; > >> stat.setYearMetric(yearIncrement); > >> > >> > >> statAccumulator.update(stat); > >> return stat; > >> } > >> > >> > >> @Override > >> public void open(Configuration config) { > >> ValueStateDescriptor descriptor = > >> new ValueStateDescriptor<>( > >> "total", > >> TypeInformation.of(new > >> TypeHint() { > >> })); > >> statAccumulator = getRuntimeContext().getState(descriptor); > >> ExecutionConfig.GlobalJobParameters globalParams = > >> getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); > >> Configuration globConf = (Configuration) globalParams; > >> long yearTotal = > >> globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue()); > >> statAccumulator.value().setYearMetric(yearTotal); > >> > >> > >> > >> } > >> } > > > > > > > >-- > >Best, > >Hangxiang. >
Re: Re:Re: Re: Flink 使用interval join数据丢失疑问
我猜测是 watermark 的问题, 看楼主的设置, watermark 是 -2s ,也就是说, order header 流,有数据晚了 2s ,就会被丢弃。 楼主之前看的也是 订单明细比订单主表晚几秒, 这只是同一个订单的数据生成时间差异。 如果是这样的话,使用一般的 inner join + ttl 就可以满足需求了。 BTW: watermark 我觉得很难使用好,实际使用场景非常有限。 Zhiwen Sun On Wed, Jun 15, 2022 at 11:43 AM Shengkai Fang wrote: > > 我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval join开分钟级别的数据还要准确 > > 不合理的 watermark 设置在 interval join 就会导致丢数据。设置 ttl 情况下,如果某个 key > 的数据频繁访问情况下,那么这个数据就不会过期。 > > > 我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志。 > > 我记得日志是会打印相关的日志。能提一些相关的日志吗? > > best, > Shengkai > > lxk 于2022年6月14日周二 20:04写道: > > > Hi, > > 我目前使用sql interval join,窗口的上下界增加到分钟级别,分别是-2 minute 和 +4 minute > > 目前来看数据量和使用inner join要差不多了。以下是代码 > > Table headerTable = > > streamTableEnvironment.fromDataStream(headerFilterStream, > > Schema.newBuilder() > > .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS > > TIMESTAMP_LTZ(3))") > > .watermark("rowtime", "rowtime + INTERVAL '2' SECOND") > > .build()); > > Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream, > > Schema.newBuilder() > > .columnByExpression("rowtime", "CAST(substring(last_updated_at,0,19) AS > > TIMESTAMP_LTZ(3))") > > .watermark("rowtime", "rowtime + INTERVAL '2' SECOND") > > .build()); > > > > > > streamTableEnvironment.createTemporaryView("header",headerTable); > > streamTableEnvironment.createTemporaryView("item",itemTable); > > Table result = streamTableEnvironment.sqlQuery("select > header.customer_id" > > + > > ",item.goods_id" + > > ",header.id" + > > ",header.order_status" + > > ",header.shop_id" + > > ",header.parent_order_id" + > > ",header.order_at" + > > ",header.pay_at" + > > ",header.channel_id" + > > ",header.root_order_id" + > > ",item.id" + > > ",item.row_num" + > > ",item.p_sp_sub_amt" + > > ",item.display_qty" + > > ",item.qty" + > > ",item.bom_type" + > > " from item JOIN header on header.id = item.order_id and item.rowtime > > BETWEEN header.rowtime - INTERVAL '2' MINUTE AND header.rowtime + > INTERVAL > > '4' MINUTE"); > > > > 对此,我又有新的疑问了,我在sql inner join里设置的状态ttl为20s,为什么20s的状态保留数据要比interval > > join开分钟级别的数据还要准确?针对这个问题,不知道大家有什么看法和思路? > > 我的一个猜测是我设置的表的ttl没有生效,inner join一直使用的是全量的数据,所以结果准确度要比interval > > join高,我的做法是排查了tm和jm的日志,没有看见写的表的配置的日志,我的配置如下: > > Configuration conf = new Configuration(); > > conf.setString("table.exec.mini-batch.enabled","true"); > > conf.setString("table.exec.mini-batch.allow-latency","15 s"); > > conf.setString("table.exec.mini-batch.size","100"); > > conf.setString("table.exec.state.ttl","20 s"); > > env.configure(conf); > > StreamTableEnvironment streamTableEnvironment = > > StreamTableEnvironment.create(env, > > EnvironmentSettings.fromConfiguration(conf)); > > > > > > 我想了解下,从tm和jm日志是否能正确反应我的配置生效?如果不行,那我要使用什么方法才能知道我的这个配置是否生效? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 在 2022-06-13 21:12:48,"Xuyang" 写道: > > >Hi, > > > 1、理论上来说inner join关联的数据量应该比interval > > join更大吧。关于左右两边流速度不一致的情况,理论上应该问题不大,因为需要等到两边的watermark都到齐之后才会触发状态里过期数据的清除。 > > > 2、inner > > > join没有水印的情况下,就是到了就发,完全根据这条数据进入这个算子的时间来算,也就是“处理时间”。默认数据是不会过期的,会存全量数据。如果定义了ttl,得看join两侧的表的pk和join > > > key,分别用不同的state数据格式来存数据(ListState、MapState等),原则上是多长时间没更新数据之后,就清空过期数据,是按照的“处理时间”来处理的。 > > > > > > > > >如果我有不对的地方,请指正我哈。 > > > > > > > > > > > > > > >-- > > > > > >Best! > > >Xuyang > > > > > > > > > > > > > > > > > >在 2022-06-12 14:39:39,"lxk7...@163.com" 写道: > > >>非常感谢回复 > > >>1.针对watermark,我会再去进行测试。同时还会测试使用处理时间,interval join会不会丢失数据 > > >>2.针对interval jon,我个人的理解是它能关联到的数据范围要比inner > > > join大,所以数据应该更准确,但是从结果上看却是数据丢失,当时非常震惊,有点颠覆我的认知了。同时我自己还有一个新的猜测,就是两个流的数据量不一样,可能也会造成数据丢失。目前左流是订单粒度数据,右流是订单-商品粒度数据,数据量要大很多。我个人理解,在处理右流的时候,应该会慢一点,所以导致两边的时间进展可能不一致。但是这又引发了一个新的疑问?inner > > join应该也会受这样的影响 > > >>3.还有一个问题可能是我没有阐述清楚,我在sql里使用inner > > join,没有注册水印,那么两个流的join应该是以处理时间来定义的?那么表的state的过期是否也是以处理
Re: flink-connector-jdbc是否支持多个values问题
支持同时写入多个 values ,这个是 jdbcurl 控制,设置 *rewriteBatchedStatements=true* 生成的 SQL 类似: INSERT INTO `order_summary`(`order_id`, `proctime`, `order_status`, > `order_name`, `total`) > VALUES > (3, '2022-06-14 22:31:24.699', 'OK', 'order-name-1', 20) , > (2, '2022-06-14 22:31:21.496', 'OK', 'order-name-1', 131) > ON DUPLICATE KEY UPDATE `order_id`=VALUES(`order_id`), > `proctime`=VALUES(`proctime`), `order_status`=VALUES(`order_status`), > `order_name`=VALUES(`order_name`), `total`=VALUES(`total`) Zhiwen Sun On Mon, Mar 7, 2022 at 5:07 PM 黑色 wrote: > 你看一下底层的源码实现全知道了,它insert into x() values() ON duplicate > Key实现Insert update,所以不会的 > > > > > --原始邮件-- > 发件人: "payne_z" 发送时间: 2022年3月7日(星期一) 下午3:49 > 收件人: "user-zh" 主题: flink-connector-jdbc是否支持多个values问题 > > > > 请问flink-connector-jdbc是否支持同时写入多个values的用法?
Re: flink sql回撤流sink优化问题
不用那么复杂,正常的 insert select group by 即可, 一分钟写一次 mysql 就行。 参考 JDBC sink [1] 中的 sink.buffer-flush.interval 和 sink.buffer-flush.max-rows 参数 [1] : https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/ Zhiwen Sun On Thu, Dec 23, 2021 at 8:15 AM casel.chen wrote: > flink sql中aggregate without > window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql > 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? > 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? > > > 例如有下面binlog cdc购买数据(订单购买金额会更新): > > orderid. categorydt amt > > 订单id 商品类型 购买时间(MMddHH) 购买金额 > > > > > 按商品类型统计每小时成交总额(每分钟写入下游mysql) 可以写成下面的flink sql实现吗?配合state ttl设置成1小时 > > > > INSERT INTO mysql_sink_table > > SELECT category, dt, LAST_VALUE(total) > > OVER ( > > PARTITION BY category > > ORDER BY PROCTIME() > > RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW > > ) AS var1 > > FROM ( > > SELECT category, dt, SUM(amt) AS total FROM t1 GROUP BY category, dt > > );
Re: Re: Flink任务每运行20天均会发生内部异常
看看 task manager 的 jvm 内存, jstack 情况 ? Zhiwen Sun On Tue, Oct 26, 2021 at 7:22 PM mayifan wrote: > 非常感谢大佬的答复: > > 目前从任务来看的话总共存在三个任务,其中两个异常任务分别使用了1到2个MapState,过期时间均为1天或3天。 > > 正常运行的任务使用了MapState及ListState各4个,过期时间为60min-120min。 > > 异常任务在产生异常后从checkpoint重启又会恢复正常。 > > > > -- 原始邮件 -- > > 发 件 人:"Caizhi Weng" > > 发送时间:2021-10-26 18:45:44 > > 收 件 人:"flink中文邮件组" > > 抄 送: > > 主 题:Re: Flink任务每运行20天均会发生内部异常 > > > > Hi! > > > > 听起来和 state 过期时间非常有关。你配置了哪些和 state 过期相关的参数?是否有 20 天过期的 state? > > > > mayifan 于2021年10月26日周二 下午4:43写道: > > > > > Hi! > > > > > > 麻烦请教大家一个问题。 > > > > > > > > > > 有三个Flink任务以yarn-per-job模式运行在Flink-1.11.2版本的集群上,均使用RocksDB作为状态后端,数据以增量的方式写入RocksDB,且均配置了状态过期时间。 > > > > > > > > > > 任务逻辑大致都是通过状态与历史数据进行自关联或双流join,每输入一条数据都会产出等量、1/2或多倍的数据到下游,当数据无法通过状态关联,任务则无法向下游产出数据。 > > > > > > > > > > 奇怪的是三个任务中有两个任务存在异常,异常现象是每次当任务启动运行至第20个工作日,都会非常准时的产生下游数据输出骤降的现象,输出与输入的数据量级差数十倍,并且此时任务中没有任何异常日志。 > > > > > > > > > > > > > > > 问题:目前怀疑是集群配置或RocksDB状态的问题,但是没有任何思路或排查线索,请问这种现象是怎样产生的?应该怎样排查? > > > > >
Re: 如何将canal json格式数据按操作类型过滤
先通过 json 或者 raw format 消费原始 canal kafka , 过滤掉 delete 的数据写入到一个新的 kafka ,然后你再基于新的 kafka 建一个 canal-json 的表来落地。 Zhiwen Sun On Wed, Jul 7, 2021 at 10:51 PM JasonLee <17610775...@163.com> wrote: > hi > > > 最后一个字段 type 就是操作的类型, 过滤掉 DELETE 就行了. > > > Best > JasonLee > 在2021年7月7日 22:43,casel.chen 写道: > 使用场景:我们使用canal将mysql binlog输出到kafka,然后想通过flink消费kafka数据过滤掉 delete > 操作的数据插入到文件系统,因为要做历史数据存档用。 > 查了下官网 > https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/canal/#available-metadata > {"data":[{"id":"111","name":"scooter","description":"Big 2-wheel > scooter","weight":"5.18"}],"database":"inventory","es":158937356,"id":9,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"5.15"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products","ts":1589373560798,"type":"UPDATE"} > > CREATETABLEKafkaTable(origin_databaseSTRINGMETADATAFROM'value.database'VIRTUAL,origin_tableSTRINGMETADATAFROM'value.table'VIRTUAL,origin_sql_typeMAPMETADATAFROM'value.sql-type'VIRTUAL,origin_pk_namesARRAYMETADATAFROM'value.pk-names'VIRTUAL,origin_tsTIMESTAMP(3)METADATAFROM'value.ingestion-timestamp'VIRTUAL,user_idBIGINT,item_idBIGINT,behaviorSTRING)WITH('connector'='kafka','topic'='user_behavior','properties.bootstrap.servers'='localhost:9092',' > properties.group.id > '='testGroup','scan.startup.mode'='earliest-offset','value.format'='canal-json'); > 只能获取到原始 database, table, sql-type, pk-names, ingestion-timestamp > 字段,而拿不到代表操作类型的 type 字段。请问有什么别的办法么? > > > > >
Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath
parquet 相关依赖增加了吗? Zhiwen Sun On Sun, Jun 27, 2021 at 3:57 PM Wei JI10 季伟 wrote: > Hi: >在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息 > Caused by: org.apache.flink.table.api.ValidationException: Could not find > any format factory for identifier 'parquet' in the classpath. >at > org.apache.flink.table.filesystem.FileSystemTableSource.(FileSystemTableSource.java:97) >at > org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:72) >at > org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:119) >... 41 more > > Sql语句如下: > CREATE TABLE user_info ( > `user_id` bigint, > `user_name` string > ) PARTITIONED BY (user_id) WITH ( > 'connector' = 'filesystem', > 'path' = '', > 'format' = 'parquet' > ); > > CREATE TABLE sink_table ( > `user_id` bigint, > `user_name` string > ) PARTITIONED BY (datetime) WITH ( > 'connector'='filesystem', > 'path'='', > 'format'='parquet', > 'sink.partition-commit.delay'='1h', > 'sink.partition-commit.policy.kind'='success-file' > ); > > insert OVERWRITE sink_table select *, '2021062600' as datetime from > user_info; >
Re: Flink 提交到yarn失败
HADOOP_CLASSPATH 设置了吗? Zhiwen Sun On Fri, Jun 18, 2021 at 9:47 AM yangpengyi <963087...@qq.com.invalid> wrote: > 环境: FLINK 1.12 & CDH6.1.1 > 问题: > > 利用yarn-per-job提交时,在初始化hdfs客户端时出错。看起来应该是hadoop版本的兼容问题,不过从堆栈看应该使用到了正确的客户端jar包。 > Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: > Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in > the classpath, or some classes are missing from the classpath. > at > > org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:184) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at > > org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:487) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:389) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at org.apache.flink.core.fs.Path.getFileSystem(Path.java:292) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at > > org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:100) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at > > org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:89) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at > > org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:117) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at > > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:309) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at > > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:272) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at > > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:212) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at > > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:173) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at java.security.AccessController.doPrivileged(Native Method) > ~[?:1.8.0_181] > at javax.security.auth.Subject.doAs(Subject.java:422) > ~[?:1.8.0_181] > at > > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746) > ~[cloud-flinkAppCrashAnalysis-1.0.0-encodetest-RELEASE.jar:?] > at > > org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > at > > org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:172) > ~[flink-dist_2.11-1.12.0.jar:1.12.0] > ... 2 more > Caused by: java.lang.VerifyError: Bad return type > Exception Details: > Location: > > > org/apache/hadoop/hdfs/DFSClient.getQuotaUsage(Ljava/lang/String;)Lorg/apache/hadoop/fs/QuotaUsage; > @157: areturn > Reason: > Type 'org/apache/hadoop/fs/ContentSummary' (current frame, stack[0]) is > not assignable to 'org/apache/hadoop/fs/QuotaUsage' (from method signature) > Current Frame: > bci: @157 > flags: { } > locals: { 'org/apache/hadoop/hdfs/DFSClient', 'java/lang/String', > 'org/apache/hadoop/ipc/RemoteException', 'java/io/IOException' } > stack: { 'org/apache/hadoop/fs/ContentSummary' } > Bytecode: > 0x000: 2ab6 00b5 2a13 01f4 2bb6 00b7 4d01 4e2a > 0x010: b400 422b b901 f502 003a 042c c600 1d2d > 0x020: c600 152c b600 b9a7 0012 3a05 2d19 05b6 > 0x030: 00bb a700 072c b600 b919 04b0 3a04 1904 > 0x040: 4e19 04bf 3a06 2cc6 001d 2dc6 0015 2cb6 > 0x050: 00b9 a700 123a 072d 1907 b600 bba7 0007 > 0x060: 2cb6 00b9 1906 bf4d 2c07 bd00 d459 0312 > 0x070: d653 5904 12e0 5359 0512 e153 5906 1301 > 0x080: f653 b600 d74e 2dc1 01f6 9900 14b2 0023 > 0x090: 1301 f7b9 002b 0200 2a2b b601 f8b0 2dbf > 0x0a0: > Exception Handler Table: > bci [35, 39] => handler: 42 > bci [15, 27] => handler: 60 > bci [15, 27] => handler: 68 > bci [78, 82] => handler: 85 > bci [60, 70] => handler: 68 > bci [4, 57] => handler: 103 > bci [60, 103] => handler: 103 > Stackmap Table: > > > full_frame(@42,{Object[#751],Object[#774],Object[#829],Object[#799],Object[#1221]},{Object[#799]}) > same_frame(@53) > same_frame(@57) > > > full_frame(@60,{Object[#751],Object[#774],Object[#829],Object[#799]},{Object[#799]}) > same_locals_1_stack_item_frame(@68,Object[#799]) > > > full_frame(@85,{Object[#751],Object[#774],Obje
Re: 邮件退订
退订是发邮件到 user-zh-unsubscr...@flink.apache.org Zhiwen Sun On Tue, Jun 8, 2021 at 10:21 PM happiless wrote: > 您好,麻烦邮件退订一下 > > > 发自我的iPhone
Re: flink1.11.2 yarn-session 部分类路径未加载
不需要 mapreduce 相关库吧。 我看我的 job 里加载到 classpath 的也没有 mapreduce。 Zhiwen Sun On Wed, Jun 2, 2021 at 11:56 AM datayangl wrote: > flink1.11.2 启动yarn-session之后发现,有部分类路径始终没有加载到class_path中去 > 环境变量配置如下: > < > http://apache-flink.147419.n8.nabble.com/file/t919/66604010-2A08-4A68-8478-70A27D61224B.png> > > > 其中tm的日志如下: > tm.log <http://apache-flink.147419.n8.nabble.com/file/t919/tm.log> > > 其中hadoop-mapreduce-client相关的类路径一直没有加载到class_path中,求指教 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: flink 1.12.2 编译报错
看了下依赖树,似乎是因为 org.powermock 引入了 hamcrest-core 导致 junit 无法引入,但 junit 在前面,按道理应该它应该将 hamcrest-core 引入到 compile scope 。 [INFO] [INFO] Building Flink : Test utils : Junit 1.12.2 [INFO] [INFO] [INFO] --- maven-dependency-plugin:3.1.1:tree (default-cli) @ flink-test-utils-junit --- [INFO] org.apache.flink:flink-test-utils-junit:jar:1.12.2 [INFO] +- junit:junit:jar:4.12:compile [INFO] +- org.apache.logging.log4j:log4j-slf4j-impl:jar:2.12.1:compile [INFO] +- org.apache.logging.log4j:log4j-api:jar:2.12.1:compile [INFO] +- org.apache.logging.log4j:log4j-core:jar:2.12.1:compile [INFO] +- org.apache.flink:force-shading:jar:1.12.2:compile [INFO] +- org.slf4j:slf4j-api:jar:1.7.15:compile [INFO] +- com.google.code.findbugs:jsr305:jar:1.3.9:compile [INFO] +- org.mockito:mockito-core:jar:2.21.0:test [INFO] | +- net.bytebuddy:byte-buddy:jar:1.8.15:test [INFO] | +- net.bytebuddy:byte-buddy-agent:jar:1.8.15:test [INFO] | \- org.objenesis:objenesis:jar:2.1:test [INFO] +- org.powermock:powermock-module-junit4:jar:2.0.4:test [INFO] | +- org.powermock:powermock-module-junit4-common:jar:2.0.4:test [INFO] | | +- org.powermock:powermock-reflect:jar:2.0.4:test [INFO] | | \- org.powermock:powermock-core:jar:2.0.4:test [INFO] | | \- org.javassist:javassist:jar:3.24.0-GA:test [INFO] | \- org.hamcrest:hamcrest-core:jar:1.3:test [INFO] +- org.powermock:powermock-api-mockito2:jar:2.0.4:test [INFO] | \- org.powermock:powermock-api-support:jar:2.0.4:test [INFO] +- org.hamcrest:hamcrest-all:jar:1.3:test [INFO] \- org.apache.logging.log4j:log4j-1.2-api:jar:2.12.1:test [INFO] [INFO] BUILD SUCCESS [INFO] Zhiwen Sun On Fri, May 28, 2021 at 11:12 AM Zhiwen Sun wrote: > 谢谢,看了下,junit 的确依赖 org.hamcrest ,而且相关版本都没问题。 > > 那这个报错的原因是什么呢? 什么地方导致 hamcrest 被 exclude 了?然后手动增加了dependency 就好了? 代码拉下来没修改过。 > > Zhiwen Sun > > > > On Fri, May 28, 2021 at 10:58 AM Shuo Cheng wrote: > >> Hi, org.hamcrest 是 junit 的依赖 >> >> On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun wrote: >> >> > 才编译到 Test utils : Junit 模块,就报错了 >> > >> > maven 版本: 3.2.5 >> > jdk 版本:1.8.0_251 >> > flink 版本: flink 1.12.2 >> > 执行的命令:mvn clean install -DskipTests -Dfast >> > >> > 错误信息: >> > >> > [ERROR] COMPILATION ERROR : >> > [INFO] - >> > [ERROR] >> > >> > >> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27] >> > package org.hamcrest does not exist >> > [ERROR] >> > >> > >> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1] >> > static import only from classes and interfaces >> > [ERROR] >> > >> > >> /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27] >> > package org.hamcrest does not exist >> > >> > [INFO] Reactor Summary: >> > [INFO] >> > [INFO] Flink : Tools : Force Shading .. SUCCESS [ >> > 1.042 s] >> > [INFO] Flink : SUCCESS [ >> > 1.404 s] >> > [INFO] Flink : Annotations SUCCESS [ >> > 0.735 s] >> > [INFO] Flink : Test utils : ... SUCCESS [ >> > 0.042 s] >> > [INFO] Flink : Test utils : Junit . FAILURE [ >> > 0.283 s] >> > >> > >> > 看起来是缺少 org.hamcrest 相关依赖 >> > 我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml >> > 的确没加 org.hamcrest 相关依赖, 不知道这个是怎么工作的。 >> > >> > 请问大家下,原因是什么呢? >> > >> > >> > Zhiwen Sun >> > >> >
Re: flink 1.12.2 编译报错
谢谢,看了下,junit 的确依赖 org.hamcrest ,而且相关版本都没问题。 那这个报错的原因是什么呢? 什么地方导致 hamcrest 被 exclude 了?然后手动增加了dependency 就好了? 代码拉下来没修改过。 Zhiwen Sun On Fri, May 28, 2021 at 10:58 AM Shuo Cheng wrote: > Hi, org.hamcrest 是 junit 的依赖 > > On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun wrote: > > > 才编译到 Test utils : Junit 模块,就报错了 > > > > maven 版本: 3.2.5 > > jdk 版本:1.8.0_251 > > flink 版本: flink 1.12.2 > > 执行的命令:mvn clean install -DskipTests -Dfast > > > > 错误信息: > > > > [ERROR] COMPILATION ERROR : > > [INFO] - > > [ERROR] > > > > > /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27] > > package org.hamcrest does not exist > > [ERROR] > > > > > /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1] > > static import only from classes and interfaces > > [ERROR] > > > > > /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27] > > package org.hamcrest does not exist > > > > [INFO] Reactor Summary: > > [INFO] > > [INFO] Flink : Tools : Force Shading .. SUCCESS [ > > 1.042 s] > > [INFO] Flink : SUCCESS [ > > 1.404 s] > > [INFO] Flink : Annotations SUCCESS [ > > 0.735 s] > > [INFO] Flink : Test utils : ... SUCCESS [ > > 0.042 s] > > [INFO] Flink : Test utils : Junit ......... FAILURE [ > > 0.283 s] > > > > > > 看起来是缺少 org.hamcrest 相关依赖 > > 我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml > > 的确没加 org.hamcrest 相关依赖, 不知道这个是怎么工作的。 > > > > 请问大家下,原因是什么呢? > > > > > > Zhiwen Sun > > >
Re: flink 1.12.2 编译报错
在我手动加上依赖后,这个模块,能编译通过了,但 runtime 又失败了。 INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @ flink-runtime_2.11 --- [INFO] /data/flink-release-1.12.2/flink-runtime/src/main/java:-1: info: compiling [INFO] /data/flink-release-1.12.2/flink-runtime/src/main/scala:-1: info: compiling [INFO] Compiling 1958 source files to /data/flink-release-1.12.2/flink-runtime/target/classes at 1622169188312 [ERROR] java.lang.NoClassDefFoundError: scala/reflect/internal/Trees [INFO] at java.lang.ClassLoader.defineClass1(Native Method) [INFO] at java.lang.ClassLoader.defineClass(ClassLoader.java:756) [INFO] at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) [INFO] at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) [INFO] at java.net.URLClassLoader.access$100(URLClassLoader.java:74) [INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:369) [INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:363) [INFO] at java.security.AccessController.doPrivileged(Native Method) [INFO] at java.net.URLClassLoader.findClass(URLClassLoader.java:362) [INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:418) [INFO] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) [INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:351) [INFO] at java.lang.ClassLoader.defineClass1(Native Method) [INFO] at java.lang.ClassLoader.defineClass(ClassLoader.java:756) [INFO] at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) [INFO] at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) [INFO] at java.net.URLClassLoader.access$100(URLClassLoader.java:74) [INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:369) [INFO] at java.net.URLClassLoader$1.run(URLClassLoader.java:363) [INFO] at java.security.AccessController.doPrivileged(Native Method) [INFO] at java.net.URLClassLoader.findClass(URLClassLoader.java:362) [INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:418) [INFO] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) [INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:351) [INFO] at java.lang.Class.getDeclaredMethods0(Native Method) [INFO] at java.lang.Class.privateGetDeclaredMethods(Class.java:2701) [INFO] at java.lang.Class.privateGetMethodRecursive(Class.java:3048) [INFO] at java.lang.Class.getMethod0(Class.java:3018) [INFO] at java.lang.Class.getMethod(Class.java:1784) [INFO] at scala_maven_executions.MainHelper.runMain(MainHelper.java:155) [INFO] at scala_maven_executions.MainWithArgsInFile.main(MainWithArgsInFile.java:26) [INFO] Caused by: java.lang.ClassNotFoundException: scala.reflect.internal.Trees [INFO] at java.net.URLClassLoader.findClass(URLClassLoader.java:382) [INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:418) [INFO] at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) [INFO] at java.lang.ClassLoader.loadClass(ClassLoader.java:351) [INFO] ... 31 more [INFO] [INFO] Reactor Summary: [INFO] [INFO] Flink : Tools : Force Shading .. SUCCESS [ 1.093 s] [INFO] Flink : SUCCESS [ 1.168 s] [INFO] Flink : Annotations SUCCESS [ 1.218 s] [INFO] Flink : Test utils : ... SUCCESS [ 0.047 s] [INFO] Flink : Test utils : Junit . SUCCESS [ 0.795 s] [INFO] Flink : Metrics : .. SUCCESS [ 0.037 s] [INFO] Flink : Metrics : Core . SUCCESS [ 0.501 s] [INFO] Flink : Core ... SUCCESS [ 17.510 s] [INFO] Flink : Java ... SUCCESS [ 2.467 s] [INFO] Flink : Queryable state : .. SUCCESS [ 0.029 s] [INFO] Flink : Queryable state : Client Java .. SUCCESS [ 3.106 s] [INFO] Flink : FileSystems : .. SUCCESS [ 0.031 s] [INFO] Flink : FileSystems : Hadoop FS SUCCESS [ 3.457 s] [INFO] Flink : Runtime FAILURE [ 18.086 s] 然后我修改 scala 的版本为 2.12 相关命令: mvn clean install -DskipTests -Dfast -Dscala-2.12 目前能够正常编译了,我看 release 版本支持 scala-2.11 的。是我的环境有问题吗? Zhiwen Sun On Fri, May 28, 2021 at 10:28 AM Zhiwen Sun wrote: > 才编译到 Test utils : Junit 模块,就报错了 > > maven 版本: 3.2.5 > jdk 版本:1.8.0_251 > flink 版本: flink 1.12.2 > 执行的命令:mvn clean install -DskipTests -Dfast > > 错误信息: > > [ERROR] COMPILATION ERROR : > [INFO] - > [ERROR] > /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27] > package org.hamcrest does not exist > [ERROR] > /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testu
flink 1.12.2 编译报错
编译到 Test utils : Junit 模块,就报错了 maven 版本: 3.2.5 jdk 版本:1.8.0_251 flink 版本: flink 1.12.2 执行的命令:mvn clean install -DskipTests -Dfast 错误信息: [ERROR] COMPILATION ERROR : [INFO] - [ERROR] /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27] package org.hamcrest does not exist [ERROR] /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1] static import only from classes and interfaces [ERROR] /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27] package org.hamcrest does not exist [INFO] Reactor Summary: [INFO] [INFO] Flink : Tools : Force Shading .. SUCCESS [ 1.042 s] [INFO] Flink : SUCCESS [ 1.404 s] [INFO] Flink : Annotations SUCCESS [ 0.735 s] [INFO] Flink : Test utils : ... SUCCESS [ 0.042 s] [INFO] Flink : Test utils : Junit . FAILURE [ 0.283 s] 而且我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml 的确没加 org.hamcrest 相关依赖啊。 请问大家下,原因是什么呢? Zhiwen Sun
flink 1.12.2 编译报错
才编译到 Test utils : Junit 模块,就报错了 maven 版本: 3.2.5 jdk 版本:1.8.0_251 flink 版本: flink 1.12.2 执行的命令:mvn clean install -DskipTests -Dfast 错误信息: [ERROR] COMPILATION ERROR : [INFO] - [ERROR] /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,27] package org.hamcrest does not exist [ERROR] /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[38,1] static import only from classes and interfaces [ERROR] /data/flink-release-1.12.2/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java:[39,27] package org.hamcrest does not exist [INFO] Reactor Summary: [INFO] [INFO] Flink : Tools : Force Shading .. SUCCESS [ 1.042 s] [INFO] Flink : SUCCESS [ 1.404 s] [INFO] Flink : Annotations SUCCESS [ 0.735 s] [INFO] Flink : Test utils : ... SUCCESS [ 0.042 s] [INFO] Flink : Test utils : Junit . FAILURE [ 0.283 s] 看起来是缺少 org.hamcrest 相关依赖 我看 flink-test-utils-parent/pom.xml 和 flink-test-utils-junit/pom.xml 的确没加 org.hamcrest 相关依赖, 不知道这个是怎么工作的。 请问大家下,原因是什么呢? Zhiwen Sun