Why parse with a DataStream first and then register the result as a table? You could instead declare the source directly in DDL and let the built-in json format do the parsing.
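A minimal sketch of that DDL approach, assuming a Kafka source on Flink 1.11; the topic, server address, and group id below are placeholders, not values from this thread. The point is that the json format can map a JSON object column straight to MAP<STRING, STRING>, so no RAW type ever appears:

```sql
-- Hypothetical source table; connector options are placeholders.
CREATE TABLE kafka_source (
  `type` STRING,
  lib MAP<STRING, STRING>,          -- json format parses the nested object directly
  properties MAP<STRING, STRING>,
  event_time BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test-group',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);

-- The columns then already match the Hive sink's MAP<STRING,STRING> schema, e.g.:
-- INSERT INTO test.test SELECT `type`, lib, properties, <dt expression> FROM kafka_source;
```

Because the declared column types are plain MAP<STRING, STRING>, the insert into the Hive table no longer has to bridge a LEGACY('RAW', ...) type.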
chuyuan <[email protected]> wrote on Mon, Sep 21, 2020 at 16:44:

> I am on Flink 1.11.1. The job consumes JSON data from Kafka; a sample record:
>
> {
>     "properties":{
>         "platformType":"APP",
>         "$os":"iOS",
>         "$screen_width":414,
>         "$app_version":"1.0",
>         "$is_first_day":false,
>         "$model":"x86_64",
>         "$device_id":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
>         "imei":"c75fa3ea670bebc01cf08f1750745d65d9f33dd3",
>         "isLogin":false,
>         "zrIdfa":"00000000-0000-0000-0000-000000000000",
>         "$network_type":"WIFI",
>         "$wifi":true,
>         "$timezone_offset":-480,
>         "$resume_from_background":false,
>         "tdid":"",
>         "zrIdfv":"7B2363CF-1491-4EFF-A096-9ADD6CCA5BC9",
>         "$screen_height":896,
>         "$lib_version":"2.0.10",
>         "$lib":"iOS",
>         "$os_version":"13.4.1",
>         "$manufacturer":"Apple",
>         "$is_first_time":false,
>         "$app_id":"Com.ziroom..ZRSensorsSDK"
>     },
>     "type":"track",
>     "lib":{
>         "$lib_version":"2.0.10",
>         "$lib":"iOS",
>         "$app_version":"1.0",
>         "$lib_method":"autoTrack"
>     }
> }
>
> The values of the keys lib and properties are JSON objects, and their fields can be appended dynamically.
>
> Step 1: consume the Kafka JSON as a DataStream, wrap each JSON string into a DO inside a MapFunction, and then convert it to the DTO below (so that the JSON values of lib and properties become Map<String, String>):
>
> @Data
> public static class CustomBuriedPointDTO {
>     /** Tracking ID */
>     private Long track_id;
>     /** Event time */
>     private Long event_time;
>     /** Type */
>     private String type;
>     /** Deduplicated ID */
>     private String distinct_id;
>     /** Anonymous ID */
>     private String anonymous_id;
>     /** Library info */
>     private @DataTypeHint("RAW") Map<String, String> lib;
>     /** Event */
>     private String event;
>     /** Properties */
>     // private Map<String, String> properties;
>     private @DataTypeHint("RAW") Map<String, String> properties;
>     /** Flush time */
>     private Long flush_time;
>     /** Event date */
>     private String dt;
>
>     /** Populate this DTO's fields from the DO. */
>     public void assembly(CustomBuriedPointDO pointDO) {
>         // Copy the DO's properties onto this DTO
>         BeanUtils.copyProperties(pointDO, this);
>
>         // Convert the special fields: derive the partition date
>         Long eventTimeLong = pointDO.getEvent_time();
>         if (eventTimeLong == null) {
>             eventTimeLong = System.currentTimeMillis();
>         }
>         Date eventTime = new Date(eventTimeLong);
>         DateFormat dateFormatDate = new SimpleDateFormat("yyyy-MM-dd");
>         this.setDt(dateFormatDate.format(eventTime));
>
>         // Convert the JSON string fields to Map
>         Map<String, String> propertiesMap = null;
>         if (StringUtils.isNotBlank(pointDO.getProperties())) {
>             propertiesMap = (Map<String, String>) JSON.parse(pointDO.getProperties());
>         }
>         this.setProperties(propertiesMap);
>         Map<String, String> libMap = null;
>         if (StringUtils.isNotBlank(pointDO.getLib())) {
>             libMap = (Map<String, String>) JSON.parse(pointDO.getLib());
>         }
>         this.setLib(libMap);
>     }
> }
>
> Step 2: register the DataStream as a temporary Hive table, then write it into the Hive target table, which is defined as:
>
> "CREATE TABLE test.test(" +
> "  type STRING," +
> "  lib MAP<STRING,STRING>," +
> "  properties MAP<STRING,STRING>" +
> ") PARTITIONED BY (" +
> "  dt string" +
> ") stored as orcfile " +
> "TBLPROPERTIES (" +
> "  'partition.time-extractor.kind'='custom'," +
> "  'partition.time-extractor.timestamp-pattern'='$dt'," +
> "  'partition.time-extractor.class'='com.ziroom.dataaccess.module.SensorsPartTimeExtractor'," +
> "  'sink.partition-commit.trigger'='partition-time'," +
> "  'sink.partition-commit.delay'='0s'," +
> "  'sink.partition-commit.policy.kind'='metastore'" +
> ")");
>
> Step 3: INSERT INTO from the temporary table into the target table, which fails with:
>
> org.apache.flink.table.api.TableException: A raw type backed by type
> information has no serializable string representation. It needs to be
> resolved into a proper raw type.
>
> Printing the temporary table's schema shows that lib and properties were resolved as:
>
> |-- lib: LEGACY('RAW', 'ANY<java.util.Map>')
> |-- properties: LEGACY('RAW', 'ANY<java.util.Map>')
> |-- track_id: BIGINT
> |-- type: STRING
>
> So lib LEGACY('RAW', 'ANY<java.util.Map>') cannot be matched against lib MAP<STRING,STRING> in the Hive target table, and the write fails. That is roughly the whole flow.
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/

--
Best,
Benchao Li
