Hi, 请教下
启动任务的时候抛异常了, 但是没看懂报错原因, 麻烦各位大佬帮看下
这里我是先创建了一个DataStreamSource, 然后配置转为view, 配置EventTime, 后面再用SQL DDL进行数据处理
DataStreamSource<PerfEvent> source = env.addSource(consumer);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
tableEnv.createTemporaryView(table_name, source,
$(timeField).rowtime(), $("cpu"));
tableEnv.from(table_name).window(
Tumble.over(lit(1).minutes())
.on($(timeField))
.as(table_name + "Window")
);
tableEnv.executeSql(sql1); // CREATE TABLE t_out (`ts` TIMESTAMP(3),
`count` BIGINT) WITH ('connector' = 'print') 没有报错
tableEnv.executeSql(sql2); // INSERT INTO t_out SELECT
TUMBLE_START(`ts`, INTERVAL '1' MINUTE), COUNT(1) as `count` FROM t1 GROUP BY
TUMBLE(`ts`, INTERVAL '1' MINUTE) 抛异常
异常堆栈:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Index: 1, Size: 1
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 1
at java.util.ArrayList.rangeCheck(ArrayList.java:657)
at java.util.ArrayList.get(ArrayList.java:433)
at java.util.Collections$UnmodifiableList.get(Collections.java:1311)
at
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:682)
at
org.apache.flink.table.planner.codegen.GenerateUtils$.generateFieldAccess(GenerateUtils.scala:665)
at
org.apache.flink.table.planner.codegen.GenerateUtils$.generateInputAccess(GenerateUtils.scala:561)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1(ExprCodeGenerator.scala:184)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateConverterResultExpression$1$adapted(ExprCodeGenerator.scala:158)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:242)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.mutable.ArrayOps$ofInt.map(ArrayOps.scala:242)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateConverterResultExpression(ExprCodeGenerator.scala:158)
at
org.apache.flink.table.planner.plan.utils.ScanUtil$.convertToInternalRow(ScanUtil.scala:103)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:126)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlanInternal(StreamExecDataStreamScan.scala:55)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecDataStreamScan.translateToPlan(StreamExecDataStreamScan.scala:55)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:76)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:117)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlanInternal(StreamExecGroupWindowAggregateBase.scala:54)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupWindowAggregateBase.translateToPlan(StreamExecGroupWindowAggregateBase.scala:54)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:79)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:43)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:43)
at
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:67)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
...