Re: Flink SQL No Watermark

2020-08-12 文章 Shengkai Fang
hi, 你的意思是没有办法在codegen出来的代码上加断点的意思吗?
这里倒是有一个比较hack的方法:
将生成的类放在一个java文件之中,然后修改改下GeneratedClass下的newInstance方法,如果classname ==
“WatermarkGenerator$2” 则将刚才的类则返回 new WatermarkGenerator$2 这个类。

我个人对于问题的猜测是有一条数据的rowtime远远晚于其他数据,从而将整体的watermark提得很高,导致后面的“晚到”的数据一直无法触发watermark的生成。


forideal  于2020年8月13日周四 下午12:57写道:

> 大家好
>
>
> 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是
> StreamExecWatermarkAssigner
> 在translateToPlanInternal 中生成了如下一个 class 代码,
> public final class WatermarkGenerator$2 extends
> org.apache.flink.table.runtime.generated.WatermarkGenerator { public
> WatermarkGenerator$2(Object[] references) throws Exception { } @Override
> public void open(org.apache.flink.configuration.Configuration parameters)
> throws Exception { } @Override public Long
> currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws
> Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean
> isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp
> result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) {
> field$3 = row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5
> = null; if (!isNull$4) { result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 1L), field$3.getNanoOfMillisecond()); } if (isNull$4) {
> return null; } else { return result$5.getMillisecond(); } } @Override
> public void close() throws Exception { } }
>
>
>
>其中关键的信息是 result$5 =
> org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
> - ((long) 1L), field$3.getNanoOfMillisecond());
> 确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的
> watermark。
> 在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark
> 这样的结果。因为这部分codegen的代码确实无法进一步debug了。
> 如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢
>
>   Best forideal
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-08-11 17:13:01,"forideal"  写道:
> >大家好,请教一个问题
> >
> >
> >   我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成
> watermark。消费大量的数据的时候,就无法生成watermark。
> >   一直是No Watermark。 暂时找不到排查问题的思路。
> >  Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了
> EventTime mode 模式,Blink Planner。
> >|
> >No Watermark |
> >   SQL如下
> >
> >
> >  DDL:
> >   create table test(
> >   user_id varchar,
> >   action varchar,
> >   event_time TIMESTAMP(3),
> >   WATERMARK FOR event_time AS event_time - INTERVAL
> '10' SECOND
> >   ) with();
> >
> >
> >  DML:
> >insert into
> >  console
> >select
> >  user_id,
> >  f_get_str(bind_id) as id_list
> >from
> >  (
> >select
> >  action as bind_id,
> >  user_id,
> >  event_time
> >from
> >  (
> >SELECT
> >  user_id,
> >  action,
> >  PROCTIME() as proc_time,
> >  event_time
> >FROM
> >  test
> >  ) T
> >where
> >  user_id is not null
> >  and user_id <> ''
> >  and CHARACTER_LENGTH(user_id) = 24
> >  ) T
> >group by
> >  SESSION(event_time, INTERVAL '10' SECOND),
> >  user_id
> >
> >Best forideal
>


请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-12 文章 yulu yang
请教大佬一个我最近在配置Flink流的过程中遇到问题,
flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。


FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-12 文章 Zhao,Yi(SEC)
背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。
现在比较混乱,哪些jar需要放到A,哪些放到B。


(1) kafka ssl 
证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。

(2) 
flink-sql情况下的kafak-client包。这个比较奇怪,从我的实验效果来看,貌似机器B上没有会报错,必须在启动sql-client的时候通过-j或-l参数指定jar路径,并包含kafka-client.jar,类似的还有flink-json等格式包。此外,这些包在集群A上都没有,但运行都正常。这么来看,我在sql-client中提交一个sql的时候,部分依赖应该是被打包并且上传到集群A的?这是怎么决定的呢?从我的sql中动态判定哪些包用到?还是咋的。



总结下,FlinkSQL使用sql-client命令行方式提交sql执行任务的场景下。最终运行时候哪些包需要用到sql-client端的包呢?

目前从实验来看Flink-json、各种connector的包都是用的sql-client的包。




Re:Flink SQL No Watermark

2020-08-12 文章 forideal
大家好


关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 
StreamExecWatermarkAssigner
在translateToPlanInternal 中生成了如下一个 class 代码,
public final class WatermarkGenerator$2 extends 
org.apache.flink.table.runtime.generated.WatermarkGenerator { public 
WatermarkGenerator$2(Object[] references) throws Exception { } @Override public 
void open(org.apache.flink.configuration.Configuration parameters) throws 
Exception { } @Override public Long 
currentWatermark(org.apache.flink.table.dataformat.BaseRow row) throws 
Exception { org.apache.flink.table.dataformat.SqlTimestamp field$3; boolean 
isNull$3; boolean isNull$4; org.apache.flink.table.dataformat.SqlTimestamp 
result$5; isNull$3 = row.isNullAt(12); field$3 = null; if (!isNull$3) { field$3 
= row.getTimestamp(12, 3); } isNull$4 = isNull$3 || false; result$5 = null; if 
(!isNull$4) { result$5 = 
org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
 - ((long) 1L), field$3.getNanoOfMillisecond()); } if (isNull$4) { return 
null; } else { return result$5.getMillisecond(); } } @Override public void 
close() throws Exception { } } 
 


   其中关键的信息是 result$5 = 
org.apache.flink.table.dataformat.SqlTimestamp.fromEpochMillis(field$3.getMillisecond()
 - ((long) 1L), field$3.getNanoOfMillisecond());
确实按照 WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 的定义获取的 
watermark。
在 flink 的 graph 中也确实有对应的 op 在做这个事情,不知为何会出现 no watermark 
这样的结果。因为这部分codegen的代码确实无法进一步debug了。
如果大家有什么好的 debug codegen 生成的代码,可以告诉我哈,非常感谢

  Best forideal











在 2020-08-11 17:13:01,"forideal"  写道:
>大家好,请教一个问题
>
>
>   我有一条进行 session window 的 sql。这条 sql 消费较少数据量的 topic 的时候,是可以生成 
> watermark。消费大量的数据的时候,就无法生成watermark。
>   一直是No Watermark。 暂时找不到排查问题的思路。
>  Flink 版本号是 1.10,kafka 中消息是有时间的,其他的任务是可以拿到这个时间生成watermark。同时设置了 
> EventTime mode 模式,Blink Planner。
>|
>No Watermark |
>   SQL如下
>
>
>  DDL:
>   create table test(
>   user_id varchar,
>   action varchar,
>   event_time TIMESTAMP(3),
>   WATERMARK FOR event_time AS event_time - INTERVAL '10' 
> SECOND
>   ) with();
>
>
>  DML:
>insert into
>  console
>select
>  user_id,
>  f_get_str(bind_id) as id_list
>from
>  (
>select
>  action as bind_id,
>  user_id,
>  event_time
>from
>  (
>SELECT
>  user_id,
>  action,
>  PROCTIME() as proc_time,
>  event_time
>FROM
>  test
>  ) T
>where
>  user_id is not null
>  and user_id <> ''
>  and CHARACTER_LENGTH(user_id) = 24
>  ) T
>group by
>  SESSION(event_time, INTERVAL '10' SECOND),
>  user_id
> 
>Best forideal


Re: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-08-12 文章 Dream-底限
flink暴漏的lookup
是支持设置缓存记录条数和缓存时间的吧,把时间和条数设置的小一点或者直接禁用缓存,如果流表数据量不大的话可以不用异步访问,数据量大的话异步访问不加缓存维表存储引擎可能压力过大

Jim Chen  于2020年8月13日周四 上午11:53写道:

> 请问下,如果使用了localcache+asyncIO的方式,缓存一致性,有什么解决的思路吗?维表的状态是频繁更新的
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-08-12 文章 Jim Chen
请问下,如果使用了localcache+asyncIO的方式,缓存一致性,有什么解决的思路吗?维表的状态是频繁更新的



--
Sent from: http://apache-flink.147419.n8.nabble.com/

如何设置FlinkSQL并行度

2020-08-12 文章 Zhao,Yi(SEC)
看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。
如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢?

比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。



Re: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 文章 Leonard Xu
FLIP-129
[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
 


> 在 2020年8月13日,11:26,zhao liang  写道:
> 
> 请问table api要重构是哪个FLIP,我想关注下
> 
> 发件人: Shengkai Fang 
> 日期: 星期四, 2020年8月13日 11:09
> 收件人: user-zh@flink.apache.org 
> 主题: Re: 关于FlinkSQL的一些最佳实践咨询
> 针对(3)社区建议使用sql api, table api现在正准备重构。
> 
> 靳亚洽  于2020年8月13日周四 上午11:00写道:
> 
>> 针对2, 我们公司采用的是通过提交一个解析flink sql的jar包,把sql作为jar包的参数来执行的。采用接口机将用户的udf
>> jar包和平台提供的解析flink sql的jar包提交到集群。
>> 针对3, 既然使用了flink sql,当然期望sql搞定一切了
>> 针对4,
>> 我们改造了下flink-client模块的一小块代码,支持提交多jar包到集群,所以connector那些包就通过这种方式上传了。当然提前放集群上面也是可以的。
>> 
>> 
>> | |
>> 靳亚洽
>> |
>> |
>> jinya...@163.com
>> |
>> 签名由网易邮箱大师定制
>> 
>> 
>> 在2020年08月13日 10:17,Zhao,Yi(SEC) 写道:
>> 没有小伙伴帮忙看下这个问题吗。可以优先看下第2、3、4个问题。
>> 
>> 其他问题,第1/5个问题,我自己大概感觉,直接使用memory
>> catalog就ok啦。我看了下FlinkSQL好像没有临时表的概念。10个SQL开10个sql-client的session,各个session独享自己的memory
>> catalog,创建的表也类似于临时表的效果。
>> 
>> 发件人: "Zhao,Yi(SEC)" 
>> 日期: 2020年8月12日 星期三 下午2:20
>> 收件人: "user-zh@flink.apache.org" 
>> 主题: 关于FlinkSQL的一些最佳实践咨询
>> 
>> 最近刚刚开始研究FlinkSQL,之前一直用的DataStream API,现在想尝试下FlinkSQL的玩法。
>> 目前简单看下来,感觉相对来说相比DataStream API不够灵活,所以有一些实践疑问咨询下。
>> 
>> (1) 通常Kafka的动态表会通过临时表的方式写在程序开头声明定义呢?还是直接写在sql-client的配置文件中呢。
>> 
>> 
>> 如果写在配置文件,好处是类似于一个固化的表定义一样,每次执行sql-client的cli之后可以马上查询相关数据。但是有个问题是这种情况下kafka设置读取earliest还是latest?貌似根据具体查询需求临时改配置好像也不是很方便。
>> 
>> 
>> 写临时表是不是更常用?比如你这个程序目的就是读取指定时间开始的数据,那就在代码中定义这个临时表的时候就指定了offset。如果是需要从最早开始读,代码中定义临时表就定义为最早。
>> 
>> (2)
>> FlinkSQL生产中,主要是通过代码方式执行呢?还是将相关jar都放好到固定的机器,然后每次sql-client的命令行进去,然后执行一些sql(提交任务)。
>> 
>> (3) 代码中,直接sql api常用呢?还是table api常用呢?哪个更推荐?包括表的定义也有sql ddl方式,以及table
>> api提供的一套方式。以及DML也一样,table api提供了一套更加面向对象的语法,sql
>> api则提供了更高层次的抽象,同时类sql(技能栈更通用)。
>> 
>> (4)
>> Flink-json这种格式包一般打包到任务jar中?还是提前放到sql-client的相关lib中?哪个更推荐。还有各种connector。
>> 
>> 今天有个实验,报错就是没这些包,加了lib就ok啦。这个有一点不是很懂,是这些包仅client需要,还是说我通过sql-client.sh
>> 的-l参数指定的lib目录会被动态上传到集群呢?毕竟这些包我并没有手动上传到集群,只是在client端有,并通过-l指定之后就ok可用了。
>> 
>> (5)
>> 在Hive中实践,更多的是提前定义好的各种表结构,然后策略同学直接写sql做各种数据查询和分析。在FlinkSQL场景下,是不是每个代码中定义自己的临时表更常见呢?毕竟有Kafka这种动态表,每次启动一个任务的开始消费位置可能也不一样。
>> 



答复: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 文章 zhao liang
请问table api要重构是哪个FLIP,我想关注下

发件人: Shengkai Fang 
日期: 星期四, 2020年8月13日 11:09
收件人: user-zh@flink.apache.org 
主题: Re: 关于FlinkSQL的一些最佳实践咨询
针对(3)社区建议使用sql api, table api现在正准备重构。

靳亚洽  于2020年8月13日周四 上午11:00写道:

> 针对2, 我们公司采用的是通过提交一个解析flink sql的jar包,把sql作为jar包的参数来执行的。采用接口机将用户的udf
> jar包和平台提供的解析flink sql的jar包提交到集群。
> 针对3, 既然使用了flink sql,当然期望sql搞定一切了
> 针对4,
> 我们改造了下flink-client模块的一小块代码,支持提交多jar包到集群,所以connector那些包就通过这种方式上传了。当然提前放集群上面也是可以的。
>
>
> | |
> 靳亚洽
> |
> |
> jinya...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年08月13日 10:17,Zhao,Yi(SEC) 写道:
> 没有小伙伴帮忙看下这个问题吗。可以优先看下第2、3、4个问题。
>
> 其他问题,第1/5个问题,我自己大概感觉,直接使用memory
> catalog就ok啦。我看了下FlinkSQL好像没有临时表的概念。10个SQL开10个sql-client的session,各个session独享自己的memory
> catalog,创建的表也类似于临时表的效果。
>
> 发件人: "Zhao,Yi(SEC)" 
> 日期: 2020年8月12日 星期三 下午2:20
> 收件人: "user-zh@flink.apache.org" 
> 主题: 关于FlinkSQL的一些最佳实践咨询
>
> 最近刚刚开始研究FlinkSQL,之前一直用的DataStream API,现在想尝试下FlinkSQL的玩法。
> 目前简单看下来,感觉相对来说相比DataStream API不够灵活,所以有一些实践疑问咨询下。
>
> (1) 通常Kafka的动态表会通过临时表的方式写在程序开头声明定义呢?还是直接写在sql-client的配置文件中呢。
>
>
> 如果写在配置文件,好处是类似于一个固化的表定义一样,每次执行sql-client的cli之后可以马上查询相关数据。但是有个问题是这种情况下kafka设置读取earliest还是latest?貌似根据具体查询需求临时改配置好像也不是很方便。
>
>
> 写临时表是不是更常用?比如你这个程序目的就是读取指定时间开始的数据,那就在代码中定义这个临时表的时候就指定了offset。如果是需要从最早开始读,代码中定义临时表就定义为最早。
>
> (2)
>  
> FlinkSQL生产中,主要是通过代码方式执行呢?还是将相关jar都放好到固定的机器,然后每次sql-client的命令行进去,然后执行一些sql(提交任务)。
>
> (3) 代码中,直接sql api常用呢?还是table api常用呢?哪个更推荐?包括表的定义也有sql ddl方式,以及table
> api提供的一套方式。以及DML也一样,table api提供了一套更加面向对象的语法,sql
> api则提供了更高层次的抽象,同时类sql(技能栈更通用)。
>
> (4)
>  Flink-json这种格式包一般打包到任务jar中?还是提前放到sql-client的相关lib中?哪个更推荐。还有各种connector。
>
> 今天有个实验,报错就是没这些包,加了lib就ok啦。这个有一点不是很懂,是这些包仅client需要,还是说我通过sql-client.sh
> 的-l参数指定的lib目录会被动态上传到集群呢?毕竟这些包我并没有手动上传到集群,只是在client端有,并通过-l指定之后就ok可用了。
>
> (5)
>  
> 在Hive中实践,更多的是提前定义好的各种表结构,然后策略同学直接写sql做各种数据查询和分析。在FlinkSQL场景下,是不是每个代码中定义自己的临时表更常见呢?毕竟有Kafka这种动态表,每次启动一个任务的开始消费位置可能也不一样。
>


Re: flink1.11错误

2020-08-12 文章 Xingbo Huang
Hi,

我网上查了一下,这个似乎是你Python环境的问题,你可以看看这个和你是不是类似的
https://blog.csdn.net/m0_38024592/article/details/88410878

Best,
Xingbo

小学生 <201782...@qq.com> 于2020年8月13日周四 上午11:05写道:

> 各位大佬,使用pyflink自定义udf,运行中出现这个错误,这个怎么解决呢
> Caused by: java.io.IOException: Failed to execute the command: python -c
> import pyflink;import
> os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)),
> 'bin'))
> output: Could not find platform independent libraries  Could not find platform dependent libraries  Consider setting $PYTHONHOME to  Fatal Python error: Py_Initialize: Unable to get the locale encoding
> ModuleNotFoundError: No module named 'encodings'


