Dynamic value as the offset of lag() function

2023-05-23 Thread Nipuna Shantha
Hi all,

This is the sample set of data that I used for this task

[image: image.png]

My need is to pass a per-row count value as the offset of the lag() function,
i.e. something like *[ lag(col(...), col("count")).over(windowSpec) ]*. But as
the lag function expects lag(Column, Int), the above code does not work.

So can you suggest a method to achieve this scenario? I am using Spark
3.3.0 with Scala.
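For context, a minimal sketch of the constraint (the DataFrame df, the column
names, and the window spec below are made up for illustration): in Spark 3.3.0
the offset argument of lag() is a plain Int, so a per-row column cannot be
passed as the offset.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

// Hypothetical window spec and columns, for illustration only.
val windowSpec = Window.partitionBy("group").orderBy("seq")

// Compiles: the offset is a fixed Int literal.
val withStaticLag = df.withColumn("prev", lag(col("value"), 1).over(windowSpec))

// Does not compile: there is no lag(Column, Column) overload,
// so a per-row "count" column cannot be used as the offset directly.
// val withDynamicLag = df.withColumn("prev", lag(col("value"), col("count")).over(windowSpec))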





Re: Incremental Value dependent on another column of a Spark DataFrame

2023-05-23 Thread Raghavendra Ganesh
Given that you are already stating the above can be imagined as a single
partition, I can think of a mapPartitions iterator:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType

val inputSchema = inputDf.schema
// Run the stateful iterator once per partition; row order within each partition is preserved.
val outputRdd = inputDf.rdd.mapPartitions(rows => new SomeClass(rows))
// Append the computed "counter" column to the input schema.
val outputDf = sparkSession.createDataFrame(outputRdd,
  inputSchema.add("counter", IntegerType))

// Stateful iterator: resets the counter on "M01" rows and increments it on every other row.
class SomeClass(rows: Iterator[Row]) extends Iterator[Row] {
  var counter: Int = 0
  override def hasNext: Boolean = rows.hasNext

  override def next(): Row = {
    val row = rows.next()
    val rowType: String = row.getAs[String]("Type")
    if (rowType == "M01")
      counter = 0
    else
      counter += 1
    // Emit the original row with the running counter appended.
    Row.fromSeq(row.toSeq ++ Seq(counter))
  }
}
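
To make the intended behaviour concrete (the sample data and expected output in
the original message were image attachments), here is a toy illustration with
made-up values, reusing the same sparkSession:

// Hypothetical sample data, for illustration only.
val inputDf = sparkSession.createDataFrame(Seq(
    ("M01", "a"), ("M02", "b"), ("M02", "c"), ("M02", "d"), ("M01", "e"), ("M02", "f")
  )).toDF("Type", "Value")
  .coalesce(1) // keep the whole sequence in one partition, as the original message assumes

// With the iterator above, the appended counter column would be: 0, 1, 2, 3, 0, 1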

--
Raghavendra


On Tue, May 23, 2023 at 11:44 PM Nipuna Shantha 
wrote:

> Hi all,
>
> This is the sample set of data that I used for this task
>
> [image: image.png]
>
> My expected output is as below
>
> [image: image.png]
>
> My scenario is: if Type is M01 the count should be 0, and if Type is M02 it
> should be incremented (from 1 or 0) until the sequence of M02 rows is finished.
> Imagine this as a partition, so the row order cannot change. Can you suggest a
> method for this scenario? Also, note that this dataset is really large; it has
> around 1 records, and I am using Spark with Scala.
>
> Thank You,
> Best Regards
>
>


Incremental Value dependent on another column of a Spark DataFrame

2023-05-23 Thread Nipuna Shantha
Hi all,

This is the sample set of data that I used for this task

[image: image.png]

My expected output is as below

[image: image.png]

My scenario is: if Type is M01 the count should be 0, and if Type is M02 it
should be incremented (from 1 or 0) until the sequence of M02 rows is finished.
Imagine this as a partition, so the row order cannot change. Can you suggest a
method for this scenario? Also, note that this dataset is really large; it has
around 1 records, and I am using Spark with Scala.

Thank You,
Best Regards




Re: Shuffle with Window().partitionBy()

2023-05-23 Thread ashok34...@yahoo.com.INVALID
Thanks, great Rauf.
Regards
On Tuesday, 23 May 2023 at 13:18:55 BST, Rauf Khan wrote:
Hi,

partitionBy() is analogous to GROUP BY: all rows that have the same value in
the specified column will form one window. The data will be shuffled to form
the groups.

Regards
Raouf
On Fri, May 12, 2023, 18:48 ashok34...@yahoo.com.INVALID 
 wrote:

Hello,

In Spark windowing, does a call with Window().partitionBy() cause a shuffle to
take place?

If so, what is the performance impact, if any, if the result data set is large?

Thanks

Re: Shuffle with Window().partitionBy()

2023-05-23 Thread Rauf Khan
Hi,

partitionBy() is analogous to GROUP BY: all rows that have the same value in
the specified column will form one window. The data will be shuffled to form
the groups.
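
As an illustration, a minimal sketch (the DataFrame df and the column names are
made up): explain() on a query like this typically shows an Exchange
hashpartitioning step on the partitionBy column, which is the shuffle being
discussed.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical data: one row per employee with a "dept" and a "salary" column.
val byDept = Window.partitionBy("dept").orderBy(col("salary").desc)

// Ranking within each dept requires all rows of a dept in the same partition,
// so the data is shuffled (Exchange hashpartitioning on "dept") before the window runs.
val ranked = df.withColumn("rank", row_number().over(byDept))
ranked.explain()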

Regards
Raouf

On Fri, May 12, 2023, 18:48 ashok34...@yahoo.com.INVALID
 wrote:

> Hello,
>
> In Spark windowing, does a call with Window().partitionBy() cause a
> shuffle to take place?
>
> If so, what is the performance impact, if any, if the result data set is
> large?
>
> Thanks
>


cannot load model using pyspark

2023-05-23 Thread second_co...@yahoo.com.INVALID

spark.sparkContext.textFile(
    "s3a://a_bucket/models/random_forest_zepp/bestModel/metadata", 1
).getNumPartitions()
When I run the above code, I get the error below. Can you advise how to
troubleshoot? I'm using Spark 3.3.0, and the above file path exists.

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[16], line 1
----> 1 spark.sparkContext.textFile("s3a://a_bucket/models/random_forest_zepp/bestModel/metadata", 1).getNumPartitions()

File /spark/python/lib/pyspark.zip/pyspark/rdd.py:599, in RDD.getNumPartitions(self)
    589 def getNumPartitions(self) -> int:
    590     """
    591     Returns the number of partitions in RDD
    592     (...)
    597     2
    598     """
--> 599     return self._jrdd.partitions().size()

File /spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /spark/python/lib/pyspark.zip/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File /spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o114.partitions.
: java.lang.NullPointerException
    at org.apache.hadoop.mapred.TextInputFormat.isSplitable(TextInputFormat.java:49)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:370)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:292)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:288)
    at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
    at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
    at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)