Re: 如何使用flink sql优雅的处理大量嵌套if-else逻辑

2022-11-28 文章 macia kk
我会选择 UDF + 配置文件,把配置文件放 HDFS上,UDF读这个配置文件。每次更新HDFS的配置文件,重启下任务 casel.chen 于2022年11月24日周四 12:01写道: > 我有一个flink > sql作业需要根据不同字段值满足不同条件来设置另一个字段值,还有一些嵌套if-else逻辑,这块逻辑不是固定的,业务方会过一段时间调整一次。 > 想请问如何使用flink sql优雅的处理嵌套if-else逻辑呢?我有想到使用drools规则引擎,通过udf来调用,不知道还有没有更好的办法? > >

Re: Tumble Window 会带来反压问题吗?

2022-10-20 文章 macia kk
0分钟,但是key比较分散,感觉这种情况可以增加资源加大一下并发试试,减少每个task发出的数据量 > > On Thu, Oct 20, 2022 at 9:49 AM yidan zhao wrote: > > > 这个描述前后矛盾,写出速度跟不上导致反压,那控制写出速度不是问题更大。不过你不需要考虑这些,因为你控制不了写出速度,只能控制写出时机。 > > > > 写出时机是由window的结束时间和watermark决定的,所以如果真要解决,需要控制分窗不要固定整点10分钟。 > > > > macia kk

Tumble Window 会带来反压问题吗?

2022-10-19 文章 macia kk
聚合10分钟再输出,到10分钟的时候由于积攒了很多数据,写出速度跟不上,导致反压,然后上游消费就处理变慢了。 如果控制一下写出的速度,让他慢慢写会不会好一些

Flink 的 大Hive 维度表

2022-09-21 文章 macia kk
Hi Flink 的 Hive 维度表是放在内从中,可以把这个放到State中吗,这样用 RocksDB 就能减小一下内存的使用量

Dynamic Table Options 被优化器去掉了

2021-04-25 文章 macia kk
Hi 我有在使用 temporal Joini 的时候有设置 如果读取分区的相关的 dynamic option,但是最后是没有生效的,我看全部使用的默认参数,打印出来了执行计划,逻辑执行计划是有的,优化之后没有了 如下,我设置的是加载最新分区,24小时加载一次,我看最后运行的日志是加载的全部分区,1小时有一次加载,这都是默认的参数,所以怀疑是 dyanmic option 没有生效。 == Abstract Syntax Tree == +- LogicalSnapshot(period=[$cor0.proctime])

Flink Temporal Join Two union Hive Table Error

2021-03-15 文章 macia kk
Hi, 麻烦帮忙看下这个问题: 创建 View promotionTable: SELECT *, 'CN' as country, id as pid FROM promotion_cn_rule_tab UNION SELECT *, 'JP' as country, id as pid FROM promotion_jp_rule_tab FLink SQL Query: SELECT t1.country, t1.promotionId, t1.orderId, CASE WHEN t2.pid IS NULL THEN 'Rebate' ELSE 'Rebate'

Flink Temporal Join Two union Hive Table Error

2021-03-15 文章 macia kk
Hi, 麻烦帮忙看下这个问题: 创建 View promotionTable: SELECT *, 'CN' as country, id as pid FROM promotion_cn_rule_tab UNION SELECT *, 'JP' as country, id as pid FROM promotion_jp_rule_tab FLink SQL Query: SELECT t1.country, t1.promotionId, t1.orderId, CASE WHEN t2.pid IS NULL THEN 'Rebate' ELSE 'Rebate' END

Re: Flink SQL temporal table join with Hive 报错

2021-02-10 文章 macia kk
数据的时候,其实就会有问题。 我看 FLIP-132 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL+and+Temporal+Table+Join> 有提到 Event Time semantics, 这是以后回支持的吗? Leonard Xu 于2021年2月10日周三 上午11:36写道: > Hi, macia > > > 在 2021年2月9日,10:40,macia kk 写道: &g

Re: Flink SQL temporal table join with Hive 报错

