Hi,
there are multiple issues in your query. First of all, "SELECT
DISTINCT(user), product" is MySQL specific syntax and is interpreted as
"SELECT DISTINCT user, product" which is not what you want I guess.
Secondly, SQL windows can only be applied on time attributes. Meaning:
"As long as a time attribute is not modified and is simply forwarded
from one part of the query to another, it remains a valid time
attribute. Time attributes behave like regular timestamps and can be
accessed for calculations. If a time attribute is used in a calculation,
it will be materialized and becomes a regular timestamp. Regular
timestamps do not cooperate with Flink’s time and watermarking system
and thus can not be used for time-based operations anymore." [1]
So you cannot get a time attribute out of an aggregation (distinct).
Could you do the windowing first and maybe do the DISTINCT afterwards?
Some hint for while developing: You can always output the current schema
e.g. tableEnv.sql("...").printSchema();
Regards,
Timo
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#time-attributes
Am 03.04.18 um 10:15 schrieb 杨力:
You should add a column
TUMBLE_ROWTIME(t, INTERVAL '60' SECOND) AS t
to the select part of your subquery.
韩宁宁 <453673...@qq.com <mailto:453673...@qq.com>> 于 2018年4月3日周二
下午3:34写道:
Thank you for your reply.
I think the table registration no problem。I guess it's a question
of subquery。
It's no problem to execute this SQL:
select
user,
count(product),
TUMBLE_START(t, INTERVAL '60' SECOND) as wStart,
TUMBLE_END(t, INTERVAL '60' SECOND) as wEnd
from myFlinkTable GROUP BY user,TUMBLE(t, INTERVAL '60' SECOND)
------------------ 原始邮件 ------------------
*发件人:* "李玥"<liyue2...@gmail.com <mailto:liyue2...@gmail.com>>;
*发送时间:* 2018年4月3日(星期二) 中午11:49
*收件人:* "韩宁宁"<453673...@qq.com <mailto:453673...@qq.com>>;
*抄送:* "user"<user@flink.apache.org
<mailto:user@flink.apache.org>>; "skycrab68"<skycra...@163.com
<mailto:skycra...@163.com>>;
*主题:* Re: subuquery about flink sql
The exception logs tells that your table “myFlinkTable” does not
contain a column/field named “t”. Could be something wrong about
your table registration. It would be helpful to show us your
table registration code, like:
|// register a Table tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...); // or
tableEnv.registerExternalCatalog("extCat", ...);|
LiYue
http://tig.jd.com
liyue2...@gmail.com <mailto:liyue2...@gmail.com>
在 2018年4月3日,上午11:23,韩宁宁 <453673...@qq.com
<mailto:453673...@qq.com>> 写道:
Deal All
I have a question about subquery of flink sql.
My sql like this:
select
user,
count(product),
TUMBLE_START(t, INTERVAL '60' SECOND) as wStart,
TUMBLE_END(t, INTERVAL '60' SECOND) as wEnd
from (
select
distinct(user),
product,
amount,
actionTime
from myFlinkTable
) GROUP BY user,TUMBLE(t, INTERVAL '60' SECOND)
The typeinfo of field like this:
TypeInformation<Row> typeInfo = Types.ROW(
new String[]
{"user","product","amount","actionTime"},
new TypeInformation<?>[] {
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
}
);
My datasource implements DefinedRowtimeAttribute,as follows:
@Override
public String getRowtimeAttribute() {
return "t";
}
I run the test code,and get the following error.
Exception in thread "main"
org.apache.flink.table.api.ValidationException: SQL validation
failed. At line 13, column 24: Column 't' not found in any table
at
org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:92)
at
org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:499)
at com.didi.flink.sql.Main.main(Main.java:103)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.calcite.runtime.CalciteContextException: At
line 13, column 24: Column 't' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:804)
at
org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:789)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4363)
at
org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:258)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5018)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5000)
at
org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:344)
at
org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
at
org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
at
org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:859)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:5053)
at
org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
at
org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:137)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:4624)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupClause(SqlValidatorImpl.java:3529)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3172)
at
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:931)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:912)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:220)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:887)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:597)
at
org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:88)
... 7 more
Caused by: org.apache.calcite.sql.validate.SqlValidatorException:
Column 't' not found in any table
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
... 32 more
==========================================
How to solve this problem?
Best wishes
Thanks