RE: [External] Re: Sorting in Spark on multiple partitions

2018-06-06 Thread Sing, Jasbir
Hi Jorn,

We are using Spark 2.2.0 for our development.
Below is the code snippet for your reference:

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructField

// Assumed: userid, sid, time, orderIdColumnName are column-name Strings;
// DataFrameBO is defined elsewhere in our code base.
val newDf = data.repartition(col(userid)).sortWithinPartitions(sid, time)
newDf.write.format("parquet").saveAsTable("tempData")
newDf.coalesce(1).write.format(outputFormat).option("header", "true").save(hdfsUri + destFilePath)

// NOTE: groupByKey shuffles again, and Spark does not guarantee the order of
// the values within each group, so the (sid, time) order above may be lost here.
val groupedData = newDf.rdd.map { x => (x.get(0), x) }.groupByKey()

// Get the schema fields of the DataFrame
val structFieldArray: Array[StructField] = newDf.schema.fields
// Create a Map storing each column's name, column number and data type
var i = 0
val cache = mutable.Map[String, DataFrameBO]()
for (structField <- structFieldArray) {
  val dataFrameBO = new DataFrameBO(i, structField.name, structField.dataType.typeName)
  cache.put(structField.name, dataFrameBO)
  i = i + 1
}

val dfWithoutDuplicateRows = groupedData.mapValues { x =>
  var ls: List[Row] = List()
  val linkedMap = mutable.Map[String, String]() // time -> orderId of rows kept so far
  val linkedSid = ArrayBuffer.empty[String]      // sids kept so far

  x.foreach { y =>
    val subpathId = y(cache(sid).columnNumber)
    val salesTimeColumn = y(cache(time).columnNumber)
    val orderId = y(cache(orderIdColumnName).columnNumber)

    // Copy every column except the last one, which holds the flag
    val seq = ArrayBuffer[Any]()
    for (i <- 0 to (y.size - 2)) {
      seq += y(i)
    }

    if (!linkedSid.contains(subpathId.toString)) {
      // Compare as Strings, because linkedMap stores toString() values
      if (linkedMap.exists(kv => kv._1 == salesTimeColumn.toString && kv._2 == orderId.toString)) {
        seq += 0 // Append 0 to rows that need to be deleted.
      } else {
        linkedSid += subpathId.toString
        linkedMap.put(salesTimeColumn.toString, orderId.toString)
        seq += 1 // Append 1 to rows that should be kept.
      }
    } else {
      seq += 0 // Append 0 to rows that need to be deleted.
    }

    ls ::= Row.fromSeq(seq)
  }
  ls
}

val flatDataframe = dfWithoutDuplicateRows.values.flatMap { x => x }
val finalDF = data.sqlContext.createDataFrame(flatDataframe, newDf.schema)

finalDF should have picked up the data on a first-come, first-served basis and 
updated the flag accordingly.
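One likely cause is the groupByKey step: the Spark RDD documentation states that the ordering of elements within each group is not guaranteed, so the (sid, time) order from sortWithinPartitions can be lost before the flags are assigned. Below is a minimal sketch in plain Scala collections (not Spark) of re-sorting each group before flagging. The Event case class, its field types (String sid, Long time) and the names flagGroup/flagAll are hypothetical stand-ins for the Row columns above; the keep/delete rule mirrors linkedSid and linkedMap.

```scala
import scala.collection.mutable

object DedupFlagSketch {
  // Hypothetical stand-in for one Row: userid, sid, time, orderId.
  final case class Event(userid: String, sid: String, time: Long, orderId: String)

  // Flags the rows of ONE user's group: 1 = keep, 0 = delete.
  // The group is re-sorted by (sid, time) first, because the values
  // produced by groupByKey come back in no guaranteed order.
  def flagGroup(group: Iterable[Event]): List[(Event, Int)] = {
    val sorted   = group.toList.sortBy(e => (e.sid, e.time))
    val seenSids = mutable.Set.empty[String]       // role of linkedSid
    val linked   = mutable.Map.empty[Long, String] // role of linkedMap: time -> orderId
    sorted.map { e =>
      val flag =
        if (seenSids.contains(e.sid)) 0                    // sid already kept once
        else if (linked.get(e.time).contains(e.orderId)) 0 // same (time, orderId) already kept
        else {
          seenSids += e.sid
          linked.put(e.time, e.orderId)
          1
        }
      (e, flag)
    }
  }

  // Applies flagGroup per userid, like groupByKey followed by mapValues.
  def flagAll(events: Seq[Event]): Map[String, List[(Event, Int)]] =
    events.groupBy(_.userid).map { case (user, group) => user -> flagGroup(group) }
}
```

Because each group is sorted explicitly, the result no longer depends on the order in which the shuffle delivers the rows. In the RDD code above, the equivalent would be sorting x inside mapValues before the foreach, once the concrete types of the sid and time values are known.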
Please let me know if you need any other information regarding the same.

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Monday, June 4, 2018 10:59 PM
To: Jain, Neha T. <neha.t.j...@accenture.com>
Cc: user@spark.apache.org; Patel, Payal <payal.pa...@accenture.com>; 
Sing, Jasbir <jasbir.s...@accenture.com>
Subject: Re: [External] Re: Sorting in Spark on multiple partitions

I think there is also a misunderstanding of how repartition works. It keeps the 
existing number of partitions, but hash-partitions the data according to userid. 
This means each partition is likely to contain several different user ids.

That would also explain the behavior you observed. However, without the full 
source code, these are just assumptions.
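To see this directly, each row can be tagged with spark_partition_id() right after the repartition, and the user ids found in each partition collected. A minimal sketch, assuming the Spark 2.x DataFrame API; the object and method names are hypothetical. For reference, the Dataset.repartition(partitionExprs: Column*) documentation states that the resulting number of partitions is spark.sql.shuffle.partitions (200 by default) and that the result is hash partitioned: all rows of one user land in the same partition, but one partition usually holds many users.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, spark_partition_id}

object PartitionInspectionSketch {
  // Returns partition id -> set of user ids found in that partition after
  // repartitioning by the user column. Collects to the driver, so it is
  // only meant for small samples.
  def usersPerPartition(df: DataFrame, userCol: String): Map[Int, Set[String]] =
    df.repartition(col(userCol))
      .select(spark_partition_id().as("pid"), col(userCol).cast("string").as("user"))
      .collect()
      .groupBy(_.getInt(0))
      .map { case (pid, rows) => pid -> rows.map(_.getString(1)).toSet }
}
```

Any partition with more than one user id in its set confirms that rows of different users are interleaved inside that partition.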

On 4. Jun 2018, at 17:33, Jain, Neha T. <neha.t.j...@accenture.com> wrote:
Hi Jorn,

I tried removing userid from my sort clause, but I still have the same issue: 
the data is not sorted.

var newDf = data.repartition(col(userid)).sortWithinPartitions(sid,time)

I am checking the sort results by temporarily writing this file to Hive as 
well as to HDFS. When I look at the data per user, it is not sorted.
Attaching the output file for your reference.
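Instead of inspecting a file written to Hive or HDFS, the ordering can be checked inside Spark, partition by partition. A minimal sketch of such a check in plain Scala; the (userid, sid, time) tuple layout with a Long time is an assumption, and the name sortedPerUser is hypothetical. The check tolerates rows of different users being interleaved, which is expected once userid is removed from the sortWithinPartitions key.

```scala
import scala.collection.mutable

object SortCheckSketch {
  // True if each user's rows appear in non-decreasing (sid, time) order.
  // Rows of different users may be interleaved.
  def sortedPerUser(rows: Iterator[(String, String, Long)]): Boolean = {
    val ord  = Ordering[(String, Long)]
    val last = mutable.Map.empty[String, (String, Long)] // last (sid, time) seen per user
    rows.forall { case (user, sid, time) =>
      val key = (sid, time)
      val ok  = last.get(user).forall(prev => ord.lteq(prev, key))
      last.put(user, key)
      ok
    }
  }
}
```

With an RDD of such tuples, something like rdd.mapPartitions(it => Iterator(SortCheckSketch.sortedPerUser(it))).collect().forall(identity) would report whether every partition passed.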

On the basis of sorting within userid partitions, I want to add a flag that 
marks the first item in the partition as true and the other items in that 
partition as false.
If the sort order is disturbed, the flag is set incorrectly.

Please suggest what else could be done to fix this very basic scenario of 
sorting in Spark across multiple partitions on multiple nodes.

Thanks & Regards,
Neha Jain
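For the goal described here (flag the first row per userid by (sid, time)), a window function avoids relying on physical partition order altogether: row_number() over a window partitioned by the user column and ordered by the sort columns is computed per user, regardless of how the partitions are laid out. A minimal sketch, assuming the Spark 2.x DataFrame API; the parameter names and the output column is_first are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object FirstRowFlagSketch {
  // Adds a boolean column "is_first" that is true for exactly one row per
  // user: the row with the smallest (sid, time). Ties on (sid, time) are
  // broken arbitrarily by row_number.
  def withFirstFlag(df: DataFrame, userCol: String, sidCol: String, timeCol: String): DataFrame = {
    val byUser = Window.partitionBy(col(userCol)).orderBy(col(sidCol), col(timeCol))
    df.withColumn("is_first", row_number().over(byUser) === 1)
  }
}
```

If a 0/1 integer flag is preferred, as in the RDD code earlier in the thread, casting the column with .cast("int") would give that.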

From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Monday, June 4, 2018 10:48 AM
To: Sing, Jasbir <jasbir.s...@accenture.com>
Cc: user@s

Re: [External] Re: Sorting in Spark on multiple partitions

2018-06-04 Thread Jörn Franke
I think there is also a misunderstanding of how repartition works. It keeps the 
existing number of partitions, but hash-partitions the data according to userid. 
This means each partition is likely to contain several different user ids.
 
That would also explain the behavior you observed. However, without the full 
source code, these are just assumptions.

> From: Jörn Franke [mailto:jornfra...@gmail.com] 
> Sent: Monday, June 4, 2018 10:48 AM
> To: Sing, Jasbir 
> Cc: user@spark.apache.org; Patel, Payal ; Jain, 
> Neha T. 
> Subject: [External] Re: Sorting in Spark on multiple partitions
>  
> You partition by userid, so why do you then sort by userid again within the 
> partition? Can you try removing userid from the sort? 
>  
> How do you check if the sort is correct or not?
>  
> What is the underlying objective of the sort? Do you have more information on 
> the schema and the data?
> 
> On 4. Jun 2018, at 05:47, Sing, Jasbir  wrote:
> 
> Hi Team,
>  
> We are currently using Spark 2.2.0 and are facing some challenges sorting 
> data across multiple partitions.
> We have tried the approaches below:
>  
> Spark SQL approach:
> a.  var query = "select * from data distribute by " + userid + " sort by " + userid + ", " + time
>  
> This query returns correct results in Hive but not in Spark SQL.  
> var newDf = data.repartition(col(userid)).orderBy(userid, time)
> var newDf = data.repartition(col(userid)).sortWithinPartitions(userid, time)
>  
>  
> But none of the above approaches gives correctly sorted data.
> Please suggest what could be done about this.
>  
> Thanks & Regards,
> Neha Jain
>  
> 
> This message is for the designated recipient only and may contain privileged, 
> proprietary, or otherwise confidential information. If you have received it 
> in error, please notify the sender immediately and delete the original. Any 
> other use of the e-mail by you is prohibited. Where allowed by local law, 
> electronic communications with Accenture and its affiliates, including e-mail 
> and instant messaging (including content), may be scanned by our systems for 
> the purposes of information security and assessment of internal compliance 
> with Accenture policy. Your privacy is important to us. Accenture uses your 
> personal data only in compliance with data protection laws. For further 
> information on how Accenture processes your personal data, please see our 
> privacy statement at https://www.accenture.com/us-en/privacy-policy. 
> __
> 
> www.accenture.com
> 
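The same flag can also be expressed in Spark SQL, closer in spirit to the DISTRIBUTE BY / SORT BY query in the original question. SORT BY only orders rows within each partition, whereas ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) numbers each user's rows independently of how the partitions are laid out. A minimal sketch; it assumes the data is registered as a table or temp view, and the helper name and the is_first column are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object FirstRowFlagSqlSketch {
  // Runs the window query against a registered table or temp view.
  // Table and column names are interpolated directly into the SQL text,
  // so they must be trusted identifiers, never user input.
  def withFirstFlagSql(spark: SparkSession, table: String,
                       userCol: String, sidCol: String, timeCol: String): DataFrame =
    spark.sql(
      s"""SELECT *, rn = 1 AS is_first
         |FROM (
         |  SELECT *, ROW_NUMBER() OVER (PARTITION BY $userCol ORDER BY $sidCol, $timeCol) AS rn
         |  FROM $table
         |) t""".stripMargin
    ).drop("rn")
}
```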


Re: [External] Re: Sorting in Spark on multiple partitions

2018-06-04 Thread Jörn Franke
How do you load the data? How do you write it?
I fear that without the full source code it will be difficult to troubleshoot 
the issue.

Which Spark version?

The use case is not yet 100% clear to me. Do you want to set the flag to true 
for the row with the oldest/newest date? I would just use top or something 
similar when processing the data.
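If only the first row per user is needed, the "top or something similar" idea can be done without any sort: keep, per key, the smallest (sid, time) with reduceByKey. RDD.top itself returns the largest elements of the whole RDD rather than per key, so a per-key reduction is the closer fit here. A minimal sketch; the (userid, (sid, time)) pair layout with a Long time is an assumption, and the names are hypothetical.

```scala
import org.apache.spark.rdd.RDD

object FirstPerUserSketch {
  // For each userid keeps only the smallest (sid, time) pair.
  // reduceByKey combines values map-side first, so neither a full sort
  // nor a groupByKey is needed.
  def firstPerUser(rdd: RDD[(String, (String, Long))]): RDD[(String, (String, Long))] = {
    val ord = Ordering[(String, Long)]
    rdd.reduceByKey((a, b) => ord.min(a, b))
  }
}
```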

