Re: 如何使用flink sql优雅的处理大量嵌套if-else逻辑

2022-11-28 文章 macia kk
我会选择 UDF  + 配置文件,把配置文件放 HDFS上,UDF读这个配置文件。每次更新HDFS的配置文件,重启下任务

casel.chen  于2022年11月24日周四 12:01写道:

> 我有一个flink
> sql作业需要根据不同字段值满足不同条件来设置另一个字段值,还有一些嵌套if-else逻辑,这块逻辑不是固定的,业务方会过一段时间调整一次。
> 想请问如何使用flink sql优雅的处理嵌套if-else逻辑呢?我有想到使用drools规则引擎,通过udf来调用,不知道还有没有更好的办法?
>
>


Re: Tumble Window 会带来反压问题吗?

2022-10-20 文章 macia kk
Hi  yidan

我的的意思是,假设上游 1-10 分钟在处理数据,然后第11分钟就把大批量数据发给 sink,然后上游继续进行 10-20的处理,但是这时候 sink
由于数据量大产生了阻塞,造成反压反馈给上游,上游就变慢了。但实际上如果没有反压机制。10-20 的时候,sink
其实可以慢慢写完的。唯一的区别是他发送了一个反压信号,导致上游处理变慢。不知道理解的对不对。


为了要10分钟发送,是因为上游太多数据, 所以我先提前用窗口个聚合一下,目前一秒将近有 800MB 的流量



Shammon FY  于2022年10月20日周四 11:48写道:

> 如果必须要10分钟,但是key比较分散,感觉这种情况可以增加资源加大一下并发试试,减少每个task发出的数据量
>
> On Thu, Oct 20, 2022 at 9:49 AM yidan zhao  wrote:
>
> > 这个描述前后矛盾,写出速度跟不上导致反压,那控制写出速度不是问题更大。不过你不需要考虑这些,因为你控制不了写出速度,只能控制写出时机。
> >
> > 写出时机是由window的结束时间和watermark决定的,所以如果真要解决,需要控制分窗不要固定整点10分钟。
> >
> > macia kk  于2022年10月20日周四 00:57写道:
> > >
> > > 聚合10分钟再输出,到10分钟的时候由于积攒了很多数据,写出速度跟不上,导致反压,然后上游消费就处理变慢了。
> > >
> > > 如果控制一下写出的速度,让他慢慢写会不会好一些
> >
>


Tumble Window 会带来反压问题吗?

2022-10-19 文章 macia kk
聚合10分钟再输出,到10分钟的时候由于积攒了很多数据,写出速度跟不上,导致反压,然后上游消费就处理变慢了。

如果控制一下写出的速度,让他慢慢写会不会好一些


Flink 的 大Hive 维度表

2022-09-21 文章 macia kk
Hi
  Flink 的 Hive 维度表是放在内从中,可以把这个放到State中吗,这样用 RocksDB 就能减小一下内存的使用量


Dynamic Table Options 被优化器去掉了

2021-04-25 文章 macia kk
Hi

  我有在使用 temporal Joini 的时候有设置 如果读取分区的相关的 dynamic
option,但是最后是没有生效的,我看全部使用的默认参数,打印出来了执行计划,逻辑执行计划是有的,优化之后没有了
  如下,我设置的是加载最新分区,24小时加载一次,我看最后运行的日志是加载的全部分区,1小时有一次加载,这都是默认的参数,所以怀疑是 dyanmic
option 没有生效。


== Abstract Syntax Tree ==
+- LogicalSnapshot(period=[$cor0.proctime])
   +- LogicalTableScan(table=[[ds, my_db, store_da_table,
source: [HiveTableSource(store_id, store_name, merchant_id, tag_id,
brand_id, tob_user_id, is_use_wallet, is_use_merchant_app, longitude,
latitude, state, city, district, address, postal_code, register_phone,
email, email_source, register_time, logo, banner, partner_type,
commission_rate, tax_rate, service_fee, min_spend, delivery_distance,
preparation_time, contact_phone, store_status, closed_start_time,
closed_end_time, effective_closed_end_time, auto_confirmed,
auto_confirmed_enabled, create_time, update_time, rating_total,
rating_score, opening_status, surcharge_intervals, service_charge_fee_rate,
driver_modify_order_enabled, delivery_distance_mode, business_info_added,
mtime, dt, grass_region) TablePath: my_db.store_da_table, PartitionPruned:
false, PartitionNums: null], dynamic options:
{streaming-source.enable=true, streaming-source.monitor-interval=24 h,
streaming-source.partition.include=latest}]])

== Optimized Logical Plan ==
Calc(select=[_UTF-16LE'v4' AS version, _UTF-16LE'ID' AS country, city, id,
event_time, operation, platform, payment_method, gmv, 0.0:DECIMAL(2, 1) AS
gmv_usd], where=[NOT(LIKE(UPPER(store_name), _UTF-16LE'%[TEST]%'))])
+- LookupJoin(table=[ds.my_db.store_da_table],
joinType=[LeftOuterJoin], async=[false], lookup=[store_id=store_id],
select=[city, id, event_time, operation, platform, payment_method, gmv,
store_id, store_id, store_name])
   +- Union(all=[true], union=[city, id, event_time, operation, platform,
payment_method, gmv, store_id])
  :- Calc(select=[delivery_city AS city, id, /(CAST(create_time), 1000)
AS event_time, CASE(OR(=(order_status, 440), =(order_status, 800)),
_UTF-16LE'NET':VARCHAR(5) CHARACTER SET "UTF-16LE",
_UTF-16LE'GROSS':VARCHAR(5) CHARACTER SET "UTF-16LE") AS operation,
_UTF-16LE'' AS platform, payment_method, /(CAST(total_amount), 10)
AS gmv, CAST(store_id) AS store_id])
  :  +- DataStreamScan(table=[[ds, keystats,
main_db__transaction_tab]], fields=[id, delivery_city, store_id,
create_time, payment_time, order_status, payment_method, total_amount,
proctime], reuse_id=[1])
  +- Calc(select=[delivery_city AS city, id, /(CAST(payment_time),
1000) AS event_time, _UTF-16LE'NET':VARCHAR(5) CHARACTER SET "UTF-16LE" AS
operation, _UTF-16LE'AIRPAY' AS platform, payment_method,
/(CAST(total_amount), 10) AS gmv, CAST(store_id) AS store_id],
where=[OR(=(order_status, 440), =(order_status, 800))])
 +- Reused(reference_id=[1])


Flink Temporal Join Two union Hive Table Error

2021-03-15 文章 macia kk
Hi, 麻烦帮忙看下这个问题:


创建 View promotionTable:

SELECT *, 'CN' as country, id as pid
FROM promotion_cn_rule_tab
UNION
SELECT *, 'JP' as country, id as pid
FROM promotion_jp_rule_tab


FLink SQL Query:

SELECT t1.country, t1.promotionId, t1.orderId,
CASE WHEN t2.pid IS NULL THEN 'Rebate'
ELSE 'Rebate'
END AS rebate
FROM eventTable AS t1
LEFT JOIN promotionTable
/*+ OPTIONS('streaming-source.enable' = 'false',
'streaming-source.partition.include' = 'all',
'lookup.join.cache.ttl' = '5 m') */
FOR SYSTEM_TIME AS OF t1.procTime AS t2
ON t1.promotionId = t2.pid
AND t1.country = t2.country


如果去掉 Hive 表的 union ,只保留一个国家的 Hive 表,可以run 成功,但是如果 Union 两张表的话,会得到错误:

Caused by: org.apache.flink.table.api.ValidationException: Currently the
join key in Temporal Table Join can not be empty


Flink Temporal Join Two union Hive Table Error

2021-03-15 文章 macia kk
Hi, 麻烦帮忙看下这个问题:


创建 View promotionTable:

SELECT *, 'CN' as country, id as pid FROM promotion_cn_rule_tab UNION
SELECT *, 'JP' as country, id as pid FROM promotion_jp_rule_tab

FLink SQL Query:
SELECT t1.country, t1.promotionId, t1.orderId, CASE WHEN t2.pid IS NULL
THEN 'Rebate' ELSE 'Rebate' END AS rebate FROM eventTable AS t1 LEFT JOIN
promotionTable /*+ OPTIONS('streaming-source.enable' = 'false',
'streaming-source.partition.include' = 'all', 'lookup.join.cache.ttl' = '5
m') */ FOR SYSTEM_TIME AS OF t1.procTime AS t2 ON t1.promotionId = t2.pid
AND t1.country = t2.country


如果去掉 Hive 表的 union ,只保留一个国家的 Hive 表,可以run 成功,但是如果 Union 两张表的话,会得到错误:

Caused by: org.apache.flink.table.api.ValidationException: Currently the
join key in Temporal Table Join can not be empty


Re: Flink SQL temporal table join with Hive 报错

