Try using insert instead of merge. Typically we use insert append to do
bulk inserts to Oracle.
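
A minimal sketch of what insert append could look like with JDBC batching,
not a drop-in replacement for the job. Assumptions: Oracle 11gR2 or later
(for the APPEND_VALUES hint), a hypothetical table and column list supplied
by the caller, and that plain inserts are acceptable, i.e. rows the MERGE
would have updated are handled some other way (for example, loaded into a
staging table and merged once at the end). Direct-path inserts take a table
lock, so the 4 partitions may end up writing one after another; whether this
helps is something to measure rather than assume.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

final class AppendInsertSketch {

    // Builds e.g. "INSERT /*+ APPEND_VALUES */ INTO T (A, B) VALUES (?, ?)".
    // Table and column names are hypothetical and must come from trusted code,
    // since they are concatenated into the SQL text.
    static String buildInsertSql(String table, List<String> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String cols = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT /*+ APPEND_VALUES */ INTO " + table
                + " (" + cols + ") VALUES (" + marks + ")";
    }

    // Sends all rows as one JDBC batch and commits once.
    // After a direct-path insert the session cannot touch the table again
    // until it commits, so each batch ends with a commit.
    static void insertBatch(Connection conn, String sql, List<String[]> rows)
            throws SQLException {
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (String[] row : rows) {
                for (int i = 0; i < row.length; i++) {
                    ps.setString(i + 1, row[i]); // JDBC parameters are 1-based
                }
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();
        } catch (SQLException e) {
            conn.rollback();
            throw e;
        } finally {
            conn.setAutoCommit(oldAutoCommit);
        }
    }
}
```

Inside foreachPartition you would build the SQL once per partition and call
insertBatch for each full buffer of rows.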

On Thu, Jul 23, 2015 at 1:12 AM, diplomatic Guru <diplomaticg...@gmail.com>
wrote:

> Thanks Robin for your reply.
>
> I'm pretty sure that writing to Oracle is what takes the time, as writing
> to HDFS takes only ~5 minutes.
>
> The job writes about 5 million records. I've set the job to call
> executeBatch() when the batch reaches 200,000 records, so I assume
> that commit is invoked once per 200K batch. In that case it should
> only call commit 25 times; is this too many? I wouldn't want to increase
> the batch size any further, as it may cause Java heap issues. I don't have
> much knowledge of the Oracle side, so I would be grateful for any advice
> on the configuration.
>
> Thanks,
>
> Raj
>
>
>
>
>
> On 22 July 2015 at 20:20, Robin East <robin.e...@xense.co.uk> wrote:
>
>> The first question I would ask is have you determined whether you have a
>> performance issue writing to Oracle? In particular how many commits are you
>> making? If you are issuing a lot of commits that would be a performance
>> problem.
>>
>> Robin
>>
>> On 22 Jul 2015, at 19:11, diplomatic Guru <diplomaticg...@gmail.com>
>> wrote:
>>
>> Hello all,
>>
>> We are having a major performance issue with Spark, which is holding
>> us back from going live.
>>
>> We have a job that carries out computation on log files and writes the
>> results into an Oracle DB.
>>
>> The 'reduceByKey' reducer has been set to a parallelism of 4, as we don't
>> want to establish too many DB connections.
>>
>> We then call foreachPartition on the pair RDD that was reduced by key.
>> Within this foreachPartition method we establish a DB connection, iterate
>> over the results, prepare the Oracle statement for batch insertion, then
>> commit the batch and close the connection. All of this is working fine.
>>
>> However, when we execute the job to process 12GB of data, it
>> takes forever to complete, especially at the foreachPartition stage.
>>
>> We submitted the job with 6 executors, 2 cores, and 6GB memory of which
>> 0.3 is assigned to spark.storage.memoryFraction.
>>
>> The job is taking about 50 minutes to complete, which is not ideal. I'm
>> not sure how we could improve the performance. I've provided the main body
>> of the code; please take a look and advise:
>>
>> From Driver:
>>
>> reduceResultsRDD.foreachPartition(new DB.InsertFunction(
>> dbuser,dbpass,batchsize));
>>
>>
>> DB class:
>>
>> public class DB {
>> private static final Logger logger = LoggerFactory
>> .getLogger(DB.class);
>> public static class InsertFunction implements
>> VoidFunction<Iterator<Tuple2<String, String>>> {
>>
>> private static final long serialVersionUID = 999955766876878L;
>> private String dbuser = "";
>> private String dbpass = "";
>> private int batchsize;
>>
>> public InsertFunction(String dbuser, String dbpass, int batchsize) {
>> super();
>> this.dbuser = dbuser;
>> this.dbpass = dbpass;
>> this.batchsize=batchsize;
>> }
>>
>> @Override
>> public void call(Iterator<Tuple2<String, String>> results) {
>> Connection connect = null;
>> PreparedStatement pstmt = null;
>> try {
>> connect = getDBConnection(dbuser,
>> dbpass);
>>
>> int count = 0;
>>
>> if (batchsize <= 0) {
>> batchsize = 10000;
>> }
>>
>> pstmt = connect
>> .prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
>>
>> while (results.hasNext()) {
>>
>> Tuple2<String, String> kv = results.next();
>>  String [] data = kv._1.concat("," +kv._2).split(",");
>>
>>  pstmt.setString(1, data[0].toString());
>> pstmt.setString(2, data[1].toString());
>> .....
>>
>> pstmt.addBatch();
>>
>> count++;
>>
>> if (count == batchsize) {
>> logger.info("BulkCount : " + count);
>> pstmt.executeBatch();
>> connect.commit();
>> count = 0;
>> }
>>
>> }
>>
>> pstmt.executeBatch();
>> connect.commit();
>>
>> } catch (Exception e) {
>> logger.error("InsertFunction error: " + e.getMessage());
>> } finally {
>>
>> try {
>> if (pstmt != null) {
>> pstmt.close();
>> }
>> if (connect != null) {
>> connect.close();
>> }
>> } catch (SQLException e) {
>> logger.error("InsertFunction Connection Close error: "
>> + e.getMessage());
>> }
>> }
>> }
>>
>> }
>> }
>>
>>
>>
>
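
The arithmetic discussed in the thread (about 5 million records, 200,000 per
batch, 25 commits) only holds if executeBatch() and commit() run when the
buffer is full and once more for the remainder, not on every pass through the
loop. The sketch below isolates that control flow so it can be checked without
a database. The class name, method name, and the sink callback are
hypothetical; in the real job the sink would be the code that calls
executeBatch() and commit().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

final class BatchFlushSketch {

    // Buffers records and hands them to the sink in chunks of batchSize.
    // Returns the number of flushes, which equals the number of commits
    // if the sink commits once per call.
    static <T> int writeInBatches(Iterator<T> records, int batchSize,
                                  Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> buffer = new ArrayList<>();
        int flushes = 0;
        while (records.hasNext()) {
            buffer.add(records.next());
            // Flush only when the buffer is full, never per record.
            if (buffer.size() == batchSize) {
                sink.accept(new ArrayList<>(buffer));
                buffer.clear();
                flushes++;
            }
        }
        // Flush the final partial batch, if there is one.
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer));
            flushes++;
        }
        return flushes;
    }
}
```

With 1,000 records and a batch size of 200 this flushes 5 times; with 1,001
records it flushes 6 times, the last one carrying a single record.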
