hi,

??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????


public class Sink extends TwoPhaseCommitSinkFunction <ObjectNode, Connection, 
Void> {


//    private Connection connection;


    public Sink() {
        super(new KryoSerializer <>(Connection.class , new ExecutionConfig()) , 
VoidSerializer.INSTANCE);
    }


    @Override
    protected void invoke(Connection connection , ObjectNode objectNode , 
Context context) throws Exception {
        String  stu     = objectNode.get("value").toString();
        Student student = JSON.parseObject(stu , Student.class);


        System.err.println("start invoke......." + "id = " + student.getId() + 
"  name = " + student.getName() + "   password"
                           + " = " + student.getPassword() + "  age = " + 
student.getAge());


        String            sql = "insert into Student(id,name,password,age) 
values (?,?,?,?)";
        PreparedStatement ps  = connection.prepareStatement(sql);
        ps.setInt(1 , student.getId());
        ps.setString(2 , student.getName());
        ps.setString(3 , student.getPassword());
        ps.setInt(4 , student.getAge());
        ps.executeUpdate();
        //????????????
        if (student.getId() == 33) {
            System.out.println(1 / 0);
        }
    }


    @Override
    protected Connection beginTransaction() throws Exception {
        String url = "jdbc:mysql:";
        return DBConnectUtil.getConnection(url , "" , "");
    }


    @Override
    protected void preCommit(Connection connection) throws Exception {
    }


    @Override
    protected void commit(Connection connection) {
        if (connection != null) {
            try {
                connection.commit();
            } catch (SQLException e) {
                System.err.println("commit  error ............" + 
e.getMessage());
            } finally {
                try {
                    connection.close();
                } catch (SQLException e) {
                    System.err.println(" finally  commit error ............" + 
e.getMessage());
                }
            }
        }
    }


    @Override
    protected void abort(Connection connection) {
        if (connection != null) {
            try {
                connection.rollback();
            } catch (SQLException e) {
                System.err.println("abort error ............" + e.getMessage());
            } finally {
                try {
                    connection.close();
                } catch (SQLException e) {
                    System.err.println(" finally  abort error ............" + 
e.getMessage());
                }
            }
        }
    }
}







------------------ ???????? ------------------
??????: "Yun Tang"<myas...@live.com>;
????????: 2019??8??27??(??????) ????3:00
??????: "user-zh"<user-zh@flink.apache.org>;

????: Re: ????flink????????????????????????????



Hi

??????????TwoPhaseCommitSinkFunction????????preCommit????snapshotState??????????????????currentTransactionHolder??????pendingCommitTransactions??????notifyCheckpointComplete??????????commit????????pendingCommitTransactions????????????????????????preCommit????????????????????????????

????????TwoPhaseCommitSinkFunction??????????????????checkpoint 
interval??????????????????????????????????????state??????????????????

????
????
________________________________
From: 1900 <575209...@qq.com>
Sent: Tuesday, August 27, 2019 14:15
To: user-zh <user-zh@flink.apache.org>
Subject: ????flink????????????????????????????

????flink??????????????????????????????flink????????????????????TwoPhaseCommitSinkFunction??????????????????????????,



?C beginTransaction

?C preCommit

?C commit

?C abort



????sink??????????????????????????????????????????????????????MYSQL????????????????????????????????????????
 preCommit??????????
????????commit??????????????????????????????????checkpoint??????????????????checkpoint????????????????????????????????????????????????
????????checkpoint??????????????????????????????????checkpoint????????????????????????????????????????
????????????????????????checkpoint??????????????????????????????????????????????????
????????????????????????????????????

回复