flinksql维表join

2020-12-04 文章 leiyanrui
flinksql维表join之后不能做些过滤这样的操作吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 生产hive sql 迁移flink 11 引擎,碰到的问题

2020-12-04 文章 Rui Li
Hi,

目前加载HiveModule可以使用(大部分)hive内置函数,也能解决调用内置函数时的类型转换问题。不过更全面的语法兼容还需要等FLIP-152实现了才能支持,欢迎关注。

On Fri, Dec 4, 2020 at 8:44 PM Jark Wu  wrote:

> Hi,
>
> Flink SQL 1.11 暂时还不兼容 Hive SQL 语法。这个功能的设计,最近才在社区中讨论,预计1.13中支持。可以关注下这个
> design 的讨论:
>
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-152-Hive-Query-Syntax-Compatibility-td46928.html
>
>
> Best,
> Jark
>
> On Fri, 4 Dec 2020 at 11:45, 莫失莫忘  wrote:
>
> > 最近尝试把一个生产 hive sql 任务,执行引擎切换成 flink 1.11.2 ,发现flink 11 对hive
> > SQL的支持有下列问题1、不支持 双引号 表示字符串
> > 2、不支持 != 表示不等运算
> > 3、不支持 类型隐式转换
> > 4、不支持 split 函数
> > 5、hive 不区分大小写,flink区分大小写
> > 6、join右表 不支持是一个子查询(Calcite bug
> > https://issues.apache.org/jira/browse/CALCITE-2152)
> > 7、不支持 create table table1 as select * from pokes; 中的 as
> >
> >
> >
> > 暂时只测到这些问题。总体感觉flink11 对 hive SQL的语句支持还不够,无法把已有离线 hive sql 任务直接 切换到flink
> 引擎。
>


-- 
Best regards!
Rui Li


Re: flink 1.11.2写hive 2.1.1 orc 遇到的问题

2020-12-04 文章 Rui Li
Hi,

现在CDC的数据是没办法直接对接hive的,目前流式数据写hive只能是insert-only的。

On Fri, Dec 4, 2020 at 10:56 AM yang xu <316481...@qq.com> wrote:

> Hi
> 如果不支持ACID,那如果监听binlog日志的更新和删除操作需要另外写任务来处理么,如何才能做到真的批流统一
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



-- 
Best regards!
Rui Li


Re: 动态表 Change Log 格式

2020-12-04 文章 Jark Wu
是完整的记录。

upsert kafka 就是这样子实现的,只存储最新镜像。
但是有的 query 是会产生 delete 消息的,所以有时候还是需要存下 delete,像 upsert kafka 里就存成了kafka 的
tombstone 消息。

Best,
Jark

On Fri, 4 Dec 2020 at 17:00, jie mei  wrote:

> Hi, Community
>
> Flink 动态表的 Change Log 会有四种消息(INSERT, UPDATE_BEFORE, UPDATE_AFTER,
> DELETE).
> 其中 UPDATE_BEFORE 对应的是更新之前的完整记录,UPDATE_AFTER 对应的是更新之后的完整记录吗?
> 我的问题特意强调完整记录,是为了确认 Change Log 的内容是更新后的完整数据,而不是 Set A = 'a' 这样的操作日志/更新语句。
> 此外,Delete语句对应的数据是完整记录还是操作日志呢?
>
> 这意味着Table Sink的时候,只需要获得INSERT, UPDATE_AFTER的数据,写入不支持UPSERT的存储。
> 并通过额外的逻辑判断来获得最新的数据是可行的。
>
> --
>
> *Best Regards*
> *Jeremy Mei*
>


Re: 为什么要关闭calcite的隐式转换功能

2020-12-04 文章 Jark Wu
社区已经开始 Hive query 语法兼容的设计讨论,可以关注下:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-152-Hive-Query-Syntax-Compatibility-td46928.html

Best,
Jark

On Fri, 4 Dec 2020 at 15:37, stgztsw  wrote:

> 我觉得既然社区准备兼容hive,隐式转换和其他hive的语法兼容还是必须的。实际生产环境里运行的hive
> sql往往都是很复杂的,目前按flink对于hive的兼容程度,大部分的hivesql基本都无法运行成功。(其他欠缺的还有不支持bangEquel,
> create table as 等等,这边就不一一列举了),希望社区能够对hive这块支持的更完善一点。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 生产hive sql 迁移flink 11 引擎,碰到的问题

2020-12-04 文章 Jark Wu
Hi,

Flink SQL 1.11 暂时还不兼容 Hive SQL 语法。这个功能的设计,最近才在社区中讨论,预计1.13中支持。可以关注下这个
design 的讨论:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-152-Hive-Query-Syntax-Compatibility-td46928.html


Best,
Jark

On Fri, 4 Dec 2020 at 11:45, 莫失莫忘  wrote:

> 最近尝试把一个生产 hive sql 任务,执行引擎切换成 flink 1.11.2 ,发现flink 11 对hive
> SQL的支持有下列问题1、不支持 双引号 表示字符串
> 2、不支持 != 表示不等运算
> 3、不支持 类型隐式转换
> 4、不支持 split 函数
> 5、hive 不区分大小写,flink区分大小写
> 6、join右表 不支持是一个子查询(Calcite bug
> https://issues.apache.org/jira/browse/CALCITE-2152)
> 7、不支持 create table table1 as select * from pokes; 中的 as
>
>
>
> 暂时只测到这些问题。总体感觉flink11 对 hive SQL的语句支持还不够,无法把已有离线 hive sql 任务直接 切换到flink 引擎。


Re: SQL解析复杂JSON问题

2020-12-04 文章 Wei Zhong
是的,1.11想做JSON的自定义解析和映射只能在json format以外的地方进行了

> 在 2020年12月4日,17:19,李轲  写道:
> 
> 如果1.11想做自定义解析和映射,只能通过udf么?
> 
> 发自我的iPhone
> 
>> 在 2020年12月4日,16:52,Wei Zhong  写道:
>> 
>> Hi 你好,
>> 
>> 这个取决于你使用的flink版本,1.11版本会自动从table schema中解析,而1.10版本如果table schema和json 
>> schema不是完全相同的话,需要手动写json-schema:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping
>>  
>> 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format
>>  
>> 
>> 
>> 
>>> 在 2020年12月4日,16:39,guaishushu1...@163.com 写道:
>>> 
>>> 麻烦问下我已经在字段上面定了结构,还需要再写format.json-schema吗?CREATE TABLE user_log(
>>>  id VARCHAR,
>>>  timestam VARCHAR,
>>>  user_info ROW(user_id string, name string ),
>>>  jsonArray ARRAY
>>> ) WITH (
>>>  'connector.type' = 'kafka',
>>>  'connector.version' = 'universal',
>>>  'connector.topic' = 'complex_string',
>>>  'connector.properties.zookeeper.connect' = 'venn:2181',
>>>  'connector.properties.bootstrap.servers' = 'venn:9092',
>>>  'connector.startup-mode' = 'earliest-offset',
>>>  'format.type' = 'json',
>>>  'format.json-schema' = '{
>>>  "type": "object",
>>>  "properties": {
>>> "id": {type: "string"},
>>> "timestam": {type: "string"},
>>> "user_info":{type: "object",
>>> "properties" : {
>>> "user_id" : {type:"string"},
>>> "name":{type:"string"}
>>> }
>>>   },
>>>  "jsonArray":{"type": "array",
>>>   "items": {
>>>"type": "object",
>>>"properties" : {
>>>"user_id222" : {type:"string"},
>>>"name222" : {type:"string"}
>>>   }
>>>}
>>>   }
>>>  }
>>>  }'
>>> );
>>> 
>>> 
>>> 
>>> 
>>> guaishushu1...@163.com
>> 
>> 
> 



使用DataStream → IterativeStream → DataStream 方式做流关联

2020-12-04 文章 163
想请教各位一个问题:目前有一个这样的需求:
两个事实流A 和B, 需要使用B 去关联A 。现在A的消息可能比B或早或晚达到,时间长度最长可能晚两天。
目前方案是:StreamA connect StreamB , 
将A、B分别去对方的mapstate中去关联,关联上则下发,关联不上则写入自己的mapstate中,等待对方来关联。但是目前还是存在一些误差,猜测是 
部分AB的消息同时到达同时写入自身的mapstate中,无法触发关联下发。

所以目前在想了另一个方案: 将 流A写入外部kv(tair)存储中, 
然后用B采用.iterate()的方式持续去关联tair。现在有个疑问,比如有部分数据关联不上,那么B中有消息产生死循环持续运行关联,这个问题有没有什么好的解决方案



Re: 回复: re:Re: 回复:一个关于实时合并数据的问题

2020-12-04 文章 bradyMk
对对对,可以取hashCode,我短路了,谢谢哈~



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: re:Re: 回复:一个关于实时合并数据的问题

2020-12-04 文章 xuhaiLong
id 是字符串 走哈希取余试试看?


在2020年12月4日 18:12,502347601<502347...@qq.com> 写道:
hello~不能按照keyId来keyby,这样state的个数也就10亿个了,checkpoint会有性能问题。你可以先求余一下,比如求余分成10组。类似这样keyid%10。


-- Original Message --
From: "bradyMk";
Date: 2020-12-04 18:05
To: "user-zh";
Subject: Re: re:Re: 回复:一个关于实时合并数据的问题



所以您说的这个思路应该是和我上面说的是一样的了吧,根据10亿id做keyby,不会有什么问题么?
- Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: re:Re: re:Re: 回复:一个关于实时合并数据的问题

2020-12-04 文章 bradyMk
这样啊。。那请问如果id是字符串的话,有什么好办法去减少分组么?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

re:Re: re:Re: 回复:一个关于实时合并数据的问题

2020-12-04 文章 502347601
hello~不能按照keyId来keyby,这样state的个数也就10亿个了,checkpoint会有性能问题。你可以先求余一下,比如求余分成10组。类似这样keyid%10。


-- Original Message --
From: "bradyMk";
Date: 2020-12-04 18:05
To: "user-zh";
Subject: Re: re:Re: 回复:一个关于实时合并数据的问题



所以您说的这个思路应该是和我上面说的是一样的了吧,根据10亿id做keyby,不会有什么问题么? 
- Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: re:Re: 回复:一个关于实时合并数据的问题

2020-12-04 文章 bradyMk
所以您说的这个思路应该是和我上面说的是一样的了吧,根据10亿id做keyby,不会有什么问题么?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

re:Re: 回复:一个关于实时合并数据的问题

2020-12-04 文章 502347601
嗯嗯是的,所以你需要keyby下~


-- Original Message --
From: "bradyMk";
Date: 2020-12-04 17:58
To: "user-zh";
Subject: Re: 回复:一个关于实时合并数据的问题



Hi~

可是MapState是只针对keyby后的流才能用啊



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复:一个关于实时合并数据的问题

2020-12-04 文章 bradyMk
Hi~

可是MapState是只针对keyby后的流才能用啊



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-cdc 无法读出binlog,程序也不报错

2020-12-04 文章 Jark Wu
这个听起来不太合理。总得报个什么错 作业再失败吧。 或者TaskManager 的日志中有没有什么异常信息?

On Fri, 4 Dec 2020 at 09:23, chenjb  wrote:

>
> 谢谢老哥关注,我是int对应成bigint了,原以为bigint范围更大应该可以兼容,没认真看文档,然后mysql有错误日志但程序没报错,注意力就放mysql那边了。这两天试了下flink-cdc-sql发现有些有错误的场景不报错,比如连不上mysql(密码错误)也是程序直接退出exit
> 0,没任何报错或提示,是还要设置什么吗?我是java小白,这块不是很懂
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: SQL解析复杂JSON问题

2020-12-04 文章 李轲
如果1.11想做自定义解析和映射,只能通过udf么?

发自我的iPhone

> 在 2020年12月4日,16:52,Wei Zhong  写道:
> 
> Hi 你好,
> 
> 这个取决于你使用的flink版本,1.11版本会自动从table schema中解析,而1.10版本如果table schema和json 
> schema不是完全相同的话,需要手动写json-schema:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping
>  
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format
>  
> 
> 
> 
>> 在 2020年12月4日,16:39,guaishushu1...@163.com 写道:
>> 
>> 麻烦问下我已经在字段上面定了结构,还需要再写format.json-schema吗?CREATE TABLE user_log(
>>   id VARCHAR,
>>   timestam VARCHAR,
>>   user_info ROW(user_id string, name string ),
>>   jsonArray ARRAY
>> ) WITH (
>>   'connector.type' = 'kafka',
>>   'connector.version' = 'universal',
>>   'connector.topic' = 'complex_string',
>>   'connector.properties.zookeeper.connect' = 'venn:2181',
>>   'connector.properties.bootstrap.servers' = 'venn:9092',
>>   'connector.startup-mode' = 'earliest-offset',
>>   'format.type' = 'json',
>>   'format.json-schema' = '{
>>   "type": "object",
>>   "properties": {
>>  "id": {type: "string"},
>>  "timestam": {type: "string"},
>>  "user_info":{type: "object",
>>  "properties" : {
>>  "user_id" : {type:"string"},
>>  "name":{type:"string"}
>>  }
>>},
>>   "jsonArray":{"type": "array",
>>"items": {
>> "type": "object",
>> "properties" : {
>> "user_id222" : {type:"string"},
>> "name222" : {type:"string"}
>>}
>> }
>>}
>>   }
>>   }'
>> );
>> 
>> 
>> 
>> 
>> guaishushu1...@163.com
> 
> 



