回复: Split a stream into any number of streams

2019-09-17 文章 Jun Zhang
对DataStream进行keyBy操作是否能解决呢? -- 原始邮件 -- 发件人: "venn"https://stackoverflow.com/questions/53588554/apache-flink-using-filter > -or-split-to-split-a-stream > > regards. > >

????????json schema??????????????????

2019-09-19 文章 Jun Zhang
?? Types.LIST(Types.STRING) -- -- ??: ?? ?? https://go.microsoft.com/fwlink/?LinkId=550986>;

How to write stream data to other Hadoop Cluster by StreamingFileSink

2019-10-04 文章 Jun Zhang
Hi,all: I have 2 hadoop cluster (hdfs://mycluster1 and hdfs://mycluster2),both of them configured the HA, I have a job ,read from streaming data from kafka, and write it to hdfs by StreamingFileSink,now I deployed my job on mycluster1 (flink on yarn),and I want to write the data to mycluster2

Re: How to write stream data to other Hadoop Cluster by StreamingFileSink

2019-10-08 文章 Jun Zhang
Hi,Yang : thank you very much for your reply. I had add the configurations on my hadoop cluster client , both hdfs-site.xml and core-site.xml are configured, the client can read mycluster1 and mycluter2, but when I submit the flink job to yarn cluster , the hadoop client configurations is inva

???????????? state ????????????????

2019-10-30 文章 Jun Zhang
?? https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA ??2019??10??31?? 10:16??wangl...@geekplus.com.cn

????????????Flink trigger????????????????????

2019-11-01 文章 Jun Zhang
??Evictor?? Best  Jun -- -- ??: Qi Kang

??????yarn-cluster ??????????????

2019-12-02 文章 Jun Zhang
jarcom.dora.job.stream.AliLogStreamingJob ?? ??2019??12??2?? 13:59??

Re: flink1.10 yarn模式无法提交作业

2020-02-09 文章 Jun Zhang
你看看是不是yarn队列有很多accept状态的任务,如果有很多正在排队的任务,那就是可能超过了flink设置的超时时间还没有来得及给你这个任务分配资源,就会报这个错误。 但是这个不影响任务的提交,等一会任务提交成功了,客户端就会返回提交成功的结果。 Fei Han 于2020年2月9日周日 下午5:35写道: > @all: >在Flink1.10中,用yarn模式无法提交作业。 > 提示如下: > lease check if the requested resources are available in the YARN cluster > 2020-02-09 1

?????? ????Flink1.10.0????hive??source??????????

2020-03-03 文章 Jun Zhang
hi??jinsong?? ??10?? ??sql   select * from  mytable limit 1; hive??mytable??10??10?? ??2020??03??2?? 16:38??JingsongLee

?????? ????Flink1.10.0????hive??source??????????

2020-03-03 文章 Jun Zhang
??sql??sql??hive??sql?? ??2020??03??4?? 13:25??JingsongLeehttps://issues.apache.org/jira

Re: 关于flink sql 1.10 source并行度自动推断的疑问

2020-03-24 文章 Jun Zhang
hi,Chief: 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 Kurt Young 于2020年3月25日周三 上午8:53写道: > 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。 > > Best, > Kurt > > > On Tue, Mar 24, 20

回复:关于flink sql 1.10 source并行度自动推断的疑问

2020-03-24 文章 Jun Zhang
hi,Chief:     目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 Best  Jun -- 原始邮件 -- 发件人: Kurt Young

flink动态分区写入hive如何处理数据倾斜的问题

2020-03-25 文章 Jun Zhang
大家好: 有一个类似的sql 拿官网的这个做示例:INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08’; 如果实际上第三个type字段,某一种type数据量特别大,导致了数据倾斜,这种情况一般怎么处理呢? 谢谢。

Re: flinksql如何控制结果输出的频率

2020-03-26 文章 Jun Zhang
hi: 你可以自定义一个trigger [1] 第二个场景是可以的,第一种场景我没有遇到过这种场景,你可以试试。 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html flink小猪 <18579099...@163.com> 于2020年3月27日周五 上午11:29写道: > 我有两个需求 > 1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办? > 2.如果我需要开一个一个小时的窗口,但

Re: 动态处理字段动态sink

2020-04-02 文章 Jun Zhang
hi: 不知道广播流能否满足你的需求 出发 <573693...@qq.com> 于2020年4月2日周四 下午4:25写道: > > 老铁们,消费kafka一些数据时候,根据规则,将满足条件的一些字段生成新字段,插入到es或者pg等里面,其中映射字段是动态的,插入字段也是动态的,结合drools来做比较好吗。

请教一个flink CEP的问题

2020-05-26 文章 Jun Zhang
大家好:     请教一个flink cep的问题,我想做一个简单的报警,比如连续三次大于5就报警,连续三次小于等于5就算报警恢复。      示例程序如下: DataStream

关于注册定时器的一些疑问

2020-06-28 文章 Jun Zhang
大家好: 官网的解释中,注册定时器只能是keyed stream,我使用BroadcastConnectedStream 接一个KeyedBroadcastProcessFunction函数发现也能注册定时器,我测试了一下,只限于使用processtime的时候,如果使用的是eventtime就不好使了,请问这个是什么原因呢?谢谢。

Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 Jun Zhang
hi.sunfulin 你有没有导入blink的planner呢,加入这个试试 org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} sunfulin 于2020年7月7日周二 下午3:21写道: > > > > hi, jark > 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink > configuration里的DeployOptions.TARGET > (exec

flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-09 文章 Jun Zhang
大家好: 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。 public static void main(String[] args) throws Exception{ StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnv

Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 文章 Jun Zhang
hi,大家好 对于json schema的问题,我想问一个其他的问题, 比如我要做一个实时报警系统,需要消费kafka的json数据来进行实时报警,我的想法是对于每一个报警都生成一个flink任务,主要报警逻辑翻译成一个flink sql。 其中kafka里面的json数据,每一个字段都是可以生成报警条件的,比如有一个json格式的header字段,这个字段里面的内容是不固定的, 某一个用户想用header.aaa字段,另一个用户想用header.bbb字段,比如每分钟header.aaa的count值大于100就报警。 这种情况下,我该如何定义我的schema呢?大家有没有什么想法,谢

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-10 文章 Jun Zhang
。 Jingsong Li 于2020年7月10日周五 下午2:54写道: > Hi, > > 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢? > 我用同样SQL在我的环境是有的。 > > Best, > Jingsong > > On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang > wrote: > > > 大家好: > > 我在用flink 1.11 的sql

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-10 文章 Jun Zhang
此外,如果设置并行度是大于1,虽然可以生成success文件,但是貌似不是第一次checkpoint结束的时候就生成了,我反复测试之后,好像是不固定的时间,比如可能是第5次,也可能是第10次checkpoint之后才生成的。 Jingsong Li 于2020年7月10日周五 下午3:35写道: > Hi, Jun, > > 非常感谢详细充分的测试~ > > 接下来我复现排查下~ > > Best, > Jingsong > > On Fri, Jul 10, 2020 at 3:09 PM Jun Zh

Re: Re: Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-14 文章 Jun Zhang
hi,Zhou Zach : 问一下,你把你的程序,并行度设置成 1,还能正常读取hive的数据吗? Zhou Zach 于2020年7月13日周一 下午8:17写道: > 好的,感谢答疑 > > > > > > > > > > > > > > > > > > 在 2020-07-13 19:49:10,"Jingsong Li" 写道: > >创建kafka_table需要在default dialect下。 > > > >不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法) > > > >Best, > >Jingso

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,夏帅: 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 你测试没有问题的情况并行度是 1 吗?写入hdfs? 夏帅 于2020年7月10日周五 下午5:39写道: > 你好, > 我这边同样的代码,并没有出现类似的问题 > 是本地跑么,可以提供下日志信息么? > >

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,jinsong: 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 Jun Zhang 于2020年7月23日周四 上午11:15写道: > hi,夏帅: > > 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 > > 你测试没有问题的情况并行度是 1 吗?写入hdfs? > >

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,jinsong: 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 Jun Zhang 于2020年7月23日周四 上午11:34写道: > hi,jinsong: > > 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 > > Jun Zhang 于2020年7月23日周四 上午11:1

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,jinsong 我们生产环境hdfs是cdh 2.6的,我换了一个hadoop 3 版本的hdfs,还真没问题了,不知道是哪里出问题了。 Jingsong Li 于2020年7月23日周四 上午11:45写道: > 相同操作我也没有复现。。是可以成功执行的 > > 你的HDFS是什么版本?是否可以考虑换个来测试下 > > On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang > wrote: > >> hi,jinsong: >> >> 这个问题不知道你后来有没有

application模式提交操作hive的任务相关疑问

2020-07-23 文章 Jun Zhang
大家好: 我现在有一个flink的程序要读写hive的数据,在程序中构造HiveCatalog的时候需要有一个hiveConfDir,如果我使用的是新的application模式去提交任务,这个任务的解析应该是放到了master端,这个时候hadoop集群上没有hive的相关配置,那么这个hiveConfDir该怎么配置呢? 谢谢

回复:application模式提交操作hive的任务相关疑问

2020-07-23 文章 Jun Zhang
我现在是改了源码,是把hivecatalog里面接收HiveConf参数的protected类型的构造方法改成了public类型,然后自己在代码里构造了HiveConf对象,传了一些必要的参数,比如metastore地址等。 Best  Jun -- 原始邮件 -- 发件人: Rui Li

Re: application模式提交操作hive的任务相关疑问

2020-07-23 文章 Jun Zhang
hi,Yang Wang: *谢谢你的建议,稍后我测试一下。* Yang Wang 于2020年7月24日周五 上午10:09写道: > > 可以使用-Dyarn.ship-directories=/path/of/hiveConfDir把hive的配置ship到JobManager端,hiveConfDir默认会在 > 当前目录下,同时这个目录也会自动加入到classpath,不太清楚这样是否可以让hive正常加载到 > > > Best, > Yang > > Jun Zhang 于2020年7月23日周四 下午3:51写道

[DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 文章 Jun Zhang
I want to execute some flink sql batch jobs regularly, such as 'insert into select .', but I can't find a suitable method so far, so reference hive, I changed the source code and add a '--filename' parameter so that we can execute a sql file. like this: /home/flink/bin/sql-client.sh embed

Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 文章 Jun Zhang
7月29日周三 上午9:56写道: > hi Jun, > > Currently, sql client has supported -u option, just like: > ./bin/sql-client.sh embedded -u "insert_statement". > > There is already a JIRA [1] that wants to support -f option > > [1] https://issues.apache.org/jira/browse/FLINK-1282

Re: 请教:用flink实现实时告警的功能

2020-08-05 文章 Jun Zhang
可以使用广播,我自己写过一个文章,给你参考下,你可以把source换成每隔几秒钟去读mysql的配置 https://blog.csdn.net/zhangjun5965/article/details/106573528 samuel@ubtrobot.com 于2020年8月6日周四 上午10:26写道: > 由于需要实时告警功能,经调研,用flink 来实现是比较合适,但有几个问题没搞清楚,请大神们指教,感谢! > > 告警有分两部分: >一是告警规则的设置,数据存放在mysql,存储的格式是json > {"times":5} ---就是事件发生大于

回复:flink1.9.3 on yarn 提交任务问题

2020-09-09 文章 Jun Zhang
这个应该是和你配置的HA有关,你去掉HA配置试试,或者检查一下HA配置 Best  Jun -- 原始邮件 -- 发件人: 宁吉浩

回复:flinksql接收到字段值格式为2020-09-23T20:58:24+08:00,如何转成TIMESTAMP

2020-09-24 文章 Jun Zhang
Hi  jack 如果我手动减去八小时,那么是不是使用eventtime落地的时候,就成了utc时区的值,比如现在是北京时间10点,那么我落地的时间将会是两点,对于使用东八区的人来说,会产生误解。 Best  Jun -- 原始邮件 -- 发件人: Jark Wu

回复:flinksql接收到字段值格式为2020-09-23T20:58:24+08:00,如何转成TIMESTAMP

2020-09-24 文章 Jun Zhang
这个水印比正常北京时间多八小时的问题,我是在1.11刚发布的时候测试发现的,我看了看源码,我的理解是把从sql里获取的东八区的时间,当成了utc时间来处理,所以会比北京时间多八小时。 Best  Jun -- 原始邮件 -- 发件人: Joker

回复:flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1

2020-09-29 文章 Jun Zhang
你的kafka的分区数是多少,把flink的并行度加大到kafka的分区数。 Best  Jun -- 原始邮件 -- 发件人: me

关于CatalogPartitionSpec类的一些想法

2020-11-19 文章 Jun Zhang
大家好:      我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)的时候遇到一个问题。     我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map

关于CatalogPartitionSpec类的一些想法

2020-11-19 文章 Jun Zhang
大家好: 我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)的时候遇到一个问题。 我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map类型是否会更加合理和通用呢? 谢谢

回复:flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000

2020-12-09 文章 Jun Zhang
https://issues.apache.org/jira/browse/FLINK-19358 在2020年12月10日 09:32,Jeff

Re: flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000

2020-12-09 文章 Jun Zhang
https://issues.apache.org/jira/browse/FLINK-19358 Jun Zhang <825875...@qq.com> 于2020年12月10日周四 上午10:00写道: > https://issues.apache.org/jira/browse/FLINK-19358 > > > > > > > 在2020年12月10日 09:32,Jeff > > > > 这个问题我也遇到了,请问后来怎么解决的呢? 更换成flink1.11.2都不行! >

Some questions about limit push down

2020-12-28 文章 Jun Zhang
when I query hive table by sql, like this `select * from hivetable where id = 1 limit 1`, I found that the limit push down is invalid, is it a bug or was it designed like this? if the sql is 'select * from hivetable limit 1' ,it is ok thanks

请问ColumnVector类为何没map类型的实现类

2021-05-06 文章 Jun Zhang
大家好: 请问flink中org.apache.flink.table.data.vector.ColumnVector这个类的子类为什么没map类型的实现类呢?是什么原因呢?谢谢