tEnv.executeSql(insertSql); 是异步提交完任务就返回了,
如果是IDE里运行的话话,进程就直接退出导致job也就结束了。需要需要等到job结束,
目前可以通过下面这种方式
TableResult result = tEnv.executeSql(insertSql);
result..getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();

另外  tEnv.executeSql(insertSql); 已经提交作业了,不需要调用  bsEnv.execute("test");

liya...@huimin100.cn <liya...@huimin100.cn> 于2020年8月11日周二 下午3:20写道:

> 下面粘的就是主程序代码
>
> 能在hive里建表,创建的TemporaryView也有数据,但是tEnv.executeSql(insertSql)这块好像没执行,往新建的hive表里插入数据没反应。求助!!!!
>
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv,
> bsSettings);DataStream<UserInfo> dataStream = bsEnv.addSource(new
> MySource());//构造hive catalog
> String name = "myhive";
> String defaultDatabase = "default";
> String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a
> local path
> String version = "1.1.0";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir,
> version);
> tEnv.registerCatalog("myhive", hive);
> tEnv.useCatalog("myhive");
> tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
> tEnv.createTemporaryView("users", dataStream);
>
> Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts,
> 'yyyy-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM
> users");
>
>
> tEnv.toRetractStream(result3, TypeInformation.of(new
> TypeHint<Tuple5<String,Double,String,String,String>>(){})).print("res");//
>     如果hive中已经存在了相应的表,则这段代码省略
> //    String hiveSql = "CREATE TABLE fs_table (\n" +
> //                     "  user_id STRING,\n" +
> //                     "  order_amount DOUBLE \n" +
> //                     ") partitioned by (dt string,h string,m string) \n"
> +
> //                     "stored as textfile \n" +
> //                     "TBLPROPERTIES (\n" +
> //                     "
> 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" +
> //                     "  'sink.partition-commit.delay'='0s',\n" +
> //                     "
> 'sink.partition-commit.trigger'='partition-time',\n" +
> //                     "  'sink.partition-commit.policy.kind'='metastore'"
> +
> //                     ")";
> //    tEnv.executeSql(hiveSql);
>
>         String insertSql = "insert into table fs_table partition (dt,h,m)
> SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') dt, DATE_FORMAT(ts,
> 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users";
>
>         tEnv.executeSql(insertSql);
>
>         bsEnv.execute("test");
>
>
> liya...@huimin100.cn
>

回复