Thanks Robin for your reply. I'm pretty sure that writing to Oracle is taking longer as when writing to HDFS is only taking ~5minutes.
The job is writing about ~5 Million of records. I've set the job to call executeBatch() when the batchSize reaches 200,000 of records, so I assume that commit will be invoked at every 200K batch. In this case, it should only call commit 25 times, is this too much? I wouldn't want to increase the batch size any further as it may cause Java heap issue. I do not have much knowledge in Oracle side, so any advice with the configuration will be grateful. Thanks, Raj On 22 July 2015 at 20:20, Robin East <robin.e...@xense.co.uk> wrote: > The first question I would ask is have you determined whether you have a > performance issue writing to Oracle? In particular how many commits are you > making? If you are issuing a lot of commits that would be a performance > problem. > > Robin > > On 22 Jul 2015, at 19:11, diplomatic Guru <diplomaticg...@gmail.com> > wrote: > > Hello all, > > We are having a major performance issue with the Spark, which is holding > us from going live. > > We have a job that carries out computation on log files and write the > results into Oracle DB. > > The reducer 'reduceByKey' have been set to parallelize by 4 as we don't > want to establish too many DB connections. > > We are then calling the foreachPartition on the RDD pairs that were > reduced by the key. Within this foreachPartition method we establish DB > connection, then iterate the results, prepare the Oracle statement for > batch insertion then we commit the batch and close the connection. All > these are working fine. > > However, when we execute the job to process 12GB of data, it takes forever > to complete, especially at the foreachPartition stage. > > We submitted the job with 6 executors, 2 cores, and 6GB memory of which > 0.3 is assigned to spark.storage.memoryFraction. > > The job is taking about 50 minutes to complete, which is not ideal. I'm > not sure how we could enhance the performance. I've provided the main body > of the codes, please take a look and advice: > > From Driver: > > reduceResultsRDD.foreachPartition(new DB.InsertFunction( > dbuser,dbpass,batchsize)); > > > DB class: > > public class DB { > private static final Logger logger = LoggerFactory > .getLogger(DB.class); > public static class InsertFunction implements > VoidFunction<Iterator<Tuple2<String, String>>> { > > private static final long serialVersionUID = 999955766876878L; > private String dbuser = ""; > private String dbpass = ""; > private int batchsize; > > public InsertFunction(String dbuser, String dbpass, int batchsize) { > super(); > this.dbuser = dbuser; > this.dbuser = dbuser; > this.batchsize=batchsize; > } > > @Override > public void call(Iterator<Tuple2<String, String>> results) { > Connection connect = null; > PreparedStatement pstmt = null; > try { > connect = getDBConnection(dbuser, > dbpass); > > int count = 0; > > if (batchsize <= 0) { > batchsize = 10000; > } > > pstmt1 = connect > .prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT"); > > while (results.hasNext()) { > > Tuple2<String, String> kv = results.next(); > String [] data = kv._1.concat("," +kv._2).split(","); > > pstmt.setString(1, data[0].toString()); > pstmt.setString(2, data[1].toString()); > ..... > > pstmt.addBatch(); > > count++; > > if (count == batchsize) { > logger.info("BulkCount : " + count); > pstmt.executeBatch(); > connect.commit(); > count = 0; > } > > pstmt.executeBatch(); > connect.commit(); > > } > > pstmt.executeBatch(); > connect.commit(); > > } catch (Exception e) { > logger.error("InsertFunction error: " + e.getMessage()); > } finally { > > if (pstmt != null) { > pstmt.close(); > } > > try { > connect.close(); > } catch (SQLException e) { > logger.error("InsertFunction Connection Close error: " > + e.getMessage()); > } > } > } > > } > } > > >