Hi, Mich:

Thanks for your reply, but maybe I didn't make my question clear.

I am looking for a solution to compute the count of each element in an array, 
without "exploding" the array, and output a Map structure as a column.
For example, for an array as ('a', 'b', 'a'), I want to output a column as 
Map('a' -> 2, 'b' -> 1).
I think that "aggregate" function should be able to, using the example shown in 
the link of my original email, as

SELECT aggregate(array('a', 'b', 'a'),
                   map(),
                   (acc, x) -> ???,
                   acc -> acc) AS feq_cnt

Here are my questions:

  *   Is using "map()" above the best way? The "start" structure in this case 
should be Map.empty[String, Int], but of course, it won't work in pure Spark 
SQL, so the best solution I can think of is "map()", and it is a mutable Map.
  *   How to implement the logic in "???" place? If I do it in the Scala, I 
will do "acc.update(x, acc.getOrElse(x, 0) + 1)", which means if element 
exists, plus one for the value; otherwise, start the element with count of 0. 
Of course, the above code wont' work in Spark SQL.
  *   As I said, I am NOT running in either Scale or PySpark session, but in a 
pure Spark SQL.
  *   Is it possible to do the above logic in Spark SQL, without using 
"exploding"?

Thanks

________________________________
From: Mich Talebzadeh <mich.talebza...@gmail.com>
Sent: Saturday, May 6, 2023 4:52 PM
To: Yong Zhang <java8...@hotmail.com>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: Can Spark SQL (not DataFrame or Dataset) aggregate array into map 
of element of count?

you can create DF from your SQL RS and work with that in Python the way you want

## you don't need all these
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col, current_timestamp, lit
from pyspark.sql.types import *
sqltext = """
SELECT aggregate(array(1, 2, 3, 4),
                   named_struct('sum', 0, 'cnt', 0),
                   (acc, x) -> named_struct('sum', acc.sum + x, 'cnt', acc.cnt 
+ 1),
                   acc -> acc.sum / acc.cnt) AS avg
"""
df = spark.sql(sqltext)
df.printSchema()

root
 |-- avg: double (nullable = true)


Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


 
[https://ci3.googleusercontent.com/mail-sig/AIorK4zholKucR2Q9yMrKbHNn-o1TuS4mYXyi2KO6Xmx6ikHPySa9MLaLZ8t2hrA6AUcxSxDgHIwmKE]
   view my Linkedin 
profile<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




On Fri, 5 May 2023 at 20:33, Yong Zhang 
<java8...@hotmail.com<mailto:java8...@hotmail.com>> wrote:
Hi, This is on Spark 3.1 environment.

For some reason, I can ONLY do this in Spark SQL, instead of either Scala or 
PySpark environment.

I want to aggregate an array into a Map of element count, within that array, 
but in Spark SQL.
I know that there is an aggregate function available like

aggregate(expr, start, merge [, finish])

But I want to know if this can be done in the Spark SQL only, and:

  *   How to represent an empty Map as "start" element above
  *   How to merge each element (as String type) into Map (as adding count if 
exist in the Map, or add as (element -> 1) as new entry in the Map if not exist)

Like the following example -> 
https://docs.databricks.com/sql/language-manual/functions/aggregate.html

SELECT aggregate(array(1, 2, 3, 4),
                   named_struct('sum', 0, 'cnt', 0),
                   (acc, x) -> named_struct('sum', acc.sum + x, 'cnt', acc.cnt 
+ 1),
                   acc -> acc.sum / acc.cnt) AS avg

I wonder:
select
      aggregate(
      array('a','b','a')),
      map('', 0),
      (acc, x) -> ???
      acc -> acc) as output

How to do the logic after "(acc, x) -> ", so I can output a map of count of 
each element in the array?
I know I can "explode", then groupby + count, but since I have multi array 
columns need to transform, so I want to do more a high order function way, and 
in pure Spark SQL.

Thanks

Reply via email to