flink版本是1.11.1
创建了一个udf函数如下
public class FromUnixTime extends ScalarFunction {
private static final Logger logger =
LoggerFactory.getLogger(FromUnixTime.class);
public String eval(long unixTime, String timeZone, String format) {
try {
DateFormat dateFormat = new SimpleDateFormat(format);
dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
return dateFormat.format(new Date(unixTime));
} catch (Exception e) {
logger.error("parse unixTime error, unixTime: " + unixTime +
",timeZone: " + timeZone, ",format:" + format, e);
return null;
}
}
public String eval(long unixTime, String timeZone) {
return eval(unixTime, timeZone, "yyyy-MM-dd HH:mm:ss");
}
}
测试执行如下语句是能正常成功的。
<http://apache-flink.147419.n8.nabble.com/file/t572/lALPDiCptXS2iL3NAUbNBPo_1274_326.jpg>
但是创建view报如下错误,是啥原因?
create view xx as
select
*,
UDF_FromUnixTime(exeAt, 'utc', 'yyyy-MM-ddHH:mm' ) as biz_min
from timeout_topic_source;
其中exeAt是个long类型的时间
错误如下:
org.apache.flink.table.api.ValidationException: SQL validation failed.
Invalid function call:
default_catalog.default_database.UDF_FromUnixTime(BIGINT, CHAR(3) NOT NULL,
CHAR(15) NOT NULL)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:148)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCreateView(FlinkSqlInterrpeter.java:343)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:267)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:145)
at
org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:114)
at
org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:110)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:776)
at
org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:668)
at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
at
org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
at
org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Invalid function
call:
default_catalog.default_database.UDF_FromUnixTime(BIGINT, CHAR(3) NOT NULL,
CHAR(15) NOT NULL)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:207)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:93)
at
org.apache.calcite.sql.SqlOperator.checkOperandTypes(SqlOperator.java:684)
at
org.apache.calcite.sql.SqlOperator.validateOperands(SqlOperator.java:448)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:314)
at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:218)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at
org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:133)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5858)
at
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5845)
at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1800)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1785)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:481)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4255)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3523)
at
org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at
org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
at
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:143)
... 17 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid input
arguments. Expected signatures are:
default_catalog.default_database.UDF_FromUnixTime(unixTime BIGINT NOT NULL,
timeZone STRING)
default_catalog.default_database.UDF_FromUnixTime(unixTime BIGINT NOT NULL,
timeZone STRING, format STRING)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
... 43 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid input
arguments.
at
org.apache.flink.table.types.inference.TypeInferenceUtil.inferInputTypes(TypeInferenceUtil.java:467)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:123)
at
org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
at
org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
... 44 more
--
Sent from: http://apache-flink.147419.n8.nabble.com/