The first question I would ask is: have you determined whether the time is actually going into the writes to Oracle? In particular, how many commits are you making? If you are issuing a lot of commits, that alone would be a performance problem.
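Looking at the code below, the while loop appears to call executeBatch() and commit() on every iteration, not only when count reaches batchsize, so over 12GB of input you would be committing an enormous number of times. One way to cut that down is to disable auto-commit, still send a batch every batchsize rows, and commit once per partition. A rough sketch of what I mean, untested, reusing your fields and your getDBConnection helper (your real MERGE statement goes where the placeholder is):

    @Override
    public void call(Iterator<Tuple2<String, String>> results) throws Exception {
        Connection connect = null;
        PreparedStatement pstmt = null;
        try {
            connect = getDBConnection(dbuser, dbpass);
            connect.setAutoCommit(false); // nothing is committed until we say so

            // placeholder: your real MERGE statement goes here
            pstmt = connect.prepareStatement("MERGE INTO ...");

            int count = 0;
            while (results.hasNext()) {
                Tuple2<String, String> kv = results.next();
                String[] data = kv._1().concat("," + kv._2()).split(",");

                pstmt.setString(1, data[0]);
                pstmt.setString(2, data[1]);
                // ... remaining bind parameters ...

                pstmt.addBatch();
                count++;
                if (count % batchsize == 0) {
                    pstmt.executeBatch(); // ship the batch to Oracle, but no commit yet
                }
            }
            pstmt.executeBatch(); // flush the final partial batch
            connect.commit();     // a single commit for the whole partition
        } finally {
            if (pstmt != null) { pstmt.close(); }
            if (connect != null) { connect.close(); }
        }
    }

That takes you from one commit per row (as posted) down to one per partition; if commits are the bottleneck, the difference should be obvious.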
Robin

> On 22 Jul 2015, at 19:11, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>
> Hello all,
>
> We are having a major performance issue with Spark, which is holding us
> back from going live.
>
> We have a job that carries out computation on log files and writes the
> results into an Oracle DB.
>
> The reducer 'reduceByKey' has been set to a parallelism of 4, as we don't
> want to establish too many DB connections.
>
> We then call foreachPartition on the RDD pairs that were reduced by key.
> Within this foreachPartition method we establish a DB connection, iterate
> over the results, prepare the Oracle statement for batch insertion, then
> commit the batch and close the connection. All of this works fine.
>
> However, when we execute the job to process 12GB of data, it takes forever
> to complete, especially at the foreachPartition stage.
>
> We submitted the job with 6 executors, 2 cores, and 6GB of memory, of which
> 0.3 is assigned to spark.storage.memoryFraction.
>
> The job is taking about 50 minutes to complete, which is not ideal. I'm not
> sure how we could enhance the performance. I've provided the main body of
> the code; please take a look and advise:
>
> From the driver:
>
> reduceResultsRDD.foreachPartition(new DB.InsertFunction(
>         dbuser, dbpass, batchsize));
>
> DB class:
>
> public class DB {
>     private static final Logger logger = LoggerFactory.getLogger(DB.class);
>
>     public static class InsertFunction implements
>             VoidFunction<Iterator<Tuple2<String, String>>> {
>
>         private static final long serialVersionUID = 999955766876878L;
>         private String dbuser = "";
>         private String dbpass = "";
>         private int batchsize;
>
>         public InsertFunction(String dbuser, String dbpass, int batchsize) {
>             super();
>             this.dbuser = dbuser;
>             this.dbpass = dbpass;
>             this.batchsize = batchsize;
>         }
>
>         @Override
>         public void call(Iterator<Tuple2<String, String>> results) throws Exception {
>             Connection connect = null;
>             PreparedStatement pstmt = null;
>             try {
>                 connect = getDBConnection(dbuser, dbpass);
>
>                 int count = 0;
>
>                 if (batchsize <= 0) {
>                     batchsize = 10000;
>                 }
>
>                 pstmt = connect.prepareStatement(
>                         "MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
>
>                 while (results.hasNext()) {
>                     Tuple2<String, String> kv = results.next();
>                     String[] data = kv._1().concat("," + kv._2()).split(",");
>
>                     pstmt.setString(1, data[0]);
>                     pstmt.setString(2, data[1]);
>                     .....
>
>                     pstmt.addBatch();
>                     count++;
>
>                     if (count == batchsize) {
>                         logger.info("BulkCount : " + count);
>                         pstmt.executeBatch();
>                         connect.commit();
>                         count = 0;
>                     }
>
>                     pstmt.executeBatch();
>                     connect.commit();
>                 }
>
>                 pstmt.executeBatch();
>                 connect.commit();
>
>             } catch (Exception e) {
>                 logger.error("InsertFunction error: " + e.getMessage());
>             } finally {
>                 if (pstmt != null) {
>                     pstmt.close();
>                 }
>                 try {
>                     if (connect != null) {
>                         connect.close();
>                     }
>                 } catch (SQLException e) {
>                     logger.error("InsertFunction Connection Close error: "
>                             + e.getMessage());
>                 }
>             }
>         }
>     }
> }
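P.S. To answer my first question empirically before changing anything, it is worth timing just the JDBC work inside call() and comparing it with the task times in the Spark UI. A trivial sketch, using the slf4j logger you already have in DB:

    long start = System.currentTimeMillis();
    // ... existing connect / prepare / batch / commit code ...
    logger.info("Partition insert took " + (System.currentTimeMillis() - start) + " ms");

If that number accounts for most of each task's time, the problem is on the Oracle side; if not, the time is going into the computation feeding the write.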