????????????????????????????????

2020-12-04 文章 ????
hello??  id ?? id ?? keyby  id  
mod ?? processFunction ?? MapState  http://apache-flink.147419.n8.nabble.com/

动态表 Change Log 格式

2020-12-04 文章 jie mei
Hi, Community

Flink 动态表的 Change Log 会有四种消息(INSERT, UPDATE_BEFORE, UPDATE_AFTER,
DELETE).
其中 UPDATE_BEFORE 对应的是更新之前的完整记录,UPDATE_AFTER 对应的是更新之后的完整记录吗?
我的问题特意强调完整记录,是为了确认 Change Log 的内容是更新后的完整数据,而不是 Set A = 'a' 这样的操作日志/更新语句。
此外,Delete语句对应的数据是完整记录还是操作日志呢?

这意味着Table Sink的时候,只需要获得INSERT, UPDATE_AFTER的数据,写入不支持UPSERT的存储。
并通过额外的逻辑判断来获得最新的数据是可行的。

-- 

*Best Regards*
*Jeremy Mei*


Re: SQL解析复杂JSON问题

2020-12-04 文章 Wei Zhong
Hi 你好,

这个取决于你使用的flink版本,1.11版本会自动从table schema中解析,而1.10版本如果table schema和json 
schema不是完全相同的话,需要手动写json-schema:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/json.html#data-type-mapping
 

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#json-format
 



