Dear All,
Recently I have been researching some features of Hive on Flink. I previously had a MapReduce job; its statement [HQL] is shown below.
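The logic is simple. It full-joins two tables (table1 and table2) and writes the result into a new table, table3. table1, table2 and table3 have the same schema, with 35 fields each. The join key is id. For every field, if table2's value is neither null nor empty, table2's value is used; otherwise table1's value is used. The result is then written into table3.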
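To make the per-field rule concrete, here is a minimal in-memory Java sketch of the same semantics. It is not Flink or Hive code, and the class and method names are hypothetical. Rows are String arrays keyed by id. Null or empty ids are dropped, as in the two subqueries. An id present on only one side contributes nulls from the other side, as a full outer join does.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class FullJoinMergeSketch {

    // Per-field rule: prefer table2's value unless it is null or empty, else fall back to table1.
    static String pick(String fromTable2, String fromTable1) {
        return (fromTable2 != null && !fromTable2.isEmpty()) ? fromTable2 : fromTable1;
    }

    // Full outer join of two in-memory "tables" on id, merging every field with pick().
    // Each row is a String[] of the given width (35 in the real tables).
    static Map<String, String[]> fullJoinMerge(Map<String, String[]> table1,
                                               Map<String, String[]> table2,
                                               int width) {
        Set<String> ids = new LinkedHashSet<>(table1.keySet());
        ids.addAll(table2.keySet());
        Map<String, String[]> result = new LinkedHashMap<>();
        for (String id : ids) {
            // Mirrors "where (id is not null and id <> '')" in both subqueries.
            if (id == null || id.isEmpty()) {
                continue;
            }
            String[] r1 = table1.get(id); // null when the id exists only in table2
            String[] r2 = table2.get(id); // null when the id exists only in table1
            String[] merged = new String[width];
            for (int i = 0; i < width; i++) {
                String v1 = (r1 == null) ? null : r1[i];
                String v2 = (r2 == null) ? null : r2[i];
                merged[i] = pick(v2, v1);
            }
            result.put(id, merged);
        }
        return result;
    }

    public static void main(String[] args) {
        // Toy data with 3 columns (id, a, b) instead of 35.
        Map<String, String[]> table1 = new LinkedHashMap<>();
        table1.put("1", new String[] {"1", "a", "b"});
        table1.put("2", new String[] {"2", "c", "d"});
        Map<String, String[]> table2 = new LinkedHashMap<>();
        table2.put("1", new String[] {"1", "", "x"});
        table2.put("3", new String[] {"3", "e", "f"});

        fullJoinMerge(table1, table2, 3)
                .forEach((id, row) -> System.out.println(id + " -> " + Arrays.toString(row)));
        // prints:
        // 1 -> [1, a, x]
        // 2 -> [2, c, d]
        // 3 -> [3, e, f]
    }
}
```

For id 1, table2 has an empty second field, so table1's "a" is kept. Ids 2 and 3 each exist on one side only and pass through unchanged.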
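So far the job has been running for 17 hours and has not finished. Judging from the data flow, it seems to be almost done. Is the way I am using it reasonable?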
[HQL]
insert into table3 select
if(t2.id is not null and t2.id <> '', t2.id, t1.id) as id
,if(t2.field2 is not null and t2.field2 <> '', t2.field2, t1.field2) as field2
......
......
......
......
......
......
......
......
,if(t2.field35 is not null and t2.field35 <> '', t2.field35, t1.field35) as field35
from (
select * from table1 where (id is not null and id <> '')
) as t1 full join (
select * from table2 where (id is not null and id <> '')
) as t2 on (t1.id = t2.id)
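The 35 projections differ only in the column name, so a small Java helper could build the statement instead of writing it out by hand. This also keeps every expression in the same t2/t1 shape. This is a sketch under assumptions. The class and method names are hypothetical. The post does not list the real column names, so the example uses id and field2 through field35 as placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

public class MergeHqlBuilder {

    // One projection: prefer t2.col when it is neither NULL nor '', otherwise t1.col.
    static String mergeExpr(String col) {
        return String.format(
                "if(t2.%1$s is not null and t2.%1$s <> '', t2.%1$s, t1.%1$s) as %1$s", col);
    }

    // Builds the INSERT ... SELECT ... FULL JOIN statement in the same layout as the HQL above.
    static String buildHql(String target, String left, String right, String key,
                           List<String> columns) {
        StringJoiner select = new StringJoiner("\n,");
        for (String c : columns) {
            select.add(mergeExpr(c));
        }
        String keyFilter = "(" + key + " is not null and " + key + " <> '')";
        return "insert into " + target + " select\n"
                + select
                + "\nfrom (\n"
                + "select * from " + left + " where " + keyFilter + "\n"
                + ") as t1 full join (\n"
                + "select * from " + right + " where " + keyFilter + "\n"
                + ") as t2 on (t1." + key + " = t2." + key + ")";
    }

    public static void main(String[] args) {
        // Hypothetical column list: the real 35 column names are not given in the post.
        List<String> columns = new ArrayList<>();
        columns.add("id");
        for (int i = 2; i <= 35; i++) {
            columns.add("field" + i);
        }
        System.out.println(buildHql("table3", "table1", "table2", "id", columns));
    }
}
```

The resulting string could be passed to tableEnv.executeSql in place of a hand-written statement.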
The code is as follows:
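import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.hadoop.hive.conf.HiveConf;

public class FlinkHiveIntegration1 {
    public static void main(String[] args) throws Exception {
        // Batch-mode TableEnvironment for a one-off INSERT over bounded Hive tables
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";
        String database = "mydatabase";
        String version = "1.1.0-cdh5.8.3";

        // Hive metastore and warehouse locations (host masked)
        HiveConf hiveConf = new HiveConf();
        hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://*******:9083");
        hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE,
                "hdfs://nameservice1/user/hive/warehouse");

        // HiveCatalogTest: presumably a custom HiveCatalog subclass (not shown here)
        HiveCatalog hive = new HiveCatalogTest(name, database, hiveConf, version);
        tableEnv.registerCatalog(name, hive);
        tableEnv.useCatalog(name);
        tableEnv.useDatabase(database);

        // The HQL statement shown above (elided here)
        String hql = "......";

        // Default job parallelism of 8
        tableEnv.getConfig().addConfiguration(
                new Configuration().set(CoreOptions.DEFAULT_PARALLELISM, 8));
        tableEnv.executeSql(hql);
    }
}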
<http://apache-flink.147419.n8.nabble.com/file/t1162/hiveonFlink.png>
-----
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/