??????????????????JDBC????postgresql??????????tcpkill????????????????????????????????????????????????????????????????????
2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
    at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
    at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
    at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
??????????????????????????????????????????????????Flink??????????
//JDBCUpsertOutputFormat.javapublic synchronized void flush() throws Exception {
checkFlushException();
for (int i = 1; i <= maxRetryTimes; i++) {
try {
jdbcWriter.executeBatch();
batchCount = 0;
break;
} catch (SQLException e) {
LOG.error("JDBC executeBatch error, retry times = {}", i, e);
if (i >= maxRetryTimes) {
throw e;
}
Thread.sleep(1000 * i);
}
}
}
????????debug??????????????????
JDBCUpsertOutputFormat.flush
-> AppendOnlyWriter.executeBatch
...
-> PgConnection.getAutoCommit
????PSQLException: This connection has been
closed????batchStatements??????????????????
// PgStatement.java
private BatchResultHandler internalExecuteBatch() throws SQLException {
    // Construct query/parameter arrays.
    transformQueriesAndParameters();
    // Empty arrays should be passed to toArray
    // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
    Query[] queries = batchStatements.toArray(new Query[0]);
    ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]);
    batchStatements.clear(); // ??????????????
    batchParameters.clear();
    ...
    if (connection.getAutoCommit()) { // ????????
        flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }
    ...
}
??????Flink????????????????????????????jdbcWriter.executeBatch????????batchStatements??????Empty??????????????????Flink????????????????????????????????????????????
// PgStatement.java public int[] executeBatch() throws SQLException {
checkClosed(); closeForNextExecution(); if (batchStatements == null ||
batchStatements.isEmpty()) { //???????????????? return new int[0]; }
return internalExecuteBatch().getUpdateCount(); }
????????????????????????????????????????????????????????????????????????????????????????????????????open????????????????????????????????issue
https://issues.apache.org/jira/browse/FLINK-16708