Re: Flink SQL No Watermark

2020-08-12 Thread Shengkai Fang
hi, 你的意思是没有办法在codegen出来的代码上加断点的意思吗? 这里倒是有一个比较hack的方法: 将生成的类放在一个java文件之中,然后修改改下GeneratedClass下的newInstance方法,如果classname == “WatermarkGenerator$2” 则将刚才的类则返回 new WatermarkGenerator$2 这个类。 我个人对于问题的猜测是有一条数据的rowtime远远晚于其他数据,从而将整体的watermark提得很高,导致后面的“晚到”的数据一直无法触发watermark的生成。 forideal 于2020年8月13日周四

请教关于Flink算子FINISHED状态时无法保存Checkpoint的问题

2020-08-12 Thread yulu yang
请教大佬一个我最近在配置Flink流的过程中遇到问题, flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。

FlinkSQL命令行方式提交sql任务的jar依赖问题

2020-08-12 Thread Zhao,Yi(SEC)
背景,我的flink集群(10机器)A,此外还有一个单独的flink机器(作为提交任务的机器)B。 现在比较混乱,哪些jar需要放到A,哪些放到B。 (1) kafka ssl 证书,这个肯定需要放到A,因为kafka提供的属性只是ssl证书的路径,真正读取证书是任务开始运行之后,因此这个路径必须是集群A可访问,B是否可访问其实无所谓。 (2)

Re:Flink SQL No Watermark

