请问ColumnVector类为何没map类型的实现类

2021-05-06 文章 Jun Zhang
大家好:
请问flink中org.apache.flink.table.data.vector.ColumnVector这个类的子类为什么没map类型的实现类呢?是什么原因呢?谢谢


Some questions about limit push down

2020-12-28 文章 Jun Zhang
when I query hive table by sql, like this `select * from hivetable where id
= 1 limit 1`,   I found that the limit push down is invalid, is it a bug or
was it designed like this?

if the sql is  'select * from hivetable  limit 1'  ,it is ok

thanks


Re: flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000

2020-12-09 文章 Jun Zhang
https://issues.apache.org/jira/browse/FLINK-19358

Jun Zhang <825875...@qq.com> 于2020年12月10日周四 上午10:00写道:

> https://issues.apache.org/jira/browse/FLINK-19358
>
>
>
>
>
>
>  在2020年12月10日 09:32,Jeff
>
>
>
> 这个问题我也遇到了,请问后来怎么解决的呢? 更换成flink1.11.2都不行!
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-03-24 07:14:07,"Peihui He"  大家好,我在用flink
> 1.9.2
> 部署到容器的时候如果不启动ha的情况下jobid是正常的,但是启动了就变成了
> 这样的话,checkpoint的地址和ha的文件地址都一样了,导致checkpoint总是失败。
> 不知道这是什么原因呢?


回复:flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000

2020-12-09 文章 Jun Zhang
https://issues.apache.org/jira/browse/FLINK-19358
   
  

 
 
 
 在2020年12月10日 09:32,Jeff

关于CatalogPartitionSpec类的一些想法

2020-11-19 文章 Jun Zhang
大家好:
我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath,
CatalogPartitionSpec)的时候遇到一个问题。
   我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map类型是否会更加合理和通用呢?

 谢谢


关于CatalogPartitionSpec类的一些想法

2020-11-19 文章 Jun Zhang
大家好:
  
我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath, 
CatalogPartitionSpec)的时候遇到一个问题。
  我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map

回复:flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1

2020-09-29 文章 Jun Zhang
你的kafka的分区数是多少,把flink的并行度加大到kafka的分区数。



BestJun


-- 原始邮件 --
发件人: me 

回复:flinksql接收到字段值格式为2020-09-23T20:58:24+08:00,如何转成TIMESTAMP

2020-09-24 文章 Jun Zhang
这个水印比正常北京时间多八小时的问题,我是在1.11刚发布的时候测试发现的,我看了看源码,我的理解是把从sql里获取的东八区的时间,当成了utc时间来处理,所以会比北京时间多八小时。



BestJun


-- 原始邮件 --
发件人: Joker 

回复:flinksql接收到字段值格式为2020-09-23T20:58:24+08:00,如何转成TIMESTAMP

2020-09-24 文章 Jun Zhang
Hi jack
如果我手动减去八小时,那么是不是使用eventtime落地的时候,就成了utc时区的值,比如现在是北京时间10点,那么我落地的时间将会是两点,对于使用东八区的人来说,会产生误解。



BestJun


-- 原始邮件 --
发件人: Jark Wu 

回复:flink1.9.3 on yarn 提交任务问题

2020-09-09 文章 Jun Zhang
这个应该是和你配置的HA有关,你去掉HA配置试试,或者检查一下HA配置



BestJun


-- 原始邮件 --
发件人: 宁吉浩 

Re: 请教:用flink实现实时告警的功能

2020-08-05 文章 Jun Zhang
可以使用广播,我自己写过一个文章,给你参考下,你可以把source换成每隔几秒钟去读mysql的配置

https://blog.csdn.net/zhangjun5965/article/details/106573528

samuel@ubtrobot.com  于2020年8月6日周四 上午10:26写道:

> 由于需要实时告警功能,经调研,用flink 来实现是比较合适,但有几个问题没搞清楚,请大神们指教,感谢!
>
> 告警有分两部分:
>一是告警规则的设置,数据存放在mysql,存储的格式是json
> {"times":5}  ---就是事件发生大于5次就发出告警;
> {"temperature": 80} ---就是温度大于80就告警;
>二是告警实现
>   1)上报的数据写入到kafka
>   2)flink读取kafka的数据,然后通过翻滚窗口进行计算,如果满足规则就生产告警。
>
>
> 现在遇到的问题是:
> 1. 当规则变更时,如何及时生效?
> 2.如果用flink CEP来是实现,对于同一数据源,能否在一个作业里让多个规则同时生效?
> 3.这一功能有最佳实践吗?
>
> 希望哪位解答一下,谢谢!
>
>
>
>


Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 文章 Jun Zhang
hi,godfrey:
Thanks for your reply

