Does Flink support setting JVM opts the way MR does?

2020-06-02, posted by shangwen
When a Flink TM runs into an OOM, is there a way to make it produce a heap dump (HeapDumpOnOutOfMemoryError)? In MR you can get an OOM dump by adding -XX:+HeapDumpOnOutOfMemoryError and
-XX:HeapDumpPath=
to mapreduce.map.java.opts. Does Flink have an equivalent?
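
For reference, a minimal sketch of one way to do this (assuming flink-conf.yaml; the dump path below is a placeholder): Flink's env.java.opts.taskmanager option passes extra JVM arguments to the TaskManager processes, playing roughly the same role as mapreduce.map.java.opts in MR.

# flink-conf.yaml — pass JVM options to the TaskManager JVMs,
# analogous to mapreduce.map.java.opts in MR.
# /tmp/flink-heapdumps is a placeholder; pick a directory writable by the TM.
env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/flink-heapdumps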

Re: Flink 1.10: JDBCUpsertOutputFormat's flush retry mechanism does not take effect

2020-03-23, posted by shangwen
OK, I'll adjust the JIRA description and submit the code. Thanks, everyone, for the replies!




-- Original message --
From: "Jingsong Li"
https://issues.apache.org/jira/browse/FLINK-16281

 Best,
 Jingsong Lee


Re: Flink 1.10: JDBCUpsertOutputFormat's flush retry mechanism does not take effect

2020-03-23, posted by shangwen
Hi, I took a quick look at the issue you linked. It should fix the case where a closed connection makes the second execution look successful. There is another problem, though: once the connection is closed, even three retries still cannot execute successfully. Wouldn't it be better if the retry logic checked whether the connection is closed and, if so, reconnected before retrying, only giving up after all three attempts fail? A sketch of this idea follows below.
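
A minimal sketch of the suggestion (not the actual Flink code; reopenConnection() is a hypothetical helper standing in for whatever re-establishes the JDBC connection and re-buffers the pending batch):

// Sketch: test the connection before each retry and re-open it if closed,
// giving up only after maxRetryTimes consecutive failures.
for (int i = 1; i <= maxRetryTimes; i++) {
    try {
        jdbcWriter.executeBatch();
        batchCount = 0;
        break;
    } catch (SQLException e) {
        LOG.error("JDBC executeBatch error, retry times = {}", i, e);
        if (i >= maxRetryTimes) {
            throw e;
        }
        if (connection == null || connection.isClosed()) {
            reopenConnection(); // hypothetical: reconnect and rebuild the pending batch
        }
        Thread.sleep(1000 * i);
    }
}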




-- Original message --
From: "Jingsong Li"
https://issues.apache.org/jira/browse/FLINK-16281

Best,
Jingsong Lee

On Mon, Mar 23, 2020 at 2:34 PM lucas.wu wrote:

 // PgStatement.java, internalExecuteBatch() (excerpt)
 // Empty arrays should be passed to toArray
 // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
 Query[] queries = batchStatements.toArray(new Query[0]);
 ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]);
 batchStatements.clear();   // already cleared at this point
 batchParameters.clear();
 ...
 if (connection.getAutoCommit()) { // the exception is thrown here
     flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
 }
 ...
 
So when Flink retries and jdbcWriter.executeBatch runs this code a second time, batchStatements is already empty and the call returns immediately, leading Flink to treat the flush as successful even though the statements were never actually executed?

 // PgStatement.java
 public int[] executeBatch() throws SQLException {
     checkClosed();
     closeForNextExecution();
     if (batchStatements == null || batchStatements.isEmpty()) { // returns immediately here
         return new int[0];
     }
     return internalExecuteBatch().getUpdateCount();
 }

 My current idea: when the exception is caught, check before retrying whether the connection has been closed, and re-open it if so. What does everyone think? This is the issue I filed:
 https://issues.apache.org/jira/browse/FLINK-16708



-- 
Best, Jingsong Lee

Flink 1.10: JDBCUpsertOutputFormat's flush retry mechanism does not take effect

2020-03-22, posted by shangwen
While testing the JDBC connector against PostgreSQL, we used tcpkill to kill the connection and observed the following error:


2020-03-20 21:16:21.247 [jdbc-upsert-output-format-thread-1] ERROR 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat - JDBC 
executeBatch error, retry times = 1
org.postgresql.util.PSQLException: This connection has been closed.
at org.postgresql.jdbc.PgConnection.checkClosed(PgConnection.java:857)
at org.postgresql.jdbc.PgConnection.getAutoCommit(PgConnection.java:817)
at 
org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:813)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:873)
at 
org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1569)
at 
org.apache.flink.api.java.io.jdbc.writer.AppendOnlyWriter.executeBatch(AppendOnlyWriter.java:62)
at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.flush(JDBCUpsertOutputFormat.java:159)
at 
org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.lambda$open$0(JDBCUpsertOutputFormat.java:124)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)



The corresponding Flink code:
// JDBCUpsertOutputFormat.java
public synchronized void flush() throws Exception {
    checkFlushException();

    for (int i = 1; i <= maxRetryTimes; i++) {
        try {
            jdbcWriter.executeBatch();
            batchCount = 0;
            break;
        } catch (SQLException e) {
            LOG.error("JDBC executeBatch error, retry times = {}", i, e);
            if (i >= maxRetryTimes) {
                throw e;
            }
            Thread.sleep(1000 * i);
        }
    }
}


Debugging shows the first attempt fails along this call chain:

JDBCUpsertOutputFormat.flush
 - AppendOnlyWriter.executeBatch
  ...
  - PgConnection.getAutoCommit
PSQLException: This connection has been closed

And by the time the exception is thrown, batchStatements has already been cleared:
// PgStatement.java
private BatchResultHandler internalExecuteBatch() throws SQLException {
    // Construct query/parameter arrays.
    transformQueriesAndParameters();
    // Empty arrays should be passed to toArray
    // see http://shipilev.net/blog/2016/arrays-wisdom-ancients/
    Query[] queries = batchStatements.toArray(new Query[0]);
    ParameterList[] parameterLists = batchParameters.toArray(new ParameterList[0]);
    batchStatements.clear();   // already cleared at this point
    batchParameters.clear();
    ...
    if (connection.getAutoCommit()) { // throws "This connection has been closed"
        flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }
    ...
}


So when Flink retries and calls jdbcWriter.executeBatch again, batchStatements is already empty and executeBatch returns immediately; Flink therefore believes the flush succeeded even though the statements were never actually executed:
// PgStatement.java
public int[] executeBatch() throws SQLException {
    checkClosed();
    closeForNextExecution();
    if (batchStatements == null || batchStatements.isEmpty()) { // returns immediately here
        return new int[0];
    }
    return internalExecuteBatch().getUpdateCount();
}
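
A condensed, self-contained sketch of this failure mode (the JDBC URL, credentials, and table are placeholders, and the connection must actually die between addBatch and the first executeBatch, e.g. via tcpkill as above, for the first attempt to fail):

// SilentRetryDemo.java — sketch; reproducing it requires killing the
// connection (e.g. with tcpkill) after addBatch() but before executeBatch().
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class SilentRetryDemo {
    public static void main(String[] args) throws Exception {
        Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/test", "user", "password"); // placeholders
        PreparedStatement ps = conn.prepareStatement("INSERT INTO t VALUES (?)");
        ps.setInt(1, 1);
        ps.addBatch();
        try {
            // 1st attempt: internalExecuteBatch() clears batchStatements,
            // then connection.getAutoCommit() throws PSQLException.
            ps.executeBatch();
        } catch (SQLException e) {
            // 2nd attempt: batchStatements is empty, so executeBatch()
            // returns new int[0] without touching the database.
            int[] counts = ps.executeBatch();
            System.out.println("retry 'succeeded', statements run: " + counts.length); // 0
        }
    }
}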


My current idea is: when the exception is caught, check before retrying whether the connection has been closed, and re-open it if so. This is the issue I filed:
https://issues.apache.org/jira/browse/FLINK-16708