source.map(......).addSink(new MySQLSink())

MySQLSink 就是接收前面算子生成的要执行的 SQL 并执行。 

@Override
public void invoke(JDBCStatement statement, Context context) throws Exception {

    log.info(statement.getSql());
    log.info(statement.getParasMap().toString());
    try {
        namedTemplate.update(statement.getSql(), statement.getParasMap());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
 
Flink 能保证 namedTemplate.update(statement.getSql(), statement.getParasMap()) 
一定执行成功吗?

谢谢,
王磊



[email protected] 

回复