Re: flink如何正则读取hdfs下的文件

2020-05-20 文章 Jingsong Li
Hi, 志华, 如果在Datastream层,你可以使用FiIenputFormat.setFilesFilter来设置文件的过滤器。 目前Table层并不原生支持filter,你可以考虑自己写一个table connector。 但是更推荐的是你能把这个事情换成partition来处理,这个支持的会更自然些。 jimandlice, - 如果是1.10或以前,你需要写一个Datastream作业加上StreamingFileSink来写入Hive,并且列存格式只有parquet的支持。[1] - 如果是1.11(正在测试发布中),Table/SQL层原生支持streaming

Re: flink sql jdbc 只要含有group by 语句就会报错No operations allowed after statement closed.

2020-05-20 文章 Rui Li
看异常感觉是连接超时导致的,有一个相关的jira,https://issues.apache.org/jira/browse/FLINK-16681 On Thu, May 21, 2020 at 11:41 AM shao.hongxiao <17611022...@163.com> wrote: > |INSERT INTO pvuv_sink > |SELECT > | user_id,item_id,category_id,count(1) as cnt > |FROM user_log > |group by user_id,item_id,category_id >

回复: flink sql jdbc 只要含有group by 语句就会报错No operations allowed after statement closed.

2020-05-20 文章 shao.hongxiao
|INSERT INTO pvuv_sink |SELECT | user_id,item_id,category_id,count(1) as cnt |FROM user_log |group by user_id,item_id,category_id 以上是报错得sql 11:21:57,157 ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1 java.sql.SQLException: No

Re: flink sql jdbc 只要含有group by 语句就会报错No operations allowed after statement closed.

2020-05-20 文章 Rui Li
Hi,看不到你贴的SQL,试试直接发一下文字呢? On Thu, May 21, 2020 at 11:25 AM shao.hongxiao <17611022...@163.com> wrote: > 报错sql > 正常执行sql > > 这是为什么呀,好奇怪 > 邵红晓 > 邮箱:17611022...@163.com > >

flink sql jdbc 只要含有group by 语句就会报错No operations allowed after statement closed.

2020-05-20 文章 shao.hongxiao
报错sql 正常执行sql 这是为什么呀,好奇怪 | | 邵红晓 | | 邮箱:17611022...@163.com | 签名由网易邮箱大师定制

Re: TM太多,作业运行失败问题

2020-05-20 文章 Xintong Song
有没有可能是 pod ip 数不够了,或者 pod 上的 ip table 限制了 entry 数量之类的? Thank you~ Xintong Song On Wed, May 20, 2020 at 6:44 PM wrote: > hi,xintong > > 我这边观察到的现象,从系统日志上没有找到被内核oom > kill的日志。作业cancel掉后,失联的tm会重连上来,pod没有被kill掉。初步怀疑是网络层面的问题,感觉是cni有什么限制。 > > thanks~ > > > > > | | > a511955993 > | > | >

回复:flink如何正则读取hdfs下的文件

2020-05-20 文章 jimandlice
flink 写入hive 使用api 思路是怎么的呢 | | jimandlice | | 邮箱:jimandl...@163.com | Signature is customized by Netease Mail Master 在2020年05月21日 10:57,阿华田 写道: flink如何支持正则读取一个目录下的文件,比如读取文件名(时间格式命名)满足一定时间范围的文件 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制

flink如何正则读取hdfs下的文件

2020-05-20 文章 阿华田
flink如何支持正则读取一个目录下的文件,比如读取文件名(时间格式命名)满足一定时间范围的文件 | | 王志华 | | a15733178...@163.com | 签名由网易邮箱大师定制

Re: 这种复杂数据直接解析成null了

2020-05-20 文章 Leonard Xu
+ 补个文档链接[1], 以及可能遇到一个潜在问题的issue链接: [1] https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/types.html#%E7%BB%93%E6%9E%84%E5%8C%96%E7%9A%84%E6%95%B0%E6%8D%AE%E7%B1%BB%E5%9E%8B

