Hi @Marco, the multiple rows written are not duplicates: the current-timestamp field is different in each of them, so the same log call is evidently being executed more than once.
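Since only the timestamp distinguishes the rows, one way to defuse any re-run is to make the write idempotent by keying the log item on something deterministic such as the Spark application id, which stays the same across YARN re-attempts. A minimal sketch, assuming the AWS SDK v1 DynamoDB client and a hypothetical table file_lineage_log keyed on run_id (writing to DynamoDB directly rather than through the Lambda wrapper shown in the quoted code below):

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ConditionalCheckFailedException, PutItemRequest}
import scala.collection.JavaConverters._

// Write the lineage log entry at most once per application run. The item
// key is the Spark application id, and the condition expression rejects
// the put if an item with that key already exists, so a replayed driver
// cannot insert a second row.
def addLogOnce(appId: String, recordCount: Long): Unit = {
  val dynamo = AmazonDynamoDBClientBuilder.defaultClient()
  val item = Map(
    "run_id"       -> new AttributeValue(appId),              // hypothetical key attribute
    "record_count" -> new AttributeValue().withN(recordCount.toString)
  ).asJava
  val request = new PutItemRequest()
    .withTableName("file_lineage_log")                        // hypothetical table name
    .withItem(item)
    .withConditionExpression("attribute_not_exists(run_id)")  // only the first writer succeeds
  try dynamo.putItem(request)
  catch {
    case _: ConditionalCheckFailedException =>
      System.out.println(s"Log entry for $appId already exists; skipping duplicate write")
  }
}

Called from the driver as addLogOnce(spark.sparkContext.applicationId, recordCount), a retry of the whole application then turns the second write into a no-op instead of a duplicate row.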
@Ayan I checked and found that my whole job is rerun twice. Although there is no visible error, is there a configuration that makes the cluster manager re-run it? (See the note on capping YARN re-attempts after the quoted thread below.)

On Tue, Oct 17, 2017 at 6:45 PM, ayan guha <guha.a...@gmail.com> wrote:
> It should not be parallel execution, as the logging code is called in the
> driver. Have you checked whether your driver is rerun by the cluster
> manager due to a failure or error situation?
>
> On Tue, Oct 17, 2017 at 11:52 PM, Marco Mistroni <mmistr...@gmail.com> wrote:
>> Hi,
>> If the problem really is parallel execution, you can try calling
>> repartition(1) before you save. Alternatively, store the data in a CSV
>> file and see whether you get the same behaviour, to rule out DynamoDB
>> issues. Also: are the multiple rows being written duplicates (do they
>> all have the same fields/values)?
>> HTH
>>
>> On Oct 17, 2017 1:08 PM, "Harsh Choudhary" <shry.ha...@gmail.com> wrote:
>>
>>> This is the code -
>>>
>>> hdfs_path = <path to a file in HDFS>
>>> if (hdfs_path.contains(".avro")) {
>>>   data_df = spark.read.format("com.databricks.spark.avro").load(hdfs_path)
>>> } else if (hdfs_path.contains(".tsv")) {
>>>   data_df = spark.read.option("delimiter", "\t")
>>>     .option("header", "true").csv(hdfs_path)
>>> } else if (hdfs_path.contains(".scsv")) {
>>>   data_df = spark.read.option("delimiter", ";")
>>>     .option("header", "true").csv(hdfs_path)
>>> } else {
>>>   System.exit(1)
>>> }
>>> data_df = data_df.withColumn("edl_created_by", lit("IndexerSpark"))
>>>   .withColumn("edl_created_at", lit(currentTime))
>>> data_df.write.mode("append").parquet(dest_file)
>>> val status1 = AddLogToDynamo(
>>>   Json.toJson(fileLineageEntity)(fileLineAgeFormat),
>>>   conf.getString("lambda.filelineage.dynamodb.update.function.name"),
>>>   GetAuth.getLambdaClient)
>>>
>>> def AddLogToDynamo(updatedLambdaJson: JsValue, updateFunctionName: String,
>>>     lambdaClient: AWSLambdaClient): String = {
>>>   System.out.println("new metadata to be updated: " + updatedLambdaJson)
>>>   val updateLambdaReq: InvokeRequest = new InvokeRequest()
>>>   updateLambdaReq.setFunctionName(updateFunctionName)
>>>   updateLambdaReq.setPayload(updatedLambdaJson.toString())
>>>   System.out.println("Calling lambda to add log")
>>>   val updateLambdaResult = byteBufferToString(
>>>     lambdaClient.invoke(updateLambdaReq).getPayload(),
>>>     Charset.forName("UTF-8"))
>>>   return updateLambdaResult
>>> }
>>>
>>> Harsh Choudhary
>>>
>>> On Tue, Oct 17, 2017 at 5:32 PM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> Can you share your code?
>>>>
>>>> On Tue, 17 Oct 2017 at 10:22 pm, Harsh Choudhary <shry.ha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I'm running a Spark job in which I append new data to a Parquet file.
>>>>> At the end, I make a log entry in my DynamoDB table stating the number
>>>>> of records appended, the time, etc. Instead of one single entry in the
>>>>> database, multiple entries are being made. Is it because of parallel
>>>>> execution of the code on the workers? If so, how can I fix it so that
>>>>> the log is written only once?
>>>>>
>>>>> Thanks!
>>>>>
>>>>> Harsh Choudhary
>>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>
> --
> Best Regards,
> Ayan Guha
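On the question of the cluster manager re-running the job: with YARN, the number of automatic application attempts is controlled by spark.yarn.maxAppAttempts (bounded by the cluster-wide yarn.resourcemanager.am.max-attempts), and a second attempt replays the whole driver, including the log write, with no error visible in the new attempt. A minimal sketch of capping it to one attempt, assuming YARN client mode (in cluster mode the same setting must be passed at submit time, e.g. spark-submit --conf spark.yarn.maxAppAttempts=1):

import org.apache.spark.sql.SparkSession

// Cap YARN at a single application attempt so that an ApplicationMaster
// failure does not silently restart the driver and replay the job.
// This only takes effect if set before the SparkContext starts; in
// yarn-cluster mode pass it on the spark-submit command line instead.
val spark = SparkSession.builder()
  .appName("IndexerSpark")
  .config("spark.yarn.maxAppAttempts", "1")
  .getOrCreate()

The trade-off is that a genuinely transient failure will then fail the job outright rather than retrying it, which is usually preferable when the job has non-idempotent side effects like this log write.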