1. I have seen the -u parameter, but my sql file may not only include
'insert into select ', but also SET, DDL, etc.

2. I may not have noticed this issue. I took a look at this issue. I think
this issue may have some problems. For example, he finally called the
CliClient.callCommand method.
But I think that many options in callCommand are not completely suitable
for sql files, such as HELP, CLEAR, SELECT, etc. The select operation opens
a window to display the results, obviously this is not suitable for
executing sql files

godfrey he  于2020年7月29日周三 上午9:56写道:

> hi Jun,
>
> Currently, sql client has supported -u option, just like:
>  ./bin/sql-client.sh embedded -u "insert_statement".
>
> There is already a JIRA [1] that wants to support -f option
>
> [1] https://issues.apache.org/jira/browse/FLINK-12828
>
> Best,
> Godfrey
>
> Jun Zhang  于2020年7月29日周三 上午9:22写道:
>
>> I want to execute some flink sql batch jobs regularly, such as 'insert
>> into
>> select .', but I can't find a suitable method so far, so reference
>>  hive, I changed the source code and add a  '--filename'  parameter  so
>> that we can execute a sql file.
>>
>> like this:
>>
>> /home/flink/bin/sql-client.sh embedded -f flink.sql
>>
>> what about any ideas or plans for this feature community?
>>
>


[DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 文章 Jun Zhang
I want to execute some flink sql batch jobs regularly, such as 'insert into
select .', but I can't find a suitable method so far, so reference
 hive, I changed the source code and add a  '--filename'  parameter  so
that we can execute a sql file.

like this:

/home/flink/bin/sql-client.sh embedded -f flink.sql

what about any ideas or plans for this feature community?


Re: application模式提交操作hive的任务相关疑问

2020-07-23 文章 Jun Zhang
hi,Yang Wang:
*谢谢你的建议,稍后我测试一下。*

Yang Wang  于2020年7月24日周五 上午10:09写道:

>
> 可以使用-Dyarn.ship-directories=/path/of/hiveConfDir把hive的配置ship到JobManager端,hiveConfDir默认会在
> 当前目录下,同时这个目录也会自动加入到classpath,不太清楚这样是否可以让hive正常加载到
>
>
> Best,
> Yang
>
> Jun Zhang  于2020年7月23日周四 下午3:51写道:
>
> > 大家好:
> >
> >
> 我现在有一个flink的程序要读写hive的数据,在程序中构造HiveCatalog的时候需要有一个hiveConfDir,如果我使用的是新的application模式去提交任务,这个任务的解析应该是放到了master端,这个时候hadoop集群上没有hive的相关配置,那么这个hiveConfDir该怎么配置呢?
> >
> > 谢谢
> >
>


回复:application模式提交操作hive的任务相关疑问

2020-07-23 文章 Jun Zhang
我现在是改了源码,是把hivecatalog里面接收HiveConf参数的protected类型的构造方法改成了public类型,然后自己在代码里构造了HiveConf对象,传了一些必要的参数,比如metastore地址等。



BestJun


-- 原始邮件 --
发件人: Rui Li 

application模式提交操作hive的任务相关疑问

2020-07-23 文章 Jun Zhang
大家好:
我现在有一个flink的程序要读写hive的数据,在程序中构造HiveCatalog的时候需要有一个hiveConfDir,如果我使用的是新的application模式去提交任务,这个任务的解析应该是放到了master端,这个时候hadoop集群上没有hive的相关配置,那么这个hiveConfDir该怎么配置呢?

谢谢


Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,jinsong
我们生产环境hdfs是cdh 2.6的,我换了一个hadoop 3 版本的hdfs,还真没问题了,不知道是哪里出问题了。

Jingsong Li  于2020年7月23日周四 上午11:45写道:

> 相同操作我也没有复现。。是可以成功执行的
>
> 你的HDFS是什么版本?是否可以考虑换个来测试下
>
> On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang 
> wrote:
>
>> hi,jinsong:
>>
>> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
>>
>> Jun Zhang  于2020年7月23日周四 上午11:15写道:
>>
>>> hi,夏帅:
>>>
>>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>>>
>>> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>>>
>>> 夏帅  于2020年7月10日周五 下午5:39写道:
>>>
>>>> 你好,
>>>> 我这边同样的代码,并没有出现类似的问题
>>>> 是本地跑么,可以提供下日志信息么?
>>>>
>>>>
>
> --
> Best, Jingsong Lee
>


Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,jinsong:
这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。

Jun Zhang  于2020年7月23日周四 上午11:34写道:

> hi,jinsong:
>
> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。
>
> Jun Zhang  于2020年7月23日周四 上午11:15写道:
>
>> hi,夏帅:
>>
>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>>
>> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>>
>> 夏帅  于2020年7月10日周五 下午5:39写道:
>>
>>> 你好,
>>> 我这边同样的代码,并没有出现类似的问题
>>> 是本地跑么,可以提供下日志信息么?
>>>
>>>


Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,jinsong:
这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。

Jun Zhang  于2020年7月23日周四 上午11:15写道:

> hi,夏帅:
>
> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。
>
> 你测试没有问题的情况并行度是 1 吗?写入hdfs?
>
> 夏帅  于2020年7月10日周五 下午5:39写道:
>
>> 你好,
>> 我这边同样的代码,并没有出现类似的问题
>> 是本地跑么,可以提供下日志信息么?
>>
>>


Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-22 文章 Jun Zhang
hi,夏帅:
抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。

你测试没有问题的情况并行度是 1 吗?写入hdfs?

夏帅  于2020年7月10日周五 下午5:39写道:

> 你好,
> 我这边同样的代码,并没有出现类似的问题
> 是本地跑么,可以提供下日志信息么?
>
>


Re: Re: Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

2020-07-14 文章 Jun Zhang
hi,Zhou Zach :
问一下,你把你的程序,并行度设置成 1,还能正常读取hive的数据吗?

Zhou Zach  于2020年7月13日周一 下午8:17写道:

> 好的,感谢答疑
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-13 19:49:10,"Jingsong Li"  写道:
> >创建kafka_table需要在default dialect下。
> >
> >不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法)
> >
> >Best,
> >Jingsong
> >
> >On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach  wrote:
> >
> >> 创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了,
> >> 如果是default Dialect创建的表,是不是只是在临时会话有效
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-13 19:27:44,"Jingsong Li"  写道:
> >> >Hi,
> >> >
> >> >问题一:
> >> >
> >> >只要current catalog是HiveCatalog。
> >> >理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS.
> >> >
> >> >明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗?
> >> >
> >> >问题二:
> >> >
> >> >用filesystem创建出来的是filesystem的表,它和hive
> >> >metastore是没有关系的,你需要使用创建filesystem表的语法[1]。
> >> >
> >> >filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。
> >> >但是它的partition commit是不支持metastore的,所以不会有自动add
> >> >partition到hive的默认实现,你需要自定义partition-commit-policy.
> >> >
> >> >[1]
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html
> >> >
> >> >Best,
> >> >Jingsong
> >> >
> >> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach  wrote:
> >> >
> >> >> 尴尬
> >> >> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li,  @夏帅
> >> >> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢!
> >> >> 还有两个问题问下,
> >> >> 问题1:
> >> >> 创建的kafka_table,在hive和Flink
> >> >>
> >>
> SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> 问题2:
> >> >> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector
> 也是可以创建hive表,我尝试了一下,报错了:
> >> >> java.util.concurrent.CompletionException:
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> >> Could not execute application.
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> >> >> ~[?:1.8.0_161]
> >> >> at
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >> at
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199)
> >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0]
> >> >> at
> >> >>
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >> >> [?:1.8.0_161]
> >> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >> >> [?:1.8.0_161]
> >> >> at
> >> >>
> >>
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> >> >>
> >>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> >> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> >> >>
> >>
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> >> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> at
> >> >>
> >>
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> >> [qile-data-flow-1.0.jar:?]
> >> >> Caused by:
> >> >>
> >>
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> >> >> Could not execute application.
> >> >> ... 11 more
> >> >> Caused by:
> org.apache.flink.client.program.ProgramInvocationException:
> >> The
> >> >> main method caused an error: Unable to create a sink for writing
> table
> >> >> 

Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-10 文章 Jun Zhang
此外,如果设置并行度是大于1,虽然可以生成success文件,但是貌似不是第一次checkpoint结束的时候就生成了,我反复测试之后,好像是不固定的时间,比如可能是第5次,也可能是第10次checkpoint之后才生成的。

