2020-06-16 21:01:09,756 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
- Kafka version: unknown
2020-06-16 21:01:09,757 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
- Kafka commitId: unknown
2020-06-16 21:01:09,758 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer
- [Consumer clientId=consumer-7, groupId=null] Subscribed to partition(s):
user_behavior-0
2020-06-16 21:01:09,765 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.clients.Metadata - Cluster
ID: cAT_xBISQNWghT9kR5UuIw
2020-06-16 21:01:09,766 WARN
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig
- The configuration 'zookeeper.connect' was supplied but isn't a known config.
2020-06-16 21:01:09,766 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
- Kafka version: unknown
2020-06-16 21:01:09,767 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser
- Kafka commitId: unknown
2020-06-16 21:01:09,768 INFO
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.Fetcher
- [Consumer clientId=consumer-7, groupId=null] Resetting offset for partition
user_behavior-0 to offset 43545.
2020-06-16 21:01:35,904 INFO
org.apache.flink.addons.hbase.HBaseLookupFunction
- start close ...
2020-06-16 21:01:35,906 INFO
org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient
- Close zookeeper connection 0x72d39885 to
cdh1:2181,cdh2:2181,cdh3:2181
2020-06-16 21:01:35,908 INFO
org.apache.flink.addons.hbase.HBaseLookupFunction
- end close.
2020-06-16 21:01:35,908 INFO org.apache.zookeeper.ZooKeeper
- Session: 0x172b776fac80ae4 closed
2020-06-16 21:01:35,909 INFO org.apache.zookeeper.ClientCnxn
- EventThread shut down
2020-06-16 21:01:35,911 INFO
org.apache.flink.runtime.taskmanager.Task
- Source: KafkaTableSource(uid,
phoneType, clickCount, time) ->
SourceConversion(table=[default_catalog.default_database.user_behavior, source:
[KafkaTableSource(uid, phoneType, clickCount, time)]], fields=[uid, phoneType,
clickCount, time]) -> Calc(select=[uid, time]) ->
LookupJoin(table=[HBaseTableSource[schema=[rowkey, cf], projectFields=null]],
joinType=[InnerJoin], async=[false], lookup=[rowkey=uid], select=[uid, time,
rowkey, cf]) -> Calc(select=[CAST(time) AS time, cf.age AS age]) ->
SinkConversionToTuple2 -> Sink: JDBCUpsertTableSink(time, age) (1/2)
(e45989f173dc35aefc52413349db7f30) switched from RUNNING to FAILED.
java.lang.IllegalArgumentException: offset (0) + length (4) exceed the capacity
of the array: 2
at
org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:838)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:1004)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:980)
at
org.apache.flink.addons.hbase.util.HBaseTypeUtils.deserializeToObject(HBaseTypeUtils.java:55)
at
org.apache.flink.addons.hbase.util.HBaseReadWriteHelper.parseToRow(HBaseReadWriteHelper.java:158)
at
org.apache.flink.addons.hbase.HBaseLookupFunction.eval(HBaseLookupFunction.java:78)
at LookupFunction$12.flatMap(Unknown Source)
at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:82)
at
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:36)
at
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at StreamExecCalc$7.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at SourceConversion$6.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Query??
val hConf = HBaseConfiguration.create()
hConf.set(HConstants.ZOOKEEPER_QUORUM, "cdh1:2181,cdh2:2181,cdh3:2181")
hConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/hbase")
val users = new HBaseTableSource(hConf, "user_hbase5")
users.setRowKey("rowkey", classOf[String]) // currency as the primary key
users.addColumn("cf", "age", classOf[Integer])
streamTableEnv.registerTableSource("users", users)
streamTableEnv.sqlUpdate(
"""
|
|insert into time_age
|SELECT
| cast(b.`time` as string) as `time`, u.cf.age
|FROM
| (select * , PROCTIME() AS proctime from user_behavior) AS b
| JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
| ON b.uid = u.rowkey
|
|""".stripMargin)
offset (0) + length (4) exceed the capacity of the array: 2
???????????? ?????? hbase????????int??????
??users.addColumn("cf", "age", classOf[Integer]) ??????????????????
??????int??????Integer????????Integer??????int
------------------ ???????? ------------------
??????: "libenchao"<[email protected]>;
????????: 2020??6??16??(??????) ????7:56
??????: "user-zh"<[email protected]>;
????: Re: Re: flink sql read hbase sink mysql data type not match
????????cast??ROW????????????????????????????????????????????????`.`????????
???????????????????????? SELECT rowkey, cf.age FROM users
Zhou Zach <[email protected]> ??2020??6??16?????? ????6:59??????
> flink sql ??????ROW<`age` INT&gt;??????INT??
>
>
> streamTableEnv.sqlUpdate(
> """
> |
> |insert into user_age
> |SELECT rowkey, cast(cf as int) as age
> |FROM
> | users
> |
> |""".stripMargin)??????????????