Re: Unsubscribe

2021-06-28 Post by Leonard Xu
To unsubscribe from the u...@flink.apache.org mailing list, please send an email with any content to
user-zh-unsubscr...@flink.apache.org .


> On Jun 29, 2021, at 11:01, 大雨 <95133...@qq.com.INVALID> wrote:
> 
> Unsubscribe



Re: Using the cumulate window in flink 1.13.0

2021-06-28 Post by yidan zhao
Whether the sink does an insert should have nothing to do with the cumulate window itself. Look at the definition of your test_out table; declaring a primary key on it is worth a try (see the sketch below).

Also, @Leonard Xu, I'd like to know what the term window TVF, table-valued function, is meant to express. A function whose value is a table?
Second, why is the syntax FROM TABLE(CUMULATE(...)) rather than simply FROM
CUMULATE(...)? What does the TABLE() here count as, a function?
And what does CUMULATE(...) return then? Some type that can be used as an argument to the TABLE function? Or is TABLE(xx) to be understood as a type conversion?
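A minimal sketch of such a sink definition, assuming a MySQL JDBC sink (the connector options, URL, and table name below are illustrative, not from this thread). With a PRIMARY KEY ... NOT ENFORCED declared, the JDBC connector writes in upsert mode (e.g. INSERT ... ON DUPLICATE KEY UPDATE on MySQL) instead of appending:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // A PRIMARY KEY on the sink switches the JDBC connector to upsert
        // writes, so window results update existing rows by key.
        tEnv.executeSql(
                "CREATE TABLE test_out (" +
                "  uid STRING," +
                "  dt STRING," +
                "  max_age INT," +
                "  PRIMARY KEY (uid) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://localhost:3306/test'," +
                "  'table-name' = 't_agg'" +
                ")");
    }
}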

邹云鹤 wrote on Thu, Jun 3, 2021 at 4:25 PM:
>
> Hello,
> The cumulate SQL I am using now is as follows:
> insert into `test_out`
> select a.uid, 'dt', max(a.age)
> from TABLE(
>   CUMULATE(TABLE test_in, DESCRIPTOR(proctime), INTERVAL '1' MINUTES, INTERVAL '1' hours)
> ) as a
> group by uid, window_start, window_end;
>
> It runs now, but I found that every time the window fires, what the JDBC sink executes against the database is an insert. If I need to update rows in
> the database by key here, can that be done with a CUMULATE WINDOW? How should the SQL be written?
> 邹云鹤
> kevinyu...@163.com
> On May 28, 2021 at 11:51, Leonard Xu wrote:
> Hi,
>
> The cumulate window is based on the window TVF syntax, which is different from the earlier groupWindowFunction; you can refer to [1].
> Window TVFs also support the tumble window and hop window, with better performance. The session window is expected to be supported in 1.14;
> if you need a session window, you can use the old syntax.
>
> Best,
> Leonard
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/window-agg/#windowing-tvfs
>
>
> On May 28, 2021 at 11:43, 邹云鹤 wrote:
>
> insert into `test_out` select a.uid, 'dt', max(a.age) from `test_in` as a 
> group by uid, cumulate(PROCTIME(), interval '1' minute, interval '1' hour);
>
>
> Hello everyone, how should the cumulate window be used in a streaming job in flink 1.13.0? The SQL above apparently doesn't work.
> Could anyone who has used it give some advice?
>
>
>
>
> 邹云鹤
> kevinyu...@163.com
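For reference alongside Leonard's pointer [1]: the window TVF syntax has the same shape for the other window kinds. A hedged sketch of a tumble-window version of the query above (assuming the same test_in table with a proctime attribute and an age column):

// Sketch only: a tumble window expressed in the window TVF form from [1].
// Assumes an existing TableEnvironment tEnv and a registered table test_in.
tEnv.executeSql(
        "SELECT uid, window_start, window_end, MAX(age) " +
        "FROM TABLE(" +
        "  TUMBLE(TABLE test_in, DESCRIPTOR(proctime), INTERVAL '10' MINUTES)) " +
        "GROUP BY uid, window_start, window_end");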


Re: Unsubscribe

2021-06-28 Post by Jingsong Li
To unsubscribe, please send to user-zh-unsubscr...@flink.apache.org rather than user-zh@flink.apache.org.

Best,
Jingsong

On Mon, Jun 28, 2021 at 5:56 PM luoye <13033709...@163.com> wrote:

> Unsubscribe



-- 
Best, Jingsong Lee


Re: Flink Sql 1.13 UDF ERROR

2021-06-28 Post by Jingsong Li
Hi,

You could file a JIRA and have Timo take a look; UDAFs were moved onto the new type inference, and there may be a problem there.

Best,
Jingsong
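If the new type inference is indeed the culprit, one workaround worth trying while the ticket is open is to pin the types explicitly with @FunctionHint / @DataTypeHint instead of relying on reflective extraction. A hedged sketch (the hints mirror the c_agg(DOUBLE, BIGINT) call from the quoted code; the boxed Double/Long parameters are an assumption, since the primitive double/long in the original imply NOT NULL arguments, which nullable DOUBLE/BIGINT columns would not satisfy):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;

public class AvgPriceUdf {

    // Accumulator: a plain public POJO that Flink maps to a structured type.
    public static class AccumulatorBean {
        public double totalPrice;
        public long totalNum;
    }

    // Pin the SQL-facing signature c_agg(DOUBLE, BIGINT) -> DOUBLE explicitly,
    // bypassing reflective type extraction.
    @FunctionHint(
            input = {@DataTypeHint("DOUBLE"), @DataTypeHint("BIGINT")},
            output = @DataTypeHint("DOUBLE"))
    public static class UDFAggregateFunction
            extends AggregateFunction<Double, AccumulatorBean> {

        @Override
        public AccumulatorBean createAccumulator() {
            return new AccumulatorBean();
        }

        // Boxed parameters accept nullable DOUBLE/BIGINT arguments.
        public void accumulate(AccumulatorBean acc, Double price, Long num) {
            if (price == null || num == null) {
                return;
            }
            acc.totalPrice += price * num;
            acc.totalNum += num;
        }

        @Override
        public Double getValue(AccumulatorBean acc) {
            return acc.totalNum == 0 ? null : acc.totalPrice / acc.totalNum;
        }
    }
}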

On Tue, Jun 29, 2021 at 7:10 AM Roc Marshal  wrote:

>
>
> Hi, All.
>
>
> I'd like to ask about a problem I ran into while investigating the API upgrade to the latest 1.13.1. Thanks, everyone:
>
>
> Version: 1.13.1
> Run mode: IDE-application
> ---
> About the UDF definition...
>
>
> public static class UDFAggregateFunction extends
> AggregateFunction<Double, AccumulatorBean> {
>
>
> // return the final result
> @Override
> public Double getValue(AccumulatorBean acc) {
> return acc.totalPrice / acc.totalNum;
> }
>
>
> // build the object that holds intermediate results
> @Override
> public AccumulatorBean createAccumulator() {
> return new AccumulatorBean();
> }
>
>
> // subtract the values being retracted
> public void retract(AccumulatorBean acc, double price, long num) {
> acc.totalPrice -= price * num;
> acc.totalNum -= num;
> }
>
>
> // take the data from each partition and merge it
> public void merge(AccumulatorBean acc, Iterable<AccumulatorBean> it) {
>
>
> Iterator<AccumulatorBean> iter = it.iterator();
> while (iter.hasNext()) {
> AccumulatorBean a = iter.next();
> this.accumulate(acc, a.totalPrice, a.totalNum);
> }
> }
>
>
> // called when resetting the in-memory values
> public void resetAccumulator(AccumulatorBean acc) {
> acc.totalNum = 0;
> acc.totalPrice = 0;
> }
>
>
> // the logic computing against incoming data
> public void accumulate(AccumulatorBean acc, double price, long
> num) {
> acc.totalPrice += price * num;
> acc.totalNum += num;
> }
> }
>
>
>
> 
> About the main method
> // TODO: unified stream-batch Table API
> TableEnvironment tableEnvironment =
> TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
> List<Row> dataList = new ArrayList<>();
> dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
> dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
> dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
> Table table = tableEnvironment.fromValues(DataTypes.ROW(
> DataTypes.FIELD("user", DataTypes.STRING()),
> DataTypes.FIELD("name", DataTypes.STRING()),
> DataTypes.FIELD("price", DataTypes.DOUBLE()),
> DataTypes.FIELD("num", DataTypes.BIGINT())
> ),
> dataList);
> tableEnvironment.createTemporaryView("orders", table);
>
>
> tableEnvironment.createTemporaryFunction("c_agg", new
> UDFAggregateFunction());
>
>
> tableEnvironment.executeSql("select user, c_agg(price, num) as
> udf_field from orders group by user").print();
>
>
>
>
>
>
>
> Exception stack -
>
>
>
>
> default_catalog.default_database.c_agg(DOUBLE, BIGINT)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
> at
> com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> function call:
> default_catalog.default_database.c_agg(DOUBLE, BIGINT)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> at
> org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
> at
> org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
> at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
> at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
> at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
> at
> org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
> at
> org.apache.calcite.sql.v



Re: Re: Problem writing to Hive from Flink SQL

2021-06-28 Post by Geoff nie
Thanks a lot! I set the slots to 4, and following your approach I tracked down my problem: the original values I had set in my cluster's sql-client-defaults.yaml were wrong.
I had added an extra "hive-version: 2.1.1" line; after commenting that line out it works, and registering a temporary catalog your way works too.
This problem kept me stuck here the whole time...


In any case, thanks for the help.






The wrong original values set in sql-client-defaults.yaml were as follows:
catalogs:  # [] # empty list
# A typical catalog definition looks like:
- name: myhive
  type: hive
#hive-conf-dir: /opt/hive_conf/
  hive-conf-dir: /etc/hive/conf
#default-database: ...
  hive-version: 2.1.1
  default-database: myhive




The corrected values in sql-client-defaults.yaml are as follows:

catalogs:  # [] # empty list
# A typical catalog definition looks like:
- name: myhive
  type: hive
#hive-conf-dir: /opt/hive_conf/
  hive-conf-dir: /etc/hive/conf
#default-database: ...
#  hive-version: 2.1.1
  default-database: myhive








On 2021-06-28 10:35:22, "杨光跃" wrote:
>Writing to Hive and then reading it back works when I try it...
>Step 1:
>CREATE CATALOG myhive WITH (
>'type' = 'hive',
>'default-database' = 'default',
>'hive-conf-dir' = '/home/admin/hive/conf'
>);
>Step 2:
>USE CATALOG myhive;
>Step 3:
>select * from hive_table;
>
>
>My guess at the problem: our local deployments all set slots to 1. Maybe your write job is still running and there are no resources left to run the read job?
>
>
>You could stop the write job, or set the dialect: SET table.sql-dialect=hive;
>and then try the query again.
>
>
>
>
>
>
>杨光跃
>yangguangyuem...@163.com
>On Jun 24, 2021 at 18:00, Geoff nie wrote:
>Thanks a lot! I submit via the sql-client, and after changing the config file it submitted successfully. The partition files under the hive table are named like:
>part-f3fa374b-c563-49c8-bd7a-b3bd7a5fb66d-0-2
>
>
>Two more questions:
>1. I created the kafka stream table as below; querying kafka_table through flink-sql
>does return data,
>but there are no files on hdfs. Why is that?
>2. hive_table has been written successfully as above, but why can neither flink-sql nor hive read any data from the hive table? SELECT * FROM
>hive_table WHERE dt='2021-06-21' and hr='18';
>SET table.sql-dialect=default;
>CREATE TABLE kafka_table (
>user_id STRING,
>order_amount DOUBLE,
>log_ts TIMESTAMP(3),
>WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
>) WITH (
>'connector'='kafka',
>'topic'='t_kafka_03',
>'scan.startup.mode'='earliest-offset',
>'properties.bootstrap.servers'='192.168.1.*:19092,192.168.1.*:19092,192.168.1.*:19092',
>'properties.group.id' = 'testGroup10',
>'format'='json'
>);
>
>
>
>
>Could you please take a look? Many thanks.
>
>
>On 2021-06-24 16:12:35, "杨光跃" wrote:
>
>
>Checkpoints: if you deploy as a jar, you can simply enable them in code. If you submit SQL via the sql-client, add the following to the
>sql-client-defaults.yaml config file:
>configuration:
>  execution.checkpointing.interval: 1000
>杨光跃
>yangguangyuem...@163.com
>On Jun 24, 2021 at 16:09, Geoff nie wrote:
>Thanks a lot for the reply, but after thinking it over carefully I still don't get it. Could you say specifically where to configure the parameter? Thanks!
>On 2021-06-24 14:47:24, "杨光跃" wrote:
>Partition commit requires checkpointing to be enabled; you need to configure it.
>
>
>杨光跃
>yangguangyuem...@163.com
>On Jun 24, 2021 at 14:44, Geoff nie wrote:
>Hello! I've run into this problem too, similar to the one below. Has it been solved? Many thanks.
>On 2021-02-14 10:43:33, "潘永克" <13311533...@163.com> wrote:
> Forwarded message 
>From: "潘永克" <13311533...@163.com>
>Date: 2021-02-11 11:12:39
>To: d...@flink.apache.org
>Subject: Problem writing to Hive from Flink SQL
>
>A Flink question: Flink SQL can write data into a hive table, but the data in the hive table all sits in
>".part-inprogress"-style files. flink 1.12.0,
>compiled against cdh6.2.0; hive version 2.1.1, hadoop-3.0.0. The setup is as follows:
>Create the hive table:
>SET table.sql-dialect=hive;
>CREATE TABLE hive_table (
>user_id STRING,
>order_amount DOUBLE
>) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>'sink.partition-commit.trigger'='partition-time',
>'sink.partition-commit.delay'='1 min',
>'sink.partition-commit.policy.kind'='metastore,success-file'
>);
>Insert the data:
>INSERT INTO TABLE hive_table
>SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
>DATE_FORMAT(log_ts, 'HH')
>FROM kafka_table;
>
>
>The files never get finalized; they always stay as ".part-inprogress..." files.
>
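As 杨光跃 notes above, for a jar deployment the checkpoint interval is set directly in code. A minimal sketch of that (class name and interval are illustrative; the hive catalog registration for hive_table is omitted). The point is that the filesystem/hive sink only finalizes the ".part-...inprogress" files and commits partitions when a checkpoint completes:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class HiveSinkCheckpointSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Without checkpoints the streaming file sink never commits,
        // which is why the files stay in the in-progress state.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        // kafka_table and hive_table as defined earlier in this thread
        // (hive catalog registration omitted here).
        tEnv.executeSql(
                "INSERT INTO hive_table "
                        + "SELECT user_id, order_amount, "
                        + "DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') "
                        + "FROM kafka_table");
    }
}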


Are checkpoints not generated in local run mode?

2021-06-28 Post by casel.chen
I run flink sql locally in local run mode, writing data from kafka to mongodb. The mongodb
connector is one I developed myself; it implements the CheckpointedFunction interface. While debugging I can see invoke being called as data comes in, but initializeState and snapshotState are never called, even though I enabled checkpointing. The same program deployed on kubernetes does call snapshotState. My question is: are checkpoints not generated in local run mode?
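For what it's worth, local (MiniCluster) execution does run checkpoints once they are enabled on the environment the job is actually built from; if the interval is only set somewhere the local run does not pick up, snapshotState will never fire. A self-contained sketch (sink and interval are illustrative) that prints the checkpoint callbacks when run straight from the IDE:

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class LocalCheckpointProbe {

    // Minimal stand-in for a custom connector implementing CheckpointedFunction.
    public static class ProbeSink extends RichSinkFunction<Long>
            implements CheckpointedFunction {

        @Override
        public void invoke(Long value, Context context) {
            // called once per record
        }

        @Override
        public void initializeState(FunctionInitializationContext ctx) {
            System.out.println("initializeState called");
        }

        @Override
        public void snapshotState(FunctionSnapshotContext ctx) {
            System.out.println("snapshotState for checkpoint " + ctx.getCheckpointId());
        }
    }

    public static void main(String[] args) throws Exception {
        // In the IDE this spins up a local MiniCluster.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5_000); // without this, snapshotState never fires

        env.fromSequence(0, Long.MAX_VALUE).addSink(new ProbeSink());
        env.execute("local checkpoint probe");
    }
}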

Flink Sql 1.13 UDF ERROR

2021-06-28 Post by Roc Marshal


Hi, All.


I'd like to ask about a problem I ran into while investigating the API upgrade to the latest 1.13.1. Thanks, everyone:


Version: 1.13.1
Run mode: IDE-application
---
About the UDF definition...


public static class UDFAggregateFunction extends AggregateFunction<Double, AccumulatorBean> {


// return the final result
@Override
public Double getValue(AccumulatorBean acc) {
return acc.totalPrice / acc.totalNum;
}


// build the object that holds intermediate results
@Override
public AccumulatorBean createAccumulator() {
return new AccumulatorBean();
}


// subtract the values being retracted
public void retract(AccumulatorBean acc, double price, long num) {
acc.totalPrice -= price * num;
acc.totalNum -= num;
}


// take the data from each partition and merge it
public void merge(AccumulatorBean acc, Iterable<AccumulatorBean> it) {


Iterator<AccumulatorBean> iter = it.iterator();
while (iter.hasNext()) {
AccumulatorBean a = iter.next();
this.accumulate(acc, a.totalPrice, a.totalNum);
}
}


// called when resetting the in-memory values
public void resetAccumulator(AccumulatorBean acc) {
acc.totalNum = 0;
acc.totalPrice = 0;
}


// the logic computing against incoming data
public void accumulate(AccumulatorBean acc, double price, long num) {
acc.totalPrice += price * num;
acc.totalNum += num;
}
}



About the main method
// TODO: unified stream-batch Table API
TableEnvironment tableEnvironment = 
TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
List<Row> dataList = new ArrayList<>();
dataList.add(Row.of("张三", "可乐", 20.0D, 4L));
dataList.add(Row.of("张三", "果汁", 10.0D, 4L));
dataList.add(Row.of("李四", "咖啡", 10.0D, 2L));
Table table = tableEnvironment.fromValues(DataTypes.ROW(
DataTypes.FIELD("user", DataTypes.STRING()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("price", DataTypes.DOUBLE()),
DataTypes.FIELD("num", DataTypes.BIGINT())
),
dataList);
tableEnvironment.createTemporaryView("orders", table);


tableEnvironment.createTemporaryFunction("c_agg", new 
UDFAggregateFunction());


tableEnvironment.executeSql("select user, c_agg(price, num) as 
udf_field from orders group by user").print();






Exception stack -




default_catalog.default_database.c_agg(DOUBLE, BIGINT)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at 
com.intsmaze.flink.table.udf.aggre.AggregateFunctionTemplate.main(AggregateFunctionTemplate.java:139)
Caused by: org.apache.flink.table.api.ValidationException: Invalid function 
call:
default_catalog.default_database.c_agg(DOUBLE, BIGINT)
at 
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:194)
at 
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
at org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:679)
at org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:444)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:330)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5709)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5696)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1735)
at 
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1726)
at 
org.apache.calcite.sql.


Unsubscribe

2021-06-28 Post by luoye
Unsubscribe


Re: flink sql error: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-28 Post by Wei JI10 季伟
Hi,
It looks like a jar conflict; let me double-check.

On 2021/6/28 at 2:33 PM, "王刚" wrote:


Try putting the flink-parquet jar under the Flink client's lib directory.



Error when remote-debugging Flink from IDEA locally

2021-06-28 Post by tangzhi8...@gmail.com
Goal: I want to remote-debug Flink from IDEA in my local environment.
Steps:
1. The Debug run configuration (screenshot not included in the archive)
2. Error stack trace:
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Failed to execute job 'Streaming WordCount'.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:374)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:120)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:817)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:249)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1148)
at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1148)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 
'Streaming WordCount'.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1984)
at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)
at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1845)
at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:97)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:357)
... 8 more
Caused by: java.lang.RuntimeException: Error while waiting for job to be 
initialized
at 
org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:166)
at 
org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$execute$2(AbstractSessionClusterExecutor.java:82)
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:73)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
complete the operation. Number of retries has been exhausted.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at 
org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.lambda$null$0(AbstractSessionClusterExecutor.java:83)
at 
org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(ClientUtils.java:146)
... 9 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
Could not complete the operation. Number of retries has been exhausted.
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:386)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.runtime.rest.util.RestClientException: Response was neither of 
the expected type([simple type, class 
org.apache.flink.runtime.rest.messages.job.JobDetailsInfo]) nor an error.
at 
java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at 
java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(Complet