2021-02-10 文章 macia kk
Hi, Leonard

 我们的业务变得越来越复杂,所以现在需要 Join Hive 维表的情况非常普遍。现在维表分三种情况

 一,维表没有分区,没有 primary key

  这时候 `'streaming-source.partition.include' = 'latest',因为没有
parition,所以 latest 应该加载的就是全部的数据。

 二,维表有有分区,每个分区仅仅包含当天的数据,没有 primary key

  这种情况因为要 Join 全部的数据,所以还是需要设置 'streaming-source.partition.include' =
'all',但是还是因为没有 primary Key,所以无法 run。

三,维表有分区,每个分区包含全量数据,没有 primiary key

  这种情况可以设置,'streaming-source.partition.include' = 'latest',这个是是官网的案例,测试没有问题。


现在就是针对第二种情况,因为Hive的维度表不是我维护的,很多人都在用,所以不能修改去加上 primary key,无法进行 join.




--- BYW---

还有我看文档现在不支持 event time join, 官网的汇率是按照 process time
join,但是如果要回溯昨天的数据的时候,其实就会有问题。

我看 FLIP-132
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL+and+Temporal+Table+Join>
有提到 Event
Time semantics, 这是以后回支持的吗?



Leonard Xu  于2021年2月10日周三 上午11:36写道:

> Hi,  macia
>
> > 在 2021年2月9日,10:40,macia kk  写道:
> >
> > SELECT *FROM
> >(
> >SELECT  tt.*
> >FROM
> >input_tabe_01 tt
> >FULL OUTER JOIN input_tabe_02 mt
> >ON (mt.transaction_sn = tt.reference_id)
> >and tt.create_time >= mt.create_time + INTERVAL '5' MINUTES
> >and tt.create_time <= mt.create_time - INTERVAL '5' MINUTES
> >WHERE COALESCE(tt.create_time, mt.create_time) is not NULL
> >) lt
> >LEFT JOIN exchange_rate ex
> >/*+
> OPTIONS('streaming-source.enable'='true','streaming-source.partition.include'
> > = 'all') */
> >FOR SYSTEM_TIME AS OF lt.event_time ex ON DATE_FORMAT
> > (lt.event_time, '-MM-dd') = cast(ex.date_id as String)
>
>
> 你说的异常我本地没有复现,异常栈能直接贴下吗?
>
> 另外看你写的是lt.event_time,
> 这个sql的是要做版本表的维表关联吗?目前Hive还不支持指定版本表的,只支持最新分区作为维表或者整个hive表作为维表,
> 两种维表的option你可参考下[1]
>
> 祝好,
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hive/hive_read_write.html#temporal-join-the-latest-table
>
>
>
>


Re: Flink SQL temporal table join with Hive 报错

2021-02-08 文章 macia kk
SELECT *FROM
(
SELECT  tt.*
FROM
input_tabe_01 tt
FULL OUTER JOIN input_tabe_02 mt
ON (mt.transaction_sn = tt.reference_id)
and tt.create_time >= mt.create_time + INTERVAL '5' MINUTES
and tt.create_time <= mt.create_time - INTERVAL '5' MINUTES
WHERE COALESCE(tt.create_time, mt.create_time) is not NULL
) lt
LEFT JOIN exchange_rate ex
/*+ 
OPTIONS('streaming-source.enable'='true','streaming-source.partition.include'
= 'all') */
FOR SYSTEM_TIME AS OF lt.event_time ex ON DATE_FORMAT
(lt.event_time, '-MM-dd') = cast(ex.date_id as String)


Rui Li  于2021年2月9日周二 上午10:20写道:

> Hi,
>
> 那join的语句是怎么写的呢?
>
> On Mon, Feb 8, 2021 at 2:45 PM macia kk  wrote:
>
> > 图就是哪个报错
> >
> > 建表语句如下,表示公共表,我也没有改的权限.
> >
> > CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT
> > 'country', `currency` string COMMENT 'currency', `exchange_rate`
> > decimal(25,10) COMMENT 'exchange rate')
> > PARTITIONED BY (`grass_date` date COMMENT 'partition key, -MM-dd')
> > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io
> > .parquet.serde.ParquetHiveSerDe'
> > WITH SERDEPROPERTIES (
> >   'serialization.format' = '1'
> > )
> >
> >
> > Rui Li  于2021年2月8日周一 下午2:17写道:
> >
> > > 你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么?
> > >
> > > On Mon, Feb 8, 2021 at 10:33 AM macia kk  wrote:
> > >
> > > > Currently the join key in Temporal Table Join can not be empty.
> > > >
> > > > 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错
> > > >
> > > > [image: image.png]
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


Re: Flink SQL temporal table join with Hive 报错

2021-02-07 文章 macia kk
图就是哪个报错

建表语句如下,表示公共表,我也没有改的权限.

CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT
'country', `currency` string COMMENT 'currency', `exchange_rate`
decimal(25,10) COMMENT 'exchange rate')
PARTITIONED BY (`grass_date` date COMMENT 'partition key, -MM-dd')
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)


Rui Li  于2021年2月8日周一 下午2:17写道:

> 你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么?
>
> On Mon, Feb 8, 2021 at 10:33 AM macia kk  wrote:
>
> > Currently the join key in Temporal Table Join can not be empty.
> >
> > 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错
> >
> > [image: image.png]
> >
>
>
> --
> Best regards!
> Rui Li
>


Flink SQL temporal table join with Hive 报错

2021-02-07 文章 macia kk
Currently the join key in Temporal Table Join can not be empty.

我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错

[image: image.png]


Flink SQL Hive 使用 partition.include 结果跟文档不一致

2021-02-04 文章 macia kk
Flink 1.12.1

streaming-source.partition.includeOption to set the partitions to read, the
supported option are `all` and `latest`, the `all` means read all
partitions; the `latest` means read latest partition in order of
'streaming-source.partition.order', the `latest` only works` when the
streaming hive source table used as temporal table. By default the option
is `all`.


报错
Flink SQL> SELECT * FROM exrate_table /*+
OPTIONS('streaming-source.enable'='true','streaming-source.partition.include'
= 'latest') */; [ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: The only supported
'streaming-source.partition.include' is 'all' in hive table scan, but is
'latest'


Flink 读 Hive 表,如何设置 TTL

2021-01-27 文章 macia kk
文档上是在 create table 的时候, 设置 lookup.join.cache.ttl

但是我现在想用 streaming kafka 的数据,join 一张已经存在的 Hive 表,怎么设置TTL?

CREATE TABLE dimension_table (
  product_id STRING,
  product_name STRING,
  unit_price DECIMAL(10, 4),
  pv_count BIGINT,
  like_count BIGINT,
  comment_count BIGINT,
  update_time TIMESTAMP(3),
  update_user STRING,
  ...) TBLPROPERTIES (
  'streaming-source.enable' = 'false',   -- option with
default value, can be ignored.
  'streaming-source.partition.include' = 'all',  -- option with
default value, can be ignored.
  'lookup.join.cache.ttl' = '12 h');


Scala REPL YARN 运行模式报 NoSuchMethodError setPrintSpaceAfterFullCompletion

2021-01-26 文章 macia kk
 bin/start-scala-shell.sh  yarn


scala> Exception in thread "main" java.lang.NoSuchMethodError:
jline.console.completer.CandidateListCompletionHandler.setPrintSpaceAfterFullCompletion(Z)V
at
scala.tools.nsc.interpreter.jline.JLineConsoleReader.initCompletion(JLineReader.scala:139)
at
scala.tools.nsc.interpreter.jline.InteractiveReader.postInit(JLineReader.scala:54)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$25.apply(ILoop.scala:899)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$25.apply(ILoop.scala:897)
at
scala.tools.nsc.interpreter.SplashReader.postInit(InteractiveReader.scala:130)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$scala$tools$nsc$interpreter$ILoop$$anonfun$$loopPostInit$1$1.apply$mcV$sp(ILoop.scala:926)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$scala$tools$nsc$interpreter$ILoop$$anonfun$$loopPostInit$1$1.apply(ILoop.scala:908)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$scala$tools$nsc$interpreter$ILoop$$anonfun$$loopPostInit$1$1.apply(ILoop.scala:908)
at scala.tools.nsc.interpreter.ILoop$$anonfun$mumly$1.apply(ILoop.scala:189)
at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:221)
at scala.tools.nsc.interpreter.ILoop.mumly(ILoop.scala:186)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1$$anonfun$startup$1$1.apply(ILoop.scala:979)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:990)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:891)
at
scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:891)
at
scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:891)
at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:184)
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:131)
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
va
Exception in thread "Thread-2" java.lang.InterruptedException
at java.util.concurrent.SynchronousQueue.put(SynchronousQueue.java:879)
at scala.tools.nsc.interpreter.SplashLoop.run(InteractiveReader.scala:77)
at java.lang.Thread.run(Thread.java:748)
```


Re: 关于 stream-stream Interval Join 的问题

2020-12-10 文章 macia kk
你用的是哪个版本的Flink呢?
-
1.11.2

看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
所以你的binlog是怎么读进来的呢?自定义的format?
-
ts 就是时间戳

bsTableEnv.executeSql("""
  CREATE TABLE input_database (
`table` STRING,
`database` STRING,
`data` ROW(
  reference_id STRING,
  transaction_sn STRING,
  transaction_type BIGINT,
  merchant_id BIGINT,
  transaction_id BIGINT,
  status BIGINT
 ),
ts BIGINT,
event_time AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
 ) WITH (
   'connector.type' = 'kafka',
   'connector.version' = '0.11',
   'connector.topic' = 'mytopic',
   'connector.properties.bootstrap.servers' = '',
   'format.type' = 'json'
 )
)



```



Benchao Li  于2020年12月10日周四 下午6:14写道:

> 你用的是哪个版本的Flink呢?
>
> 看起来你的watermark固定快8个小时的话,应该是时区问题,而不是数据问题。
> 所以你的binlog是怎么读进来的呢?自定义的format?
>
> macia kk  于2020年12月10日周四 上午1:06写道:
>
> > 我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time
> -
> > INTERVAL 'x' HOUR
> >
> >  发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness
> >
> > 但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
> > 能够反推出来数据的 currentMaxTimestamp
> >
> > currentMaxTimestamp = watermark + maxOutOfOrderness
> >
> > 但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
> > 8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。
> >
> >
> > 但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
> >
> >
> {"table":"transaction_tab_0122","database":"main_db","transaction_type":1,"transaction_id":11,"reference_id":"11","transaction_sn":"1","merchant_id":1,"status":1,"event_time":"
> > *2020-12-10T01:02:24Z*"}
> >
> > UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分
> > GMT+08:00)
> >
> > 这个 watermark 是未来的时间 
> >
> >
> >
> >
> >
> > macia kk  于2020年12月9日周三 下午11:36写道:
> >
> > > 感谢 一旦 和 Benchao
> > >
> > >   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我
> > Job
> > > 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
> > >
> > > val result = bsTableEnv.sqlQuery("""
> > >SELECT *
> > >FROM (
> > >   SELECT t1.`table`, t1.`database`, t1.transaction_type,
> > t1.transaction_id,
> > > t1.reference_id, t1.transaction_sn, t1.merchant_id,
> > t1.status, t1.event_time
> > >   FROM main_db as t1
> > >   LEFT JOIN main_db as t2
> > >   ON t1.reference_id = t2.reference_id
> > >   WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES
> > >AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
> > >)
> > >   """.stripMargin)
> > >
> > > 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
> > >
> > > 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
> > > subtask的watermark。
> > > ---
> > > 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
> > > 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
> > >
> > > 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为
> event
> > > time,但是有的表又没有这个字段,会导致解析的时候直接报错.
> > >
> > > 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
> > > 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
> > >
> > >
> > > Thanks and best regards
> > >
> > >
> > > Benchao Li  于2020年12月9日周三 上午10:24写道:
> > >
> > >> Hi macia,
> > >>
> > >> 一旦回答的基本比较完整了。
> > >> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
> > >> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
> > >>
> > >> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source
> > subtask见到的最大的watermark
> > >> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay
> > 10个小时,这个已经会导致
> > >> 你的没有join到的数据下发会延迟很多了。
> > >>
> > >> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
> > >>
> > >> 赵一旦  于2020年12月9日周三 上午10:15写道:
> > >>
> > >> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
> > >> >
> > >> >
> 

Re: 关于 stream-stream Interval Join 的问题

2020-12-09 文章 macia kk
我刚才跑了很多任务,设置不同的 maxOutOfOrderness WATERMARK FOR event_time AS event_time -
INTERVAL 'x' HOUR

 发现一个很奇怪的问题 ,按理说 watermark = currentMaxTimestamp - maxOutOfOrderness

但是 我通过 页面上的 watermark 时间,和我设置 maxOutOfOrderness x,
能够反推出来数据的 currentMaxTimestamp

currentMaxTimestamp = watermark + maxOutOfOrderness

但是我无论设置多少的 maxOutOfOrderness, 反推出来的 currentMaxTimestamp 比现在此时此刻的时间快
8个小时,也就是说 currentMaxTimestamp 在未来后的 8个小时,这个数字一直是固定的8。


