flink 版本:1.12
列:col varchar
使用where col is null时可以过滤出col为null的记录
使用where col is null or col = ''时就不可以
同时试了下另外一种写法
where (case when col is null then true else false end) 可以过滤出来
where (case when col is null then true when col = '' then true else false end)
过滤不出来
请问这个bug吗,还是语法有问题
用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726
不行的话可以在ddl中限制列的数量
--
发件人:Ye Chen
发送时间:2021年8月2日(星期一) 11:37
收件人:user-zh ; silence
主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
你好,我试了一下,如
如果只想更新部分字段的话可以试下
insert into t(a,b) select a,b from x
--
发件人:Ye Chen
发送时间:2021年7月30日(星期五) 17:57
收件人:user-zh
主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?
现有table
CREATE TABLE t (
abigint,
bbig
你在你的sink ddl定义了主键会自动的按主键进行upsert的
参考https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#idempotent-writes
--
发件人:Ye Chen
发送时间:2021年7月30日(星期五) 17:57
收件人:user-zh
主 题:场景题:Flink SQL 不支持 INSERT INTO…
就是单独引用的啊,但任务逻辑比较复杂时会同时混用多个udf这个是没法避免的啊
--
发件人:Michael Ran
发送时间:2021年7月23日(星期五) 17:42
收件人:user-zh ; silence
主 题:Re:回复:flink sql 依赖隔离
建议上传的时候单独放,提交任务的时候 拉下来单独引用
在 2021-07-23 11:01:59,"silence" 写道:
>
>这边目前主要还是yarn,
这边目前主要还是yarn,目前痛点是同一个sql任务会用到多个udf,udf通过-C http://xxx.jar进行加载
udf和sql jar之间、udf和udf之间都可能会有依赖冲突,
目前初步想法是每个udf绑定一个jarpath,使用独立的classloader进行加载,避免和主jar以及其他udf之间的依赖冲突
--
发件人:Michael Ran
发送时间:2021年7月22日(星期四) 20:07
收件人:user-zh ; silence
已解决 where 条件始终为假。
--
发件人:silence
发送时间:2021年7月7日(星期三) 12:05
收件人:user-zh
主 题:flinksql问题请教
请教一下下面的sql为什么会被翻译成有限数据集?源表是个kafka source,用了row_number进行开窗和两次Table Function join
flink版本:1.12.2
Stage 1 : Data Source
content : Source: Values
请教一下下面的sql为什么会被翻译成有限数据集?源表是个kafka source,用了row_number进行开窗和两次Table Function join
flink版本:1.12.2
Stage 1 : Data Source
content : Source: Values(tuples=[[]])
Stage 2 : Operator
content : Correlate(invocation=[LateralArray($cor3.gift_list)],
correlate=[table(LateralArray($cor3.gift_list))],
select=
没用放在lib下,是启动时通过-C动态添加udf jar,一个sql作业可能会用到很多udf,可能是不同的用户写的,所以经常会出现依赖冲突
--
发件人:yzhhui
发送时间:2021年7月5日(星期一) 14:09
收件人:user-zh@flink.apache.org ; silence
抄 送:user-zh
主 题:回复:flink sql 依赖隔离
提交任务的时候提交自己的jar就好了,这个不要放公共lib下 就OK
在2021年
请教大家目前flink sql有没有办法做到依赖隔离
比如connector,format,udf(这个最重要)等,
很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。
目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划
目前interval join和维表的时态join不会进行回撤,其他场景会产生回撤数据
--
发件人:杨光跃
发送时间:2021年6月30日(星期三) 17:47
收件人:user-zh@flink.apache.org
主 题:普通表join版本表,怎么得到append表
大佬们,请教个问题,
insert into sink_2
select a.`time`,c.cust,b.mobile
from case2_TOPIC_A a
left joi
可参考
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout
--
发件人:杨光跃
发送时间:2021年6月30日(星期三) 10:54
收件人:user-zh@flink.apache.org
主 题:flink sql 空闲数据源场景如何配置
在代码中可以通过
flink 版本1.12
异常如下:
java.io.IOException: Can't get next record for channel
InputChannelInfo{gateIdx=0, inputChannelIdx=0}
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:166)
at
org.apache.flink.streaming.runtime.io.StreamOneInputP
可以在metrics 上报时或落地前对source两次上报间隔的numRecordsOut值进行相减,最后呈现的时候按时间段累计就可以了
--
Sent from: http://apache-flink.147419.n8.nabble.com/
可以用explain
--
Sent from: http://apache-flink.147419.n8.nabble.com/
启动时通过-C加到classpath里试试
--
Sent from: http://apache-flink.147419.n8.nabble.com/
多个insert的话要用statementset去提交
--
Sent from: http://apache-flink.147419.n8.nabble.com/
个人也维护了个flink平台的开源项目,希望可以帮助到你
https://github.com/hairless/plink
--
Sent from: http://apache-flink.147419.n8.nabble.com/
问题描述:
TimeStamp类型和异常格式的字符串进行比较时会在任务运行时报空指针
像这种错误虽然是用户书写错误导致的,但运行时才能发现问题,且sql太长时不好定位具体原因
是否可以在编译期进行类型的验证,尽早发现问题并给出sql的文本坐标
例:where CURRENT_TIMESTAMP=''
where CURRENT_TIMESTAMP='19700101'
java.lang.NullPointerException: null
at
org.apache.flink.table.data.TimestampData.compareTo(Times
那用自定义的catalog怎么定义hive表来读写hive呢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我理解各个公司都会有自己的元数据管理平台,hive表的创建修改都需要经过严格的权限控制在平台上进行操作,包括调度任务、实时写入任务、数据血缘等。
我个人觉得理想的方式是单个flink
sql的所有的connector通过自维护的元数据进行生成,不需要引入hivecatalog,使用默认的MemoryCatalog即可。
总结一下就是不希望引入HiveCatalog来进行hive表的读写
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你好
感谢回复
主要有以下几点原因:
1、直接使用hive catalog进行hive表的创建修改风险太高,更希望在平台层限制hive表的创建和修改
2、connector的配置是保存在hive表的DBPROPERTIES里的,这是否就意味着想通过flink往现有hive表里写数据需要先通过alter语句修改hive表的属性配置,这里不希望对用户直接暴露alter
hive的能力
3、使用普通的ddl可以与现有connector的定义统一风格,不需要来回切换方言
4、可以不用将配置信息持久化,通过GenericInMemoryCatalog使用即可
--
Sent from: http
问一下社区有没有计划支持普通的ddl(不用hive的catalog)来进行读写hive表吗
现在不支持是有什么考虑吗
--
Sent from: http://apache-flink.147419.n8.nabble.com/
可以尝试在shade插件里加个transformer
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink1.12后所有的yarn相关的参数通过-D进行指定
例:-D yarn.application.name=xxx 替代以前的-ynm xxx
更多配置参考文档https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#yarn
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你可以尝试同时指定-C "file:///path/to/conf/cmp_online.cfg" 以及 -yt /path/to/conf 来进行测试
然后代码里这么获取this.getClass().getResourceAsStream("cmp_online.cfg")
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink sql官方文档中数组的取值方式如下定义
array ‘[’ integer ‘]’ Returns the element at position integer in array. The
index starts from 1.
参考链接
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html#collection-functions
主要问题就是数组的下标是从1开始的,这不符合数组从0开始的常识,也和hive sql不兼容,
应该是-D不是-yD
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink已经不建议将hadoop的jar放到lib里了
可以通过
export HADOOP_CLASSPATH=`hadoop classpath`
加载hadoop的依赖
参考链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/hadoop.html#providing-hadoop-classes
--
Sent from: http://apache-flink.147419.n8.nabble.com/
这个问题我们也遇到过,目前这个issue在跟进,https://issues.apache.org/jira/browse/FLINK-12130
--
Sent from: http://apache-flink.147419.n8.nabble.com/
可以用string
--
Sent from: http://apache-flink.147419.n8.nabble.com/
你好:
这个原因最开始已经说明了,main jar就是将传入的sql参数进行解析封装,而sql里用到的udf、connector之类的类型希望可以做到动态指定
一方面可以做到灵活的依赖控制,减少main jar的大小
另一方吧可以减少不同connector和udf,或不同版本connector和udf的依赖冲突的可能性
ps:假如平台有数十种connector和数百个udf都打到一个fast jar里想想都觉得不太优雅吧
--
Sent from: http://apache-flink.147419.n8.nabble.com/
看了很多同学回复yarn的解决方案
我这再补充一下:
还是希望可以提供更通用的submit参数来解决此问题,
包括提交到standalone集群时可以额外指定本地依赖jar
有没有cli相关的同学可以跟进下建议
谢谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
感谢回复,还是希望可以从submit上解决这个问题,不能添加依赖限制了很多应用场景,特别是针对平台来说
--
Sent from: http://apache-flink.147419.n8.nabble.com/
大家好
由于目前用了flink SQL封装了jar包,sql是作为参数动态传入的,
因此需要动态的引入一下依赖jar,比如udf jar,connector的jar等,
由于不同任务的依赖jar是不同的,不适合都放在flink lib目录下(可能会冲突)
因此想请教一下有没有办法在submit时才指定任务依赖的jar包,类似spark submit的--jars
没有的话有没有相关的issue可以跟进这个问题
谢谢
--
Sent from: http://apache-flink.147419.n8.nabble.com/
说一下我们平台的实现方式
1、自定义metricReporter,假如任务开启了checkpoint,reporter会自动的将最新完成的checkpoint路径进行上报
可参考https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html#checkpointing
2、平台会有是否重试和是否基于checkpoint进行恢复的选项
3、假如上述两选项都开启了之后,可以对运行失败的任务基于最新的checkpoint进行拉起
--
Sent from: http://apac
hi zhisheng
我找到两篇相关的参考博客你看一下
https://blog.csdn.net/a1240466196/article/details/107853926
https://www.jianshu.com/p/c7515bdde1f7
--
Sent from: http://apache-flink.147419.n8.nabble.com/
目前消费kafka会有lag的情况发生,因此想基于flink metric进行上报监控kakfa的消费延时情况
主要是两种情况:
1、当前group消费的offset和对应topic最大offset之间的差值,也就是积压的数据量
2、当前消费的最新记录的timestamp和系统时间之间的差值,也就是消费的时间延时
kafka lag的监控对实时任务的稳定运行有着非常重要的作用,
网上也检索到了一些基于源码修改的实现,但修改源码的话也不利于后面flink版本的升级,还是希望官方可以考虑支持一下
--
Sent from: http://apache-flink.147419.n8.na
;
import org.apache.flink.sql.parser.validate.FlinkSqlConformance;
/**
* @author: silence
* @date: 2020/10/22
*/
public class Test {
public static void main(String[] args) throws SqlParseException {
String sql = "xxx";
SqlParser.Config sqlParserConfig =
也可以通过普通的非窗口聚合进行实现吧,minibatch设大点
--
Sent from: http://apache-flink.147419.n8.nabble.com/
我写过一个类似的可以参考一下
private static List lookupSelectTable(SqlNode sqlNode) {
List list = new ArrayList<>();
if (sqlNode instanceof SqlSelect) {
SqlNode from = ((SqlSelect) sqlNode).getFrom();
list.addAll(lookupSelectTable(from));
} else if (sqlNode instan
写过一个类似的可以参考一下
private static List lookupSelectTable(SqlNode sqlNode) {
List list = new ArrayList<>();
if (sqlNode instanceof SqlSelect) {
SqlNode from = ((SqlSelect) sqlNode).getFrom();
list.addAll(lookupSelectTable(from));
} else if (sqlNode instanc
可以写一个group_array的udaf
select * from aa as a left join (
select userId,group_array(row(userId, userBankNo, userBankNo)) from bb
group by userId
) as b where a.userId=b.userId
--
Sent from: http://apache-flink.147419.n8.nabble.com/
如题,最近想实现一些类似于LAST_VALUE之类的UDAF,看了官网文档自己写了一下目前有以下一些疑问:
1、聚合结果需要重写AggregateFunction的getValue方法,而该方法需要返回固定的数据类型,如果要实现不同返回值的UDAF是否需要进行多个实现?
2、如果是需要多个实现类的话如何注册到同一个方法名上?测试发现后注册的UDAF会覆盖之前的注册,也就是只有最后注册的UDAF生效,还是只能支持一种数据类型
3、看了源码中的aggFuction的注册过程,发现也是对不同的数据类型进行了多次实现,然后在使用时根据参数的类型进行不同的实现类的创建
没有insert语句也就是没有sink无法触发计算
--
Sent from: http://apache-flink.147419.n8.nabble.com/
个人理解有几种实现方案
1、通过主键加LAST_VALUE()使用最新的记录进行计算
2、通过flink-cdc connector source
3、自己根据操作类型写计算逻辑
--
Sent from: http://apache-flink.147419.n8.nabble.com/
手动停止再恢复的话需要启动时通过 (-s 上一次checkpoint的mate路径)进行恢复
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/check
points.html#resuming-from-a-retained-checkpoint
-邮件原件-
发件人: 凌天荣 <466792...@qq.com>
发送时间: 2020年9月8日 15:50
收件人: user-zh
主题: flink-sql 1.11版本都还没完全支持checkpoint吗
代码里设置了e
47 matches
Mail list logo