Does Flink have a configuration option similar to MR's opts parameter?
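When a Flink TM hits an OOM, I would like a heap dump to be written (HeapDumpOnOutOfMemoryError). How can an OOM dump be configured in Flink? In MR this is done by setting mapreduce.map.java.opts to -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=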
Re: The retry mechanism in the flush method of JDBCUpsertOutputFormat in Flink 1.10 does not take effect
OK, I will adjust the JIRA description and submit the code. Thanks, everyone, for the replies~
-- Original message --
From: "Jingsong Li"
https://issues.apache.org/jira/browse/FLINK-16281
Best, Jingsong Lee
On Mon, Mar 23, 2020 at 2:34 PM lucas.wu
Re: The retry mechanism in the flush method of JDBCUpsertOutputFormat in Flink 1.10 does not take effect
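Hi, I took a quick look at the issue you linked. It should fix the case where the connection has been closed and the second execution is mistakenly treated as successful. There is one more problem, though: if the connection has been closed, retrying three times does not seem to help, because every attempt still runs against the dead connection. Would it be better for the retry to check whether the connection is closed, reconnect if so, and give up only after three failed attempts?
-- Original message --
From: "Jingsong Li"
https://issues.apache.org/jira/browse/FLINK-16281
Best, Jingsong Lee
On Mon, Mar 23, 2020 at 2:34 PM lucas.wu
    // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
    Query[] queries = batchStatements.toArray(new Query[0]);
    ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]);
    batchStatements.clear(); // already cleared at this point
    batchParameters.clear();
    ...
    if (connection.getAutoCommit()) { // the exception is thrown here
        flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }
    ...
    }
So when Flink retries and calls jdbcWriter.executeBatch a second time, batchStatements is already empty and the method returns immediately. Flink then treats the flush as successful, which means the statements were never actually executed?
    // PgStatement.java
    public int[] executeBatch() throws SQLException {
        checkClosed();
        closeForNextExecution();
        if (batchStatements == null || batchStatements.isEmpty()) { // returns directly here
            return new int[0];
        }
        return internalExecuteBatch().getUpdateCount();
    }
My current idea is: when the exception is caught, check before retrying whether the connection has been closed, and if it has, open it again. I would like to hear what everyone thinks. This is the issue I filed: https://issues.apache.org/jira/browse/FLINK-16708
--
Best, Jingsong Lee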
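The reconnect-before-retry idea discussed in this reply could look roughly like the sketch below. It is not Flink code: `ReconnectableWriter`, `RetryingFlusher`, `flushWithReconnect` and `backoffMillis` are hypothetical names introduced only for illustration, and the sketch assumes that `reopen()` both re-establishes the connection and re-adds the buffered rows to the new statement. A failed reconnect counts as a failed attempt, so the loop still gives up after `maxRetryTimes` tries.

```java
import java.sql.SQLException;

/** Hypothetical abstraction over the JDBC writer; not a Flink interface. */
interface ReconnectableWriter {
    void executeBatch() throws SQLException;

    boolean isConnectionClosed() throws SQLException;

    /** Assumed to re-establish the connection and re-add buffered rows. */
    void reopen() throws SQLException;
}

final class RetryingFlusher {
    private RetryingFlusher() {}

    /**
     * Tries to flush up to maxRetryTimes times and returns the number of the
     * attempt that succeeded. Before every attempt the connection is checked
     * and re-opened if it was closed; a failed reconnect counts as a failed attempt.
     */
    static int flushWithReconnect(ReconnectableWriter writer, int maxRetryTimes, long backoffMillis)
            throws SQLException, InterruptedException {
        if (maxRetryTimes < 1) {
            throw new IllegalArgumentException("maxRetryTimes must be >= 1");
        }
        for (int i = 1; ; i++) {
            try {
                if (writer.isConnectionClosed()) {
                    writer.reopen();
                }
                writer.executeBatch();
                return i;
            } catch (SQLException e) {
                if (i >= maxRetryTimes) {
                    throw e; // out of attempts: surface the last error
                }
                Thread.sleep(backoffMillis * i); // linear backoff between attempts
            }
        }
    }
}
```

Reconnecting alone is only enough if the rows of the failed batch are still available on the caller's side; if the driver has already dropped them, they have to be re-added before the next `executeBatch()`.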
The retry mechanism in the flush method of JDBCUpsertOutputFormat in Flink 1.10 does not take effect
I am writing to postgresql through JDBC and used tcpkill to close the connection, which produced the following error:
2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
    at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
    at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
    at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
    at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
    at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
    at org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
    at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
The relevant Flink code:
    // JDBCUpsertOutputFormat.java
    public synchronized void flush() throws Exception {
        checkFlushException();
        for (int i = 1; i <= maxRetryTimes; i++) {
            try {
                jdbcWriter.executeBatch();
                batchCount = 0;
                break;
            } catch (SQLException e) {
                LOG.error("JDBC executeBatch error, retry times = {}", i, e);
                if (i >= maxRetryTimes) {
                    throw e;
                }
                Thread.sleep(1000 * i);
            }
        }
    }
Debugging shows the call chain JDBCUpsertOutputFormat.flush -> AppendOnlyWriter.executeBatch -> ... -> PgConnection.getAutoCommit, which throws PSQLException: This connection has been closed. By that point batchStatements has already been cleared:
    // PgStatement.java
    private BatchResultHandler internalExecuteBatch() throws SQLException {
        // Construct query/parameter arrays.
        transformQueriesAndParameters();
        // Empty arrays should be passed to toArray
        // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
        Query[] queries = batchStatements.toArray(new Query[0]);
        ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]);
        batchStatements.clear(); // already cleared at this point
        batchParameters.clear();
        ...
        if (connection.getAutoCommit()) { // the exception is thrown here
            flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
        }
        ...
    }
So when Flink retries and calls jdbcWriter.executeBatch a second time, batchStatements is already empty and the method returns immediately. Flink then treats the flush as successful, which means the statements were never actually executed?
    // PgStatement.java
    public int[] executeBatch() throws SQLException {
        checkClosed();
        closeForNextExecution();
        if (batchStatements == null || batchStatements.isEmpty()) { // returns directly here
            return new int[0];
        }
        return internalExecuteBatch().getUpdateCount();
    }
My current idea is: when the exception is caught, check before retrying whether the connection has been closed, and if it has, open it again. I would like to hear what everyone thinks. This is the issue I filed: https://issues.apache.org/jira/browse/FLINK-16708
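To make the failure mode concrete, here is a small self-contained sketch that reproduces it without a database. `FakeBatchStatement` and `SilentRetryDemo` are hypothetical stand-ins written for this illustration; they copy only the two behaviours quoted from PgStatement above (the batch is cleared before the connection check fails, and an empty batch returns `new int[0]`) and nothing else from the driver. The retry loop has the same shape as `flush()`, with the sleep left out.

```java
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy stand-in for the driver statement; mimics only the behaviour described above. */
final class FakeBatchStatement {
    private final List<String> batch = new ArrayList<>();
    private final List<String> written = new ArrayList<>();
    private boolean connectionClosed;

    void addBatch(String row) { batch.add(row); }

    void closeConnection() { connectionClosed = true; }

    List<String> written() { return written; }

    int[] executeBatch() throws SQLException {
        if (batch.isEmpty()) {
            return new int[0]; // empty batch: returns without touching the connection
        }
        List<String> pending = new ArrayList<>(batch);
        batch.clear(); // the batch is cleared before the connection is checked
        if (connectionClosed) {
            throw new SQLException("This connection has been closed.");
        }
        written.addAll(pending);
        int[] counts = new int[pending.size()];
        Arrays.fill(counts, 1);
        return counts;
    }
}

final class SilentRetryDemo {
    private SilentRetryDemo() {}

    /** Same loop shape as flush(); returns the attempt that reported success. */
    static int flush(FakeBatchStatement stmt, int maxRetryTimes) throws SQLException {
        for (int i = 1; i <= maxRetryTimes; i++) {
            try {
                stmt.executeBatch();
                return i;
            } catch (SQLException e) {
                if (i >= maxRetryTimes) {
                    throw e;
                }
            }
        }
        throw new IllegalArgumentException("maxRetryTimes must be >= 1");
    }
}
```

With two rows added and the connection closed before the flush, the first attempt throws and the second attempt returns normally on the now-empty batch, so the loop reports success while `written()` stays empty.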