Hi
If the problem really is parallel execution, you can try calling
repartition(1) before you save (rough sketch below).
Alternatively, try writing the data out to a CSV file and see whether you get
the same behaviour, to rule out DynamoDB issues.
Also, are the multiple rows being written duplicates (i.e. do they all have
the same fields/values)?
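Something along these lines, as a rough sketch against your snippet (data_df
and dest_file are taken from your code; the CSV output path is just a
placeholder for the test):

    // force a single partition so only one task performs the save
    data_df.repartition(1).write.mode("append").parquet(dest_file)

    // temporary debug output: write the same DataFrame out as CSV and check
    // whether you still see multiple rows / log entries
    // ("/tmp/indexer_debug_csv" is only an example path)
    data_df.write.option("header", "true").mode("overwrite").csv("/tmp/indexer_debug_csv")
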
Hope this helps.


On Oct 17, 2017 1:08 PM, "Harsh Choudhary" <shry.ha...@gmail.com> wrote:

> This is the code -
> hdfs_path = <path to a file in hdfs>
> if (hdfs_path.contains(".avro")) {
>   data_df = spark.read.format("com.databricks.spark.avro").load(hdfs_path)
> } else if (hdfs_path.contains(".tsv")) {
>   data_df = spark.read.option("delimiter", "\t").option("header", "true").csv(hdfs_path)
> } else if (hdfs_path.contains(".scsv")) {
>   data_df = spark.read.option("delimiter", ";").option("header", "true").csv(hdfs_path)
> } else {
>   System.exit(1)
> }
> data_df = data_df.withColumn("edl_created_by", lit("IndexerSpark")).withColumn("edl_created_at", lit(currentTime))
> data_df.write.mode("append").parquet(dest_file)
> val status1 = AddLogToDynamo(Json.toJson(fileLineageEntity)(fileLineAgeFormat), conf.getString("lambda.filelineage.dynamodb.update.function.name"), GetAuth.getLambdaClient)
>
> def AddLogToDynamo(updatedLambdaJson: JsValue, updateFunctionName: String, lambdaClient: AWSLambdaClient): String = {
>   System.out.println("new metadata to be updated: " + updatedLambdaJson);
>   val updatelambdaReq: InvokeRequest = new InvokeRequest();
>   updatelambdaReq.setFunctionName(updateFunctionName);
>   updatelambdaReq.setPayload(updatedLambdaJson.toString());
>   System.out.println("Calling lambda to add log");
>   val updateLambdaResult = byteBufferToString(lambdaClient.invoke(updatelambdaReq).getPayload(), Charset.forName("UTF-8"));
>   return updateLambdaResult;
> }
>
>
> Harsh Choudhary
>
> On Tue, Oct 17, 2017 at 5:32 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Can you share your code?
>>
>> On Tue, 17 Oct 2017 at 10:22 pm, Harsh Choudhary <shry.ha...@gmail.com>
>> wrote:
>>
>>> Hi
>>>
>>> I'm running a Spark job in which I am appending new data to a Parquet
>>> file. At the end, I make a log entry in my DynamoDB table stating the
>>> number of records appended, the time, etc. Instead of one single entry in
>>> the database, multiple entries are being made. Is it because of parallel
>>> execution of the code on the workers? If so, how can I solve it so that
>>> the log entry is written only once?
>>>
>>> *Thanks!*
>>>
>>> *Cheers!*
>>>
>>> Harsh Choudhary
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