Re: flink1.10.x 解析 arrar 问题

2020-05-20 文章 Jingsong Li
谢谢Benchao的回答。 虽然可以work around,但是这看起来应该是blink planner要去支持的事情。 我建个JIRA去跟踪下:https://issues.apache.org/jira/browse/FLINK-17855 Best, Jingsong Lee On Wed, May 20, 2020 at 8:02 PM 了不起的盖茨比 <573693...@qq.com> wrote: > 谢谢大佬,终于弄好了。谢谢。 > public TypeInformation return new >

Re: 数组越界

2020-05-20 文章 Leonard Xu
Hi, allanqinjy 方便贴下查询的query吗?今天在排查另外一个问题时也遇到了这个问题,我建了issue来跟踪[1],想看下是不是相同原因。 Best, Leonard [1] https://issues.apache.org/jira/browse/FLINK-17847 > 在 2020年5月18日,19:52,Leonard Xu 写道: > > Hi, allanqinjy > >

Re: 这种复杂数据直接解析成null了

2020-05-20 文章 Leonard Xu
Hi, > 语句: > CREATE TABLE A ( > w_data STRING, > w_table STRING, > w_ts TIMESTAMP(3) 如果需要 w_data 字段是一个json数组,需要声明表时声明对应的结构化数据类型[1] 即你这里的A表需要声明成: create table json_table( w_es BIGINT, w_type STRING, w_isDdl BOOLEAN, w_data ARRAY>, w_ts TIMESTAMP(3),

Flink Weekly | 每周社区动态更新 - 2020/05/20

2020-05-20 文章 王雷
Flink 开发进展 1.Release ■ Piotr Nowojski 宣布 release-1.11 分支冻结。 [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNECE-release-1-11-branch-cut-td41668.html ■ 1.10.1 已成功发版,发版日志见下链接。 [2] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12346891 ■ 1.10.1

?????? flink1.10.x ???? arrar ????

2020-05-20 文章 ??????????????
public TypeInformation

Re: flink1.10.x 解析 arrar 问题

2020-05-20 文章 Benchao Li
不是这个意思,你需要覆盖`ScalarFunction#getParameterTypes(Class[] signature)`这个方法,显示指定你的输入数据的类型 比如你说的是Row[],那你需要指定Types.OBJECT_ARRAY(Types.ROW(Types.INT, Types.STRING...)),Row里面的类型需要填写 你真实的类型。 了不起的盖茨比 <573693...@qq.com> 于2020年5月20日周三 下午7:24写道: > udf的指定参数类型是 org.apache.flink.types.Row[],也是定义了这个。 > > >

?????? flink1.10.x ???? arrar ????

2020-05-20 文章 ??????????????
udf org.apache.flink.types.Row[]?? ---- ??:"Benchao Li"

Re: flink1.10.x 解析 arrar 问题

2020-05-20 文章 Benchao Li
你可以让你的UDF指定一下输入参数的类型,不指定类型的话,可能会推导出错误的类型,导致拿不到真正的数据 了不起的盖茨比 <573693...@qq.com> 于2020年5月20日周三 下午4:25写道: > 1.blink_planner 定义ddl解析array字段时候,假如select 那个字段可以解析出。 > 2.blink_planner 当我去定义自己函数时候,数组有长度但是没有元素,flink直接跳过解析 > > > 3.如果使用flink-planner那么是正确的 > > > > CREATE TABLE sourceTable ( > >

Re: 这种复杂数据直接解析成null了

2020-05-20 文章 Benchao Li
Flink里面对于Json的解析,是直接用的jackson,然后如果你声明的是varchar类型,会直接调用JsonNode.asText(),这个如果是container类型(也就是复杂类型)的话,是空字符串吧。 guaishushu1...@163.com 于2020年5月20日周三 下午6:06写道: > 语句: > CREATE TABLE A ( > w_data STRING, > w_table STRING, > w_ts TIMESTAMP(3) > > > CREATE TABLE B ( > w_ts TIMESTAMP(3), > city1_id

回复:TM太多,作业运行失败问题

2020-05-20 文章 a511955993
hi,xintong 我这边观察到的现象,从系统日志上没有找到被内核oom kill的日志。作业cancel掉后,失联的tm会重连上来,pod没有被kill掉。初步怀疑是网络层面的问题,感觉是cni有什么限制。 thanks~ | | a511955993 | | 邮箱:a511955...@163.com | 签名由 网易邮箱大师 定制 在2020年05月20日 17:56,Xintong Song 写道: Hi, 从日志看,报错的根本原因是有 TM 挂掉了,导致 pod 被 remove,这样从其他 TM 上就找不到挂掉的 TM

这种复杂数据直接解析成null了

2020-05-20 文章 guaishushu1...@163.com
语句: CREATE TABLE A ( w_data STRING, w_table STRING, w_ts TIMESTAMP(3) CREATE TABLE B ( w_ts TIMESTAMP(3), city1_id STRING, cate3_id STRING, pay_order_id STRING ) insert into B select w_ts, 'test' as city1_id, ArrayIndexOf(w_data, 0) AS cate3_id, w_data as pay_order_id from A 部分数据 A

Re: TM太多,作业运行失败问题

2020-05-20 文章 Xintong Song
Hi, 从日志看,报错的根本原因是有 TM 挂掉了,导致 pod 被 remove,这样从其他 TM 上就找不到挂掉的 TM 的地址。你可以确认一下,发生错误的时候是否有 TM 挂掉/重启。 至于 TM 挂掉的原因,需要想办法获取到失败 TM 的日志。按照你之前的描述,集群启动的时候是没问题的,作业执行的时候才有问题。我现在怀疑的方向是,作业执行造成的资源问题使得 TM 发生了 OOM 或者是内存超用被 Kubernetes 杀掉了。你在修改 TM 数量、slot 数量的过程中,是否调整了 TM 的资源大小?另外即使没有调整,作业本身消耗的资源也会有所变化,例如 TM 数量变多导致每个

回复:TM太多,作业运行失败问题

2020-05-20 文章 a511955993
hi,xintong,堆栈信息如下。 2020-05-20 16:46:20 org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException: Connection for partition 66c378b86c3e100e4a2d34927c4b7281@bb397f70ad4474d2beac18d484d726af not reachable. at

flink1.10.x ???? arrar ????

2020-05-20 文章 ??????????????
1.blink_planner ddlarray??select 2.blink_planner flink 3.flink-planner CREATE TABLE sourceTable ( event_time_line array

Re: TM太多,作业运行失败问题

2020-05-20 文章 Xintong Song
hi 最好能把完整的日志以及 error stack 发出来。 这个报错通常是 TM 运行的机器/pod 之间网络不通造成的,有可能和 kubernetes 的配置有关,但就目前的信息比较难确定。 Thank you~ Xintong Song On Wed, May 20, 2020 at 3:50 PM wrote: > > hi, all > > 集群信息: > flink版本是1.10.1,部署在kubernetes上。 > > 现象: >

Flink 1.10-SQL解析复杂json问题

2020-05-20 文章 guaishushu1...@163.com
kafka数据写入kafka 数据,flink1.10-sql解析复杂json中字段为string,导致数据丢失。 语句: CREATE TABLE A ( w_data STRING, w_table STRING, w_ts TIMESTAMP(3) CREATE TABLE B ( w_ts TIMESTAMP(3), city1_id STRING, cate3_id STRING, pay_order_id STRING ) insert into B select w_ts, 'test' as city1_id,

sql client定义指向elasticsearch索引密码问题

2020-05-20 文章 naturalfree
在flink sql client配置文件中定义指向es的索引。发现没有设置用户名密码的属性,现在的es connector是否支持安全认证呢 | | naturalfree | | 邮箱:naturalf...@126.com | 签名由 网易邮箱大师 定制

sinktable更新部分字段问题

2020-05-20 文章 naturalfree
现在有一个es索引,想通过flink sql根据主键更新部分字段。不知是否有可行方案 | | naturalfree | | 邮箱:naturalf...@126.com | 签名由 网易邮箱大师 定制

Re: Flink 1.10-SQL解析复杂json问题

2020-05-20 文章 Leonard Xu
Hi, guaishushu 贴query或者图床链接吧,flink-sql中的json的解析支持是比较全的[1],可以把json的 schema 和 异常数据贴下吗? 用个单元测试应该就可以复现问题 Best, Leonard [1] https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java

Flink 1.10-SQL解析复杂json问题

2020-05-20 文章 guaishushu1...@163.com
kafka数据写入kafka 数据,flink1.10-sql解析复杂json中字段为string,导致数据丢失。 guaishushu1...@163.com

TM太多,作业运行失败问题

2020-05-20 文章 a511955993
hi, all 集群信息: flink版本是1.10.1,部署在kubernetes上。 现象: 需要200个slot,如果指定TM个数为40,每个TM的slot个数为4,可以正常运行作业。如果指定TM为200,每个TM的slot个数为1,集群可以正常构建,ui上Available Task Slots显示为200,提交作业的时候,就会出现如下报错: Cased by: java.net.NoRouteToHostException: No route to host. 目前我遭遇到的使用场景说明如上,希望得到一些回复和解答说明,非常感谢。 Looking forward

Re:Re: flink1.10怎么获得flink-shaded-hadoop包以支持hadoop3.2.1?

2020-05-20 文章 Jeff
好的,刚刚也查了其它资料,flink现在还不支持hadoop3,但用2.X的包也是可以的,只要不用到hadoop3特有API就行了 在 2020-05-20 09:22:39,"刘大龙" 写道: >Hi, >你可以看一下这两个链接: >1: https://www.mail-archive.com/dev@flink.apache.org/msg37293.html >2: https://issues.apache.org/jira/browse/FLINK-11086 >> -原始邮件- >> 发件人: Jeff >> 发送时间:

Re: Flink-1.10-SQL TopN语法问题

2020-05-20 文章 Leonard Xu
+ user-zh > 在 2020年5月20日,15:27,Leonard Xu 写道: > > Hi,guaishushu > > 先说声抱歉邮件回复晚了,过了下你的sql,问题是1.10 中 对于upsertSink的primary > key是通过query来推断的,部分query是推断不出来的,你的query刚好推断 > 不出来PK的,所以会提示:Exception in thread "main" > org.apache.flink.table.api.TableException: UpsertStreamTableSink requires > that

Re: Flink convert Table to DataSet[Row]

2020-05-20 文章 Jingsong Li
可能也有问题,flink planner可能已经不支持hive connector了。 On Wed, May 20, 2020 at 2:57 PM 张锴 wrote: > 我用的是flink1.10的,那意思只能用flink planner的方式了吗 > > Jingsong Li 于2020年5月20日周三 下午2:55写道: > > > blink planner是不支持和Dataset的转换的。 > > > > Best, > > Jingsong Lee > > > > On Wed, May 20, 2020 at 2:49 PM 张锴 wrote: > > > >

Re: Flink convert Table to DataSet[Row]

2020-05-20 文章 张锴
我用的是flink1.10的,那意思只能用flink planner的方式了吗 Jingsong Li 于2020年5月20日周三 下午2:55写道: > blink planner是不支持和Dataset的转换的。 > > Best, > Jingsong Lee > > On Wed, May 20, 2020 at 2:49 PM 张锴 wrote: > > > def main(args: Array[String]): Unit = { > > val tableEnvSettings = EnvironmentSettings.newInstance() >

Re: Flink convert Table to DataSet[Row]

2020-05-20 文章 Jingsong Li
blink planner是不支持和Dataset的转换的。 Best, Jingsong Lee On Wed, May 20, 2020 at 2:49 PM 张锴 wrote: > def main(args: Array[String]): Unit = { > val tableEnvSettings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inBatchMode() > .build() > > val tableEnv:

回复: 关于FlinkSQL slot数量过多的问题

2020-05-20 文章 111
Hi, 感谢回复,我研究下引用的资料,期待1.11 Best, Xinghalo

Re: Flink convert Table to DataSet[Row]

2020-05-20 文章 张锴
def main(args: Array[String]): Unit = { val tableEnvSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode() .build() val tableEnv: TableEnvironment = TableEnvironment.create(tableEnvSettings) val catalog = new HiveCatalog( "myhive", //

Re: Blink Planner构造Remote Env

2020-05-20 文章 Jark Wu
Hi, 因为 Blink planner 不支持 org.apache.flink.table.api.java.BatchTableEnvironment,所以无法对接 ExecutionEnvironment。 Blink planner 的 batch 模式,目前只支持 TableEnvironemnt,不过也可以通过 hack 的方式去使用 StreamTableEnvironment, 需要直接去构造 StreamTableEnvironmentImpl: StreamExecutionEnvironment execEnv =

Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'

2020-05-20 文章 Leonard Xu
Hi, 如劲松所说,这是 hiveCatalog的一个bug,正常是支持的,1.11中会修复 Best, Leonard > 在 2020年5月20日,13:57,wind.fly@outlook.com 写道: > > Hi, >版本用的是1.10.0,x.log.yanfa_log是正常的表格式,本人demo中用的是hive catalog: > Catalog myCatalog = new HiveCatalog("x", "default", > >"D:\\conf", "1.1.0"); > >

关于FlinkSQL slot数量过多的问题

2020-05-20 文章 111
Hi, 各位好,最近在使用Flink SQL实现离线处理时,遇到资源占用过多的问题: 1 由于之前一个taskmanager配置了多个slot,导致slot之间内存抢占溢出的问题,后来每个taskmanager就配置了一个slot 2 有的sql非常复杂,需要读取多个hive source,我们配置了开启hive推断并配置最大的并行度为10; 3 当多个操作节点并行时,一个普通的任务可能需要申请上百个slot 直接导致yarn集群资源被耗光。 想了解下,针对slot是否有相关配置,限制最大slot的申请数量,使得不同任务可以共享slot执行。

Re: Flink convert Table to DataSet[Row]

2020-05-20 文章 Jingsong Li
不好意思, 还是看不到你的图,可以考虑copy异常栈。 方便问一下后续的指标计算用Table/SQL搞不定吗? Best, Jingsong Lee On Wed, May 20, 2020 at 1:52 PM 张锴 wrote: > [image: 微信图片_20200520132244.png] > [image: 微信图片_20200520132343.png] > > Jingsong Li 于2020年5月20日周三 下午1:30写道: > >> Hi, >> >> 没看见有附件,请问为啥需要转车DateSet,Table里有啥搞不定呢? >> >> Best,

Re: flink sql使用维表关联时报Temporal table join currently only supports 'FOR SYSTEM_TIME AS OF' left table's proctime field, doesn't support 'PROCTIME()'

2020-05-20 文章 Jingsong Li
Hi Junbao, Xinghalo, 抱歉,现在HiveCatalog保存proctime字段是有bug的,[1]。所以就像你说的,proctime不能在建表时创建。需要在select的时候基于PROCTIME()函数生成,这样来绕过。 正在修复中,你也可以打上patch来试试,或者等下1.11.0或1.10.2的发布。 [1]https://issues.apache.org/jira/browse/FLINK-17189 Best, Jingsong Lee On Wed, May 20, 2020 at 1:58 PM wind.fly@outlook.com