Re: HELP!!! Flink 1.10 SQL INSERT INTO HBase — error in validateSchemaAndApplyImplicitCast
I suspect there are some inconsistency in the nullability of the whole record field, can you compare the 2 schema and see the diff ? For a table, you can get the TableSchema first and print it out. Best, Danny Chan 在 2020年7月16日 +0800 AM10:56,Leonard Xu ,写道: > Hi, Jim > > Could you post error message in text that contains the entire schema of query > and sink? I doubt there are some fields type were mismatched. > > Best, > Leonard Xu > > > 在 2020年7月16日,10:29,Jim Chen 写道: > > > > Hi, > > I use flink1.10.1 sql to connect hbase1.4.3. When inset into hbase, > > report an error like validateSchemaAndApplyImplicitCast. Means that the > > Query Schema and Sink Schema are inconsistent. > > Mainly Row (EXPR$0) in Query Schema, which are all expressions. Sink > > Schema is Row(device_id). I don't know how to write in sql to be consistent > > with hbase's sink schema. > > I try to write sql like select device_id as rowkey, ROW( device_id as > > [cannot write as] ) as f1 > > > > error message as follow: > > > > > > sample code like: > > HBase sink ddl: > > String ddlSource = "CREATE TABLE > > test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" + > > " rowkey STRING,\n" + > > " f1 ROW< \n" + > > " device_id STRING,\n" + > > " pass_id STRING,\n" + > > " first_date STRING,\n" + > > " first_channel_id STRING,\n" + > > " first_app_version STRING,\n" + > > " first_server_time STRING,\n" + > > " first_server_hour STRING,\n" + > > " first_ip_location STRING,\n" + > > " first_login_time STRING,\n" + > > " sys_can_uninstall STRING,\n" + > > " update_date STRING,\n" + > > " server_time BIGINT,\n" + > > " last_pass_id STRING,\n" + > > " last_channel_id STRING,\n" + > > " last_app_version STRING,\n" + > > " last_date STRING,\n" + > > " os STRING,\n" + > > " attribution_channel_id STRING,\n" + > > " attribution_first_date STRING,\n" + > > " p_product STRING,\n" + > > " p_project STRING,\n" + > > " p_dt STRING\n" + > > " >\n" + > > ") WITH (\n" + > > " 'connector.type' = 
'hbase',\n" + > > " 'connector.version' = '1.4.3',\n" + // > > 即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行 > > " 'connector.table-name' = > > 'dw_common_mobile_device_user_mapping_new',\n" + > > " 'connector.zookeeper.quorum' = '"+ zookeeperServers > > +"',\n" + > > " 'connector.zookeeper.znode.parent' = '/hbase143',\n" + > > " 'connector.write.buffer-flush.max-size' = '2mb',\n" + > > " 'connector.write.buffer-flush.max-rows' = '1000',\n" + > > " 'connector.write.buffer-flush.interval' = '2s'\n" + > > ")"; > > > > insert into sql: > > > > String bodyAndLocalSql = "" + > > // "insert into > > test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " + > > "SELECT CAST(rowkey AS STRING) AS rowkey, " + > > " ROW(" + > > " device_id, pass_id, first_date, first_channel_id, > > first_app_version, first_server_time, first_server_hour, first_ip_location, > > first_login_time, sys_can_uninstall, update_date, server_time, > > last_pass_id, last_channel_id, last_app_version, last_date, os, > > attribution_channel_id, attribution_first_date, p_product, p_project, p_dt > > " + > > ") AS f1" + > > " FROM " + > > "(" + > > " SELECT " + > > " MD5(CONCAT_WS('|', kafka.uid, kafka.p_product, > > kafka.p_project)) AS rowkey, " + > > " kafka.uid AS device_id " + > > ",kafka.pass_id " + > > > > // first_date > > ",CASE WHEN COALESCE(hbase.server_time, 0) <= > > kafka.server_time " + > > // 新用户 > > " THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd') " + > > // 老用户 > > " ELSE hbase.first_date END AS first_date " + > > > > // first_channel_id > > ",CASE WHEN COALESCE(hbase.server_time, 0) <= > > kafka.server_time " + > > // 新用户 > > " THEN kafka.wlb_channel_id" + > > // 老用户 > > " ELSE hbase.first_channel_id END AS first_channel_id " + > > > > // first_app_version > > ",CASE WHEN COALESCE(hbase.server_time, 0) <= > >
Re: HELP!!! Flink 1.10 SQL INSERT INTO HBase — error in validateSchemaAndApplyImplicitCast
Hi, Jim Could you post error message in text that contains the entire schema of query and sink? I doubt there are some fields type were mismatched. Best, Leonard Xu > 在 2020年7月16日,10:29,Jim Chen 写道: > > Hi, > I use flink1.10.1 sql to connect hbase1.4.3. When inset into hbase, report > an error like validateSchemaAndApplyImplicitCast. Means that the Query Schema > and Sink Schema are inconsistent. > Mainly Row (EXPR$0) in Query Schema, which are all expressions. Sink Schema > is Row(device_id). I don't know how to write in sql to be consistent with > hbase's sink schema. > I try to write sql like select device_id as rowkey, ROW( device_id as > [cannot write as] ) as f1 > > error message as follow: > > > sample code like: > HBase sink ddl: > String ddlSource = "CREATE TABLE > test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" + > " rowkey STRING,\n" + > " f1 ROW< \n" + > "device_id STRING,\n" + > "pass_id STRING,\n" + > "first_date STRING,\n" + > "first_channel_id STRING,\n" + > "first_app_version STRING,\n" + > "first_server_time STRING,\n" + > "first_server_hour STRING,\n" + > "first_ip_location STRING,\n" + > "first_login_time STRING,\n" + > "sys_can_uninstall STRING,\n" + > "update_date STRING,\n" + > "server_time BIGINT,\n" + > "last_pass_id STRING,\n" + > "last_channel_id STRING,\n" + > "last_app_version STRING,\n" + > "last_date STRING,\n" + > "os STRING,\n" + > "attribution_channel_id STRING,\n" + > "attribution_first_date STRING,\n" + > "p_product STRING,\n" + > "p_project STRING,\n" + > "p_dt STRING\n" + > ">\n" + > ") WITH (\n" + > " 'connector.type' = 'hbase',\n" + > " 'connector.version' = '1.4.3',\n" + // > 即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行 > " 'connector.table-name' = > 'dw_common_mobile_device_user_mapping_new',\n" + > " 'connector.zookeeper.quorum' = '"+ zookeeperServers > +"',\n" + > " 'connector.zookeeper.znode.parent' = '/hbase143',\n" + > " 'connector.write.buffer-flush.max-size' = '2mb',\n" + > " 
'connector.write.buffer-flush.max-rows' = '1000',\n" + > " 'connector.write.buffer-flush.interval' = '2s'\n" + > ")"; > > insert into sql: > > String bodyAndLocalSql = "" + > //"insert into > test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " + > "SELECT CAST(rowkey AS STRING) AS rowkey, " + > " ROW(" + > " device_id, pass_id, first_date, first_channel_id, > first_app_version, first_server_time, first_server_hour, first_ip_location, > first_login_time, sys_can_uninstall, update_date, server_time, last_pass_id, > last_channel_id, last_app_version, last_date, os, attribution_channel_id, > attribution_first_date, p_product, p_project, p_dt " + > ") AS f1" + > " FROM " + > "(" + > " SELECT " + > " MD5(CONCAT_WS('|', kafka.uid, kafka.p_product, > kafka.p_project)) AS rowkey, " + > " kafka.uid AS device_id " + > ",kafka.pass_id " + > > // first_date > ",CASE WHEN COALESCE(hbase.server_time, 0) <= > kafka.server_time " + > // 新用户 > " THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd') " + > // 老用户 > " ELSE hbase.first_date END AS first_date " + > > // first_channel_id > ",CASE WHEN COALESCE(hbase.server_time, 0) <= > kafka.server_time " + > // 新用户 > " THEN kafka.wlb_channel_id" + > // 老用户 > " ELSE hbase.first_channel_id END AS first_channel_id " + > > // first_app_version > ",CASE WHEN COALESCE(hbase.server_time, 0) <= > kafka.server_time " + > // 新用户 > " THEN kafka.app_version " + > // 老用户 > " ELSE hbase.first_app_version END AS first_app_version " + > > // first_server_time > ",CASE WHEN COALESCE(hbase.server_time, 0) <= > kafka.server_time " + > // 新用户 > " THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd > HH:mm:ss') " + > // 老用户 >
HELP!!! Flink 1.10 SQL INSERT INTO HBase — error in validateSchemaAndApplyImplicitCast
Hi, I use flink1.10.1 sql to connect hbase1.4.3. When inset into hbase, report an error like validateSchemaAndApplyImplicitCast. Means that the Query Schema and Sink Schema are inconsistent. Mainly Row (EXPR$0) in Query Schema, which are all expressions. Sink Schema is Row(device_id). I don't know how to write in sql to be consistent with hbase's sink schema. I try to write sql like select device_id as rowkey, ROW( device_id as [cannot write as] ) as f1 error message as follow: [image: image.png] sample code like: HBase sink ddl: String ddlSource = "CREATE TABLE test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping (\n" + " rowkey STRING,\n" + " f1 ROW< \n" + "device_id STRING,\n" + "pass_id STRING,\n" + "first_date STRING,\n" + "first_channel_id STRING,\n" + "first_app_version STRING,\n" + "first_server_time STRING,\n" + "first_server_hour STRING,\n" + "first_ip_location STRING,\n" + "first_login_time STRING,\n" + "sys_can_uninstall STRING,\n" + "update_date STRING,\n" + "server_time BIGINT,\n" + "last_pass_id STRING,\n" + "last_channel_id STRING,\n" + "last_app_version STRING,\n" + "last_date STRING,\n" + "os STRING,\n" + "attribution_channel_id STRING,\n" + "attribution_first_date STRING,\n" + "p_product STRING,\n" + "p_project STRING,\n" + "p_dt STRING\n" + ">\n" + ") WITH (\n" + " 'connector.type' = 'hbase',\n" + " 'connector.version' = '1.4.3',\n" + // 即使绕过语法编译,换其他版本的hbase,还是有问题,如线上的版本就不行 " 'connector.table-name' = 'dw_common_mobile_device_user_mapping_new',\n" + " 'connector.zookeeper.quorum' = '"+ zookeeperServers +"',\n" + " 'connector.zookeeper.znode.parent' = '/hbase143',\n" + " 'connector.write.buffer-flush.max-size' = '2mb',\n" + " 'connector.write.buffer-flush.max-rows' = '1000',\n" + " 'connector.write.buffer-flush.interval' = '2s'\n" + ")"; insert into sql: String bodyAndLocalSql = "" + //"insert into test_hive_catalog.test_dim.dw_common_mobile_device_user_mapping " + "SELECT CAST(rowkey AS STRING) AS rowkey, " + " ROW(" + " device_id, 
pass_id, first_date, first_channel_id, first_app_version, first_server_time, first_server_hour, first_ip_location, first_login_time, sys_can_uninstall, update_date, server_time, last_pass_id, last_channel_id, last_app_version, last_date, os, attribution_channel_id, attribution_first_date, p_product, p_project, p_dt " + ") AS f1" + " FROM " + "(" + " SELECT " + " MD5(CONCAT_WS('|', kafka.uid, kafka.p_product, kafka.p_project)) AS rowkey, " + " kafka.uid AS device_id " + ",kafka.pass_id " + // first_date ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd') " + // 老用户 " ELSE hbase.first_date END AS first_date " + // first_channel_id ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN kafka.wlb_channel_id" + // 老用户 " ELSE hbase.first_channel_id END AS first_channel_id " + // first_app_version ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN kafka.app_version " + // 老用户 " ELSE hbase.first_app_version END AS first_app_version " + // first_server_time ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN FROM_UNIXTIME(kafka.server_time, '-MM-dd HH:mm:ss') " + // 老用户 " ELSE hbase.first_server_time END AS first_server_time " + // first_server_hour ",CASE WHEN COALESCE(hbase.server_time, 0) <= kafka.server_time " + // 新用户 " THEN FROM_UNIXTIME(kafka.server_time, 'HH') " + // 老用户 " ELSE hbase.first_server_hour END AS first_server_hour " + // first_ip_location