Re: How to use Hbase Connector Sink
Hi, The stack trace indicates that your query schema does not match with your sink schema. It seems that `active_ratio*25 score` in your query is a double value, not a `ROW` you declared in your sink. op <520075...@qq.com> 于2020年6月11日周四 下午3:31写道: > hi > flink1.10,wen i want to sink data to hbase table like this: > > bstEnv.sqlUpdate("""CREATE TABLE circle_weight ( >rowkey String, >info ROW > ) WITH ( >'connector.type' = 'hbase', >'connector.version' = '1.4.3', >'connector.table-name' = 'ms:test_circle_info', >'connector.zookeeper.quorum' = 'localhost:2181', >'connector.zookeeper.znode.parent' = > '/hbase-secure', >'connector.write.buffer-flush.max-size' = > '10mb', >'connector.write.buffer-flush.max-rows' = > '1000', >'connector.write.buffer-flush.interval' = '2s' > )""") > > bstEnv.sqlUpdate( > """ > |insert into circle_weight > |select > |concat_ws('_',circleName,dt) rowkey, > |active_ratio*25 score > |from tb""") > > but i get following exceptions,can anybody tell me what is wrong? > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Field types of query result and registered TableSink > default_catalog.default_database.circle_weight do not match. > Query schema: [rowkey: STRING, score: DOUBLE] > Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>] > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > at > com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170) > at > com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala) >
Re: How to use Hbase Connector Sink
hi, you should make sure the types of the selected fields and the types of sink table are the same, otherwise you will get the above exception. you can change `active_ratio*25 score` to row type, just like: insert into circle_weight select rowkey, ROW(info) from ( select concat_ws('_',circleName,dt) rowkey, active_ratio*25 score as info from tb) t; Best, Godfrey op <520075...@qq.com> 于2020年6月11日周四 下午3:31写道: > hi > flink1.10,wen i want to sink data to hbase table like this: > > bstEnv.sqlUpdate("""CREATE TABLE circle_weight ( >rowkey String, >info ROW > ) WITH ( >'connector.type' = 'hbase', >'connector.version' = '1.4.3', >'connector.table-name' = 'ms:test_circle_info', >'connector.zookeeper.quorum' = 'localhost:2181', >'connector.zookeeper.znode.parent' = > '/hbase-secure', >'connector.write.buffer-flush.max-size' = > '10mb', >'connector.write.buffer-flush.max-rows' = > '1000', >'connector.write.buffer-flush.interval' = '2s' > )""") > > bstEnv.sqlUpdate( > """ > |insert into circle_weight > |select > |concat_ws('_',circleName,dt) rowkey, > |active_ratio*25 score > |from tb""") > > but i get following exceptions,can anybody tell me what is wrong? > > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Field types of query result and registered TableSink > default_catalog.default_database.circle_weight do not match. > Query schema: [rowkey: STRING, score: DOUBLE] > Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>] > at > org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190) > at scala.Option.map(Option.scala:146) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495) > at > com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170) > at > com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala) >
How to use Hbase Connector Sink
hi flink1.10??wen i want to sink data to hbase table like this?? bstEnv.sqlUpdate("""CREATE TABLE circle_weight ( rowkey String, info ROW