Hi,

The stack trace shows that the schema of your query does not match the
schema of your sink. `active_ratio*25 score` in your query produces a
plain DOUBLE column, whereas the sink declares `info` as
`ROW<score double>`, so the value has to be wrapped in a row before it
is inserted.
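
One way to make the two schemas line up is to compute the value in a
subquery and wrap it with the `ROW(...)` constructor in the outer select.
This is a minimal sketch, assuming `bstEnv` and `tb` are the table
environment and source table from your snippet. The subquery alias `t` is
only for illustration, and I have not run this against your job:

```scala
// Assumes `bstEnv` (your TableEnvironment) and the registered table `tb`
// from the original message; `t` is a hypothetical subquery alias.
bstEnv.sqlUpdate(
  """
    |INSERT INTO circle_weight
    |SELECT rowkey, ROW(score) AS info
    |FROM (
    |  SELECT
    |    concat_ws('_', circleName, dt) AS rowkey,
    |    active_ratio * 25 AS score
    |  FROM tb
    |) t
    |""".stripMargin)
```

Computing the score in the subquery keeps the argument of `ROW(...)` a
plain column reference, so the statement does not depend on how the
planner parses expressions inside the constructor.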

op <520075...@qq.com> wrote on Thu, Jun 11, 2020 at 3:31 PM:

> hi
> I'm using Flink 1.10 and want to sink data to an HBase table like this:
>
>  bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
>                            rowkey String,
>                            info ROW<score double>
>                          ) WITH (
>                            'connector.type' = 'hbase',
>                            'connector.version' = '1.4.3',
>                            'connector.table-name' = 'ms:test_circle_info',
>                            'connector.zookeeper.quorum' = 'localhost:2181',
>                            'connector.zookeeper.znode.parent' = '/hbase-secure',
>                            'connector.write.buffer-flush.max-size' = '10mb',
>                            'connector.write.buffer-flush.max-rows' = '1000',
>                            'connector.write.buffer-flush.interval' = '2s'
>                          )""")
>
>     bstEnv.sqlUpdate(
>       """
>         |insert into circle_weight
>         |select
>         |concat_ws('_',circleName,dt) rowkey,
>         |active_ratio*25 score
>         |from tb""")
>
> But I get the following exception. Can anybody tell me what is wrong?
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.circle_weight do not match.
> Query schema: [rowkey: STRING, score: DOUBLE]
> Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>]
> at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
> at scala.Option.map(Option.scala:146)
> at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170)
> at com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala)
>
