Scenario: Flink 1.11.1 reading from MySQL keeps failing with a serialization error, yet the bean object that needs serializing looks serializable already. I'm completely stumped and would appreciate any pointers from the experts!!!
The code is in the attachment!!!
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
java.util.concurrent.CountDownLatch@45ca843[Count = 2] is not serializable. The
object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1901)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1614)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1571)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1553)
    at com.hsq.APP.main(APP.java:43)
Caused by: java.io.NotSerializableException: java.util.concurrent.CountDownLatch
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
    ... 9 more
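From what I can tell, env.addSource() passes the source function through Flink's ClosureCleaner, which Java-serializes the function together with every non-transient field, so the CountDownLatch is presumably buried inside the pooled DataSource rather than in my own bean. The failure should be reproducible outside Flink with the same InstantiationUtil that appears in the trace; this is just my own minimal check, not code from the attachment:

import org.apache.flink.util.InstantiationUtil;

public class SerializationCheck {
    public static void main(String[] args) throws Exception {
        // Any object graph that reaches a CountDownLatch fails plain Java
        // serialization, which is exactly what the ClosureCleaner runs.
        Object latchHolder = new java.util.concurrent.CountDownLatch(2);
        InstantiationUtil.serializeObject(latchHolder);
        // -> java.io.NotSerializableException: java.util.concurrent.CountDownLatch
    }
}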
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import cn.hutool.db.DbUtil;

public class JdbcReaderTest extends RichSourceFunction<Tuple2<Long, Integer>> {

    private DataSource dataSource;
    private String querySql;
    private Connection conn;
    private PreparedStatement pst;

    public JdbcReaderTest(DataSource dataSource, String querySql) {
        this.dataSource = dataSource;
        this.querySql = querySql;
    }

    /**
     * Open the database connection.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = dataSource.getConnection();
        pst = conn.prepareStatement(querySql);
    }

    /**
     * Execute the query and emit the results.
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws Exception {
        ResultSet rs = pst.executeQuery();
        while (rs.next()) {
            long id = rs.getLong(1);
            int userId = rs.getInt(2);
            ctx.collect(new Tuple2<>(id, userId));
        }
        try {
            rs.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    /**
     * Close the connection.
     */
    @Override
    public void cancel() {
        DbUtil.close(conn, pst);
    }
}
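One workaround I'm considering (only a sketch, reusing the Hutool Setting/DSFactory calls from the test class below): keep only Strings as fields and mark the JDBC handles transient, so the connection pool, which is what actually holds the CountDownLatch, is built in open() on the task side instead of being serialized with the function:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import javax.sql.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import cn.hutool.db.DbUtil;
import cn.hutool.db.ds.DSFactory;
import cn.hutool.setting.Setting;

public class JdbcReaderTest2 extends RichSourceFunction<Tuple2<Long, Integer>> {

    // Only serializable Strings travel with the function.
    private final String settingUrl;
    private final String dsGroup;
    private final String querySql;

    // transient fields are skipped by Java serialization and rebuilt per subtask.
    private transient Connection conn;
    private transient PreparedStatement pst;

    public JdbcReaderTest2(String settingUrl, String dsGroup, String querySql) {
        this.settingUrl = settingUrl;
        this.dsGroup = dsGroup;
        this.querySql = querySql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Build the pool here; it now exists only on the task manager.
        DataSource dataSource = DSFactory.create(new Setting(settingUrl)).getDataSource(dsGroup);
        conn = dataSource.getConnection();
        pst = conn.prepareStatement(querySql);
    }

    @Override
    public void run(SourceContext<Tuple2<Long, Integer>> ctx) throws Exception {
        try (ResultSet rs = pst.executeQuery()) {
            while (rs.next()) {
                ctx.collect(new Tuple2<>(rs.getLong(1), rs.getInt(2)));
            }
        }
    }

    @Override
    public void cancel() {
        DbUtil.close(conn, pst);
    }
}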
The test class:

import javax.sql.DataSource;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import cn.hutool.db.ds.DSFactory;
import cn.hutool.setting.Setting;

public class APP {

    public static void main(String[] args) throws Exception {
        // URL and MYSQL_HSQ_GROUP are constants defined elsewhere in the project.
        String url;
        if (args.length > 0) {
            url = args[0];
        } else {
            url = URL;
        }
        Setting setting = new Setting(url);
        DSFactory dsFactory = DSFactory.create(setting);
        DataSource dataSource = dsFactory.getDataSource(MYSQL_HSQ_GROUP);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        // `order` is a reserved word in MySQL, so it has to be backtick-quoted.
        String sql = "select id, user_id from `order` limit 10";
        JdbcReaderTest jdbcReader = new JdbcReaderTest(dataSource, sql);
        DataStreamSource<Tuple2<Long, Integer>> tuple2DataStreamSource = env.addSource(jdbcReader);
        tuple2DataStreamSource.print();
        env.execute();
    }
}
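For completeness, main() would then wire the sketched JdbcReaderTest2 like this (again only a sketch; nothing non-serializable reaches addSource()):

// Only Strings cross the serialization boundary now.
JdbcReaderTest2 reader = new JdbcReaderTest2(url, MYSQL_HSQ_GROUP, sql);
env.addSource(reader).print();
env.execute();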