请问你使用的是哪个版本?这似乎是一个已知且已经修复的 bug [1]。

[1] https://github.com/apache/flink/pull/15548

gen <[email protected]> 于2021年4月27日周二 上午9:40写道:

> Hi, all
>
> 请教下,为什么无法通过 t.* 将自定义函数返回的嵌套字段查出来?
>
> tEnv.executeSql(
>       """
>         | SELECT t.* FROM (
>         |     SELECT EvtParser(request) as t FROM parsed_nginx_log
>         |     )
>         |""".stripMargin)
>
>
>
>
> 自定义函数 EvtParser
>
> @DataTypeHint("ROW<evt STRING, app STRING, uid STRING, ts STRING, url
> STRING, action STRING>")
>   def eval(line: String) = {...}
>
>
>
> 详细代码
>
> class EvtEtlJobTest {
>
>   @Test
>   def testMain(): Unit = {
>     val settings = EnvironmentSettings.newInstance.useBlinkPlanner().build
>     val tEnv = TableEnvironment.create(settings)
>
>     tEnv.executeSql(
>       """
>         | CREATE TABLE parsed_nginx_log (
>         |   remote_addr STRING,
>         |   remote_user STRING,
>         |   time_local BIGINT,
>         |   request STRING,
>         |   status STRING,
>         |   body_bytes_sent STRING,
>         |   http_referer STRING,
>         |   http_user_agent STRING,
>         |   http_x_forwarded_for STRING
>         | ) WITH (
>         |   'connector' = 'filesystem',
>         |   'path' = 'src/test/resources/nginx-parsed.log',
>         |   'format' = 'json'
>         | )
>         |""".stripMargin).getTableSchema
>
>     tEnv.executeSql(
>       """
>         | CREATE TABLE evt_log_index (
>         |   evt STRING,
>         |   app STRING,
>         |   uid STRING,
>         |   ts STRING,
>         |   url STRING,
>         |   action STRING
>         | ) WITH (
>         |   'connector' = 'filesystem',
>         |   'path' = '/tmp/evt_log_index',
>         |   'format' = 'json'
>         | )
>         |""".stripMargin)
>
>     // 注册函数
>     tEnv.createTemporarySystemFunction("EvtParser", classOf[EvtParser])
>
>     val schema = tEnv.executeSql(
>       """
>         | SELECT t.* FROM (
>         |     SELECT EvtParser(request) as t FROM parsed_nginx_log
>         |     )
>         |""".stripMargin).getTableSchema
>
>     println("++++++++", schema)
>   }
>
> }
>
>
>
>
>
> 详细报错:
>
>
> java.lang.RuntimeException: Error while applying rule
> PushProjectIntoTableSourceScanRule, args
> [rel#63:LogicalProject.NONE.any.None:
> 0.[NONE].[NONE](input=RelSubset#62,exprs=[EvtParser($3).evt,
> EvtParser($3).app, EvtParser($3).uid, EvtParser($3).ts, EvtParser($3).url,
> EvtParser($3).action]), rel#1:LogicalTableScan.NONE.any.None:
> 0.[NONE].[NONE](table=[default_catalog, default_database,
> parsed_nginx_log])]
>
>         at
>
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>         at
>
> org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>         at
>
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>         at
> org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>         at
>
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
>         at
>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>         at
>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>         at
>
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>         at
>
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>         at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>         at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>         at
> scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>         at
>
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>         at
>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
>         at
>
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
>         at
>
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>         at
>
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:287)
>         at
>
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:160)
>         at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
>         at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:707)
>         at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1107)
>         at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
>         at
> cn.bestlang.starry.evt.EvtEtlJobTest.testMain(EvtEtlJobTest.scala:51)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
>
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
>         at
>
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>         at
>
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>         at
>
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
>         at
>
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
>         at
>
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
>         at
>
> org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
>         at
>
> org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
>         at
>
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>         at
>
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>         at
>
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>         at
>
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>         at
>
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
>         at
>
> org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
>         at
>
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
>         at
>
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at
>
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
>         at
>
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
>         at
>
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>         at
>
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>         at
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>         at
>
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>         at java.util.ArrayList.forEach(ArrayList.java:1257)
>         at
>
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>         at
>
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>         at
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>         at
>
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>         at java.util.ArrayList.forEach(ArrayList.java:1257)
>         at
>
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
>         at
>
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>         at
> org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>         at
>
> org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>         at
>
> org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>         at
>
> org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
>         at
>
> org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
>         at
>
> org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
>         at
>
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
>         at
>
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
>         at
>
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
>         at
>
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
>         at
>
> org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
>         at
>
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
>         at
>
> org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
>         at
>
> org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
>         at
>
> org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
>         at
>
> com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
>         at
>
> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>         at
>
> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
>         at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Caused by: scala.MatchError: EvtParser($3) (of class
> org.apache.calcite.rex.RexCall)
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>

回复