退订

2021-06-27 文章 高耀军
退订

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 zhisheng
看下你引入的 jar 包是咋引入的,scope 设置的是 provided 吧?

Wei JI10 季伟  于2021年6月28日周一 下午12:19写道:

> 您好,
> 版本都是1.12.3的,有其他方式能够定位到这个问题出在哪么?
>
> 在 2021/6/28 上午11:59,“Jingsong Li” 写入:
>
> 注意:此封邮件来自于公司外部,请注意信息安全!
> Attention: This email comes from outside of the company, please pay
> attention to the information security!
>
> Hi, 你的版本check下?集群和flink-parquet是同一个版本吗?
>
> BEST,
> Jingsong
>
> On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟  >
> wrote:
>
> > 您好,
> > 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。
> >
> >
>
> --
> Best, Jingsong Lee
>
>


Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Wei JI10 季伟
您好,
版本都是1.12.3的,有其他方式能够定位到这个问题出在哪么?

在 2021/6/28 上午11:59,“Jingsong Li” 写入:

注意:此封邮件来自于公司外部,请注意信息安全!
Attention: This email comes from outside of the company, please pay 
attention to the information security!

Hi, 你的版本check下?集群和flink-parquet是同一个版本吗?

BEST,
Jingsong

On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟 
wrote:

> 您好,
> 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。
>
>

--
Best, Jingsong Lee



Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Jingsong Li
Hi, 你的版本check下?集群和flink-parquet是同一个版本吗?

BEST,
Jingsong

On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟 
wrote:

> 您好,
> 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。
>
>

-- 
Best, Jingsong Lee


Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Wei JI10 季伟
您好,
不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。



Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 zhisheng
使用的是 sql client 测试的 sql 吗?如果是的话,记得在 flink lib 目录下添加 flink-sql-parquet jar
包,然后重启集群和 sql client

Wei JI10 季伟  于2021年6月28日周一 上午9:35写道:

> 您好,
> 添加的parquet 依赖如下,不知道全不全
> 
> org.apache.flink
> flink-parquet_${scala.binary.version}
> ${flink.version}
> 
> 
> org.apache.parquet
> parquet-avro
> 1.10.1
> 
> 
>
>
>


Re: 回复:flink 1.12如何实现window topN功能

2021-06-27 文章 zhisheng
可以将 1.13 的这个功能打在 flink 1.12 上面,然后引用你们新打的依赖

casel.chen  于2021年6月23日周三 下午12:08写道:

> -- source
> CREATE TABLE tmall_item(
> itemID VARCHAR,
> itemType VARCHAR,
> onSellTime TIMESTAMP(3),
> price DOUBLE,
> proctime AS PROCTIME(),
> WATERMARK FOR onSellTime AS onSellTime - INTERVAL '5' SECOND
> ) WITH (
> 'connector' = 'filesystem' ,
> 'path' = 'file:///path/to/over-window.csv',
> 'format' = 'csv'
> );
>
> -- sink
> CREATE TABLE print_table (
> itemID VARCHAR,
> itemType VARCHAR,
> onSellTime TIMESTAMP(3),
> price DOUBLE
> ) WITH (
> 'connector' = 'print'
> );
>
> -- insert
> INSERT INTO print_table
> SELECT itemID,
> itemType,
> onSellTime,
> price
> FROM (
> SELECT itemID,
> itemType,
> onSellTime,
> price,
> ROW_NUMBER() OVER (
> PARTITION BY itemID, DATE_FORMAT(proctime, 'MMddHHmm')
> ORDER BY onSellTime DESC
> ) AS row_num
> FROM tmall_item
>  ) WHERE row_num = 1;
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-06-23 11:06:39,"杨光跃"  写道:
> >应该是这样吧
> >
> >
> >1. 第一步以主键group by 以及分时间窗口
> >SELECT 主键, TUMBLE_START(ts, INTERVAL '10' SECOND) as wStart, FROM
> source_event  group by TUMBLE(ts, INTERVAL '10' SECOND), 主键
> >
> >
> >2. 根据上一步的结果取top5
> >select * from (select * ,ROW_NUMBER() OVER (PARTITION BY wStart ORDER BY
> 处理时间字段 ) as rownum from 上一步的虚拟表) where rownum <= 5
> >
> >| |
> >杨光跃
> >|
> >|
> >yangguangyuem...@163.com
> >|
> >签名由网易邮箱大师定制
> >在2021年6月23日 10:58,casel.chen 写道:
> >你指的是TopN吗?
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/queries.html#top-n
> >但我想知道window topN写法,跟这个还不一样。
> >我的需求是:
> >cdc场景同一个主键数据变更频繁,我想定义一个5秒处理时间窗口,在该窗口内取同一主键最新变更记录。用flink sql 1.12如何实现?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2021-06-23 10:18:01,"杨光跃"  写道:
> >
> >
> >Apache Flink 1.12 Documentation: Queries
> >| |
> >杨光跃
> >|
> >|
> >yangguangyuem...@163.com
> >|
> >签名由网易邮箱大师定制
> >在2021年6月23日 10:09,casel.chen 写道:
> >请不要截图哦
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >在 2021-06-23 09:47:46,"杨光跃"  写道:
> >
> >1.12也支持的
> >| |
> >杨光跃
> >|
> >|
> >yangguangyuem...@163.com
> >|
> >签名由网易邮箱大师定制
> >在2021年6月23日 09:45,casel.chen 写道:
> >官网文档上显示flink 1.13支持window topN,但flink 1.12没看到相关文档,请问有什么work around方法吗?
>


回复:flinksql写入hive问题

