请问你使用的是哪个版本? 这个似乎是一个已知的修复的bug[1]
[1] https://github.com/apache/flink/pull/15548 gen <[email protected]> 于2021年4月27日周二 上午9:40写道: > Hi, all > > 请教下为什么 无法通过t.* 将 自定义函数返回的嵌套字段查出来。 > > tEnv.executeSql( > """ > | SELECT t.* FROM ( > | SELECT EvtParser(request) as t FROM parsed_nginx_log > | ) > |""".stripMargin) > > > > > 自定义函数 EvtParser > > @DataTypeHint("ROW<evt STRING, app STRING, uid STRING, ts STRING, url > STRING, action STRING>") > def eval(line: String) = {...} > > > > 详细代码 > > class EvtEtlJobTest { > > @Test > def testMain(): Unit = { > val settings = EnvironmentSettings.newInstance.useBlinkPlanner().build > val tEnv = TableEnvironment.create(settings) > > tEnv.executeSql( > """ > | CREATE TABLE parsed_nginx_log ( > | remote_addr STRING, > | remote_user STRING, > | time_local BIGINT, > | request STRING, > | status STRING, > | body_bytes_sent STRING, > | http_referer STRING, > | http_user_agent STRING, > | http_x_forwarded_for STRING > | ) WITH ( > | 'connector' = 'filesystem', > | 'path' = 'src/test/resources/nginx-parsed.log', > | 'format' = 'json' > | ) > |""".stripMargin).getTableSchema > > tEnv.executeSql( > """ > | CREATE TABLE evt_log_index ( > | evt STRING, > | app STRING, > | uid STRING, > | ts STRING, > | url STRING, > | action STRING > | ) WITH ( > | 'connector' = 'filesystem', > | 'path' = '/tmp/evt_log_index', > | 'format' = 'json' > | ) > |""".stripMargin) > > // 注册函数 > tEnv.createTemporarySystemFunction("EvtParser", classOf[EvtParser]) > > val schema = tEnv.executeSql( > """ > | SELECT t.* FROM ( > | SELECT EvtParser(request) as t FROM parsed_nginx_log > | ) > |""".stripMargin).getTableSchema > > println("++++++++", schema) > } > > } > > > > > > 详细报错: > > > java.lang.RuntimeException: Error while applying rule > PushProjectIntoTableSourceScanRule, args > [rel#63:LogicalProject.NONE.any.None: > 0.[NONE].[NONE](input=RelSubset#62,exprs=[EvtParser($3).evt, > EvtParser($3).app, EvtParser($3).uid, EvtParser($3).ts, EvtParser($3).url, > EvtParser($3).action]), rel#1:LogicalTableScan.NONE.any.None: > 0.[NONE].[NONE](table=[default_catalog, default_database, > parsed_nginx_log])] > > at > > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) > at > > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at > org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) > at > > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) > at > > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) > at > > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at > > scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) > at > scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) > at > > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) > at > > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) > at > > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) > at > > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) > at > > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1107) > at > > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) > at > cn.bestlang.starry.evt.EvtEtlJobTest.testMain(EvtEtlJobTest.scala:51) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) > at > > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) > at > > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) > at > > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) > at > > org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) > at > > org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) > at > > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) > at > > org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) > at > > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210) > at > > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) > at > > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) > at > > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at > > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > at java.util.ArrayList.forEach(ArrayList.java:1257) > at > > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) > at > > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at > > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > at java.util.ArrayList.forEach(ArrayList.java:1257) > at > > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) > at > > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at > > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at > > org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > at > > org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) > at > > org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) > at > > org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) > at > > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) > at > > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) > at > > org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) > at > > org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) > at > > org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) > at > > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) > at > > org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) > at > > org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86) > at > > org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53) > at > > com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71) > at > > com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) > at > > com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221) > at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) > Caused by: scala.MatchError: EvtParser($3) (of class > org.apache.calcite.rex.RexCall) > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