2021-02-08 文章 macia kk
FORMAT (lt.event_time, '-MM-dd') = cast(ex.date_id as String) Rui Li 于2021年2月9日周二 上午10:20写道: > Hi, > > 那join的语句是怎么写的呢? > > On Mon, Feb 8, 2021 at 2:45 PM macia kk wrote: > > > 图就是哪个报错 > > > > 建表语句如下,表示公共表,我也没有改的权限. > > > > CREATE EXTERNAL TABLE `exc

Re: Flink SQL temporal table join with Hive 报错

2021-02-07 文章 macia kk
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' WITH SERDEPROPERTIES ( 'serialization.format' = '1' ) Rui Li 于2021年2月8日周一 下午2:17写道: > 你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么? > > On Mon, Feb 8, 2021 at 10:33 AM macia kk wrote: > > > Currently the join key in Tempo

Flink SQL temporal table join with Hive 报错

2021-02-07 文章 macia kk
Currently the join key in Temporal Table Join can not be empty. 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错 [image: image.png]

Flink SQL Hive 使用 partition.include 结果跟文档不一致

2021-02-04 文章 macia kk
Flink 1.12.1 streaming-source.partition.includeOption to set the partitions to read, the supported option are `all` and `latest`, the `all` means read all partitions; the `latest` means read latest partition in order of 'streaming-source.partition.order', the `latest` only works` when the

Flink 读 Hive 表,如何设置 TTL

2021-01-27 文章 macia kk
文档上是在 create table 的时候, 设置 lookup.join.cache.ttl 但是我现在想用 streaming kafka 的数据,join 一张已经存在的 Hive 表,怎么设置TTL? CREATE TABLE dimension_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time

Scala REPL YARN 运行模式报 NoSuchMethodError setPrintSpaceAfterFullCompletion

2021-01-26 文章 macia kk
bin/start-scala-shell.sh yarn scala> Exception in thread "main" java.lang.NoSuchMethodError: jline.console.completer.CandidateListCompletionHandler.setPrintSpaceAfterFullCompletion(Z)V at scala.tools.nsc.interpreter.jline.JLineConsoleReader.initCompletion(JLineReader.scala:139) at

Re: 关于 stream-stream Interval Join 的问题

2020-12-10 文章 macia kk
的Flink呢? > > 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。 > 所以你的binlog是怎么读进来的呢?自定义的format? > > macia kk 于2020年12月10日周四 上午1:06写道: > > > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time > - > > INTERVAL 'x' HOUR > > > > 发现一个很奇怪的问题 ,按理说

Re: 关于 stream-stream Interval Join 的问题

2020-12-09 文章 macia kk
type":1,"transaction_id":11,"reference_id":"11","transaction_sn":"1","merchant_id":1,"status":1,"event_time":" *2020-12-10T01:02:24Z*"} UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期

Re: 关于 stream-stream Interval Join 的问题