但是,我进行 Join, 直接输出任意一张表,得到的 evet time 都是对的,比如现在 00:55
{"table":"transaction_tab_0122","database":"main_db","transaction_type":1,"transaction_id":11,"reference_id":"11","transaction_sn":"1","merchant_id":1,"status":1,"event_time":"
*2020-12-10T01:02:24Z*"}

UI 上显示的 watermark 是 1607555031000(Your time zone: 2020年12月10日星期四早上7点02分
GMT+08:00)

这个 watermark 是未来的时间 





macia kk  于2020年12月9日周三 下午11:36写道:

> 感谢 一旦 和 Benchao
>
>   1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我 Job
> 跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。
>
> val result = bsTableEnv.sqlQuery("""
>SELECT *
>FROM (
>   SELECT t1.`table`, t1.`database`, t1.transaction_type, 
> t1.transaction_id,
> t1.reference_id, t1.transaction_sn, t1.merchant_id, t1.status, 
> t1.event_time
>   FROM main_db as t1
>   LEFT JOIN main_db as t2
>   ON t1.reference_id = t2.reference_id
>   WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES
>AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
>)
>   """.stripMargin)
>
> 2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到
>
> 3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
> subtask的watermark。
> ---
> 这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
> 永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.
>
> 4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 event
> time,但是有的表又没有这个字段,会导致解析的时候直接报错.
>
> 5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
> 加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.
>
>
> Thanks and best regards
>
>
> Benchao Li  于2020年12月9日周三 上午10:24写道:
>
>> Hi macia,
>>
>> 一旦回答的基本比较完整了。
>> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
>> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
>>
>> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark
>> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致
>> 你的没有join到的数据下发会延迟很多了。
>>
>> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
>>
>> 赵一旦  于2020年12月9日周三 上午10:15写道:
>>
>> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
>> >
>> >
>> >
>> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
>> > join。
>> >
>> > (2)此外,还有一个点,这个我也不确认。如果是datastream
>> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
>> >
>> >
>> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
>> >
>> > macia kk  于2020年12月9日周三 上午1:17写道:
>> >
>> > > @Benchao Li   感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
>> > > FLink,可能我的Case 太特殊了.
>> > >
>> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
>> Binlog,我需要
>> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB
>> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
>> > >
>> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
>> > >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
>> > >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响
>> > watermark
>> > > forward on.
>> > >
>> > > bsTableEnv.executeSql("""
>> > >   CREATE TABLE input_database (
>> > > `table` STRING,
>> > > `database` STRING,
>> > > `data` ROW(
>> > >   reference_id STRING,
>> > >   transaction_sn STRING,
>> > >   transaction_type BIGINT,
>> > >   merchant_id BIGINT,
>> > >   transaction_id BIGINT,
>> > >   status BIGINT
>> > >  ),
>> > > ts BIGINT,
>> > > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
>> > > WATERMARK FOR event_time

Re: 关于 stream-stream Interval Join 的问题

2020-12-09 文章 macia kk
感谢 一旦 和 Benchao

  1. 如果是我的 watermark 设置过长,导致无法输出的话,是有点疑问的,因为我可以确定的是,一定会有 Join 上的数据,但是我 Job
跑了几天也没有一条输出。我还试了如下的SQL,自己 Join 自己,所以理论肯定是直接输出的,实际也是一条也没有。

val result = bsTableEnv.sqlQuery("""
   SELECT *
   FROM (
  SELECT t1.`table`, t1.`database`, t1.transaction_type,
t1.transaction_id,
t1.reference_id, t1.transaction_sn, t1.merchant_id,
t1.status, t1.event_time
  FROM main_db as t1
  LEFT JOIN main_db as t2
  ON t1.reference_id = t2.reference_id
  WHERE t1.event_time >= t2.event_time + INTERVAL '5' MINUTES
   AND t1.event_time <= t2.event_time - INTERVAL '5' MINUTES
   )
  """.stripMargin)

2. 一旦提到的 watermark 传递的问题,我可以确认的是,会传递下去,这可以在 UI 上看到

3. 这个底层的watermark只会取当前source subtask见到的最大的watermark 作为这个source
subtask的watermark。
---
这里应该是使用 source subtask 最小的 watermark 传递过去,因为我可以看到的是,我的 watermark
永远和现在相差8个小时,所以怀疑是有一张表,总是会迟8个小时才会有 BinLog.

4. Flink SQL 有没有方法在定义 schema 的时候,如果一个字段不存在,就是 null,我现在想换另外一个时间字段作为 event
time,但是有的表又没有这个字段,会导致解析的时候直接报错.

5. 我能不能不在 input_table 上注册 water mark,在 filter 出两张表后,再把 watermark
加载两张表上,这样可以避免因为别的表,导致 watermark 停止不前,混乱的行为.


Thanks and best regards


Benchao Li  于2020年12月9日周三 上午10:24写道:

> Hi macia,
>
> 一旦回答的基本比较完整了。
> watermark影响的主要是left join没有join到的情况下,+(left, null)这样的数据输出的时机。
> 如果是两侧都有数据,watermark不前进,也都可以正常输出。
>
> 关于watermark,如果你的事件时间忽高忽低,这个底层的watermark只会取当前source subtask见到的最大的watermark
> 作为这个source subtask的watermark。但是你的watermark计算逻辑本身就是事件时间delay 10个小时,这个已经会导致
> 你的没有join到的数据下发会延迟很多了。
>
> 你也可以尝试下用处理时间来做一下interval join,看看能不能达到预期。
>
> 赵一旦  于2020年12月9日周三 上午10:15写道:
>
> > 重点是watermark是否推进了,如果不推进,left join也无法知道什么时候右边就没数据了,可以仅输出左边数据。
> >
> >
> >
> (1)你这个的话我看到一个问题,就是watermark你定义10小时的maxOutOfOrderness,确定这么长嘛要,这么大的maxOutOfOrderness,会导致join到的则会及时输出,join不到的需要等10小时才能输出“仅左边”数据,即left
> > join。
> >
> > (2)此外,还有一个点,这个我也不确认。如果是datastream
> > api,watermark是可以正常传播的,不清楚flinkSQL情况是否能这么传播。
> >
> >
> input_database中定义了watermark,从input_database到2个filter后的表不清楚是否还存在watermark(我感觉是存在的),只要存在那就没问题,唯一需要注意的是第1点。
> >
> > macia kk  于2020年12月9日周三 上午1:17写道:
> >
> > > @Benchao Li   感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
> > > FLink,可能我的Case 太特殊了.
> > >
> > > 我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的
> Binlog,我需要
> > > filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB
> > > 中的两个表。所以这里的字段我定义的是 两张表的字段的并集.
> > >
> > > 还要注意的是 even time 是 create_time, 这里问题非常大:
> > >  1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
> > >  2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响
> > watermark
> > > forward on.
> > >
> > > bsTableEnv.executeSql("""
> > >   CREATE TABLE input_database (
> > > `table` STRING,
> > > `database` STRING,
> > > `data` ROW(
> > >   reference_id STRING,
> > >   transaction_sn STRING,
> > >   transaction_type BIGINT,
> > >   merchant_id BIGINT,
> > >   transaction_id BIGINT,
> > >   status BIGINT
> > >  ),
> > > ts BIGINT,
> > > event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
> > > WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
> > >  ) WITH (
> > >'connector.type' = 'kafka',
> > >'connector.version' = '0.11',
> > >'connector.topic' = 'mytopic',
> > >'connector.properties.bootstrap.servers' = '',
> > >'format.type' = 'json'
> > >  )
> > > """)
> > >
> > >
> > > 分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。
> > >
> > > val main_db = bsTableEnv.sqlQuery("""
> > >   | SELECT *
> > >   | FROM input_database
> > >   | WHERE `database` = 'main_db'
> > >   |  AND `table` LIKE 'transaction_tab%'
> > >   | """.stripMargin)
> > >
> > > val merchant_db = bsTableEnv.sqlQuery("""
> > >   | SELECT *
> > >   | FROM input_database
> > >   | WHERE `database` = 'merchant_db'
> > >   |   AND `table` LIKE 'transaction_tab%'
> > >   | """.stripMargin)
> > >
> > > bsTableEnv.createTemporaryView("main_db", main_db)
> > > bsTableEnv.createTemporaryView("merchant_db", merchant_db)
> > >
> > > val result = bsT

Re: 关于 stream-stream Interval Join 的问题

2020-12-08 文章 macia kk
@Benchao Li   感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
FLink,可能我的Case 太特殊了.

我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要
filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB
中的两个表。所以这里的字段我定义的是 两张表的字段的并集.

还要注意的是 even time 是 create_time, 这里问题非常大:
 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 watermark
forward on.

bsTableEnv.executeSql("""
  CREATE TABLE input_database (
`table` STRING,
`database` STRING,
`data` ROW(
  reference_id STRING,
  transaction_sn STRING,
  transaction_type BIGINT,
  merchant_id BIGINT,
  transaction_id BIGINT,
  status BIGINT
 ),
ts BIGINT,
event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
 ) WITH (
   'connector.type' = 'kafka',
   'connector.version' = '0.11',
   'connector.topic' = 'mytopic',
   'connector.properties.bootstrap.servers' = '',
   'format.type' = 'json'
 )
""")


分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。

val main_db = bsTableEnv.sqlQuery("""
  | SELECT *
  | FROM input_database
  | WHERE `database` = 'main_db'
  |  AND `table` LIKE 'transaction_tab%'
  | """.stripMargin)

val merchant_db = bsTableEnv.sqlQuery("""
  | SELECT *
  | FROM input_database
  | WHERE `database` = 'merchant_db'
  |   AND `table` LIKE 'transaction_tab%'
  | """.stripMargin)

bsTableEnv.createTemporaryView("main_db", main_db)
bsTableEnv.createTemporaryView("merchant_db", merchant_db)

val result = bsTableEnv.sqlQuery("""
   SELECT *
   FROM (
  SELECT t1.`table`, t1.`database`, t1.transaction_type,
t1.transaction_id,
t1.reference_id, t1.transaction_sn, t1.merchant_id,
t1.status, t1.event_time
  FROM main_db as t1
  LEFT JOIN merchant_db as t2
  ON t1.reference_id = t2.reference_id
  WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR
   AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR
   )
  """.stripMargin)



事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
-
你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 watermark
来驱动。
我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 join上,就输出
join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.






Benchao Li  于2020年12月8日周二 下午3:23写道:

> hi macia,
>
> 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
>
> macia kk  于2020年12月8日周二 上午1:15写道:
>
> > 抱歉,是 >-30 and <+30
> >
> > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
> >
> > 赵一旦 于2020年12月7日 周一23:28写道:
> >
> > > 准确点,2个条件之间没and?2个都是>?
> > >
> > > macia kk  于2020年12月7日周一 下午10:30写道:
> > >
> > > > 不好意思,我上边贴错了
> > > >
> > > > SELECT *
> > > >  FROM A
> > > >  LEFT OUT JOIN B
> > > >  ON order_id
> > > >  Where A.event_time > B.event_time -  30 s
> > > >  A.event_time > B.event_time + 30 s
> > > >
> > > > event_time 是 Time Attributes 设置的 event_time
> > > >
> > > > 这样是没有输出的。
> > > >
> > > >
> > > >
> > > > interval join 左右表在 state 中是缓存多久的?
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > hailongwang <18868816...@163.com> 于2020年12月7日周一 下午8:05写道:
> > > >
> > > > > Hi,
> > > > > 其中 条件是
> > > > > `Where A.event_time < B.event_time + 30 s and A.event_time >
> > > B.event_time
> > > > > - 30 s ` 吧
> > > > > 可以参考以下例子[1],看下有木有写错。
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > > > >
> > > > >
> > > > > Best,
> > > > > Hailong
> > > > > 在 2020-12-07 13:10:02,"macia kk"  写道:
> > > > > >Hi, 各位大佬
> > > > > >
> > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是
> order
> > > > item
> > > > > >信息,所以 我用:
> > > > > >
> > > > > > SELECT *
> > > > > > FROM A
> > > > > > LEFT OUT JOIN B
> > > > > > ON order_id
> > > > > > Where A.event_time > B.event_time + 30 s
> > > > > > A.event_time > B.event_time - 30 s
> > > > > >
> > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> > > > > Structural
> > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: 关于 stream-stream Interval Join 的问题

2020-12-07 文章 macia kk
抱歉,是 >-30 and <+30

贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有

赵一旦 于2020年12月7日 周一23:28写道:

> 准确点,2个条件之间没and?2个都是>?
>
> macia kk  于2020年12月7日周一 下午10:30写道:
>
> > 不好意思,我上边贴错了
> >
> > SELECT *
> >  FROM A
> >  LEFT OUT JOIN B
> >  ON order_id
> >  Where A.event_time > B.event_time -  30 s
> >  A.event_time > B.event_time + 30 s
> >
> > event_time 是 Time Attributes 设置的 event_time
> >
> > 这样是没有输出的。
> >
> >
> >
> > interval join 左右表在 state 中是缓存多久的?
> >
> >
> >
> >
> >
> >
> > hailongwang <18868816...@163.com> 于2020年12月7日周一 下午8:05写道:
> >
> > > Hi,
> > > 其中 条件是
> > > `Where A.event_time < B.event_time + 30 s and A.event_time >
> B.event_time
> > > - 30 s ` 吧
> > > 可以参考以下例子[1],看下有木有写错。
> > > [1]
> > >
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > >
> > >
> > > Best,
> > > Hailong
> > > 在 2020-12-07 13:10:02,"macia kk"  写道:
> > > >Hi, 各位大佬
> > > >
> > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order
> > item
> > > >信息,所以 我用:
> > > >
> > > > SELECT *
> > > > FROM A
> > > > LEFT OUT JOIN B
> > > > ON order_id
> > > > Where A.event_time > B.event_time + 30 s
> > > > A.event_time > B.event_time - 30 s
> > > >
> > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> > > Structural
> > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > >
> >
>


Re: 关于 stream-stream Interval Join 的问题

2020-12-07 文章 macia kk
不好意思,我上边贴错了

SELECT *
 FROM A
 LEFT OUT JOIN B
 ON order_id
 Where A.event_time > B.event_time -  30 s
 A.event_time > B.event_time + 30 s

event_time 是 Time Attributes 设置的 event_time

这样是没有输出的。



interval join 左右表在 state 中是缓存多久的?






hailongwang <18868816...@163.com> 于2020年12月7日周一 下午8:05写道:

> Hi,
> 其中 条件是
> `Where A.event_time < B.event_time + 30 s and A.event_time > B.event_time
> - 30 s ` 吧
> 可以参考以下例子[1],看下有木有写错。
> [1]
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
>
>
> Best,
> Hailong
> 在 2020-12-07 13:10:02,"macia kk"  写道:
> >Hi, 各位大佬
> >
> >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order item
> >信息,所以 我用:
> >
> > SELECT *
> > FROM A
> > LEFT OUT JOIN B
> > ON order_id
> > Where A.event_time > B.event_time + 30 s
> > A.event_time > B.event_time - 30 s
> >
> >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> Structural
> >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
>


关于 stream-stream Interval Join 的问题

2020-12-06 文章 macia kk
Hi, 各位大佬

  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是 order item
信息,所以 我用:

 SELECT *
 FROM A
 LEFT OUT JOIN B
 ON order_id
 Where A.event_time > B.event_time + 30 s
 A.event_time > B.event_time - 30 s

我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark Structural
Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?


Re: 关于去重(Deduplication)

2020-11-15 文章 macia kk
好的,明白了,谢谢

Jark Wu  于2020年11月16日周一 上午10:27写道:

> 关于2, 你理解反了。性能上 deduplicate with first row 比 first_value 更优。 因为deduplicate
> with first row 在 state 里面只存了 key,value 只用了一个空字节来表示。
>
> On Sun, 15 Nov 2020 at 21:47, macia kk  wrote:
>
> > 感谢 Jark 回复, 一直有看你的博客,收益匪浅。
> >
> > 关于2,性能上是 first_value 更优,因为只保存了 key 和 对应的 value,而窗口函数保留了整行数据?
> >
> >
> >
> > Jark Wu  于2020年11月15日周日 下午8:45写道:
> >
> > > 主要两个区别:
> > > 1. 在语义上,deduplicate 是整行去重,而 first_value, last_value 是列去重。比如 deduplicate
> > > with last row,是保留最后一行,如果最后一行中有 null 值,也会保留。而 last_value 是保留该列的最后非 null
> 值。
> > > 2. 性能上 deduplicate 更优,比如 first row, 只保存了 key 的state信息。
> > >
> > > Best,
> > > Jark
> > >
> > > On Sun, 15 Nov 2020 at 19:23, macia kk  wrote:
> > >
> > > > 各位大佬:
> > > >
> > > >   我看文档上建议使用的去重方式是用窗口函数
> > > > <
> > > >
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> > > > >
> > > >
> > > > SELECT [column_list]FROM (
> > > >SELECT [column_list],
> > > >  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
> > > >ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
> > > >FROM table_name)WHERE rownum <= N [AND conditions]
> > > >
> > > >
> > > > 但是我看 Flink SQL 里还有个 first_value, laste_value,也能实现同样的目标。
> > > > 请问这两者有什么区别吗,尤其是在 watermark  以及状态管理上?
> > > >
> > >
> >
>


Re: 关于去重(Deduplication)

2020-11-15 文章 macia kk
感谢 Jark 回复, 一直有看你的博客,收益匪浅。

关于2,性能上是 first_value 更优,因为只保存了 key 和 对应的 value,而窗口函数保留了整行数据?



Jark Wu  于2020年11月15日周日 下午8:45写道:

> 主要两个区别:
> 1. 在语义上,deduplicate 是整行去重,而 first_value, last_value 是列去重。比如 deduplicate
> with last row,是保留最后一行,如果最后一行中有 null 值,也会保留。而 last_value 是保留该列的最后非 null 值。
> 2. 性能上 deduplicate 更优,比如 first row, 只保存了 key 的state信息。
>
> Best,
> Jark
>
> On Sun, 15 Nov 2020 at 19:23, macia kk  wrote:
>
> > 各位大佬:
> >
> >   我看文档上建议使用的去重方式是用窗口函数
> > <
> >
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
> > >
> >
> > SELECT [column_list]FROM (
> >SELECT [column_list],
> >  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
> >ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
> >FROM table_name)WHERE rownum <= N [AND conditions]
> >
> >
> > 但是我看 Flink SQL 里还有个 first_value, laste_value,也能实现同样的目标。
> > 请问这两者有什么区别吗,尤其是在 watermark  以及状态管理上?
> >
>


关于去重(Deduplication)

2020-11-15 文章 macia kk
各位大佬:

  我看文档上建议使用的去重方式是用窗口函数


SELECT [column_list]FROM (
   SELECT [column_list],
 ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
   ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)WHERE rownum <= N [AND conditions]


但是我看 Flink SQL 里还有个 first_value, laste_value,也能实现同样的目标。
请问这两者有什么区别吗,尤其是在 watermark  以及状态管理上?


Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk


不好意思,没注意到

感谢

Benchao Li  于2020年6月7日周日 下午6:47写道:

> FROM_UNIXTIME接收的是秒的时间戳,你的maxwell_ts看起来是微秒吧,应该/100吧
>
> macia kk  于2020年6月7日周日 下午6:15写道:
>
>> 打印出来是这样的
>>
>> "maxwell_ts":1591518126072000,"ts_watermark":"52403-03-16 00:21:12"
>>
>> macia kk  于2020年6月7日周日 下午5:53写道:
>>
>>> 再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛
>>>
>>> 如上,这里定义的 ts_watermark, 想用他们做 watermakr,但是他的返回结果是 null, 我的 maxwell_ts 是
>>> millseconds,我看了函数的使用方法,没想到哪里有问题
>>>
>>> val bsSettings = 
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>> val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val
>>> sourceTable = """CREATE TABLE my_kafak_source ( | `table` varchar, |
>>> `database` varchar, | `data` row < transaction_id varchar, | user_id int, |
>>> amount int, | >, | maxwell_ts bigint, | ts_watermark as
>>> TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000)) |) WITH ( |)""".stripMargin
>>>
>>> Leonard Xu  于2020年6月7日周日 下午5:51写道:
>>>
>>>> Hi,
>>>> 1.10确实有这个bug,
>>>> 这个问题在1.10.1中已经修复了,你可以使用升级到1.10.1,Benchao之前修了一个计算列相关的,我刚刚看不使用计算列也会有这问题,应该是在[1]中
>>>> jark wu 修复的。
>>>>
>>>> Best,
>>>> Leonard Xu
>>>> [1] https://issues.apache.org/jira/browse/FLINK-16526 <
>>>> https://issues.apache.org/jira/browse/FLINK-16526>
>>>>
>>>> > 在 2020年6月7日,15:32,macia kk  写道:
>>>> >
>>>> > 各位大佬,
>>>> >
>>>> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
>>>> >
>>>> >val bsSettings =
>>>> >
>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>> >val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>>>> >val sourceTable = """CREATE TABLE my_kafak_source (
>>>> >|`table` varchar,
>>>> >|`database` varchar,
>>>> >|`data` row < transaction_id varchar,
>>>> >|   user_id int,
>>>> >|   amount int,
>>>> >|>,
>>>> >|maxwell_ts bigint,
>>>> >|ts_watermark as
>>>> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>>>> >|) WITH (
>>>> >|)""".stripMargin
>>>> >
>>>> > error
>>>> >
>>>> > The program finished with the following exception:
>>>> >
>>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>>> > method caused an error: SQL parse failed. Encountered "table" at line
>>>> > 1, column 8.
>>>> > Was expecting one of:
>>>> >"ABS" ...
>>>> >"ALL" ...
>>>> >"ARRAY" ...
>>>> >"AVG" ...
>>>> >"CARDINALITY" ...
>>>> >"CASE" ...
>>>> >"CAST" ...
>>>> >"CEIL" ...
>>>>
>>>>
>
> --
>
> Best,
> Benchao Li
>


Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
打印出来是这样的

"maxwell_ts":1591518126072000,"ts_watermark":"52403-03-16 00:21:12"

macia kk  于2020年6月7日周日 下午5:53写道:

> 再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛
>
> 如上,这里定义的 ts_watermark, 想用他们做 watermakr,但是他的返回结果是 null, 我的 maxwell_ts 是
> millseconds,我看了函数的使用方法,没想到哪里有问题
>
> val bsSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val
> sourceTable = """CREATE TABLE my_kafak_source ( | `table` varchar, |
> `database` varchar, | `data` row < transaction_id varchar, | user_id int, |
> amount int, | >, | maxwell_ts bigint, | ts_watermark as
> TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000)) |) WITH ( |)""".stripMargin
>
> Leonard Xu  于2020年6月7日周日 下午5:51写道:
>
>> Hi,
>> 1.10确实有这个bug,
>> 这个问题在1.10.1中已经修复了,你可以使用升级到1.10.1,Benchao之前修了一个计算列相关的,我刚刚看不使用计算列也会有这问题,应该是在[1]中
>> jark wu 修复的。
>>
>> Best,
>> Leonard Xu
>> [1] https://issues.apache.org/jira/browse/FLINK-16526 <
>> https://issues.apache.org/jira/browse/FLINK-16526>
>>
>> > 在 2020年6月7日,15:32,macia kk  写道:
>> >
>> > 各位大佬,
>> >
>> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
>> >
>> >val bsSettings =
>> >
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> >val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>> >val sourceTable = """CREATE TABLE my_kafak_source (
>> >|`table` varchar,
>> >|`database` varchar,
>> >|`data` row < transaction_id varchar,
>> >|   user_id int,
>> >|   amount int,
>> >|>,
>> >|maxwell_ts bigint,
>> >|ts_watermark as
>> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>> >|) WITH (
>> >|)""".stripMargin
>> >
>> > error
>> >
>> > The program finished with the following exception:
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> > method caused an error: SQL parse failed. Encountered "table" at line
>> > 1, column 8.
>> > Was expecting one of:
>> >"ABS" ...
>> >"ALL" ...
>> >"ARRAY" ...
>> >"AVG" ...
>> >"CARDINALITY" ...
>> >"CASE" ...
>> >"CAST" ...
>> >"CEIL" ...
>>
>>


Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛

如上,这里定义的 ts_watermark, 想用他们做 watermakr,但是他的返回结果是 null, 我的 maxwell_ts 是
millseconds,我看了函数的使用方法,没想到哪里有问题

val bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val
sourceTable = """CREATE TABLE my_kafak_source ( | `table` varchar, |
`database` varchar, | `data` row < transaction_id varchar, | user_id int, |
amount int, | >, | maxwell_ts bigint, | ts_watermark as
TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000)) |) WITH ( |)""".stripMargin

Leonard Xu  于2020年6月7日周日 下午5:51写道:

> Hi,
> 1.10确实有这个bug,
> 这个问题在1.10.1中已经修复了,你可以使用升级到1.10.1,Benchao之前修了一个计算列相关的,我刚刚看不使用计算列也会有这问题,应该是在[1]中
> jark wu 修复的。
>
> Best,
> Leonard Xu
> [1] https://issues.apache.org/jira/browse/FLINK-16526 <
> https://issues.apache.org/jira/browse/FLINK-16526>
>
> > 在 2020年6月7日,15:32,macia kk  写道:
> >
> > 各位大佬,
> >
> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
> >
> >val bsSettings =
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> >val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
> >val sourceTable = """CREATE TABLE my_kafak_source (
> >|`table` varchar,
> >|`database` varchar,
> >|`data` row < transaction_id varchar,
> >|   user_id int,
> >|   amount int,
> >|>,
> >|maxwell_ts bigint,
> >|ts_watermark as
> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
> >|) WITH (
> >|)""".stripMargin
> >
> > error
> >
> > The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error: SQL parse failed. Encountered "table" at line
> > 1, column 8.
> > Was expecting one of:
> >"ABS" ...
> >"ALL" ...
> >"ARRAY" ...
> >"AVG" ...
> >"CARDINALITY" ...
> >"CASE" ...
> >"CAST" ...
> >"CEIL" ...
>
>


Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
好的,感谢

Benchao Li  于2020年6月7日周日 下午4:04写道:

> 1.10还是有bug的,1.10.1已经修复[1]了。可以尝试下1.10.1
>
> [1] https://issues.apache.org/jira/browse/FLINK-16068
>
> macia kk  于2020年6月7日周日 下午3:51写道:
>
> > 1.10
> >
> > 1048262223 <1048262...@qq.com> 于2020年6月7日周日 下午3:48写道:
> >
> > > Hi
> > >
> > >
> > > 这个好像有同学在群里提到过,也会出现ddl关键字冲突问题,他是通过将版本升到了1.10解决的,能提供下你是用的版本吗?
> > >
> > >
> > > Best,
> > > Yichao Yang
> > >
> > >
> > >
> > > 发自我的iPhone
> > >
> > >
> > > -- 原始邮件 --
> > > 发件人: macia kk  > > 发送时间: 2020年6月7日 15:42
> > > 收件人: user-zh  > > 主题: 回复:Flink SQL create table 关键字 table 加反引号 解析失败
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
1.10

1048262223 <1048262...@qq.com> 于2020年6月7日周日 下午3:48写道:

> Hi
>
>
> 这个好像有同学在群里提到过,也会出现ddl关键字冲突问题,他是通过将版本升到了1.10解决的,能提供下你是用的版本吗?
>
>
> Best,
> Yichao Yang
>
>
>
> 发自我的iPhone
>
>
> ------ 原始邮件 --
> 发件人: macia kk  发送时间: 2020年6月7日 15:42
> 收件人: user-zh  主题: 回复:Flink SQL create table 关键字 table 加反引号 解析失败


Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
val bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
val sourceTable = """CREATE TABLE my_kafak_source (
|`table` varchar,
|`database` varchar,
|`data` row < transaction_id varchar,
|   user_id int,
|   amount int,
|   reference_id varchar,
|   status int,
|   transaction_type int,
|   merchant_id int,
|   update_time int,
|   create_time int
|>,
|maxwell_ts bigint,
|ts_watermark as
TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
|) WITH (
|'connector.type' = 'kafka',
|'connector.version' = '0.11',
|'connector.topic' = 'xx',
|'connector.startup-mode' = 'latest-offset',
|'update-mode' = 'append',
|'format.type' = 'json',
|'format.derive-schema' = 'true'
|)""".stripMargin

val dstTable = """CREATE TABLE my_kafak_dst (
 | transaction_type int,
 | transaction_id VARCHAR,
 | reference_id VARCHAR,
 | merchant_id int,
 | status int,
 | create_time int,
 | maxwell_ts bigint,
 | ts_watermark TIMESTAMP(3)
 |) WITH (
 |'connector.type' = 'kafka',
 |'connector.version' = '0.11',
 |'connector.topic' = 'uu',
 |'update-mode' = 'append',
 |'format.type' = 'json',
 |'format.derive-schema' = 'true'
 |)""".stripMargin

bsTableEnv.sqlUpdate(sourceTable)
bsTableEnv.sqlUpdate(dstTable)


val main_table = bsTableEnv.sqlQuery("SELECT transaction_type,
transaction_id, reference_id, merchant_id, status, create_time,
maxwell_ts, ts_watermark FROM my_kafak_source")
bsTableEnv.createTemporaryView("main_table", main_table)

    bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM main_table")


macia kk  于2020年6月7日周日 下午3:45写道:

> ```scala
> val bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
> val sourceTable = """CREATE TABLE my_kafak_source (
> |`table` varchar,
> |`database` varchar,
> |`data` row < transaction_id varchar,
> |   user_id int,
> |   amount int,
> |   reference_id varchar,
> |   status int,
> |   transaction_type int,
> |   merchant_id int,
> |   update_time int,
> |   create_time int
> |>,
> |maxwell_ts bigint,
> |ts_watermark as
> TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
> |) WITH (
> |'connector.type' = 'kafka',
> |'connector.version' = '0.11',
> |'connector.topic' = 'xx',
> |'connector.startup-mode' = 'latest-offset',
> |'update-mode' = 'append',
> |'format.type' = 'json',
> |'format.derive-schema' = 'true'
> |)""".stripMargin
>
> val dstTable = """CREATE TABLE my_kafak_dst (
>  | transaction_type int,
>  | transaction_id VARCHAR,
>  | reference_id VARCHAR,
>  | merchant_id int,
>  | status int,
>  | create_time int,
>  | maxwell_ts bigint,
>  | ts_

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
```scala
val bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
val sourceTable = """CREATE TABLE my_kafak_source (
|`table` varchar,
|`database` varchar,
|`data` row < transaction_id varchar,
|   user_id int,
|   amount int,
|   reference_id varchar,
|   status int,
|   transaction_type int,
|   merchant_id int,
|   update_time int,
|   create_time int
|>,
|maxwell_ts bigint,
|ts_watermark as
TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
|) WITH (
|'connector.type' = 'kafka',
|'connector.version' = '0.11',
|'connector.topic' = 'xx',
|'connector.startup-mode' = 'latest-offset',
|'update-mode' = 'append',
|'format.type' = 'json',
|'format.derive-schema' = 'true'
|)""".stripMargin

val dstTable = """CREATE TABLE my_kafak_dst (
 | transaction_type int,
 | transaction_id VARCHAR,
 | reference_id VARCHAR,
 | merchant_id int,
 | status int,
 | create_time int,
 | maxwell_ts bigint,
 | ts_watermark TIMESTAMP(3)
 |) WITH (
 |'connector.type' = 'kafka',
 |'connector.version' = '0.11',
 |'connector.topic' = 'uu',
 |'update-mode' = 'append',
 |'format.type' = 'json',
 |'format.derive-schema' = 'true'
 |)""".stripMargin


bsTableEnv.sqlUpdate(sourceTable)
bsTableEnv.sqlUpdate(dstTable)


val main_table = bsTableEnv.sqlQuery("SELECT transaction_type,
transaction_id, reference_id, merchant_id, status, create_time, maxwell_ts,
ts_watermark FROM my_kafak_source")
bsTableEnv.createTemporaryView("main_table", main_table)

    bsTableEnv.sqlUpdate("INSERT INTO my_kafak_dst SELECT * FROM
main_table")
```

macia kk  于2020年6月7日周日 下午3:41写道:

> 下边的代码里,没有用 `table` 字段,我现在只要把 table, database 这两行去掉,可以能跑,只要加上,就会挂
>
> Benchao Li  于2020年6月7日周日 下午3:38写道:
>
>> Hi,
>> 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。
>>
>> macia kk  于2020年6月7日周日 下午3:33写道:
>>
>> > 各位大佬,
>> >
>> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
>> >
>> > val bsSettings =
>> >
>> >
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> > val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
>> > val sourceTable = """CREATE TABLE my_kafak_source (
>> > |`table` varchar,
>> > |`database` varchar,
>> > |`data` row < transaction_id varchar,
>> > |   user_id int,
>> > |   amount int,
>> > |>,
>> > |maxwell_ts bigint,
>> > |ts_watermark as
>> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
>> > |) WITH (
>> > |)""".stripMargin
>> >
>> > error
>> >
>> >  The program finished with the following exception:
>> >
>> > org.apache.flink.client.program.ProgramInvocationException: The main
>> > method caused an error: SQL parse failed. Encountered "table" at line
>> > 1, column 8.
>> > Was expecting one of:
>> > "ABS" ...
>> > "ALL" ...
>> > "ARRAY" ...
>> > "AVG" ...
>> > "CARDINALITY" ...
>> > "CASE" ...
>> > "CAST" ...
>> > "CEIL" ...
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>


Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
下边的代码里,没有用 `table` 字段,我现在只要把 table, database 这两行去掉,可以能跑,只要加上,就会挂

Benchao Li  于2020年6月7日周日 下午3:38写道:

> Hi,
> 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。
>
> macia kk  于2020年6月7日周日 下午3:33写道:
>
> > 各位大佬,
> >
> > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢
> >
> > val bsSettings =
> >
> >
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
> > val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
> > val sourceTable = """CREATE TABLE my_kafak_source (
> > |`table` varchar,
> > |`database` varchar,
> > |`data` row < transaction_id varchar,
> > |   user_id int,
> > |   amount int,
> > |>,
> > |maxwell_ts bigint,
> > |ts_watermark as
> > TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
> > |) WITH (
> > |)""".stripMargin
> >
> > error
> >
> >  The program finished with the following exception:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error: SQL parse failed. Encountered "table" at line
> > 1, column 8.
> > Was expecting one of:
> > "ABS" ...
> > "ALL" ...
> > "ARRAY" ...
> > "AVG" ...
> > "CARDINALITY" ...
> > "CASE" ...
> > "CAST" ...
> > "CEIL" ...
> >
>
>
> --
>
> Best,
> Benchao Li
>


Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
各位大佬,

我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢

val bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
val sourceTable = """CREATE TABLE my_kafak_source (
|`table` varchar,
|`database` varchar,
|`data` row < transaction_id varchar,
|   user_id int,
|   amount int,
|>,
|maxwell_ts bigint,
|ts_watermark as
TO_TIMESTAMP(FROM_UNIXTIME(maxwell_ts/1000))
|) WITH (
|)""".stripMargin

error

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: SQL parse failed. Encountered "table" at line
1, column 8.
Was expecting one of:
"ABS" ...
"ALL" ...
"ARRAY" ...
"AVG" ...
"CARDINALITY" ...
"CASE" ...
"CAST" ...
"CEIL" ...


Re: Table API 有 类似于 SQL API 的 TO_TIMESTAMP 函数吗

2020-05-27 文章 macia kk
Get it, thanks

Leonard Xu  于2020年5月28日周四 上午10:34写道:

>
> > 我原以为 Table API 会比  SQL API 自由度大一些,毕竟可以代码层面定制
>
>
> Table API 确实是要灵活些,只是最近两个版本中,SQL模块社区的主要精力在搞DDL,DDL可以降低用户门槛,提升易用性,Table
> API的迭代稍微慢了些,
> 我理解1.12中应该会完善Descriptor API,这也是比较重要的用户入口,目前建议优先使用DDL。
>
> Best,
> Leonard Xu
>
> > 在 2020年5月28日,10:23,macia kk  写道:
> >
> > 好的,谢谢,
> >
> > 放弃治疗,我先尝试DDL,先把 job 跑通,我原以为 Table API 会比  SQL API 自由度大一些,毕竟可以代码层面定制
> >
> > Leonard Xu  于2020年5月28日周四 上午10:17写道:
> >
> >> Hi,
> >> 我看了下Descriptor的代码,如果数据源是Kafka应该有地方绕,很绕, 你可以试下,建议使用DDL。
> >>
> >> Best
> >> Leonard Xu
> >>
> >> [1]
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> >> <
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> >>>
> >> [2]
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> >> <
> >>
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> >>>
> >>
> >>
> >>
> >>> 在 2020年5月28日,00:45,macia kk  写道:
> >>>
> >>> Hi 各位大佬
> >>>
> >>>  .field("event_time", TIMESTAMP()).rowtime(
> >>> new Rowtime()
> >>> .timestampsFromField("maxwell_ts")
> >>> .watermarksPeriodicBounded(6)
> >>>   )
> >>>
> >>>
> >>> 我这个 maxwell_ts 是 milliseconds ,直接这么用会报错:
> >>>
> >>> Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
> >>> physical type
> >>>
> >>>
> >>> 有类似
> >>>
> >>> event_time as to_timestamp(maxwell_ts)
> >>>
> >>>
> >>> 这么的操作码?
> >>
> >>
>
>


Re: Table API 有 类似于 SQL API 的 TO_TIMESTAMP 函数吗

2020-05-27 文章 macia kk
好的,谢谢,

放弃治疗,我先尝试DDL,先把 job 跑通,我原以为 Table API 会比  SQL API 自由度大一些,毕竟可以代码层面定制

Leonard Xu  于2020年5月28日周四 上午10:17写道:

> Hi,
> 我看了下Descriptor的代码,如果数据源是Kafka应该有地方绕,很绕, 你可以试下,建议使用DDL。
>
> Best
> Leonard Xu
>
> [1]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> <
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java#L111
> >
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> <
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java#L156
> >
>
>
>
> > 在 2020年5月28日,00:45,macia kk  写道:
> >
> > Hi 各位大佬
> >
> >   .field("event_time", TIMESTAMP()).rowtime(
> >  new Rowtime()
> >  .timestampsFromField("maxwell_ts")
> >  .watermarksPeriodicBounded(6)
> >)
> >
> >
> > 我这个 maxwell_ts 是 milliseconds ,直接这么用会报错:
> >
> > Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
> > physical type
> >
> >
> > 有类似
> >
> > event_time as to_timestamp(maxwell_ts)
> >
> >
> > 这么的操作码?
>
>


Table API 有 类似于 SQL API 的 TO_TIMESTAMP 函数吗

2020-05-27 文章 macia kk
Hi 各位大佬

   .field("event_time", TIMESTAMP()).rowtime(
  new Rowtime()
  .timestampsFromField("maxwell_ts")
  .watermarksPeriodicBounded(6)
)


我这个 maxwell_ts 是 milliseconds ,直接这么用会报错:

Type TIMESTAMP(6) of table field ‘event_time’ does not match with the
physical type


有类似

event_time as to_timestamp(maxwell_ts)


这么的操作码?


Re: Flink SQL FIRST_VALUE 是append 模式吗?

2020-05-27 文章 macia kk
感谢 Benchao

原来如此,我之前用的是 spark structured Streming, 可能模式跟Flink不太一样,它会缓存没有 join 到的数据,直到
watermark 结束之后才 emit

Flink 新的数据进来跟右边的缓存数据 join, 没有 join 到先发 null,但是这个数据还会缓存, 后边右边如果有新的数据可以 join
到左边已经发出去的这条数据,会产生 retract. (我的理解)


那我这种情况有别的解决方案吗?因为我的 Sink (Kafka) 下游是 Druid, 数据会直接 index 后作为查询,不支持 retract
场景。




Benchao Li  于2020年5月27日周三 下午6:32写道:

> 产生retract消息的场景有很多,暂时还没有一篇文档来介绍这个,我大概列举几个典型的场景吧:
> 1. regular group by,因为聚合的结果是实时下发的,所以更新了聚合结果就会retract老的聚合结果
> 2. 非inner/anti 的join(不包括time interval
> join),这种原因是如果当前join不上,会发送null,但是后面可能对面可能又会有数据进来,导致下发的null需要被retract
> 3. 取latest的去重
> 4. topn,排名变化需要更新结果
> 5. window + emit,提前emit的结果需要retract来更新
>
> macia kk  于2020年5月27日周三 下午6:19写道:
>
> > 感谢 Benchao 和  Leonard 的回复
> >
> > 我理解错误的地方在于,我以为 Left join 是 append 模式的,对于左边的数据来说,join 上一条就会 emit
> > 出去,但是什么情况下会产生 react 消息呢?
> >
> > Leonard Xu  于2020年5月27日周三 下午3:50写道:
> >
> > > Hi
> > > Kafka 的 sink 是 appendStreamSink,所以query只能是append的, 因为Kafka 的
> > > sink无法处理retract消息。
> > > 你SQL中的 left join 是个双流join,会产生retract消息,编译时发现query和sink不匹配会报你贴的错误,
> > >
> > > 通常看到的业务场景是写入kafka的数据是不用去更新的,kafka主要用作一个消息队列,如果是需要更新
> > > 结果的数据,一般是存到db里,jdbc,hbase, es 这几个 connector 的sink都是upsertSink。
> > >
> > >
> > > 祝好,
> > > Leonard Xu
> > >
> > >
> > >
> > > > 在 2020年5月27日,10:23,Benchao Li  写道:
> > > >
> > > > 而且你的SQL里面有一部分是会产生retract的:
> > > > 这里用的是regular left join,这种join类型是会产生retract结果的。
> > > >
> > > >   | FROM (
> > > >   |SELECT `database`, `table`,
> > > > `transaction_type`, `transaction_id`,
> > > >   |`merchant_id`, `event_time`, `status`,
> > > > `reference_id`
> > > >   |FROM main_table
> > > >   |LEFT JOIN merchant_table
> > > >   |ON main_table.reference_id =
> > > > merchant_table.transaction_sn
> > > >   | )
> > > >
> > > >
> > > > macia kk  于2020年5月27日周三 上午1:20写道:
> > > >
> > > >> Hi,各位大佬,谁有空帮我看下这个问题
> > > >>
> > > >> Source: Kafka
> > > >> SinkL Kafka
> > > >>
> > > >> 主要逻辑是 *main_table*  left join *merchatn_table* 以后,使用 FIRST_VALUE
> > 函数取第一条
> > > >> transaction_id,我这个模式应该是 append 模式,但是结果好像不是
> > > >>
> > > >> Error
> > > >>
> > > >> org.apache.flink.client.program.ProgramInvocationException: The main
> > > >> method caused an error: AppendStreamTableSink requires that Table
> has
> > > >> only insert changes.
> > > >>at
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> > > >>at
> > > >>
> > >
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> > > >>at
> > > >>
> > org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> Code
> > > >>
> > > >>   val main_column = "`database`, `table`, `transaction_type`,
> > > >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
> > > >> `status`"
> > > >>val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
> > > >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
> > > >>bsTableEnv.createTemporaryView("main_table", main_table)
> > > >>
> > > >>val merchant_column = "transaction_sn, user_id"
> > > >>val merchant_table = bsTableEnv.sqlQuery(s"SELECT
> $merchant_column
> > > >> FROM Keystats_airpay_consumer WHERE `table` LIKE
> > > >> 'wallet_id_merchant_db%' ")
> > > >>bsTableEnv.createTemporaryView("merchant_table", merchant_table)
> > > >>
> > > >>bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
> > > >>   | SELECT `database`, `table`,
> > > >> `transaction_type`,
> > > >>   |   `merchant_id`, `event_time`, `status`,
> > > >>   |FIRST_VALUE(`transaction_id`) OVER
> > > >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
> > > >> PRECEDING)
> > > >>   | FROM (
> > > >>   |SELECT `database`, `table`,
> > > >> `transaction_type`, `transaction_id`,
> > > >>   |`merchant_id`, `event_time`,
> `status`,
> > > >> `reference_id`
> > > >>   |FROM main_table
> > > >>   |LEFT JOIN merchant_table
> > > >>   |ON main_table.reference_id =
> > > >> merchant_table.transaction_sn
> > > >>   | )
> > > >>   |""".stripMargin)
> > > >>
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Flink SQL FIRST_VALUE 是append 模式吗?

2020-05-27 文章 macia kk
感谢 Benchao 和  Leonard 的回复

我理解错误的地方在于,我以为 Left join 是 append 模式的,对于左边的数据来说,join 上一条就会 emit
出去,但是什么情况下会产生 react 消息呢?

Leonard Xu  于2020年5月27日周三 下午3:50写道:

> Hi
> Kafka 的 sink 是 appendStreamSink,所以query只能是append的, 因为Kafka 的
> sink无法处理retract消息。
> 你SQL中的 left join 是个双流join,会产生retract消息,编译时发现query和sink不匹配会报你贴的错误,
>
> 通常看到的业务场景是写入kafka的数据是不用去更新的,kafka主要用作一个消息队列,如果是需要更新
> 结果的数据,一般是存到db里,jdbc,hbase, es 这几个 connector 的sink都是upsertSink。
>
>
> 祝好,
> Leonard Xu
>
>
>
> > 在 2020年5月27日,10:23,Benchao Li  写道:
> >
> > 而且你的SQL里面有一部分是会产生retract的:
> > 这里用的是regular left join,这种join类型是会产生retract结果的。
> >
> >   | FROM (
> >   |SELECT `database`, `table`,
> > `transaction_type`, `transaction_id`,
> >   |`merchant_id`, `event_time`, `status`,
> > `reference_id`
> >   |FROM main_table
> >   |LEFT JOIN merchant_table
> >   |    ON main_table.reference_id =
> > merchant_table.transaction_sn
> >   | )
> >
> >
> > macia kk  于2020年5月27日周三 上午1:20写道:
> >
> >> Hi,各位大佬,谁有空帮我看下这个问题
> >>
> >> Source: Kafka
> >> SinkL Kafka
> >>
> >> 主要逻辑是 *main_table*  left join *merchatn_table* 以后,使用 FIRST_VALUE 函数取第一条
> >> transaction_id,我这个模式应该是 append 模式,但是结果好像不是
> >>
> >> Error
> >>
> >> org.apache.flink.client.program.ProgramInvocationException: The main
> >> method caused an error: AppendStreamTableSink requires that Table has
> >> only insert changes.
> >>at
> >>
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> >>at
> >>
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> >>at
> >> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> Code
> >>
> >>   val main_column = "`database`, `table`, `transaction_type`,
> >> `transaction_id`, `reference_id`, `merchant_id`, `event_time`,
> >> `status`"
> >>val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
> >> Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
> >>bsTableEnv.createTemporaryView("main_table", main_table)
> >>
> >>val merchant_column = "transaction_sn, user_id"
> >>val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column
> >> FROM Keystats_airpay_consumer WHERE `table` LIKE
> >> 'wallet_id_merchant_db%' ")
> >>bsTableEnv.createTemporaryView("merchant_table", merchant_table)
> >>
> >>bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
> >>   | SELECT `database`, `table`,
> >> `transaction_type`,
> >>   |   `merchant_id`, `event_time`, `status`,
> >>   |FIRST_VALUE(`transaction_id`) OVER
> >> (PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
> >> PRECEDING)
> >>   | FROM (
> >>   |SELECT `database`, `table`,
> >> `transaction_type`, `transaction_id`,
> >>   |`merchant_id`, `event_time`, `status`,
> >> `reference_id`
> >>   |FROM main_table
> >>   |LEFT JOIN merchant_table
> >>   |ON main_table.reference_id =
> >> merchant_table.transaction_sn
> >>   | )
> >>   |""".stripMargin)
> >>
> >
> >
> > --
> >
> > Best,
> > Benchao Li
>
>


