SQL????????????1.12.0??watermark??????????????????????????
CREATE TABLE KafkaTable (
&nbsp; test array<row<signalValue STRING&gt;&gt;,
&nbsp; gatherTime STRING,
&nbsp; log_ts as TO_TIMESTAMP(FROM_UNIXTIME(CAST(gatherTime AS 
bigint)/1000,'yyyy-MM-dd HH:mm:ss')),
&nbsp; WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (
&nbsp; 'connector' = 'kafka',
&nbsp; 'topic' = 'user_behavior',
&nbsp; 'properties.bootstrap.servers' = 'localhost:9092',
&nbsp; 'properties.group.id' = 'testGroup',
&nbsp; 'scan.startup.mode' = 'earliest-offset',
&nbsp; 'format' = 'json'
);

SELECT test[1].signalValue from KafkaTable;




Exception in thread "main" scala.MatchError: ITEM($0, 1) (of class 
org.apache.calcite.rex.RexCall)
        at 
org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
        at 
org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
        at 
org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
        at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
        at 
org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
        at 
org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
        at 
org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
        at 
org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:127)
        at 
org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:62)
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:284)
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.matchRecurse(VolcanoRuleCall.java:411)
        at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.match(VolcanoRuleCall.java:268)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.fireRules(VolcanoPlanner.java:985)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1245)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:84)
        at 
org.apache.calcite.rel.AbstractRelNode.onRegister(AbstractRelNode.java:268)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1132)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.register(VolcanoPlanner.java:589)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.ensureRegistered(VolcanoPlanner.java:604)
        at 
org.apache.calcite.plan.volcano.VolcanoPlanner.changeTraits(VolcanoPlanner.java:486)
        at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:309)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:163)
        at 
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:79)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:286)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:703)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1067)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
        at com.job.SchemaJob.callSelect(SchemaJob.java:110)
        at com.job.SchemaJob.callCommand(SchemaJob.java:82)
        at com.job.SchemaJob.main(SchemaJob.java:56)
Disconnected from the target VM, address: '127.0.0.1:57324', transport: 'socket'


Process finished with exit code 1

回复