Bagavath,

Sometimes we need to merge existing records, due to recomputations of the
whole data. I don't think we could achieve this with pure insert, or is
there a way?



On 24 July 2015 at 08:53, Bagavath <[email protected]> wrote:

> Try using insert instead of merge.  Typically we use insert append to do
> bulk inserts to oracle.
>
> On Thu, Jul 23, 2015 at 1:12 AM, diplomatic Guru <[email protected]
> > wrote:
>
>> Thanks Robin for your reply.
>>
>> I'm pretty sure that writing to Oracle is taking longer as when writing
>> to HDFS is only taking ~5minutes.
>>
>> The job is writing about ~5 Million of records. I've set the job to call
>> executeBatch() when the batchSize reaches 200,000 of records, so I
>> assume that commit will be invoked at every 200K batch. In this case, it
>> should only call commit 25 times, is this too much? I wouldn't want to
>> increase the batch size any further as it may cause Java heap issue. I do
>> not have much knowledge in Oracle side, so any advice with the
>> configuration will be grateful.
>>
>> Thanks,
>>
>> Raj
>>
>>
>>
>>
>>
>> On 22 July 2015 at 20:20, Robin East <[email protected]> wrote:
>>
>>> The first question I would ask is have you determined whether you have a
>>> performance issue writing to Oracle? In particular how many commits are you
>>> making? If you are issuing a lot of commits that would be a performance
>>> problem.
>>>
>>> Robin
>>>
>>> On 22 Jul 2015, at 19:11, diplomatic Guru <[email protected]>
>>> wrote:
>>>
>>> Hello all,
>>>
>>> We are having a major performance issue with the Spark, which is holding
>>> us from going live.
>>>
>>> We have a job that carries out computation on log files and write the
>>> results into Oracle DB.
>>>
>>> The reducer 'reduceByKey'  have been set to parallelize by 4 as we don't
>>> want to establish too many DB connections.
>>>
>>> We are then calling the foreachPartition on the RDD pairs that were
>>> reduced by the key. Within this foreachPartition method we establish DB
>>> connection, then iterate the results, prepare the Oracle statement for
>>> batch insertion then we commit the batch and close the connection. All
>>> these are working fine.
>>>
>>> However, when we execute the job to process 12GB of data, it
>>> takes forever to complete, especially at the foreachPartition stage.
>>>
>>> We submitted the job with 6 executors, 2 cores, and 6GB memory of which
>>> 0.3 is assigned to spark.storage.memoryFraction.
>>>
>>> The job is taking about 50 minutes to complete, which is not ideal. I'm
>>> not sure how we could enhance the performance. I've provided the main body
>>> of the codes, please take a look and advice:
>>>
>>> From Driver:
>>>
>>> reduceResultsRDD.foreachPartition(new DB.InsertFunction(
>>> dbuser,dbpass,batchsize));
>>>
>>>
>>> DB class:
>>>
>>> public class DB {
>>> private static final Logger logger = LoggerFactory
>>> .getLogger(DB.class);
>>> public static class InsertFunction implements
>>> VoidFunction<Iterator<Tuple2<String, String>>> {
>>>
>>> private static final long serialVersionUID = 999955766876878L;
>>> private String dbuser = "";
>>> private String dbpass = "";
>>> private int batchsize;
>>>
>>> public InsertFunction(String dbuser, String dbpass, int batchsize) {
>>> super();
>>> this.dbuser = dbuser;
>>> this.dbuser = dbuser;
>>> this.batchsize=batchsize;
>>> }
>>>
>>> @Override
>>> public void call(Iterator<Tuple2<String, String>> results) {
>>> Connection connect = null;
>>> PreparedStatement pstmt = null;
>>> try {
>>> connect = getDBConnection(dbuser,
>>> dbpass);
>>>
>>> int count = 0;
>>>
>>> if (batchsize <= 0) {
>>> batchsize = 10000;
>>> }
>>>
>>> pstmt1 = connect
>>> .prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT
>>> INSERT");
>>>
>>> while (results.hasNext()) {
>>>
>>> Tuple2<String, String> kv = results.next();
>>>  String [] data = kv._1.concat("," +kv._2).split(",");
>>>
>>>  pstmt.setString(1, data[0].toString());
>>> pstmt.setString(2, data[1].toString());
>>> .....
>>>
>>> pstmt.addBatch();
>>>
>>> count++;
>>>
>>> if (count == batchsize) {
>>> logger.info("BulkCount : " + count);
>>> pstmt.executeBatch();
>>> connect.commit();
>>> count = 0;
>>> }
>>>
>>> pstmt.executeBatch();
>>> connect.commit();
>>>
>>> }
>>>
>>> pstmt.executeBatch();
>>> connect.commit();
>>>
>>> } catch (Exception e) {
>>> logger.error("InsertFunction error: " + e.getMessage());
>>> } finally {
>>>
>>> if (pstmt != null) {
>>> pstmt.close();
>>> }
>>>
>>> try {
>>>  connect.close();
>>> } catch (SQLException e) {
>>> logger.error("InsertFunction Connection Close error: "
>>> + e.getMessage());
>>> }
>>> }
>>> }
>>>
>>> }
>>> }
>>>
>>>
>>>
>>
>

Reply via email to