Flink SQL FIRST_VALUE 是append 模式吗?

2020-05-26 文章 macia kk
Hi,各位大佬,谁有空帮我看下这个问题

Source: Kafka
SinkL Kafka

主要逻辑是 *main_table*  left join *merchatn_table* 以后,使用 FIRST_VALUE 函数取第一条
transaction_id,我这个模式应该是 append 模式,但是结果好像不是

Error

org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: AppendStreamTableSink requires that Table has
only insert changes.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)








Code

   val main_column = "`database`, `table`, `transaction_type`,
`transaction_id`, `reference_id`, `merchant_id`, `event_time`,
`status`"
val main_table = bsTableEnv.sqlQuery(s"SELECT $main_column FROM
Keystats_airpay_consumer WHERE `table` LIKE 'transaction_tab%' ")
bsTableEnv.createTemporaryView("main_table", main_table)

val merchant_column = "transaction_sn, user_id"
val merchant_table = bsTableEnv.sqlQuery(s"SELECT $merchant_column
FROM Keystats_airpay_consumer WHERE `table` LIKE
'wallet_id_merchant_db%' ")
bsTableEnv.createTemporaryView("merchant_table", merchant_table)

bsTableEnv.sqlUpdate(""" INSERT INTO Keystats_airpay_producer
   | SELECT `database`, `table`, `transaction_type`,
   |   `merchant_id`, `event_time`, `status`,
   |FIRST_VALUE(`transaction_id`) OVER
(PARTITION BY `transaction_id` ORDER BY PROCTIME() RANGE UNBOUNDED
PRECEDING)
   | FROM (
   |SELECT `database`, `table`,
`transaction_type`, `transaction_id`,
   |`merchant_id`, `event_time`, `status`,
`reference_id`
   |FROM main_table
   |LEFT JOIN merchant_table
   |ON main_table.reference_id =
merchant_table.transaction_sn
   | )
   |""".stripMargin)


Re: Flink SQL 嵌套 nested Json 解析

2020-05-25 文章 macia kk
Flink version: 1.10

Json:

{
"database":"main_db",
"maxwell_ts":1590416550358000,
"table":"transaction_tab",
"data":{
"transaction_sn":"",
"parent_id":0,
"user_id":333,
"amount":555,
"reference_id":"666",
"status":3,
"transaction_type":3,
    "merchant_id":2,
"update_time":1590416550,
"create_time":1590416550
}}


我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame


macia kk  于2020年5月26日周二 上午9:34写道:

> Flink version: 1.10
>
> Json:
> ```j
> {
> "database":"main_db",
> "maxwell_ts":1590416550358000,
> "table":"transaction_tab",
> "data":{
> "transaction_sn":"",
> "parent_id":0,
> "user_id":333,
> "amount":555,
> "reference_id":"666",
> "status":3,
>     "transaction_type":3,
> "merchant_id":2,
> "update_time":1590416550,
> "create_time":1590416550
> }
> }
> ```
>
> 我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame
>
>
>
> Leonard Xu  于2020年5月26日周二 上午8:58写道:
>
>> Hi, kk
>>
>> 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看
>>
>>
>> 祝好,
>> Leonard Xu
>>
>>
>> > 在 2020年5月26日,01:26,macia kk  写道:
>> >
>> > 有哪位大佬帮我看下,谢谢
>> >
>> >
>> > 尝试了很久,还是无法解析嵌套结构的Json
>> >
>> > Error
>> >
>> > Caused by: org.apache.flink.table.api.ValidationException: SQL
>> > validation failed. From line 4, column 9 to line 4, column 31: Column
>> > 'data.transaction_type' not found in any table
>> >at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
>> >at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>> >at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
>> >at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
>> >at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
>> >at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>> >at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
>> >at
>> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
>> >at
>> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
>> >at
>> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
>> >at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >at java.lang.reflect.Method.invoke(Method.java:498)
>> >at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>> >
>> >
>> > 嵌套Json 定义的 format 和 schema 如下:
>> >
>> >  .withFormat(new Json()
>> >.jsonSchema(
>> > """{type: 'object',
>> >   |  properties: {
>> >   |  database: {
>> >   |  type: 'string'
>> >   |  },
>> >   |  table: {
>> >   |  type: 'string'
>> >   |  },
>> >   |  maxwell_ts: {
>> >   |  type: 'integer'
>> >   |  },
>> >   |  data: {
>> >   |  type: 'object',
>> >   |  properties :{
>> >   |  reference_id :{
>> >   |  type: 'string'
>> >   |  },
>> >   |  transaction_type :{
>> >   |  type: 'integer'
>> >   |  },
>> &

Re: Flink SQL 嵌套 nested Json 解析

2020-05-25 文章 macia kk
Flink version: 1.10

Json:
```j
{
"database":"main_db",
"maxwell_ts":1590416550358000,
"table":"transaction_tab",
"data":{
"transaction_sn":"",
"parent_id":0,
"user_id":333,
"amount":555,
"reference_id":"666",
"status":3,
"transaction_type":3,
"merchant_id":2,
"update_time":1590416550,
"create_time":1590416550
}
}
```

我看文档里说,嵌套的json需要使用 jsonSchema 来定义Sechame



Leonard Xu  于2020年5月26日周二 上午8:58写道:

> Hi, kk
>
> 使用的flink版本是多少?1.10可以不用声明format的,方便贴下一条json数据吗?我可以看看
>
>
> 祝好,
> Leonard Xu
>
>
> > 在 2020年5月26日,01:26,macia kk  写道:
> >
> > 有哪位大佬帮我看下,谢谢
> >
> >
> > 尝试了很久,还是无法解析嵌套结构的Json
> >
> > Error
> >
> > Caused by: org.apache.flink.table.api.ValidationException: SQL
> > validation failed. From line 4, column 9 to line 4, column 31: Column
> > 'data.transaction_type' not found in any table
> >at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> >at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> >at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> >at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> >at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> >at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> >at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> >at
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
> >at
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
> >at
> com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
> >at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >at java.lang.reflect.Method.invoke(Method.java:498)
> >at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> >
> >
> > 嵌套Json 定义的 format 和 schema 如下:
> >
> >  .withFormat(new Json()
> >.jsonSchema(
> > """{type: 'object',
> >   |  properties: {
> >   |  database: {
> >   |  type: 'string'
> >   |  },
> >   |  table: {
> >   |  type: 'string'
> >   |  },
> >   |  maxwell_ts: {
> >   |  type: 'integer'
> >   |  },
> >   |  data: {
> >   |  type: 'object',
> >   |  properties :{
> >   |  reference_id :{
> >   |  type: 'string'
> >   |  },
> >   |  transaction_type :{
> >   |  type: 'integer'
> >   |  },
> >   |  merchant_id :{
> >   |  type: 'integer'
> >   |  },
> >   |  create_time :{
> >   |  type: 'integer'
> >   |  },
> >   |  status :{
> >   |  type: 'integer'
> >   |  }
> >   |  }
> >   |  }
> >   |   }
> >   | }
> > """.stripMargin.replaceAll("\n", " ")
> > )
> >  )
> >  .withSchema(new Schema()
> >.field("table", STRING())
> >.field("database", STRING())
> >.field("data", ROW(FIELD("reference_id",STRING()),
> > FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
> > FIELD("status",INT(
> >//.field("event_time", BIGINT())
> >//  .from("maxwell_ts")
> >//.rowtime(new Rowtime()
> >//  //.timestampsFromField("ts" * 1000)
> >//  .timestampsFromField("ts")
> >//  .watermarksPeriodicBounded(6)
> >//)
> >  )
> >
> >
> >bsTableEnv.sqlUpdate("""INSERT INTO y
> >   | SELECT `table`, `database`
> >   |`data.reference_id`,
> >   |`data.transaction_type`,
> >   |`data.merchant_id`,
> >   |`data.create_time`,
> >   |`data.status`
> >   | FROM """.stripMargin)
>
>


Flink SQL 嵌套 nested Json 解析

2020-05-25 文章 macia kk
有哪位大佬帮我看下,谢谢


尝试了很久,还是无法解析嵌套结构的Json

Error

Caused by: org.apache.flink.table.api.ValidationException: SQL
validation failed. From line 4, column 9 to line 4, column 31: Column
'data.transaction_type' not found in any table
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
at 
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
at 
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
at 
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)


嵌套Json 定义的 format 和 schema 如下:

  .withFormat(new Json()
.jsonSchema(
 """{type: 'object',
   |  properties: {
   |  database: {
   |  type: 'string'
   |  },
   |  table: {
   |  type: 'string'
   |  },
   |  maxwell_ts: {
   |  type: 'integer'
   |  },
   |  data: {
   |  type: 'object',
   |  properties :{
   |  reference_id :{
   |  type: 'string'
   |  },
   |  transaction_type :{
   |  type: 'integer'
   |  },
   |  merchant_id :{
   |  type: 'integer'
   |  },
   |  create_time :{
   |  type: 'integer'
   |  },
   |  status :{
   |  type: 'integer'
   |  }
   |  }
   |  }
   |   }
   | }
 """.stripMargin.replaceAll("\n", " ")
 )
  )
  .withSchema(new Schema()
.field("table", STRING())
.field("database", STRING())
.field("data", ROW(FIELD("reference_id",STRING()),
FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
FIELD("status",INT(
//.field("event_time", BIGINT())
//  .from("maxwell_ts")
//.rowtime(new Rowtime()
//  //.timestampsFromField("ts" * 1000)
//  .timestampsFromField("ts")
//  .watermarksPeriodicBounded(6)
//)
  )


bsTableEnv.sqlUpdate("""INSERT INTO y
   | SELECT `table`, `database`
   |`data.reference_id`,
   |`data.transaction_type`,
   |`data.merchant_id`,
   |`data.create_time`,
   |`data.status`
   | FROM """.stripMargin)


Re: Could not find a suitable table factory for 'TableSourceFactory'

2020-05-24 文章 macia kk
对了还有个问题,我之前看文档使用 `flink-connector-kafka_2.11`一直都无法运行,后来看别人也遇到这道这个问题,改成
`flink-sql-connector-kafka-0.11`
才可以运行,这两个有什么区别,如果不一样的话,对于 table API 最好标明一下用后者

macia kk  于2020年5月25日周一 上午10:05写道:

> built.sbt
>
> val flinkVersion = "1.10.0"
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion ,
>   "org.apache.flink" %% "flink-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion,
>
>   "org.apache.flink" % "flink-table-common" % flinkVersion,
>   "org.apache.flink" %% "flink-table-api-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
>   "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % 
> "provided",
>
>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
>   "org.apache.flink" %% "flink-sql-connector-kafka-0.11" % flinkVersion,  
>   // <<<<<<<<<<<<<<<<<<<<< Kafka 0.11
>   "org.apache.flink" % "flink-json" % flinkVersion
> )
>
>
> Leonard Xu  于2020年5月25日周一 上午9:33写道:
>
>> Hi,
>> 你使用的kafka connector的版本是0.11的吗?报错看起来有点像版本不对
>>
>> Best,
>> Leonard Xu
>>
>> > 在 2020年5月25日,02:44,macia kk  写道:
>> >
>> > 感谢,我在之前的邮件记录中搜索到了答案。我现在遇到了新的问题,卡主了好久:
>> >
>> >  Table API, sink to Kafka
>> >
>> >val result = bsTableEnv.sqlQuery("SELECT * FROM " + "")
>> >
>> >bsTableEnv
>> >  .connect(
>> >new Kafka()
>> >  .version("0.11") // required: valid connector versions are
>> >  .topic("aaa") // required: topic name from which the table is
>> read
>> >  .property("zookeeper.connect", "xxx")
>> >  .property("bootstrap.servers", "yyy")
>> >)
>> >  .withFormat(new Json())
>> >  .withSchema(new Schema()
>> >.field("ts", INT())
>> >.field("table", STRING())
>> >.field("database", STRING())
>> >  )
>> >  .createTemporaryTable("z")
>> >
>> >result.insertInto("m")
>> >
>> > Error:
>> >
>> > java.lang.NoSuchMethodError:
>> >
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(Ljava/lang/String;Lorg/apache/flink/streaming/util/serialization/KeyedSerializationSchema;Ljava/util/Properties;Ljava/util/Optional;)V
>> >at
>> org.apache.flink.streaming.connectors.kafka.Kafka011TableSink.createKafkaProducer(Kafka011TableSink.java:58)
>> >at
>> org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase.consumeDataStream(KafkaTableSinkBase.java:95)
>> >at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:140)
>> >at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> >at
>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> >at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> >at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>> >at
>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>> >at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> >at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> >at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> >at
>> org.apache.flink.table.planner.delegation.StreamPlanner.tr

Re: Could not find a suitable table factory for 'TableSourceFactory'

2020-05-24 文章 macia kk
built.sbt

val flinkVersion = "1.10.0"
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion ,
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion,

  "org.apache.flink" % "flink-table-common" % flinkVersion,
  "org.apache.flink" %% "flink-table-api-scala" % flinkVersion,
  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
  "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion % "provided",

  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  "org.apache.flink" %% "flink-sql-connector-kafka-0.11" %
flinkVersion,// <<<<<<<<<<<<<<<<<<<<< Kafka 0.11
  "org.apache.flink" % "flink-json" % flinkVersion
)


Leonard Xu  于2020年5月25日周一 上午9:33写道:

> Hi,
> 你使用的kafka connector的版本是0.11的吗?报错看起来有点像版本不对
>
> Best,
> Leonard Xu
>
> > 在 2020年5月25日,02:44,macia kk  写道:
> >
> > 感谢,我在之前的邮件记录中搜索到了答案。我现在遇到了新的问题,卡主了好久:
> >
> >  Table API, sink to Kafka
> >
> >val result = bsTableEnv.sqlQuery("SELECT * FROM " + "")
> >
> >bsTableEnv
> >  .connect(
> >new Kafka()
> >  .version("0.11") // required: valid connector versions are
> >  .topic("aaa") // required: topic name from which the table is
> read
> >  .property("zookeeper.connect", "xxx")
> >  .property("bootstrap.servers", "yyy")
> >)
> >  .withFormat(new Json())
> >  .withSchema(new Schema()
> >.field("ts", INT())
> >.field("table", STRING())
> >.field("database", STRING())
> >  )
> >  .createTemporaryTable("z")
> >
> >result.insertInto("m")
> >
> > Error:
> >
> > java.lang.NoSuchMethodError:
> >
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(Ljava/lang/String;Lorg/apache/flink/streaming/util/serialization/KeyedSerializationSchema;Ljava/util/Properties;Ljava/util/Optional;)V
> >at
> org.apache.flink.streaming.connectors.kafka.Kafka011TableSink.createKafkaProducer(Kafka011TableSink.java:58)
> >at
> org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase.consumeDataStream(KafkaTableSinkBase.java:95)
> >at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:140)
> >at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> >at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> >at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> >at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> >at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> >at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> >at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> >at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> >at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> >at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> >at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> >at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> >at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
> >at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
> >at
> org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.jav

Re: Could not find a suitable table factory for 'TableSourceFactory'

2020-05-24 文章 macia kk
感谢,我在之前的邮件记录中搜索到了答案。我现在遇到了新的问题,卡主了好久:

  Table API, sink to Kafka

val result = bsTableEnv.sqlQuery("SELECT * FROM " + "")

bsTableEnv
  .connect(
new Kafka()
  .version("0.11") // required: valid connector versions are
  .topic("aaa") // required: topic name from which the table is read
  .property("zookeeper.connect", "xxx")
  .property("bootstrap.servers", "yyy")
)
  .withFormat(new Json())
  .withSchema(new Schema()
.field("ts", INT())
.field("table", STRING())
.field("database", STRING())
  )
  .createTemporaryTable("z")

result.insertInto("m")

Error:

java.lang.NoSuchMethodError:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.(Ljava/lang/String;Lorg/apache/flink/streaming/util/serialization/KeyedSerializationSchema;Ljava/util/Properties;Ljava/util/Optional;)V
at 
org.apache.flink.streaming.connectors.kafka.Kafka011TableSink.createKafkaProducer(Kafka011TableSink.java:58)
at 
org.apache.flink.streaming.connectors.kafka.KafkaTableSinkBase.consumeDataStream(KafkaTableSinkBase.java:95)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:140)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at 
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.insertIntoInternal(TableEnvironmentImpl.java:355)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.insertInto(TableEnvironmentImpl.java:334)
at 
org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
at 
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:74)
at 
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:30)
at 
com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1982)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)


麻烦帮我看下,谢谢

Lijie Wang  于2020年5月25日周一 上午12:34写道:

> Hi,我不能加载你邮件中的图片。从下面的报错看起来是因为找不到 match 的connector。可以检查一下 DDL 中的 with 属性是否正确。
>
>
>
> 在 2020-05-25 00:11:

Could not find a suitable table factory for 'TableSourceFactory'

2020-05-24 文章 macia kk
有人帮我看下这个问题吗,谢谢

[image: image.png]
[image: image.png]

org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: findAndCreateTableSource failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.properties.bootstrap.servers=ip-10-128-145-1.idata-server.shopee.io:9092connector.properties.group.id=keystats_aripay
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=ebisu_wallet_id_db_mirror_v1
connector.type=kafka
format.property-version=1
format.type=json
schema.0.data-type=INTschema.0.name=ts
schema.1.data-type=VARCHAR(2147483647)schema.1.name=table
schema.2.data-type=VARCHAR(2147483647)schema.2.name=database
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
at 
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
... 39 more