Re: How to use Hbase Connector Sink

2020-06-11 Thread Caizhi Weng
Hi,

The stack trace indicates that your query schema does not match your sink
schema: `active_ratio*25 score` in your query produces a DOUBLE value, not the
`ROW<score DOUBLE>` you declared in your sink.
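
If you want to double-check this, here is a minimal sketch (assuming the same
`bstEnv`, `circle_weight`, and `tb` as in your program, on the Flink 1.10 Table
API) that prints the two schemas side by side before calling sqlUpdate:

// Hypothetical diagnostic, not part of the original job: compare the schema
// produced by the query with the schema declared on the registered sink table.
val querySchema = bstEnv.sqlQuery(
  """
    |select
    |  concat_ws('_',circleName,dt) rowkey,
    |  active_ratio*25 score
    |from tb""".stripMargin).getSchema          // fields: rowkey STRING, score DOUBLE

val sinkSchema = bstEnv.from("circle_weight").getSchema   // fields: rowkey STRING, info ROW<score DOUBLE>

println(querySchema)
println(sinkSchema)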

op <520075...@qq.com> wrote on Thu, Jun 11, 2020, 3:31 PM:

> hi
> Flink 1.10: when I want to sink data to an HBase table like this:
>
> bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
>   rowkey String,
>   info ROW<score DOUBLE>
> ) WITH (
>   'connector.type' = 'hbase',
>   'connector.version' = '1.4.3',
>   'connector.table-name' = 'ms:test_circle_info',
>   'connector.zookeeper.quorum' = 'localhost:2181',
>   'connector.zookeeper.znode.parent' = '/hbase-secure',
>   'connector.write.buffer-flush.max-size' = '10mb',
>   'connector.write.buffer-flush.max-rows' = '1000',
>   'connector.write.buffer-flush.interval' = '2s'
> )""")
>
> bstEnv.sqlUpdate(
>   """
>     |insert into circle_weight
>     |select
>     |  concat_ws('_',circleName,dt) rowkey,
>     |  active_ratio*25 score
>     |from tb""".stripMargin)
>
> but I get the following exception; can anybody tell me what is wrong?
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.circle_weight do not match.
> Query schema: [rowkey: STRING, score: DOUBLE]
> Sink schema: [rowkey: STRING, info: ROW<`score` DOUBLE>]
> at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:96)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:198)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$2.apply(PlannerBase.scala:190)
> at scala.Option.map(Option.scala:146)
> at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:190)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:150)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:150)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
> at com.souhu.msns.huyou.test.table.sql.CircleWeightRank$.main(CircleWeightRank.scala:170)
> at com.souhu.msns.huyou.test.table.sql.CircleWeightRank.main(CircleWeightRank.scala)
>


Re: How to use Hbase Connector Sink

2020-06-11 Thread godfrey he
hi,

You should make sure that the types of the selected fields and the types of the
sink table are the same; otherwise you will get the above exception. You can
change `active_ratio*25 score` into a ROW type, like this:

insert into circle_weight select rowkey, ROW(info) from (
  select concat_ws('_',circleName,dt) as rowkey, active_ratio*25 as info
  from tb) t;
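
For completeness, a sketch of how this could be plugged back into the quoted
Scala program (assuming the same `bstEnv`, `circle_weight`, and `tb` as above;
INSERT INTO matches the sink columns by position, so the outer select only
needs to produce a STRING followed by a ROW):

// The query now yields (STRING, ROW<DOUBLE>), matching the declared sink
// schema [rowkey: STRING, info: ROW<score DOUBLE>] of circle_weight.
bstEnv.sqlUpdate(
  """
    |insert into circle_weight
    |select rowkey, ROW(info) from (
    |  select
    |    concat_ws('_',circleName,dt) as rowkey,
    |    active_ratio*25 as info
    |  from tb
    |) t""".stripMargin)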


Best,
Godfrey



How to use Hbase Connector Sink

2020-06-11 Thread op
hi
Flink 1.10: when I want to sink data to an HBase table like this:

bstEnv.sqlUpdate("""CREATE TABLE circle_weight (
  rowkey String,
  info ROW<score DOUBLE>