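There is a workaround for now: use a subquery to pull the nested row field up to a top-level column first, and then unnest that column. For example:

select ...
from (
  select data.rule_results as rule_results, ...
) cross join unnest(rule_results) as t(...)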
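
Applied to the query quoted further down this thread, the workaround might look roughly like the sketch below. The class name `UnnestWorkaround`, the helper `buildQuery()`, and the aliases `flow_task_id` and `pdl_current_unpay` are made up for illustration; the table and column names come from the CREATE TABLE statement in the thread. It only builds the SQL text and has not been run against a Flink cluster.

```java
// Sketch only: builds the SQL for the subquery workaround described above.
// Class name, method name and column aliases are illustrative assumptions.
class UnnestWorkaround {

    static String buildQuery() {
        // Inner query: lift the nested fields of `data` to top-level columns,
        // so that UNNEST no longer has to resolve a path like data.rule_results.
        String inner =
            "SELECT data.flow_task_id AS flow_task_id, "
            + "data.features.`user_ic_no_aku_uid.pdl_current_unpay` AS pdl_current_unpay, "
            + "data.rule_results AS rule_results, "
            + "tindex "
            + "FROM parser_data_test";

        // Outer query: unnest the top-level array column produced by the subquery.
        return "SELECT flow_task_id, pdl_current_unpay, rule_id, tindex "
            + "FROM (" + inner + ") "
            + "CROSS JOIN UNNEST(rule_results) AS t "
            + "(rule_id, rule_name, rule_type_name, `result`, in_path)";
    }

    public static void main(String[] args) {
        // In the job from this thread, the string would be passed to tableEnv.sqlQuery(...).
        System.out.println(buildQuery());
    }
}
```

The point of the inner SELECT is that UNNEST then receives a plain column name instead of a nested field path, which is the case the validator rejected with "Column 'data.data' not found".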

On Thu, Jul 23, 2020 at 12:44 PM, Benchao Li wrote:

> I suspect this may be a Calcite bug. CC Danny.
>
> On Wed, Jul 22, 2020 at 5:46 PM, Dream-底限 wrote:
>
>> Hi Benchao Li,
>> I tried expanding the array this way, but got a baffling error. Could you take a look?
>>
>> tableEnv.executeSql("CREATE TABLE parser_data_test (\n" +
>>     "  data ROW<flow_task_id BIGINT,flow_id STRING,flow_version STRING,path STRING,country_id INT,create_time BIGINT," +
>>     "spent_time DECIMAL(10,2),features ROW<`user_ic_no_aku_uid.pdl_cdpd` STRING,`user_ic_no_aku_uid.pdl_current_unpay` INT," +
>>     "`user_ic_no_aku_uid.current_overdue_collection` INT>,rule_results ARRAY<ROW<rule_id STRING,rule_name STRING," +
>>     "rule_type_name STRING,`result` INT,in_path BOOLEAN>>>,\n" +
>>     "  createTime BIGINT,\n" +
>>     "  tindex INT\n" +
>>     ") WITH (\n" +
>>     "  'connector' = 'kafka-0.11',\n" +
>>     "  'topic' = 'parser_data_test',\n" +
>>     "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
>>     "  'properties.group.id' = 'testGroup',\n" +
>>     "  'scan.startup.mode' = 'earliest-offset',\n" +
>>     "  'format' = 'json',\n" +
>>     "  'json.fail-on-missing-field' = 'false',\n" +
>>     "  'json.ignore-parse-errors' = 'true'\n" +
>>     ")");
>>
>> Table table = tableEnv.sqlQuery(
>>     "select data.flow_task_id,data.features.`user_ic_no_aku_uid.pdl_current_unpay`,rule_id,tindex " +
>>     "from parser_data_test CROSS JOIN UNNEST(data.rule_results) AS t " +
>>     "(rule_id,rule_name,rule_type_name,`result`,in_path)");
>>
>> table.printSchema();
>> tableEnv.toAppendStream(table,
>>     Types.ROW(TypeConversions.fromDataTypeToLegacyInfo(table.getSchema().getFieldDataTypes()))).print();
>>
>> Exception:
>>
>> org.apache.flink.table.api.ValidationException: SQL validation failed. From line 0, column 0 to line 1, column 139: Column 'data.data' not found in table 'parser_data_test'
>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:185)
>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:664)
>>     at com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:63)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>>     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>>     at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>>     at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>>     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 0, column 0 to line 1, column 139: Column 'data.data' not found in table 'parser_data_test'
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>>     at org.apache.calcite.sql.validate.DelegatingScope.fullyQualify(DelegatingScope.java:439)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5991)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:5976)
>>     at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:321)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:5583)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateUnnest(SqlValidatorImpl.java:3271)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3253)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
>>     at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
>>     at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>>     at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
>>     ... 28 more
>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'data.data' not found in table 'parser_data_test'
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>>     at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
>>     ... 51 more
>>
>>
>> On Wed, Jul 22, 2020 at 2:05 PM, Benchao Li wrote:
>>
>> > Hi,
>> > If you only want to flatten the array, Flink has a built-in way to do it; see the "Expanding arrays into a relation" section of [1].
>> >
>> > [1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#joins
>> >
>> > On Wed, Jul 22, 2020 at 11:17 AM, Jark Wu wrote:
>> >
>> > > Hi,
>> > >
>> > > Row[] is not yet supported as an eval parameter. The community already has an issue tracking support for this:
>> > > https://issues.apache.org/jira/browse/FLINK-17855
>> > >
>> > > Best,
>> > > Jark
>> > >
>> > > On Wed, 22 Jul 2020 at 10:45, Dream-底限 wrote:
>> > >
>> > > > hi,
>> > > > I want to expand an array<row> into multiple rows, but it did not work.
>> > > >
>> > > > @FunctionHint(
>> > > >     input = @DataTypeHint("ARRAY<ROW<rule_id STRING,rule_name STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>>"),
>> > > >     output = @DataTypeHint("ROW<rule_id STRING,rule_name STRING,rule_type_name STRING,`result` INT,in_path BOOLEAN>"))
>> > > > public static class FlatRowFunction extends TableFunction<Row> {
>> > > >     private static final long serialVersionUID = 1L;
>> > > >
>> > > >     public void eval(Row[] rows) {
>> > > >         for (Row row : rows) {
>> > > >             collect(row);
>> > > >         }
>> > > >     }
>> > > > }
>> > > >
>> > > > The exception is:
>> > > >
>> > > > org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 149 to line 1, column 174: No match found for function signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
>> > > >     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
>> > > >     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
>> > > >     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
>> > > >     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>> > > >     at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
>> > > >     at com.akulaku.data.flink.ParserDataTest.parserDataTest(ParserDataTest.java:60)
>> > > >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> > > >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> > > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> > > >     at java.lang.reflect.Method.invoke(Method.java:498)
>> > > >     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>> > > >     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>> > > >     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>> > > >     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>> > > >     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>> > > >     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>> > > >     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>> > > >     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>> > > >     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>> > > >     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>> > > >     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>> > > >     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>> > > >     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>> > > >     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>> > > >     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>> > > >     at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>> > > >     at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>> > > >     at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
>> > > >     at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
>> > > > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 149 to line 1, column 174: No match found for function signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
>> > > >     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> > > >     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> > > >     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> > > >     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> > > >     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>> > > >     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
>> > > >     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1882)
>> > > >     at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:305)
>> > > >     at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
>> > > >     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
>> > > >     at org.apache.calcite.sql.validate.ProcedureNamespace.validateImpl(ProcedureNamespace.java:57)
>> > > >     at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateJoin(SqlValidatorImpl.java:3303)
>> > > >     at org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator.validateJoin(FlinkCalciteSqlValidator.java:86)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3247)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
>> > > >     at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>> > > >     at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
>> > > >     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
>> > > >     at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
>> > > >     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
>> > > >     ... 28 more
>> > > > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature flatRow(<RecordType:peek_no_expand(VARCHAR(2147483647) rule_id, VARCHAR(2147483647) rule_name, VARCHAR(2147483647) rule_type_name, INTEGER result, BOOLEAN in_path) ARRAY>)
>> > > >     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> > > >     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> > > >     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> > > >     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> > > >     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
>> > > >     at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
>> > > >     ... 56 more
>> > > >
>> > > >
>> > > > On Tue, Jul 21, 2020 at 7:41 PM, godfrey he wrote:
>> > > >
>> > > > > Yes, you can. Define getResultType and getParameterTypes explicitly; see [1].
>> > > > >
>> > > > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>> > > > >
>> > > > >
>> > > > > On Tue, Jul 21, 2020 at 7:25 PM, Dream-底限 wrote:
>> > > > >
>> > > > > > hi
>> > > > > >
>> > > > > > I am defining a table function by extending TableFunction, but the parameters of the eval method are all basic Java types (at least in every demo I have seen). Can eval take Flink's internal types? For example, how would I pass a Row to eval(), as in eval(Row row)?
>> > > > > >
>> >
>> >
>> > --
>> >
>> > Best,
>> > Benchao Li
>> >
>
>
> --
>
> Best,
> Benchao Li
>

--

Best,
Benchao Li
