附件是代码,是按照官网文档写的 demo(在 over window 上做 distinct 聚合)。
运行时抛出下面的异常(前面是 printSchema 的输出),不知道哪里有问题,麻烦帮忙看下。
root
|-- orderId: STRING
|-- userId: INT
|-- money: INT
|-- createTime: BIGINT
|-- pt: TIMESTAMP(3) *PROCTIME*
17:17:11,935 INFO org.apache.flink.api.java.typeutils.TypeExtractor
[] - class org.apache.flink.types.Row is missing a default constructor so it
cannot be used as a POJO type and must be processed as GenericType. Please read
the Flink documentation on "Data Types & Serialization" for details of the
effect on performance.
Exception in thread "main" java.lang.RuntimeException: Unknown call expression:
count(orderId)
at
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:102)
at
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
at
org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
at
org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.flink.table.planner.expressions.converter.OverConvertRule.convert(OverConvertRule.java:81)
at
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
at
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
at
org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
at
org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
at
org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convertAs(CustomizedConvertRule.java:251)
at
org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.lambda$convert$0(CustomizedConvertRule.java:93)
at java.util.Optional.map(Optional.java:215)
at
org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convert(CustomizedConvertRule.java:93)
at
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
at
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
at
org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
at
org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:734)
at
org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:129)
at
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$6(QueryOperationConverter.java:540)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:541)
at
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:153)
at
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
at
org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
at
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
at
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at
org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
at
org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:250)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:298)
at
com.guanyq.study.TableAPIAndSQL.TableAPI.Aggregations.DistinctAggregation3.main(DistinctAggregation3.java:68)
package com.guanyq.study.TableAPIAndSQL.TableAPI.Aggregations;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.table.api.Expressions.*;
/**
* æ°å¢
*
* @author guanyq
* @date 2021/2/24
*/
public class DistinctAggregation3 {
public static void main(String[] args) throws Exception {
// get env
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
bsSettings);
DataStreamSource<Order> stream = env.addSource(new
SourceFunction<Order>() {
boolean isRunning = true;
@Override
public void run(SourceContext<Order> ctx) throws Exception {
Random random = new Random();
while (isRunning) {
//Order order = new Order(UUID.randomUUID().toString(),
random.nextInt(3), random.nextInt(101), System.currentTimeMillis());
Order order = new Order("0X:" + random.nextInt(3), 0,
random.nextInt(101), System.currentTimeMillis());
System.out.println(order.toString());
TimeUnit.SECONDS.sleep(2);
ctx.collect(order);
}
}
@Override
public void cancel() {
isRunning = false;
}
});
// declare an additional logical field as a processing time attribute
Table orders = tableEnv.fromDataStream(stream, $("orderId"),
$("userId"), $("money"), $("createTime"), $("pt").proctime());
orders.printSchema();
// Distinct aggregation on over window
Table result = orders
.window(Over
.partitionBy($("userId"))
.orderBy($("pt"))
.preceding(UNBOUNDED_RANGE)
.following(CURRENT_RANGE)
.as("w"))
.select(
$("userId"),
$("orderId").count().distinct().over($("w")),
$("money").max().over($("w")),
$("money").min().over($("w"))
);
tableEnv.toRetractStream(result, Row.class).print();
env.execute("Go");
}
}