Re: HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

2020-07-16 Thread Danny Chan
I suspect there is some inconsistency in the nullability of the whole record
field. Can you compare the two schemas and see the diff? For a table, you can
get the TableSchema first and print it out.
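
For example, a minimal sketch (assuming a TableEnvironment named tEnv and
your SELECT statement stored in a variable selectSql):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;

Table query = tEnv.sqlQuery(selectSql);
TableSchema querySchema = query.getSchema();   // schema produced by the SELECT
TableSchema sinkSchema = tEnv
        .from("test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping")
        .getSchema();                          // schema declared in the DDL
System.out.println(querySchema);  // compare the two printouts field by field,
System.out.println(sinkSchema);   // paying attention to NOT NULL / nullability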

Best,
Danny Chan
On 2020-07-16 at 10:56 AM +0800, Leonard Xu wrote:
> Hi, Jim
>
> Could you post the error message in text form, containing the entire schema
> of the query and the sink? I suspect some field types are mismatched.
>
> Best,
> Leonard Xu
>
> > On 2020-07-16 at 10:29, Jim Chen wrote:
> >
> > Hi,
> >   I use Flink 1.10.1 SQL to connect to HBase 1.4.3. When inserting into
> > HBase, it reports an error like validateSchemaAndApplyImplicitCast,
> > meaning that the query schema and the sink schema are inconsistent.
> >   The problem is mainly the Row (EXPR$0) in the query schema, whose
> > fields are all expressions, while the sink schema is Row(device_id). I
> > don't know how to write the SQL so that it is consistent with HBase's
> > sink schema.
> >   I tried to write SQL like select device_id as rowkey, ROW( device_id
> > as [cannot write as]  ) as f1
> >
> > [snip: error-message screenshot and sample code; see the original
> > message below]

Re: HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

2020-07-15 Thread Leonard Xu
Hi, Jim

Could you post the error message in text form, containing the entire schema
of the query and the sink? I suspect some field types are mismatched.

Best,
Leonard Xu

> On 2020-07-16 at 10:29, Jim Chen wrote:
> 
> Hi,
>   I use Flink 1.10.1 SQL to connect to HBase 1.4.3. When inserting into
> HBase, it reports an error like validateSchemaAndApplyImplicitCast, meaning
> that the query schema and the sink schema are inconsistent.
>   The problem is mainly the Row (EXPR$0) in the query schema, whose fields
> are all expressions, while the sink schema is Row(device_id). I don't know
> how to write the SQL so that it is consistent with HBase's sink schema.
>   I tried to write SQL like select device_id as rowkey, ROW( device_id as
> [cannot write as]  ) as f1
> 
> [snip: error-message screenshot and sample code; see the original message
> below]

HELP!!! Flink1.10 sql insert into HBase, error like validateSchemaAndApplyImplicitCast

2020-07-15 Thread Jim Chen
Hi,
  I use Flink 1.10.1 SQL to connect to HBase 1.4.3. When inserting into
HBase, it reports an error like validateSchemaAndApplyImplicitCast, meaning
that the query schema and the sink schema are inconsistent.
  The problem is mainly the Row (EXPR$0) in the query schema, whose fields
are all expressions, while the sink schema is Row(device_id). I don't know
how to write the SQL so that it is consistent with HBase's sink schema.
  I tried to write SQL like select device_id as rowkey, ROW( device_id as
[cannot write as]  ) as f1
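
A stripped-down version of what I am trying, with a hypothetical two-field
sink (dim_sink) and source (kafka_source) standing in for the real tables;
since AS is not accepted inside ROW(...), the fields can only line up with
the sink's ROW type by position and type:

-- hypothetical sink: rowkey STRING, f1 ROW<device_id STRING, server_time BIGINT>
INSERT INTO dim_sink
SELECT
  rowkey,
  ROW(device_id, server_time) AS f1  -- position 1 -> f1.device_id, position 2 -> f1.server_time
FROM (
  SELECT
    MD5(uid)                    AS rowkey,      -- STRING, matches sink rowkey
    CAST(uid AS STRING)         AS device_id,   -- must match f1 position 1 (STRING)
    CAST(server_time AS BIGINT) AS server_time  -- must match f1 position 2 (BIGINT)
  FROM kafka_source
) t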

error message as follows:
[image attachment: screenshot of the error message]

sample code:
HBase sink ddl:
String ddlSource = "CREATE TABLE test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" +
        "  rowkey STRING,\n" +
        "  f1 ROW<\n" +
        "    device_id STRING,\n" +
        "    pass_id STRING,\n" +
        "    first_date STRING,\n" +
        "    first_channel_id STRING,\n" +
        "    first_app_version STRING,\n" +
        "    first_server_time STRING,\n" +
        "    first_server_hour STRING,\n" +
        "    first_ip_location STRING,\n" +
        "    first_login_time STRING,\n" +
        "    sys_can_uninstall STRING,\n" +
        "    update_date STRING,\n" +
        "    server_time BIGINT,\n" +
        "    last_pass_id STRING,\n" +
        "    last_channel_id STRING,\n" +
        "    last_app_version STRING,\n" +
        "    last_date STRING,\n" +
        "    os STRING,\n" +
        "    attribution_channel_id STRING,\n" +
        "    attribution_first_date STRING,\n" +
        "    p_product STRING,\n" +
        "    p_project STRING,\n" +
        "    p_dt STRING\n" +
        "  >\n" +
        ") WITH (\n" +
        "  'connector.type' = 'hbase',\n" +
        // even if the syntax check is bypassed and another HBase version is
        // used, the problem remains, e.g. our production version fails too
        "  'connector.version' = '1.4.3',\n" +
        "  'connector.table-name' = 'dw_common_mobile_device_user_mapping_new',\n" +
        "  'connector.zookeeper.quorum' = '" + zookeeperServers + "',\n" +
        "  'connector.zookeeper.znode.parent' = '/hbase143',\n" +
        "  'connector.write.buffer-flush.max-size' = '2mb',\n" +
        "  'connector.write.buffer-flush.max-rows' = '1000',\n" +
        "  'connector.write.buffer-flush.interval' = '2s'\n" +
        ")";

insert into sql:

String bodyAndLocalSql = "" +
//      "insert into test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " +
        "SELECT CAST(rowkey AS STRING) AS rowkey, " +
        " ROW(" +
        " device_id, pass_id, first_date, first_channel_id, first_app_version," +
        " first_server_time, first_server_hour, first_ip_location, first_login_time," +
        " sys_can_uninstall, update_date, server_time, last_pass_id, last_channel_id," +
        " last_app_version, last_date, os, attribution_channel_id," +
        " attribution_first_date, p_product, p_project, p_dt " +
        ") AS f1" +
        " FROM " +
        "(" +
        " SELECT " +
        " MD5(CONCAT_WS('|', kafka.uid, kafka.p_product, kafka.p_project)) AS rowkey, " +
        " kafka.uid AS device_id " +
        ",kafka.pass_id " +

        // first_date
        ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
        // new user
        " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd') " +
        // existing user
        " ELSE hbase.first_date END AS first_date " +

        // first_channel_id
        ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
        // new user
        " THEN kafka.wlb_channel_id" +
        // existing user
        " ELSE hbase.first_channel_id END AS first_channel_id " +

        // first_app_version
        ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
        // new user
        " THEN kafka.app_version " +
        // existing user
        " ELSE hbase.first_app_version END AS first_app_version " +

        // first_server_time
        ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
        // new user
        " THEN FROM_UNIXTIME(kafka.server_time, 'yyyy-MM-dd HH:mm:ss') " +
        // existing user
        " ELSE hbase.first_server_time END AS first_server_time " +

        // first_server_hour
        ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " +
        // new user
        " THEN FROM_UNIXTIME(kafka.server_time, 'HH') " +
        // existing user
        " ELSE hbase.first_server_hour END AS first_server_hour " +

        // first_ip_location