The dimension table in HBase:
streamTableEnv.sqlUpdate(
"""
|
|CREATE TABLE user_hbase3(
| rowkey string,
| cf ROW(sex VARCHAR, age INT, created_time TIMESTAMP(3))
|) WITH (
| 'connector.type' = 'hbase',
| 'connector.version' = '2.1.0',
| 'connector.table-name' = 'user_hbase3',
| 'connector.zookeeper.quorum' = 'cdh1:2181,cdh2:2181,cdh3:2181',
| 'connector.zookeeper.znode.parent' = '/hbase',
| 'connector.write.buffer-flush.max-size' = '10mb',
| 'connector.write.buffer-flush.max-rows' = '1000',
| 'connector.write.buffer-flush.interval' = '2s'
|)
|""".stripMargin)
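
Since the DDL above declares `cf.age` as INT, one possible way to line the query up with the sink is to join against this DDL-defined table instead of the `users` table registered through `HBaseTableSource`. The following is only a sketch, not a verified fix. It assumes `streamTableEnv`, `user_behavior`, and `user_cnt` are defined as in the quoted message below, and that `age` was written to HBase as a 4-byte integer so the connector can decode it as INT.

```scala
// Sketch only: relies on streamTableEnv and the user_behavior / user_cnt
// tables from the quoted message, plus the user_hbase3 DDL table above.
// u.cf.age is read through the DDL schema, so it is typed INT rather than BYTES.
streamTableEnv.sqlUpdate(
  """
    |INSERT INTO user_cnt
    |SELECT
    |  CAST(b.`time` AS STRING) AS `time`,
    |  u.cf.age AS sum_age
    |FROM
    |  (SELECT *, PROCTIME() AS proctime FROM user_behavior) AS b
    |  JOIN user_hbase3 FOR SYSTEM_TIME AS OF b.proctime AS u
    |  ON b.uid = u.rowkey
    |""".stripMargin)
```

With this form the query schema would be [time: STRING, sum_age: INT], which matches the sink schema reported in the exception.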
At 2020-06-15 20:19:22, "Zhou Zach" <[email protected]> wrote:
>val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>val blinkEnvSettings =
>EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>val streamTableEnv = StreamTableEnvironment.create(streamExecutionEnv,
>blinkEnvSettings)
>
>val conf = new Configuration
>val users = new HBaseTableSource(conf, "user_hbase3")
>users.setRowKey("rowkey", classOf[String]) // rowkey as the primary key
>users.addColumn("cf", "age", classOf[Array[Byte]])
>
>streamTableEnv.registerTableSource("users", users)
>
>
>streamTableEnv.sqlUpdate(
>"""
> |
> |CREATE TABLE user_behavior (
> | uid VARCHAR,
> | phoneType VARCHAR,
> | clickCount INT,
> | `time` TIMESTAMP(3)
> |) WITH (
> | 'connector.type' = 'kafka',
> | 'connector.version' = 'universal',
> | 'connector.topic' = 'user_behavior',
> | 'connector.startup-mode' = 'earliest-offset',
> | 'connector.properties.0.key' = 'zookeeper.connect',
> | 'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
> | 'connector.properties.1.key' = 'bootstrap.servers',
> | 'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
> | 'update-mode' = 'append',
> | 'format.type' = 'json',
> | 'format.derive-schema' = 'true'
> |)
> |""".stripMargin)
>
>streamTableEnv.sqlUpdate(
>"""
> |
> |CREATE TABLE user_cnt (
> | `time` VARCHAR,
> | sum_age INT
> |) WITH (
> | 'connector.type' = 'jdbc',
> | 'connector.url' = 'jdbc:mysql://localhost:3306/dashboard',
> | 'connector.table' = 'user_cnt',
> | 'connector.username' = 'root',
> | 'connector.password' = '123456',
> | 'connector.write.flush.max-rows' = '1'
> |)
> |""".stripMargin)
>
>
>streamTableEnv.sqlUpdate(
>"""
> |
> |insert into user_cnt
> |SELECT
> | cast(b.`time` as string) as `time`, u.age
> |FROM
> | (select * , PROCTIME() AS proctime from user_behavior) AS b
> | JOIN users FOR SYSTEM_TIME AS OF b.`proctime` AS u
> | ON b.uid = u.rowkey
> |
> |""".stripMargin)
>
>At 2020-06-15 20:01:16, "Leonard Xu" <[email protected]> wrote:
>>Hi,
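>>It looks like the schema of your query does not match the schema of the table (sink). Data in HBase is all stored as bytes, but in Flink
>>SQL you generally do not need to read bytes; the data you read should come through as the corresponding Flink SQL type, such as INT, BIGINT, or STRING.
>>Could you paste your SQL?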
>>
>>Best,
>>Leonard Xu
>>
>>> On June 15, 2020, at 19:55, Zhou Zach <[email protected]> wrote:
>>>
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException:
>>> Field types of query result and registered TableSink
>>> default_catalog.default_database.user_cnt do not match.
>>> Query schema: [time: STRING, age: BYTES]
>>> Sink schema: [time: STRING, sum_age: INT]