Jingsong Li  于2020年7月10日周五 下午3:35写道:

> Hi, Jun,
>
> 非常感谢详细充分的测试~
>
> 接下来我复现排查下~
>
> Best,
> Jingsong
>
> On Fri, Jul 10, 2020 at 3:09 PM Jun Zhang 
> wrote:
>
> > hi,jinsong:
> >
> > 在我的测试环境下,对于我贴出来的那个代码。
> >
> >
> 1.如果source使用的有界的数据,比如bsEnv.fromElements(...),这样会有success文件生成,如果是kafka数据,就不行。
> > 2.如果设置程序的并行度是大于1,那么也会有success生成。
> > 3.如果我写入的是local file,比如 file:///tmp/aaa ,而不是hdfs,也会有success文件生成。
> >
> >
> >
> 综上,在并行度设置为1,消费的是kafka的永不停止的数据,写入的是hdfs,我的checkpoint设置是10s,这种情况下,我测试了好多遍,都没有success文件生成。
> >
> > Jingsong Li  于2020年7月10日周五 下午2:54写道:
> >
> > > Hi,
> > >
> > > 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢?
> > > 我用同样SQL在我的环境是有的。
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang 
> > > wrote:
> > >
> > > > 大家好:
> > > >  我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢
> > > >
> > > >
> > > >
> > >
> >
> 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。
> > > >
> > > > public static void main(String[] args) throws Exception{
> > > > StreamExecutionEnvironment bsEnv =
> > > > StreamExecutionEnvironment.getExecutionEnvironment();
> > > > bsEnv.enableCheckpointing(1);
> > > > bsEnv.setParallelism(1);
> > > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
> > > >
> > > > String sqlSource = "CREATE TABLE  source_kafka (\n" +
> > > >   "appName  STRING,\n" +
> > > >   "appVersion STRING,\n" +
> > > >   "uploadTime STRING\n" +
> > > >   ") WITH (\n" +
> > > >   "  'connector.type' = 'kafka',   \n" +
> > > >   "  'connector.version' = '0.10',\n" +
> > > >   "  'connector.topic' = 'mytest',\n" +
> > > >   "  'connector.properties.zookeeper.connect' =
> > > > 'localhost:2181',\n" +
> > > >   "  'connector.properties.bootstrap.servers' =
> > > > 'localhost:9092',\n" +
> > > >   "  'connector.properties.group.id' =
> > 'testGroup',\n" +
> > > >   "  'format.type'='json',\n" +
> > > >   "  'update-mode' = 'append' )";
> > > >
> > > > tEnv.executeSql(sqlSource);
> > > >
> > > > String sql = "CREATE TABLE fs_table (\n" +
> > > > "appName  STRING,\n" +
> > > > "appVersion STRING,\n" +
> > > > "uploadTime STRING,\n" +
> > > > "  dt STRING," +
> > > > "  h string" +
> > > > ")  PARTITIONED BY (dt,h)  WITH (\n" +
> > > > "  'connector'='filesystem',\n" +
> > > >  "  'path'='hdfs://localhost/tmp/',\n" +
> > > >  " 'sink.partition-commit.policy.kind' =
> > > > 'success-file', " +
> > > >  "  'format'='orc'\n" +
> > > >  ")";
> > > > tEnv.executeSql(sql);
> > > >
> > > > String insertSql = "insert into  fs_table SELECT appName
> > > > ,appVersion,uploadTime, " +
> > > >   " DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd'),
> > > > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
> > > >
> > > > tEnv.executeSql(insertSql);
> > > >
> > > > }
> > > >
> > >
> > >
> > > --
> > > Best, Jingsong Lee
> > >
> >
>
>
> --
> Best, Jingsong Lee
>


Re: flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-10 文章 Jun Zhang
hi,jinsong:

在我的测试环境下,对于我贴出来的那个代码。
1.如果source使用的有界的数据,比如bsEnv.fromElements(...),这样会有success文件生成,如果是kafka数据,就不行。
2.如果设置程序的并行度是大于1,那么也会有success生成。
3.如果我写入的是local file,比如 file:///tmp/aaa ,而不是hdfs,也会有success文件生成。

