????????????????????????????????????????????override getResultType????
public TypeInformation<?> getResultType(Class<?>[] signature) {
return Types.SQL_TIMESTAMP;
}
------------------ ???????? ------------------
??????: "JingsongLee"<[email protected]>;
????????: 2019??9??5??(??????) ????2:09
??????: "????"<[email protected]>;"user-zh"<[email protected]>;
????: Re: Re: flink1.9??blinkSQL??????udf??TIMESTAMP????????
override getResultType??????????Types.SQL_TIMESTAMP.
??????????????????
1.10????????????????
Best,
Jingsong Lee
------------------------------------------------------------------
From:???? <[email protected]>
Send Time:2019??9??5??(??????) 12:11
To:[email protected] JingsongLee <[email protected]>; user-zh
<[email protected]>
Subject:?????? Re: flink1.9??blinkSQL??????udf??TIMESTAMP????????
????????DataType??????????????????????????????????udf??????????
import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;
public class UTC2Local extends ScalarFunction {
public Timestamp eval(Timestamp s) {
long timestamp = s.getTime() +
28800000;
return new Timestamp(timestamp);
}
}
------------------ ???????? ------------------
??????: "JingsongLee"<[email protected]>;
????????: 2019??9??5??(??????) ????11:55
??????: "user-zh"<[email protected]>;
????: Re: flink1.9??blinkSQL??????udf??TIMESTAMP????????
????????DataType??????????????????
??????????????????<=3????????????DataTypes.TIMESTAMP(3)????????
Best,
Jingsong Lee
------------------------------------------------------------------
From:???? <[email protected]>
Send Time:2019??9??5??(??????) 11:48
To:user-zh <[email protected]>
Subject:flink1.9??blinkSQL??????udf??TIMESTAMP????????
??????????????????
??????????flink1.9????????flinkSQL????udf????????????????????????blinkSQL????????udf????????TIMESTAMP??????????udf??????????????????????????????+8??????????????????
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Not support dataType: TIMESTAMP(9)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
&nbsp; &nbsp; &nbsp; &nbsp; at
java.security.AccessController.doPrivileged(Native Method)
&nbsp; &nbsp; &nbsp; &nbsp; at
javax.security.auth.Subject.doAs(Subject.java:422)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&nbsp; &nbsp; &nbsp; &nbsp; at
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
&nbsp; &nbsp; &nbsp; &nbsp; at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&nbsp; &nbsp; &nbsp; &nbsp; at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
&nbsp; &nbsp; &nbsp; &nbsp; at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
&nbsp; &nbsp; &nbsp; &nbsp; at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
&nbsp; &nbsp; &nbsp; &nbsp; at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
&nbsp; &nbsp; &nbsp; &nbsp; at
scala.collection.AbstractTraversable.map(Traversable.scala:104)