Re: JDBCInputFormat does not support json type

2019-10-25 Thread Fabian Hueske
Hi Fanbin,

One approach would be to ingest the field as a VARCHAR / String and
implement a Scalar UDF to convert it into a nested tuple.
The UDF could use the code of the flink-json module.
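
A minimal sketch of such a UDF, assuming Flink 1.8's ScalarFunction API and a
plain Jackson ObjectMapper on the classpath (flink-json ships a shaded Jackson
that could be swapped in). The class name ParseVariant is hypothetical, and the
field names "a" and "b" are taken from the example data in this thread:

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

// Hypothetical UDF: parses a JSON string such as {"a": "1", "b": "2"}
// into a Row with two String fields named "a" and "b".
class ParseVariant extends ScalarFunction {

  // Created lazily on each task so the mapper is never shipped with the function.
  @transient private lazy val mapper = new ObjectMapper()

  // Returns the text of a top-level field, or null if it is missing or JSON null.
  private def text(node: JsonNode, name: String): String =
    Option(node.get(name)).filterNot(_.isNull).map(_.asText()).orNull

  def eval(json: String): Row = {
    if (json == null) {
      null
    } else {
      val node = mapper.readTree(json)
      val row = new Row(2)
      row.setField(0, text(node, "a"))
      row.setField(1, text(node, "b"))
      row
    }
  }

  // Tell the planner that the result is a named Row rather than a generic type.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW_NAMED(Array("a", "b"), Types.STRING, Types.STRING)
}
```

With this approach the JDBC source would declare fieldName as
BasicTypeInfo.STRING_TYPE_INFO instead of a nested RowTypeInfo, and the function
would be registered on the table environment, for example with
tableEnv.registerFunction("parseVariant", new ParseVariant), where tableEnv is
assumed to be an existing TableEnvironment.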

AFAIK, there is some work under way to add built-in JSON functions.

Best, Fabian



Re: JDBCInputFormat does not support json type

2019-10-24 Thread Fanbin Bu
Looks like SnowflakeColumnMetadata treats VARIANT as VARCHAR

case VARIANT:
  colType = Types.VARCHAR;
  extColTypeName = "VARIANT";
  break;

and SnowflakeResultSet just returns the field as a string:

switch(type)
{
  case Types.VARCHAR:
  case Types.CHAR:
    return getString(columnIndex);

What would be the best way to handle this on the Flink side?





JDBCInputFormat does not support json type

2019-10-24 Thread Fanbin Bu
Hi there,

Flink Version: 1.8.1
JDBC driver: net.snowflake.client.jdbc.SnowflakeDriver

Here is the code snippet:

val rowTypeInfo = new RowTypeInfo(
  Array[TypeInformation[_]](
    new RowTypeInfo(
      Array[TypeInformation[_]](
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO),
      Array[String]("a", "b")
    )
  ),
  Array[String]("fieldName")
)
val inputFormat = buildInputFormat(query, rowTypeInfo)
env.createInput(inputFormat)

My Snowflake table data looks like this (fieldName has type VARIANT):

fieldName
--
{
  "a": "1",
  "b": "2"
}

I got this error message:
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.flink.types.Row


It looks like the record I get from Snowflake is a string. The error prevents
me from running something like
sqlQuery("select fieldName.a from table")

Any help is appreciated!

Thanks,
Fanbin