任务流程:
OGG->KAFKA->FLINK->HIVE
KAFKA数据样例:
其中会有多个
"table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
{
"table": "SCOOT.TABLENAME",
"op_type": "U",
"op_ts": "2020-08-11 07:53:40.008001",
"current_ts": "2020-08-11T15:56:41.233000",
"pos": "00000000980119769930",
"before": {
"C1": 4499000,
"C2": null,
"C3": null,
"C4": null,
"C5": null
},
"after": {
"C1": 4499000,
"C2": null,
"C3": "0000",
"C4": "0000",
"C5": "通过"
}
}
问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?
例如 样例数据在hive中建表
create table TABLENAME
(
op_type STRING,
op_ts STRING,
current_ts STRING,
pos STRING,
"C1" STRING,
"C2" STRING,
"C3" STRING,
"C4" STRING,
"C5" STRING
)
理解的难点,
1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
2.同一FLINK任务会有新增的表,需自动适配
3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等
或者只能采用通过表结构
create table TABLENAME
(
table STRING,
op_type STRING,
op_ts STRING,
current_ts STRING,
pos STRING,
"before" STRING,
"after" STRING
)
然后剩下的在HIVE中解决。
或者有其他更好的方案?