Re: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 文章 Shengkai Fang
针对(3)社区建议使用sql api, table api现在正准备重构。

靳亚洽  于2020年8月13日周四 上午11:00写道:

> 针对2, 我们公司采用的是通过提交一个解析flink sql的jar包,把sql作为jar包的参数来执行的。采用接口机将用户的udf
> jar包和平台提供的解析flink sql的jar包提交到集群。
> 针对3, 既然使用了flink sql,当然期望sql搞定一切了
> 针对4,
> 我们改造了下flink-client模块的一小块代码,支持提交多jar包到集群,所以connector那些包就通过这种方式上传了。当然提前放集群上面也是可以的。
>
>
> | |
> 靳亚洽
> |
> |
> jinya...@163.com
> |
> 签名由网易邮箱大师定制
>
>
> 在2020年08月13日 10:17,Zhao,Yi(SEC) 写道:
> 没有小伙伴帮忙看下这个问题吗。可以优先看下第2、3、4个问题。
>
> 其他问题,第1/5个问题,我自己大概感觉,直接使用memory
> catalog就ok啦。我看了下FlinkSQL好像没有临时表的概念。10个SQL开10个sql-client的session,各个session独享自己的memory
> catalog,创建的表也类似于临时表的效果。
>
> 发件人: "Zhao,Yi(SEC)" 
> 日期: 2020年8月12日 星期三 下午2:20
> 收件人: "user-zh@flink.apache.org" 
> 主题: 关于FlinkSQL的一些最佳实践咨询
>
> 最近刚刚开始研究FlinkSQL,之前一直用的DataStream API,现在想尝试下FlinkSQL的玩法。
> 目前简单看下来,感觉相对来说相比DataStream API不够灵活,所以有一些实践疑问咨询下。
>
> (1) 通常Kafka的动态表会通过临时表的方式写在程序开头声明定义呢?还是直接写在sql-client的配置文件中呢。
>
>
> 如果写在配置文件,好处是类似于一个固化的表定义一样,每次执行sql-client的cli之后可以马上查询相关数据。但是有个问题是这种情况下kafka设置读取earliest还是latest?貌似根据具体查询需求临时改配置好像也不是很方便。
>
>
> 写临时表是不是更常用?比如你这个程序目的就是读取指定时间开始的数据,那就在代码中定义这个临时表的时候就指定了offset。如果是需要从最早开始读,代码中定义临时表就定义为最早。
>
> (2)
>  
> FlinkSQL生产中,主要是通过代码方式执行呢?还是将相关jar都放好到固定的机器,然后每次sql-client的命令行进去,然后执行一些sql(提交任务)。
>
> (3) 代码中,直接sql api常用呢?还是table api常用呢?哪个更推荐?包括表的定义也有sql ddl方式,以及table
> api提供的一套方式。以及DML也一样,table api提供了一套更加面向对象的语法,sql
> api则提供了更高层次的抽象,同时类sql(技能栈更通用)。
>
> (4)
>  Flink-json这种格式包一般打包到任务jar中?还是提前放到sql-client的相关lib中?哪个更推荐。还有各种connector。
>
> 今天有个实验,报错就是没这些包,加了lib就ok啦。这个有一点不是很懂,是这些包仅client需要,还是说我通过sql-client.sh
> 的-l参数指定的lib目录会被动态上传到集群呢?毕竟这些包我并没有手动上传到集群,只是在client端有,并通过-l指定之后就ok可用了。
>
> (5)
>  
> 在Hive中实践,更多的是提前定义好的各种表结构,然后策略同学直接写sql做各种数据查询和分析。在FlinkSQL场景下,是不是每个代码中定义自己的临时表更常见呢?毕竟有Kafka这种动态表,每次启动一个任务的开始消费位置可能也不一样。
>


TableColumn为啥不包含comment

2020-08-12 文章 Harold.Miao
hi all
我发现TableColumn class不包含column comment  , 给开发带来了一点麻烦,请教大家一下,谢谢


-- 

Best Regards,
Harold Miao


flink1.11????

2020-08-12 文章 ??????
??pyflink??udf
Caused by: java.io.IOException: Failed to execute the command: python -c import 
pyflink;import 
os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 
'bin'))
output: Could not find platform independent libraries 

回复: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 文章 靳亚洽
针对2, 我们公司采用的是通过提交一个解析flink sql的jar包,把sql作为jar包的参数来执行的。采用接口机将用户的udf 
jar包和平台提供的解析flink sql的jar包提交到集群。
针对3, 既然使用了flink sql,当然期望sql搞定一切了
针对4, 
我们改造了下flink-client模块的一小块代码,支持提交多jar包到集群,所以connector那些包就通过这种方式上传了。当然提前放集群上面也是可以的。


| |
靳亚洽
|
|
jinya...@163.com
|
签名由网易邮箱大师定制


在2020年08月13日 10:17,Zhao,Yi(SEC) 写道:
没有小伙伴帮忙看下这个问题吗。可以优先看下第2、3、4个问题。

其他问题,第1/5个问题,我自己大概感觉,直接使用memory 
catalog就ok啦。我看了下FlinkSQL好像没有临时表的概念。10个SQL开10个sql-client的session,各个session独享自己的memory
 catalog,创建的表也类似于临时表的效果。

发件人: "Zhao,Yi(SEC)" 
日期: 2020年8月12日 星期三 下午2:20
收件人: "user-zh@flink.apache.org" 
主题: 关于FlinkSQL的一些最佳实践咨询

最近刚刚开始研究FlinkSQL,之前一直用的DataStream API,现在想尝试下FlinkSQL的玩法。
目前简单看下来,感觉相对来说相比DataStream API不够灵活,所以有一些实践疑问咨询下。

(1) 通常Kafka的动态表会通过临时表的方式写在程序开头声明定义呢?还是直接写在sql-client的配置文件中呢。

如果写在配置文件,好处是类似于一个固化的表定义一样,每次执行sql-client的cli之后可以马上查询相关数据。但是有个问题是这种情况下kafka设置读取earliest还是latest?貌似根据具体查询需求临时改配置好像也不是很方便。

写临时表是不是更常用?比如你这个程序目的就是读取指定时间开始的数据,那就在代码中定义这个临时表的时候就指定了offset。如果是需要从最早开始读,代码中定义临时表就定义为最早。

(2) 
FlinkSQL生产中,主要是通过代码方式执行呢?还是将相关jar都放好到固定的机器,然后每次sql-client的命令行进去,然后执行一些sql(提交任务)。

(3) 代码中,直接sql api常用呢?还是table api常用呢?哪个更推荐?包括表的定义也有sql ddl方式,以及table 
api提供的一套方式。以及DML也一样,table api提供了一套更加面向对象的语法,sql api则提供了更高层次的抽象,同时类sql(技能栈更通用)。

(4) Flink-json这种格式包一般打包到任务jar中?还是提前放到sql-client的相关lib中?哪个更推荐。还有各种connector。

今天有个实验,报错就是没这些包,加了lib就ok啦。这个有一点不是很懂,是这些包仅client需要,还是说我通过sql-client.sh 
的-l参数指定的lib目录会被动态上传到集群呢?毕竟这些包我并没有手动上传到集群,只是在client端有,并通过-l指定之后就ok可用了。

(5) 
在Hive中实践,更多的是提前定义好的各种表结构,然后策略同学直接写sql做各种数据查询和分析。在FlinkSQL场景下,是不是每个代码中定义自己的临时表更常见呢?毕竟有Kafka这种动态表,每次启动一个任务的开始消费位置可能也不一样。


Re: Flink sql TUMBLE window 不支持offset吗

2020-08-12 文章 Benchao Li
不管是SQL还是DataStream,底层最终用来划分窗口的时候的时间,都是unix timestamp。
SQL里面的Timestamp类型,是可以带着时区的,但是转成unix timestamp表示的时候,时区信息就丢掉了。

Zhao,Yi(SEC)  于2020年8月13日周四 上午10:12写道:

> 大概懂你意思,sql情况下event time都是Timestamp数据类型,这是一种-MM-dd
> HH:mm:ss格式的,从这个角度讲的确是不需要考虑offset的问题(只要周期是1min/1h/1d等类似)。只需要这个Timestamp是我需要的时区格式的日期即可。
> 其实我是联系DatastreamAPI的,因为DatastreamAPI中是基于unix timestamp(时间戳)来划分的。
>
> 在 2020/8/12 下午9:21,“Benchao Li” 写入:
>
> Hi,
>
> 目前还没有支持offset,有一个issue[1] 在跟进解决这个问题。
> 但是你这个case应该是不需要offset就可以的,现在划分窗口的逻辑是直接按照毫秒级别的时间戳来整除进行计算的。
> 所以如果窗口是1min/1h/1d 这种,都是整点的窗口,没有问题的。如果你有这个问题,那应该是你的时间字段有时区问题。
> 有一种case是需要offset的,比如一周这种窗口,因为1980-1-1是周四,所以一周的窗口是从周四开始的。
>
> [1] https://issues.apache.org/jira/browse/FLINK-17767
>
> Zhao,Yi(SEC)  于2020年8月12日周三 下午8:15写道:
>
> > 如题,是不是不支持offset呀。在中国,天窗口实际应该是 INTVERVAL ‘1’ DAY + 8小时offset。
> > 但是看了文档没发现添加offset的语法。
> >
> >
> > 如果仅仅是时间显示问题还好说,搞个函数转一转,但分窗这个不提供入口没办法。
> >
>
>
> --
>
> Best,
> Benchao Li
>
>
>

