Yes, you can define it directly in the DDL using the WATERMARK syntax. Your definition looks fine to me.
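
One thing worth double-checking, offered as a sketch rather than something tested against this job: in the Flink DDL, a computed column and the WATERMARK clause are table-level elements, so they go after the closing `>` of the ROW type, and a nested field is referenced through its parent column. The layout below assumes that. The backticks around `database`, `table` and `data` are a defensive assumption about reserved words, and the WITH options stay elided as in the original mail.

```sql
-- Sketch only: connector options are elided and must be filled in for a real job.
CREATE TABLE my_source (
  `database` VARCHAR,
  maxwell_ts BIGINT,
  `table` VARCHAR,
  `data` ROW<
    transaction_sn VARCHAR,
    parent_id INT,
    user_id INT,
    amount INT,
    reference_id VARCHAR,
    status INT,
    transaction_type INT,
    merchant_id INT,
    update_time INT,
    create_time INT
  >,
  -- Computed column declared at table level; create_time holds epoch seconds.
  ts AS CAST(FROM_UNIXTIME(`data`.create_time) AS TIMESTAMP(3)),
  -- Allow events to arrive up to 61 minutes late.
  WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE
) WITH (
  ...
);

-- Nested fields are addressed through the parent column.
SELECT `table`, `database`, `data`.reference_id, `data`.transaction_type
FROM my_source;
```

Note that wrapping the whole path in one pair of backticks, as in `data.transaction_type`, makes the planner look for a single column with that literal name, which is consistent with the "not found in any table" error quoted further down the thread.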

claylin <[email protected]> wrote on Tue, May 26, 2020 at 10:07 AM:

> A question about this: how do I define the event time for each record parsed from the row? Do I define it directly inside each record?
>
> create table my_source (
>   database varchar,
>   maxwell_ts bigint,
>   table varchar,
>   data row<
>     transaction_sn varchar,
>     parent_id int,
>     user_id int,
>     amount int,
>     reference_id varchar,
>     status int,
>     transaction_type int,
>     merchant_id int,
>     update_time int,
>     create_time int
>     ts AS CAST(FROM_UNIXTIME(create_time) AS TIMESTAMP(3)), // define the event time
>     WATERMARK FOR ts AS ts - INTERVAL '61' MINUTE
>   >
> ) with (
>   ...
> )
>
> Would this work?
>
> ------------------ Original Message ------------------
> From: "Benchao Li"<[email protected]>;
> Sent: Tuesday, May 26, 2020, 9:55 AM
> To: "user-zh"<[email protected]>;
> Subject: Re: Flink SQL nested Json parsing
>
> Hi,
>
> You could try defining the source and format directly with DDL. For your data, the DDL would look roughly like this:
> create table my_source (
>   database varchar,
>   maxwell_ts bigint,
>   table varchar,
>   data row<
>     transaction_sn varchar,
>     parent_id int,
>     user_id int,
>     amount int,
>     reference_id varchar,
>     status int,
>     transaction_type int,
>     merchant_id int,
>     update_time int,
>     create_time int
>   >
> ) with (
>   ...
> )
>
> macia kk <[email protected]> wrote on Tue, May 26, 2020 at 9:36 AM:
>
> > Flink version: 1.10
> >
> > Json:
> >
> > {
> >   "database":"main_db",
> >   "maxwell_ts":1590416550358000,
> >   "table":"transaction_tab",
> >   "data":{
> >     "transaction_sn":"8888",
> >     "parent_id":0,
> >     "user_id":333,
> >     "amount":555,
> >     "reference_id":"666",
> >     "status":3,
> >     "transaction_type":3,
> >     "merchant_id":2,
> >     "update_time":1590416550,
> >     "create_time":1590416550
> >   }}
> >
> > The docs say that nested JSON needs jsonSchema to define the schema.
> >
> > macia kk <[email protected]> wrote on Tue, May 26, 2020 at 9:34 AM:
> >
> > > Flink version: 1.10
> > >
> > > Json:
> > > ```j
> > > {
> > >   "database":"main_db",
> > >   "maxwell_ts":1590416550358000,
> > >   "table":"transaction_tab",
> > >   "data":{
> > >     "transaction_sn":"8888",
> > >     "parent_id":0,
> > >     "user_id":333,
> > >     "amount":555,
> > >     "reference_id":"666",
> > >     "status":3,
> > >     "transaction_type":3,
> > >     "merchant_id":2,
> > >     "update_time":1590416550,
> > >     "create_time":1590416550
> > >   }
> > > }
> > > ```
> > >
> > > The docs say that nested JSON needs jsonSchema to define the schema.
> > >
> > > Leonard Xu <[email protected]> wrote on Tue, May 26, 2020 at 8:58 AM:
> > >
> > >> Hi, kk
> > >>
> > >> Which Flink version are you using? In 1.10 you do not need to declare the format. Could you paste a sample JSON record? I can take a look.
> > >>
> > >> Best,
> > >> Leonard Xu
> > >>
> > >> > On May 26, 2020, at 01:26, macia kk <[email protected]> wrote:
> > >> >
> > >> > Could someone please take a look at this for me? Thanks.
> > >> >
> > >> > I have been trying for a long time and still cannot parse nested JSON.
> > >> >
> > >> > Error
> > >> >
> > >> > Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 4, column 9 to line 4, column 31: Column 'data.transaction_type' not found in any table
> > >> >   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:130)
> > >> >   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
> > >> >   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:127)
> > >> >   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)
> > >> >   at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)
> > >> >   at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
> > >> >   at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)
> > >> >   at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.createPipeline(AirpayV3Flink.scala:133)
> > >> >   at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink$.main(AirpayV3Flink.scala:39)
> > >> >   at com.shopee.data.ordermart.airpay_v3.AirpayV3Flink.main(AirpayV3Flink.scala)
> > >> >   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >> >   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > >> >   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >> >   at java.lang.reflect.Method.invoke(Method.java:498)
> > >> >   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> > >> >
> > >> > The format and schema I defined for the nested JSON are as follows:
> > >> >
> > >> > .withFormat(new Json()
> > >> >   .jsonSchema(
> > >> >     """{type: 'object',
> > >> >       |  properties: {
> > >> >       |    database: {
> > >> >       |      type: 'string'
> > >> >       |    },
> > >> >       |    table: {
> > >> >       |      type: 'string'
> > >> >       |    },
> > >> >       |    maxwell_ts: {
> > >> >       |      type: 'integer'
> > >> >       |    },
> > >> >       |    data: {
> > >> >       |      type: 'object',
> > >> >       |      properties :{
> > >> >       |        reference_id :{
> > >> >       |          type: 'string'
> > >> >       |        },
> > >> >       |        transaction_type :{
> > >> >       |          type: 'integer'
> > >> >       |        },
> > >> >       |        merchant_id :{
> > >> >       |          type: 'integer'
> > >> >       |        },
> > >> >       |        create_time :{
> > >> >       |          type: 'integer'
> > >> >       |        },
> > >> >       |        status :{
> > >> >       |          type: 'integer'
> > >> >       |        }
> > >> >       |      }
> > >> >       |    }
> > >> >       |  }
> > >> >       |}
> > >> >     """.stripMargin.replaceAll("\n", " ")
> > >> >   )
> > >> > )
> > >> > .withSchema(new Schema()
> > >> >   .field("table", STRING())
> > >> >   .field("database", STRING())
> > >> >   .field("data", ROW(FIELD("reference_id",STRING()),
> > >> >     FIELD("transaction_type",INT()), FIELD("merchant_id",INT()),
> > >> >     FIELD("status",INT())))
> > >> >   //.field("event_time", BIGINT())
> > >> >   //  .from("maxwell_ts")
> > >> >   //.rowtime(new Rowtime()
> > >> >   //  //.timestampsFromField("ts" * 1000)
> > >> >   //  .timestampsFromField("ts")
> > >> >   //  .watermarksPeriodicBounded(60000)
> > >> >   //)
> > >> > )
> > >> >
> > >> > bsTableEnv.sqlUpdate("""INSERT INTO yyyyy
> > >> >   | SELECT `table`, `database`
> > >> >   |   `data.reference_id`,
> > >> >   |   `data.transaction_type`,
> > >> >   |   `data.merchant_id`,
> > >> >   |   `data.create_time`,
> > >> >   |   `data.status`
> > >> >   | FROM xxxx""".stripMargin)
>
> --
> Best,
> Benchao Li

--
Best,
Benchao Li
