Hi:
该问题有进一步的进展了。
我把cdc对应的表在创建表时设置了primary key,然后该问题解决了。
现在有一点弄不明白,在查找primary key的时候,是去查找source表的primary key吗?
源码位置(org.apache.flink.table.planner.plan.utils.UpdatingPlanChecker#getUniqueKeyForUpsertSink)如下:
def getUniqueKeyForUpsertSink(
sinkNode: LegacySink,
planner: PlannerBase,
sink: UpsertStreamTableSink[_]): Option[Array[String]] = {
// extract unique key fields
// Now we pick shortest one to sink
// TODO UpsertStreamTableSink setKeyFields interface should be
Array[Array[String]]
val sinkFieldNames = sink.getTableSchema.getFieldNames
/** Extracts the unique keys of the table produced by the plan. */
val fmq = FlinkRelMetadataQuery.reuseOrCreate(
planner.getRelBuilder.getCluster.getMetadataQuery)
val uniqueKeys = fmq.getUniqueKeys(sinkNode.getInput) // 此处是查找source的primary
key吗?
if (uniqueKeys != null && uniqueKeys.size() > 0) {
uniqueKeys
.filter(_.nonEmpty)
.map(_.toArray.map(sinkFieldNames))
.toSeq
.sortBy(_.length)
.headOption
} else {
None
}
}
flink 版本为1.12.0。
在1.13.0上也有出现。
请知道的大佬告知。
祝好!
automths
On 05/14/2021 11:00,automths<[email protected]> wrote:
Hi:
我自定义了一个UpsertStreamTableSink,在接入cdc数据向我的这个Sink写数据的时候,抛出了下面的异常:
Exception in thread "main" org.apache.flink.table.api.TableException:
UpsertStreamTableSink requires that Table has a full primary keys if it is
updated.
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
我在构建tableSchema的时候,已经设置了primary key了,但依旧抛出这个错误。
我的flink版本是flink-1.12.0的。
请教一下,这个问题,该怎么解决?
祝好!
automths