tEnv.executeSql(insertSql); 是异步提交完任务就返回了, 如果是IDE里运行的话话,进程就直接退出导致job也就结束了。需要需要等到job结束, 目前可以通过下面这种方式 TableResult result = tEnv.executeSql(insertSql); result..getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
另外 tEnv.executeSql(insertSql); 已经提交作业了,不需要调用 bsEnv.execute("test"); liya...@huimin100.cn <liya...@huimin100.cn> 于2020年8月11日周二 下午3:20写道: > 下面粘的就是主程序代码 > > 能在hive里建表,创建的TemporaryView也有数据,但是tEnv.executeSql(insertSql)这块好像没执行,往新建的hive表里插入数据没反应。求助!!!! > > StreamExecutionEnvironment bsEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings bsSettings = > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, > bsSettings);DataStream<UserInfo> dataStream = bsEnv.addSource(new > MySource());//构造hive catalog > String name = "myhive"; > String defaultDatabase = "default"; > String hiveConfDir = "D:\\demo\\flink-hive\\src\\main\\resources"; // a > local path > String version = "1.1.0"; > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, > version); > tEnv.registerCatalog("myhive", hive); > tEnv.useCatalog("myhive"); > tEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > > tEnv.createTemporaryView("users", dataStream); > > Table result3= tEnv.sqlQuery("SELECT userId, amount, DATE_FORMAT(ts, > 'yyyy-MM-dd') ymd, DATE_FORMAT(ts, 'HH') h, DATE_FORMAT(ts, 'mm') m FROM > users"); > > > tEnv.toRetractStream(result3, TypeInformation.of(new > TypeHint<Tuple5<String,Double,String,String,String>>(){})).print("res");// > 如果hive中已经存在了相应的表,则这段代码省略 > // String hiveSql = "CREATE TABLE fs_table (\n" + > // " user_id STRING,\n" + > // " order_amount DOUBLE \n" + > // ") partitioned by (dt string,h string,m string) \n" > + > // "stored as textfile \n" + > // "TBLPROPERTIES (\n" + > // " > 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',\n" + > // " 'sink.partition-commit.delay'='0s',\n" + > // " > 'sink.partition-commit.trigger'='partition-time',\n" + > // " 'sink.partition-commit.policy.kind'='metastore'" > + > // ")"; > // tEnv.executeSql(hiveSql); > > String insertSql = "insert into table fs_table partition (dt,h,m) > SELECT userId, amount, DATE_FORMAT(ts, 'yyyy-MM-dd') dt, DATE_FORMAT(ts, > 'HH') h, DATE_FORMAT(ts, 'mm') m FROM users"; > > tEnv.executeSql(insertSql); > > bsEnv.execute("test"); > > > liya...@huimin100.cn >