Hi all
When I use udf, it throws Unable to serialize Exception as follows:
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Unable to serialize object 'UserTableFunction' of class
‘....udtf.UserTableFunction'.
at
org.apache.flink.table.utils.EncodingUtils.encodeObjectToString(EncodingUtils.java:72)
at
org.apache.flink.table.functions.UserDefinedFunction.functionIdentifier(UserDefinedFunction.java:45)
at
org.apache.flink.table.planner.codegen.CodeGenUtils$.udfFieldName(CodeGenUtils.scala:715)
at
org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:615)
My udf as follows.
public class UserTableFunction extends TableFunction<Row> {
private static final long serialVersionUID = 1L;
private HikariCPUtils dbUtils = new HikariCPUtils();
protected Connection connection;
protected PreparedStatement preparedStatement = null;
@Override
public void open(FunctionContext context) throws Exception {
connection = dbUtils.getConnection();
}
@Override
public void close() throws Exception {
if (null != connection)
connection.close();
if (null != preparedStatement)
preparedStatement.close();
}
public void eval(long uid, int countryId) {
...
Row row = new Row(8);
try {
...
collect(row);
} catch (SQLException e) {
e.printStackTrace();
}
}
@Override
public TypeInformation<Row> getResultType() {
return Types.ROW(Types.STRING, Types.STRING, Types.STRING,
Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING);
}
}
[email protected]