综上,在并行度设置为1,消费的是kafka的永不停止的数据,写入的是hdfs,我的checkpoint设置是10s,这种情况下,我测试了好多遍,都没有success文件生成。

Jingsong Li  于2020年7月10日周五 下午2:54写道:

> Hi,
>
> 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢?
> 我用同样SQL在我的环境是有的。
>
> Best,
> Jingsong
>
> On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang 
> wrote:
>
> > 大家好:
> >  我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢
> >
> >
> >
> 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。
> >
> > public static void main(String[] args) throws Exception{
> > StreamExecutionEnvironment bsEnv =
> > StreamExecutionEnvironment.getExecutionEnvironment();
> > bsEnv.enableCheckpointing(1);
> > bsEnv.setParallelism(1);
> > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
> >
> > String sqlSource = "CREATE TABLE  source_kafka (\n" +
> >   "appName  STRING,\n" +
> >   "appVersion STRING,\n" +
> >   "uploadTime STRING\n" +
> >   ") WITH (\n" +
> >   "  'connector.type' = 'kafka',   \n" +
> >   "  'connector.version' = '0.10',\n" +
> >   "  'connector.topic' = 'mytest',\n" +
> >   "  'connector.properties.zookeeper.connect' =
> > 'localhost:2181',\n" +
> >   "  'connector.properties.bootstrap.servers' =
> > 'localhost:9092',\n" +
> >   "  'connector.properties.group.id' = 'testGroup',\n" +
> >   "  'format.type'='json',\n" +
> >   "  'update-mode' = 'append' )";
> >
> > tEnv.executeSql(sqlSource);
> >
> > String sql = "CREATE TABLE fs_table (\n" +
> > "appName  STRING,\n" +
> > "appVersion STRING,\n" +
> > "uploadTime STRING,\n" +
> > "  dt STRING," +
> > "  h string" +
> > ")  PARTITIONED BY (dt,h)  WITH (\n" +
> > "  'connector'='filesystem',\n" +
> >  "  'path'='hdfs://localhost/tmp/',\n" +
> >  " 'sink.partition-commit.policy.kind' =
> > 'success-file', " +
> >  "  'format'='orc'\n" +
> >  ")";
> > tEnv.executeSql(sql);
> >
> > String insertSql = "insert into  fs_table SELECT appName
> > ,appVersion,uploadTime, " +
> >   " DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd'),
> > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
> >
> > tEnv.executeSql(insertSql);
> >
> > }
> >
>
>
> --
> Best, Jingsong Lee
>


Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-10 文章 Jun Zhang
hi,大家好
对于json schema的问题,我想问一个其他的问题,
比如我要做一个实时报警系统,需要消费kafka的json数据来进行实时报警,我的想法是对于每一个报警都生成一个flink任务,主要报警逻辑翻译成一个flink
sql。

其中kafka里面的json数据,每一个字段都是可以生成报警条件的,比如有一个json格式的header字段,这个字段里面的内容是不固定的,
某一个用户想用header.aaa字段,另一个用户想用header.bbb字段,比如每分钟header.aaa的count值大于100就报警。

这种情况下,我该如何定义我的schema呢?大家有没有什么想法,谢谢。

Benchao Li  于2020年7月10日周五 下午1:54写道:

