hi,
?????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????? public class Sink extends TwoPhaseCommitSinkFunction <ObjectNode, Connection, Void> { // private Connection connection; public Sink() { super(new KryoSerializer <>(Connection.class , new ExecutionConfig()) , VoidSerializer.INSTANCE); } @Override protected void invoke(Connection connection , ObjectNode objectNode , Context context) throws Exception { String stu = objectNode.get("value").toString(); Student student = JSON.parseObject(stu , Student.class); System.err.println("start invoke......." + "id = " + student.getId() + " name = " + student.getName() + " password" + " = " + student.getPassword() + " age = " + student.getAge()); String sql = "insert into Student(id,name,password,age) values (?,?,?,?)"; PreparedStatement ps = connection.prepareStatement(sql); ps.setInt(1 , student.getId()); ps.setString(2 , student.getName()); ps.setString(3 , student.getPassword()); ps.setInt(4 , student.getAge()); ps.executeUpdate(); //???????????? if (student.getId() == 33) { System.out.println(1 / 0); } } @Override protected Connection beginTransaction() throws Exception { String url = "jdbc:mysql:"; return DBConnectUtil.getConnection(url , "" , ""); } @Override protected void preCommit(Connection connection) throws Exception { } @Override protected void commit(Connection connection) { if (connection != null) { try { connection.commit(); } catch (SQLException e) { System.err.println("commit error ............" + e.getMessage()); } finally { try { connection.close(); } catch (SQLException e) { System.err.println(" finally commit error ............" + e.getMessage()); } } } } @Override protected void abort(Connection connection) { if (connection != null) { try { connection.rollback(); } catch (SQLException e) { System.err.println("abort error ............" + e.getMessage()); } finally { try { connection.close(); } catch (SQLException e) { System.err.println(" finally abort error ............" + e.getMessage()); } } } } } ------------------ ???????? ------------------ ??????: "Yun Tang"<myas...@live.com>; ????????: 2019??8??27??(??????) ????3:00 ??????: "user-zh"<user-zh@flink.apache.org>; ????: Re: ????flink???????????????????????????? Hi ??????????TwoPhaseCommitSinkFunction????????preCommit????snapshotState??????????????????currentTransactionHolder??????pendingCommitTransactions??????notifyCheckpointComplete??????????commit????????pendingCommitTransactions????????????????????????preCommit???????????????????????????? ????????TwoPhaseCommitSinkFunction??????????????????checkpoint interval??????????????????????????????????????state?????????????????? ???? ???? ________________________________ From: 1900 <575209...@qq.com> Sent: Tuesday, August 27, 2019 14:15 To: user-zh <user-zh@flink.apache.org> Subject: ????flink???????????????????????????? ????flink??????????????????????????????flink????????????????????TwoPhaseCommitSinkFunction??????????????????????????, ?C beginTransaction ?C preCommit ?C commit ?C abort ????sink??????????????????????????????????????????????????????MYSQL???????????????????????????????????????? preCommit?????????? ????????commit??????????????????????????????????checkpoint??????????????????checkpoint???????????????????????????????????????????????? ????????checkpoint??????????????????????????????????checkpoint???????????????????????????????????????? ????????????????????????checkpoint?????????????????????????????????????????????????? ????????????????????????????????????