hayden zhou created FLINK-23353:
-----------------------------------

             Summary: UDTAGG can't execute in Batch mode
                 Key: FLINK-23353
                 URL: https://issues.apache.org/jira/browse/FLINK-23353
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.13.1
            Reporter: hayden zhou



{code:java}

public class Top2Test {
    public static void main(String[] args) {

        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode()build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        Table sourceTable = tEnv.fromValues(
                DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT()),
                    DataTypes.FIELD("name",DataTypes.STRING()),
                    DataTypes.FIELD("price", DataTypes.INT())
                ),
                row(1, "hayden", 18),
                row(3, "hayden", 19),
                row(4, "hayden", 20),
                row(2, "jaylin", 20)
        );

        tEnv.createTemporaryView("source", sourceTable);

        Table rT = tEnv.from("source")
                .groupBy($("name"))
                .flatAggregate(call(Top2.class, $("price")).as("price", "rank"))
                .select($("name"), $("price"), $("rank"));
        rT.execute().print();
    }


    public static class Top2Accumulator {
        public Integer first;
        public Integer second;
    }

    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, 
Integer>, Top2Accumulator> {

        @Override
        public Top2Accumulator createAccumulator() {
            Top2Accumulator acc = new Top2Accumulator();
            acc.first = Integer.MIN_VALUE;
            acc.second = Integer.MIN_VALUE;
            return acc;
        }

        public void accumulate(Top2Accumulator acc, Integer value) {
            if (value > acc.first) {
                acc.second = acc.first;
                acc.first = value;
            } else if (value > acc.second) {
                acc.second = value;
            }
        }

        public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
            for (Top2Accumulator otherAcc : it) {
                accumulate(acc, otherAcc.first);
                accumulate(acc, otherAcc.second);
            }
        }

        public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, 
Integer>> out) {
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }
    }

}

{code}

Running this program fails with the following error:
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot 
generate a valid execution plan for the given query: 

LogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1],
 fields=[name, price, rank])
+- LogicalProject(name=[AS($0, _UTF-16LE'name')], price=[AS($1, 
_UTF-16LE'price')], rank=[AS($2, _UTF-16LE'rank')])
   +- LogicalTableAggregate(group=[{1}], 
tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
      +- LogicalUnion(all=[true])
         :- LogicalProject(id=[CAST(1):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(18):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         :- LogicalProject(id=[CAST(3):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(19):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         :- LogicalProject(id=[CAST(4):INTEGER], 
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
         :  +- LogicalValues(tuples=[[{ 0 }]])
         +- LogicalProject(id=[CAST(2):INTEGER], 
name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
            +- LogicalValues(tuples=[[{ 0 }]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at 
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:87)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at 
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:46)
        at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
        at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
        at flinktest.Top2Test.main(Top2Test.java:37)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are 
not enough rules to produce a node with desired properties: convention=LOGICAL, 
FlinkRelDistributionTraitDef=any, sort=[].
Missing conversion is LogicalTableAggregate[convention: NONE -> LOGICAL]
There is 1 empty subset: rel#436:RelSubset#6.LOGICAL.any.[], the relevant part 
of the original plan is as follows
409:LogicalTableAggregate(group=[{1}], 
tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
  407:LogicalUnion(subset=[rel#408:RelSubset#5.NONE.any.[]], all=[true])
    399:LogicalProject(subset=[rel#400:RelSubset#1.NONE.any.[]], 
id=[CAST(1):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(18):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])
    401:LogicalProject(subset=[rel#402:RelSubset#2.NONE.any.[]], 
id=[CAST(3):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(19):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])
    403:LogicalProject(subset=[rel#404:RelSubset#3.NONE.any.[]], 
id=[CAST(4):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])
    405:LogicalProject(subset=[rel#406:RelSubset#4.NONE.any.[]], 
id=[CAST(2):INTEGER], name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], 
price=[CAST(20):INTEGER])
      0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 
}]])


If the inBatchMode() call is removed from

{code:java}
        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
{code}
then the program runs normally.




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to