任务流程:
OGG->KAFKA->FLINK->HIVE


KAFKA数据样例:
其中会有多个 
"table",所以"before","after"中的字段是不一致的,同一个表如果有有DDL变更也会导致"before","after"字段的变更。
{
    "table": "SCOOT.TABLENAME",
    "op_type": "U",
    "op_ts": "2020-08-11 07:53:40.008001",
    "current_ts": "2020-08-11T15:56:41.233000",
    "pos": "00000000980119769930",
    "before": {
        "C1": 4499000,
        "C2": null,
        "C3": null,
        "C4": null,
        "C5": null
    },
    "after": {
        "C1": 4499000,
        "C2": null,
        "C3": "0000",
        "C4": "0000",
        "C5": "通过"
    }
}
问题:有没有优雅的方式在入到hive中可以跟源库表及结构一致?
看到很多FLINK to HIVE 的案例,很多大公司也都在用实时数仓,不知入hive这部分如果做到灵活,拓展,通用的?


例如 样例数据在hive中建表
create table TABLENAME
(
        op_type      STRING,
        op_ts          STRING,
        current_ts   STRING,
        pos             STRING,
        "C1" STRING,
        "C2" STRING,
        "C3" STRING,
        "C4" STRING,
        "C5" STRING
)
理解的难点,
1.同一FLINK任务需要写入多个表,每个表的字段是不一致的
2.同一FLINK任务会有新增的表,需自动适配
3.同一个表结构不是固定的,需要随着源库DDL变更而变更,可能的字段类型长度变更,新增删除字段等


或者只能采用通过表结构
create table TABLENAME
(
        table           STRING,
        op_type      STRING,
        op_ts          STRING,
        current_ts   STRING,
        pos             STRING,
        "before"      STRING,
        "after"         STRING
)
然后剩下的在HIVE中解决。


或者有其他更好的方案?

回复