> Hi Peihui,
>
> 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。
>
> 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理,
> 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。
> 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以
> 从结果上来看,*还不能完全做到原封不动的输出到下游*。
>
> 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题:
> 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的
> 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型
>   的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在
>   序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。
>
> Jark Wu  于2020年7月10日周五 下午12:22写道:
>
> > 社区有个 issue 正在解决这个问题,可以关注一下
> > https://issues.apache.org/jira/browse/FLINK-18002
> >
> > Best,
> > Jark
> >
> > On Fri, 10 Jul 2020 at 11:13, Leonard Xu  wrote:
> >
> > > Hi, Peihui
> > >
> > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format
> > 的解析的底层实现
> > > 就是按照json的标准格式解析(jackson)的,没法将一个
> > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
> > >
> > > 一种做法是定义复杂的jsonObject对应的ROW
> > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
> > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String,
> > > 然后query里用UDTF处理。
> > >
> > >
> > > 祝好
> > > Leonard Xu
> > >
> > >
> > >
> > >
> > > > 在 2020年7月10日,10:16,Peihui He  写道:
> > > >
> > > > Hello,
> > > >
> > > >   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> > > >
> > > > Best wishes.
> > > >
> > > > Peihui He  于2020年7月10日周五 上午10:12写道:
> > > >
> > > >> Hello,
> > > >>
> > > >>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
> > > >>
> > > >>
> > > >> Best wishes.
> > > >>
> > > >> LakeShen  于2020年7月10日周五 上午10:03写道:
> > > >>
> > > >>> Hi Peihui,
> > > >>>
> > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
> > > >>>
> > > >>> {
> > > >>>"a":"b",
> > > >>>"c":{
> > > >>>"d":"e",
> > > >>>"g":"f"
> > > >>>}
> > > >>> },
> > > >>>
> > > >>> 那么在 kafka table source 可以使用 row 来定义:
> > > >>>
> > > >>> create table xxx (
> > > >>> a varchar,
> > > >>> c row
> > > >>> )
> > > >>>
> > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
> > > >>>
> > > >>> Best,
> > > >>> LakeShen
> > > >>>
> > > >>> Peihui He  于2020年7月10日周五 上午9:12写道:
> > > >>>
> > >  Hello:
> > > 
> > > 在用flink
> > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
> > > 
> > >  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
> > > 
> > > 
> > >  Best wishes.
> > > 
> > > >>>
> > > >>
> > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>


flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-09 文章 Jun Zhang
大家好:
 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢

我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。

public static void main(String[] args) throws Exception{
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(1);
bsEnv.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

String sqlSource = "CREATE TABLE  source_kafka (\n" +
  "appName  STRING,\n" +
  "appVersion STRING,\n" +
  "uploadTime STRING\n" +
  ") WITH (\n" +
  "  'connector.type' = 'kafka',   \n" +
  "  'connector.version' = '0.10',\n" +
  "  'connector.topic' = 'mytest',\n" +
  "  'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
  "  'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
  "  'connector.properties.group.id' = 'testGroup',\n" +
  "  'format.type'='json',\n" +
  "  'update-mode' = 'append' )";

tEnv.executeSql(sqlSource);

String sql = "CREATE TABLE fs_table (\n" +
"appName  STRING,\n" +
"appVersion STRING,\n" +
"uploadTime STRING,\n" +
"  dt STRING," +
"  h string" +
")  PARTITIONED BY (dt,h)  WITH (\n" +
"  'connector'='filesystem',\n" +
 "  'path'='hdfs://localhost/tmp/',\n" +
 " 'sink.partition-commit.policy.kind' =
'success-file', " +
 "  'format'='orc'\n" +
 ")";
tEnv.executeSql(sql);

String insertSql = "insert into  fs_table SELECT appName
,appVersion,uploadTime, " +
  " DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd'),
DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";

tEnv.executeSql(insertSql);

}


Re: Re: flink 1.11 作业执行异常

2020-07-07 文章 Jun Zhang
hi.sunfulin
你有没有导入blink的planner呢,加入这个试试


org.apache.flink
flink-table-planner-blink_${scala.binary.version}
${flink.version}



sunfulin  于2020年7月7日周二 下午3:21写道:

