Hi Fanbin,

One approach would be to ingest the field as a VARCHAR / String and implement a scalar UDF that converts it into a nested tuple (a Row). The UDF could reuse code from the flink-json module.
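
To make the conversion step concrete, here is a minimal, dependency-free sketch of the logic such a UDF could run on the VARIANT string. The class and method names are hypothetical, and the regex lookup is only a stand-in for a real JSON parser (flink-json is built on Jackson): it matches the first occurrence of a key with a string value, does not unescape, and is only reliable for flat objects like the one in your example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink or the Snowflake driver.
final class VariantJsonFields {

    private VariantJsonFields() {}

    /**
     * Returns the string values of the requested fields, in the order asked.
     * A field that is absent, or whose value is not a JSON string, yields null.
     * Assumption: the input is a flat JSON object with string values.
     */
    static String[] extract(String json, String... fieldNames) {
        String[] values = new String[fieldNames.length];
        if (json == null) {
            return values; // SQL NULL in, all-null fields out
        }
        for (int i = 0; i < fieldNames.length; i++) {
            // "name" : "value", where value may contain backslash escapes
            Pattern p = Pattern.compile(
                "\"" + Pattern.quote(fieldNames[i])
                    + "\"\\s*:\\s*\"((?:[^\"\\\\]|\\\\.)*)\"");
            Matcher m = p.matcher(json);
            values[i] = m.find() ? m.group(1) : null;
        }
        return values;
    }
}
```

In Flink 1.8 this logic would typically sit in the `eval` method of a class extending `org.apache.flink.table.functions.ScalarFunction`, return a `Row` built from the extracted values, and declare the nested row type by overriding `getResultType`. After registering the function on the table environment, the query would call it on the VARCHAR column instead of reading `fieldName.a` directly. I have not tested this wiring against Snowflake, so treat it as a starting point.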

AFAIK, there is some work under way to add built-in JSON functions.

Best,
Fabian

On Thu, Oct 24, 2019 at 10:03 AM, Fanbin Bu <fanbin...@coinbase.com> wrote:

> Looks like SnowflakeColumnMetadata treats VARIANT as VARCHAR
>
>     case VARIANT:
>       colType = Types.VARCHAR;
>       extColTypeName = "VARIANT";
>       break;
>
> and SnowflakeResultSet just returns the string of the field
>
>     switch(type)
>     {
>       case Types.VARCHAR:
>       case Types.CHAR:
>         return getString(columnIndex);
>
> What would be the best way to handle this on the Flink side?
>
> On Thu, Oct 24, 2019 at 12:36 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> Hi there,
>>
>> Flink Version: 1.8.1
>> JDBC driver: net.snowflake.client.jdbc.SnowflakeDriver
>>
>> Here is the code snippet:
>>
>> val rowTypeInfo = new RowTypeInfo(
>>   Array[TypeInformation[_]](
>>     new RowTypeInfo(
>>       Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO,
>>         BasicTypeInfo.STRING_TYPE_INFO),
>>       Array[String]("a", "b")
>>     )
>>   ),
>>   Array[String]("fieldName")
>> )
>> val inputFormat = buildInputFormat(query, rowTypeInfo)
>> env.createInput(inputFormat)
>>
>> My Snowflake table data looks like this (fieldName has type VARIANT):
>>
>> fieldName
>> --------------
>> {
>>   "a": "1",
>>   "b": "2"
>> }
>>
>> I got this error message:
>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast
>> to org.apache.flink.types.Row
>>
>> Looks like the record I got from Snowflake is a string. The error
>> prevents me from doing something like
>> sqlQuery("select fieldName.a from table")
>>
>> Any help is appreciated!
>>
>> Thanks,
>> Fanbin