Re: [PySpark Structured Streaming] How to tune .repartition(N) ?

2023-10-04 Thread Raghavendra Ganesh
Hi,
What is the purpose for which you want to use repartition()? Is it to reduce
the number of files in Delta?
Also note that there is an alternative option of using coalesce() instead
of repartition().
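
A rough sketch of the difference, in Scala (coalesce and repartition behave the
same way on a PySpark DataFrame); table below stands for the streaming
DataFrame in your snippet:

// coalesce(n) only merges existing partitions into at most n, without a
// shuffle, so it is the cheaper way to cap the number of files per micro-batch.
val fewerFiles = table.coalesce(10)

// repartition(n) does a full shuffle into exactly n roughly equal partitions,
// which costs more but rebalances skewed data before the write.
val rebalanced = table.repartition(10)
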
--
Raghavendra


On Thu, Oct 5, 2023 at 10:15 AM Shao Yang Hong
 wrote:

> Hi all on user@spark:
>
> We are looking for advice and suggestions on how to tune the
> .repartition() parameter.
>
> We are using Spark Streaming on our data pipeline to consume messages
> and persist them to a Delta Lake
> (https://delta.io/learn/getting-started/).
>
> We read messages from a Kafka topic, then add a generated date column
> as a daily partitioning, and save these records to Delta Lake. We have
> 60 Kafka partitions on the Kafka topic, 15 Spark executor instances
> (so 4 Kafka partitions per executor).
>
> How then, should we use .repartition()? Should we omit this parameter?
> Or set it to 15? or 4?
>
> Our code looks roughly like the below:
>
> ```
> df = (
> spark.readStream.format("kafka")
> .option("kafka.bootstrap.servers", os.environ["KAFKA_BROKERS"])
> .option("subscribe", os.environ["KAFKA_TOPIC"])
> .load()
> )
>
> table = (
> df.select(
> from_protobuf(
> "value", "table", "/opt/protobuf-desc/table.desc"
> ).alias("msg")
> )
> .withColumn("uuid", col("msg.uuid"))
> # etc other columns...
>
> # generated column for daily partitioning in Delta Lake
> .withColumn(CREATED_DATE,
> date_format(from_unixtime("msg.logged_at"), "yyyy-MM-dd"))
> .drop("msg")
> )
>
> query = (
> table
> .repartition(10).writeStream
> .queryName(APP_NAME)
> .outputMode("append")
> .format("delta")
> .partitionBy(CREATED_DATE)
> .option("checkpointLocation", os.environ["CHECKPOINT"])
> .start(os.environ["DELTA_PATH"])
> )
>
> query.awaitTermination()
> spark.stop()
> ```
>
> Any advice would be appreciated.
>
> --
> Best Regards,
> Shao Yang HONG
>
>
>


Re: Incremental Value dependents on another column of Data frame Spark

2023-05-23 Thread Raghavendra Ganesh
Given that you are already stating the above can be imagined as a partition, I
can think of a mapPartitions iterator.

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.IntegerType

  val inputSchema = inputDf.schema
  val outputRdd = inputDf.rdd.mapPartitions(rows => new SomeClass(rows))
  val outputDf = sparkSession.createDataFrame(outputRdd,
    inputSchema.add("counter", IntegerType))

class SomeClass(rows: Iterator[Row]) extends Iterator[Row] {
  var counter: Int = 0
  override def hasNext: Boolean = rows.hasNext

  override def next(): Row = {
val row = rows.next()
val rowType:String = row.getAs[String]("Type")
if(rowType == "M01")
  counter = 0
else
  counter += 1
Row.fromSeq(row.toSeq ++ Seq(counter))
  }
}

--
Raghavendra


On Tue, May 23, 2023 at 11:44 PM Nipuna Shantha 
wrote:

> Hi all,
>
> This is the sample set of data that I used for this task
>
> [image: image.png]
>
> My expected output is as below
>
> [image: image.png]
>
> My scenario is: if Type is M01 the count should be 0, and if Type is M02 it
> should be incremented from 1 or 0 until the sequence of M02 is finished.
> Imagine this as a partition, so row numbers cannot be jumbled. So can you guys
> suggest a method for this scenario? Also, for your concern, this dataset is
> really large; it has around 1 records, and I am using Spark with
> Scala.
>
> Thank You,
> Best Regards
>
>
>


Re: Spark Aggregator with ARRAY input and ARRAY output

2023-04-23 Thread Raghavendra Ganesh
For simple array types, setting the encoder to ExpressionEncoder() should work.
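
A rough, self-contained Scala sketch; the object below is only illustrative,
with Seq[Boolean]/Seq[Long] standing in for the Java Lists:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator

object ElementWiseAgg extends Aggregator[Seq[Boolean], Seq[Long], Seq[Long]] {
  def zero: Seq[Long] = Seq.empty

  // Element-wise count of true flags, growing the buffer as needed.
  def reduce(b: Seq[Long], a: Seq[Boolean]): Seq[Long] = {
    val padded = b.padTo(a.length, 0L)
    padded.zipWithIndex.map { case (v, i) => if (i < a.length && a(i)) v + 1L else v }
  }

  def merge(b1: Seq[Long], b2: Seq[Long]): Seq[Long] = {
    val (longer, shorter) = if (b1.length >= b2.length) (b1, b2) else (b2, b1)
    longer.zipWithIndex.map { case (v, i) => if (i < shorter.length) v + shorter(i) else v }
  }

  def finish(reduction: Seq[Long]): Seq[Long] = reduction

  // The relevant part: for simple collection types an ExpressionEncoder works.
  def bufferEncoder: Encoder[Seq[Long]] = ExpressionEncoder[Seq[Long]]()
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder[Seq[Long]]()
}

The aggregator can then be wrapped with functions.udaf and used like any
built-in aggregate.
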
--
Raghavendra


On Sun, Apr 23, 2023 at 9:20 PM Thomas Wang  wrote:

> Hi Spark Community,
>
> I'm trying to implement a custom Spark Aggregator (a subclass of
> org.apache.spark.sql.expressions.Aggregator). Correct me if I'm wrong,
> but I'm assuming I will be able to use it as an aggregation function like
> SUM.
>
> What I'm trying to do is that I have a column of ARRAY<BOOLEAN> and I
> would like to GROUP BY another column and perform an element-wise SUM if the
> boolean flag is set to True. The result of such aggregation should return
> ARRAY<BIGINT>.
>
> Here is my implementation so far:
>
> package mypackage.udf;
>
> import org.apache.spark.sql.Encoder;
> import org.apache.spark.sql.expressions.Aggregator;
>
> import java.util.ArrayList;
> import java.util.List;
>
> public class ElementWiseAgg extends Aggregator<List<Boolean>, List<Long>, List<Long>> {
>
>     @Override
>     public List<Long> zero() {
>         return new ArrayList<>();
>     }
>
>     @Override
>     public List<Long> reduce(List<Long> b, List<Boolean> a) {
>         if (a == null) return b;
>         int diff = a.size() - b.size();
>         for (int i = 0; i < diff; i++) {
>             b.add(0L);
>         }
>         for (int i = 0; i < a.size(); i++) {
>             if (a.get(i)) b.set(i, b.get(i) + 1);
>         }
>         return b;
>     }
>
>     @Override
>     public List<Long> merge(List<Long> b1, List<Long> b2) {
>         List<Long> longer;
>         List<Long> shorter;
>         if (b1.size() > b2.size()) {
>             longer = b1;
>             shorter = b2;
>         } else {
>             longer = b2;
>             shorter = b1;
>         }
>         for (int i = 0; i < shorter.size(); i++) {
>             longer.set(i, longer.get(i) + shorter.get(i));
>         }
>         return longer;
>     }
>
>     @Override
>     public List<Long> finish(List<Long> reduction) {
>         return reduction;
>     }
>
>     @Override
>     public Encoder<List<Long>> bufferEncoder() {
>         return null;
>     }
>
>     @Override
>     public Encoder<List<Long>> outputEncoder() {
>         return null;
>     }
> }
>
> The part I'm not quite sure is how to override bufferEncoder and
> outputEncoder. The default Encoders list does not provide encoding for
> List.
>
> Can someone point me to the right direction? Thanks!
>
>
> Thomas
>
>
>


Re: [PySpark] Getting the best row from each group

2022-12-20 Thread Raghavendra Ganesh
You can groupBy(country) and use the mapPartitions method, in which you
iterate over all rows keeping two variables for maxPopulationSoFar and the
corresponding city, then return the city with the max population.
I think, as others suggested, it may be possible to use bucketing; it would
give a more friendly SQL-ish way of doing it, but it would not be the best in
performance as it needs to order/sort.
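
One concrete way to do the "keep only the max seen so far per country" pass
without sorting is reduceByKey on the underlying RDD (rather than
mapPartitions). A rough Scala sketch, assuming a DataFrame named cities with
city, country and population (stored as a long) columns; the same keyBy and
reduceByKey calls exist on PySpark RDDs:

// For each country keep only the row with the larger population; no sort needed.
val largestPerCountry = cities.rdd
  .keyBy(_.getAs[String]("country"))
  .reduceByKey { (a, b) =>
    if (a.getAs[Long]("population") >= b.getAs[Long]("population")) a else b
  }
  .values

val largestDf = spark.createDataFrame(largestPerCountry, cities.schema)
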
--
Raghavendra


On Mon, Dec 19, 2022 at 8:57 PM Oliver Ruebenacker <
oliv...@broadinstitute.org> wrote:

>
>  Hello,
>
>   How can I retain from each group only the row for which one value is the
> maximum of the group? For example, imagine a DataFrame containing all major
> cities in the world, with three columns: (1) City name (2) Country (3)
> population. How would I get a DataFrame that only contains the largest city
> in each country? Thanks!
>
>  Best, Oliver
>
> --
> Oliver Ruebenacker, Ph.D. (he)
> Senior Software Engineer, Knowledge Portal Network , 
> Flannick
> Lab , Broad Institute
> 
>


Re: how to add a column for percent

2022-05-23 Thread Raghavendra Ganesh
withColumn takes a column as the second argument, not a string.
If you want formatting before show(), you can use the round() function.
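
For example (a rough sketch reusing df2 and the _c1 grouping column from your
snippet; the total row count is taken once up front):

import org.apache.spark.sql.functions.{col, lit, round}

val total = df2.count()

df2.groupBy("_c1").count()
  .withColumn("percent", round(col("count") / lit(total) * 100, 2))
  .show()
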
--
Raghavendra


On Mon, May 23, 2022 at 11:35 AM wilson  wrote:

> hello
>
> how to add a column for percent for the current row of counted data?
>
> scala>
> df2.groupBy("_c1").count.withColumn("percent",f"${col(count)/df2.count}%.2f").show
>
> <console>:30: error: type mismatch;
>
>
> This doesn't work.
>
> so please help. thanks.
>
>
>
>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Raghavendra Ganesh
What is optimal depends on the context of the problem.
Is the intent here to find the best solution for top-n values with a group by?

Both solutions look sub-optimal to me. A window function would be expensive as
it needs an order by (which a top-n solution shouldn't need). It would be best
to just group by department and use an aggregate function
which stores the top n values in a heap.
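
A rough sketch of that shape, assuming Spark 3.x and an Employee DataFrame
named employee with departmentId and salary (numeric) columns; a small sorted
buffer stands in for the heap, and only the top salary values (not the full
employee rows) come back:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}

// Keeps only the n largest salaries seen so far, so no group-wide sort is needed.
case class TopN(n: Int) extends Aggregator[Double, Seq[Double], Seq[Double]] {
  def zero: Seq[Double] = Seq.empty
  def reduce(buf: Seq[Double], salary: Double): Seq[Double] =
    (buf :+ salary).sorted(Ordering[Double].reverse).take(n)
  def merge(b1: Seq[Double], b2: Seq[Double]): Seq[Double] =
    (b1 ++ b2).sorted(Ordering[Double].reverse).take(n)
  def finish(buf: Seq[Double]): Seq[Double] = buf
  def bufferEncoder: Encoder[Seq[Double]] = ExpressionEncoder[Seq[Double]]()
  def outputEncoder: Encoder[Seq[Double]] = ExpressionEncoder[Seq[Double]]()
}

val top3Salaries = udaf(TopN(3))
employee
  .groupBy(col("departmentId"))
  .agg(top3Salaries(col("salary")).as("top_salaries"))
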
--
Raghavendra


On Mon, Feb 28, 2022 at 12:01 AM Sid  wrote:

> My bad.
>
> Aggregation Query:
>
> # Write your MySQL query statement below
>
> SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
> FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
> WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
>        WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
> ORDER BY E.DepartmentId, E.Salary DESC
>
> Time Taken: 1212 ms
>
> Windowing Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
> rnk from Department d join Employee e on e.departmentId=d.id ) a where
> rnk<=3
>
> Time Taken: 790 ms
>
> Thanks,
> Sid
>
>
> On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:
>
>> Those two queries are identical?
>>
>> On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:
>>
>>> Hi Team,
>>>
> >>> I am aware that if windowing functions are used, then at first it loads
> >>> the entire dataset into one window, scans it, and then performs the other
> >>> mentioned operations for that particular window, which could be slower when
> >>> dealing with trillions / billions of records.
>>>
> >>> I did a POC where I used an example to find the top 3 highest salaries for
> >>> employees per department. So, I wrote the below queries and compared the
> >>> time taken for each:
>>>
>>> Windowing Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 790 ms
>>>
>>> Aggregation Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 1212 ms
>>>
> >>> But as per my understanding, the aggregation should have run faster. So,
> >>> my whole point is that if the dataset is huge I should force some kind of
> >>> map reduce job, like the option we have called df.groupby().reduceByGroups().
>>>
> >>> So I think the aggregation query is taking more time since the dataset
> >>> size here is smaller, and as we all know, map reduce works faster when
> >>> there is a huge volume of data. I haven't tested it yet on big data but
> >>> needed some expert guidance over here.
>>>
>>> Please correct me if I am wrong.
>>>
>>> TIA,
>>> Sid
>>>
>>>
>>>
>>>


Re: how to classify column

2022-02-11 Thread Raghavendra Ganesh
You could use the expr() function to achieve the same:

.withColumn("newColumn", expr("case when score > 3 then 'good' else 'bad' end"))
--
Raghavendra


On Fri, Feb 11, 2022 at 5:59 PM frakass  wrote:

> Hello
>
> I have a column whose value (Int type as score) is from 0 to 5.
> I want to query it so that when the score > 3 it is classified as "good",
> else classified as "bad".
> How do I implement that? A UDF, something like this?
>
> scala> implicit class Foo(i:Int) {
>   |   def classAs(f:Int=>String) = f(i)
>   | }
> class Foo
>
> scala> 4.classAs { x => if (x > 3) "good" else "bad" }
> val res13: String = good
>
> scala> 2.classAs { x => if (x > 3) "good" else "bad" }
> val res14: String = bad
>
>
> Thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Merge two dataframes

2021-05-12 Thread Raghavendra Ganesh
You can add an extra id column and perform an inner join.

val df1_with_id = df1.withColumn("id", monotonically_increasing_id())

val df2_with_id = df2.withColumn("id", monotonically_increasing_id())

df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()

+---------+---------+
|amount_6m|amount_9m|
+---------+---------+
|      100|      500|
|      200|      600|
|      300|      700|
|      400|      800|
|      500|      900|
+---------+---------+


--
Raghavendra


On Wed, May 12, 2021 at 6:20 PM kushagra deep 
wrote:

> Hi All,
>
> I have two dataframes
>
> df1
>
> amount_6m
>  100
>  200
>  300
>  400
>  500
>
> And a second dataframe, df2, below
>
>  amount_9m
>   500
>   600
>   700
>   800
>   900
>
> The number of rows is same in both dataframes.
>
> Can I merge the two dataframes to achieve below df
>
> df3
>
> amount_6m | amount_9m
>       100 |       500
>       200 |       600
>       300 |       700
>       400 |       800
>       500 |       900
>
> Thanks in advance
>
> Reg,
> Kushagra Deep
>
>


Re: How to Spawn Child Thread or Sub-jobs in a Spark Session

2020-12-04 Thread Raghavendra Ganesh
There should not be any need to explicitly make the DF-2 and DF-3 computations
parallel. Spark generates execution plans and can decide what can run in
parallel (ideally you should see them running in parallel in the Spark UI).

You need to cache DF-1 if possible (either in memory or on disk); otherwise the
computation of DF-2 and DF-3 might trigger the DF-1 computation twice.
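
A rough sketch of the caching point; df1 stands for the DataFrame produced by
Proc-1, proc2/proc3 stand for whatever Proc-2 and Proc-3 do, and the output
paths are hypothetical:

import org.apache.spark.storage.StorageLevel

// Persist DF-1 so the two downstream actions reuse it instead of
// recomputing the whole Proc-1 lineage twice.
df1.persist(StorageLevel.MEMORY_AND_DISK)

val df2 = proc2(df1)   // stand-in for the Proc-2 transformations
val df3 = proc3(df1)   // stand-in for the Proc-3 transformations

df2.write.parquet("/output/df2")   // both actions read the cached DF-1
df3.write.parquet("/output/df3")

df1.unpersist()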

--
Raghavendra


On Sat, Dec 5, 2020 at 12:31 AM Artemis User  wrote:

> We have a Spark job that produces a result data frame, say DF-1, at the
> end of the pipeline (i.e. Proc-1).  From DF-1, we need to create two or
> more dataframes, say DF-2 and DF-3, via additional SQL or ML processes,
> i.e. Proc-2 and Proc-3.  Ideally, we would like to perform Proc-2 and
> Proc-3 in parallel, since Proc-2 and Proc-3 can be executed
> independently, with DF-1 made immutable and DF-2 and DF-3 being
> mutually exclusive.
>
> Does Spark has some built-in APIs to support spawning sub-jobs in a
> single session?  If multi-threading is needed, what are the common best
> practices in this case?
>
> Thanks in advance for your help!
>
> -- ND
>
>
>
>


Re: Count distinct and driver memory

2020-10-19 Thread Raghavendra Ganesh
Spark provides multiple options for caching (including to disk). Have you
tried caching to disk?
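
For example (a rough sketch, assuming a DataFrame df with the three columns of
interest and a hypothetical output path):

import org.apache.spark.sql.functions.{col, countDistinct}
import org.apache.spark.storage.StorageLevel

// Cached blocks go to executor-local disk instead of executor memory.
df.persist(StorageLevel.DISK_ONLY)

df.write.parquet("/output/wide_table")   // first action materializes the cache

// The second action re-reads the cached blocks rather than the original source.
val counts = df.agg(countDistinct(col("a"), col("b"), col("c"))).collect()
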
--
Raghavendra


On Mon, Oct 19, 2020 at 11:41 PM Lalwani, Jayesh
 wrote:

> I was caching it because I didn't want to re-execute the DAG when I ran
> the count query. If you have a spark application with multiple actions,
> Spark reexecutes the entire DAG for each action unless there is a cache in
> between. I was trying to avoid reloading 1/2 a terabyte of data.  Also,
> cache should use up executor memory, not driver memory.
>
> As it turns out cache was the problem. I didn't expect cache to take
> Executor memory and spill over to disk. I don't know why it's taking driver
> memory. The input data has millions of partitions which results in millions
> of tasks. Perhaps the high memory usage is a side effect of caching the
> results of lots of tasks.
>
> On 10/19/20, 1:27 PM, "Nicolas Paris"  wrote:
>
>
> > Before I write the data frame to parquet, I do df.cache. After writing
> > the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
>
> If you write the df to parquet, why would you also cache it? Caching by
> default loads into memory. This might affect later use, such as collect.
> The resulting GC can be explained by both the caching and the collect.
>
>
> Lalwani, Jayesh  writes:
>
> > I have a Dataframe with around 6 billion rows, and about 20 columns.
> First of all, I want to write this dataframe out to parquet. The, Out of
> the 20 columns, I have 3 columns of interest, and I want to find how many
> distinct values of the columns are there in the file. I don’t need the
> actual distinct values. I just need the count. I knoe that there are around
> 10-16million distinct values
> >
> > Before I write the data frame to parquet, I do df.cache. After writing
> > the file out, I do df.countDistinct(“a”, “b”, “c”).collect()
> >
> > When I run this, I see that the memory usage on my driver steadily
> > increases until it starts getting future timeouts. I guess it’s spending
> > time in GC. Does countDistinct cause this behavior? Does Spark try to get
> > all 10 million distinct values into the driver? Is countDistinct not
> > recommended for data frames with a large number of distinct values?
> >
> > What’s the solution? Should I use approx_count_distinct?
>
>
> --
> nicolas paris
>
>
>
>