> 在 2020年12月4日,16:39,guaishushu1...@163.com 写道:
> 
> 麻烦问下我已经在字段上面定了结构,还需要再写format.json-schema吗?CREATE TABLE user_log(
>id VARCHAR,
>timestam VARCHAR,
>user_info ROW(user_id string, name string ),
>jsonArray ARRAY
> ) WITH (
>'connector.type' = 'kafka',
>'connector.version' = 'universal',
>'connector.topic' = 'complex_string',
>'connector.properties.zookeeper.connect' = 'venn:2181',
>'connector.properties.bootstrap.servers' = 'venn:9092',
>'connector.startup-mode' = 'earliest-offset',
>'format.type' = 'json',
>'format.json-schema' = '{
>"type": "object",
>"properties": {
>   "id": {type: "string"},
>   "timestam": {type: "string"},
>   "user_info":{type: "object",
>   "properties" : {
>   "user_id" : {type:"string"},
>   "name":{type:"string"}
>   }
> },
>"jsonArray":{"type": "array",
> "items": {
>  "type": "object",
>  "properties" : {
>  "user_id222" : {type:"string"},
>  "name222" : {type:"string"}
> }
>  }
> }
>}
>}'
> );
> 
> 
> 
> 
> guaishushu1...@163.com



一个关于实时合并数据的问题