>
>
>
> hi, jark
> 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink
> configuration里的DeployOptions.TARGET
> (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。
>
>
> //构建StreamExecutionEnvironment
> public static final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> //构建EnvironmentSettings 并指定Blink Planner
> private static final EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>
> //构建StreamTableEnvironment
> public static final StreamTableEnvironment tEnv =
> StreamTableEnvironment.create(env, bsSettings);
>
>
>
>
>
>tEnv.executeSql(“ddl sql”);
>
>
>
>
> //source注册成表
>
> tEnv.createTemporaryView("test", ds, $("f0").as("id"),
> $("f1").as("first"), $("p").proctime());
>
>
>
>
> //join语句
>
> Table table = tEnv.sqlQuery("select b.* from test a left join
> my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId");
>
>
>
>
> //输出
>
> tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob");
>
>
>
>
> env.execute("LookUpJoinJob");
>
>
>
>
>
>
>
>
> 在 2020-07-06 14:59:17,"Jark Wu"  写道:
> >能分享下复现的作业代码不?
> >
> >Best,
> >Jark
> >
> >On Mon, 6 Jul 2020 at 11:00, sunfulin  wrote:
> >
> >> Hi,
> >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常:
> >> org.apache.flink.table.api.TableExecution: Failed to execute sql
> >>
> >>
> >> caused by : java.lang.IlleagalStateException: No ExecutorFactory found
> to
> >> execute the application.
> >>   at
> >>
> org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
> >>
> >>
> >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。
>


关于注册定时器的一些疑问

2020-06-28 文章 Jun Zhang
大家好:
 官网的解释中,注册定时器只能是keyed stream,我使用BroadcastConnectedStream
接一个KeyedBroadcastProcessFunction函数发现也能注册定时器,我测试了一下,只限于使用processtime的时候,如果使用的是eventtime就不好使了,请问这个是什么原因呢?谢谢。


请教一个flink CEP的问题

2020-05-26 文章 Jun Zhang
大家好:
  请教一个flink cep的问题,我想做一个简单的报警,比如连续三次大于5就报警,连续三次小于等于5就算报警恢复。
 
 示例程序如下:


 DataStream

Re: 动态处理字段动态sink

2020-04-02 文章 Jun Zhang
hi:
不知道广播流能否满足你的需求

出发 <573693...@qq.com> 于2020年4月2日周四 下午4:25写道:

>
> 老铁们,消费kafka一些数据时候,根据规则,将满足条件的一些字段生成新字段,插入到es或者pg等里面,其中映射字段是动态的,插入字段也是动态的,结合drools来做比较好吗。


Re: flinksql如何控制结果输出的频率

2020-03-26 文章 Jun Zhang
hi:
你可以自定义一个trigger [1]
第二个场景是可以的,第一种场景我没有遇到过这种场景,你可以试试。

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html

flink小猪 <18579099...@163.com> 于2020年3月27日周五 上午11:29写道:

> 我有两个需求
> 1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办?
> 2.如果我需要开一个一个小时的窗口,但是我希望能够每5分钟输出一次结果(窗口数据还没全部到达,先输出当前已经聚合的结果),该怎么办?


flink动态分区写入hive如何处理数据倾斜的问题

2020-03-25 文章 Jun Zhang
大家好:
有一个类似的sql 拿官网的这个做示例:INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', 
'2019-08-08’;
如果实际上第三个type字段,某一种type数据量特别大,导致了数据倾斜,这种情况一般怎么处理呢?
谢谢。

回复:关于flink sql 1.10 source并行度自动推断的疑问

2020-03-24 文章 Jun Zhang
hi,Chief:
  
目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数,
你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。



BestJun


-- 原始邮件 --
发件人: Kurt Young 

Re: 关于flink sql 1.10 source并行度自动推断的疑问

2020-03-24 文章 Jun Zhang
hi,Chief:

目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数,
你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。

Kurt Young  于2020年3月25日周三 上午8:53写道:

> 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。
>
> Best,
> Kurt
>
>
> On Tue, Mar 24, 2020 at 10:39 PM Chief  wrote:
>
> > hi all:
> > 之前用flink sql查询hive的数据,hive的数据文件是150个,sql
> > client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web
> > ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗?
>


?????? ????Flink1.10.0????hive??source??????????

2020-03-03 文章 Jun Zhang
??sql??sql??hive??sql??
   
  

 
 
 
 ??2020??03??4?? 13:25??JingsongLeehttps://issues.apache.org/jira/browse/FLINK-16413
FYI


Best,
Jingsong Lee


--
From:JingsongLee 

?????? ????Flink1.10.0????hive??source??????????

2020-03-03 文章 Jun Zhang
hi??jinsong??

 
  ??10??
??sql  select * from mytable limit 1;
hive??mytable??10??10??
 ??2020??03??2?? 16:38??JingsongLee

Re: flink1.10 yarn模式无法提交作业

2020-02-09 文章 Jun Zhang
你看看是不是yarn队列有很多accept状态的任务,如果有很多正在排队的任务,那就是可能超过了flink设置的超时时间还没有来得及给你这个任务分配资源,就会报这个错误。
但是这个不影响任务的提交,等一会任务提交成功了,客户端就会返回提交成功的结果。

Fei Han  于2020年2月9日周日 下午5:35写道:

> @all:
>在Flink1.10中,用yarn模式无法提交作业。
>   提示如下:
> lease check if the requested resources are available in the YARN cluster
> 2020-02-09 17:22:26,318 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>  - Deployment took more than 60 seconds. Please check if
> the requested resources are available in the YARN cluster
> 2020-02-09 17:22:26,570 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>  - Deployment took more than 60 seconds. Please check if
> the requested resources are available in the YARN cluster
> 2020-02-09 17:22:26,822 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>  - Deployment took more than 60 seconds. Please check if
> the requested resources are available in the YARN cluster
> 2020-02-09 17:22:27,074 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>  - Deployment took more than 60 seconds. Please check if
> the requested resources are available in the YARN cluster
> 2020-02-09 17:22:27,326 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>  - Deployment took more than 60 seconds. Please check if
> the requested resources are available in the YARN cluster
> 2020-02-09 17:22:27,578 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>  - Deployment took more than 60 seconds. Please check if
> the requested resources are available in the YARN cluster
> 2020-02-09 17:22:27,831 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>  - Deployment took more than 60 seconds. Please check if
> the requested resources are available in the YARN cluster
> 2020-02-09 17:22:28,083 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>  - Deployment took more than 60 seconds. Please check if
> the requested resources are available in the YARN cluster
> 2020-02-09 17:22:28,336 INFO  org.apache.flink.yarn.YarnClusterDescriptor
>  - Deployment took more than 60 seconds. Please check if
> the requested resources are available in the YARN cluster
>
>
> 另外,我看了yarn资源足够。但就是提交不了。
> 我的作业提交命令如下:
>  ./bin/flink run -m yarn-cluster  -yjm 1024  -ytm 2048
> /opt/flink-1.10/examples/batch/WordCount.jar  --input
> hdfs://192.168.xxx.xxx:9000/test/LICENSE  --output
> hdfs://192.168.xxx.xxx:9000/test/result.txt


??????yarn-cluster ??????????????

2019-12-02 文章 Jun Zhang
jarcom.dora.job.stream.AliLogStreamingJob 
??

 
 
 
 ??2019??12??2?? 13:59??

????????????Flink trigger????????????????????

2019-11-01 文章 Jun Zhang
??Evictor??

BestJun


--  --
??: Qi Kang 

???????????? state ????????????????

2019-10-30 文章 Jun Zhang



??


https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA
 ??2019??10??31?? 
10:16??wangl...@geekplus.com.cn

Re: How to write stream data to other Hadoop Cluster by StreamingFileSink

2019-10-09 文章 Jun Zhang
Hi,Yang :
thank you very much for your reply.


I had add the configurations on my hadoop cluster client , both hdfs-site.xml 
and core-site.xml are configured, the client can read mycluster1 and mycluter2, 
but when I submit the flink job to yarn cluster , the hadoop client 
configurations is invalid, I read the source code ,it will give priority to the 
configuration of the hadoop cluster.

   
  

 
 
 
 On 10/9/2019 10:57??Yang Wang

How to write stream data to other Hadoop Cluster by StreamingFileSink

2019-10-04 文章 Jun Zhang
Hi,all:


I have 2 hadoop cluster (hdfs://mycluster1 and hdfs://mycluster2),both of them 
configured the HA,
I have a job ,read from streaming data from kafka, and write it to hdfs by 
StreamingFileSink,now I deployed my job on mycluster1 (flink on yarn),and I 
want to write the data to mycluster2 , how did I add the configure ? If I write 
hdfs://mycluster2/tmp/abc  on the path of the StreamingFileSink directly, 
it will report that mycluster2 could not be found.




I look at the source code of 
org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create. When flink loads 
core-site.xml and hdfs-site.xml, it is first loaded from hadoopConfig, then 
flinkConfig, and finally from classpath. I see flinkConfig does not seem to be 
empty, and the code is loaded by flinkConfig, finally loaded from HADOOP_HOME, 
so the core-site.xml and hdfs-site.xml of mycluster1 cluster will not contain 
the information of mycluster2. Cause mycluster2 not found.




thanks

回复: Split a stream into any number of streams

2019-09-17 文章 Jun Zhang
对DataStream进行keyBy操作是否能解决呢?




--原始邮件--
发件人:"venn"https://stackoverflow.com/questions/53588554/apache-flink-using-filter
 -or-split-to-split-a-stream

 regards.