2020-08-12 Thread forideal
大家好 关于这个问题我进行了一些 debug,发现了 watermark 对应的一个 physical relnode 是 StreamExecWatermarkAssigner 在translateToPlanInternal 中生成了如下一个 class 代码, public final class WatermarkGenerator$2 extends org.apache.flink.table.runtime.generated.WatermarkGenerator { public

Re: Avro format in pyFlink

2020-08-12 Thread Rodrigo de Souza Oliveira Brochado
Thank you Xingbo. I've managed to get it working adding the Avro jar and the three artifacts from the *com.fasterxml.jackson.core* group [1]. Is it required to also add the jackson-mapper-asl jar? About joda-time, I suppose that it'll not be required, as I won't use date types in my Avro schema.

Re: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-08-12 Thread Dream-底限
flink暴漏的lookup 是支持设置缓存记录条数和缓存时间的吧,把时间和条数设置的小一点或者直接禁用缓存,如果流表数据量不大的话可以不用异步访问,数据量大的话异步访问不加缓存维表存储引擎可能压力过大 Jim Chen 于2020年8月13日周四 上午11:53写道: > 请问下,如果使用了localcache+asyncIO的方式,缓存一致性,有什么解决的思路吗?维表的状态是频繁更新的 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: 对于维表频繁更新,状态越来越大的场景如何保证数据的准确性

2020-08-12 Thread Jim Chen
请问下,如果使用了localcache+asyncIO的方式,缓存一致性,有什么解决的思路吗?维表的状态是频繁更新的 -- Sent from: http://apache-flink.147419.n8.nabble.com/

如何设置FlinkSQL并行度

2020-08-12 Thread Zhao,Yi(SEC)
看配置文件有 execution. Parallelism,但这个明显是全局类型的配置。 如何给Sql生成的数据源结点,window结点,sink结点等设置不同的并行度呢? 比如数据源理论上应该和kafka分区数一致比较好,window则需要根据数据量考虑计算压力,sink也应该有相应的场景考虑。

Re: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 Thread Leonard Xu
FLIP-129 [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API > 在 2020年8月13日,11:26,zhao liang 写道:

答复: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 Thread zhao liang
请问table api要重构是哪个FLIP,我想关注下 发件人: Shengkai Fang 日期: 星期四, 2020年8月13日 11:09 收件人: user-zh@flink.apache.org 主题: Re: 关于FlinkSQL的一些最佳实践咨询 针对(3)社区建议使用sql api, table api现在正准备重构。 靳亚洽 于2020年8月13日周四 上午11:00写道: > 针对2, 我们公司采用的是通过提交一个解析flink sql的jar包,把sql作为jar包的参数来执行的。采用接口机将用户的udf > jar包和平台提供的解析flink

Re: flink1.11错误

2020-08-12 Thread Xingbo Huang
Hi, 我网上查了一下,这个似乎是你Python环境的问题,你可以看看这个和你是不是类似的 https://blog.csdn.net/m0_38024592/article/details/88410878 Best, Xingbo 小学生 <201782...@qq.com> 于2020年8月13日周四 上午11:05写道: > 各位大佬,使用pyflink自定义udf,运行中出现这个错误,这个怎么解决呢 > Caused by: java.io.IOException: Failed to execute the command: python -c > import

Re: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 Thread Shengkai Fang
针对(3)社区建议使用sql api, table api现在正准备重构。 靳亚洽 于2020年8月13日周四 上午11:00写道: > 针对2, 我们公司采用的是通过提交一个解析flink sql的jar包,把sql作为jar包的参数来执行的。采用接口机将用户的udf > jar包和平台提供的解析flink sql的jar包提交到集群。 > 针对3, 既然使用了flink sql,当然期望sql搞定一切了 > 针对4, > 我们改造了下flink-client模块的一小块代码,支持提交多jar包到集群,所以connector那些包就通过这种方式上传了。当然提前放集群上面也是可以的。

TableColumn为啥不包含comment

2020-08-12 Thread Harold.Miao
hi all 我发现TableColumn class不包含column comment , 给开发带来了一点麻烦,请教大家一下,谢谢 -- Best Regards, Harold Miao

flink1.11????

2020-08-12 Thread ??????
??pyflink??udf Caused by: java.io.IOException: Failed to execute the command: python -c import pyflink;import os;print(os.path.join(os.path.abspath(os.path.dirname(pyflink.__file__)), 'bin')) output: Could not find platform independent

回复: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 Thread 靳亚洽
针对2, 我们公司采用的是通过提交一个解析flink sql的jar包,把sql作为jar包的参数来执行的。采用接口机将用户的udf jar包和平台提供的解析flink sql的jar包提交到集群。 针对3, 既然使用了flink sql,当然期望sql搞定一切了 针对4, 我们改造了下flink-client模块的一小块代码,支持提交多jar包到集群,所以connector那些包就通过这种方式上传了。当然提前放集群上面也是可以的。 | | 靳亚洽 | | jinya...@163.com | 签名由网易邮箱大师定制 在2020年08月13日

Re: Change in sub-task id assignment from 1.9 to 1.10?

2020-08-12 Thread Zhu Zhu
Hi Ken, There were no such changes in my mind. And in Flink there was no designed logic to scatter subtasks of the same operator into different taskmanagers. One workaround to solve your problem could be to increase the parallelism of your source vertex to be no smaller than no other operator so

Re: Flink sql TUMBLE window 不支持offset吗

2020-08-12 Thread Benchao Li
不管是SQL还是DataStream,底层最终用来划分窗口的时候的时间,都是unix timestamp。 SQL里面的Timestamp类型,是可以带着时区的,但是转成unix timestamp表示的时候,时区信息就丢掉了。 Zhao,Yi(SEC) 于2020年8月13日周四 上午10:12写道: > 大概懂你意思,sql情况下event time都是Timestamp数据类型,这是一种-MM-dd >

Re: 使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Thread xiao cai
Dear Leonard Xu: 我会去关注这个issue,非常感谢答疑。 原始邮件 发件人: Leonard Xu 收件人: user-zh 发送时间: 2020年8月12日(周三) 16:05 主题: Re: 使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes Hi Group by 和 left join 都是会有 retract 消息的,这类消息需要UpsertStreamTableSink才能处理, Kafka connetor

Re: flink 1.11 udtf动态定义返回类型

2020-08-12 Thread Zou Dan
动态定义你指的是说字段的类型和数量都不是固定的吗?这个应该是不行的。你举的 1.10 例子也不是动态的呀 > 2020年8月12日 下午5:32,李强 写道: > > flink 1.11 udtf可以像1.10那样自定义返回类型不我希望可以像flink 1.10这样: > > @Override > public TypeInformation return new RowTypeInfo(Types.STRING, > Types.STRING); > } > > > 不希望像flink 1.11这样 > @FunctionHint(output =

Re:Re:Re: 用hive streaming写 orc文件的问题

2020-08-12 Thread flink小猪
添加不了附件,我就直接贴代码了 import java.time.Duration import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import

Re: 关于FlinkSQL的一些最佳实践咨询

2020-08-12 Thread Zhao,Yi(SEC)
没有小伙伴帮忙看下这个问题吗。可以优先看下第2、3、4个问题。 其他问题,第1/5个问题,我自己大概感觉,直接使用memory catalog就ok啦。我看了下FlinkSQL好像没有临时表的概念。10个SQL开10个sql-client的session,各个session独享自己的memory catalog,创建的表也类似于临时表的效果。 发件人: "Zhao,Yi(SEC)" 日期: 2020年8月12日 星期三 下午2:20 收件人: "user-zh@flink.apache.org" 主题: 关于FlinkSQL的一些最佳实践咨询

Re: Flink sql TUMBLE window 不支持offset吗

2020-08-12 Thread Zhao,Yi(SEC)
大概懂你意思,sql情况下event time都是Timestamp数据类型,这是一种-MM-dd HH:mm:ss格式的,从这个角度讲的确是不需要考虑offset的问题(只要周期是1min/1h/1d等类似)。只需要这个Timestamp是我需要的时区格式的日期即可。 其实我是联系DatastreamAPI的,因为DatastreamAPI中是基于unix timestamp(时间戳)来划分的。 在 2020/8/12 下午9:21,“Benchao Li” 写入: Hi, 目前还没有支持offset,有一个issue[1] 在跟进解决这个问题。

flink 1.11 udtf只能通过注解定义输出字段个数和类型吗?

2020-08-12 Thread 李强
各位大佬你们好,我想请教一个问题: flink 1.11 udtf只能通过注解定义输出字段个数和类型吗,好像不能再通过flink1.10那样重写getResultType方法来定义输出字段类型了,flink1.11里使用getResultType会报错如下: Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping. at

flink 1.11 发布sql任务到yarn session报java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hbase.util.ByteStringer

2020-08-12 Thread wind.fly....@outlook.com
Hi, all: 本人试图将flink-sql-gateway(https://github.com/ververica/flink-sql-gateway)升级到1.11支持版本,将flink sql(用到hbase connector)提交到yarn session后运行时报: org.apache.hadoop.hbase.DoNotRetryIOException: java.lang.NoClassDefFoundError: Could not initialize class

Re:Re: 用hive streaming写 orc文件的问题

2020-08-12 Thread flink小猪
尴尬,我明明上传了附件但是找不到了- -,我又上传了一次。 1.写orc和写parquet的作业在同一个作业中,并没有报错,但是hive中查不到数据,在hdfs目录里面有但是并没有按照checkpoint间隔生成,也没有生成_success文件。 2.没有设置table.exec.hive.fallback-mapred-writer。 以下是我的几个疑问 1.为什么hive streaming 生成orc文件需要导入flink-orc_2.11jar包,而parquet不需要? 2.sql client 我想要设置checkpoint生成间隔我应该在哪里设置?

Re: 关于Flink CDC问题第二弹

2020-08-12 Thread 陈韬
谢谢,那我等下再试一下 Best, TonyChen > 2020年8月12日 下午10:27,Jark Wu 写道: > > 1. 是的。目前JDK11还不支持。因为用了 Unsafe 的一些方法。 这个会在未来修复。 > 2. 第二个问题貌似是类冲突的问题。 flink-cdc-connectors 依赖了Kafka Connect 2.5.0,并且没有做 > shade。如果你的项目中也依赖了Kafka Connect,要保持版本一致。 > > Best, > Jark > > On Wed, 12 Aug 2020 at 20:07, 陈韬 wrote: > >>

Re: Flink CPU load metrics in K8s

2020-08-12 Thread Bajaj, Abhinav
Thanks Xintong for your input. From the information I could find, I understand the JDK version 1.8.0_212 we use includes the docker/container support. I also had a quick test inside the docker image using the below – Runtime.getRuntime().availableProcessors() It showed the right number of CPU

Re: Please help, I need to bootstrap keyed state into a stream

2020-08-12 Thread Marco Villalobos
Hi Seth, Thank you for the advice. The solution you mentioned is exactly what I did. I wrote a small tutorial that explains how to repeat that pattern. You can read about my solution at https://github.com/minmay/flink-patterns/tree/master/bootstrap-keyed-state-into-stream

Re: What async database library does the asyncio code example use?

2020-08-12 Thread KristoffSC
Hi, I do believe that example from [1] where you see DatabaseClient is just a hint that whatever library you would use (db or REST based or whatever else) should be asynchronous or should actually not block. It does not have to be non blocking until it runs on its own thread pool that will return

Re: What async database library does the asyncio code example use?

2020-08-12 Thread Marco Villalobos
So, I searched for an async DatabaseClient class, and I found r2dbc. Is that it? https://docs.spring.io/spring-data/r2dbc/docs/1.1.3.RELEASE/reference/html On Wed, Aug 12, 2020 at 9:31 AM Marco Villalobos wrote: > I would like to enrich my stream with database calls as documented at: > > >

Re: Please help, I need to bootstrap keyed state into a stream

2020-08-12 Thread Seth Wiesman
Just to summarize the conversation so far: The state processor api reads data from a 3rd party system - such as JDBC in this example - and generates a savepoint file that is written out to some DFS. This savepoint can then be used to when starting a flink streaming application. It is a two-step

What async database library does the asyncio code example use?

2020-08-12 Thread Marco Villalobos
I would like to enrich my stream with database calls as documented at: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/operators/asyncio.html What async database library

Re: Pushing metrics to Influx from Flink 1.9 on AWS EMR(5.28)

2020-08-12 Thread bat man
An update in the yarn logs I could see the below - Classpath:

Re: Is there a way to start a timer without ever receiving an event?

2020-08-12 Thread Andrey Zagrebin
I do not think so. Each timer in KeyedProcessFunction is associated with the key. The key is implicitly set into the context from the record which is currently being processed. On Wed, Aug 12, 2020 at 8:00 AM Marco Villalobos wrote: > In the Stream API KeyedProcessFunction,is there a way to

Re: JM & TM readiness probe

2020-08-12 Thread Andrey Zagrebin
Hi Alexey, As far as I know, TaskManager does not expose the REST API. ResourceManager redirects some REST calls to TaskManager [1]: /taskmanagers/:taskmanagerid/metrics /taskmanagers/:taskmanagerid/thread-dump These calls may be not so lightweight. I do not know others or how you ask e.g. the

Pushing metrics to Influx from Flink 1.9 on AWS EMR(5.28)

2020-08-12 Thread bat man
Hello Experts, I am running Flink - 1.9.0 on AWS EMR(emr-5.28.1). I want to push metrics to Influxdb. I followed the documentation[1]. I added the configuration to /usr/lib/flink/conf/flink-conf.yaml and copied the jar to /usr/lib/flink//lib folder on master node. However, I also understand that

[DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-12 Thread Kostas Kloudas
Hi all, As described in FLIP-131 [1], we are aiming at deprecating the DataSet API in favour of the DataStream API and the Table API. After this work is done, the user will be able to write a program using the DataStream API and this will execute efficiently on both bounded and unbounded data.

Re: 关于Flink CDC问题第二弹

2020-08-12 Thread Jark Wu
1. 是的。目前JDK11还不支持。因为用了 Unsafe 的一些方法。 这个会在未来修复。 2. 第二个问题貌似是类冲突的问题。 flink-cdc-connectors 依赖了Kafka Connect 2.5.0,并且没有做 shade。如果你的项目中也依赖了Kafka Connect,要保持版本一致。 Best, Jark On Wed, 12 Aug 2020 at 20:07, 陈韬 wrote: > hi everyone: > > 今天试用了下1.11.1下的cdc功能,遇到几个问题,来请教一下大家。 > > 环境为

Re: FlinkSQL even time 定义问题

2020-08-12 Thread Shengkai Fang
对于第一个问题- 在查询语句之中定义watermark: 现在并不支持。这主要是由于在同一个作业之中,如果select的数据源是同一个表,那么在实际的优化过程之中,会将source进行复用,而现在同一个source并不支持多个watermark assigner。如果在不同的作业之中,那么显然只要修改watermark的定义语句即可。 对于第二个问题:rowtime的定义是必须建立在创建表的过程之中的。 对于第三个问题:社区正在讨论这个问题。现在仅支持多个insert的sql在同一个job之中。 Zhao,Yi(SEC) 于2020年8月12日周三 下午5:36写道: >

Re:Re: ​请问在flinksql中如何使用聚合函数 LISTAGG

2020-08-12 Thread chenxuying
好的 , 原来是bug , 感谢回答 在 2020-08-12 21:32:40,"Benchao Li" 写道: >看起来是一个已知bug[1],已经修复,但是还没有发布。 > >[1] https://issues.apache.org/jira/browse/FLINK-18862 > >chenxuying 于2020年8月12日周三 下午9:25写道: > >> 版本: >> flinksql 1.11.0 >> 需求: >> 需要实现多行聚合成一行功能 >> 代码如下: >> environmentSettings = >>

Re: ​请问在flinksql中如何使用聚合函数 LISTAGG

2020-08-12 Thread Benchao Li
看起来是一个已知bug[1],已经修复,但是还没有发布。 [1] https://issues.apache.org/jira/browse/FLINK-18862 chenxuying 于2020年8月12日周三 下午9:25写道: > 版本: > flinksql 1.11.0 > 需求: > 需要实现多行聚合成一行功能 > 代码如下: > environmentSettings = > EnvironmentSettings.new_instance().in_streaming_mode().build() > t_env =

​请问在flinksql中如何使用聚合函数 LISTAGG

2020-08-12 Thread chenxuying
版本: flinksql 1.11.0 需求: 需要实现多行聚合成一行功能 代码如下: environmentSettings = EnvironmentSettings.new_instance().in_streaming_mode().build() t_env = StreamTableEnvironment.create(environment_settings = environmentSettings)

Re: Flink sql TUMBLE window 不支持offset吗

2020-08-12 Thread Benchao Li
Hi, 目前还没有支持offset,有一个issue[1] 在跟进解决这个问题。 但是你这个case应该是不需要offset就可以的,现在划分窗口的逻辑是直接按照毫秒级别的时间戳来整除进行计算的。 所以如果窗口是1min/1h/1d 这种,都是整点的窗口,没有问题的。如果你有这个问题,那应该是你的时间字段有时区问题。 有一种case是需要offset的,比如一周这种窗口,因为1980-1-1是周四,所以一周的窗口是从周四开始的。 [1] https://issues.apache.org/jira/browse/FLINK-17767 Zhao,Yi(SEC)

Re: AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Thread Benchao Li
Hi, 这个是正常现象。 如果你用了普通的group by的话,那么它的结果就是有更新的,所以需要sink支持写入update的结果, 但是kafka目前只能写入append的数据,所以会报上面的错误。 你可以尝试下用window group[1],在这个文档的第二个示例里,它的结果是append的,不会有更新。 [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#aggregations xiao cai 于2020年8月12日周三 下午3:42写道: >

Re: flink 1.10.1 sql limit 不生效

2020-08-12 Thread Benchao Li
我有点迷惑了 1. 所以你现在的问题得到了解决了么? 2. 我看你最开始写的那个SQL里面并没有window呀,为啥后面又说用到了TUMBLE_START? Peihui He 于2020年8月12日周三 下午3:40写道: > Hi BenChao > > SQL是流模式下执行的,看着不生效的表现就是显示的数量超过limit的数量。 > order by TUMBLE_START desc 好像不是预期的结果 > > 这个是需要给 TUMBLE_START 得到的timestamp 转为 long么? > > Best Wishes. > > Benchao Li

Flink sql TUMBLE window 不支持offset吗

2020-08-12 Thread Zhao,Yi(SEC)
如题,是不是不支持offset呀。在中国,天窗口实际应该是 INTVERVAL ‘1’ DAY + 8小时offset。 但是看了文档没发现添加offset的语法。 如果仅仅是时间显示问题还好说,搞个函数转一转,但分窗这个不提供入口没办法。

关于Flink CDC问题第二弹

2020-08-12 Thread 陈韬
hi everyone: 今天试用了下1.11.1下的cdc功能,遇到几个问题,来请教一下大家。 环境为 https://github.com/ververica/flink-cdc-connectors/wiki/中文教程 中提供的 docker-compose.yml 问题1:通过sql-cli提交了flink-sql,之后修改本地mysql中的数据时,报错。是否对jdk版本有要求?

Re: Flink CPU load metrics in K8s

2020-08-12 Thread Xintong Song
Hi Abhinav, Do you know how many total cpus does the physical machine have where the kubernetes container is running? I'm asking because I suspect whether JVM is aware that only 1 cpu is configured for the container. It does not work like JVM understands how many cpu are configured and controls

Re: 当数据源产生的速度过快时使用AscendingTimestampExtractor并没有忽略非递增的事件时间数据

2020-08-12 Thread SSY
我想明白了,AscendingTimestampExtractor产生的水印是周期水印,当数据源速率过快时,水印还没来得及发送,后面的数据已经进入到算子当中了,所以造成了非递增事件也会被包含在算子中。。在测试环境下,如果换成Punctuated Watermarks对每个事件发送水印,就没问题了 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink slot之间内存隔离问题

2020-08-12 Thread Xintong Song
slot 之间只有 managed memory 是隔离的。 取决于你的 flink 版本,1.10 之后 managed memory 除了用于 batch operator,还会用于 rocksdb state backend。 Thank you~ Xintong Song On Wed, Aug 12, 2020 at 3:55 PM Cayden chen <1193216...@qq.com> wrote: > hi,all > 对于flink slot之间内存隔离有个疑问。就是slot中的task请求networkbufferpool的时候是否会限制为 >

Re: Using Event Timestamp sink get's back with machine timezone

2020-08-12 Thread Timo Walther
Hi Faye, the problem lies in the wrong design of JDK's java.sql.Timestamp. You can also find a nice summary in the answer here [1]. java.sql.Timestamp is timezone dependent. Internally, we subtract/normalize the timezone and work with the UNIX timestamp. Beginning from Flink 1.9 we are using

FlinkSQL even time 定义问题

2020-08-12 Thread Zhao,Yi(SEC)
咨询下,FlinkSQl的event time必须在DDL中定义吗。能否DDL只是定义普通数据字段,比如有个time属性。 然后在select 的时候指定具体使用的watermark策略。 目的:假设基于同一个表A,我查询1需要使用watermark为time-1min,查询2需要使用watermark为time-2min。 其次除了这种case,如果我基于表1查询得到结果输出到表2,那么表2的event time定义呢?比如在表2的定义中基于表2的某个属性(比如叫time2),然后插入表2的时候只要time2属性存在就可以? 此外,如果对比datastream

Re: 用hive streaming写 orc文件的问题

2020-08-12 Thread Rui Li
Hi, 写orc表的作业有报错么?还是成功执行但是hive查不到数据呢? 看不到你贴的代码,有没有设置table.exec.hive.fallback-mapred-writer? On Wed, Aug 12, 2020 at 5:14 PM 18579099...@163.com <18579099...@163.com> wrote: > > > 我通过datagen作为流表,分别写入两个表结构相同,存储格式不同的hive表(一个orc,一个parquet)中,其中parquet表正常写入并且生成了_SUCCESS文件,hive也能查询到, > >

flink 1.11 udtf动态定义返回类型

2020-08-12 Thread 李强
flink 1.11 udtf可以像1.10那样自定义返回类型不我希望可以像flink 1.10这样: @Override public TypeInformation

Re: how to add a new runtime operator

2020-08-12 Thread Timo Walther
Hi Vincent, we don't have a step by step guide for adding new operators. Most of the important operations are exposed via DataStream API. Esp. ProcessFunction [1] fits for most complex use cases with access to the primitives such as time and state. What kind of operator is missing for your

用hive streaming写 orc文件的问题

2020-08-12 Thread 18579099...@163.com
我通过datagen作为流表,分别写入两个表结构相同,存储格式不同的hive表(一个orc,一个parquet)中,其中parquet表正常写入并且生成了_SUCCESS文件,hive也能查询到, 但是orc表没有生成_SUCCESS文件,并且hive中无法查询到,我是在本地ide上直接运行的,hive版本是1.2.1,flink版本是1.11.1,同时我发现orc表的分区中生成的文件数量比parquet多,

?????? pyflink 1.11.1 execute_sql sql_update????????????????????????????????????????????, ??????????????Deprecated

2020-08-12 Thread xuzh
---- ??: "user-zh"

Re: pyflink 1.11.1 execute_sql sql_update执行问题,下面三种情况只有第一种情况执行成功, 但是该方法是被Deprecated

2020-08-12 Thread Leonard Xu
Hi 现象是正常的, execute_sql是一个异步的方法,提交后就退出了,如果需要等待执行结果,可以调用如下方法显式地等待 sql_result = t_env.execute_sql("insert into print_sink select wd,count(wd) cnt from sc group by wd") sql_result.get_job_client().get_job_execution_result().result() 祝好 Leonard Xu > 在 2020年8月12日,16:00,徐振华 写道: > > from

Re: pyflink 1.11.1 execute_sql sql_update执行问题,下面三种情况只有第一种情况执行成功, 但是该方法是被Deprecated

2020-08-12 Thread Xingbo Huang
Hi, execute_sql已经包含有了execute的意思了无非前者是异步非阻塞的,所以,你就别execute_sql之后还来一个execute了,具体你可以看下文档[1]。如果你使用execute_sql且在本地跑的话,你需要进行如下操作,否则会程序直接跑完没有结果。 result = t_env.execute_sql("你写的sql") result.get_job_client().get_job_execution_result().result() [1]

Re:flink1.10.0任务失败但是application仍然在yarn运行不会自动kill

2020-08-12 Thread 魏烽
是的,但是连接超时判定任务失败后也应该把application在yarn kill掉吧,不然会一直挂着占用资源 原始邮件 发件人: Michael Ran 收件人: user-zh 发送时间: 2020年8月12日(周三) 15:41 主题: Re:flink1.10.0任务失败但是application仍然在yarn运行不会自动kill Connection timed out: nb-bdh-hadoop.slave14/10.10.5.14:21913是不是连接超时,请求没发送过去? 在 2020-08-12 13:57:52,"魏烽"

Re: 使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Thread Leonard Xu
Hi Group by 和 left join 都是会有 retract 消息的,这类消息需要UpsertStreamTableSink才能处理, Kafka connetor 目前的实现是AppendStreamTableSink,所以不能处理 社区已经有一个issue在处理这个问题了,应该1.12能提供这个功能。 Best Leonard [1]https://issues.apache.org/jira/browse/FLINK-18826 > 在

回复:使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Thread xuzh
是不是update-mode 改用 update模式 --原始邮件-- 发件人: "user-zh"

Re: Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-12 Thread Jingsong Li
另外问一下,是什么格式?csv还是parquet。 有等到10分钟(rollover-interval)过后和下一次checkpoint后再看吗? On Wed, Aug 12, 2020 at 2:45 PM kandy.wang wrote: > > > > > > > 有的。就是写了一半,做了一个checkpoint ,然后程序 做一个savepoint cancel掉, > 重启的时候,从最新的savepoint恢复,但是重启的时候已经属于新分区了。 > 就是感觉停止之前正在写的那个分区,没有触发commit > > > > > 在 2020-08-12

Re: Updating kafka connector with state

2020-08-12 Thread Piotr Nowojski
(adding back user mailing list) Yes, that is correct. Flink 1.8.0 is causing the problem here. 1. Upgrade Flink to 1.11.1 without upgrading the connector 2. Take a new savepoint 3. Upgrade connector to the universal one 4. Restore upgraded job from the new savepoint (2) If it doesn't work,

pyflink 1.11.1 execute_sql sql_update????????????????????????????????????????????, ??????????????Deprecated

2020-08-12 Thread ??????
from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import EnvironmentSettings, StreamTableEnvironment # pyflink 1.11.1 environment_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() senv =

使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Thread xiao cai
Hi Jark: 版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错: AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate 我希望能够在sql校验时也能使upsert操作作用于kafka sink,或者等upsert完成,再写入kafka 附上执行sql: create table kafka_table_1 (

flink slot????????????????

2020-08-12 Thread Cayden chen
hi??all flink slot??slottasknetworkbufferpool?? 1/slost(??)??managed memory??

Re: Updating kafka connector with state

2020-08-12 Thread Piotr Nowojski
Hi Nikola, Which Flink version are you using? Can you describe step by step what you are doing? This error that you have should have been fixed in Flink 1.9.0+ [1], so if you are using an older version of Flink, please first upgrade Flink - without upgrading the job, then upgrade the connector.

当数据源产生的速度过快时使用AscendingTimestampExtractor并没有忽略非递增的事件时间数据

2020-08-12 Thread SSY
数据源如上图所示,6行3列。这里的逻辑是以第三列为事件事件,采用滚动窗口(10s),统计窗口内最大的第一列的时间(PS:第一列数据这里故意设置成升序),输出为“窗口内最大的第一列时间所在行的第二列的值,窗口内元素的个数”。如果一切正常,我认为结果应该是 2,2 2,5 我是用kafka来发送数据源,当发送速率为100毫秒每条数据时,结果和预期相符,如下图

Re:flink1.10.0任务失败但是application仍然在yarn运行不会自动kill

2020-08-12 Thread Michael Ran
Connection timed out: nb-bdh-hadoop.slave14/10.10.5.14:21913是不是连接超时,请求没发送过去? 在 2020-08-12 13:57:52,"魏烽" 写道: >各位好: > >Flink1.10.0版本发现偶发任务失败后但是web ui仍然挂着该任务并没有停止,日志如下: > >请问有遇到一样的情况的嘛 > >[INFO] 2020-07-28 16:34:00.938 - [taskAppId=TASK-38-97-193]:[106] - -> >2020-07-28 16:33:52,863 INFO

Re: flink 1.10.1 sql limit 不生效

2020-08-12 Thread Peihui He
[image: image.png] order by TUMBLE_START 结果如上图 Peihui He 于2020年8月12日周三 下午3:40写道: > Hi BenChao > > SQL是流模式下执行的,看着不生效的表现就是显示的数量超过limit的数量。 > order by TUMBLE_START desc 好像不是预期的结果 > > 这个是需要给 TUMBLE_START 得到的timestamp 转为 long么? > > Best Wishes. > > Benchao Li 于2020年8月12日周三 下午3:12写道: > >>

AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Thread xiao cai
Hi Jark: 版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错: AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate

AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Thread xiao cai
Hi Jark: 版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错:

Re: flink 1.10.1 sql limit 不生效

2020-08-12 Thread Peihui He
Hi BenChao 发现问题了,是因为select 的字段中包含了array,导致数据显示的比实际limit数据要多 Best Wishes. Benchao Li 于2020年8月12日周三 下午3:12写道: > 你这个SQL是在流式还是批式模式下执行的呢?limit不生效的表现是什么呢? > > Peihui He 于2020年8月12日周三 下午3:03写道: > > > Hi all, > > > > 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效 > > sql 类似下面: > > select

Re: flink 1.10.1 sql limit 不生效

2020-08-12 Thread Benchao Li
你这个SQL是在流式还是批式模式下执行的呢?limit不生效的表现是什么呢? Peihui He 于2020年8月12日周三 下午3:03写道: > Hi all, > > 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效 > sql 类似下面: > select a, b, c, t from tb LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e) ON > TRUE order by t desc limit 10 > > 如果select

Re: flink 1.10.1 sql limit 不生效

2020-08-12 Thread Peihui He
应该是我这边sql问题,我这边在看看,打扰大家了 Peihui He 于2020年8月12日周三 下午3:03写道: > Hi all, > > 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效 > sql 类似下面: > select a, b, c, t from tb LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e) > ON TRUE order by t desc limit 10 > > 如果select 结果中不包括c的化,就正常了 > >

Hostname for taskmanagers when running in docker

2020-08-12 Thread Nikola Hrusov
Hello, After upgrading the docker image for flink to 1.11.1 from 1.9 the hostname of the taskmanagers reported to our metrics show as IPs (e.g. 10.0.23.101) instead of hostnames. In the docker compose file we specify the hostname as such: *hostname: "taskmanager-{{ '{{' }}.Node.Hostname{{ '}}'

flink 1.10.1 sql limit 不生效

2020-08-12 Thread Peihui He
Hi all, 用zeppelin执行sql的时候发现用了 LEFT JOIN LATERAL TABLE 时候 limit 不生效 sql 类似下面: select a, b, c, t from tb LEFT JOIN LATERAL TABLE (Tf(a)) as T(c, d, e) ON TRUE order by t desc limit 10 如果select 结果中不包括c的化,就正常了 请问这个是什么问题呢?sql是写的不对么? Best Wishes.

回复: flink中同时有多个大数据组件开启kerberos时keytab的选择

2020-08-12 Thread zjfpla...@hotmail.com
图片挂了,其内容为: security.kerberos.login.use-ticket-cache: false security.kerberos.login.keytab: /home/zjf/kafka.keytab security.kerberos.login.principal: ka...@test.com security.kerberos.login.contexts: Client,KafkaClient zookeeper.sasl.service-name: zookeeper zookeeper.sasl.login-context-name: Client

Re:Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-12 Thread kandy.wang
有的。就是写了一半,做了一个checkpoint ,然后程序 做一个savepoint cancel掉, 重启的时候,从最新的savepoint恢复,但是重启的时候已经属于新分区了。 就是感觉停止之前正在写的那个分区,没有触发commit 在 2020-08-12 14:26:53,"Jingsong Li" 写道: >那你之前的分区除了in-progress文件,有已完成的文件吗? > >On Wed, Aug 12, 2020 at 1:57 PM kandy.wang wrote: > >> >> >> >> source就是kafka >>

flink中同时有多个大数据组件开启kerberos时keytab的选择

2020-08-12 Thread zjfpla...@hotmail.com
各位好, flink-conf.yaml中kerberos配置段如下: CDH中hbase+kafka同时开启了kerberos,类似此方式,如果上述配置中的“security.kerberos.login.principal”我用kafka.keytab,会报无权限操作hbase;如果用hbase.keytab ,会报无权限操作kafka。 我查过kerberos的资料,发现有种方式是整合hbase.keytab和kafka.keytab,但是其实是包含2个principal(ka...@test.com,hb...@test.com),而不是一个pincial,如下操作:

Re: Re: Re:Re: flink 1.11 StreamingFileWriter及sql-client问题

2020-08-12 Thread Jingsong Li
那你之前的分区除了in-progress文件,有已完成的文件吗? On Wed, Aug 12, 2020 at 1:57 PM kandy.wang wrote: > > > > source就是kafka > json格式,是exactly-once,按照process-time处理就已经写完了呢。起来的时候,process-time已经属于新的分区了,很正常。但以前的老分区状态还没提交呢。 > > > > > > > in-progress还在,就证明了这个分区的数据还没写完,理论上源头数据需要回退消费,那为什么你重启后作业不会再写这个分区了呢? > > > >

关于FlinkSQL的一些最佳实践咨询

2020-08-12 Thread Zhao,Yi(SEC)
最近刚刚开始研究FlinkSQL,之前一直用的DataStream API,现在想尝试下FlinkSQL的玩法。 目前简单看下来,感觉相对来说相比DataStream API不够灵活,所以有一些实践疑问咨询下。 (1) 通常Kafka的动态表会通过临时表的方式写在程序开头声明定义呢?还是直接写在sql-client的配置文件中呢。 如果写在配置文件,好处是类似于一个固化的表定义一样,每次执行sql-client的cli之后可以马上查询相关数据。但是有个问题是这种情况下kafka设置读取earliest还是latest?貌似根据具体查询需求临时改配置好像也不是很方便。

Re: 关于flink sql的内置函数实现与debug

2020-08-12 Thread shizk233
感谢大佬们的指点 Benchao Li 于2020年8月12日周三 上午11:04写道: > Hi, > > 内置的scalar > function都是通过代码生成来关联到的,入口是`ExprCodeGenerator#generateCallExpression(...)`, > 你可以顺着这里找到你需要看的具体的函数的对应的方法。 > PS:有很多方法是纯代码生成的,可能没法调试 > > 内置的aggregate function有两种,一种是通过表达式直接写的,叫做`DeclarativeAggregateFunction`; >