Hi,

There are several issues in your query. First, "SELECT DISTINCT(user), product" is MySQL-specific syntax; it is interpreted as "SELECT DISTINCT user, product", which is probably not what you want. Second, SQL windows can only be applied to time attributes. From the documentation:

"As long as a time attribute is not modified and is simply forwarded from one part of the query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed for calculations. If a time attribute is used in a calculation, it will be materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink’s time and watermarking system and thus can not be used for time-based operations anymore." [1]

So you cannot get a time attribute out of an aggregation such as DISTINCT. Could you do the windowing first and apply the DISTINCT afterwards?
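
To make "windowing first, DISTINCT afterwards" concrete, here is a plain-Java sketch (no Flink involved) of the result such a query would be expected to produce: every event is first assigned to its 60-second tumbling window, and only then are the distinct products counted per user and window. The class and method names are hypothetical, events are simplified to string triples, and the window start assumes epoch-aligned windows with non-negative millisecond timestamps.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names come from Flink.
final class WindowThenDistinct {
    // Corresponds to INTERVAL '60' SECOND in the query.
    static final long WINDOW_MILLIS = 60_000L;

    // Start of the epoch-aligned tumbling window that contains the timestamp
    // (assumes a non-negative timestamp in milliseconds).
    static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    // Each event is {user, product, timestampMillis as a decimal string}.
    // Returns "user@windowStart" -> number of distinct products in that window.
    static Map<String, Integer> distinctProductsPerWindow(String[][] events) {
        // Step 1: windowing. Group products by (user, window).
        Map<String, Set<String>> seen = new TreeMap<>();
        for (String[] e : events) {
            String key = e[0] + "@" + windowStart(Long.parseLong(e[2]));
            // Step 2: DISTINCT. The set drops repeated products within a window.
            seen.computeIfAbsent(key, k -> new HashSet<>()).add(e[1]);
        }
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : seen.entrySet()) {
            counts.put(entry.getKey(), entry.getValue().size());
        }
        return counts;
    }
}
```

The point of the ordering is that the timestamp is consumed by the window assignment before any deduplication happens, so nothing downstream needs it to still be a time attribute.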

A hint for development: you can always print the current schema, e.g. tableEnv.sql("...").printSchema();

Regards,
Timo


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/table/streaming.html#time-attributes


On 03.04.18 at 10:15, 杨力 wrote:
You should add a column
    TUMBLE_ROWTIME(t, INTERVAL '60' SECOND) AS t
to the select part of your subquery.
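
A sketch of how the whole query might then look, assuming the intent is to count each product at most once per user and window: the inner query does the windowing (which also removes duplicate user/product pairs per window) and forwards a rowtime attribute via TUMBLE_ROWTIME, so the outer query can window on t again. The holder class WindowFirstQuery and the alias productCount are hypothetical, and the query has not been run against a Flink cluster.

```java
// Hypothetical holder class; only the SQL text is meant to be handed to Flink.
final class WindowFirstQuery {
    static final String SQL =
        "SELECT\n"
      + "  user,\n"
      + "  COUNT(product) AS productCount,\n"
      + "  TUMBLE_START(t, INTERVAL '60' SECOND) AS wStart,\n"
      + "  TUMBLE_END(t, INTERVAL '60' SECOND) AS wEnd\n"
      + "FROM (\n"
      // Inner query: window first, one row per (user, product, window).
      + "  SELECT\n"
      + "    user,\n"
      + "    product,\n"
      + "    TUMBLE_ROWTIME(t, INTERVAL '60' SECOND) AS t\n"
      + "  FROM myFlinkTable\n"
      + "  GROUP BY user, product, TUMBLE(t, INTERVAL '60' SECOND)\n"
      + ")\n"
      // Outer query: t is still a time attribute, so it can be windowed again.
      + "GROUP BY user, TUMBLE(t, INTERVAL '60' SECOND)";

    public static void main(String[] args) {
        // In the job from this thread the string would be passed on as
        // tableEnv.sql(WindowFirstQuery.SQL); here it is only printed.
        System.out.println(SQL);
    }
}
```

Calling printSchema() on the resulting table is a quick way to check whether t really arrives in the outer query as a time attribute.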

韩宁宁 <453673...@qq.com> wrote on Tuesday, April 3, 2018 at 3:34 PM:

    Thank you for your reply.

    I think the table registration is fine. I guess the problem lies
    in the subquery.
    This SQL executes without problems:
    select
            user,
            count(product),
            TUMBLE_START(t, INTERVAL '60' SECOND) as wStart,
            TUMBLE_END(t, INTERVAL '60' SECOND) as wEnd
     from myFlinkTable GROUP BY user,TUMBLE(t, INTERVAL '60' SECOND)



    ------------------ Original message ------------------
    *From:* "李玥"<liyue2...@gmail.com>;
    *Sent:* Tuesday, April 3, 2018, 11:49 AM
    *To:* "韩宁宁"<453673...@qq.com>;
    *Cc:* "user"<user@flink.apache.org>; "skycrab68"<skycra...@163.com>;
    *Subject:* Re: subuquery about flink sql

    The exception log says that your table “myFlinkTable” does not
    contain a column/field named “t”. Something could be wrong with
    your table registration. It would be helpful to show us your
    table registration code, like:

    // register a Table
    tableEnv.registerTable("table1", ...)
    // or
    tableEnv.registerTableSource("table2", ...);
    // or
    tableEnv.registerExternalCatalog("extCat", ...);



    LiYue
    http://tig.jd.com
    liyue2...@gmail.com



    On April 3, 2018, at 11:23 AM, 韩宁宁 <453673...@qq.com> wrote:

    Dear all,
          I have a question about subqueries in Flink SQL.
          My SQL looks like this:
          select
            user,
            count(product),
            TUMBLE_START(t, INTERVAL '60' SECOND) as wStart,
            TUMBLE_END(t, INTERVAL '60' SECOND) as wEnd
        from (
            select
                distinct(user),
                product,
                amount,
                actionTime
            from myFlinkTable
        ) GROUP BY user,TUMBLE(t, INTERVAL '60' SECOND)

        The type info of the fields looks like this:
        TypeInformation<Row> typeInfo = Types.ROW(
                new String[] {"user", "product", "amount", "actionTime"},
                new TypeInformation<?>[] {
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO,
                    BasicTypeInfo.LONG_TYPE_INFO,
                }
        );

        My data source implements DefinedRowtimeAttribute as follows:
        @Override
        public String getRowtimeAttribute() {
            return "t";
        }

        I ran the test code and got the following error:
        Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. At line 13, column 24: Column 't' not found in any table
            at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:92)
            at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:499)
            at com.didi.flink.sql.Main.main(Main.java:103)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
        Caused by: org.apache.calcite.runtime.CalciteContextException: At line 13, column 24: Column 't' not found in any table
            at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
            at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
            at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
            at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:804)
            at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:789)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4363)
            at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:258)
            at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5018)
            at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5000)
            at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:344)
            at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:134)
            at org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild(SqlShuttle.java:101)
            at org.apache.calcite.sql.SqlOperator.acceptCall(SqlOperator.java:859)
            at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped(SqlValidatorImpl.java:5053)
            at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:50)
            at org.apache.calcite.sql.validate.SqlScopedShuttle.visit(SqlScopedShuttle.java:33)
            at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:137)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:4624)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validateGroupClause(SqlValidatorImpl.java:3529)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3172)
            at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
            at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:931)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:912)
            at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:220)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:887)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:597)
            at org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:88)
            ... 7 more
        Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 't' not found in any table
            at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
            at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
            at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463)
            at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572)
            ... 32 more
    ==========================================
    How can I solve this problem?

    Best wishes
    Thanks