-- 

Best,
Benchao Li


Re: 使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 xiao cai
Dear Leonard Xu:
我会去关注这个issue,非常感谢答疑。


 原始邮件 
发件人: Leonard Xu
收件人: user-zh
发送时间: 2020年8月12日(周三) 16:05
主题: Re: 使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update 
changes


Hi Group by 和 left join 都是会有 retract 消息的,这类消息需要UpsertStreamTableSink才能处理, Kafka 
connetor 目前的实现是AppendStreamTableSink,所以不能处理 社区已经有一个issue在处理这个问题了,应该1.12能提供这个功能。 
Best Leonard [1]https://issues.apache.org/jira/browse/FLINK-18826 
<">https://issues.apache.org/jira/browse/FLINK-18826> > 在 2020年8月12日,15:58,xiao 
cai  写道: > > Hi Jark: > 版本:1.11.0 > 问题:flink-sql,数据经过group by 
和left join后写入kafka sink,会在语法校验阶段报错: > AppendStreamTableSink doesn't support 
consuming update changes which is produced by node GroupAggregate > > > 
我希望能够在sql校验时也能使upsert操作作用于kafka sink,或者等upsert完成,再写入kafka > > > 附上执行sql: > 
create table kafka_table_1 ( > `shop_id` varchar, > `user_id` bigint, > 
`category_id` int, > `ts` bigint, > `row_time` timestamp(3), > `proc_time` 
timestamp(3), > ) with ( > 'connector.type' = 'kafka', > 'connector.version' = 
'universal', > 'connector.topic' = 'user_visit_1', > 'connector.startup-mode' = 
'latest-offset', > 'connector.properties.bootstrap.servers' = 'ip:9092', > 
'connector.properties.zookeeper.connect' = 'ip:2181', > 'update-mode' = 
'append', > 'format.type' = 'avro-registry', > 'format.schema-subject' = 
'user_visit', > 'format.schema-url'='http://ip:8081', > ) > > > CREATE TABLE 
hbase_table ( > rowKey STRING, > cf ROW > ) WITH ( > 
'connector.type' = 'hbase', > 'connector.version' = '1.4.3', > 
'connector.table-name' = 'hbase_table', > 'connector.zookeeper.quorum' = 
'ip:2181', > 'connector.zookeeper.znode.parent' = '/hbase', > 
'connector.write.buffer-flush.max-rows' = '1000' > ) > > > > > create table 
kafka_table_2 ( > `shop_id` varchar, > `age` varchar, > `area` varchar > ) with 
( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 
'connector.topic' = 'user_visit_2', > 'connector.startup-mode' = 
'latest-offset', > 'connector.properties.bootstrap.servers' = 'ip:9092', > 
'connector.properties.zookeeper.connect' = 'ip:2181', > 'update-mode' = 
'append', > 'format.type' = 'avro-registry', > 'format.schema-subject' = 
'user_visit', > 'format.schema-url'='http://ip:8081', > ) > > > insert into 
kafka_table_2(shop_id, user_id, category_id, ts, row_time, proc_time) > select 
shop_id, age, area > from kafka_table_1 left join hbase_table > for system_time 
as of kafka_table_1.proc_time as temp on kafka_table_1.shop_id = temp.rowKey > 
group by shop_id, age, area > > > 原始邮件 > 发件人: xiao cai > 收件人: 
user-zh > 发送时间: 2020年8月12日(周三) 15:41 > 主题: 
AppendStreamTableSink doesn't support consuming update changes > > > Hi Jark: 
版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错: 
AppendStreamTableSink doesn't support consuming update changes which is 
produced by node GroupAggregate

Re: flink 1.11 udtf动态定义返回类型

2020-08-12 文章 Zou Dan
动态定义你指的是说字段的类型和数量都不是固定的吗?这个应该是不行的。你举的 1.10 例子也不是动态的呀

> 2020年8月12日 下午5:32,李强  写道:
> 
> flink 1.11 udtf可以像1.10那样自定义返回类型不我希望可以像flink 1.10这样:
> 
> @Override
>   public TypeInformation return new RowTypeInfo(Types.STRING, 
> Types.STRING);
>   }
> 
> 
> 不希望像flink 1.11这样
> @FunctionHint(output = @DataTypeHint("ROW 
> 
> udtf返回的字段个数和类型我们希望是可以动态的定义,就想flink 1.10那样




Re:Re:Re: 用hive streaming写 orc文件的问题

2020-08-12 文章 flink小猪
添加不了附件,我就直接贴代码了

import java.time.Duration


import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect, TableResult}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog




/**
  * author dinghh
  * time 2020-08-11 17:03
  */
object WriteHiveStreaming {
def main(args: Array[String]): Unit = {


val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamEnv.setParallelism(3)


val tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tableEnv = StreamTableEnvironment.create(streamEnv, 
tableEnvSettings)

tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
 CheckpointingMode.EXACTLY_ONCE)

tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
 Duration.ofSeconds(20))






val catalogName = "my_catalog"
val catalog = new HiveCatalog(
catalogName,  // catalog name
"default",// default database

"D:\\ideaspace\\data-integrate-bigdata\\flink-restart\\flink-sql\\src\\main\\resources",
  // Hive config (hive-site.xml) directory
"1.1.0"   // Hive version
)
tableEnv.registerCatalog(catalogName, catalog)
tableEnv.useCatalog(catalogName)




//删除流表
tableEnv.executeSql(
"""
  |DROP TABLE IF EXISTS `stream_db`.`datagen_user`
""".stripMargin)


//创建流表
tableEnv.executeSql(
"""
  |CREATE TABLE `stream_db`.`datagen_user` (
  | id INT,
  | name STRING,
  | dt AS localtimestamp,
  | WATERMARK FOR dt AS dt
  |) WITH (
  | 'connector' = 'datagen',
  | 'rows-per-second'='10',
  | 'fields.id.kind'='random',
  | 'fields.id.min'='1',
  | 'fields.id.max'='1000',
  | 'fields.name.length'='5'
  |)
""".stripMargin)


//切换hive方言
tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)


//删除hive orc表
tableEnv.executeSql(
"""
  |DROP TABLE IF EXISTS `default`.`hive_user_orc`
  |
""".stripMargin)


//创建hive orc表
tableEnv.executeSql(
"""
  |CREATE TABLE `default`.`hive_user_orc` (
  |  id INT,
  |  name STRING
  |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute STRING 
) STORED AS ORC TBLPROPERTIES (
  |  'partition.time-extractor.timestamp-pattern'='$ts_dt 
$ts_hour:$ts_minute:00.000',
  |  'sink.partition-commit.trigger'='partition-time',
  |  'sink.partition-commit.delay'='1 min',
  |  'sink.partition-commit.policy.kind'='metastore,success-file'
  |)
""".stripMargin)


