If it's a sink that uses JDBC, why not use the Flink JdbcSink connector?
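
Roughly something like the sketch below (written from memory against the
flink-connector-jdbc module, org.apache.flink.connector.jdbc.*, so please check
the exact API and connector availability for your Flink version; the UPSERT
statement and the Record getters are placeholders for your own schema):

DataStream<Record> stream = ...;

stream.addSink(JdbcSink.sink(
    // Phoenix uses UPSERT; adjust table and columns to your schema
    "UPSERT INTO my_table (id, payload) VALUES (?, ?)",
    (PreparedStatement ps, Record record) -> {
        ps.setString(1, record.getId());       // hypothetical getter
        ps.setString(2, record.getPayload());  // hypothetical getter
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(5000)          // same batch size you use today
        .withBatchIntervalMs(60_000)  // same flush interval you use today
        .withMaxRetries(3)
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl(Environment.get("phoenix.jdbc.url", null))
        .withDriverName("org.apache.phoenix.jdbc.PhoenixDriver")
        .withUsername("---")
        .withPassword("---")
        .build()
));

The connector opens a connection per parallel subtask in open(), batches by
size/interval with retries, and closes the connection in close(), so you would
not need the static pool or the extra flush thread at all.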

On Wed, Oct 16, 2019, 17:03 Xin Ma <kevin.xin...@gmail.com> wrote:

> I have watched one of the recent Flink Forward videos, Apache Flink Worst
> Practices by Konstantin Knauf. The talk helped me a lot, and it mentions
> that we should avoid using static variables to share state between tasks.
>
> So should I also avoid a static database connection? I am currently facing
> a weird issue: the database connection is lost at some point and brings the
> whole job down.
>
> *I have created a database tool like this,*
>
> public class Phoenix {
>
>     private static ComboPooledDataSource dataSource = new ComboPooledDataSource();
>     static {
>         try {
>
>             dataSource.setDriverClass(Environment.get("phoenix.jdbc.driverClassName",
>                     "org.apache.phoenix.jdbc.PhoenixDriver"));
>             dataSource.setJdbcUrl(Environment.get("phoenix.jdbc.url", null));
>             dataSource.setMaxPoolSize(200);
>             dataSource.setMinPoolSize(10);
>             Properties properties = new Properties();
>             properties.setProperty("user", "---");
>             properties.setProperty("password", "---");
>             dataSource.setProperties(properties);
>         } catch (PropertyVetoException e) {
>             throw new RuntimeException("phoenix datasource conf error");
>         }
>     }
>
>     private static Connection getConn() throws SQLException {
>         return dataSource.getConnection();
>     }
>
>     public static <T> T executeQuery(String sql, Caller<T> caller) throws SQLException {
>         // .. execution logic
>     }
>
>     public static int executeUpdateWithTx(List<String> sqlList) throws SQLException {
>         // .. update logic
>     }
>
> }
>
> *Then I implemented my customized sink function like this,*
>
> public class CustomizedSink extends RichSinkFunction<Record> {
>
>     private static Logger LOG = LoggerFactory.getLogger("userFlinkLogger");
>     private static final int batchInsertSize = 5000;
>     private static final long flushInterval = 60 * 1000L;
>     private long lastFlushTime;
>     private BatchCommit batchCommit;
>     private ConcurrentLinkedQueue<Object> cacheQueue;
>     private ExecutorService threadPool;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         cacheQueue = new ConcurrentLinkedQueue<>();
>         threadPool = Executors.newFixedThreadPool(1);
>         batchCommit = new BatchCommit();
>         super.open(parameters);
>     }
>
>     @Override
>     public void invoke(Record record) throws Exception {
>         cacheQueue.add(record);
>         if (cacheQueue.size() >= batchInsertSize ||
>             System.currentTimeMillis() - lastFlushTime >= flushInterval) {
>             lastFlushTime = System.currentTimeMillis();
>             threadPool.execute(batchCommit);
>         }
>     }
>
>     private class BatchCommit implements Runnable {
>         @Override
>         public void run() {
>             try {
>                 int ct;
>                 synchronized(cacheQueue) {
>                     List<String> sqlList = Lists.newArrayList();
>                     for (ct = 0; ct < batchInsertSize; ct++) {
>                         Object obj = cacheQueue.poll();
>                         if (obj == null) {
>                             break;
>                         }
>                         sqlList.add(generateBatchSql((Record) obj));
>                     }
>                     Phoenix.executeUpdateWithTx(sqlList);
>                 }
>                 LOG.info("Batch insert " + ct + " cache records");
>             } catch (Exception e) {
>                 LOG.error("Batch insert error: ", e);
>             }
>         }
>
>         private String generateBatchSql(Record record) {
>             // business logic
>         }
>     }
> }
>
> *Is there any good way to refactor this code?*
>
> Best,
> Kevin
>
>
