Try using INSERT instead of MERGE. Typically we use direct-path inserts (the APPEND hint) to do bulk inserts into Oracle.
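For illustration, here is a minimal, self-contained sketch of what an insert-only version of the partition writer might look like. The table name SOME_TABLE, its two columns, the JDBC URL, and the class/method names are assumptions for the sketch (the real schema isn't shown in this thread). APPEND_VALUES is the direct-path hint for batched INSERT ... VALUES on Oracle 11gR2 and later; the plain APPEND hint only applies to INSERT ... SELECT.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;
import scala.Tuple2;

public class InsertOnlySketch {

    // Hypothetical table, columns and JDBC URL -- the real schema is not shown in this thread.
    private static final String INSERT_SQL =
            "INSERT /*+ APPEND_VALUES */ INTO SOME_TABLE (COL1, COL2) VALUES (?, ?)";

    public static void insertPartition(Iterator<Tuple2<String, String>> results,
            String url, String user, String pass, int batchsize) throws Exception {
        try (Connection connect = DriverManager.getConnection(url, user, pass);
             PreparedStatement pstmt = connect.prepareStatement(INSERT_SQL)) {
            connect.setAutoCommit(false);          // commit per batch, not per statement
            int count = 0;
            while (results.hasNext()) {
                Tuple2<String, String> kv = results.next();
                String[] data = kv._1.concat("," + kv._2).split(",");
                pstmt.setString(1, data[0]);
                pstmt.setString(2, data[1]);
                pstmt.addBatch();
                if (++count == batchsize) {
                    pstmt.executeBatch();          // one round trip per batch
                    connect.commit();              // one commit per batch
                    count = 0;
                }
            }
            pstmt.executeBatch();                  // flush the final partial batch
            connect.commit();
        }
    }
}

One caveat: a direct-path (APPEND_VALUES) insert takes an exclusive lock on the table, so with several partitions writing concurrently a conventional batched INSERT without the hint may be the safer starting point.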
On Thu, Jul 23, 2015 at 1:12 AM, diplomatic Guru <diplomaticg...@gmail.com> wrote:
> Thanks Robin for your reply.
>
> I'm pretty sure that writing to Oracle is what is taking so long, as writing to HDFS only takes ~5 minutes.
>
> The job writes about 5 million records. I've set the job to call executeBatch() when the batch size reaches 200,000 records, so I assume that commit will be invoked for every 200K batch. In this case, it should only call commit 25 times; is this too much? I wouldn't want to increase the batch size any further as it may cause Java heap issues. I do not have much knowledge on the Oracle side, so any advice on the configuration would be appreciated.
>
> Thanks,
>
> Raj
>
>
> On 22 July 2015 at 20:20, Robin East <robin.e...@xense.co.uk> wrote:
>
>> The first question I would ask is: have you determined whether you have a performance issue writing to Oracle? In particular, how many commits are you making? If you are issuing a lot of commits, that would be a performance problem.
>>
>> Robin
>>
>> On 22 Jul 2015, at 19:11, diplomatic Guru <diplomaticg...@gmail.com> wrote:
>>
>> Hello all,
>>
>> We are having a major performance issue with Spark, which is holding us back from going live.
>>
>> We have a job that carries out computation on log files and writes the results into an Oracle DB.
>>
>> The 'reduceByKey' reducer has been set to a parallelism of 4 as we don't want to establish too many DB connections.
>>
>> We then call foreachPartition on the RDD of pairs that were reduced by key. Within this foreachPartition function we establish a DB connection, iterate over the results, prepare the Oracle statement for batch insertion, then commit the batch and close the connection. All of this is working fine.
>>
>> However, when we execute the job to process 12GB of data, it takes forever to complete, especially at the foreachPartition stage.
>>
>> We submitted the job with 6 executors, 2 cores, and 6GB of memory, of which 0.3 is assigned to spark.storage.memoryFraction.
>>
>> The job is taking about 50 minutes to complete, which is not ideal. I'm not sure how we could enhance the performance.
>> I've provided the main body of the code, please take a look and advise:
>>
>> From the driver:
>>
>> reduceResultsRDD.foreachPartition(new DB.InsertFunction(dbuser, dbpass, batchsize));
>>
>>
>> DB class:
>>
>> public class DB {
>>     private static final Logger logger = LoggerFactory.getLogger(DB.class);
>>
>>     public static class InsertFunction implements
>>             VoidFunction<Iterator<Tuple2<String, String>>> {
>>
>>         private static final long serialVersionUID = 999955766876878L;
>>         private String dbuser = "";
>>         private String dbpass = "";
>>         private int batchsize;
>>
>>         public InsertFunction(String dbuser, String dbpass, int batchsize) {
>>             super();
>>             this.dbuser = dbuser;
>>             this.dbpass = dbpass;
>>             this.batchsize = batchsize;
>>         }
>>
>>         @Override
>>         public void call(Iterator<Tuple2<String, String>> results) throws Exception {
>>             Connection connect = null;
>>             PreparedStatement pstmt = null;
>>             try {
>>                 connect = getDBConnection(dbuser, dbpass);
>>
>>                 int count = 0;
>>
>>                 if (batchsize <= 0) {
>>                     batchsize = 10000;
>>                 }
>>
>>                 pstmt = connect.prepareStatement(
>>                         "MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
>>
>>                 while (results.hasNext()) {
>>
>>                     Tuple2<String, String> kv = results.next();
>>                     String[] data = kv._1.concat("," + kv._2).split(",");
>>
>>                     pstmt.setString(1, data[0]);
>>                     pstmt.setString(2, data[1]);
>>                     .....
>>
>>                     pstmt.addBatch();
>>                     count++;
>>
>>                     if (count == batchsize) {
>>                         logger.info("BulkCount : " + count);
>>                         pstmt.executeBatch();
>>                         connect.commit();
>>                         count = 0;
>>                     }
>>                 }
>>
>>                 // flush and commit the final partial batch
>>                 pstmt.executeBatch();
>>                 connect.commit();
>>
>>             } catch (Exception e) {
>>                 logger.error("InsertFunction error: " + e.getMessage());
>>             } finally {
>>                 if (pstmt != null) {
>>                     pstmt.close();
>>                 }
>>                 try {
>>                     if (connect != null) {
>>                         connect.close();
>>                     }
>>                 } catch (SQLException e) {
>>                     logger.error("InsertFunction Connection Close error: "
>>                             + e.getMessage());
>>                 }
>>             }
>>         }
>>     }
>> }