2020-12-09 文章 macia kk
边数据。 > > > > > > > (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left > > join。 > > > > (2)此外,还有一个点,这个我也不确认。如果是datastream > > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。 >

Re: 关于 stream-stream Interval Join 的问题

2020-12-08 文章 macia kk
的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么? - 你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 watermark 来驱动。 我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 join上,就输出 join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把. Benchao Li 于2020年12月

Re: 关于 stream-stream Interval Join 的问题

2020-12-07 文章 macia kk
抱歉,是 >-30 and <+30 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有 赵一旦 于2020年12月7日 周一23:28写道: > 准确点,2个条件之间没and?2个都是>? > > macia kk 于2020年12月7日周一 下午10:30写道: > > > 不好意思,我上边贴错了 > > > > SELECT * > > FROM A > > LEFT OUT JOIN B > >

Re: 关于 stream-stream Interval Join 的问题

2020-12-07 文章 macia kk
st/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183 > > > Best, > Hailong > 在 2020-12-07 13:10:02,"macia kk" 写道: > >Hi, 各位大佬 > > > > 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 > >Flink任务的在处理的时

关于 stream-stream Interval Join 的问题

2020-12-06 文章 macia kk
Hi, 各位大佬 我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的 Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order item 信息,所以 我用: SELECT * FROM A LEFT OUT JOIN B ON order_id Where A.event_time > B.event_time + 30 s A.event_time > B.event_time - 30 s 我测了下,A 和 BI

Re: 关于去重(Deduplication)

2020-11-15 文章 macia kk
好的,明白了,谢谢 Jark Wu 于2020年11月16日周一 上午10:27写道: > 关于2, 你理解反了。性能上 deduplicate with first row 比 first_value 更优。 因为deduplicate > with first row 在 state 里面只存了 key,value 只用了一个空字节来表示。 > > On Sun, 15 Nov 2020 at 21:47, macia kk wrote: > > > 感谢 Jark 回复, 一直有看你的博客,收益匪浅。 > > > &

Re: 关于去重(Deduplication)

2020-11-15 文章 macia kk
> 2. 性能上 deduplicate 更优,比如 first row, 只保存了 key 的state信息。 > > Best, > Jark > > On Sun, 15 Nov 2020 at 19:23, macia kk wrote: > > > 各位大佬: > > > > 我看文档上建议使用的去重方式是用窗口函数 > > < > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/tabl

关于去重(Deduplication)

2020-11-15 文章 macia kk
各位大佬: 我看文档上建议使用的去重方式是用窗口函数 SELECT [column_list]FROM ( SELECT [column_list], ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
 不好意思,没注意到 感谢 Benchao Li 于2020年6月7日周日 下午6:47写道: > FROM_UNIXTIME接收的是秒的时间戳,你的maxwell_ts看起来是微秒吧,应该/100吧 > > macia kk 于2020年6月7日周日 下午6:15写道: > >> 打印出来是这样的 >> >> "maxwell_ts":1591518126072000,"ts_watermark":"52403-03-16 00:21:12" &g

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
打印出来是这样的 "maxwell_ts":1591518126072000,"ts_watermark":"52403-03-16 00:21:12" macia kk 于2020年6月7日周日 下午5:53写道: > 再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛 > > 如上,这里定义的 ts_watermark, 想用他们做 watermakr,但是他的返回结果是 null, 我的 maxwell_ts 是 > millseco

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
t;.stripMargin Leonard Xu 于2020年6月7日周日 下午5:51写道: > Hi, > 1.10确实有这个bug, > 这个问题在1.10.1中已经修复了,你可以使用升级到1.10.1,Benchao之前修了一个计算列相关的,我刚刚看不使用计算列也会有这问题,应该是在[1]中 > jark wu 修复的。 > > Best, > Leonard Xu > [1] https://issues.apache.org/jira/browse/FLINK-16526 < > https://issues.ap

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
好的,感谢 Benchao Li 于2020年6月7日周日 下午4:04写道: > 1.10还是有bug的,1.10.1已经修复[1]了。可以尝试下1.10.1 > > [1] https://issues.apache.org/jira/browse/FLINK-16068 > > macia kk 于2020年6月7日周日 下午3:51写道: > > > 1.10 > > > > 1048262223 <1048262...@qq.com> 于2020年6月7日周日 下午3:48写道: >

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
1.10 1048262223 <1048262...@qq.com> 于2020年6月7日周日 下午3:48写道: > Hi > > > 这个好像有同学在群里提到过,也会出现ddl关键字冲突问题,他是通过将版本升到了1.10解决的,能提供下你是用的版本吗? > > > Best, > Yichao Yang > > > > 发自我的iPhone > > > ------ 原始邮件 -- > 发件人: macia kk

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
e", main_table) bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM main_table") macia kk 于2020年6月7日周日 下午3:45写道: > ```scala > val bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val bsTable

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
in_table", main_table) bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM main_table") ``` macia kk 于2020年6月7日周日 下午3:41写道: > 下边的代码里,没有用 `table` 字段,我现在只要把 table, database 这两行去掉,可以能跑,只要加上,就会挂 > > Benchao Li 于2020年6月7日周日 下午3:38写道: > >> Hi, >> 看

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
下边的代码里,没有用 `table` 字段,我现在只要把 table, database 这两行去掉,可以能跑,只要加上,就会挂 Benchao Li 于2020年6月7日周日 下午3:38写道: > Hi, > 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。 > > macia kk 于2020年6月7日周日 下午3:33写道: > > > 各位大佬, > > > > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了

Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
各位大佬, 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢 val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val sourceTable = """CREATE TABLE my_kafak_source (

Re: Table API 有 类似于 SQL API 的 TO_TIMESTAMP 函数吗

2020-05-27 文章 macia kk
Leonard Xu > > > 在 2020年5月28日,10:23,macia kk 写道: > > > > 好的,谢谢, > > > > 放弃治疗,我先尝试DDL,先把 job 跑通,我原以为 Table API 会比 SQL API 自由度大一些,毕竟可以代码层面定制 > > > > Leonard Xu 于2020年5月28日周四 上午10:17写道: > > > >> Hi, > >> 我看了下Descriptor的代码,如果数据源

Re: Table API 有 类似于 SQL API 的 TO_TIMESTAMP 函数吗

2020-05-27 文章 macia kk
/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156 > > > > > > > 在 2020年5月28日,00:45,macia kk 写道: > > > > Hi 各位大佬 > > > > .field("event_time", TIMESTAMP()).rowtime( > > new Rowtime() > >

Table API 有 类似于 SQL API 的 TO_TIMESTAMP 函数吗

2020-05-27 文章 macia kk
Hi 各位大佬 .field("event_time", TIMESTAMP()).rowtime( new Rowtime() .timestampsFromField("maxwell_ts") .watermarksPeriodicBounded(6) ) 我这个 maxwell_ts 是 milliseconds ,直接这么用会报错: Type TIMESTAMP(6) of table field ‘event_time’ does not match with the

Re: Flink SQL FIRST_VALUE 是append 模式吗?

2020-05-27 文章 macia kk
4. topn,排名变化需要更新结果 > 5. window + emit,提前emit的结果需要retract来更新 > > macia kk 于2020年5月27日周三 下午6:19写道: > > > 感谢 Benchao 和 Leonard 的回复 > > > > 我理解错误的地方在于,我以为 Left join 是 append 模式的,对于左边的数据来说,join 上一条就会 emit > > 出去,但是什么情况下会产生 react 消息呢? > > > > Leo

Re: Flink SQL FIRST_VALUE 是append 模式吗?

2020-05-27 文章 macia kk
`, `event_time`, `status`, > > `reference_id` > > |FROM main_table > > |LEFT JOIN merchant_table > > | ON main_table.reference_id = > > merchant_table.transaction_sn > >

Flink SQL FIRST_VALUE 是append 模式吗?

2020-05-26 文章 macia kk
Hi,各位大佬,谁有空帮我看下这个问题 Source: Kafka SinkL Kafka 主要逻辑是 *main_table* left join *merchatn_table* 以后,使用 FIRST_VALUE 函数取第一条 transaction_id,我这个模式应该是 append 模式,但是结果好像不是 Error org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: AppendStreamTableSink requires

Re: Flink SQL 嵌套 nested Json 解析

2020-05-25 文章 macia kk
"user_id":333, "amount":555, "reference_id":"666", "status":3, "transaction_type":3, "merchant_id":2, "update_time":1590416550, "create_time":159041

Re: Flink SQL 嵌套 nested Json 解析

2020-05-25 文章 macia kk
} } ``` 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame Leonard Xu 于2020年5月26日周二 上午8:58写道: > Hi, kk > > 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看 > > > 祝好, > Leonard Xu > > > > 在 2020年5月26日,01:26,macia kk 写道: > > > > 有哪位大佬帮我看下,谢谢 > > &g

Flink SQL 嵌套 nested Json 解析

2020-05-25 文章 macia kk
有哪位大佬帮我看下,谢谢 尝试了很久,还是无法解析嵌套结构的Json Error Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 9 to line 4, column 31: Column 'data.transaction_type' not found in any table at

Re: Could not find a suitable table factory for 'TableSourceFactory'

2020-05-24 文章 macia kk
对了还有个问题,我之前看文档使用 `flink-connector-kafka_2.11`一直都无法运行,后来看别人也遇到这道这个问题,改成 `flink-sql-connector-kafka-0.11` 才可以运行,这两个有什么区别,如果不一样的话,对于 table API 最好标明一下用后者 macia kk 于2020年5月25日周一 上午10:05写道: > built.sbt > > val flinkVersion = "1.10.0" > libraryDependencies ++= Seq( > &qu

Re: Could not find a suitable table factory for 'TableSourceFactory'

2020-05-24 文章 macia kk
lt;<<<<<<<<<<<<<<<< Kafka 0.11 "org.apache.flink" % "flink-json" % flinkVersion ) Leonard Xu 于2020年5月25日周一 上午9:33写道: > Hi, > 你使用的kafka connector的版本是0.11的吗?报错看起来有点像版本不对 > > Best, > Leonard Xu > > > 在 2

Re: Could not find a suitable table factory for 'TableSourceFactory'

2020-05-24 文章 macia kk
CliFrontend.java:968) 麻烦帮我看下,谢谢 Lijie Wang 于2020年5月25日周一 上午12:34写道: > Hi,我不能加载你邮件中的图片。从下面的报错看起来是因为找不到 match 的connector。可以检查一下 DDL 中的 with 属性是否正确。 > > > > 在 2020-05-25 00:11:16,"macia kk" 写道: > > 有人帮我看下这个问题吗,谢谢 > > > > > > > org.apache.flink.

Could not find a suitable table factory for 'TableSourceFactory'

2020-05-24 文章 macia kk
有人帮我看下这个问题吗,谢谢 [image: image.png] [image: image.png] org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: findAndCreateTableSource failed. Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for