Hi gen 我在1.13分支上验证了下你的case,发现能够跑通。建议cp下那个patch到自己的分支,再验证下。
Best, Shengkai Shengkai Fang <[email protected]> 于2021年4月27日周二 上午11:46写道: > 请问你使用的是哪个版本? 这个似乎是一个已知的修复的bug[1] > > [1] https://github.com/apache/flink/pull/15548 > > gen <[email protected]> 于2021年4月27日周二 上午9:40写道: > >> Hi, all >> >> 请教下为什么 无法通过t.* 将 自定义函数返回的嵌套字段查出来。 >> >> tEnv.executeSql( >> """ >> | SELECT t.* FROM ( >> | SELECT EvtParser(request) as t FROM parsed_nginx_log >> | ) >> |""".stripMargin) >> >> >> >> >> 自定义函数 EvtParser >> >> @DataTypeHint("ROW<evt STRING, app STRING, uid STRING, ts STRING, url >> STRING, action STRING>") >> def eval(line: String) = {...} >> >> >> >> 详细代码 >> >> class EvtEtlJobTest { >> >> @Test >> def testMain(): Unit = { >> val settings = EnvironmentSettings.newInstance.useBlinkPlanner().build >> val tEnv = TableEnvironment.create(settings) >> >> tEnv.executeSql( >> """ >> | CREATE TABLE parsed_nginx_log ( >> | remote_addr STRING, >> | remote_user STRING, >> | time_local BIGINT, >> | request STRING, >> | status STRING, >> | body_bytes_sent STRING, >> | http_referer STRING, >> | http_user_agent STRING, >> | http_x_forwarded_for STRING >> | ) WITH ( >> | 'connector' = 'filesystem', >> | 'path' = 'src/test/resources/nginx-parsed.log', >> | 'format' = 'json' >> | ) >> |""".stripMargin).getTableSchema >> >> tEnv.executeSql( >> """ >> | CREATE TABLE evt_log_index ( >> | evt STRING, >> | app STRING, >> | uid STRING, >> | ts STRING, >> | url STRING, >> | action STRING >> | ) WITH ( >> | 'connector' = 'filesystem', >> | 'path' = '/tmp/evt_log_index', >> | 'format' = 'json' >> | ) >> |""".stripMargin) >> >> // 注册函数 >> tEnv.createTemporarySystemFunction("EvtParser", classOf[EvtParser]) >> >> val schema = tEnv.executeSql( >> """ >> | SELECT t.* FROM ( >> | SELECT EvtParser(request) as t FROM parsed_nginx_log >> | ) >> |""".stripMargin).getTableSchema >> >> println("++++++++", schema) >> } >> >> } >> >> >> >> >> >> 详细报错: >> >> >> java.lang.RuntimeException: Error while applying rule >> PushProjectIntoTableSourceScanRule, args >> [rel#63:LogicalProject.NONE.any.None: >> 0.[NONE].[NONE](input=RelSubset#62,exprs=[EvtParser($3).evt, >> EvtParser($3).app, EvtParser($3).uid, EvtParser($3).ts, EvtParser($3).url, >> EvtParser($3).action]), rel#1:LogicalTableScan.NONE.any.None: >> 0.[NONE].[NONE](table=[default_catalog, default_database, >> parsed_nginx_log])] >> >> at >> >> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256) >> at >> >> org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) >> at >> >> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) >> at >> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) >> at >> >> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) >> at >> >> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) >> at >> >> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) >> at >> >> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >> at >> >> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) >> at scala.collection.Iterator$class.foreach(Iterator.scala:891) >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) >> at >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) >> at scala.collection.AbstractIterable.foreach(Iterable.scala:54) >> at >> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) >> at >> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) >> at >> >> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) >> at >> >> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163) >> at >> >> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79) >> at >> >> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) >> at >> >> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287) >> at >> >> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160) >> at >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329) >> at >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707) >> at >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1107) >> at >> >> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) >> at >> cn.bestlang.starry.evt.EvtEtlJobTest.testMain(EvtEtlJobTest.scala:51) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> >> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688) >> at >> >> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) >> at >> >> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) >> at >> >> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149) >> at >> >> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140) >> at >> >> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84) >> at >> >> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) >> at >> >> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105) >> at >> >> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) >> at >> >> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) >> at >> >> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) >> at >> >> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) >> at >> >> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104) >> at >> >> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98) >> at >> >> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210) >> at >> >> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) >> at >> >> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206) >> at >> >> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131) >> at >> >> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) >> at >> >> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) >> at >> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) >> at >> >> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) >> at java.util.ArrayList.forEach(ArrayList.java:1257) >> at >> >> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) >> at >> >> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) >> at >> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) >> at >> >> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) >> at java.util.ArrayList.forEach(ArrayList.java:1257) >> at >> >> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) >> at >> >> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) >> at >> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) >> at >> >> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) >> at >> >> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) >> at >> >> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35) >> at >> >> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57) >> at >> >> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54) >> at >> >> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107) >> at >> >> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88) >> at >> >> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54) >> at >> >> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67) >> at >> >> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52) >> at >> >> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114) >> at >> >> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86) >> at >> >> org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86) >> at >> >> org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53) >> at >> >> com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71) >> at >> >> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33) >> at >> >> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221) >> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54) >> Caused by: scala.MatchError: EvtParser($3) (of class >> org.apache.calcite.rex.RexCall) >> >> >> >> -- >> Sent from: http://apache-flink.147419.n8.nabble.com/ >> >
