Hi!

stmtSet.execute() 默认是异步的,只是提交作业而不会等待作业完成。如果需要等待作业完成再进行后续步骤,需要用
stmtSet.execute().await()。

陈卓宇 <[email protected]> 于2021年12月10日周五 20:25写道:

> 您好社区:
> &nbsp;&nbsp; &nbsp;
> 我在使用flinksql将数据表A_now写入到数据库中后还有一步操作:将表A删除,完成将A_now更名为A,的切表操作。
> 发现当执行:
> //sql 插入数据到数据库操作
>
> StatementSet stmtSet = tenv.createStatementSet () ;
> stmtSet.addInsertSql ( insertSqlMongoDB ) ;
> stmtSet.addInsertSql ( insertSql ) ;
> stmtSet.execute () ;
> //完成后进行切表:
> /**  进行切表,删表 */
> 试试 {
>     MongoUtil2 实例 = MongoUtil2。 获取实例 () ;
> MongoCollection < Document &gt; oldData = instance.getCollection ( db,
> "t_up_tag_data_" +mongoKey ) ;
>     MongoCollection<Document&gt; oldmetadata = instance.getCollection(db,
> "t_up_tag_metadata_"+mongoKey);
>
>     如果 ( 旧数据!= null ){
>         oldData.drop () ;
>     }
>     如果 ( 旧元数据!= null ){
>         oldmetadata.drop () ;
>     }
>     MongoCollection < 文档 &gt; data = instance.getCollection ( db,
> "t_up_tag_data_" +mongoKey+ "_now" ) ;
> MongoCollection < Document &gt; metadata = instance.getCollection ( db,
> "t_up_tag_metadata_" +mongoKey+ "_now" ) ;
>
> MongoCollection < Document &gt; newData = instance.getCollection ( db,
> "t_up_tag_data_" +mongoKey ) ;
> MongoCollection < 文档 &gt; newmetadata = instance.getCollection ( db,
> "t_up_tag_metadata_" +mongoKey ) ;
> data.renameCollection ( newData.getNamespace ()) ;
> metadata.renameCollection ( newmetadata.getNamespace ()) ;
>     如果 ( 数据!= 空 ){
>         数据.drop () ;
>     }
>     如果 ( 元数据!= null ){
>         元数据.drop () ;
>     }
> } 捕获 ( 异常 e ){
> 记录 .info ( e.getMessage ()) ;
> }
> 发现切表逻辑并未触发,请问这是什么原因,我该如何修改使整个流程完整走完
>
> 陈卓
>
>
> &nbsp;
> &nbsp;

回复