对DataStream进行keyBy操作是否能解决呢?
-- 原始邮件 --
发件人: "venn"https://stackoverflow.com/questions/53588554/apache-flink-using-filter
> -or-split-to-split-a-stream
>
> regards.
>
>
??
Types.LIST(Types.STRING)
-- --
??: ?? ?? https://go.microsoft.com/fwlink/?LinkId=550986>;
Hi,all:
I have 2 hadoop cluster (hdfs://mycluster1 and hdfs://mycluster2),both of them
configured the HA,
I have a job ,read from streaming data from kafka, and write it to hdfs by
StreamingFileSink,now I deployed my job on mycluster1 (flink on yarn),and I
want to write the data to mycluster2
Hi,Yang :
thank you very much for your reply.
I had add the configurations on my hadoop cluster client , both hdfs-site.xml
and core-site.xml are configured, the client can read mycluster1 and mycluter2,
but when I submit the flink job to yarn cluster , the hadoop client
configurations is inva
??
https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA
??2019??10??31??
10:16??wangl...@geekplus.com.cn
??Evictor??
Best Jun
-- --
??: Qi Kang
jarcom.dora.job.stream.AliLogStreamingJob
??
??2019??12??2?? 13:59??
你看看是不是yarn队列有很多accept状态的任务,如果有很多正在排队的任务,那就是可能超过了flink设置的超时时间还没有来得及给你这个任务分配资源,就会报这个错误。
但是这个不影响任务的提交,等一会任务提交成功了,客户端就会返回提交成功的结果。
Fei Han 于2020年2月9日周日 下午5:35写道:
> @all:
>在Flink1.10中,用yarn模式无法提交作业。
> 提示如下:
> lease check if the requested resources are available in the YARN cluster
> 2020-02-09 1
hi??jinsong??
??10??
??sql select * from mytable limit 1;
hive??mytable??10??10??
??2020??03??2?? 16:38??JingsongLee
??sql??sql??hive??sql??
??2020??03??4?? 13:25??JingsongLeehttps://issues.apache.org/jira
hi,Chief:
目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数,
你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。
Kurt Young 于2020年3月25日周三 上午8:53写道:
> 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。
>
> Best,
> Kurt
>
>
> On Tue, Mar 24, 20
hi,Chief:
目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数,
你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。
Best Jun
-- 原始邮件 --
发件人: Kurt Young
大家好:
有一个类似的sql 拿官网的这个做示例:INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1',
'2019-08-08’;
如果实际上第三个type字段,某一种type数据量特别大,导致了数据倾斜,这种情况一般怎么处理呢?
谢谢。
hi:
你可以自定义一个trigger [1]
第二个场景是可以的,第一种场景我没有遇到过这种场景,你可以试试。
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html
flink小猪 <18579099...@163.com> 于2020年3月27日周五 上午11:29写道:
> 我有两个需求
> 1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办?
> 2.如果我需要开一个一个小时的窗口,但
hi:
不知道广播流能否满足你的需求
出发 <573693...@qq.com> 于2020年4月2日周四 下午4:25写道:
>
> 老铁们,消费kafka一些数据时候,根据规则,将满足条件的一些字段生成新字段,插入到es或者pg等里面,其中映射字段是动态的,插入字段也是动态的,结合drools来做比较好吗。
大家好:
请教一个flink cep的问题,我想做一个简单的报警,比如连续三次大于5就报警,连续三次小于等于5就算报警恢复。
示例程序如下:
DataStream
大家好:
官网的解释中,注册定时器只能是keyed stream,我使用BroadcastConnectedStream
接一个KeyedBroadcastProcessFunction函数发现也能注册定时器,我测试了一下,只限于使用processtime的时候,如果使用的是eventtime就不好使了,请问这个是什么原因呢?谢谢。
hi.sunfulin
你有没有导入blink的planner呢,加入这个试试
org.apache.flink
flink-table-planner-blink_${scala.binary.version}
${flink.version}
sunfulin 于2020年7月7日周二 下午3:21写道:
>
>
>
> hi, jark
> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
> configuration里的DeployOptions.TARGET
> (exec
大家好:
我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢
我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnv
hi,大家好
对于json schema的问题,我想问一个其他的问题,
比如我要做一个实时报警系统,需要消费kafka的json数据来进行实时报警,我的想法是对于每一个报警都生成一个flink任务,主要报警逻辑翻译成一个flink
sql。
其中kafka里面的json数据,每一个字段都是可以生成报警条件的,比如有一个json格式的header字段,这个字段里面的内容是不固定的,
某一个用户想用header.aaa字段,另一个用户想用header.bbb字段,比如每分钟header.aaa的count值大于100就报警。
这种情况下,我该如何定义我的schema呢?大家有没有什么想法,谢
。
Jingsong Li 于2020年7月10日周五 下午2:54写道:
> Hi,
>
> 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢?
> 我用同样SQL在我的环境是有的。
>
> Best,
> Jingsong
>
> On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang
> wrote:
>
> > 大家好:
> > 我在用flink 1.11 的sql
此外,如果设置并行度是大于1,虽然可以生成success文件,但是貌似不是第一次checkpoint结束的时候就生成了,我反复测试之后,好像是不固定的时间,比如可能是第5次,也可能是第10次checkpoint之后才生成的。
Jingsong Li 于2020年7月10日周五 下午3:35写道:
> Hi, Jun,
>
> 非常感谢详细充分的测试~
>
> 接下来我复现排查下~
>
> Best,
> Jingsong
>
> On Fri, Jul 10, 2020 at 3:09 PM Jun Zh
hi,Zhou Zach :
问一下,你把你的程序,并行度设置成 1,还能正常读取hive的数据吗?
Zhou Zach 于2020年7月13日周一 下午8:17写道:
> 好的,感谢答疑
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 19:49:10,"Jingsong Li" 写道:
> >创建kafka_table需要在default dialect下。
> >
> >不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法)
> >
> >Best,
> >Jingso
hi,夏帅:
抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
你测试没有问题的情况并行度是 1 吗?写入hdfs?
夏帅 于2020年7月10日周五 下午5:39写道:
> 你好,
> 我这边同样的代码,并没有出现类似的问题
> 是本地跑么,可以提供下日志信息么?
>
>
hi,jinsong:
这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
Jun Zhang 于2020年7月23日周四 上午11:15写道:
> hi,夏帅:
>
> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>
> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>
>
hi,jinsong:
这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
Jun Zhang 于2020年7月23日周四 上午11:34写道:
> hi,jinsong:
>
> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
>
> Jun Zhang 于2020年7月23日周四 上午11:1
hi,jinsong
我们生产环境hdfs是cdh 2.6的,我换了一个hadoop 3 版本的hdfs,还真没问题了,不知道是哪里出问题了。
Jingsong Li 于2020年7月23日周四 上午11:45写道:
> 相同操作我也没有复现。。是可以成功执行的
>
> 你的HDFS是什么版本?是否可以考虑换个来测试下
>
> On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang
> wrote:
>
>> hi,jinsong:
>>
>> 这个问题不知道你后来有没有
大家好:
我现在有一个flink的程序要读写hive的数据,在程序中构造HiveCatalog的时候需要有一个hiveConfDir,如果我使用的是新的application模式去提交任务,这个任务的解析应该是放到了master端,这个时候hadoop集群上没有hive的相关配置,那么这个hiveConfDir该怎么配置呢?
谢谢
我现在是改了源码,是把hivecatalog里面接收HiveConf参数的protected类型的构造方法改成了public类型,然后自己在代码里构造了HiveConf对象,传了一些必要的参数,比如metastore地址等。
Best Jun
-- 原始邮件 --
发件人: Rui Li
hi,Yang Wang:
*谢谢你的建议,稍后我测试一下。*
Yang Wang 于2020年7月24日周五 上午10:09写道:
>
> 可以使用-Dyarn.ship-directories=/path/of/hiveConfDir把hive的配置ship到JobManager端,hiveConfDir默认会在
> 当前目录下,同时这个目录也会自动加入到classpath,不太清楚这样是否可以让hive正常加载到
>
>
> Best,
> Yang
>
> Jun Zhang 于2020年7月23日周四 下午3:51写道
I want to execute some flink sql batch jobs regularly, such as 'insert into
select .', but I can't find a suitable method so far, so reference
hive, I changed the source code and add a '--filename' parameter so
that we can execute a sql file.
like this:
/home/flink/bin/sql-client.sh embed
7月29日周三 上午9:56写道:
> hi Jun,
>
> Currently, sql client has supported -u option, just like:
> ./bin/sql-client.sh embedded -u "insert_statement".
>
> There is already a JIRA [1] that wants to support -f option
>
> [1] https://issues.apache.org/jira/browse/FLINK-1282
可以使用广播,我自己写过一个文章,给你参考下,你可以把source换成每隔几秒钟去读mysql的配置
https://blog.csdn.net/zhangjun5965/article/details/106573528
samuel@ubtrobot.com 于2020年8月6日周四 上午10:26写道:
> 由于需要实时告警功能,经调研,用flink 来实现是比较合适,但有几个问题没搞清楚,请大神们指教,感谢!
>
> 告警有分两部分:
>一是告警规则的设置,数据存放在mysql,存储的格式是json
> {"times":5} ---就是事件发生大于
这个应该是和你配置的HA有关,你去掉HA配置试试,或者检查一下HA配置
Best Jun
-- 原始邮件 --
发件人: 宁吉浩
Hi jack
如果我手动减去八小时,那么是不是使用eventtime落地的时候,就成了utc时区的值,比如现在是北京时间10点,那么我落地的时间将会是两点,对于使用东八区的人来说,会产生误解。
Best Jun
-- 原始邮件 --
发件人: Jark Wu
这个水印比正常北京时间多八小时的问题,我是在1.11刚发布的时候测试发现的,我看了看源码,我的理解是把从sql里获取的东八区的时间,当成了utc时间来处理,所以会比北京时间多八小时。
Best Jun
-- 原始邮件 --
发件人: Joker
你的kafka的分区数是多少,把flink的并行度加大到kafka的分区数。
Best Jun
-- 原始邮件 --
发件人: me
大家好:
我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath,
CatalogPartitionSpec)的时候遇到一个问题。
我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map
大家好:
我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath,
CatalogPartitionSpec)的时候遇到一个问题。
我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map类型是否会更加合理和通用呢?
谢谢
https://issues.apache.org/jira/browse/FLINK-19358
在2020年12月10日 09:32,Jeff
https://issues.apache.org/jira/browse/FLINK-19358
Jun Zhang <825875...@qq.com> 于2020年12月10日周四 上午10:00写道:
> https://issues.apache.org/jira/browse/FLINK-19358
>
>
>
>
>
>
> 在2020年12月10日 09:32,Jeff
>
>
>
> 这个问题我也遇到了,请问后来怎么解决的呢? 更换成flink1.11.2都不行!
>
when I query hive table by sql, like this `select * from hivetable where id
= 1 limit 1`, I found that the limit push down is invalid, is it a bug or
was it designed like this?
if the sql is 'select * from hivetable limit 1' ,it is ok
thanks
大家好:
请问flink中org.apache.flink.table.data.vector.ColumnVector这个类的子类为什么没map类型的实现类呢?是什么原因呢?谢谢
43 matches
Mail list logo