Thanks Robin for your reply.

I'm pretty sure that writing to Oracle is what's taking longer, as writing
the same output to HDFS takes only ~5 minutes.

The job writes roughly 5 million records. I've set the job to call
executeBatch() when the batch size reaches 200,000 records, so I assume
commit will be invoked for every 200K batch. In that case it should only
call commit 25 times (5,000,000 / 200,000); is that too many? I wouldn't
want to increase the batch size any further, as it may cause Java heap
issues. I don't have much knowledge on the Oracle side, so any advice on
the configuration would be appreciated.
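
For concreteness, this is the cadence I mean (a minimal sketch mirroring
the quoted code below; bind() is a placeholder for the per-row setString
calls):

int count = 0;
while (results.hasNext()) {
    bind(pstmt, results.next()); // set the statement parameters for one row
    pstmt.addBatch();
    if (++count == 200000) {     // flush every 200K rows => ~25 commits for 5M records
        pstmt.executeBatch();
        connect.commit();
        count = 0;
    }
}
pstmt.executeBatch();            // flush and commit the final partial batch
connect.commit();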

Thanks,

Raj

On 22 July 2015 at 20:20, Robin East <robin.e...@xense.co.uk> wrote:

> The first question I would ask is: have you determined whether you have a
> performance issue writing to Oracle? In particular, how many commits are
> you making? Issuing a lot of commits would be a performance problem.
>
> Robin
>
> On 22 Jul 2015, at 19:11, diplomatic Guru <diplomaticg...@gmail.com>
> wrote:
>
> Hello all,
>
> We are having a major performance issue with Spark, which is keeping us
> from going live.
>
> We have a job that carries out computation on log files and writes the
> results into an Oracle DB.
>
> The 'reduceByKey' step has been set to a parallelism of 4, as we don't
> want to establish too many DB connections (see the snippet below).
>
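> As a snippet, the parallelism is passed straight to reduceByKey (a
> sketch; mergeValues is a placeholder for our combine function):
>
> JavaPairRDD<String, String> reduceResultsRDD =
>         pairsRDD.reduceByKey(mergeValues, 4); // 4 partitions => at most 4 DB connections
>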
> We then call foreachPartition on the reduced RDD pairs. Within each
> partition we establish a DB connection, prepare the Oracle statement,
> iterate over the results to build batch insertions, then commit each
> batch and close the connection. Functionally, all of this works fine.
>
> However, when we execute the job to process 12GB of data, it takes forever
> to complete, especially at the foreachPartition stage.
>
> We submitted the job with 6 executors, 2 cores each, and 6GB of executor
> memory, of which a fraction of 0.3 is assigned to
> spark.storage.memoryFraction.
>
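> That is, roughly the following submit command (a sketch; the class and
> jar names are placeholders):
>
> spark-submit \
>   --num-executors 6 \
>   --executor-cores 2 \
>   --executor-memory 6g \
>   --conf spark.storage.memoryFraction=0.3 \
>   --class com.example.LogJob \
>   logjob.jar
>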
> The job takes about 50 minutes to complete, which is not ideal. I'm not
> sure how we could improve the performance. I've provided the main body of
> the code below; please take a look and advise:
>
> From Driver:
>
> reduceResultsRDD.foreachPartition(
>         new DB.InsertFunction(dbuser, dbpass, batchsize));
>
>
> DB class:
>
> import java.sql.Connection;
> import java.sql.PreparedStatement;
> import java.sql.SQLException;
> import java.util.Iterator;
>
> import org.apache.spark.api.java.function.VoidFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import scala.Tuple2;
>
> public class DB {
>
>     private static final Logger logger = LoggerFactory.getLogger(DB.class);
>
>     public static class InsertFunction implements
>             VoidFunction<Iterator<Tuple2<String, String>>> {
>
>         private static final long serialVersionUID = 999955766876878L;
>         private String dbuser = "";
>         private String dbpass = "";
>         private int batchsize;
>
>         public InsertFunction(String dbuser, String dbpass, int batchsize) {
>             super();
>             this.dbuser = dbuser;
>             this.dbpass = dbpass;
>             this.batchsize = batchsize;
>         }
>
>         @Override
>         public void call(Iterator<Tuple2<String, String>> results) {
>             Connection connect = null;
>             PreparedStatement pstmt = null;
>             try {
>                 connect = getDBConnection(dbuser, dbpass); // helper not shown
>
>                 if (batchsize <= 0) {
>                     batchsize = 10000;
>                 }
>
>                 // Placeholder SQL: the real statement is an Oracle MERGE (upsert).
>                 pstmt = connect.prepareStatement(
>                         "MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
>
>                 int count = 0;
>                 while (results.hasNext()) {
>                     Tuple2<String, String> kv = results.next();
>                     String[] data = kv._1.concat("," + kv._2).split(",");
>
>                     pstmt.setString(1, data[0]);
>                     pstmt.setString(2, data[1]);
>                     // ..... (remaining column bindings elided)
>
>                     pstmt.addBatch();
>                     count++;
>
>                     // Flush and commit once per full batch of `batchsize` rows.
>                     if (count == batchsize) {
>                         logger.info("BulkCount : " + count);
>                         pstmt.executeBatch();
>                         connect.commit();
>                         count = 0;
>                     }
>                 }
>
>                 // Flush and commit the final partial batch.
>                 pstmt.executeBatch();
>                 connect.commit();
>
>             } catch (Exception e) {
>                 logger.error("InsertFunction error: " + e.getMessage());
>             } finally {
>                 if (pstmt != null) {
>                     try {
>                         pstmt.close();
>                     } catch (SQLException e) {
>                         logger.error("InsertFunction Statement Close error: "
>                                 + e.getMessage());
>                     }
>                 }
>                 if (connect != null) {
>                     try {
>                         connect.close();
>                     } catch (SQLException e) {
>                         logger.error("InsertFunction Connection Close error: "
>                                 + e.getMessage());
>                     }
>                 }
>             }
>         }
>     }
> }
>
>
>
