Re:Re: Flink DataTypes json parse exception
Hi,

I am using the latest Flink 1.10 RC. The same code runs without any problem on Flink 1.8.2, but on 1.10 the exception below occurs, and I am not sure what causes it.

At 2020-02-11 18:33:50, "Timo Walther" wrote:
>Hi,
>
>from which Flink version are you upgrading? There were some changes in
>1.9 for how to parse timestamps in JSON format.
>
>Your error might be related to those changes:
>
>https://issues.apache.org/jira/browse/FLINK-11727
>
>I hope this helps.
>
>Timo
Re: Flink DataTypes json parse exception
Hi,

from which Flink version are you upgrading? There were some changes in 1.9 to how timestamps are parsed in the JSON format.

Your error might be related to those changes:

https://issues.apache.org/jira/browse/FLINK-11727

I hope this helps.

Timo

On 07.02.20 07:57, sunfulin wrote:
> Hi, guys
> When upgrading to Flink 1.10 rc0, I am confused by the new DataTypes schema definition.
> I am reading and consuming records from Kafka with a JSON schema like
> {"user_id":1866071598998,"recv_time":1547011832729}, and the code snippet is:
>
>     .withSchema(
>         new Schema()
>             // eventTime
>             .field("rowtime", DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)).rowtime(
>                 new Rowtime()
>                     .timestampsFromField("recv_time")
>                     .watermarksPeriodicBounded(1000)
>             )
>             .field("user_id", DataTypes.STRING())
>
> But I am running into an issue and get an exception like the following:
>
> Caused by: java.time.format.DateTimeParseException: Text '1549705104542' could not be parsed at index 0
>     at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>     at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
Flink DataTypes json parse exception
Hi, guys

When upgrading to Flink 1.10 rc0, I am confused by the new DataTypes schema definition.
I am reading and consuming records from Kafka with a JSON schema like
{"user_id":1866071598998,"recv_time":1547011832729}, and the code snippet is:

    .withSchema(
        new Schema()
            // eventTime
            .field("rowtime", DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class)).rowtime(
                new Rowtime()
                    .timestampsFromField("recv_time")
                    .watermarksPeriodicBounded(1000)
            )
            .field("user_id", DataTypes.STRING())

But I am running into an issue and get an exception like the following:

Caused by: java.time.format.DateTimeParseException: Text '1549705104542' could not be parsed at index 0
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
    at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
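
A note on the exception: the text '1549705104542' is an epoch-millisecond number, while the stack trace shows it being passed to java.time.format.DateTimeFormatter.parse. The sketch below uses only java.time to illustrate that kind of mismatch outside Flink. It assumes an ISO-style formatter (DateTimeFormatter.ISO_LOCAL_DATE_TIME) as a stand-in, since the formatter Flink actually uses is not shown in this thread; the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical demo class, not part of Flink.
class EpochMillisTimestampDemo {

    // Returns true if the text parses as an ISO local date-time,
    // e.g. "2019-02-09T09:38:24.542". ISO_LOCAL_DATE_TIME is an assumed
    // stand-in for whatever formatter the JSON format applies.
    static boolean parsesAsIsoDateTime(String text) {
        try {
            LocalDateTime.parse(text, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    // Interprets a number as milliseconds since the Unix epoch (UTC).
    static LocalDateTime fromEpochMillis(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis)
                .atZone(ZoneOffset.UTC)
                .toLocalDateTime();
    }

    public static void main(String[] args) {
        String raw = "1549705104542"; // value taken from the exception message

        // A bare number is not a formatted date-time string.
        System.out.println("parses as ISO date-time: " + parsesAsIsoDateTime(raw));

        // Treated as epoch milliseconds, the same value is a valid instant.
        System.out.println("as epoch millis (UTC): " + fromEpochMillis(Long.parseLong(raw)));
    }
}
```

If this is indeed the cause, one direction to try is to have recv_time arrive as a formatted timestamp string, or to read it as a numeric field and convert it from epoch milliseconds afterwards. Neither has been verified against Flink 1.10 here.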