2020-06-16 21:01:09,756 INFO  
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  
- Kafka version: unknown
2020-06-16 21:01:09,757 INFO  
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  
- Kafka commitId: unknown
2020-06-16 21:01:09,758 INFO  
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer 
 - [Consumer clientId=consumer-7, groupId=null] Subscribed to partition(s): 
user_behavior-0
2020-06-16 21:01:09,765 INFO  
org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata  - Cluster 
ID: cAT_xBISQNWghT9kR5UuIw
2020-06-16 21:01:09,766 WARN  
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig 
 - The configuration 'zookeeper.connect' was supplied but isn't a known config.
2020-06-16 21:01:09,766 INFO  
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  
- Kafka version: unknown
2020-06-16 21:01:09,767 INFO  
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser  
- Kafka commitId: unknown
2020-06-16 21:01:09,768 INFO  
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.Fetcher 
 - [Consumer clientId=consumer-7, groupId=null] Resetting offset for partition 
user_behavior-0 to offset 43545.
2020-06-16 21:01:35,904 INFO  
org.apache.flink.addons.hbase.HBaseLookupFunction        
     - start close ...
2020-06-16 21:01:35,906 INFO  
org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient        
    - Close zookeeper connection 0x72d39885 to 
cdh1:2181,cdh2:2181,cdh3:2181
2020-06-16 21:01:35,908 INFO  
org.apache.flink.addons.hbase.HBaseLookupFunction        
     - end close.
2020-06-16 21:01:35,908 INFO  org.apache.zookeeper.ZooKeeper    
                      
      - Session: 0x172b776fac80ae4 closed
2020-06-16 21:01:35,909 INFO  org.apache.zookeeper.ClientCnxn    
                      
     - EventThread shut down
2020-06-16 21:01:35,911 INFO  
org.apache.flink.runtime.taskmanager.Task          
           - Source: KafkaTableSource(uid, 
phoneType, clickCount, time) -> 
SourceConversion(table=[default_catalog.default_database.user_behavior, source: 
[KafkaTableSource(uid, phoneType, clickCount, time)]], fields=[uid, phoneType, 
clickCount, time]) -> Calc(select=[uid, time]) -> 
LookupJoin(table=[HBaseTableSource[schema=[rowkey, cf], projectFields=null]], 
joinType=[InnerJoin], async=[false], lookup=[rowkey=uid], select=[uid, time, 
rowkey, cf]) -> Calc(select=[CAST(time) AS time, cf.age AS age]) -> 
SinkConversionToTuple2 -> Sink: JDBCUpsertTableSink(time, age) (1/2) 
(e45989f173dc35aefc52413349db7f30) switched from RUNNING to FAILED.
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity 
of the array: 2
        at 
org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:838)
        at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:1004)
        at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:980)
        at 
org.apache.flink.addons.hbase.util.HBaseTypeUtils.deserializeToObject(HBaseTypeUtils.java:55)
        at 
org.apache.flink.addons.hbase.util.HBaseReadWriteHelper.parseToRow(HBaseReadWriteHelper.java:158)
        at 
org.apache.flink.addons.hbase.HBaseLookupFunction.eval(HBaseLookupFunction.java:78)
        at LookupFunction$12.flatMap(Unknown Source)
        at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
        at 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
        at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at StreamExecCalc$7.processElement(Unknown Source)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at SourceConversion$6.processElement(Unknown Source)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
        at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
        at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
        at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)



Query:
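import org.apache.flink.addons.hbase.HBaseTableSource
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}

// Point the HBase client at the ZooKeeper quorum and the HBase root znode.
val hConf = HBaseConfiguration.create()
hConf.set(HConstants.ZOOKEEPER_QUORUM, "cdh1:2181,cdh2:2181,cdh3:2181")
hConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")

// Lookup table: cf:age is declared as Integer, so it is decoded with Bytes.toInt.
val users = new HBaseTableSource(hConf, "user_hbase5")
users.setRowKey("rowkey", classOf[String]) // rowkey is joined against user_behavior.uid
users.addColumn("cf", "age", classOf[Integer])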

streamTableEnv.registerTableSource("users", users)


streamTableEnv.sqlUpdate(
  """
    |
    |insert into  time_age
    |SELECT
    |  cast(b.`time` as string) as `time`,  u.cf.age
    |FROM
    |  (select * , PROCTIME() AS proctime from user_behavior) AS b
    |  JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
    |  ON b.uid = u.rowkey
    |
    |""".stripMargin)
offset (0) + length (4) exceed the capacity of the array: 2
[Original Chinese text lost to encoding. The surviving fragments mention the int
stored in hbase, the declaration users.addColumn("cf", "age", classOf[Integer]),
and int versus Integer.]
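
For readers hitting the same exception: `Bytes.toInt` needs four bytes, so "capacity of the array: 2" means the `cf:age` cell it read holds only two bytes. One possible cause is that the age was written as text (for example the string "25") rather than as a 4-byte int. That is an assumption, since the thread does not show how the table was populated. A minimal sketch using only the JDK follows; the names `AgeEncodingSketch` and `decodeInt` and the value 25 are hypothetical, and `ByteBuffer` stands in for HBase's big-endian int encoding:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object AgeEncodingSketch {
  // Decode a 4-byte big-endian int; report the problem instead of throwing
  // when the cell is too short (the situation behind the exception above).
  def decodeInt(cell: Array[Byte]): Either[String, Int] =
    if (cell.length < 4) Left(s"need 4 bytes, got ${cell.length}")
    else Right(ByteBuffer.wrap(cell, 0, 4).getInt)

  def main(args: Array[String]): Unit = {
    // Hypothetical cell written as text: "25" is 2 bytes in UTF-8.
    val asText = "25".getBytes(StandardCharsets.UTF_8)
    // The same value written as a 4-byte big-endian int.
    val asInt = ByteBuffer.allocate(4).putInt(25).array()

    println(decodeInt(asText)) // Left(need 4 bytes, got 2)
    println(decodeInt(asInt))  // Right(25)
  }
}
```

If the cell does turn out to hold text, the writer and the reader need to agree on the encoding: either store the value as a 4-byte int, or declare the column with a string type and convert it in SQL.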




------------------ Original Message ------------------
From: "libenchao" <[email protected]>;
Sent: Tuesday, June 16, 2020, 7:56 PM
To: "user-zh" <[email protected]>;

Subject: Re: Re: flink sql read hbase sink mysql data type not match



You can't cast it directly. ROW is a composite type; to read one of its fields, access it with `.`.
For example, in your case: SELECT rowkey, cf.age FROM users

Zhou Zach <[email protected]> wrote on Tuesday, June 16, 2020 at 6:59 PM:

> How do I convert ROW<`age` INT> to INT in flink sql?
>
>
> streamTableEnv.sqlUpdate(
>   """
>     |
>     |insert into  user_age
>     |SELECT rowkey, cast(cf as int) as age
>     |FROM
>     |  users
>     |
>     |""".stripMargin)