2021-06-27 文章 杨光跃
写入hive在读取,我试了下是可以的。。。
第一步:
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'default',
'hive-conf-dir' = '/home/admin/hive/conf'
);
第二部
USE CATALOG myhive;
第三步
select * from hive_table;


猜测可能的问题,我们本地部署设置的slot都是1,你可能是在跑着写入任务,没有资源跑读取任务?


你可以设置把写入任务停了,或者设置方言问 : SET table.sql-dialect=hive;
然后在查询试试。






| |
杨光跃
|
|
yangguangyuem...@163.com
|


签名由网易邮箱大师定制
在2021年6月24日 18:00,Geoff nie 写道:
非常感谢!我是在sql-client上提交的,修改配置文件已经成功提交了。hive表下分区文件名如下:
part-f3fa374b-c563-49c8-bd7a-b3bd7a5fb66d-0-2


还有两个问题请教下:
1.我通过如下创建了kafka流表,通过flink-sql查
kafka_table 是有数据的,
但是hdfs上却无文件,为什么呢
。
2.hive_table如上已经成功写入数据了,但是为啥flink-sql及hive却读取不到hive表数据呢,SELECT * FROM 
hive_table WHERE dt='2021-06-21' and hr='18';
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
'connector'='kafka',
'topic'='t_kafka_03',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='192.168.1.*:19092,192.168.1.*:19092,192.168.1.*:19092',
'properties.group.id' = 'testGroup10',
'format'='json'
);




烦请帮忙看下。感谢感谢。

















在 2021-06-24 16:12:35,"杨光跃"  写道:


检查点,checkpoint ,如果是jar包发布,直接在代码里写就可以。 如果用的sql-client提交sql ,可以在配置文件:  
sql-client-defaults.yaml 中加入如下配置:
configuration:
execution.checkpointing.interval: 1000
| |
杨光跃
|
|
yangguangyuem...@163.com
|
签名由网易邮箱大师定制
在2021年6月24日 16:09,Geoff nie 写道:
非常感谢答复,不过我仔细考虑了下也没想明白,能具体说下在哪里配置参数吗。感谢!

















在 2021-06-24 14:47:24,"杨光跃"  写道:
分区的提交需要开启checkpoint,要配置下


| |
杨光跃
|
|
yangguangyuem...@163.com
|
签名由网易邮箱大师定制
在2021年6月24日 14:44,Geoff nie 写道:
您好!我也遇到这个问题了,跟以下问题类似,请问,这个问题解决了吗?非常感谢。
















在 2021-02-14 10:43:33,"潘永克" <13311533...@163.com> 写道:



















 转发邮件信息 
发件人:"潘永克" <13311533...@163.com>
发送日期:2021-02-11 11:12:39
收件人:d...@flink.apache.org
主题:flinksql写入hive问题

咨询一个flink问题。flinsql,能写入数据到hive表。但是hive表中的数据,都是基于 
".partinprogress"类似的文件。flink1.12.0
基于cdh6.2.0编译的,hive版本是2.1.1、hadoop-3.0.0.  问题截图如下:
创建hive表
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
user_id STRING,
order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 min',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
插入数据
INSERT INTO TABLE hive_table
SELECT user_id, order_amount, DATE_FORMAT(log_ts, '-MM-dd'), 
DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;


文件始终不落地,一直都是 ".part-inprogress。。。"。文件。
















Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Wei JI10 季伟
您好,
添加的parquet 依赖如下,不知道全不全

org.apache.flink
flink-parquet_${scala.binary.version}
${flink.version}


org.apache.parquet
parquet-avro
1.10.1






Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Zhiwen Sun
parquet 相关依赖增加了吗?

Zhiwen Sun



On Sun, Jun 27, 2021 at 3:57 PM Wei JI10 季伟 
wrote:

> Hi:
>在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any format factory for identifier 'parquet' in the classpath.
>at
> org.apache.flink.table.filesystem.FileSystemTableSource.(FileSystemTableSource.java:97)
>at
> org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:72)
>at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:119)
>... 41 more
>
> Sql语句如下:
> CREATE TABLE user_info (
> `user_id` bigint,
> `user_name` string
> ) PARTITIONED BY (user_id) WITH (
> 'connector' = 'filesystem',
> 'path' = '',
> 'format' = 'parquet'
> );
>
> CREATE TABLE sink_table (
> `user_id` bigint,
> `user_name` string
> ) PARTITIONED BY (datetime) WITH (
> 'connector'='filesystem',
> 'path'='',
> 'format'='parquet',
> 'sink.partition-commit.delay'='1h',
> 'sink.partition-commit.policy.kind'='success-file'
> );
>
> insert OVERWRITE sink_table select *, '2021062600' as  datetime from
> user_info;
>


flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Wei JI10 季伟
Hi:
   在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
format factory for identifier 'parquet' in the classpath.
   at 
org.apache.flink.table.filesystem.FileSystemTableSource.(FileSystemTableSource.java:97)
   at 
org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSource(FileSystemTableFactory.java:72)
   at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:119)
   ... 41 more

Sql语句如下:
CREATE TABLE user_info (
`user_id` bigint,
`user_name` string
) PARTITIONED BY (user_id) WITH (
'connector' = 'filesystem',
'path' = '',
'format' = 'parquet'
);

CREATE TABLE sink_table (
`user_id` bigint,
`user_name` string
) PARTITIONED BY (datetime) WITH (
'connector'='filesystem',
'path'='',
'format'='parquet',
'sink.partition-commit.delay'='1h',
'sink.partition-commit.policy.kind'='success-file'
);

insert OVERWRITE sink_table select *, '2021062600' as  datetime from 
user_info;