It should not be parallel execution, as the logging code is called in the
driver. Have you checked whether your driver is being rerun by the cluster
manager due to a failure or some error condition?
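
A quick way to check, as a minimal sketch (assuming "spark" is your
SparkSession): print the YARN attempt id from the driver; anything above 1
means the cluster manager restarted your application.

val attempt = spark.sparkContext.applicationAttemptId.getOrElse("1")
System.out.println("driver attempt: " + attempt) // "2" or more => rerun

While debugging you can also submit with --conf spark.yarn.maxAppAttempts=1
so a failed driver is not retried.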

On Tue, Oct 17, 2017 at 11:52 PM, Marco Mistroni <mmistr...@gmail.com>
wrote:

> Hi
>  If the problem really is parallel execution, you can try calling
> repartition(1) before you save.
> Alternatively, try writing the data to a CSV file and see whether you get
> the same behaviour, to rule out DynamoDB issues; a sketch of both is below.
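> Untested sketch, reusing the names from your snippet (the csv path
> "/tmp/debug_out" is just a hypothetical location):
>
> data_df.repartition(1).write.mode("append").parquet(dest_file)
> data_df.repartition(1).write.option("header", "true").csv("/tmp/debug_out")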
> Also, are the multiple rows being written duplicates (do they all have
> the same fields/values)?
> Hth
>
>
> On Oct 17, 2017 1:08 PM, "Harsh Choudhary" <shry.ha...@gmail.com> wrote:
>
>> This is the code -
>> import java.nio.charset.Charset
>> import com.amazonaws.services.lambda.AWSLambdaClient
>> import com.amazonaws.services.lambda.model.InvokeRequest
>> import org.apache.spark.sql.DataFrame
>> import org.apache.spark.sql.functions.lit
>> import play.api.libs.json.{Json, JsValue}
>>
>> val hdfs_path = <path to a file in hdfs>
>> var data_df: DataFrame = null
>> if (hdfs_path.contains(".avro")) {
>>   data_df = spark.read.format("com.databricks.spark.avro").load(hdfs_path)
>> } else if (hdfs_path.contains(".tsv")) {
>>   data_df = spark.read.option("delimiter", "\t")
>>     .option("header", "true").csv(hdfs_path)
>> } else if (hdfs_path.contains(".scsv")) {
>>   data_df = spark.read.option("delimiter", ";")
>>     .option("header", "true").csv(hdfs_path)
>> } else {
>>   System.exit(1)
>> }
>> // add audit columns, then append to the Parquet destination
>> data_df = data_df
>>   .withColumn("edl_created_by", lit("IndexerSpark"))
>>   .withColumn("edl_created_at", lit(currentTime))
>> data_df.write.mode("append").parquet(dest_file)
>> // single call, made on the driver after the write completes
>> val status1 = AddLogToDynamo(
>>   Json.toJson(fileLineageEntity)(fileLineAgeFormat),
>>   conf.getString("lambda.filelineage.dynamodb.update.function.name"),
>>   GetAuth.getLambdaClient)
>>
>> def AddLogToDynamo(updatedLambdaJson: JsValue, updateFunctionName: String,
>>     lambdaClient: AWSLambdaClient): String = {
>>   System.out.println("new metadata to be updated: " + updatedLambdaJson)
>>   val updatelambdaReq: InvokeRequest = new InvokeRequest()
>>   updatelambdaReq.setFunctionName(updateFunctionName)
>>   updatelambdaReq.setPayload(updatedLambdaJson.toString())
>>   System.out.println("Calling lambda to add log")
>>   byteBufferToString(lambdaClient.invoke(updatelambdaReq).getPayload(),
>>     Charset.forName("UTF-8"))
>> }
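>>
>> (byteBufferToString is just a helper that decodes the Lambda response
>> payload; roughly this sketch, my actual version may differ:)
>>
>> def byteBufferToString(buffer: java.nio.ByteBuffer,
>>     charset: Charset): String =
>>   charset.decode(buffer).toString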
>>
>>
>> Harsh Choudhary
>>
>> On Tue, Oct 17, 2017 at 5:32 PM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Can you share your code?
>>>
>>> On Tue, 17 Oct 2017 at 10:22 pm, Harsh Choudhary <shry.ha...@gmail.com>
>>> wrote:
>>>
>>>> Hi
>>>>
>>>> I'm running a Spark job in which I append new data to a Parquet file.
>>>> At the end, I make a log entry in my DynamoDB table stating the number
>>>> of records appended, the time, etc. Instead of one single entry in the
>>>> database, multiple entries are being made. Is it because of parallel
>>>> execution of the code in the workers? If so, how can I make sure it
>>>> writes only once?
>>>>
>>>> *Thanks!*
>>>>
>>>> *Cheers!*
>>>>
>>>> Harsh Choudhary
>>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>


-- 
Best Regards,
Ayan Guha
