Re: Flink SQL temporal table join with Hive 报错

2021-02-19 文章 Leonard Xu

> 
>  二,维表有有分区,每个分区仅仅包含当天的数据,没有 primary key
> 
>   这种情况因为要 Join 全部的数据,所以还是需要设置 'streaming-source.partition.include' = 
> 'all',但是还是因为没有 primary Key,所以无法 run。
> 
> 现在就是针对第二种情况,因为Hive的维度表不是我维护的,很多人都在用,所以不能修改去加上 primary key,无法进行 join.

第二种情况,hive表不是streaming读的,相当于是一张静态表,每次都是加载最新的全量,所以配置如下参数即可
  'streaming-source.enable' = 'false',  -- option with default value, 
can be ignored.
  'streaming-source.partition.include' = 'all', -- option with default value, 
can be ignored.
  'lookup.join.cache.ttl' = '12 h’
   'streaming-source.partition.include' = ‘all’  是默认值,也可以不配, 参考【1】
> 
> 
> 还有我看文档现在不支持 event time join, 官网的汇率是按照 process time 
> join,但是如果要回溯昨天的数据的时候,其实就会有问题。
> 
> 我看 FLIP-132 
> 
>  有提到 Event Time semantics, 这是以后回支持的吗?

Kafka connector已经支持了 event time join, 但hive表目前还不支持在上面声明watermark,所以还不支持


祝好,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/hive/hive_read_write/#temporal-join-the-latest-table

> 
> 
> macia kk mailto:pre...@gmail.com>> 于2021年2月8日周一 下午6:53写道:
> Hi. Leonard
> 
>   麻烦帮忙看下 Flink 邮件里的这个问题,卡了我很久了,谢谢



Re: Flink SQL temporal table join with Hive 报错

