Hi all,
I'm using Flink 1.3.1 and I'm trying to register an UDF but there's
something wrong.
I always get the following exception:

java.lang.UnsupportedOperationException:
org.apache.flink.table.expressions.TableFunctionCall cannot be transformed
to RexNode
at
org.apache.flink.table.expressions.Expression.toRexNode(Expression.scala:53)
at
org.apache.flink.table.expressions.Alias.toRexNode(fieldExpression.scala:79)
at
org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:94)
at
org.apache.flink.table.plan.logical.Project$$anonfun$construct$1.apply(operators.scala:94)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.flink.table.plan.logical.Project.construct(operators.scala:94)
at
org.apache.flink.table.plan.logical.LogicalNode.toRelNode(LogicalNode.scala:77)
at org.apache.flink.table.api.Table.getRelNode(table.scala:94)
at

-------------------------------------------------
This is my Program:

final ExecutionEnvironment env =
DatalinksExecutionEnvironment.getExecutionEnv();
final BatchTableEnvironment tEnv =
TableEnvironment.getTableEnvironment(env);

DataSet<Row> dataSet = null;
dataSet = env.fromElements("{\"test\":\"val\"}").map(new
MapFunction<String, Row>() {

        @Override
        public Row map(String value) throws Exception {
          Row ret = new Row(1);
          ret.setField(0, value);
          return ret;
        }
      }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));

tEnv.registerFunction("myFunc", new MyTableFunction());
Table test = tEnv.fromDataSet(dataSet, "field1");
 Table res = test.select("field1,myFunc(recon)");
 dataSet = tEnv.toDataSet(res,
           new RowTypeInfo(
                   BasicTypeInfo.STRING_TYPE_INFO,
                   BasicTypeInfo.STRING_TYPE_INFO));
  dataSet.print();


MyTableFunction is something like:

public class MyTableFunction extends TableFunction<String> {
  public String eval(String str) {
    return "XXX";
  }
}


What I'm doing wrong here?

Thanks in advance,
Flavio

Reply via email to