2020-12-04 文章 bradyMk
想请教各位一个问题:目前有一个这样的需求:

数据流40W/s,数据有id,time,type等字段,id有10亿个,现在想30分钟内,同一个id的信息只保存一条,时间的话要用事件的事件,不能用处理的时间。

本人现在的思路是:根据id分组,然后做增量ck,状态信息存储每个id的最后的时间,然后每来一条数据会读取状态信息,然后做时间判断。但是发现这样做背压很高,数据消费很慢

请问各位,我这种思路是否可行?根据id分组会产生10亿个分组,这样会影响什么?还有其他更好的方法么?

谢谢各位解答疑惑!




-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink1.9设置TTL不生效

2020-12-04 文章 Yang Peng
没人遇到这种问题吗?

Yang Peng  于2020年12月3日周四 下午8:49写道:

> Hi,咨询一个问题 我们生产环境使用flink1.9版本,使用的statebackend为RocksDB,具体代码设置如下:
>
> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>
> StateTtlConfig ttlConfig = StateTtlConfig
> .newBuilder(Time.minutes(60))
> .updateTtlOnCreateAndWrite()
> .neverReturnExpired()
> .cleanupInRocksdbCompactFilter(1000)
> .build();
> MapStateDescriptor eidMapStateDesc = new
> MapStateDescriptor<>( EV_STATE_FLAG , BasicTypeInfo.STRING_TYPE_INFO,
> BasicTypeInfo.INT_TYPE_INFO);
> eidMapStateDesc.enableTimeToLive(ttlConfig);
> eidMapState = getRuntimeContext().getMapState(eidMapStateDesc);
>
> 设置TTL过期时间为60mins
> 但是目前已经运行了一天了,通过rocksdb监控我们查看EV_STATE_FLAG这个名称的SST文件一直在增加没有降低的趋势,我们从TM日志发现如下信息:
> WARN org.rocksdb.FlinkCompactionFilter - Cannot configure RocksDB TTL
> compaction filter for state < EV_EID_FLAG >: feature is disabled for the
> state backend.
> 但是我们在添加完  state.backend.rocksdb.ttl.compaction.filter.enabled:
> true这个参数重启任务之后上述warn
> 信息就会消失,但是任务运行一段时间后就会执行cp失败,我们查看jstack发现执行cp失败是卡在了获取state数据的代码位置,去掉这个参数之后任务就会恢复,但是TTL
> 配置不生效这个warn就会复现,大家有遇到过这种问题吗?
>


SQL解析复杂JSON问题

2020-12-04 文章 guaishushu1...@163.com
麻烦问下我已经在字段上面定了结构,还需要再写format.json-schema吗?CREATE TABLE user_log(
id VARCHAR,
timestam VARCHAR,
user_info ROW(user_id string, name string ),
jsonArray ARRAY
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'complex_string',
'connector.properties.zookeeper.connect' = 'venn:2181',
'connector.properties.bootstrap.servers' = 'venn:9092',
'connector.startup-mode' = 'earliest-offset',
'format.type' = 'json',
'format.json-schema' = '{
"type": "object",
"properties": {
   "id": {type: "string"},
   "timestam": {type: "string"},
   "user_info":{type: "object",
   "properties" : {
   "user_id" : {type:"string"},
   "name":{type:"string"}
   }
 },
"jsonArray":{"type": "array",
 "items": {
  "type": "object",
  "properties" : {
  "user_id222" : {type:"string"},
  "name222" : {type:"string"}
 }
  }
 }
}
}'
);




guaishushu1...@163.com