2021-02-10 文章 macia kk
Hi, Leonard

 我们的业务变得越来越复杂,所以现在需要 Join Hive 维表的情况非常普遍。现在维表分三种情况

 一,维表没有分区,没有 primary key

  这时候 `'streaming-source.partition.include' = 'latest',因为没有
parition,所以 latest 应该加载的就是全部的数据。

 二,维表有有分区,每个分区仅仅包含当天的数据,没有 primary key

  这种情况因为要 Join 全部的数据,所以还是需要设置 'streaming-source.partition.include' =
'all',但是还是因为没有 primary Key,所以无法 run。

三,维表有分区,每个分区包含全量数据,没有 primiary key

  这种情况可以设置,'streaming-source.partition.include' = 'latest',这个是是官网的案例,测试没有问题。


现在就是针对第二种情况,因为Hive的维度表不是我维护的,很多人都在用,所以不能修改去加上 primary key,无法进行 join.




--- BYW---

还有我看文档现在不支持 event time join, 官网的汇率是按照 process time
join,但是如果要回溯昨天的数据的时候,其实就会有问题。

我看 FLIP-132

有提到 Event
Time semantics, 这是以后回支持的吗?



Leonard Xu  于2021年2月10日周三 上午11:36写道:

> Hi,  macia
>
> > 在 2021年2月9日,10:40,macia kk  写道:
> >
> > SELECT *FROM
> >(
> >SELECT  tt.*
> >FROM
> >input_tabe_01 tt
> >FULL OUTER JOIN input_tabe_02 mt
> >ON (mt.transaction_sn = tt.reference_id)
> >and tt.create_time >= mt.create_time + INTERVAL '5' MINUTES
> >and tt.create_time <= mt.create_time - INTERVAL '5' MINUTES
> >WHERE COALESCE(tt.create_time, mt.create_time) is not NULL
> >) lt
> >LEFT JOIN exchange_rate ex
> >/*+
> OPTIONS('streaming-source.enable'='true','streaming-source.partition.include'
> > = 'all') */
> >FOR SYSTEM_TIME AS OF lt.event_time ex ON DATE_FORMAT
> > (lt.event_time, '-MM-dd') = cast(ex.date_id as String)
>
>
> 你说的异常我本地没有复现,异常栈能直接贴下吗?
>
> 另外看你写的是lt.event_time,
> 这个sql的是要做版本表的维表关联吗?目前Hive还不支持指定版本表的,只支持最新分区作为维表或者整个hive表作为维表,
> 两种维表的option你可参考下[1]
>
> 祝好,
> Leonard
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hive/hive_read_write.html#temporal-join-the-latest-table
>
>
>
>


Re: Flink SQL temporal table join with Hive 报错

2021-02-09 文章 Leonard Xu
Hi,  macia

> 在 2021年2月9日,10:40,macia kk  写道:
> 
> SELECT *FROM
>(
>SELECT  tt.*
>FROM
>input_tabe_01 tt
>FULL OUTER JOIN input_tabe_02 mt
>ON (mt.transaction_sn = tt.reference_id)
>and tt.create_time >= mt.create_time + INTERVAL '5' MINUTES
>and tt.create_time <= mt.create_time - INTERVAL '5' MINUTES
>WHERE COALESCE(tt.create_time, mt.create_time) is not NULL
>) lt
>LEFT JOIN exchange_rate ex
>/*+ 
> OPTIONS('streaming-source.enable'='true','streaming-source.partition.include'
> = 'all') */
>FOR SYSTEM_TIME AS OF lt.event_time ex ON DATE_FORMAT
> (lt.event_time, '-MM-dd') = cast(ex.date_id as String)


你说的异常我本地没有复现,异常栈能直接贴下吗?

另外看你写的是lt.event_time, 
这个sql的是要做版本表的维表关联吗?目前Hive还不支持指定版本表的,只支持最新分区作为维表或者整个hive表作为维表,
两种维表的option你可参考下[1]

祝好,
Leonard
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/hive/hive_read_write.html#temporal-join-the-latest-table





Re: Flink SQL temporal table join with Hive 报错

2021-02-08 文章 macia kk
SELECT *FROM
(
SELECT  tt.*
FROM
input_tabe_01 tt
FULL OUTER JOIN input_tabe_02 mt
ON (mt.transaction_sn = tt.reference_id)
and tt.create_time >= mt.create_time + INTERVAL '5' MINUTES
and tt.create_time <= mt.create_time - INTERVAL '5' MINUTES
WHERE COALESCE(tt.create_time, mt.create_time) is not NULL
) lt
LEFT JOIN exchange_rate ex
/*+ 
OPTIONS('streaming-source.enable'='true','streaming-source.partition.include'
= 'all') */
FOR SYSTEM_TIME AS OF lt.event_time ex ON DATE_FORMAT
(lt.event_time, '-MM-dd') = cast(ex.date_id as String)


Rui Li  于2021年2月9日周二 上午10:20写道:

> Hi,
>
> 那join的语句是怎么写的呢?
>
> On Mon, Feb 8, 2021 at 2:45 PM macia kk  wrote:
>
> > 图就是哪个报错
> >
> > 建表语句如下,表示公共表,我也没有改的权限.
> >
> > CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT
> > 'country', `currency` string COMMENT 'currency', `exchange_rate`
> > decimal(25,10) COMMENT 'exchange rate')
> > PARTITIONED BY (`grass_date` date COMMENT 'partition key, -MM-dd')
> > ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io
> > .parquet.serde.ParquetHiveSerDe'
> > WITH SERDEPROPERTIES (
> >   'serialization.format' = '1'
> > )
> >
> >
> > Rui Li  于2021年2月8日周一 下午2:17写道:
> >
> > > 你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么?
> > >
> > > On Mon, Feb 8, 2021 at 10:33 AM macia kk  wrote:
> > >
> > > > Currently the join key in Temporal Table Join can not be empty.
> > > >
> > > > 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错
> > > >
> > > > [image: image.png]
> > > >
> > >
> > >
> > > --
> > > Best regards!
> > > Rui Li
> > >
> >
>
>
> --
> Best regards!
> Rui Li
>


Re: Flink SQL temporal table join with Hive 报错

2021-02-08 文章 Rui Li
Hi,

那join的语句是怎么写的呢?

On Mon, Feb 8, 2021 at 2:45 PM macia kk  wrote:

> 图就是哪个报错
>
> 建表语句如下,表示公共表,我也没有改的权限.
>
> CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT
> 'country', `currency` string COMMENT 'currency', `exchange_rate`
> decimal(25,10) COMMENT 'exchange rate')
> PARTITIONED BY (`grass_date` date COMMENT 'partition key, -MM-dd')
> ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io
> .parquet.serde.ParquetHiveSerDe'
> WITH SERDEPROPERTIES (
>   'serialization.format' = '1'
> )
>
>
> Rui Li  于2021年2月8日周一 下午2:17写道:
>
> > 你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么?
> >
> > On Mon, Feb 8, 2021 at 10:33 AM macia kk  wrote:
> >
> > > Currently the join key in Temporal Table Join can not be empty.
> > >
> > > 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错
> > >
> > > [image: image.png]
> > >
> >
> >
> > --
> > Best regards!
> > Rui Li
> >
>


-- 
Best regards!
Rui Li


Re: Flink SQL temporal table join with Hive 报错

2021-02-07 文章 macia kk
图就是哪个报错

建表语句如下,表示公共表,我也没有改的权限.

CREATE EXTERNAL TABLE `exchange_rate`(`grass_region` string COMMENT
'country', `currency` string COMMENT 'currency', `exchange_rate`
decimal(25,10) COMMENT 'exchange rate')
PARTITIONED BY (`grass_date` date COMMENT 'partition key, -MM-dd')
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = '1'
)


Rui Li  于2021年2月8日周一 下午2:17写道:

> 你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么?
>
> On Mon, Feb 8, 2021 at 10:33 AM macia kk  wrote:
>
> > Currently the join key in Temporal Table Join can not be empty.
> >
> > 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错
> >
> > [image: image.png]
> >
>
>
> --
> Best regards!
> Rui Li
>


Re: Flink SQL temporal table join with Hive 报错

2021-02-07 文章 Rui Li
你好,图挂了,可以贴一下hive建表的DDL和join的语句是怎么写的么?

On Mon, Feb 8, 2021 at 10:33 AM macia kk  wrote:

> Currently the join key in Temporal Table Join can not be empty.
>
> 我的 Hive 表 join DDL 没有设置 is not null ,但是都是有值的,还是会报这个错
>
> [image: image.png]
>


-- 
Best regards!
Rui Li