Hi Fanbin,

One approach would be to ingest the field as a VARCHAR / String and
implement a Scalar UDF to convert it into a nested tuple.
The UDF could use the code of the flink-json module.

AFAIK, there is some work on the way to add built-in JSON functions.

Best, Fabian

Am Do., 24. Okt. 2019 um 10:03 Uhr schrieb Fanbin Bu <fanbin...@coinbase.com
>:

> Looks like SnowflakeColumnMetadata treats VARIANT as VARCHAR
>
> case VARIANT:
>   colType = Types.VARCHAR;
>   extColTypeName = "VARIANT";
>   break;
>
> and SnowflakeResultSet just return the string of the field
>
> switch(type)
> {
>   case Types.VARCHAR:
>   case Types.CHAR:
>     return getString(columnIndex);
>
> What would be the best way to handle this on Flink side?
>
>
>
> On Thu, Oct 24, 2019 at 12:36 AM Fanbin Bu <fanbin...@coinbase.com> wrote:
>
>> Hi there,
>>
>> Flink Version: 1.8.1
>> JDBC driver: net.snowflake.client.jdbc.SnowflakeDriver
>>
>> Here is the code snippet:
>>
>> val rowTypeInfo = new RowTypeInfo(
>>       Array[TypeInformation[_]](
>>         new RowTypeInfo(
>>           Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO,
>> BasicTypeInfo.STRING_TYPE_INFO),
>>           Array[String]("a", "b")
>>         )
>>       ),
>>       Array[String]("fieldName")
>>     )
>> val inputFormat = buildInputFormat(query, rowTypeInfo)
>> env.createInput(inputFormat)
>>
>> my snowflake table data looks like this (fieldName has type VARIANT)
>>
>> fieldName
>> --------------
>> {
>> "a": "1",
>> "b": "2"
>> }
>>
>> I got err msg:
>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast
>> to org.apache.flink.types.Row
>>
>>
>> Looks like the record I got from snowflake is a string. The error
>> prevents me from doing something like
>> sqlQuery("select fieldName.a from table")
>>
>> Any help is appreciated!
>>
>> Thanks,
>> Fanbin
>>
>

Reply via email to