Hi, 从错误上看应该是你作业里某个字符串字段被作为时间戳处理,导致作业codegen失败了。你的作业逻辑比较复杂,你可以排查一下跟时间相关的字段,检查一下字段类型处理是否正确,比如eventTime字段
Best, Shammon FY On Mon, May 15, 2023 at 7:29 PM lxk <lxk7...@163.com> wrote: > 你好,从报错来看是类型不兼容导致的。 > Caused by: org.codehaus.commons.compiler.CompileException: Line 17, Column > 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime" > 可以尝试对表结构进行优化,或者使用相关函数对字段类型进行转换 > > > > > > > > > > > > > > > > > > At 2023-05-15 18:29:15, "小昌同学" <ccc0606fight...@163.com> wrote: > >| > >package job; > >import bean.BaseInfo; > >import bean.MidInfo; > >import bean.OutInfo; > >import bean.ResultInfo; > >import com.alibaba.fastjson.JSON; > >import com.alibaba.fastjson.JSONObject; > >import config.FlinkConfig; > >import function.MyProcessFunction; > >import org.apache.flink.api.common.functions.MapFunction; > >import org.apache.flink.api.common.serialization.SimpleStringSchema; > >import org.apache.flink.api.java.tuple.Tuple2; > >import org.apache.flink.streaming.api.TimeCharacteristic; > >import org.apache.flink.streaming.api.datastream.DataStream; > >import org.apache.flink.streaming.api.datastream.DataStreamSource; > >import > org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; > >import > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > >import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; > >import org.apache.flink.table.api.DataTypes; > >import org.apache.flink.table.api.Schema; > >import org.apache.flink.table.api.Table; > >import org.apache.flink.table.api.TableSchema; > >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > >import org.apache.flink.table.types.DataType; > >import org.apache.flink.util.OutputTag; > >import sink.Sink2Mysql; > >import utils.DateUtil; > >import utils.DateUtils; > >import utils.JdbcUtil; > > > >import java.sql.Connection; > >import java.sql.PreparedStatement; > >import java.sql.ResultSet; > >import java.time.*; > >import java.util.Date; > >import java.util.HashMap; > >import java.util.Properties; > > > >public class RytLogAnly4 { > >public static void main(String[] args) throws Exception { > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > > > >//使用侧输出流 > > OutputTag<BaseInfo> requestStream = new > OutputTag<BaseInfo>("requestStream") { > > }; > > OutputTag<BaseInfo> answerStream = new > OutputTag<BaseInfo>("answerStream") { > > }; > > > >//1、连接测试环境kafka的数据 > > String servers = > FlinkConfig.config.getProperty("dev_bootstrap.servers"); > > String topicName = > FlinkConfig.config.getProperty("dev_topicName"); > > String groupId = FlinkConfig.config.getProperty("dev_groupId"); > > String devMode = FlinkConfig.config.getProperty("dev_mode"); > > Properties prop = new Properties(); > > prop.setProperty("bootstrap.servers", servers); > > prop.setProperty("group.id", groupId); > > prop.setProperty("auto.offset.reset", devMode); > > DataStreamSource<String> sourceStream = env.addSource(new > FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop)); > >//{"ip":"10.125.8.141",{"data":"请求: -- 14:28:05.395 -- > <315.22604>1D971BEEF23AE63\nAction=686\nMobileCode=18243533656\nReqno=10797698288=22=2=0.2.4596628816=703492175447.712\nCellIndex=0102\nIphoneKey=10797944048\nForward=2\nCfrom=dbzq.iphone\nTFrom=iphone\nGateWayIp=2409:893c:5212:4943:acf2:43c1:3904:253c\nHandleSerialNo=TmuAbK5TAAC9CttSU/3lQGAHAABrBAAAAwACABYAAACuAgAAAAAAAAAAAACuAgAATQAAAAFIAAAAAAFSMAAAADEwNzk3Njk4Mjg4PTIyPTI9MC4yLjQ1OTY2Mjg4MTY9NzAzNDkyMTc1NDQ3LjcxMgFJCwAAADEwNzk3OTQ0MDQ4AA==\nGateWayPort=60696\nnewindex=1\nlinksession=4\nuniqueid=8488717B-B476-4A82-BFD0-9DDBB5151D0A\ntztsno=5DFB64E9E8BF7C67A158C3022E970E0F\nClientVersion=1.01.096\ntztreqfrom=ios.webview\nReqlinkType=2\ntztSDKType=0\n"} > > > > //2、对源数据进行处理,生成baseInfo基类的数据 > > SingleOutputStreamOperator<BaseInfo> baseInfoStream = > sourceStream.map(new MapFunction<String, BaseInfo>() { > >@Override > > public BaseInfo map(String value) throws Exception { > > JSONObject jsonObject = JSON.parseObject(value); > >//获取到不同的服务器IP > > String serverIp = jsonObject.getString("ip"); > >//获取到不同的data的数据 > > String datas = jsonObject.getString("data"); > > > > String[] splits = datas.split("\n"); > > HashMap<String, String> dataMap = new HashMap<>(); > >//将time填充到自定义类型中,用来判断同一个num的请求以及相应时间 > > String time = splits[0].substring(7, 19); > >//将subData填充到自定义类型中,用来判断时请求还是应答 > > String subData = datas.substring(0, 10); > >for (int i = 0; i < splits.length; i++) { > >if (splits[i].contains("=")) { > > splits[i] = splits[i].replaceFirst("=", "&"); > > String[] temp = splits[i].split("&"); > >if (temp.length > 1) { > > dataMap.put(temp[0].toLowerCase(), temp[1]); > > } > > } > > } > >return new BaseInfo(dataMap.get("action"), serverIp, > DateUtil.string2Long(time), dataMap.get("handleserialno"), subData); > > } > > }); > > > >//3、使用process方法进行baseInfoStream流切割 > > SingleOutputStreamOperator<BaseInfo> tagStream = > baseInfoStream.process(new MyProcessFunction(requestStream, answerStream)); > > > >//4、根据不同的tag进行不同的输出流设定 > > DataStream<BaseInfo> requestDataStream = > tagStream.getSideOutput(requestStream); > > DataStream<BaseInfo> answerDataStream = > tagStream.getSideOutput(answerStream); > > > > requestDataStream.print("requestDataStream"); > > answerDataStream.print("answerDataStream"); > > > >//5、上面的流仅仅只是携带了action编码,没有对应的action中午注释,需要去关联一下MySQL中的表 > > //5.1 先对请求流进行处理 > > SingleOutputStreamOperator<OutInfo> outRequestDataStream = > requestDataStream.map(new MapFunction<BaseInfo, OutInfo>() { > >@Override > > public OutInfo map(BaseInfo value) throws Exception { > >//拿到数据中携带的数字的action > > String actionType = value.getFuncId(); > > System.out.println(actionType); > > String actionName = null; > > Connection connection = null; > > PreparedStatement ps = null; > > > >//根据数据的action去MySQL中查找到对应的中午注释 > > try { > > String sql = "select action_name from ActionType > where action = ?"; > > connection = JdbcUtil.getConnection(); > > ps = connection.prepareStatement(sql); > > ps.setString(1, actionType); > > ResultSet resultSet = ps.executeQuery(); > > System.out.println("resultSet是" + resultSet); > >if (resultSet.next()) { > > actionName = resultSet.getString("action_name"); > > } > > } catch (Exception e) { > >throw new RuntimeException(e); > > } finally { > > JdbcUtil.closeResource(connection, ps); > > } > >// return new OutInfo(value.getFuncId(), > value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), > value.getInfo(), actionName,DateUtils.format(new Date())); > > return new OutInfo(value.getFuncId(), > value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), > value.getInfo(), actionName, LocalDateTime.now().toString()); > > } > > }); > > outRequestDataStream.print("outRequestDataStream"); > > > > > >//5.2 对应答流进行处理 > > SingleOutputStreamOperator<OutInfo> outAnswerDataStream = > answerDataStream.map(new MapFunction<BaseInfo, OutInfo>() { > >@Override > > public OutInfo map(BaseInfo value) throws Exception { > >//拿到数据中携带的数字的action > > String actionType = value.getFuncId(); > > System.out.println(actionType); > > String actionName = null; > > Connection connection = null; > > PreparedStatement ps = null; > > > >//根据数据的action去MySQL中查找到对应的中午注释 > > try { > > String sql = "select action_name from ActionType > where action = ?"; > > connection = JdbcUtil.getConnection(); > > ps = connection.prepareStatement(sql); > > ps.setString(1, actionType); > > ResultSet resultSet = ps.executeQuery(); > > System.out.println("resultSet是" + resultSet); > >if (resultSet.next()) { > > actionName = resultSet.getString("action_name"); > > } > > } catch (Exception e) { > >throw new RuntimeException(e); > > } finally { > > JdbcUtil.closeResource(connection, ps); > > } > >// return new OutInfo(value.getFuncId(), > value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), > value.getInfo(), actionName, DateUtils.format(new Date())); > > return new OutInfo(value.getFuncId(), > value.getServerIp(), value.getBaseTime(), value.getHandleSerialNo(), > value.getInfo(), actionName, LocalDateTime.now().toString()); > > } > > }); > > outAnswerDataStream.print("outAnswerDataStream"); > > > >//6、讲两条流转换为对应的表 进行关联取最小值 > > StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > >//设置状态值为1天 > > tableEnv.getConfig().setIdleStateRetention(Duration.ofDays(1L)); > >// tableEnv.createTemporaryView("tableRequest", > outRequestDataStream); > >// tableEnv.createTemporaryView("tableAnswer", > outAnswerDataStream); > > Table tableRequest = > tableEnv.fromDataStream(outRequestDataStream, Schema.newBuilder() > > .column("funcId", DataTypes.STRING()) > > .column("serverIp", DataTypes.STRING()) > > .column("outTime", DataTypes.BIGINT()) > > .column("handleSerialNo", DataTypes.STRING()) > > .column("info", DataTypes.STRING()) > > .column("funcIdDesc", DataTypes.STRING()) > > .column("eventTime", DataTypes.TIMESTAMP(3)) > > .watermark("eventTime", "eventTime - INTERVAL '5' SECOND > ") > > .build()); > > Table tableAnswer = > tableEnv.fromDataStream(outAnswerDataStream,Schema.newBuilder() > > .column("funcId", DataTypes.STRING()) > > .column("serverIp", DataTypes.STRING()) > > .column("outTime", DataTypes.BIGINT()) > > .column("handleSerialNo", DataTypes.STRING()) > > .column("info", DataTypes.STRING()) > > .column("funcIdDesc", DataTypes.STRING()) > > .column("eventTime", DataTypes.TIMESTAMP(3)) > > .watermark("eventTime", "eventTime - INTERVAL '5' SECOND > ") > > .build()); > > > > > > Table result = tableEnv.sqlQuery("select \n" + > >"\ta.funcId as funcId ,\n" + > >"\ta.funcIdDesc as funcIdDesc,\n" + > >"\ta.serverIp as serverIp,\n" + > >"\tb.outTime as maxTime,\n" + > >"\ta.outTime as minTime,\t\n" + > >"\tconcat(a.funcId,a.serverIp) as pk ,\n" + > >" a.eventTime as eventTime\n" + > >" from "+ tableRequest +" a\n " + > >" inner join "+ tableAnswer +" b" + > >" on a.handleSerialNo=b.handleSerialNo "); > > System.out.println("这个是resultTable"+result); > > result.printSchema(); > > tableEnv.createTemporaryView("resultTable", result); > > > > DataStream<MidInfo> midStream = tableEnv.toAppendStream(result, > MidInfo.class); > > Table midTable = tableEnv.fromDataStream(midStream, $("funcId"), > $("funcIdDesc"), $("serverIp"), $("maxTime"), $("minTime"), $("pk"), > $("eventTime").rowtime()); > > > > tableEnv.createTemporaryView("midTable",midTable); > > > >//使用TVF的采用渐进式累计窗口进行计算 > > Table resulTable = tableEnv.sqlQuery("SELECT > funcId,funcIdDesc,serverIp,pk,min(maxTime-minTime) as minTime\n" + > >"FROM TABLE(CUMULATE(\n" + > >" TABLE midTable "+ > >" , DESCRIPTOR(eventTime)\n" + > >" , INTERVAL '60' SECOND\n" + > >" , INTERVAL '1' DAY))\n" + > >" GROUP BY window_start,window_end,funcId,funcIdDesc,serverIp,pk"); > > > > > > > > DataStream<Tuple2<Boolean, ResultInfo>> resultStream = > tableEnv.toRetractStream(resulTable, ResultInfo.class); > > resultStream.print("resultStream"); > > SingleOutputStreamOperator<ResultInfo> resultInfoStream = > resultStream.map(new MapFunction<Tuple2<Boolean, ResultInfo>, ResultInfo>() > { > >@Override > > public ResultInfo map(Tuple2<Boolean, ResultInfo> value) > throws Exception { > >return value.f1; > > } > > }); > > resultInfoStream.print("resultInfoStream"); > > resultInfoStream.addSink(new Sink2Mysql()); > > env.execute(); > > } > >} > >| > >你好,以上是我的代码,相关报错如下; > > > > > >| 这个是resultTableUnnamedTable$2 > >( > > `funcId` STRING, > > `funcIdDesc` STRING, > > `serverIp` STRING, > > `maxTime` BIGINT, > > `minTime` BIGINT, > > `pk` STRING, > > `eventTime` TIMESTAMP(3) *ROWTIME* > >) > >/* 1 */public class bean$OutInfo$2$Converter implements > org.apache.flink.table.data.conversion.DataStructureConverter { > >/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[] > fieldGetters; > >/* 3 */ private final > org.apache.flink.table.data.conversion.DataStructureConverter[] > fieldConverters; > >/* 4 */ public > bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[] > fieldGetters, > org.apache.flink.table.data.conversion.DataStructureConverter[] > fieldConverters) { > >/* 5 */ this.fieldGetters = fieldGetters; > >/* 6 */ this.fieldConverters = fieldConverters; > >/* 7 */ } > >/* 8 */ public java.lang.Object toInternal(java.lang.Object o) { > >/* 9 */ final bean.OutInfo external = (bean.OutInfo) o; > >/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow = > new org.apache.flink.table.data.GenericRowData(7); > >/* 11 */ genericRow.setField(0, > fieldConverters[0].toInternalOrNull(((java.lang.String) > external.getFuncId()))); > >/* 12 */ genericRow.setField(1, > fieldConverters[1].toInternalOrNull(((java.lang.String) > external.getServerIp()))); > >/* 13 */ genericRow.setField(2, > fieldConverters[2].toInternalOrNull(((java.lang.Long) > external.getOutTime()))); > >/* 14 */ genericRow.setField(3, > fieldConverters[3].toInternalOrNull(((java.lang.String) > external.getHandleSerialNo()))); > >/* 15 */ genericRow.setField(4, > fieldConverters[4].toInternalOrNull(((java.lang.String) > external.getInfo()))); > >/* 16 */ genericRow.setField(5, > fieldConverters[5].toInternalOrNull(((java.lang.String) > external.getFuncIdDesc()))); > >/* 17 */ genericRow.setField(6, > fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime) > external.getEventTime()))); > >/* 18 */ return genericRow; > >/* 19 */ } > >/* 20 */ public java.lang.Object toExternal(java.lang.Object o) { > >/* 21 */ final org.apache.flink.table.data.RowData internal = > (org.apache.flink.table.data.RowData) o; > >/* 22 */ final bean.OutInfo structured = new bean.OutInfo(); > >/* 23 */ structured.setFuncId(((java.lang.String) > fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal)))); > >/* 24 */ structured.setServerIp(((java.lang.String) > fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal)))); > >/* 25 */ structured.setOutTime(((java.lang.Long) > fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal)))); > >/* 26 */ structured.setHandleSerialNo(((java.lang.String) > fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal)))); > >/* 27 */ structured.setInfo(((java.lang.String) > fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal)))); > >/* 28 */ structured.setFuncIdDesc(((java.lang.String) > fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal)))); > >/* 29 */ structured.setEventTime(((java.lang.String) > fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal)))); > >/* 30 */ return structured; > >/* 31 */ } > >/* 32 */} > > > >/* 1 */public class bean$OutInfo$2$Converter implements > org.apache.flink.table.data.conversion.DataStructureConverter { > >/* 2 */ private final org.apache.flink.table.data.RowData.FieldGetter[] > fieldGetters; > >/* 3 */ private final > org.apache.flink.table.data.conversion.DataStructureConverter[] > fieldConverters; > >/* 4 */ public > bean$OutInfo$2$Converter(org.apache.flink.table.data.RowData.FieldGetter[] > fieldGetters, > org.apache.flink.table.data.conversion.DataStructureConverter[] > fieldConverters) { > >/* 5 */ this.fieldGetters = fieldGetters; > >/* 6 */ this.fieldConverters = fieldConverters; > >/* 7 */ } > >/* 8 */ public java.lang.Object toInternal(java.lang.Object o) { > >/* 9 */ final bean.OutInfo external = (bean.OutInfo) o; > >/* 10 */ final org.apache.flink.table.data.GenericRowData genericRow = > new org.apache.flink.table.data.GenericRowData(7); > >/* 11 */ genericRow.setField(0, > fieldConverters[0].toInternalOrNull(((java.lang.String) > external.getFuncId()))); > >/* 12 */ genericRow.setField(1, > fieldConverters[1].toInternalOrNull(((java.lang.String) > external.getServerIp()))); > >/* 13 */ genericRow.setField(2, > fieldConverters[2].toInternalOrNull(((java.lang.Long) > external.getOutTime()))); > >/* 14 */ genericRow.setField(3, > fieldConverters[3].toInternalOrNull(((java.lang.String) > external.getHandleSerialNo()))); > >/* 15 */ genericRow.setField(4, > fieldConverters[4].toInternalOrNull(((java.lang.String) > external.getInfo()))); > >/* 16 */ genericRow.setField(5, > fieldConverters[5].toInternalOrNull(((java.lang.String) > external.getFuncIdDesc()))); > >/* 17 */ genericRow.setField(6, > fieldConverters[6].toInternalOrNull(((java.time.LocalDateTime) > external.getEventTime()))); > >/* 18 */ return genericRow; > >/* 19 */ } > >/* 20 */ public java.lang.Object toExternal(java.lang.Object o) { > >/* 21 */ final org.apache.flink.table.data.RowData internal = > (org.apache.flink.table.data.RowData) o; > >/* 22 */ final bean.OutInfo structured = new bean.OutInfo(); > >/* 23 */ structured.setFuncId(((java.lang.String) > fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal)))); > >/* 24 */ structured.setServerIp(((java.lang.String) > fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal)))); > >/* 25 */ structured.setOutTime(((java.lang.Long) > fieldConverters[2].toExternalOrNull(fieldGetters[2].getFieldOrNull(internal)))); > >/* 26 */ structured.setHandleSerialNo(((java.lang.String) > fieldConverters[3].toExternalOrNull(fieldGetters[3].getFieldOrNull(internal)))); > >/* 27 */ structured.setInfo(((java.lang.String) > fieldConverters[4].toExternalOrNull(fieldGetters[4].getFieldOrNull(internal)))); > >/* 28 */ structured.setFuncIdDesc(((java.lang.String) > fieldConverters[5].toExternalOrNull(fieldGetters[5].getFieldOrNull(internal)))); > >/* 29 */ structured.setEventTime(((java.lang.String) > fieldConverters[6].toExternalOrNull(fieldGetters[6].getFieldOrNull(internal)))); > >/* 30 */ return structured; > >/* 31 */ } > >/* 32 */} > > > >Exception in thread "main" > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) > > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137) > > at > java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) > > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) > > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:250) > > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > > at > org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389) > > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93) > > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) > > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92) > > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) > > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) > > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) > > at > java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) > > at > org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47) > > at akka.dispatch.OnComplete.internal(Future.scala:300) > > at akka.dispatch.OnComplete.internal(Future.scala:297) > > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224) > > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221) > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > > at > org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65) > > at > scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) > > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) > > at > scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) > > at > scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) > > at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621) > > at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24) > > at > akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23) > > at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) > > at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) > > at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) > > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) > > at > akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63) > > at > akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100) > > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > > at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) > > at > akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100) > > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) > > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) > > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) > >Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed > by NoRestartBackoffTimeStrategy > > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) > > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) > > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228) > > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218) > > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209) > > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679) > > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) > > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) > > at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source) > > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) > > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) > > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) > > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) > > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) > > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) > > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > > at akka.actor.Actor.aroundReceive(Actor.scala:537) > > at akka.actor.Actor.aroundReceive$(Actor.scala:535) > > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) > > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) > > at akka.actor.ActorCell.invoke(ActorCell.scala:548) > > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) > > at akka.dispatch.Mailbox.run(Mailbox.scala:231) > > at akka.dispatch.Mailbox.exec(Mailbox.scala:243) > > ... 4 more > >Caused by: org.apache.flink.table.api.TableException: Error while > generating structured type converter. > > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:89) > > at > org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.open(DataStructureConverterWrapper.java:46) > > at > org.apache.flink.table.runtime.operators.source.InputConversionOperator.open(InputConversionOperator.java:76) > > at > org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) > > at > org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) > > at > org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) > > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) > > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) > > at java.lang.Thread.run(Thread.java:748) > >Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot > be compiled. This is a bug. Please file an issue. > > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:76) > > at > org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80) > > ... 12 more > >Caused by: > org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot > be compiled. This is a bug. Please file an issue. > > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051) > > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962) > > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859) > > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:74) > > ... 13 more > >Caused by: org.apache.flink.api.common.InvalidProgramException: Table > program cannot be compiled. This is a bug. Please file an issue. > > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:89) > > at > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:74) > > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864) > > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529) > > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278) > > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155) > > at > org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045) > > ... 16 more > >Caused by: org.codehaus.commons.compiler.CompileException: Line 17, > Column 103: Cannot cast "java.lang.String" to "java.time.LocalDateTime" > > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211) > > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051) > > at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215) > > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418) > > at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396) > > at org.codehaus.janino.Java$Cast.accept(Java.java:4898) > > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057) > > at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215) > > at > org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409) > > at > org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400) > > at > org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924) > > at > org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400) > > at > org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396) > > at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148) > > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) > > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182) > > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) > > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) > > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) > > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) > > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182) > > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423) > > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396) > > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) > > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396) > > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662) > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783) > > at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) > > at > org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762) > > at > org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734) > > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073) > > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734) > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) > > at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) > > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) > > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) > > at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874) > > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) > > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) > > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) > > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) > > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) > > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > > at > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > > at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) > > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:86) > > ... 22 more > > > >Process finished with exit code 1 > > > >| > > > > > >| | > >小昌同学 > >| > >| > >ccc0606fight...@163.com > >| > >---- 回复的原邮件 ---- > >| 发件人 | lxk<lxk7...@163.com> | > >| 发送日期 | 2023年5月15日 18:21 | > >| 收件人 | <user-zh@flink.apache.org> | > >| 主题 | Re:报错显示为bug | > >你好,可以把相关代码贴上来吗,方便大家进行分析。如果使用sql的话还可以把执行计划贴上来。 > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >在 2023-05-15 17:11:42,"小昌同学" <ccc0606fight...@163.com> 写道: > >各位老师,请教一下我在使用table API进行编程的时候,报错信息为”Caused by: > org.apache.flink.api.common.InvalidProgramException: Table program cannot > be compiled. This is a bug. Please file an issue. “ > >flink使用版本为1.14,请问一下有相关社区的技术人员可以进行对接吗,还是怎么操作 > > > > > >| | > >小昌同学 > >| > >| > >ccc0606fight...@163.com > >| >