Bagavath,

Sometimes we need to merge into existing records because the whole dataset is recomputed. I don't think we can achieve that with a pure insert, or is there a way?
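(For what it's worth, one common pattern that keeps the fast pure inserts while still merging is to bulk-insert into a staging table and then reconcile with a single server-side MERGE. The sketch below is illustrative only: the table and column names STG_RESULTS, RESULTS, K and V are assumed placeholders, not anything from this thread.)

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class StagingMergeSketch {

    // Bulk-load into a staging table, then merge once on the server.
    // All identifiers here (STG_RESULTS, RESULTS, K, V) are placeholders.
    public static void loadAndMerge(Connection conn) throws Exception {
        conn.setAutoCommit(false);

        // 1) Cheap batched inserts into the staging table (no per-row lookup).
        try (PreparedStatement ins = conn.prepareStatement(
                "INSERT INTO STG_RESULTS (K, V) VALUES (?, ?)")) {
            // ... setString/addBatch per record, executeBatch every N records ...
        }

        // 2) A single MERGE reconciles the staging rows with the target table.
        try (Statement st = conn.createStatement()) {
            st.executeUpdate(
                "MERGE INTO RESULTS t USING STG_RESULTS s ON (t.K = s.K) "
              + "WHEN MATCHED THEN UPDATE SET t.V = s.V "
              + "WHEN NOT MATCHED THEN INSERT (t.K, t.V) VALUES (s.K, s.V)");
        }
        conn.commit();

        // 3) Clear staging for the next run (TRUNCATE auto-commits in Oracle).
        try (Statement st = conn.createStatement()) {
            st.executeUpdate("TRUNCATE TABLE STG_RESULTS");
        }
    }
}

This moves the per-row matched/not-matched decision off the JDBC path entirely; the staging inserts can be batched exactly like the code further down the thread.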
On 24 July 2015 at 08:53, Bagavath <[email protected]> wrote:

> Try using insert instead of merge. Typically we use insert append to do
> bulk inserts to Oracle.
>
> On Thu, Jul 23, 2015 at 1:12 AM, diplomatic Guru <[email protected]> wrote:
>
>> Thanks Robin for your reply.
>>
>> I'm pretty sure that writing to Oracle is what takes the time, as writing
>> to HDFS takes only ~5 minutes.
>>
>> The job writes about ~5 million records. I've set the job to call
>> executeBatch() when the batch size reaches 200,000 records, so I assume
>> that commit will be invoked at every 200K batch. In that case, it should
>> only call commit 25 times; is this too much? I wouldn't want to increase
>> the batch size any further as it may cause Java heap issues. I do not have
>> much knowledge on the Oracle side, so I would be grateful for any advice
>> on the configuration.
>>
>> Thanks,
>>
>> Raj
>>
>> On 22 July 2015 at 20:20, Robin East <[email protected]> wrote:
>>
>>> The first question I would ask is: have you determined whether you have
>>> a performance issue writing to Oracle? In particular, how many commits
>>> are you making? If you are issuing a lot of commits, that would be a
>>> performance problem.
>>>
>>> Robin
>>>
>>> On 22 Jul 2015, at 19:11, diplomatic Guru <[email protected]> wrote:
>>>
>>> Hello all,
>>>
>>> We are having a major performance issue with Spark, which is holding us
>>> back from going live.
>>>
>>> We have a job that carries out computation on log files and writes the
>>> results into an Oracle DB.
>>>
>>> The reduceByKey stage has been set to a parallelism of 4, as we don't
>>> want to establish too many DB connections.
>>>
>>> We then call foreachPartition on the RDD pairs that were reduced by key.
>>> Within this foreachPartition method we establish a DB connection, iterate
>>> over the results, prepare the Oracle statement for batch insertion, then
>>> commit the batch and close the connection. All of this is working fine.
>>>
>>> However, when we execute the job to process 12GB of data, it takes
>>> forever to complete, especially at the foreachPartition stage.
>>>
>>> We submitted the job with 6 executors, 2 cores, and 6GB memory, of which
>>> 0.3 is assigned to spark.storage.memoryFraction.
>>>
>>> The job takes about 50 minutes to complete, which is not ideal. I'm not
>>> sure how we could improve the performance.
>>> I've provided the main body of the code; please take a look and advise:
>>>
>>> From the driver:
>>>
>>> reduceResultsRDD.foreachPartition(new DB.InsertFunction(
>>>         dbuser, dbpass, batchsize));
>>>
>>> DB class:
>>>
>>> public class DB {
>>>
>>>     private static final Logger logger = LoggerFactory.getLogger(DB.class);
>>>
>>>     public static class InsertFunction implements
>>>             VoidFunction<Iterator<Tuple2<String, String>>> {
>>>
>>>         private static final long serialVersionUID = 999955766876878L;
>>>         private String dbuser = "";
>>>         private String dbpass = "";
>>>         private int batchsize;
>>>
>>>         public InsertFunction(String dbuser, String dbpass, int batchsize) {
>>>             super();
>>>             this.dbuser = dbuser;
>>>             this.dbpass = dbpass;
>>>             this.batchsize = batchsize;
>>>         }
>>>
>>>         @Override
>>>         public void call(Iterator<Tuple2<String, String>> results) {
>>>             Connection connect = null;
>>>             PreparedStatement pstmt = null;
>>>             try {
>>>                 connect = getDBConnection(dbuser, dbpass);
>>>
>>>                 int count = 0;
>>>                 if (batchsize <= 0) {
>>>                     batchsize = 10000;
>>>                 }
>>>
>>>                 pstmt = connect.prepareStatement(
>>>                         "MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
>>>
>>>                 while (results.hasNext()) {
>>>                     Tuple2<String, String> kv = results.next();
>>>                     String[] data = kv._1.concat("," + kv._2).split(",");
>>>
>>>                     pstmt.setString(1, data[0]);
>>>                     pstmt.setString(2, data[1]);
>>>                     .....
>>>
>>>                     pstmt.addBatch();
>>>                     count++;
>>>
>>>                     // Flush and commit once per full batch only.
>>>                     if (count == batchsize) {
>>>                         logger.info("BulkCount : " + count);
>>>                         pstmt.executeBatch();
>>>                         connect.commit();
>>>                         count = 0;
>>>                     }
>>>                 }
>>>
>>>                 // Flush and commit the final partial batch.
>>>                 pstmt.executeBatch();
>>>                 connect.commit();
>>>
>>>             } catch (Exception e) {
>>>                 logger.error("InsertFunction error: " + e.getMessage());
>>>             } finally {
>>>                 try {
>>>                     if (pstmt != null) {
>>>                         pstmt.close();
>>>                     }
>>>                     if (connect != null) {
>>>                         connect.close();
>>>                     }
>>>                 } catch (SQLException e) {
>>>                     logger.error("InsertFunction close error: "
>>>                             + e.getMessage());
>>>                 }
>>>             }
>>>         }
>>>     }
>>> }
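(For reference, a minimal sketch of what Bagavath's "insert append" suggestion could look like from JDBC, assuming a placeholder table RESULTS(K, V): plain batched INSERTs with Oracle's APPEND_VALUES direct-path hint, committing once per batch rather than per row. An assumption-laden sketch, not a verified fix.)

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;

import scala.Tuple2;

public class BatchInsertSketch {

    // Sketch: batched direct-path insert into a placeholder table RESULTS(K, V).
    // APPEND_VALUES requests a direct-path load for batched VALUES inserts.
    public static void insertPartition(Iterator<Tuple2<String, String>> rows,
            String url, String user, String pass, int batchSize) throws Exception {
        try (Connection conn = DriverManager.getConnection(url, user, pass)) {
            conn.setAutoCommit(false); // we control commits explicitly
            try (PreparedStatement ins = conn.prepareStatement(
                    "INSERT /*+ APPEND_VALUES */ INTO RESULTS (K, V) VALUES (?, ?)")) {
                int count = 0;
                while (rows.hasNext()) {
                    Tuple2<String, String> kv = rows.next();
                    ins.setString(1, kv._1);
                    ins.setString(2, kv._2);
                    ins.addBatch();
                    if (++count % batchSize == 0) {
                        ins.executeBatch(); // one round trip per batch
                        conn.commit();      // one commit per batch, not per row
                    }
                }
                ins.executeBatch();         // flush the final partial batch
                conn.commit();
            }
        }
    }
}

One caveat: direct-path inserts hold an exclusive table lock until commit, so with 4 partitions writing concurrently they may serialize; if that bites, dropping the hint and keeping the batching/commit discipline is still a reasonable first step.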
