Hi! stmtSet.execute() 默认是异步的,只是提交作业而不会等待作业完成。如果需要等待作业完成再进行后续步骤,需要用 stmtSet.execute().await()。
陈卓宇 <[email protected]> 于2021年12月10日周五 20:25写道: > 您好社区: > > 我在使用flinksql将数据表A_now写入到数据库中后还有一步操作:将表A删除,完成将A_now更名为A,的切表操作。 > 发现当执行: > //sql 插入数据到数据库操作 > > StatementSet stmtSet = tenv.createStatementSet () ; > stmtSet.addInsertSql ( insertSqlMongoDB ) ; > stmtSet.addInsertSql ( insertSql ) ; > stmtSet.execute () ; > //完成后进行切表: > /** 进行切表,删表 */ > 试试 { > MongoUtil2 实例 = MongoUtil2。 获取实例 () ; > MongoCollection < Document > oldData = instance.getCollection ( db, > "t_up_tag_data_" +mongoKey ) ; > MongoCollection<Document> oldmetadata = instance.getCollection(db, > "t_up_tag_metadata_"+mongoKey); > > 如果 ( 旧数据!= null ){ > oldData.drop () ; > } > 如果 ( 旧元数据!= null ){ > oldmetadata.drop () ; > } > MongoCollection < 文档 > data = instance.getCollection ( db, > "t_up_tag_data_" +mongoKey+ "_now" ) ; > MongoCollection < Document > metadata = instance.getCollection ( db, > "t_up_tag_metadata_" +mongoKey+ "_now" ) ; > > MongoCollection < Document > newData = instance.getCollection ( db, > "t_up_tag_data_" +mongoKey ) ; > MongoCollection < 文档 > newmetadata = instance.getCollection ( db, > "t_up_tag_metadata_" +mongoKey ) ; > data.renameCollection ( newData.getNamespace ()) ; > metadata.renameCollection ( newmetadata.getNamespace ()) ; > 如果 ( 数据!= 空 ){ > 数据.drop () ; > } > 如果 ( 元数据!= null ){ > 元数据.drop () ; > } > } 捕获 ( 异常 e ){ > 记录 .info ( e.getMessage ()) ; > } > 发现切表逻辑并未触发,请问这是什么原因,我该如何修改使整个流程完整走完 > > 陈卓 > > > >
