退订
退订
Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath
看下你引入的 jar 包是咋引入的,scope 设置的是 provided 吧? Wei JI10 季伟 于2021年6月28日周一 下午12:19写道: > 您好, > 版本都是1.12.3的,有其他方式能够定位到这个问题出在哪么? > > 在 2021/6/28 上午11:59,“Jingsong Li” 写入: > > 注意:此封邮件来自于公司外部,请注意信息安全! > Attention: This email comes from outside of the company, please pay > attention to the information security! > > Hi, 你的版本check下?集群和flink-parquet是同一个版本吗? > > BEST, > Jingsong > > On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟 > > wrote: > > > 您好, > > 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。 > > > > > > -- > Best, Jingsong Lee > >
Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath
您好, 版本都是1.12.3的,有其他方式能够定位到这个问题出在哪么? 在 2021/6/28 上午11:59,“Jingsong Li” 写入: 注意:此封邮件来自于公司外部,请注意信息安全! Attention: This email comes from outside of the company, please pay attention to the information security! Hi, 你的版本check下?集群和flink-parquet是同一个版本吗? BEST, Jingsong On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟 wrote: > 您好, > 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。 > > -- Best, Jingsong Lee
Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath
Hi, 你的版本check下?集群和flink-parquet是同一个版本吗? BEST, Jingsong On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟 wrote: > 您好, > 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。 > > -- Best, Jingsong Lee
Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath
您好, 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。
Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath
使用的是 sql client 测试的 sql 吗?如果是的话,记得在 flink lib 目录下添加 flink-sql-parquet jar 包,然后重启集群和 sql client Wei JI10 季伟 于2021年6月28日周一 上午9:35写道: > 您好, > 添加的parquet 依赖如下,不知道全不全 > > org.apache.flink > flink-parquet_${scala.binary.version} > ${flink.version} > > > org.apache.parquet > parquet-avro > 1.10.1 > > > > >
Re: 回复:flink 1.12如何实现window topN功能
可以将 1.13 的这个功能打在 flink 1.12 上面,然后引用你们新打的依赖 casel.chen 于2021年6月23日周三 下午12:08写道: > -- source > CREATE TABLE tmall_item( > itemID VARCHAR, > itemType VARCHAR, > onSellTime TIMESTAMP(3), > price DOUBLE, > proctime AS PROCTIME(), > WATERMARK FOR onSellTime AS onSellTime - INTERVAL '5' SECOND > ) WITH ( > 'connector' = 'filesystem' , > 'path' = 'file:///path/to/over-window.csv', > 'format' = 'csv' > ); > > -- sink > CREATE TABLE print_table ( > itemID VARCHAR, > itemType VARCHAR, > onSellTime TIMESTAMP(3), > price DOUBLE > ) WITH ( > 'connector' = 'print' > ); > > -- insert > INSERT INTO print_table > SELECT itemID, > itemType, > onSellTime, > price > FROM ( > SELECT itemID, > itemType, > onSellTime, > price, > ROW_NUMBER() OVER ( > PARTITION BY itemID, DATE_FORMAT(proctime, 'MMddHHmm') > ORDER BY onSellTime DESC > ) AS row_num > FROM tmall_item > ) WHERE row_num = 1; > > > > > > > > > > > > > > > > > > 在 2021-06-23 11:06:39,"杨光跃" 写道: > >应该是这样吧 > > > > > >1. 第一步以主键group by 以及分时间窗口 > >SELECT 主键, TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, FROM > source_event group by TUMBLE(ts, INTERVAL '10' SECOND), 主键 > > > > > >2. 根据上一步的结果取top5 > >select * from (select * ,ROW_NUMBER() OVER (PARTITION BY wStart ORDER BY > 处理时间字段 ) as rownum from 上一步的虚拟表) where rownum <= 5 > > > >| | > >杨光跃 > >| > >| > >yangguangyuem...@163.com > >| > >签名由网易邮箱大师定制 > >在2021年6月23日 10:58,casel.chen 写道: > >你指的是TopN吗? > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n > >但我想知道window topN写法,跟这个还不一样。 > >我的需求是: > >cdc场景同一个主键数据变更频繁,我想定义一个5秒处理时间窗口,在该窗口内取同一主键最新变更记录。用flink sql 1.12如何实现? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >在 2021-06-23 10:18:01,"杨光跃" 写道: > > > > > >Apache Flink 1.12 Documentation: Queries > >| | > >杨光跃 > >| > >| > >yangguangyuem...@163.com > >| > >签名由网易邮箱大师定制 > >在2021年6月23日 10:09,casel.chen 写道: > >请不要截图哦 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >在 2021-06-23 09:47:46,"杨光跃" 写道: > > > >1.12也支持的 > >| | > >杨光跃 > >| > >| > >yangguangyuem...@163.com > >| > >签名由网易邮箱大师定制 > >在2021年6月23日 09:45,casel.chen 写道: > >官网文档上显示flink 1.13支持window topN,但flink 1.12没看到相关文档,请问有什么work around方法吗? >
回复:flinksql写入hive问题
写入hive在读取,我试了下是可以的。。。 第一步: CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/home/admin/hive/conf' ); 第二部 USE CATALOG myhive; 第三步 select * from hive_table; 猜测可能的问题,我们本地部署设置的slot都是1,你可能是在跑着写入任务,没有资源跑读取任务? 你可以设置把写入任务停了,或者设置方言问 : SET table.sql-dialect=hive; 然后在查询试试。 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月24日 18:00,Geoff nie 写道: 非常感谢!我是在sql-client上提交的,修改配置文件已经成功提交了。hive表下分区文件名如下: part-f3fa374b-c563-49c8-bd7a-b3bd7a5fb66d-0-2 还有两个问题请教下: 1.我通过如下创建了kafka流表,通过flink-sql查 kafka_table 是有数据的, 但是hdfs上却无文件,为什么呢 。 2.hive_table如上已经成功写入数据了,但是为啥flink-sql及hive却读取不到hive表数据呢,SELECT * FROM hive_table WHERE dt='2021-06-21' and hr='18'; SET table.sql-dialect=default; CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND ) WITH ( 'connector'='kafka', 'topic'='t_kafka_03', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='192.168.1.*:19092,192.168.1.*:19092,192.168.1.*:19092', 'properties.group.id' = 'testGroup10', 'format'='json' ); 烦请帮忙看下。感谢感谢。 在 2021-06-24 16:12:35,"杨光跃" 写道: 检查点,checkpoint ,如果是jar包发布,直接在代码里写就可以。 如果用的sql-client提交sql ,可以在配置文件: sql-client-defaults.yaml 中加入如下配置: configuration: execution.checkpointing.interval: 1000 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月24日 16:09,Geoff nie 写道: 非常感谢答复,不过我仔细考虑了下也没想明白,能具体说下在哪里配置参数吗。感谢! 在 2021-06-24 14:47:24,"杨光跃" 写道: 分区的提交需要开启checkpoint,要配置下 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制 在2021年6月24日 14:44,Geoff nie 写道: 您好!我也遇到这个问题了,跟以下问题类似,请问,这个问题解决了吗?非常感谢。 在 2021-02-14 10:43:33,"潘永克" <13311533...@163.com> 写道: 转发邮件信息 发件人:"潘永克" <13311533...@163.com> 发送日期:2021-02-11 11:12:39 收件人:d...@flink.apache.org 主题:flinksql写入hive问题 咨询一个flink问题。flinsql,能写入数据到hive表。但是hive表中的数据,都是基于 ".partinprogress"类似的文件。flink1.12.0 基于cdh6.2.0编译的,hive版本是2.1.1、hadoop-3.0.0. 问题截图如下: 创建hive表 SET table.sql-dialect=hive; CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file' ); 插入数据 INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, '-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table; 文件始终不落地,一直都是 ".part-inprogress。。。"。文件。
Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath
您好, 添加的parquet 依赖如下,不知道全不全 org.apache.flink flink-parquet_${scala.binary.version} ${flink.version} org.apache.parquet parquet-avro 1.10.1
Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath
parquet 相关依赖增加了吗? Zhiwen Sun On Sun, Jun 27, 2021 at 3:57 PM Wei JI10 季伟 wrote: > Hi: >在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息 > Caused by: org.apache.flink.table.api.ValidationException: Could not find > any format factory for identifier 'parquet' in the classpath. >at > org.apache.flink.table.filesystem.FileSystemTableSource.(FileSystemTableSource.java:97) >at > org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:72) >at > org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:119) >... 41 more > > Sql语句如下: > CREATE TABLE user_info ( > `user_id` bigint, > `user_name` string > ) PARTITIONED BY (user_id) WITH ( > 'connector' = 'filesystem', > 'path' = '', > 'format' = 'parquet' > ); > > CREATE TABLE sink_table ( > `user_id` bigint, > `user_name` string > ) PARTITIONED BY (datetime) WITH ( > 'connector'='filesystem', > 'path'='', > 'format'='parquet', > 'sink.partition-commit.delay'='1h', > 'sink.partition-commit.policy.kind'='success-file' > ); > > insert OVERWRITE sink_table select *, '2021062600' as datetime from > user_info; >
flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath
Hi: 在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息 Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'parquet' in the classpath. at org.apache.flink.table.filesystem.FileSystemTableSource.(FileSystemTableSource.java:97) at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:72) at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:119) ... 41 more Sql语句如下: CREATE TABLE user_info ( `user_id` bigint, `user_name` string ) PARTITIONED BY (user_id) WITH ( 'connector' = 'filesystem', 'path' = '', 'format' = 'parquet' ); CREATE TABLE sink_table ( `user_id` bigint, `user_name` string ) PARTITIONED BY (datetime) WITH ( 'connector'='filesystem', 'path'='', 'format'='parquet', 'sink.partition-commit.delay'='1h', 'sink.partition-commit.policy.kind'='success-file' ); insert OVERWRITE sink_table select *, '2021062600' as datetime from user_info;