//删除hive parquet表
tableEnv.executeSql(
"""
  |DROP TABLE IF EXISTS `default`.`hive_user_parquet`
""".stripMargin)
//创建hive parquet表
tableEnv.executeSql(
"""
  |CREATE TABLE `default`.`hive_user_parquet` (
  |  id INT,
  |  name STRING
  |) PARTITIONED BY (ts_dt STRING, ts_hour STRING,ts_minute STRING) 
STORED AS PARQUET TBLPROPERTIES (
  |  'partition.time-extractor.timestamp-pattern'='$ts_dt 
$ts_hour:$ts_minute:00.000',
  |  'sink.partition-commit.trigger'='partition-time',
  |  'sink.partition-commit.delay'='1 min',
  |  'sink.partition-commit.policy.kind'='metastore,success-file'
  |)
""".stripMargin)
//设置flink方言
tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
//流式写入orc表
tableEnv.executeSql(
"""
  |INSERT INTO `default`.`hive_user_orc`
  |SELECT
  |id,name,
  |DATE_FORMAT(dt,'-MM-dd'),
  |DATE_FORMAT(dt,'HH'),
  |DATE_FORMAT(dt,'mm')
  |FROM
  |stream_db.datagen_user
""".stripMargin)
//流式写入parquet表
tableEnv.executeSql(
"""
  |INSERT INTO `default`.`hive_user_parquet`
  |SELECT
  |id,name,
  |DATE_FORMAT(dt,'-MM-dd'),
  |DATE_FORMAT(dt,'HH'),
  |

Re: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 文章 Zhao,Yi(SEC)
没有小伙伴帮忙看下这个问题吗。可以优先看下第2、3、4个问题。

其他问题,第1/5个问题,我自己大概感觉,直接使用memory 
catalog就ok啦。我看了下FlinkSQL好像没有临时表的概念。10个SQL开10个sql-client的session,各个session独享自己的memory
 catalog,创建的表也类似于临时表的效果。

发件人: "Zhao,Yi(SEC)" 
日期: 2020年8月12日 星期三 下午2:20
收件人: "user-zh@flink.apache.org" 
主题: 关于FlinkSQL的一些最佳实践咨询

最近刚刚开始研究FlinkSQL,之前一直用的DataStream API,现在想尝试下FlinkSQL的玩法。
目前简单看下来,感觉相对来说相比DataStream API不够灵活,所以有一些实践疑问咨询下。

(1) 通常Kafka的动态表会通过临时表的方式写在程序开头声明定义呢?还是直接写在sql-client的配置文件中呢。

如果写在配置文件,好处是类似于一个固化的表定义一样,每次执行sql-client的cli之后可以马上查询相关数据。但是有个问题是这种情况下kafka设置读取earliest还是latest?貌似根据具体查询需求临时改配置好像也不是很方便。

写临时表是不是更常用?比如你这个程序目的就是读取指定时间开始的数据,那就在代码中定义这个临时表的时候就指定了offset。如果是需要从最早开始读,代码中定义临时表就定义为最早。

(2) 
FlinkSQL生产中,主要是通过代码方式执行呢?还是将相关jar都放好到固定的机器,然后每次sql-client的命令行进去,然后执行一些sql(提交任务)。

(3) 代码中,直接sql api常用呢?还是table api常用呢?哪个更推荐?包括表的定义也有sql ddl方式,以及table 
api提供的一套方式。以及DML也一样,table api提供了一套更加面向对象的语法,sql api则提供了更高层次的抽象,同时类sql(技能栈更通用)。

(4) Flink-json这种格式包一般打包到任务jar中?还是提前放到sql-client的相关lib中?哪个更推荐。还有各种connector。

今天有个实验,报错就是没这些包,加了lib就ok啦。这个有一点不是很懂,是这些包仅client需要,还是说我通过sql-client.sh 
的-l参数指定的lib目录会被动态上传到集群呢?毕竟这些包我并没有手动上传到集群,只是在client端有,并通过-l指定之后就ok可用了。

(5) 
在Hive中实践,更多的是提前定义好的各种表结构,然后策略同学直接写sql做各种数据查询和分析。在FlinkSQL场景下,是不是每个代码中定义自己的临时表更常见呢?毕竟有Kafka这种动态表,每次启动一个任务的开始消费位置可能也不一样。


Re: Flink sql TUMBLE window 不支持offset吗

2020-08-12 文章 Zhao,Yi(SEC)
大概懂你意思,sql情况下event time都是Timestamp数据类型,这是一种-MM-dd 
HH:mm:ss格式的,从这个角度讲的确是不需要考虑offset的问题(只要周期是1min/1h/1d等类似)。只需要这个Timestamp是我需要的时区格式的日期即可。
其实我是联系DatastreamAPI的,因为DatastreamAPI中是基于unix timestamp(时间戳)来划分的。

在 2020/8/12 下午9:21,“Benchao Li” 写入:

Hi,

目前还没有支持offset,有一个issue[1] 在跟进解决这个问题。
但是你这个case应该是不需要offset就可以的,现在划分窗口的逻辑是直接按照毫秒级别的时间戳来整除进行计算的。
所以如果窗口是1min/1h/1d 这种,都是整点的窗口,没有问题的。如果你有这个问题,那应该是你的时间字段有时区问题。
有一种case是需要offset的,比如一周这种窗口,因为1980-1-1是周四,所以一周的窗口是从周四开始的。

[1] https://issues.apache.org/jira/browse/FLINK-17767

Zhao,Yi(SEC)  于2020年8月12日周三 下午8:15写道:

> 如题,是不是不支持offset呀。在中国,天窗口实际应该是 INTVERVAL ‘1’ DAY + 8小时offset。
> 但是看了文档没发现添加offset的语法。
>
>
> 如果仅仅是时间显示问题还好说,搞个函数转一转,但分窗这个不提供入口没办法。
>


-- 

Best,
Benchao Li




flink 1.11 udtf只能通过注解定义输出字段个数和类型吗?

2020-08-12 文章 李强
各位大佬你们好,我想请教一个问题:
flink 1.11 
udtf只能通过注解定义输出字段个数和类型吗,好像不能再通过flink1.10那样重写getResultType方法来定义输出字段类型了,flink1.11里使用getResultType会报错如下:
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting 
a signature to output mapping.
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:115)
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:170)
at 
org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:158)
... 22 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a 
type inference from method:
public void 
com.skyon.main.TestFunction.eval(java.lang.String,java.lang.String,long)
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:172)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:110)
... 24 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a 
data type from 'class org.apache.flink.types.Row' in generic class 
'org.apache.flink.table.functions.TableFunction' in class 
com.skyon.main.TestFunction. Please pass the required data type manually or 
allow RAW types.
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:246)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:224)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:198)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:119)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:443)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:309)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:252)
at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:164)
... 25 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot extract a 
data type from a pure 'org.apache.flink.types.Row' class. Please use 
annotations to define field names and field types.
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:313)
at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:305)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.checkForCommonErrors(DataTypeExtractor.java:343)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:277)
at 
org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:238)
... 32 more

 有什么办法动态定义udtf的输出字段个数和类型呢,谢谢了!

flink 1.11 发布sql任务到yarn session报java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hbase.util.ByteStringer

2020-08-12 文章 wind.fly....@outlook.com
Hi, all:
 
本人试图将flink-sql-gateway(https://github.com/ververica/flink-sql-gateway)升级到1.11支持版本,将flink
 sql(用到hbase connector)提交到yarn session后运行时报:
org.apache.hadoop.hbase.DoNotRetryIOException: 
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.hadoop.hbase.util.ByteStringer
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.translateException(RpcRetryingCaller.java:248)
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:221)
at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:388)
at 
org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:362)
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:142)
at 
org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:80)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.hadoop.hbase.util.ByteStringer
at 
org.apache.hadoop.hbase.protobuf.RequestConverter.buildRegionSpecifier(RequestConverter.java:1053)
at 
org.apache.hadoop.hbase.protobuf.RequestConverter.buildScanRequest(RequestConverter.java:496)
at 
org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:402)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:274)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:62)
at 
org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:219)
... 7 more

经过搜索怀疑可能是因为hbase-protobuf依赖的protobuf-java版本不对,但我怎么查看运行时jm、tm对应的classpath是什么样的,依赖了什么样的jar,希望给出分析思路或方法,谢谢。


Re:Re: 用hive streaming写 orc文件的问题

2020-08-12 文章 flink小猪



尴尬,我明明上传了附件但是找不到了- -,我又上传了一次。




1.写orc和写parquet的作业在同一个作业中,并没有报错,但是hive中查不到数据,在hdfs目录里面有但是并没有按照checkpoint间隔生成,也没有生成_success文件。
2.没有设置table.exec.hive.fallback-mapred-writer。
以下是我的几个疑问
1.为什么hive streaming 生成orc文件需要导入flink-orc_2.11jar包,而parquet不需要?
2.sql client 我想要设置checkpoint生成间隔我应该在哪里设置? 以下是hdfs目录图片
 
这是orc生成的文件

这是parquet生成的文件





在 2020-08-12 17:33:30,"Rui Li"  写道:
>Hi,
>
>写orc表的作业有报错么?还是成功执行但是hive查不到数据呢?
>看不到你贴的代码,有没有设置table.exec.hive.fallback-mapred-writer?
>
>On Wed, Aug 12, 2020 at 5:14 PM 18579099...@163.com <18579099...@163.com>
>wrote:
>
>>
>>
>> 我通过datagen作为流表,分别写入两个表结构相同,存储格式不同的hive表(一个orc,一个parquet)中,其中parquet表正常写入并且生成了_SUCCESS文件,hive也能查询到,
>>
>> 但是orc表没有生成_SUCCESS文件,并且hive中无法查询到,我是在本地ide上直接运行的,hive版本是1.2.1,flink版本是1.11.1,同时我发现orc表的分区中生成的文件数量比parquet多,
>> 而且不会根据checkpoint间隔生成(parquet符合checkpoint间隔)。而且需要导入flink-orc_2.11jar包(parquet不需要),否则报Exception
>> in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/orc/TypeDescription错误。并且parquet每间隔checkpoint interval
>> 会输出parquet相关的日志,而orc的并没有日志产生,请问是什么原因?我已贴上代码。
>> --
>> 18579099...@163.com
>>
>
>
>-- 
>Best regards!
>Rui Li


Re: 关于Flink CDC问题第二弹

2020-08-12 文章 陈韬
谢谢,那我等下再试一下


Best,
TonyChen

> 2020年8月12日 下午10:27,Jark Wu  写道:
> 
> 1. 是的。目前JDK11还不支持。因为用了 Unsafe 的一些方法。 这个会在未来修复。
> 2. 第二个问题貌似是类冲突的问题。 flink-cdc-connectors 依赖了Kafka Connect 2.5.0,并且没有做
> shade。如果你的项目中也依赖了Kafka Connect,要保持版本一致。
> 
> Best,
> Jark
> 
> On Wed, 12 Aug 2020 at 20:07, 陈韬  wrote:
> 
>> hi everyone:
>> 
>> 今天试用了下1.11.1下的cdc功能,遇到几个问题,来请教一下大家。
>> 
>> 环境为 https://github.com/ververica/flink-cdc-connectors/wiki/中文教程 <
>> https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B>
>> 中提供的 docker-compose.yml
>> 
>> 问题1:通过sql-cli提交了flink-sql,之后修改本地mysql中的数据时,报错。是否对jdk版本有要求?
>> Caused by: java.lang.NoSuchMethodError:
>> sun.misc.Unsafe.monitorEnter(Ljava/lang/Object;)V
>>at
>> com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:101)
>>at
>> io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:81)
>>at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:810)
>>at
>> io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:170)
>>at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>at java.base/java.lang.Thread.run(Thread.java:834)
>> 我的JDK版本如下
>> openjdk version "11.0.2" 2019-01-15
>> OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
>> OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
>> 
>> 问题2:通过代码方式提交flink-sql,(我自己实现的一个类似
>> https://github.com/ververica/flink-sql-gateway的项目)总是报错。测试sql为
>> https://github.com/ververica/flink-cdc-connectors/wiki/中文教程 <
>> https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B>
>> 中的demo的三流join的那个flink sql
>> 
>> 报错信息如下:
>> 
>> 2020-08-12 19:18:51,034 WARN  org.apache.flink.runtime.taskmanager.Task
>> - Source: TableSourceScan(table=[[default_catalog,
>> default_database, orders]], fields=[order_id, order_date, customer_name,
>> price, product_id, order_status]) (1/1) (f2a72e33dc272266408e47d60724b078)
>> switched from RUNNING to FAILED.
>> java.lang.AbstractMethodError: Receiver class
>> org.apache.kafka.connect.json.JsonSerializer does not define or inherit an
>> implementation of the resolved method abstract configure(Ljava/util/Map;Z)V
>> of interface org.apache.kafka.common.serialization.Serializer.
>>at
>> org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
>>at
>> org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
>>at
>> io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:582)
>>at
>> io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:79)
>>at
>> io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:300)
>>at
>> io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:216)
>>at
>> io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
>>at
>> com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
>>at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>> 
>> 
>> 
>> Best,
>> TonyChen
>> 
>> 



Re: 关于Flink CDC问题第二弹

2020-08-12 文章 Jark Wu
1. 是的。目前JDK11还不支持。因为用了 Unsafe 的一些方法。 这个会在未来修复。
2. 第二个问题貌似是类冲突的问题。 flink-cdc-connectors 依赖了Kafka Connect 2.5.0,并且没有做
shade。如果你的项目中也依赖了Kafka Connect,要保持版本一致。

Best,
Jark

On Wed, 12 Aug 2020 at 20:07, 陈韬  wrote:

> hi everyone:
>
> 今天试用了下1.11.1下的cdc功能,遇到几个问题,来请教一下大家。
>
> 环境为 https://github.com/ververica/flink-cdc-connectors/wiki/中文教程 <
> https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B>
> 中提供的 docker-compose.yml
>
> 问题1:通过sql-cli提交了flink-sql,之后修改本地mysql中的数据时,报错。是否对jdk版本有要求?
> Caused by: java.lang.NoSuchMethodError:
> sun.misc.Unsafe.monitorEnter(Ljava/lang/Object;)V
> at
> com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:101)
> at
> io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:81)
> at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:810)
> at
> io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:170)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)
> 我的JDK版本如下
> openjdk version "11.0.2" 2019-01-15
> OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
> OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)
>
> 问题2:通过代码方式提交flink-sql,(我自己实现的一个类似
> https://github.com/ververica/flink-sql-gateway的项目)总是报错。测试sql为
> https://github.com/ververica/flink-cdc-connectors/wiki/中文教程 <
> https://github.com/ververica/flink-cdc-connectors/wiki/%E4%B8%AD%E6%96%87%E6%95%99%E7%A8%8B>
> 中的demo的三流join的那个flink sql
>
> 报错信息如下:
>
> 2020-08-12 19:18:51,034 WARN  org.apache.flink.runtime.taskmanager.Task
>  - Source: TableSourceScan(table=[[default_catalog,
> default_database, orders]], fields=[order_id, order_date, customer_name,
> price, product_id, order_status]) (1/1) (f2a72e33dc272266408e47d60724b078)
> switched from RUNNING to FAILED.
> java.lang.AbstractMethodError: Receiver class
> org.apache.kafka.connect.json.JsonSerializer does not define or inherit an
> implementation of the resolved method abstract configure(Ljava/util/Map;Z)V
> of interface org.apache.kafka.common.serialization.Serializer.
> at
> org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
> at
> org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
> at
> io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:582)
> at
> io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:79)
> at
> io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:300)
> at
> io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:216)
> at
> io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
> at
> com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
>
>
> Best,
> TonyChen
>
>


Re: FlinkSQL even time 定义问题

2020-08-12 文章 Shengkai Fang
对于第一个问题- 在查询语句之中定义watermark:
现在并不支持。这主要是由于在同一个作业之中,如果select的数据源是同一个表,那么在实际的优化过程之中,会将source进行复用,而现在同一个source并不支持多个watermark
assigner。如果在不同的作业之中,那么显然只要修改watermark的定义语句即可。
对于第二个问题:rowtime的定义是必须建立在创建表的过程之中的。
对于第三个问题:社区正在讨论这个问题。现在仅支持多个insert的sql在同一个job之中。


Zhao,Yi(SEC)  于2020年8月12日周三 下午5:36写道:

> 咨询下,FlinkSQl的event time必须在DDL中定义吗。能否DDL只是定义普通数据字段,比如有个time属性。
> 然后在select 的时候指定具体使用的watermark策略。
> 目的:假设基于同一个表A,我查询1需要使用watermark为time-1min,查询2需要使用watermark为time-2min。
>
> 其次除了这种case,如果我基于表1查询得到结果输出到表2,那么表2的event
> time定义呢?比如在表2的定义中基于表2的某个属性(比如叫time2),然后插入表2的时候只要time2属性存在就可以?
>
>
> 此外,如果对比datastream api的watermark传播机制,如果我希望查询1结构输出到表2,然后继续基于表2查询。貌似就需要
> select xxx from (select yyy from t); 这种嵌套写法,一句sql会变成一个任务。
> 那如何用sql做非常复杂的任务组合呢,比如我是2句不搭嘎的sql的,就希望在同一个job中呢。
>


Re:Re: ​请问在flinksql中如何使用聚合函数 LISTAGG

2020-08-12 文章 chenxuying
好的 , 原来是bug , 感谢回答


在 2020-08-12 21:32:40,"Benchao Li"  写道:
>看起来是一个已知bug[1],已经修复,但是还没有发布。
>
>[1] https://issues.apache.org/jira/browse/FLINK-18862
>
>chenxuying  于2020年8月12日周三 下午9:25写道:
>
>> 版本:
>> flinksql 1.11.0
>> 需求:
>> 需要实现多行聚合成一行功能
>> 代码如下:
>> environmentSettings =
>> EnvironmentSettings.new_instance().in_streaming_mode().build()
>> t_env = StreamTableEnvironment.create(environment_settings =
>> environmentSettings)
>> t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
>> 'true')
>>
>>
>> a_df = pd.DataFrame({"id":["1","1","1"],"uuid":["1","2","3"]})
>> a_table = t_env.from_pandas(a_df,
>> DataTypes.ROW([DataTypes.FIELD("id",
>> DataTypes.STRING()),
>>  DataTypes.FIELD("uuid",
>> DataTypes.STRING())]))
>> t_env.create_temporary_view("table_a",a_table)
>>
>>
>> b_df = pd.DataFrame({"val":["3","4","5","6"],"uuid":["1","2","3","4"]})
>> table_b = t_env.from_pandas(b_df ,
>> DataTypes.ROW([DataTypes.FIELD("val",
>> DataTypes.STRING()),
>>  DataTypes.FIELD("uuid",
>> DataTypes.STRING())]))
>> t_env.create_temporary_view("table_b",table_b)
>>
>>
>> t_env.sql_update("""
>> CREATE TABLE mySink (
>>
>> b varchar ,
>> c varchar
>> ) WITH (
>> 'connector' = 'print'
>> )
>> """)
>>
>>
>> t_env.sql_update("""
>> insert into mySink
>> select t1.id ,LISTAGG(t2.val , ',')
>> from table_a t1 left join table_b t2 on t1.uuid = t2.uuid
>> group by t1.id
>> """)
>> t_env.execute("tutorial_job")
>>
>>
>> 报错:
>> Caused by: java.lang.ClassCastException:
>> org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to
>> org.apache.flink.table.data.StringData at
>> org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
>> at
>> org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139)
>> at org.apache.flink.table.data.RowData.get(RowData.java:273) at
>> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
>> at
>> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
>> at
>> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>> at
>> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
>> at
>> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>> at
>> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>> at
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$910/1247863497.runDefaultAction(Unknown
>> Source) at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at
>> java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>>
>
>-- 
>
>Best,
>Benchao Li


Re: ​请问在flinksql中如何使用聚合函数 LISTAGG

2020-08-12 文章 Benchao Li
看起来是一个已知bug[1],已经修复,但是还没有发布。

[1] https://issues.apache.org/jira/browse/FLINK-18862

chenxuying  于2020年8月12日周三 下午9:25写道:

> 版本:
> flinksql 1.11.0
> 需求:
> 需要实现多行聚合成一行功能
> 代码如下:
> environmentSettings =
> EnvironmentSettings.new_instance().in_streaming_mode().build()
> t_env = StreamTableEnvironment.create(environment_settings =
> environmentSettings)
> t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
> 'true')
>
>
> a_df = pd.DataFrame({"id":["1","1","1"],"uuid":["1","2","3"]})
> a_table = t_env.from_pandas(a_df,
> DataTypes.ROW([DataTypes.FIELD("id",
> DataTypes.STRING()),
>  DataTypes.FIELD("uuid",
> DataTypes.STRING())]))
> t_env.create_temporary_view("table_a",a_table)
>
>
> b_df = pd.DataFrame({"val":["3","4","5","6"],"uuid":["1","2","3","4"]})
> table_b = t_env.from_pandas(b_df ,
> DataTypes.ROW([DataTypes.FIELD("val",
> DataTypes.STRING()),
>  DataTypes.FIELD("uuid",
> DataTypes.STRING())]))
> t_env.create_temporary_view("table_b",table_b)
>
>
> t_env.sql_update("""
> CREATE TABLE mySink (
>
> b varchar ,
> c varchar
> ) WITH (
> 'connector' = 'print'
> )
> """)
>
>
> t_env.sql_update("""
> insert into mySink
> select t1.id ,LISTAGG(t2.val , ',')
> from table_a t1 left join table_b t2 on t1.uuid = t2.uuid
> group by t1.id
> """)
> t_env.execute("tutorial_job")
>
>
> 报错:
> Caused by: java.lang.ClassCastException:
> org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to
> org.apache.flink.table.data.StringData at
> org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169)
> at
> org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139)
> at org.apache.flink.table.data.RowData.get(RowData.java:273) at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
> at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
> at
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> at
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
> at
> org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
> at
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
> at 
> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
> at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$910/1247863497.runDefaultAction(Unknown
> Source) at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at
> java.lang.Thread.run(Thread.java:745)
>
>
>
>

-- 

Best,
Benchao Li


​请问在flinksql中如何使用聚合函数 LISTAGG

2020-08-12 文章 chenxuying
版本:
flinksql 1.11.0
需求:
需要实现多行聚合成一行功能
代码如下:
environmentSettings = 
EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings = 
environmentSettings)
t_env.get_config().get_configuration().set_string("python.fn-execution.memory.managed",
 'true')


a_df = pd.DataFrame({"id":["1","1","1"],"uuid":["1","2","3"]})
a_table = t_env.from_pandas(a_df,
DataTypes.ROW([DataTypes.FIELD("id", DataTypes.STRING()),
 DataTypes.FIELD("uuid", 
DataTypes.STRING())]))
t_env.create_temporary_view("table_a",a_table)


b_df = pd.DataFrame({"val":["3","4","5","6"],"uuid":["1","2","3","4"]})
table_b = t_env.from_pandas(b_df ,
DataTypes.ROW([DataTypes.FIELD("val", DataTypes.STRING()),
 DataTypes.FIELD("uuid", 
DataTypes.STRING())]))
t_env.create_temporary_view("table_b",table_b)


t_env.sql_update("""
CREATE TABLE mySink (   
 
b varchar ,
c varchar 
) WITH ( 
'connector' = 'print'   
) 
""")


t_env.sql_update("""
insert into mySink 
select t1.id ,LISTAGG(t2.val , ',') 
from table_a t1 left join table_b t2 on t1.uuid = t2.uuid
group by t1.id
""")
t_env.execute("tutorial_job")


报错:
Caused by: java.lang.ClassCastException: 
org.apache.flink.table.data.binary.BinaryRawValueData cannot be cast to 
org.apache.flink.table.data.StringData at 
org.apache.flink.table.data.GenericRowData.getString(GenericRowData.java:169) 
at org.apache.flink.table.data.JoinedRowData.getString(JoinedRowData.java:139) 
at org.apache.flink.table.data.RowData.get(RowData.java:273) at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
 at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
 at 
org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
 at 
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
 at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:205)
 at 
org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
 at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
 at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$910/1247863497.runDefaultAction(Unknown
 Source) at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
 at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at 
java.lang.Thread.run(Thread.java:745)





Re: Flink sql TUMBLE window 不支持offset吗

2020-08-12 文章 Benchao Li
Hi,

目前还没有支持offset,有一个issue[1] 在跟进解决这个问题。
但是你这个case应该是不需要offset就可以的,现在划分窗口的逻辑是直接按照毫秒级别的时间戳来整除进行计算的。
所以如果窗口是1min/1h/1d 这种,都是整点的窗口,没有问题的。如果你有这个问题,那应该是你的时间字段有时区问题。
有一种case是需要offset的,比如一周这种窗口,因为1980-1-1是周四,所以一周的窗口是从周四开始的。

[1] https://issues.apache.org/jira/browse/FLINK-17767

Zhao,Yi(SEC)  于2020年8月12日周三 下午8:15写道:

> 如题,是不是不支持offset呀。在中国,天窗口实际应该是 INTVERVAL ‘1’ DAY + 8小时offset。
> 但是看了文档没发现添加offset的语法。
>
>
> 如果仅仅是时间显示问题还好说,搞个函数转一转,但分窗这个不提供入口没办法。
>


-- 

Best,
Benchao Li


Re: AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 Benchao Li
Hi,

这个是正常现象。 如果你用了普通的group by的话,那么它的结果就是有更新的,所以需要sink支持写入update的结果,
但是kafka目前只能写入append的数据,所以会报上面的错误。
你可以尝试下用window group[1],在这个文档的第二个示例里,它的结果是append的,不会有更新。

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#aggregations

xiao cai  于2020年8月12日周三 下午3:42写道:

> Hi Jark:
>
>
> 版本:1.11.0
> 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错:
> AppendStreamTableSink doesn't support consuming update changes which is
> produced by node GroupAggregate



-- 

Best,
Benchao Li


Re: flink 1.10.1 sql limit 不生效

2020-08-12 文章 Benchao Li
我有点迷惑了

1. 所以你现在的问题得到了解决了么?
2. 我看你最开始写的那个SQL里面并没有window呀,为啥后面又说用到了TUMBLE_START?


Peihui He  于2020年8月12日周三 下午3:40写道:

> Hi BenChao
>
> SQL是流模式下执行的,看着不生效的表现就是显示的数量超过limit的数量。
> order by TUMBLE_START desc 好像不是预期的结果
>
> 这个是需要给 TUMBLE_START 得到的timestamp 转为 long么?
>
> Best Wishes.
>
> Benchao Li  于2020年8月12日周三 下午3:12写道:
>
> > 你这个SQL是在流式还是批式模式下执行的呢?limit不生效的表现是什么呢?
> >
> > Peihui He  于2020年8月12日周三 下午3:03写道:
> >
> > > Hi all,
> > >
> > > 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效
> > > sql 类似下面:
> > > select a, b, c, t from tb  LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d,
> e)
> > ON
> > > TRUE  order by t desc limit 10
> > >
> > > 如果select 结果中不包括c的化,就正常了
> > >
> > > 请问这个是什么问题呢?sql是写的不对么?
> > >
> > > Best Wishes.
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Flink sql TUMBLE window 不支持offset吗

2020-08-12 文章 Zhao,Yi(SEC)
如题,是不是不支持offset呀。在中国,天窗口实际应该是 INTVERVAL ‘1’ DAY + 8小时offset。
但是看了文档没发现添加offset的语法。


如果仅仅是时间显示问题还好说,搞个函数转一转,但分窗这个不提供入口没办法。


关于Flink CDC问题第二弹

2020-08-12 文章 陈韬
hi everyone:

今天试用了下1.11.1下的cdc功能,遇到几个问题,来请教一下大家。

环境为 https://github.com/ververica/flink-cdc-connectors/wiki/中文教程 

 中提供的 docker-compose.yml

问题1:通过sql-cli提交了flink-sql,之后修改本地mysql中的数据时,报错。是否对jdk版本有要求?
Caused by: java.lang.NoSuchMethodError: 
sun.misc.Unsafe.monitorEnter(Ljava/lang/Object;)V
at 
com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:101)
at 
io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:81)
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:810)
at 
io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:170)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
我的JDK版本如下
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9, mixed mode)

问题2:通过代码方式提交flink-sql,(我自己实现的一个类似https://github.com/ververica/flink-sql-gateway的项目)总是报错。测试sql为https://github.com/ververica/flink-cdc-connectors/wiki/中文教程
 

 中的demo的三流join的那个flink sql

报错信息如下:

2020-08-12 19:18:51,034 WARN  org.apache.flink.runtime.taskmanager.Task 
- Source: TableSourceScan(table=[[default_catalog, 
default_database, orders]], fields=[order_id, order_date, customer_name, price, 
product_id, order_status]) (1/1) (f2a72e33dc272266408e47d60724b078) switched 
from RUNNING to FAILED.
java.lang.AbstractMethodError: Receiver class 
org.apache.kafka.connect.json.JsonSerializer does not define or inherit an 
implementation of the resolved method abstract configure(Ljava/util/Map;Z)V of 
interface org.apache.kafka.common.serialization.Serializer.
at 
org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:300)
at 
org.apache.kafka.connect.json.JsonConverter.configure(JsonConverter.java:311)
at io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:582)
at io.debezium.embedded.EmbeddedEngine.(EmbeddedEngine.java:79)
at 
io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:300)
at 
io.debezium.embedded.EmbeddedEngine$BuilderImpl.build(EmbeddedEngine.java:216)
at 
io.debezium.embedded.ConvertingEngineBuilder.build(ConvertingEngineBuilder.java:139)
at 
com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:299)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)



Best,
TonyChen



Re: 当数据源产生的速度过快时使用AscendingTimestampExtractor并没有忽略非递增的事件时间数据

2020-08-12 文章 SSY
我想明白了,AscendingTimestampExtractor产生的水印是周期水印,当数据源速率过快时,水印还没来得及发送,后面的数据已经进入到算子当中了,所以造成了非递增事件也会被包含在算子中。。在测试环境下,如果换成Punctuated
Watermarks对每个事件发送水印,就没问题了



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink slot之间内存隔离问题

2020-08-12 文章 Xintong Song
slot 之间只有 managed memory 是隔离的。
取决于你的 flink 版本,1.10 之后 managed memory 除了用于 batch operator,还会用于 rocksdb
state backend。

Thank you~

Xintong Song



On Wed, Aug 12, 2020 at 3:55 PM Cayden chen <1193216...@qq.com> wrote:

> hi,all
> 对于flink slot之间内存隔离有个疑问。就是slot中的task请求networkbufferpool的时候是否会限制为
> 1/slost数量(我看源码好像是没有限制)。如果没有限制的话,那么具体隔离的内存是什么内存,是否只是在批处理模式下的managed memory。


FlinkSQL even time 定义问题

2020-08-12 文章 Zhao,Yi(SEC)
咨询下,FlinkSQl的event time必须在DDL中定义吗。能否DDL只是定义普通数据字段,比如有个time属性。
然后在select 的时候指定具体使用的watermark策略。
目的:假设基于同一个表A,我查询1需要使用watermark为time-1min,查询2需要使用watermark为time-2min。

其次除了这种case,如果我基于表1查询得到结果输出到表2,那么表2的event 
time定义呢?比如在表2的定义中基于表2的某个属性(比如叫time2),然后插入表2的时候只要time2属性存在就可以?


此外,如果对比datastream api的watermark传播机制,如果我希望查询1结构输出到表2,然后继续基于表2查询。貌似就需要
select xxx from (select yyy from t); 这种嵌套写法,一句sql会变成一个任务。
那如何用sql做非常复杂的任务组合呢,比如我是2句不搭嘎的sql的,就希望在同一个job中呢。


Re: 用hive streaming写 orc文件的问题

2020-08-12 文章 Rui Li
Hi,

写orc表的作业有报错么?还是成功执行但是hive查不到数据呢?
看不到你贴的代码,有没有设置table.exec.hive.fallback-mapred-writer?

On Wed, Aug 12, 2020 at 5:14 PM 18579099...@163.com <18579099...@163.com>
wrote:

>
>
> 我通过datagen作为流表,分别写入两个表结构相同,存储格式不同的hive表(一个orc,一个parquet)中,其中parquet表正常写入并且生成了_SUCCESS文件,hive也能查询到,
>
> 但是orc表没有生成_SUCCESS文件,并且hive中无法查询到,我是在本地ide上直接运行的,hive版本是1.2.1,flink版本是1.11.1,同时我发现orc表的分区中生成的文件数量比parquet多,
> 而且不会根据checkpoint间隔生成(parquet符合checkpoint间隔)。而且需要导入flink-orc_2.11jar包(parquet不需要),否则报Exception
> in thread "main" java.lang.NoClassDefFoundError:
> org/apache/orc/TypeDescription错误。并且parquet每间隔checkpoint interval
> 会输出parquet相关的日志,而orc的并没有日志产生,请问是什么原因?我已贴上代码。
> --
> 18579099...@163.com
>


-- 
Best regards!
Rui Li


flink 1.11 udtf动态定义返回类型

2020-08-12 文章 李强
flink 1.11 udtf可以像1.10那样自定义返回类型不我希望可以像flink 1.10这样:

@Override
  public TypeInformation

用hive streaming写 orc文件的问题

2020-08-12 文章 18579099...@163.com

我通过datagen作为流表,分别写入两个表结构相同,存储格式不同的hive表(一个orc,一个parquet)中,其中parquet表正常写入并且生成了_SUCCESS文件,hive也能查询到,
但是orc表没有生成_SUCCESS文件,并且hive中无法查询到,我是在本地ide上直接运行的,hive版本是1.2.1,flink版本是1.11.1,同时我发现orc表的分区中生成的文件数量比parquet多,
而且不会根据checkpoint间隔生成(parquet符合checkpoint间隔)。而且需要导入flink-orc_2.11jar包(parquet不需要),否则报Exception
 in thread "main" java.lang.NoClassDefFoundError: 
org/apache/orc/TypeDescription错误。并且parquet每间隔checkpoint interval 
会输出parquet相关的日志,而orc的并没有日志产生,请问是什么原因?我已贴上代码。


18579099...@163.com


?????? pyflink 1.11.1 execute_sql sql_update????????????????????????????????????????????, ??????????????Deprecated

2020-08-12 文章 xuzh







----
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query

Best,
Xingbo

?? 

Re: pyflink 1.11.1 execute_sql sql_update执行问题,下面三种情况只有第一种情况执行成功, 但是该方法是被Deprecated

2020-08-12 文章 Leonard Xu
Hi
现象是正常的, 
execute_sql是一个异步的方法,提交后就退出了,如果需要等待执行结果,可以调用如下方法显式地等待
sql_result = t_env.execute_sql("insert into print_sink select wd,count(wd) cnt 
from sc group by wd")
sql_result.get_job_client().get_job_execution_result().result()

祝好
Leonard Xu

> 在 2020年8月12日,16:00,徐振华  写道:
> 
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
> 
> # pyflink 1.11.1 
> environment_settings = 
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> senv = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(senv, 
> environment_settings=environment_settings)
> source_ddl="create table sc(wd varchar,cnt int,c int,hu 
> varchar)with('connector'='filesystem','path'='/home/xzh/mypython/bigdata/flink/stream/input/','format'='csv')"
> sink_ddl = "create table print_sink(wd varchar,cnt 
> bigint)with('connector'='print')"
> 
> # 以下可以正常执行
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> t_env.sql_update("insert into print_sink select wd,count(wd) cnt from sc 
> group by wd")
> t_env.execute("soc")
> 
> # 以下不可以正常执行
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc 
> group by wd")
> t_env.execute("soc")
> 
> 
> # 也不可以正常执行
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc 
> group by wd")
> senv.execute("soc")
> 
> 
> 
> 
> 



Re: pyflink 1.11.1 execute_sql sql_update执行问题,下面三种情况只有第一种情况执行成功, 但是该方法是被Deprecated

2020-08-12 文章 Xingbo Huang
Hi,
execute_sql已经包含有了execute的意思了无非前者是异步非阻塞的,所以,你就别execute_sql之后还来一个execute了,具体你可以看下文档[1]。如果你使用execute_sql且在本地跑的话,你需要进行如下操作,否则会程序直接跑完没有结果。
result = t_env.execute_sql("你写的sql")
result.get_job_client().get_job_execution_result().result()

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#translate-and-execute-a-query

Best,
Xingbo

徐振华  于2020年8月12日周三 下午4:00写道:

> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import EnvironmentSettings, StreamTableEnvironment
>
> # pyflink 1.11.1
> environment_settings =
> EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
> senv = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(senv,
> environment_settings=environment_settings)
> source_ddl="create table sc(wd varchar,cnt int,c int,hu
> varchar)with('connector'='filesystem','path'='/home/xzh/mypython/bigdata/flink/stream/input/','format'='csv')"
> sink_ddl = "create table print_sink(wd varchar,cnt
> bigint)with('connector'='print')"
>
> # 以下可以正常执行
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> t_env.sql_update("insert into print_sink select wd,count(wd) cnt from sc
> group by wd")
> t_env.execute("soc")
>
> # 以下不可以正常执行
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc
> group by wd")
> t_env.execute("soc")
>
>
> # 也不可以正常执行
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc
> group by wd")
> senv.execute("soc")
>
> 
>
>
> 


Re:flink1.10.0任务失败但是application仍然在yarn运行不会自动kill

2020-08-12 文章 魏烽
是的,但是连接超时判定任务失败后也应该把application在yarn kill掉吧,不然会一直挂着占用资源

 原始邮件
发件人: Michael Ran
收件人: user-zh
发送时间: 2020年8月12日(周三) 15:41
主题: Re:flink1.10.0任务失败但是application仍然在yarn运行不会自动kill


Connection timed out: 
nb-bdh-hadoop.slave14/10.10.5.14:21913是不是连接超时,请求没发送过去?
在 2020-08-12 13:57:52,"魏烽" mailto:weif...@nequal.com>> 写道:
>各位好:
>
>Flink1.10.0版本发现偶发任务失败后但是web ui仍然挂着该任务并没有停止,日志如下:
>
>请问有遇到一样的情况的嘛
>
>[INFO] 2020-07-28 16:34:00.938  - [taskAppId=TASK-38-97-193]:[106] -  -> 
>2020-07-28 16:33:52,863 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Submitting application master application_1571540269403_52656
>
>2020-07-28 16:33:52,895 INFO  
>org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
>application application_1571540269403_52656
>
>2020-07-28 16:33:52,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Waiting for the cluster to be allocated
>
>2020-07-28 16:33:52,897 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Deploying cluster, current state ACCEPTED
>
>2020-07-28 16:34:00,938 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - YARN application has been deployed successfully.
>
>[INFO] 2020-07-28 16:36:04.622  - [taskAppId=TASK-38-97-193]:[106] -  -> 
>2020-07-28 16:34:00,941 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Found Web Interface nb-bdh-hadoop.slave14:21913 of application 
>'application_1571540269403_52656'.
>
>Job has been submitted with JobID 54ac0f3db08f29022d9f3d51d797a724
>
>[INFO] 2020-07-28 16:36:04.623  - [taskAppId=TASK-38-97-193]:[106] -  -> 
>
>
>The program finished with the following exception:
>
>org.apache.flink.client.program.ProgramInvocationException: The main method 
>caused an error: 
>org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
>complete the operation. Number of retries has been exhausted.
>
>at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>
>at 
>org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>
>at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
>at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>
>at 
>org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>
>at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>
>at java.security.AccessController.doPrivileged(Native Method)
>
>at javax.security.auth.Subject.doAs(Subject.java:422)
>
>at 
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>
>at 
>org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>
>Caused by: java.util.concurrent.ExecutionException: 
>org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
>complete the operation. Number of retries has been exhausted.
>
>at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
>at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
>at 
>org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>
>at 
>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>
>at 
>org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:678)
>
>at com.nequal.bdh.cdp.IDMappingLauncher$.main(IDMappingLauncher.scala:140)
>
>at com.nequal.bdh.cdp.IDMappingLauncher.main(IDMappingLauncher.scala)
>
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>at 
>sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>at 
>sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>at java.lang.reflect.Method.invoke(Method.java:498)
>
>at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>
>... 11 more
>
>Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
>Could not complete the operation. Number of retries has been exhausted.
>
>at 
>org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
>
>at 
>java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
>at 
>java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
>at 
>java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
>at 
>java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>
>at 

Re: 使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 Leonard Xu
Hi

Group by 和 left join 都是会有 retract 消息的,这类消息需要UpsertStreamTableSink才能处理, Kafka 
connetor 目前的实现是AppendStreamTableSink,所以不能处理
社区已经有一个issue在处理这个问题了,应该1.12能提供这个功能。


Best
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-18826 


> 在 2020年8月12日,15:58,xiao cai  写道:
> 
> Hi Jark:
> 版本:1.11.0
> 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错:
> AppendStreamTableSink doesn't support consuming update changes which is 
> produced by node GroupAggregate
> 
> 
> 我希望能够在sql校验时也能使upsert操作作用于kafka sink,或者等upsert完成,再写入kafka
> 
> 
> 附上执行sql:
> create table kafka_table_1 (  
>`shop_id` varchar,  
>`user_id` bigint,  
>`category_id` int, 
>`ts` bigint,  
>`row_time` timestamp(3), 
>`proc_time` timestamp(3), 
> ) with (  
>'connector.type' = 'kafka',  
>'connector.version' = 'universal',  
>'connector.topic' = 'user_visit_1',  
>'connector.startup-mode' = 'latest-offset',  
>'connector.properties.bootstrap.servers' = 'ip:9092',  
>'connector.properties.zookeeper.connect' = 'ip:2181',  
>'update-mode' = 'append', 
>'format.type' = 'avro-registry', 
>'format.schema-subject' = 'user_visit', 
>'format.schema-url'='http://ip:8081', 
> )
> 
> 
> CREATE TABLE hbase_table ( 
>rowKey STRING, 
>cf ROW 
> ) WITH ( 
>'connector.type' = 'hbase', 
>'connector.version' = '1.4.3', 
>'connector.table-name' = 'hbase_table', 
>'connector.zookeeper.quorum' = 'ip:2181', 
>'connector.zookeeper.znode.parent' = '/hbase', 
>'connector.write.buffer-flush.max-rows' = '1000' 
> )
> 
> 
> 
> 
> create table kafka_table_2 (  
>`shop_id` varchar,  
>`age` varchar,  
>`area` varchar
> ) with (  
>'connector.type' = 'kafka',  
>'connector.version' = 'universal',  
>'connector.topic' = 'user_visit_2',  
>'connector.startup-mode' = 'latest-offset',  
>'connector.properties.bootstrap.servers' = 'ip:9092',  
>'connector.properties.zookeeper.connect' = 'ip:2181',  
>'update-mode' = 'append', 
>'format.type' = 'avro-registry', 
>'format.schema-subject' = 'user_visit', 
>'format.schema-url'='http://ip:8081', 
> )
> 
> 
> insert into kafka_table_2(shop_id, user_id, category_id, ts, row_time, 
> proc_time)
> select shop_id, age, area 
> from kafka_table_1 left join hbase_table
> for system_time as of kafka_table_1.proc_time as temp on 
> kafka_table_1.shop_id = temp.rowKey
> group by shop_id, age, area
> 
> 
> 原始邮件 
> 发件人: xiao cai
> 收件人: user-zh
> 发送时间: 2020年8月12日(周三) 15:41
> 主题: AppendStreamTableSink doesn't support consuming update changes
> 
> 
> Hi Jark: 版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka 
> sink,会在语法校验阶段报错: AppendStreamTableSink doesn't support consuming update 
> changes which is produced by node GroupAggregate



回复:使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 xuzh
是不是update-mode 改用 update模式




--原始邮件--
发件人:
"user-zh"   
 

Re: Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-12 文章 Jingsong Li
另外问一下,是什么格式?csv还是parquet。
有等到10分钟(rollover-interval)过后和下一次checkpoint后再看吗?

On Wed, Aug 12, 2020 at 2:45 PM kandy.wang  wrote:

>
>
>
>
>
>
> 有的。就是写了一半,做了一个checkpoint ,然后程序 做一个savepoint cancel掉,
> 重启的时候,从最新的savepoint恢复,但是重启的时候已经属于新分区了。
> 就是感觉停止之前正在写的那个分区,没有触发commit
>
>
>
>
> 在 2020-08-12 14:26:53,"Jingsong Li"  写道:
> >那你之前的分区除了in-progress文件,有已完成的文件吗?
> >
> >On Wed, Aug 12, 2020 at 1:57 PM kandy.wang  wrote:
> >
> >>
> >>
> >>
> >> source就是kafka
> >>
> json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。
> >>
> >>
> >>
> >>
> >>
> >>
> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >>
> >>
> >>
> >> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >>
> >> 在 2020-08-12 13:28:01,"Jingsong Li"  写道:
> >> >你的source是exactly-once的source吗?
> >> >
> >> >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >> >
> >> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:
> >> >
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> >@ Jingsong
> >> >>
> >> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
> >> >> 用presto查询查不了
> >> >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
> >> >>  'sink.partition-commit.trigger'='process-time',
> >> >>   'sink.partition-commit.delay'='0 min',
> >> >>
>  'sink.partition-commit.policy.kind'='metastore,success-file,custom',
> >> >>   'sink.rolling-policy.check-interval'='30s',
> >> >>   'sink.rolling-policy.rollover-interval'='10min',
> >> >>   'sink.rolling-policy.file-size'='128MB'
> >> >>如果是12:39分 05秒左右做一次savepoint,然后
> >> >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add
> >> >> partition,就导致有数据,但是确查不 了。
> >> >>
> >>
> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
> >> >> partition 也能查了。
> >> >> >
> >> >> >
> >> >> >
> >> >> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
> >> >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
> >> >> >>
> >> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
> >> >> >>
> >> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
> >> >> >>>
> >> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang 
> wrote:
> >> >> >>>
> >> >> >>> > 1.StreamingFileWriter
> >> 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
> >> >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
> >> >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
> >> >> >>> > =2100分区的数据还存在很多的in-progress文件。
> >> >> >>> >
> 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
> >> >> >>> >
> >> >> >>> >
> >> >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
> >> >> >>> >
> >> >> >>> >
> >> >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
> >> >> >>>
> >> >> >>
> >> >> >>
> >> >> >>--
> >> >> >>Best, Jingsong Lee
> >> >>
> >> >
> >> >
> >> >--
> >> >Best, Jingsong Lee
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


pyflink 1.11.1 execute_sql sql_update????????????????????????????????????????????, ??????????????Deprecated

2020-08-12 文章 ??????
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# pyflink 1.11.1 
environment_settings = 
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
senv = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(senv, 
environment_settings=environment_settings)
source_ddl="create table sc(wd varchar,cnt int,c int,hu 
varchar)with('connector'='filesystem','path'='/home/xzh/mypython/bigdata/flink/stream/input/','format'='csv')"
sink_ddl = "create table print_sink(wd varchar,cnt 
bigint)with('connector'='print')"

# 
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.sql_update("insert into print_sink select wd,count(wd) cnt from sc group 
by wd")
t_env.execute("soc")

# ??
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group 
by wd")
t_env.execute("soc")


# 
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group 
by wd")
senv.execute("soc")






使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 xiao cai
Hi Jark:
版本:1.11.0
问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错:
AppendStreamTableSink doesn't support consuming update changes which is 
produced by node GroupAggregate


我希望能够在sql校验时也能使upsert操作作用于kafka sink,或者等upsert完成,再写入kafka


附上执行sql:
create table kafka_table_1 (  
`shop_id` varchar,  
`user_id` bigint,  
`category_id` int, 
`ts` bigint,  
`row_time` timestamp(3), 
`proc_time` timestamp(3), 
) with (  
'connector.type' = 'kafka',  
'connector.version' = 'universal',  
'connector.topic' = 'user_visit_1',  
'connector.startup-mode' = 'latest-offset',  
'connector.properties.bootstrap.servers' = 'ip:9092',  
'connector.properties.zookeeper.connect' = 'ip:2181',  
'update-mode' = 'append', 
'format.type' = 'avro-registry', 
'format.schema-subject' = 'user_visit', 
'format.schema-url'='http://ip:8081', 
)


CREATE TABLE hbase_table ( 
rowKey STRING, 
cf ROW 
) WITH ( 
'connector.type' = 'hbase', 
'connector.version' = '1.4.3', 
'connector.table-name' = 'hbase_table', 
'connector.zookeeper.quorum' = 'ip:2181', 
'connector.zookeeper.znode.parent' = '/hbase', 
'connector.write.buffer-flush.max-rows' = '1000' 
)




create table kafka_table_2 (  
`shop_id` varchar,  
`age` varchar,  
`area` varchar
) with (  
'connector.type' = 'kafka',  
'connector.version' = 'universal',  
'connector.topic' = 'user_visit_2',  
'connector.startup-mode' = 'latest-offset',  
'connector.properties.bootstrap.servers' = 'ip:9092',  
'connector.properties.zookeeper.connect' = 'ip:2181',  
'update-mode' = 'append', 
'format.type' = 'avro-registry', 
'format.schema-subject' = 'user_visit', 
'format.schema-url'='http://ip:8081', 
)


insert into kafka_table_2(shop_id, user_id, category_id, ts, row_time, 
proc_time)
select shop_id, age, area 
from kafka_table_1 left join hbase_table
for system_time as of kafka_table_1.proc_time as temp on kafka_table_1.shop_id 
= temp.rowKey
group by shop_id, age, area


 原始邮件 
发件人: xiao cai
收件人: user-zh
发送时间: 2020年8月12日(周三) 15:41
主题: AppendStreamTableSink doesn't support consuming update changes


Hi Jark: 版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka 
sink,会在语法校验阶段报错: AppendStreamTableSink doesn't support consuming update changes 
which is produced by node GroupAggregate

flink slot????????????????

2020-08-12 文章 Cayden chen
hi??all
flink 
slot??slottasknetworkbufferpool??
 
1/slost(??)??managed
 memory??

当数据源产生的速度过快时使用AscendingTimestampExtractor并没有忽略非递增的事件时间数据

2020-08-12 文章 SSY

 
数据源如上图所示,6行3列。这里的逻辑是以第三列为事件事件,采用滚动窗口(10s),统计窗口内最大的第一列的时间(PS:第一列数据这里故意设置成升序),输出为“窗口内最大的第一列时间所在行的第二列的值,窗口内元素的个数”。如果一切正常,我认为结果应该是
2,2
2,5
我是用kafka来发送数据源,当发送速率为100毫秒每条数据时,结果和预期相符,如下图

 
但是当发送速率为1毫秒每条数据时,发现红框内的数据也被包含进来了(即第二列是4的那条数据源,本来应该被忽略),如下图

 
这样看来数据发送的速率不同也会影响最后的结果吗?请问这是什么原因呢?
部分程序代码如下:

 

 

 

 






--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re:flink1.10.0任务失败但是application仍然在yarn运行不会自动kill

2020-08-12 文章 Michael Ran
Connection timed out: 
nb-bdh-hadoop.slave14/10.10.5.14:21913是不是连接超时,请求没发送过去?
在 2020-08-12 13:57:52,"魏烽"  写道:
>各位好:
>
>Flink1.10.0版本发现偶发任务失败后但是web ui仍然挂着该任务并没有停止,日志如下:
>
>请问有遇到一样的情况的嘛
>
>[INFO] 2020-07-28 16:34:00.938  - [taskAppId=TASK-38-97-193]:[106] -  -> 
>2020-07-28 16:33:52,863 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Submitting application master application_1571540269403_52656
>
>2020-07-28 16:33:52,895 INFO  
>org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
>application application_1571540269403_52656
>
>2020-07-28 16:33:52,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Waiting for the cluster to be allocated
>
>2020-07-28 16:33:52,897 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Deploying cluster, current state ACCEPTED
>
>2020-07-28 16:34:00,938 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - YARN application has been deployed successfully.
>
>[INFO] 2020-07-28 16:36:04.622  - [taskAppId=TASK-38-97-193]:[106] -  -> 
>2020-07-28 16:34:00,941 INFO  org.apache.flink.yarn.YarnClusterDescriptor  
> - Found Web Interface nb-bdh-hadoop.slave14:21913 of application 
>'application_1571540269403_52656'.
>
>Job has been submitted with JobID 54ac0f3db08f29022d9f3d51d797a724
>
>[INFO] 2020-07-28 16:36:04.623  - [taskAppId=TASK-38-97-193]:[106] -  -> 
>
>
>The program finished with the following exception:
>
>org.apache.flink.client.program.ProgramInvocationException: The main method 
>caused an error: 
>org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
>complete the operation. Number of retries has been exhausted.
>
>at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>
>at 
>org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>
>at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>
>at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>
>at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>
>at 
>org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>
>at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>
>at java.security.AccessController.doPrivileged(Native Method)
>
>at javax.security.auth.Subject.doAs(Subject.java:422)
>
>at 
>org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>
>at 
>org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>
>at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>
>Caused by: java.util.concurrent.ExecutionException: 
>org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
>complete the operation. Number of retries has been exhausted.
>
>at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>
>at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
>
>at 
>org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)
>
>at 
>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>
>at 
>org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:678)
>
>at com.nequal.bdh.cdp.IDMappingLauncher$.main(IDMappingLauncher.scala:140)
>
>at com.nequal.bdh.cdp.IDMappingLauncher.main(IDMappingLauncher.scala)
>
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
>at 
>sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
>at 
>sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>at java.lang.reflect.Method.invoke(Method.java:498)
>
>at 
>org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>
>... 11 more
>
>Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
>Could not complete the operation. Number of retries has been exhausted.
>
>at 
>org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:284)
>
>at 
>java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
>
>at 
>java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
>
>at 
>java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>
>at 
>java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
>
>at 
>org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:342)
>
>at 
>org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
>
>at 

Re: flink 1.10.1 sql limit 不生效

2020-08-12 文章 Peihui He
[image: image.png]

order by TUMBLE_START  结果如上图

Peihui He  于2020年8月12日周三 下午3:40写道:

> Hi BenChao
>
> SQL是流模式下执行的,看着不生效的表现就是显示的数量超过limit的数量。
> order by TUMBLE_START desc 好像不是预期的结果
>
> 这个是需要给 TUMBLE_START 得到的timestamp 转为 long么?
>
> Best Wishes.
>
> Benchao Li  于2020年8月12日周三 下午3:12写道:
>
>> 你这个SQL是在流式还是批式模式下执行的呢?limit不生效的表现是什么呢?
>>
>> Peihui He  于2020年8月12日周三 下午3:03写道:
>>
>> > Hi all,
>> >
>> > 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效
>> > sql 类似下面:
>> > select a, b, c, t from tb  LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d,
>> e) ON
>> > TRUE  order by t desc limit 10
>> >
>> > 如果select 结果中不包括c的化,就正常了
>> >
>> > 请问这个是什么问题呢?sql是写的不对么?
>> >
>> > Best Wishes.
>> >
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>


AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 xiao cai
Hi Jark:


版本:1.11.0
问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错:
AppendStreamTableSink doesn't support consuming update changes which is 
produced by node GroupAggregate

AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 xiao cai
Hi Jark:


版本:1.11.0
问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错:

Re: flink 1.10.1 sql limit 不生效

2020-08-12 文章 Peihui He
Hi BenChao

发现问题了,是因为select 的字段中包含了array,导致数据显示的比实际limit数据要多

Best Wishes.

Benchao Li  于2020年8月12日周三 下午3:12写道:

> 你这个SQL是在流式还是批式模式下执行的呢?limit不生效的表现是什么呢?
>
> Peihui He  于2020年8月12日周三 下午3:03写道:
>
> > Hi all,
> >
> > 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效
> > sql 类似下面:
> > select a, b, c, t from tb  LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e)
> ON
> > TRUE  order by t desc limit 10
> >
> > 如果select 结果中不包括c的化,就正常了
> >
> > 请问这个是什么问题呢?sql是写的不对么?
> >
> > Best Wishes.
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: flink 1.10.1 sql limit 不生效

2020-08-12 文章 Benchao Li
你这个SQL是在流式还是批式模式下执行的呢?limit不生效的表现是什么呢?

Peihui He  于2020年8月12日周三 下午3:03写道:

> Hi all,
>
> 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效
> sql 类似下面:
> select a, b, c, t from tb  LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e) ON
> TRUE  order by t desc limit 10
>
> 如果select 结果中不包括c的化,就正常了
>
> 请问这个是什么问题呢?sql是写的不对么?
>
> Best Wishes.
>


-- 

Best,
Benchao Li


Re: flink 1.10.1 sql limit 不生效

2020-08-12 文章 Peihui He
应该是我这边sql问题,我这边在看看,打扰大家了

Peihui He  于2020年8月12日周三 下午3:03写道:

> Hi all,
>
> 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效
> sql 类似下面:
> select a, b, c, t from tb  LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e)
> ON TRUE  order by t desc limit 10
>
> 如果select 结果中不包括c的化,就正常了
>
> 请问这个是什么问题呢?sql是写的不对么?
>
> Best Wishes.
>


flink 1.10.1 sql limit 不生效

2020-08-12 文章 Peihui He
Hi all,

用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效
sql 类似下面:
select a, b, c, t from tb  LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e) ON
TRUE  order by t desc limit 10

如果select 结果中不包括c的化,就正常了

请问这个是什么问题呢?sql是写的不对么?

Best Wishes.


回复: flink中同时有多个大数据组件开启kerberos时keytab的选择

2020-08-12 文章 zjfpla...@hotmail.com
图片挂了,其内容为:
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: /home/zjf/kafka.keytab
security.kerberos.login.principal: ka...@test.com
security.kerberos.login.contexts: Client,KafkaClient
zookeeper.sasl.service-name: zookeeper
zookeeper.sasl.login-context-name: Client

我这边想的是两种解决方式:
1.security.kerberos.login.principal能否为2个值,ka...@test.com,hb...@test.com
2.如何整合principal为ka...@test.com和hb...@test.com为一个principal,以及keytab为1个



zjfpla...@hotmail.com
 
发件人: zjfpla...@hotmail.com
发送时间: 2020-08-12 14:39
收件人: user-zh
主题: flink中同时有多个大数据组件开启kerberos时keytab的选择
各位好,

flink-conf.yaml中kerberos配置段如下:
CDH中hbase+kafka同时开启了kerberos,类似此方式,如果上述配置中的“security.kerberos.login.principal”我用kafka.keytab,会报无权限操作hbase;如果用hbase.keytab
 ,会报无权限操作kafka。 
我查过kerberos的资料,发现有种方式是整合hbase.keytab和kafka.keytab,但是其实是包含2个principal(ka...@test.com,hb...@test.com),而不是一个pincial,如下操作:
ktutil: rkt user1.keytab
ktutil: rkt user2.keytab
ktutil: wkt user.keytab
这样的话,虽然是整合了hbase和kafka的keytab为1个keytab,但是“security.kerberos.login.principal”还是只能填其中一个,一样会有权限的问题,这块有没有什么方式能解决?




zjfpla...@hotmail.com


Re:Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-12 文章 kandy.wang






有的。就是写了一半,做了一个checkpoint ,然后程序 做一个savepoint cancel掉, 
重启的时候,从最新的savepoint恢复,但是重启的时候已经属于新分区了。
就是感觉停止之前正在写的那个分区,没有触发commit




在 2020-08-12 14:26:53,"Jingsong Li"  写道:
>那你之前的分区除了in-progress文件,有已完成的文件吗?
>
>On Wed, Aug 12, 2020 at 1:57 PM kandy.wang  wrote:
>
>>
>>
>>
>> source就是kafka
>> json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。
>>
>>
>>
>>
>>
>>
>> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>>
>>
>>
>> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>>
>> 在 2020-08-12 13:28:01,"Jingsong Li"  写道:
>> >你的source是exactly-once的source吗?
>> >
>> >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>> >
>> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:
>> >
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> >@ Jingsong
>> >>
>> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
>> >> 用presto查询查不了
>> >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
>> >>  'sink.partition-commit.trigger'='process-time',
>> >>   'sink.partition-commit.delay'='0 min',
>> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
>> >>   'sink.rolling-policy.check-interval'='30s',
>> >>   'sink.rolling-policy.rollover-interval'='10min',
>> >>   'sink.rolling-policy.file-size'='128MB'
>> >>如果是12:39分 05秒左右做一次savepoint,然后
>> >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add
>> >> partition,就导致有数据,但是确查不 了。
>> >>
>> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
>> >> partition 也能查了。
>> >> >
>> >> >
>> >> >
>> >> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
>> >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
>> >> >>
>> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
>> >> >>
>> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
>> >> >>>
>> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
>> >> >>>
>> >> >>> > 1.StreamingFileWriter
>> 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
>> >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
>> >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
>> >> >>> > =2100分区的数据还存在很多的in-progress文件。
>> >> >>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
>> >> >>> >
>> >> >>> >
>> >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
>> >> >>> >
>> >> >>> >
>> >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
>> >> >>>
>> >> >>
>> >> >>
>> >> >>--
>> >> >>Best, Jingsong Lee
>> >>
>> >
>> >
>> >--
>> >Best, Jingsong Lee
>>
>
>
>-- 
>Best, Jingsong Lee


flink中同时有多个大数据组件开启kerberos时keytab的选择

2020-08-12 文章 zjfpla...@hotmail.com
各位好,

flink-conf.yaml中kerberos配置段如下:
CDH中hbase+kafka同时开启了kerberos,类似此方式,如果上述配置中的“security.kerberos.login.principal”我用kafka.keytab,会报无权限操作hbase;如果用hbase.keytab
 ,会报无权限操作kafka。 
我查过kerberos的资料,发现有种方式是整合hbase.keytab和kafka.keytab,但是其实是包含2个principal(ka...@test.com,hb...@test.com),而不是一个pincial,如下操作:
ktutil: rkt user1.keytab
ktutil: rkt user2.keytab
ktutil: wkt user.keytab
这样的话,虽然是整合了hbase和kafka的keytab为1个keytab,但是“security.kerberos.login.principal”还是只能填其中一个,一样会有权限的问题,这块有没有什么方式能解决?




zjfpla...@hotmail.com


Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-12 文章 Jingsong Li
那你之前的分区除了in-progress文件,有已完成的文件吗?

On Wed, Aug 12, 2020 at 1:57 PM kandy.wang  wrote:

>
>
>
> source就是kafka
> json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。
>
>
>
>
>
>
> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>
>
>
> in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
>
> 在 2020-08-12 13:28:01,"Jingsong Li"  写道:
> >你的source是exactly-once的source吗?
> >
> >in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢?
> >
> >On Wed, Aug 12, 2020 at 12:51 PM kandy.wang  wrote:
> >
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> >@ Jingsong
> >>
> >> >导致的影响是停止前的那个分区,分区没有提交, 当程序起来之后,写的分区和之前分区不是同一个分区,没有_SUCCESS文件标记。
> >> 用presto查询查不了
> >> 举例:12:35分钟应当写的是 12:35 00秒 -12:39分 59秒 之间的数据,
> >>  'sink.partition-commit.trigger'='process-time',
> >>   'sink.partition-commit.delay'='0 min',
> >>   'sink.partition-commit.policy.kind'='metastore,success-file,custom',
> >>   'sink.rolling-policy.check-interval'='30s',
> >>   'sink.rolling-policy.rollover-interval'='10min',
> >>   'sink.rolling-policy.file-size'='128MB'
> >>如果是12:39分 05秒左右做一次savepoint,然后
> >> 12:41分程序重启后,发现之前的12:35分区不再写入,里面的in-progress文件还在,但是分区没有提交,没有往hive add
> >> partition,就导致有数据,但是确查不 了。
> >>
> 按照你说的,in-progress文件对没影响,但是影响了分区提交。就没地方触发之前12:35分区提交逻辑了。相当于丢了一个分区。这种情况我试了一下,手动add
> >> partition 也能查了。
> >> >
> >> >
> >> >
> >> >在 2020-08-12 12:11:53,"Jingsong Li"  写道:
> >> >>in-progress文件带来了什么具体问题吗?它们是多余的文件,对流程没有影响
> >> >>
> >> >>On Wed, Aug 12, 2020 at 11:05 AM Jark Wu  wrote:
> >> >>
> >> >>> 与我所知,(2) & (3) 有希望能在 1.12 中支持。
> >> >>>
> >> >>> On Tue, 11 Aug 2020 at 21:15, kandy.wang  wrote:
> >> >>>
> >> >>> > 1.StreamingFileWriter
> 测试下来目前发现,sql方式提交任务,不能从checkpoint、savepoint恢复。
> >> >>> >举例:5min产生一个分区,数据按照process_time来落,hm= 2100 的分区, 在
> >> >>> > 21:04分左右的时候做一次checkpoint 或savepoint,重启任务的时候,hm
> >> >>> > =2100分区的数据还存在很多的in-progress文件。
> >> >>> > 另外,目前在hdfs目录下没看到pending文件,想了解一下这文件状态是如何转换的,跟之前的bucketsink好像实现不太一样。
> >> >>> >
> >> >>> >
> >> >>> > 2. sql-client不支持 checkpoint savepoint恢复的问题,何时可以支持
> >> >>> >
> >> >>> >
> >> >>> > 3.sql-client 提交任务,不支持StatementSet批量提交,何时可以支持
> >> >>>
> >> >>
> >> >>
> >> >>--
> >> >>Best, Jingsong Lee
> >>
> >
> >
> >--
> >Best, Jingsong Lee
>


-- 
Best, Jingsong Lee


关于FlinkSQL的一些最佳实践咨询

2020-08-12 文章 Zhao,Yi(SEC)
最近刚刚开始研究FlinkSQL,之前一直用的DataStream API,现在想尝试下FlinkSQL的玩法。
目前简单看下来,感觉相对来说相比DataStream API不够灵活,所以有一些实践疑问咨询下。

(1) 通常Kafka的动态表会通过临时表的方式写在程序开头声明定义呢?还是直接写在sql-client的配置文件中呢。

如果写在配置文件,好处是类似于一个固化的表定义一样,每次执行sql-client的cli之后可以马上查询相关数据。但是有个问题是这种情况下kafka设置读取earliest还是latest?貌似根据具体查询需求临时改配置好像也不是很方便。

写临时表是不是更常用?比如你这个程序目的就是读取指定时间开始的数据,那就在代码中定义这个临时表的时候就指定了offset。如果是需要从最早开始读,代码中定义临时表就定义为最早。

(2) 
FlinkSQL生产中,主要是通过代码方式执行呢?还是将相关jar都放好到固定的机器,然后每次sql-client的命令行进去,然后执行一些sql(提交任务)。

(3) 代码中,直接sql api常用呢?还是table api常用呢?哪个更推荐?包括表的定义也有sql ddl方式,以及table 
api提供的一套方式。以及DML也一样,table api提供了一套更加面向对象的语法,sql api则提供了更高层次的抽象,同时类sql(技能栈更通用)。

(4) Flink-json这种格式包一般打包到任务jar中?还是提前放到sql-client的相关lib中?哪个更推荐。还有各种connector。

今天有个实验,报错就是没这些包,加了lib就ok啦。这个有一点不是很懂,是这些包仅client需要,还是说我通过sql-client.sh 
的-l参数指定的lib目录会被动态上传到集群呢?毕竟这些包我并没有手动上传到集群,只是在client端游,并通过-l指定之后就ok可用了。

(5) 
在Hive中实践,更多的是提前定义好的各种表结构,然后策略同学直接写sql做各种数据查询和分析。在FlinkSQL场景下,是不是每个代码中定义自己的临时表更常见呢?毕竟有Kafka这种动态表,每次启动一个任务的开始消费位置可能也不一样。


Re: 关于flink sql的内置函数实现与debug

2020-08-12 文章 shizk233
感谢大佬们的指点

Benchao Li  于2020年8月12日周三 上午11:04写道:

> Hi,
>
> 内置的scalar
> function都是通过代码生成来关联到的,入口是`ExprCodeGenerator#generateCallExpression(...)`,
> 你可以顺着这里找到你需要看的具体的函数的对应的方法。
> PS:有很多方法是纯代码生成的,可能没法调试
>
> 内置的aggregate function有两种,一种是通过表达式直接写的,叫做`DeclarativeAggregateFunction`;
> 一种是通过类似于UDAF的方式来实现的,继承的是`AggregateFunction`
> 他们都在`org.apache.flink.table.planner.functions.aggfunctions`
> 包里面(flink-table-planner-blink模块)
>
> shizk233  于2020年8月12日周三 上午10:39写道:
>
> > hi all,
> >
> > 请教一下,flink sql内置的众多functions[1]有对应的Java实现类吗?我只在blink table
> > planner模块下的functions package里找到了一部分,并且是基于Expresstion的。
> >
> > 问题来源:我试图在flink
> > sql里去做debug,如果是自定义的udf可以打断点在实现上,但内置函数没找到相应的实现,似乎也没有相应的文档在这一块。
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/systemFunctions.html
> >
>
>
> --
>
> Best,
> Benchao Li
>