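The Flink version is the 1.9.1 release.
Doc
I can't easily share the complete class because it contains business information, sorry, but I can share a fragment. It is an ordinary POJO with a single level (no nesting); every field is a basic type (or a derivative of one) or a String, about 30-odd fields in total. As I understand it, the number of fields has little to do with the problem.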
```
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * @author liaoxu Date: 2020/1/13 Time: 12:03 PM.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class Doc {
    private String suv;
    private Float factor = 1F;
    private String st;
    private String agentId;
    private Long timestamp;
    // ... remaining fields, getters, and setters omitted
}
```
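
In case it helps with reproducing, the shape described above (a single level, only basic types and String) can be checked mechanically with reflection. This is only a rough sketch under assumptions: `FlatPojoCheck`, `SampleDoc`, and `NestedDoc` are made-up names for illustration, and `SampleDoc` mirrors just the five fields shown in the fragment, not the real class.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not part of Flink: checks that a class is a "flat" POJO.
final class FlatPojoCheck {

    // Field types treated as "basic" here: primitive wrappers plus String.
    private static final Set<Class<?>> ALLOWED = new HashSet<>(Arrays.asList(
            Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
            Float.class, Double.class, Character.class, String.class));

    /**
     * Returns true if every non-static field declared directly on the class
     * is a primitive, a primitive wrapper, or a String. Inherited fields are
     * not inspected.
     */
    static boolean isFlatPojo(Class<?> type) {
        for (Field field : type.getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue; // skip constants and compiler-generated fields
            }
            Class<?> fieldType = field.getType();
            if (!fieldType.isPrimitive() && !ALLOWED.contains(fieldType)) {
                return false;
            }
        }
        return true;
    }

    /** Stand-in that mirrors only the five fields shown in the fragment. */
    static class SampleDoc {
        private String suv;
        private Float factor = 1F;
        private String st;
        private String agentId;
        private Long timestamp;
    }

    /** Hypothetical counter-example: one field is itself an object. */
    static class NestedDoc {
        private String id;
        private SampleDoc inner;
    }

    public static void main(String[] args) {
        System.out.println(isFlatPojo(SampleDoc.class)); // prints "true"
        System.out.println(isFlatPojo(NestedDoc.class)); // prints "false"
    }
}
```

Running the same check against the real Doc class would confirm whether any field falls outside the basic types and String.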
Hope this helps. Alternatively, you can contact me on DingTalk (DingTalk ID ib1x1zy).
JingsongLee <[email protected]> wrote on Tue, Jan 14, 2020 at 11:25 AM:
> Hi Kevin,
>
> Which version is this?
> Could you provide the complete Doc class? That would make it easier for us to reproduce.
>
> Best,
> Jingsong Lee
>
>
> ------------------------------------------------------------------
> From:Kevin Liao <[email protected]>
> Send Time: Monday, January 13, 2020, 17:37
> To:user-zh <[email protected]>
> Subject: org.apache.flink.table.api.ValidationException error with the blink planner
>
> tEnv.connect(new Kafka()
> .version("universal")
> .topic("xxx")
> .startFromLatest()
> .property("bootstrap.servers",
> "xxxx")
> .property("group.id", "xxxx"))
> .withFormat(new Json().failOnMissingField(false).deriveSchema())
> .withSchema(new Schema()
> // .field("logger_name", Types.STRING)
> // .field("host", Types.STRING)
> // .field("@timestamp", Types.SQL_TIMESTAMP)
> // .field("_rowtime", Types.SQL_TIMESTAMP)
> // .rowtime(
> // new Rowtime().timestampsFromField("@timestamp").watermarksPeriodicBounded(1000))
> .field("doc", Types.POJO(Doc.class))
> )
> .inAppendMode()
> .registerTableSource("xxx");
>
> Table result = tEnv.sqlQuery(
> "SELECT doc.xxx1, doc.xxx2, ... , doc.xxxN as seq FROM xxx");
>
> // result.printSchema();
> tEnv.toAppendStream(result,
> new TupleTypeInfo<>(STRING, STRING, STRING, STRING, STRING,
> STRING, STRING, STRING,
> STRING, STRING, BOOLEAN, LONG, STRING, STRING, STRING,
> STRING, STRING, STRING,
> STRING, LONG, STRING, INT, STRING, INT)).print();
>
>
>
> The code above runs under the flink planner, but after switching to the blink planner it fails with the following error:
>
>
> ```
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Type LEGACY(PojoType<com.sogou.qidian.Doc, fields = [(... // omitted)]>) of table field 'doc' does not match with type PojoType<com.sogou.qidian.Doc, fields = [(... // omitted)]> of the field 'doc' of the TableSource return type.
> at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:121)
> at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$4.apply(TableSourceUtil.scala:92)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at org.apache.flink.table.planner.sources.TableSourceUtil$.computeIndexMapping(TableSourceUtil.scala:92)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:100)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:55)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:86)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:46)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlan(StreamExecCalc.scala:46)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:185)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:154)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
> at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:319)
> at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:227)
> at com.sogou.qidian.BatchJob.main(BatchJob.java:83)
>
> Execution failed for task ':BatchJob.main()'.
> > Process 'command '/Users/liaoxu/.sdkman/candidates/java/current/bin/java'' finished with non-zero exit value 1
>
> ```
>
>
> I carefully compared the two Doc types in the error log, and they are identical.
>
>
> Thanks
>