The first question I would ask is: have you determined whether the bottleneck 
is actually the write to Oracle? In particular, how many commits are you 
making? Issuing a large number of commits would in itself be a performance problem.
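
As far as I can tell from the quoted code, executeBatch() and commit() are 
also called on every iteration of the while loop, after the batch-size check, 
so each row may be getting its own commit. To illustrate what I mean, here is 
a minimal sketch of the flush logic with no database involved. CountingSink 
and writeInBatches are hypothetical names I made up; the sink only counts 
calls in place of the real PreparedStatement and Connection. Committing once 
per partition is an assumption on my part; committing once per batch would 
also be reasonable.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class BatchCommitSketch {

    /** Hypothetical stand-in for the JDBC objects: it only counts calls. */
    static class CountingSink {
        int pending = 0;          // rows added since the last executeBatch()
        int rowsWritten = 0;      // rows sent to the "database" so far
        int batchesExecuted = 0;  // number of non-empty batches sent
        int commits = 0;          // number of commit() calls

        void addBatch(String row) {
            pending++;
        }

        void executeBatch() {
            if (pending > 0) {
                rowsWritten += pending;
                batchesExecuted++;
                pending = 0;
            }
        }

        void commit() {
            commits++;
        }
    }

    /**
     * Sends rows in groups of batchSize, flushes the remainder after the
     * loop, and commits once at the end (assumption: one commit per call).
     */
    static CountingSink writeInBatches(Iterator<String> rows, int batchSize) {
        CountingSink sink = new CountingSink();
        int count = 0;
        while (rows.hasNext()) {
            sink.addBatch(rows.next());
            count++;
            if (count == batchSize) {
                sink.executeBatch();   // only when a full batch has built up
                count = 0;
            }
            // Nothing else here: no per-row executeBatch() or commit().
        }
        if (count > 0) {
            sink.executeBatch();       // flush the final partial batch
        }
        sink.commit();                 // single commit for everything
        return sink;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        CountingSink sink = writeInBatches(rows.iterator(), 3);
        System.out.println(sink.rowsWritten + " rows, "
                + sink.batchesExecuted + " batches, "
                + sink.commits + " commits");
    }
}
```

With the per-row calls left in, the same seven rows would be sent one at a 
time and committed seven times, which is the pattern I would check for first.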

Robin

> On 22 Jul 2015, at 19:11, diplomatic Guru <diplomaticg...@gmail.com> wrote:
> 
> Hello all,
> 
> We are having a major performance issue with Spark, which is holding us 
> back from going live.
> 
> We have a job that carries out computation on log files and writes the results 
> into an Oracle DB.
> 
> The 'reduceByKey' reducer has been set to a parallelism of 4, as we don't want 
> to establish too many DB connections.
> 
> We then call foreachPartition on the pair RDD that was reduced by key. 
> Within this foreachPartition method we establish a DB connection, iterate 
> over the results, prepare the Oracle statement for batch insertion, commit 
> the batch and close the connection. All of this works correctly.
> 
> However, when we execute the job to process 12GB of data, it takes forever to 
> complete, especially at the foreachPartition stage.
> 
> We submitted the job with 6 executors, 2 cores, and 6GB memory of which 0.3 
> is assigned to spark.storage.memoryFraction.
> 
> The job is taking about 50 minutes to complete, which is not ideal. I'm not 
> sure how we could improve the performance. I've provided the main body of the 
> code; please take a look and advise:
> 
> From Driver:
> 
> reduceResultsRDD.foreachPartition(new DB.InsertFunction(dbuser, dbpass, batchsize));
> 
> 
> DB class:
> 
> public class DB {
>       private static final Logger logger = LoggerFactory
>                       .getLogger(DB.class);
>       
>       public static class InsertFunction implements
>                       VoidFunction<Iterator<Tuple2<String, String>>> {
> 
>               private static final long serialVersionUID = 999955766876878L;
>               private String dbuser = "";
>               private String dbpass = "";
>               private int batchsize;
> 
>               public InsertFunction(String dbuser, String dbpass, int batchsize) {
>                       super();
>                       this.dbuser = dbuser;
>                       this.dbpass = dbpass;
>                       this.batchsize=batchsize;
>               }
> 
>               @Override
>               public void call(Iterator<Tuple2<String, String>> results) {
>                       Connection connect = null;
>                       PreparedStatement pstmt = null;
>                       try {
>                               connect = getDBConnection(dbuser,
>                                               dbpass);
> 
>                               int count = 0;
> 
>                               if (batchsize <= 0) {
>                                       batchsize = 10000;
>                               }
> 
>                               pstmt = connect
>                                               .prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
> 
>                               while (results.hasNext()) {
> 
>                                       Tuple2<String, String> kv = results.next();
> 
>                                       String[] data = kv._1.concat("," + kv._2).split(",");
> 
>                                       pstmt.setString(1, data[0]);
>                                       pstmt.setString(2, data[1]);
>                .....
> 
>                                       pstmt.addBatch();
> 
>                                       count++;
> 
>                                       if (count == batchsize) {
>                                               logger.info("BulkCount : " + count);
>                                               pstmt.executeBatch();
>                                               connect.commit();
>                                               count = 0;
>                                       }
> 
>                                       pstmt.executeBatch();
>                                       connect.commit();
> 
>                               }
> 
>                               pstmt.executeBatch();
>                               connect.commit();
> 
>                       } catch (Exception e) {
>                               logger.error("InsertFunction error: " + e.getMessage());
>                       } finally {
> 
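>                               // Close each resource in its own try block so a failure
>                               // on one does not skip the other; guard against null.
>                               try {
>                                       if (pstmt != null) {
>                                               pstmt.close();
>                                       }
>                               } catch (SQLException e) {
>                                       logger.error("InsertFunction Statement Close error: "
>                                                       + e.getMessage());
>                               }
> 
>                               try {
>                                       if (connect != null) {
>                                               connect.close();
>                                       }
>                               } catch (SQLException e) {
>                                       logger.error("InsertFunction Connection Close error: "
>                                                       + e.getMessage());
>                               }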
>                       }
>               }
> 
>       }
> }
