Attached is my code, a demo written by following the official website. I can't figure out what's wrong; could someone please take a look?


The schema printed by orders.printSchema():

root
 |-- orderId: STRING
 |-- userId: INT
 |-- money: INT
 |-- createTime: BIGINT
 |-- pt: TIMESTAMP(3) *PROCTIME*




17:17:11,935 INFO  org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

Exception in thread "main" java.lang.RuntimeException: Unknown call expression: count(orderId)
    at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:102)
    at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
    at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
    at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.table.planner.expressions.converter.OverConvertRule.convert(OverConvertRule.java:81)
    at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
    at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
    at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
    at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
    at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convertAs(CustomizedConvertRule.java:251)
    at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.lambda$convert$0(CustomizedConvertRule.java:93)
    at java.util.Optional.map(Optional.java:215)
    at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convert(CustomizedConvertRule.java:93)
    at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
    at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
    at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:734)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:129)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$6(QueryOperationConverter.java:540)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:541)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:153)
    at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
    at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
    at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
    at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
    at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186)
    at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:250)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
    at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:298)
    at com.guanyq.study.TableAPIAndSQL.TableAPI.Aggregations.DistinctAggregation3.main(DistinctAggregation3.java:68)
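
The trace goes through OverConvertRule.convert, so the distinct count only seems to break when it is applied over the window. For comparison, the plain group-by distinct aggregation from the same docs section takes a different conversion path; a minimal sketch against the orders table defined in the code below (the as("cnt") alias is my addition):

        // Distinct aggregation on a plain group-by, per the same docs page.
        // If this translates fine, the failure is specific to distinct
        // aggregates on over windows.
        Table groupByDistinct = orders
                .groupBy($("userId"))
                .select($("userId"), $("orderId").count().distinct().as("cnt"));
        tableEnv.toRetractStream(groupByDistinct, Row.class).print();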


package com.guanyq.study.TableAPIAndSQL.TableAPI.Aggregations;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.table.api.Expressions.*;

/**
 * New addition
 *
 * @author guanyq
 * @date 2021/2/24
 */
public class DistinctAggregation3 {
    public static void main(String[] args) throws Exception {
        // get env
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // get StreamTableEnvironment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
        DataStreamSource<Order> stream = env.addSource(new SourceFunction<Order>() {
            // volatile so cancel() from another thread is visible in run()
            volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    // Order order = new Order(UUID.randomUUID().toString(),
                    //         random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
                    Order order = new Order("0X:" + random.nextInt(3), 0,
                            random.nextInt(101), System.currentTimeMillis());
                    System.out.println(order.toString());
                    // throttle: emit one order every two seconds
                    TimeUnit.SECONDS.sleep(2);
                    ctx.collect(order);
                }
            }
            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // declare an additional logical field as a processing time attribute
        Table orders = tableEnv.fromDataStream(stream,
                $("orderId"), $("userId"), $("money"), $("createTime"), $("pt").proctime());
        orders.printSchema();

        // Distinct aggregation on over window
        Table result = orders
                .window(Over
                        .partitionBy($("userId"))
                        .orderBy($("pt"))
                        .preceding(UNBOUNDED_RANGE)
                        .following(CURRENT_RANGE)
                        .as("w"))
                .select(
                        $("userId"),
                        $("orderId").count().distinct().over($("w")),
                        $("money").max().over($("w")),
                        $("money").min().over($("w"))
                );
        tableEnv.toRetractStream(result, Row.class).print();

        env.execute("Go");
    }
}
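
The Order POJO itself is in the attachment; for completeness, here is a minimal sketch of it as a top-level class in the same package, assuming only the fields implied by the constructor call and the printed schema:

package com.guanyq.study.TableAPIAndSQL.TableAPI.Aggregations;

public class Order {
    public String orderId;
    public int userId;
    public int money;
    public long createTime;

    // Flink's POJO rules require a public no-argument constructor
    public Order() {
    }

    public Order(String orderId, int userId, int money, long createTime) {
        this.orderId = orderId;
        this.userId = userId;
        this.money = money;
        this.createTime = createTime;
    }

    @Override
    public String toString() {
        return "Order{orderId='" + orderId + "', userId=" + userId
                + ", money=" + money + ", createTime=" + createTime + "}";
    }
}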

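
As a cross-check, the same query can also be expressed through the SQL path, which bypasses the Table API ExpressionConverter shown in the trace. This is only a sketch: the "Orders" view name is mine, and I'm assuming this version accepts DISTINCT in SQL over aggregates:

        // Hypothetical cross-check via SQL over the same table. Assumes
        // COUNT(DISTINCT ...) OVER is supported by the SQL path here.
        tableEnv.createTemporaryView("Orders", orders);
        Table sqlResult = tableEnv.sqlQuery(
                "SELECT userId, "
                        + "COUNT(DISTINCT orderId) OVER w AS cnt, "
                        + "MAX(money) OVER w AS maxMoney, "
                        + "MIN(money) OVER w AS minMoney "
                        + "FROM Orders "
                        + "WINDOW w AS (PARTITION BY userId ORDER BY pt "
                        + "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)");
        tableEnv.toRetractStream(sqlResult, Row.class).print();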