使用场景:Flink 1.11.1 读取 MySQL,一直报序列化错误,但感觉需要序列化的 bean 对象已经实现了序列化,实在百思不得其解,前来求教各位大佬!
代码在附件中!


Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: 
java.util.concurrent.CountDownLatch@45ca843[Count = 2] is not serializable. The 
object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1901)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1614)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1571)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1553)
at com.hsq.APP.main(APP.java:43)
Caused by: java.io.NotSerializableException: java.util.concurrent.CountDownLatch
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 9 more


| |
引领
|
|
[email protected]
|
签名由网易邮箱大师定制

public class JdbcReaderTest extends RichSourceFunction<Tuple2<Long,Integer>> {


    private DataSource dataSource;
    private String querySql;
    private Connection conn;
    private PreparedStatement pst;

    public JdbcReaderTest(DataSource dataSource, String querySql) {
        this.dataSource = dataSource;
        this.querySql = querySql;
    }


    /**
     * 打开相应的数据连接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = dataSource.getConnection();
        pst = conn.prepareStatement(querySql);
    }

    /**
     * 执行查询并获取结果
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Tuple2<Long,Integer>> ctx) throws Exception {

        ResultSet rs = pst.executeQuery();
        while (rs.next()) {
            long id = rs.getLong(1);
            int userId = rs.getInt(2);
            Tuple2<Long, Integer> tuple2 = new Tuple2<>(id, userId);
            ctx.collect(tuple2);
        }
        try {
            rs.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭相应的连接
     */
    @Override
    public void cancel() {
        DbUtil.close(conn,pst);
    }
}


Test class APP{

    public static void main(String[] args) throws Exception {

        String url = null;

        if (args.length > 0){
            url = args[0];
        } else {
            url = URL;
        }

        Setting setting = new Setting(url);
        DSFactory dsFactory = DSFactory.create(setting);
        DataSource dataSource = dsFactory.getDataSource(MYSQL_HSQ_GROUP);

        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(1);

        String sql = "select id,user_id from order limit 10";
        JdbcReaderTest jdbcReader = new JdbcReaderTest(dataSource,sql);
        DataStreamSource<org.apache.flink.api.java.tuple.Tuple2<Long, Integer>> 
tuple2DataStreamSource = env.addSource(jdbcReader);
        tuple2DataStreamSource.print();

        env.execute();

    }
}

回复