请问ColumnVector类为何没map类型的实现类
大家好: 请问flink中org.apache.flink.table.data.vector.ColumnVector这个类的子类为什么没map类型的实现类呢?是什么原因呢?谢谢
Some questions about limit push down
when I query hive table by sql, like this `select * from hivetable where id = 1 limit 1`, I found that the limit push down is invalid, is it a bug or was it designed like this? if the sql is 'select * from hivetable limit 1' ,it is ok thanks
Re: flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000
https://issues.apache.org/jira/browse/FLINK-19358 Jun Zhang <825875...@qq.com> 于2020年12月10日周四 上午10:00写道: > https://issues.apache.org/jira/browse/FLINK-19358 > > > > > > > 在2020年12月10日 09:32,Jeff > > > > 这个问题我也遇到了,请问后来怎么解决的呢? 更换成flink1.11.2都不行! > > > > > > > > > > > > > > > 在 2020-03-24 07:14:07,"Peihui He" 大家好,我在用flink > 1.9.2 > 部署到容器的时候如果不启动ha的情况下jobid是正常的,但是启动了就变成了 > 这样的话,checkpoint的地址和ha的文件地址都一样了,导致checkpoint总是失败。 > 不知道这是什么原因呢?
回复:flink 1.9.2 容器ha部署是jobid 一直是00000000000000000000000000000000
https://issues.apache.org/jira/browse/FLINK-19358 在2020年12月10日 09:32,Jeff
关于CatalogPartitionSpec类的一些想法
大家好: 我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)的时候遇到一个问题。 我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map类型是否会更加合理和通用呢? 谢谢
关于CatalogPartitionSpec类的一些想法
大家好: 我现在在使用flink中Catalog的查询分区的方法:Catalog#listPartitions(ObjectPath, CatalogPartitionSpec)的时候遇到一个问题。 我发现CatalogPartitionSpec中存放分区信息的字partitionSpec类型是Map
回复:flink1.11实时写入hive,写入速度很慢,checkpoint为60秒,并行度为1
你的kafka的分区数是多少,把flink的并行度加大到kafka的分区数。 BestJun -- 原始邮件 -- 发件人: me
回复:flinksql接收到字段值格式为2020-09-23T20:58:24+08:00,如何转成TIMESTAMP
这个水印比正常北京时间多八小时的问题,我是在1.11刚发布的时候测试发现的,我看了看源码,我的理解是把从sql里获取的东八区的时间,当成了utc时间来处理,所以会比北京时间多八小时。 BestJun -- 原始邮件 -- 发件人: Joker
回复:flinksql接收到字段值格式为2020-09-23T20:58:24+08:00,如何转成TIMESTAMP
Hi jack 如果我手动减去八小时,那么是不是使用eventtime落地的时候,就成了utc时区的值,比如现在是北京时间10点,那么我落地的时间将会是两点,对于使用东八区的人来说,会产生误解。 BestJun -- 原始邮件 -- 发件人: Jark Wu
回复:flink1.9.3 on yarn 提交任务问题
这个应该是和你配置的HA有关,你去掉HA配置试试,或者检查一下HA配置 BestJun -- 原始邮件 -- 发件人: 宁吉浩
Re: 请教:用flink实现实时告警的功能
可以使用广播,我自己写过一个文章,给你参考下,你可以把source换成每隔几秒钟去读mysql的配置 https://blog.csdn.net/zhangjun5965/article/details/106573528 samuel@ubtrobot.com 于2020年8月6日周四 上午10:26写道: > 由于需要实时告警功能,经调研,用flink 来实现是比较合适,但有几个问题没搞清楚,请大神们指教,感谢! > > 告警有分两部分: >一是告警规则的设置,数据存放在mysql,存储的格式是json > {"times":5} ---就是事件发生大于5次就发出告警; > {"temperature": 80} ---就是温度大于80就告警; >二是告警实现 > 1)上报的数据写入到kafka > 2)flink读取kafka的数据,然后通过翻滚窗口进行计算,如果满足规则就生产告警。 > > > 现在遇到的问题是: > 1. 当规则变更时,如何及时生效? > 2.如果用flink CEP来是实现,对于同一数据源,能否在一个作业里让多个规则同时生效? > 3.这一功能有最佳实践吗? > > 希望哪位解答一下,谢谢! > > > >
Re: [DISCUSS] add a '--filename' parameter for sql-client
hi,godfrey: Thanks for your reply 1. I have seen the -u parameter, but my sql file may not only include 'insert into select ', but also SET, DDL, etc. 2. I may not have noticed this issue. I took a look at this issue. I think this issue may have some problems. For example, he finally called the CliClient.callCommand method. But I think that many options in callCommand are not completely suitable for sql files, such as HELP, CLEAR, SELECT, etc. The select operation opens a window to display the results, obviously this is not suitable for executing sql files godfrey he 于2020年7月29日周三 上午9:56写道: > hi Jun, > > Currently, sql client has supported -u option, just like: > ./bin/sql-client.sh embedded -u "insert_statement". > > There is already a JIRA [1] that wants to support -f option > > [1] https://issues.apache.org/jira/browse/FLINK-12828 > > Best, > Godfrey > > Jun Zhang 于2020年7月29日周三 上午9:22写道: > >> I want to execute some flink sql batch jobs regularly, such as 'insert >> into >> select .', but I can't find a suitable method so far, so reference >> hive, I changed the source code and add a '--filename' parameter so >> that we can execute a sql file. >> >> like this: >> >> /home/flink/bin/sql-client.sh embedded -f flink.sql >> >> what about any ideas or plans for this feature community? >> >
[DISCUSS] add a '--filename' parameter for sql-client
I want to execute some flink sql batch jobs regularly, such as 'insert into select .', but I can't find a suitable method so far, so reference hive, I changed the source code and add a '--filename' parameter so that we can execute a sql file. like this: /home/flink/bin/sql-client.sh embedded -f flink.sql what about any ideas or plans for this feature community?
Re: application模式提交操作hive的任务相关疑问
hi,Yang Wang: *谢谢你的建议,稍后我测试一下。* Yang Wang 于2020年7月24日周五 上午10:09写道: > > 可以使用-Dyarn.ship-directories=/path/of/hiveConfDir把hive的配置ship到JobManager端,hiveConfDir默认会在 > 当前目录下,同时这个目录也会自动加入到classpath,不太清楚这样是否可以让hive正常加载到 > > > Best, > Yang > > Jun Zhang 于2020年7月23日周四 下午3:51写道: > > > 大家好: > > > > > 我现在有一个flink的程序要读写hive的数据,在程序中构造HiveCatalog的时候需要有一个hiveConfDir,如果我使用的是新的application模式去提交任务,这个任务的解析应该是放到了master端,这个时候hadoop集群上没有hive的相关配置,那么这个hiveConfDir该怎么配置呢? > > > > 谢谢 > > >
回复:application模式提交操作hive的任务相关疑问
我现在是改了源码,是把hivecatalog里面接收HiveConf参数的protected类型的构造方法改成了public类型,然后自己在代码里构造了HiveConf对象,传了一些必要的参数,比如metastore地址等。 BestJun -- 原始邮件 -- 发件人: Rui Li
application模式提交操作hive的任务相关疑问
大家好: 我现在有一个flink的程序要读写hive的数据,在程序中构造HiveCatalog的时候需要有一个hiveConfDir,如果我使用的是新的application模式去提交任务,这个任务的解析应该是放到了master端,这个时候hadoop集群上没有hive的相关配置,那么这个hiveConfDir该怎么配置呢? 谢谢
Re: flink 1.11 使用sql写入hdfs无法自动提交分区
hi,jinsong 我们生产环境hdfs是cdh 2.6的,我换了一个hadoop 3 版本的hdfs,还真没问题了,不知道是哪里出问题了。 Jingsong Li 于2020年7月23日周四 上午11:45写道: > 相同操作我也没有复现。。是可以成功执行的 > > 你的HDFS是什么版本?是否可以考虑换个来测试下 > > On Thu, Jul 23, 2020 at 11:34 AM Jun Zhang > wrote: > >> hi,jinsong: >> >> 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 >> >> Jun Zhang 于2020年7月23日周四 上午11:15写道: >> >>> hi,夏帅: >>> >>> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 >>> >>> 你测试没有问题的情况并行度是 1 吗?写入hdfs? >>> >>> 夏帅 于2020年7月10日周五 下午5:39写道: >>> >>>> 你好, >>>> 我这边同样的代码,并没有出现类似的问题 >>>> 是本地跑么,可以提供下日志信息么? >>>> >>>> > > -- > Best, Jingsong Lee >
Re: flink 1.11 使用sql写入hdfs无法自动提交分区
hi,jinsong: 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 Jun Zhang 于2020年7月23日周四 上午11:34写道: > hi,jinsong: > > 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 > > Jun Zhang 于2020年7月23日周四 上午11:15写道: > >> hi,夏帅: >> >> 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 >> >> 你测试没有问题的情况并行度是 1 吗?写入hdfs? >> >> 夏帅 于2020年7月10日周五 下午5:39写道: >> >>> 你好, >>> 我这边同样的代码,并没有出现类似的问题 >>> 是本地跑么,可以提供下日志信息么? >>> >>>
Re: flink 1.11 使用sql写入hdfs无法自动提交分区
hi,jinsong: 这个问题不知道你后来有没有做过测试,我这里一直不行,就是并发度是1的时候,文件写入是正常的,就是没有生成success文件,如果是hive的话,就没有自动生成分区和更新分区数据。 Jun Zhang 于2020年7月23日周四 上午11:15写道: > hi,夏帅: > > 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 > > 你测试没有问题的情况并行度是 1 吗?写入hdfs? > > 夏帅 于2020年7月10日周五 下午5:39写道: > >> 你好, >> 我这边同样的代码,并没有出现类似的问题 >> 是本地跑么,可以提供下日志信息么? >> >>
Re: flink 1.11 使用sql写入hdfs无法自动提交分区
hi,夏帅: 抱歉,这几天没搞这个,我这个问题是文件是正常写入hdfs了,但是没有自动提交,也没有错误日志,就是如果写入的是文件系统,没有SUCCESS文件,写入hive的话,没有自动更新分区。 你测试没有问题的情况并行度是 1 吗?写入hdfs? 夏帅 于2020年7月10日周五 下午5:39写道: > 你好, > 我这边同样的代码,并没有出现类似的问题 > 是本地跑么,可以提供下日志信息么? > >
Re: Re: Re: Re: 回复:Re: Re: Table options do not contain an option key 'connector' for discovering a connector.
hi,Zhou Zach : 问一下,你把你的程序,并行度设置成 1,还能正常读取hive的数据吗? Zhou Zach 于2020年7月13日周一 下午8:17写道: > 好的,感谢答疑 > > > > > > > > > > > > > > > > > > 在 2020-07-13 19:49:10,"Jingsong Li" 写道: > >创建kafka_table需要在default dialect下。 > > > >不管什么dialect,都会保存到hive metastore中 (除非使用temporary table的语法) > > > >Best, > >Jingsong > > > >On Mon, Jul 13, 2020 at 7:46 PM Zhou Zach wrote: > > > >> 创建kafka_table的时候,是default Dialect,改成HiveCatalog时,WATERMARK 和with语法都不支持了, > >> 如果是default Dialect创建的表,是不是只是在临时会话有效 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-07-13 19:27:44,"Jingsong Li" 写道: > >> >Hi, > >> > > >> >问题一: > >> > > >> >只要current catalog是HiveCatalog。 > >> >理论上Kafka也是存到HiveMetastore里面的,如果不想报错,可以用CREATE TABLE XXX IF NOT EXISTS. > >> > > >> >明确下,看不见是什么意思?可以单独试试Kafka表,重启后就不见了吗? > >> > > >> >问题二: > >> > > >> >用filesystem创建出来的是filesystem的表,它和hive > >> >metastore是没有关系的,你需要使用创建filesystem表的语法[1]。 > >> > > >> >filesystem的表数据是直接写到 文件系统的,它的格式和hive是兼容的,所以写的路径是hive某张表的路径,就可以在hive端查询了。 > >> >但是它的partition commit是不支持metastore的,所以不会有自动add > >> >partition到hive的默认实现,你需要自定义partition-commit-policy. > >> > > >> >[1] > >> > > >> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/filesystem.html > >> > > >> >Best, > >> >Jingsong > >> > > >> >On Mon, Jul 13, 2020 at 6:51 PM Zhou Zach wrote: > >> > > >> >> 尴尬 > >> >> 我开了两个项目,改错项目了,现在 已经成功从hive查到数据了,感谢社区的热情回复,@Jingsong Li, @夏帅 > >> >> 这两天刷了Jingsong在群里的那个视频几遍了,由衷感谢! > >> >> 还有两个问题问下, > >> >> 问题1: > >> >> 创建的kafka_table,在hive和Flink > >> >> > >> > SQL客户端都看不到,而且每次重新运行程序,如果不删除hive_table,就会报错,删除hive_table1,就可以执行,但是每次都不需要删除kafka_table,就可以执行程序,所以,是不是创建的kafka_table,是临时表,只有hive_table是存储在metastore > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> > >> >> 问题2: > >> >> 刚才有热心社区同学回答,不用hivecatalog,用filesystem connector > 也是可以创建hive表,我尝试了一下,报错了: > >> >> java.util.concurrent.CompletionException: > >> >> > >> > org.apache.flink.client.deployment.application.ApplicationExecutionException: > >> >> Could not execute application. > >> >> at > >> >> > >> > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > >> >> ~[?:1.8.0_161] > >> >> at > >> >> > >> > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > >> >> ~[?:1.8.0_161] > >> >> at > >> >> > >> > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) > >> >> ~[?:1.8.0_161] > >> >> at > >> >> > >> > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > >> >> ~[?:1.8.0_161] > >> >> at > >> >> > >> > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > >> >> ~[?:1.8.0_161] > >> >> at > >> >> > >> > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > >> >> ~[?:1.8.0_161] > >> >> at > >> >> > >> > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:245) > >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0] > >> >> at > >> >> > >> > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$1(ApplicationDispatcherBootstrap.java:199) > >> >> ~[flink-clients_2.11-1.11.0.jar:1.11.0] > >> >> at > >> >> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > >> >> [?:1.8.0_161] > >> >> at java.util.concurrent.FutureTask.run(FutureTask.java:266) > >> >> [?:1.8.0_161] > >> >> at > >> >> > >> > org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154) > >> >> [qile-data-flow-1.0.jar:?] > >> >> at > akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) > >> >> [qile-data-flow-1.0.jar:?] > >> >> at > >> >> > >> > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) > >> >> [qile-data-flow-1.0.jar:?] > >> >> at > >> >> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > >> >> [qile-data-flow-1.0.jar:?] > >> >> at > >> >> > >> > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > >> >> [qile-data-flow-1.0.jar:?] > >> >> at > >> >> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > >> >> [qile-data-flow-1.0.jar:?] > >> >> at > >> >> > >> > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > >> >> [qile-data-flow-1.0.jar:?] > >> >> Caused by: > >> >> > >> > org.apache.flink.client.deployment.application.ApplicationExecutionException: > >> >> Could not execute application. > >> >> ... 11 more > >> >> Caused by: > org.apache.flink.client.program.ProgramInvocationException: > >> The > >> >> main method caused an error: Unable to create a sink for writing > table > >> >>
Re: flink 1.11 使用sql写入hdfs无法自动提交分区
此外,如果设置并行度是大于1,虽然可以生成success文件,但是貌似不是第一次checkpoint结束的时候就生成了,我反复测试之后,好像是不固定的时间,比如可能是第5次,也可能是第10次checkpoint之后才生成的。 Jingsong Li 于2020年7月10日周五 下午3:35写道: > Hi, Jun, > > 非常感谢详细充分的测试~ > > 接下来我复现排查下~ > > Best, > Jingsong > > On Fri, Jul 10, 2020 at 3:09 PM Jun Zhang > wrote: > > > hi,jinsong: > > > > 在我的测试环境下,对于我贴出来的那个代码。 > > > > > 1.如果source使用的有界的数据,比如bsEnv.fromElements(...),这样会有success文件生成,如果是kafka数据,就不行。 > > 2.如果设置程序的并行度是大于1,那么也会有success生成。 > > 3.如果我写入的是local file,比如 file:///tmp/aaa ,而不是hdfs,也会有success文件生成。 > > > > > > > 综上,在并行度设置为1,消费的是kafka的永不停止的数据,写入的是hdfs,我的checkpoint设置是10s,这种情况下,我测试了好多遍,都没有success文件生成。 > > > > Jingsong Li 于2020年7月10日周五 下午2:54写道: > > > > > Hi, > > > > > > 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢? > > > 我用同样SQL在我的环境是有的。 > > > > > > Best, > > > Jingsong > > > > > > On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang > > > wrote: > > > > > > > 大家好: > > > > 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢 > > > > > > > > > > > > > > > > > > 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。 > > > > > > > > public static void main(String[] args) throws Exception{ > > > > StreamExecutionEnvironment bsEnv = > > > > StreamExecutionEnvironment.getExecutionEnvironment(); > > > > bsEnv.enableCheckpointing(1); > > > > bsEnv.setParallelism(1); > > > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv); > > > > > > > > String sqlSource = "CREATE TABLE source_kafka (\n" + > > > > "appName STRING,\n" + > > > > "appVersion STRING,\n" + > > > > "uploadTime STRING\n" + > > > > ") WITH (\n" + > > > > " 'connector.type' = 'kafka', \n" + > > > > " 'connector.version' = '0.10',\n" + > > > > " 'connector.topic' = 'mytest',\n" + > > > > " 'connector.properties.zookeeper.connect' = > > > > 'localhost:2181',\n" + > > > > " 'connector.properties.bootstrap.servers' = > > > > 'localhost:9092',\n" + > > > > " 'connector.properties.group.id' = > > 'testGroup',\n" + > > > > " 'format.type'='json',\n" + > > > > " 'update-mode' = 'append' )"; > > > > > > > > tEnv.executeSql(sqlSource); > > > > > > > > String sql = "CREATE TABLE fs_table (\n" + > > > > "appName STRING,\n" + > > > > "appVersion STRING,\n" + > > > > "uploadTime STRING,\n" + > > > > " dt STRING," + > > > > " h string" + > > > > ") PARTITIONED BY (dt,h) WITH (\n" + > > > > " 'connector'='filesystem',\n" + > > > > " 'path'='hdfs://localhost/tmp/',\n" + > > > > " 'sink.partition-commit.policy.kind' = > > > > 'success-file', " + > > > > " 'format'='orc'\n" + > > > > ")"; > > > > tEnv.executeSql(sql); > > > > > > > > String insertSql = "insert into fs_table SELECT appName > > > > ,appVersion,uploadTime, " + > > > > " DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd'), > > > > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka"; > > > > > > > > tEnv.executeSql(insertSql); > > > > > > > > } > > > > > > > > > > > > > -- > > > Best, Jingsong Lee > > > > > > > > -- > Best, Jingsong Lee >
Re: flink 1.11 使用sql写入hdfs无法自动提交分区
hi,jinsong: 在我的测试环境下,对于我贴出来的那个代码。 1.如果source使用的有界的数据,比如bsEnv.fromElements(...),这样会有success文件生成,如果是kafka数据,就不行。 2.如果设置程序的并行度是大于1,那么也会有success生成。 3.如果我写入的是local file,比如 file:///tmp/aaa ,而不是hdfs,也会有success文件生成。 综上,在并行度设置为1,消费的是kafka的永不停止的数据,写入的是hdfs,我的checkpoint设置是10s,这种情况下,我测试了好多遍,都没有success文件生成。 Jingsong Li 于2020年7月10日周五 下午2:54写道: > Hi, > > 默认情况下,对ORC来说,理论上一旦有正式数据文件的生成,就会有对应SUCCESS文件产生,你是怎么确认没有SUCCESS文件的呢? > 我用同样SQL在我的环境是有的。 > > Best, > Jingsong > > On Fri, Jul 10, 2020 at 9:07 AM Jun Zhang > wrote: > > > 大家好: > > 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢 > > > > > > > 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。 > > > > public static void main(String[] args) throws Exception{ > > StreamExecutionEnvironment bsEnv = > > StreamExecutionEnvironment.getExecutionEnvironment(); > > bsEnv.enableCheckpointing(1); > > bsEnv.setParallelism(1); > > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv); > > > > String sqlSource = "CREATE TABLE source_kafka (\n" + > > "appName STRING,\n" + > > "appVersion STRING,\n" + > > "uploadTime STRING\n" + > > ") WITH (\n" + > > " 'connector.type' = 'kafka', \n" + > > " 'connector.version' = '0.10',\n" + > > " 'connector.topic' = 'mytest',\n" + > > " 'connector.properties.zookeeper.connect' = > > 'localhost:2181',\n" + > > " 'connector.properties.bootstrap.servers' = > > 'localhost:9092',\n" + > > " 'connector.properties.group.id' = 'testGroup',\n" + > > " 'format.type'='json',\n" + > > " 'update-mode' = 'append' )"; > > > > tEnv.executeSql(sqlSource); > > > > String sql = "CREATE TABLE fs_table (\n" + > > "appName STRING,\n" + > > "appVersion STRING,\n" + > > "uploadTime STRING,\n" + > > " dt STRING," + > > " h string" + > > ") PARTITIONED BY (dt,h) WITH (\n" + > > " 'connector'='filesystem',\n" + > > " 'path'='hdfs://localhost/tmp/',\n" + > > " 'sink.partition-commit.policy.kind' = > > 'success-file', " + > > " 'format'='orc'\n" + > > ")"; > > tEnv.executeSql(sql); > > > > String insertSql = "insert into fs_table SELECT appName > > ,appVersion,uploadTime, " + > > " DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd'), > > DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka"; > > > > tEnv.executeSql(insertSql); > > > > } > > > > > -- > Best, Jingsong Lee >
Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?
hi,大家好 对于json schema的问题,我想问一个其他的问题, 比如我要做一个实时报警系统,需要消费kafka的json数据来进行实时报警,我的想法是对于每一个报警都生成一个flink任务,主要报警逻辑翻译成一个flink sql。 其中kafka里面的json数据,每一个字段都是可以生成报警条件的,比如有一个json格式的header字段,这个字段里面的内容是不固定的, 某一个用户想用header.aaa字段,另一个用户想用header.bbb字段,比如每分钟header.aaa的count值大于100就报警。 这种情况下,我该如何定义我的schema呢?大家有没有什么想法,谢谢。 Benchao Li 于2020年7月10日周五 下午1:54写道: > Hi Peihui, > > 正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。 > > 当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理, > 主要是想保留这个字段,下游输出json的时候仍然还是这个字段。 > 如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以 > 从结果上来看,*还不能完全做到原封不动的输出到下游*。 > > 不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题: > 1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的 > 2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型 > 的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在 > 序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。 > > Jark Wu 于2020年7月10日周五 下午12:22写道: > > > 社区有个 issue 正在解决这个问题,可以关注一下 > > https://issues.apache.org/jira/browse/FLINK-18002 > > > > Best, > > Jark > > > > On Fri, 10 Jul 2020 at 11:13, Leonard Xu wrote: > > > > > Hi, Peihui > > > > > > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format > > 的解析的底层实现 > > > 就是按照json的标准格式解析(jackson)的,没法将一个 > > > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明, > > > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。 > > > > > > 一种做法是定义复杂的jsonObject对应的ROW > > > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的, > > > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, > > > 然后query里用UDTF处理。 > > > > > > > > > 祝好 > > > Leonard Xu > > > > > > > > > > > > > > > > 在 2020年7月10日,10:16,Peihui He 写道: > > > > > > > > Hello, > > > > > > > > 实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。 > > > > > > > > Best wishes. > > > > > > > > Peihui He 于2020年7月10日周五 上午10:12写道: > > > > > > > >> Hello, > > > >> > > > >> 明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。 > > > >> > > > >> > > > >> Best wishes. > > > >> > > > >> LakeShen 于2020年7月10日周五 上午10:03写道: > > > >> > > > >>> Hi Peihui, > > > >>> > > > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式: > > > >>> > > > >>> { > > > >>>"a":"b", > > > >>>"c":{ > > > >>>"d":"e", > > > >>>"g":"f" > > > >>>} > > > >>> }, > > > >>> > > > >>> 那么在 kafka table source 可以使用 row 来定义: > > > >>> > > > >>> create table xxx ( > > > >>> a varchar, > > > >>> c row > > > >>> ) > > > >>> > > > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。 > > > >>> > > > >>> Best, > > > >>> LakeShen > > > >>> > > > >>> Peihui He 于2020年7月10日周五 上午9:12写道: > > > >>> > > > Hello: > > > > > > 在用flink > > > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。 > > > > > > 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。 > > > > > > > > > Best wishes. > > > > > > >>> > > > >> > > > > > > > > > > > -- > > Best, > Benchao Li >
flink 1.11 使用sql写入hdfs无法自动提交分区
大家好: 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢 我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。 public static void main(String[] args) throws Exception{ StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); bsEnv.enableCheckpointing(1); bsEnv.setParallelism(1); StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv); String sqlSource = "CREATE TABLE source_kafka (\n" + "appName STRING,\n" + "appVersion STRING,\n" + "uploadTime STRING\n" + ") WITH (\n" + " 'connector.type' = 'kafka', \n" + " 'connector.version' = '0.10',\n" + " 'connector.topic' = 'mytest',\n" + " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" + " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" + " 'connector.properties.group.id' = 'testGroup',\n" + " 'format.type'='json',\n" + " 'update-mode' = 'append' )"; tEnv.executeSql(sqlSource); String sql = "CREATE TABLE fs_table (\n" + "appName STRING,\n" + "appVersion STRING,\n" + "uploadTime STRING,\n" + " dt STRING," + " h string" + ") PARTITIONED BY (dt,h) WITH (\n" + " 'connector'='filesystem',\n" + " 'path'='hdfs://localhost/tmp/',\n" + " 'sink.partition-commit.policy.kind' = 'success-file', " + " 'format'='orc'\n" + ")"; tEnv.executeSql(sql); String insertSql = "insert into fs_table SELECT appName ,appVersion,uploadTime, " + " DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd'), DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka"; tEnv.executeSql(insertSql); }
Re: Re: flink 1.11 作业执行异常
hi.sunfulin 你有没有导入blink的planner呢,加入这个试试 org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} sunfulin 于2020年7月7日周二 下午3:21写道: > > > > hi, jark > 我的执行代码其实很简单,就是下面的执行逻辑。不知道是不是我缺了什么依赖配置。我debug看了下异常执行,是说Flink > configuration里的DeployOptions.TARGET > (execution.target)没有匹配到配置?之前貌似从没有关注过这个配置。 > > > //构建StreamExecutionEnvironment > public static final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > //构建EnvironmentSettings 并指定Blink Planner > private static final EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > > //构建StreamTableEnvironment > public static final StreamTableEnvironment tEnv = > StreamTableEnvironment.create(env, bsSettings); > > > > > >tEnv.executeSql(“ddl sql”); > > > > > //source注册成表 > > tEnv.createTemporaryView("test", ds, $("f0").as("id"), > $("f1").as("first"), $("p").proctime()); > > > > > //join语句 > > Table table = tEnv.sqlQuery("select b.* from test a left join > my_dim FOR SYSTEM_TIME AS OF a.p AS b on a.first = b.userId"); > > > > > //输出 > > tEnv.toAppendStream(table, Row.class).print("LookUpJoinJob"); > > > > > env.execute("LookUpJoinJob"); > > > > > > > > > 在 2020-07-06 14:59:17,"Jark Wu" 写道: > >能分享下复现的作业代码不? > > > >Best, > >Jark > > > >On Mon, 6 Jul 2020 at 11:00, sunfulin wrote: > > > >> Hi, > >> 我使用目前最新的Flink 1.11 rc4来测试我的作业。报了如下异常: > >> org.apache.flink.table.api.TableExecution: Failed to execute sql > >> > >> > >> caused by : java.lang.IlleagalStateException: No ExecutorFactory found > to > >> execute the application. > >> at > >> > org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84) > >> > >> > >> 想请教下这个异常是啥原因?我使用1.10.1跑同样的逻辑,是没有异常的。 >
关于注册定时器的一些疑问
大家好: 官网的解释中,注册定时器只能是keyed stream,我使用BroadcastConnectedStream 接一个KeyedBroadcastProcessFunction函数发现也能注册定时器,我测试了一下,只限于使用processtime的时候,如果使用的是eventtime就不好使了,请问这个是什么原因呢?谢谢。
请教一个flink CEP的问题
大家好: 请教一个flink cep的问题,我想做一个简单的报警,比如连续三次大于5就报警,连续三次小于等于5就算报警恢复。 示例程序如下: DataStream
Re: 动态处理字段动态sink
hi: 不知道广播流能否满足你的需求 出发 <573693...@qq.com> 于2020年4月2日周四 下午4:25写道: > > 老铁们,消费kafka一些数据时候,根据规则,将满足条件的一些字段生成新字段,插入到es或者pg等里面,其中映射字段是动态的,插入字段也是动态的,结合drools来做比较好吗。
Re: flinksql如何控制结果输出的频率
hi: 你可以自定义一个trigger [1] 第二个场景是可以的,第一种场景我没有遇到过这种场景,你可以试试。 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html flink小猪 <18579099...@163.com> 于2020年3月27日周五 上午11:29写道: > 我有两个需求 > 1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办? > 2.如果我需要开一个一个小时的窗口,但是我希望能够每5分钟输出一次结果(窗口数据还没全部到达,先输出当前已经聚合的结果),该怎么办?
flink动态分区写入hive如何处理数据倾斜的问题
大家好: 有一个类似的sql 拿官网的这个做示例:INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08’; 如果实际上第三个type字段,某一种type数据量特别大,导致了数据倾斜,这种情况一般怎么处理呢? 谢谢。
回复:关于flink sql 1.10 source并行度自动推断的疑问
hi,Chief: 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 BestJun -- 原始邮件 -- 发件人: Kurt Young
Re: 关于flink sql 1.10 source并行度自动推断的疑问
hi,Chief: 目前flink读取hive的时候,如果开启了自动推断,系统会根据所读取的文件数来推断并发,如果没有超过最大并发数(默认1000),source的并行度就等于你文件的个数, 你可以通过table.exec.hive.infer-source-parallelism.max来设置source的最大并发度。 Kurt Young 于2020年3月25日周三 上午8:53写道: > 你的数据量有多大?有一个可能的原因是source的其他并发调度起来的时候,数据已经被先调度起来的并发读完了。 > > Best, > Kurt > > > On Tue, Mar 24, 2020 at 10:39 PM Chief wrote: > > > hi all: > > 之前用flink sql查询hive的数据,hive的数据文件是150个,sql > > client配置文件设置的并行度是10,source通过自动推断生成了150并发,但是通过看web > > ui发现只有前十个子任务是读到数据了,其他的任务显示没有读到数据,请问是我设置有问题吗? >
?????? ????Flink1.10.0????hive??source??????????
??sql??sql??hive??sql?? ??2020??03??4?? 13:25??JingsongLeehttps://issues.apache.org/jira/browse/FLINK-16413 FYI Best, Jingsong Lee -- From:JingsongLee
?????? ????Flink1.10.0????hive??source??????????
hi??jinsong?? ??10?? ??sql select * from mytable limit 1; hive??mytable??10??10?? ??2020??03??2?? 16:38??JingsongLee
Re: flink1.10 yarn模式无法提交作业
你看看是不是yarn队列有很多accept状态的任务,如果有很多正在排队的任务,那就是可能超过了flink设置的超时时间还没有来得及给你这个任务分配资源,就会报这个错误。 但是这个不影响任务的提交,等一会任务提交成功了,客户端就会返回提交成功的结果。 Fei Han 于2020年2月9日周日 下午5:35写道: > @all: >在Flink1.10中,用yarn模式无法提交作业。 > 提示如下: > lease check if the requested resources are available in the YARN cluster > 2020-02-09 17:22:26,318 INFO org.apache.flink.yarn.YarnClusterDescriptor > - Deployment took more than 60 seconds. Please check if > the requested resources are available in the YARN cluster > 2020-02-09 17:22:26,570 INFO org.apache.flink.yarn.YarnClusterDescriptor > - Deployment took more than 60 seconds. Please check if > the requested resources are available in the YARN cluster > 2020-02-09 17:22:26,822 INFO org.apache.flink.yarn.YarnClusterDescriptor > - Deployment took more than 60 seconds. Please check if > the requested resources are available in the YARN cluster > 2020-02-09 17:22:27,074 INFO org.apache.flink.yarn.YarnClusterDescriptor > - Deployment took more than 60 seconds. Please check if > the requested resources are available in the YARN cluster > 2020-02-09 17:22:27,326 INFO org.apache.flink.yarn.YarnClusterDescriptor > - Deployment took more than 60 seconds. Please check if > the requested resources are available in the YARN cluster > 2020-02-09 17:22:27,578 INFO org.apache.flink.yarn.YarnClusterDescriptor > - Deployment took more than 60 seconds. Please check if > the requested resources are available in the YARN cluster > 2020-02-09 17:22:27,831 INFO org.apache.flink.yarn.YarnClusterDescriptor > - Deployment took more than 60 seconds. Please check if > the requested resources are available in the YARN cluster > 2020-02-09 17:22:28,083 INFO org.apache.flink.yarn.YarnClusterDescriptor > - Deployment took more than 60 seconds. Please check if > the requested resources are available in the YARN cluster > 2020-02-09 17:22:28,336 INFO org.apache.flink.yarn.YarnClusterDescriptor > - Deployment took more than 60 seconds. Please check if > the requested resources are available in the YARN cluster > > > 另外,我看了yarn资源足够。但就是提交不了。 > 我的作业提交命令如下: > ./bin/flink run -m yarn-cluster -yjm 1024 -ytm 2048 > /opt/flink-1.10/examples/batch/WordCount.jar --input > hdfs://192.168.xxx.xxx:9000/test/LICENSE --output > hdfs://192.168.xxx.xxx:9000/test/result.txt
??????yarn-cluster ??????????????
jarcom.dora.job.stream.AliLogStreamingJob ?? ??2019??12??2?? 13:59??
????????????Flink trigger????????????????????
??Evictor?? BestJun -- -- ??: Qi Kang
???????????? state ????????????????
?? https://mp.weixin.qq.com/s/VUEvvoHoupZMpxWQsEeHEA ??2019??10??31?? 10:16??wangl...@geekplus.com.cn
Re: How to write stream data to other Hadoop Cluster by StreamingFileSink
Hi,Yang : thank you very much for your reply. I had add the configurations on my hadoop cluster client , both hdfs-site.xml and core-site.xml are configured, the client can read mycluster1 and mycluter2, but when I submit the flink job to yarn cluster , the hadoop client configurations is invalid, I read the source code ,it will give priority to the configuration of the hadoop cluster. On 10/9/2019 10:57??Yang Wang
How to write stream data to other Hadoop Cluster by StreamingFileSink
Hi,all: I have 2 hadoop cluster (hdfs://mycluster1 and hdfs://mycluster2),both of them configured the HA, I have a job ,read from streaming data from kafka, and write it to hdfs by StreamingFileSink,now I deployed my job on mycluster1 (flink on yarn),and I want to write the data to mycluster2 , how did I add the configure ? If I write hdfs://mycluster2/tmp/abc on the path of the StreamingFileSink directly, it will report that mycluster2 could not be found. I look at the source code of org.apache.flink.runtime.fs.hdfs.HadoopFsFactory#create. When flink loads core-site.xml and hdfs-site.xml, it is first loaded from hadoopConfig, then flinkConfig, and finally from classpath. I see flinkConfig does not seem to be empty, and the code is loaded by flinkConfig, finally loaded from HADOOP_HOME, so the core-site.xml and hdfs-site.xml of mycluster1 cluster will not contain the information of mycluster2. Cause mycluster2 not found. thanks
回复: Split a stream into any number of streams
对DataStream进行keyBy操作是否能解决呢? --原始邮件-- 发件人:"venn"https://stackoverflow.com/questions/53588554/apache-flink-using-filter -or-split-to-split-a-stream regards.