If it's a sink that uses JDBC, why not use the Flink JdbcSink connector?
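For reference, here is a rough sketch of what that could look like with the JdbcSink from the flink-connector-jdbc module (available in recent Flink releases). The table name, columns and Record getters below are placeholders I made up, not taken from your code, and Record is assumed to be the same class you use in your sink; the batch size and interval mirror your batchInsertSize and flushInterval:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;

public class JdbcSinkExample {

    // Attaches a JdbcSink to an existing stream of Record elements.
    public static void attachSink(DataStream<Record> records) {
        records.addSink(JdbcSink.sink(
                // hypothetical UPSERT for a Phoenix table
                "UPSERT INTO MY_TABLE (ID, VAL) VALUES (?, ?)",
                (JdbcStatementBuilder<Record>) (statement, record) -> {
                    statement.setString(1, record.getId());  // assumed accessor
                    statement.setString(2, record.getVal()); // assumed accessor
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(5000)          // same role as batchInsertSize
                        .withBatchIntervalMs(60_000)  // same role as flushInterval
                        .withMaxRetries(3)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:phoenix:zk-host:2181")  // placeholder URL
                        .withDriverName("org.apache.phoenix.jdbc.PhoenixDriver")
                        .withUsername("---")
                        .withPassword("---")
                        .build()));
    }
}

The connector handles batching, flushing and connection management itself, so the hand-rolled queue, thread pool and static pool go away.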
On Wed, Oct 16, 2019, 17:03 Xin Ma <kevin.xin...@gmail.com> wrote:

> I have watched one of the recent Flink Forward videos, Apache Flink Worst
> Practices by Konstantin Knauf. The talk helped me a lot; it mentions that we
> should avoid using static variables to share state between tasks.
>
> So should I also avoid a static database connection? I am currently facing a
> weird issue: the database connection is lost at some point and brings the
> whole job down.
>
> *I have created a database tool like this:*
>
> public class Phoenix {
>
>     private static ComboPooledDataSource dataSource = new ComboPooledDataSource();
>
>     static {
>         try {
>             dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>                     "org.apache.phoenix.jdbc.PhoenixDriver"));
>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
>             dataSource.setMaxPoolSize(200);
>             dataSource.setMinPoolSize(10);
>             Properties properties = new Properties();
>             properties.setProperty("user", "---");
>             properties.setProperty("password", "---");
>             dataSource.setProperties(properties);
>         } catch (PropertyVetoException e) {
>             throw new RuntimeException("phoenix datasource conf error");
>         }
>     }
>
>     private static Connection getConn() throws SQLException {
>         return dataSource.getConnection();
>     }
>
>     public static <T> T executeQuery(String sql, Caller<T> caller) throws SQLException {
>         // .. execution logic
>     }
>
>     public static int executeUpdateWithTx(List<String> sqlList) throws SQLException {
>         // .. update logic
>     }
> }
>
> *Then I implemented my customized sink function like this:*
>
> public class CustomizedSink extends RichSinkFunction<Record> {
>
>     private static Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
>     private static final int batchInsertSize = 5000;
>     private static final long flushInterval = 60 * 1000L;
>     private long lastFlushTime;
>     private BatchCommit batchCommit;
>     private ConcurrentLinkedQueue<Object> cacheQueue;
>     private ExecutorService threadPool;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         cacheQueue = new ConcurrentLinkedQueue<>();
>         threadPool = Executors.newFixedThreadPool(1);
>         batchCommit = new BatchCommit();
>         super.open(parameters);
>     }
>
>     @Override
>     public void invoke(Record record) throws Exception {
>         cacheQueue.add(record);
>         if (cacheQueue.size() >= batchInsertSize
>                 || System.currentTimeMillis() - lastFlushTime >= flushInterval) {
>             lastFlushTime = System.currentTimeMillis();
>             threadPool.execute(batchCommit);
>         }
>     }
>
>     private class BatchCommit implements Runnable {
>         @Override
>         public void run() {
>             try {
>                 int ct;
>                 synchronized (cacheQueue) {
>                     List<String> sqlList = Lists.newArrayList();
>                     for (ct = 0; ct < batchInsertSize; ct++) {
>                         Object obj = cacheQueue.poll();
>                         if (obj == null) {
>                             break;
>                         }
>                         sqlList.add(generateBatchSql((Record) obj));
>                     }
>                     Phoenix.executeUpdateWithTx(sqlList);
>                 }
>                 LOG.info("Batch insert " + ct + " cache records");
>             } catch (Exception e) {
>                 LOG.error("Batch insert error: ", e);
>             }
>         }
>
>         private String generateBatchSql(Record record) {
>             // business logic
>         }
>     }
> }
>
> *Is there any good idea to refactor the codes?*
>
> Best,
> Kevin
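If you do keep a custom sink instead, one common refactoring (sketched below as an illustration, not code from this thread) is to drop the static pool and give each parallel subtask its own data source, created in open() and released in close(). Record and Environment are assumed to be the same classes as in the snippet above; batching is omitted for brevity:

import java.sql.Connection;
import java.sql.PreparedStatement;

import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class PhoenixSink extends RichSinkFunction<Record> {

    // One pool per parallel subtask instead of a JVM-wide static field.
    private transient ComboPooledDataSource dataSource;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new ComboPooledDataSource();
        dataSource.setDriverClass("org.apache.phoenix.jdbc.PhoenixDriver");
        dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
        dataSource.setMaxPoolSize(10);
    }

    @Override
    public void invoke(Record record) throws Exception {
        // Borrow a connection per invocation; batching omitted for brevity.
        try (Connection conn = dataSource.getConnection();
             PreparedStatement ps = conn.prepareStatement(generateSql(record))) {
            ps.executeUpdate();
            if (!conn.getAutoCommit()) {
                conn.commit(); // Phoenix connections often default to autoCommit=false
            }
        }
    }

    @Override
    public void close() throws Exception {
        if (dataSource != null) {
            dataSource.close();
        }
        super.close();
    }

    private String generateSql(Record record) {
        // business logic, as in the original generateBatchSql
        return "...";
    }
}

With the pool owned by the function instance, a lost connection only affects the owning subtask, and a restart of that task recreates it cleanly instead of leaving a broken JVM-wide static behind.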