Hi Sam,

I am curious: is there a business use case for this solution?


HTH


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 12 Feb 2023 at 17:37, sam smith <qustacksm2123...@gmail.com> wrote:

> @Sean Correct. But I was hoping to improve my solution even more.
>
> On Sun, 12 Feb 2023 at 18:03, Sean Owen <sro...@gmail.com> wrote:
>
>> That's the answer, except, you can never select a result set into a
>> column right? you just collect() each of those results. Or, what do you
>> want? I'm not clear.
>>
>> On Sun, Feb 12, 2023 at 10:59 AM sam smith <qustacksm2123...@gmail.com>
>> wrote:
>>
>>> @Enrico Minack <enrico-min...@gmx.de> Thanks for "unpivot" but I am
>>> using version 3.3.0 (you are taking it way too far as usual :) )
>>> @Sean Owen <sro...@gmail.com> Pls then show me how it can be improved
>>> by code.
>>>
>>> Also, why doesn't an approach like this (using withColumn()) work:
>>>
>>> for (String columnName : df.columns()) {
>>>     df= df.withColumn(columnName,
>>> df.select(columnName).distinct().col(columnName));
>>> }
>>>
>>> On Sat, 11 Feb 2023 at 13:11, Enrico Minack <i...@enrico.minack.dev>
>>> wrote:
>>>
>>>> You could do the entire thing in DataFrame world and write the result
>>>> to disk. All you need is unpivot (to be released in Spark 3.4.0, soon).
>>>>
>>>> Note this is Scala but should be straightforward to translate into Java:
>>>>
>>>> import org.apache.spark.sql.functions.collect_set
>>>>
>>>> val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10,
>>>> 123)).toDF("a", "b", "c")
>>>>
>>>> df.unpivot(Array.empty, "column", "value")
>>>>   .groupBy("column")
>>>>   .agg(collect_set("value").as("distinct_values"))
>>>>
>>>> The unpivot operation turns
>>>> +---+---+---+
>>>> |  a|  b|  c|
>>>> +---+---+---+
>>>> |  1| 10|123|
>>>> |  2| 20|124|
>>>> |  3| 20|123|
>>>> |  4| 10|123|
>>>> +---+---+---+
>>>>
>>>> into
>>>>
>>>> +------+-----+
>>>> |column|value|
>>>> +------+-----+
>>>> |     a|    1|
>>>> |     b|   10|
>>>> |     c|  123|
>>>> |     a|    2|
>>>> |     b|   20|
>>>> |     c|  124|
>>>> |     a|    3|
>>>> |     b|   20|
>>>> |     c|  123|
>>>> |     a|    4|
>>>> |     b|   10|
>>>> |     c|  123|
>>>> +------+-----+
>>>>
>>>> The groupBy("column").agg(collect_set("value").as("distinct_values"))
>>>> collects distinct values per column:
>>>> +------+---------------+
>>>> |column|distinct_values|
>>>> +------+---------------+
>>>> |     c|     [123, 124]|
>>>> |     b|       [20, 10]|
>>>> |     a|   [1, 2, 3, 4]|
>>>> +------+---------------+
>>>>
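For readers without Spark 3.4 at hand, the two steps quoted above can be mimicked with plain Java collections. This is only an illustrative sketch of what unpivot followed by groupBy and collect_set computes, not Spark code: the class DistinctPerColumn and its methods unpivot and distinctValues are hypothetical names, and the data is the small table from the quoted example. Unlike collect_set, which gives no ordering guarantee, this sketch keeps values in first-seen order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper class: plain Java, no Spark involved.
class DistinctPerColumn {

    // "Unpivot": emit one (column, value) pair per cell of every row.
    static List<Map.Entry<String, Integer>> unpivot(String[] columns, int[][] rows) {
        List<Map.Entry<String, Integer>> cells = new ArrayList<>();
        for (int[] row : rows) {
            for (int i = 0; i < columns.length; i++) {
                cells.add(Map.entry(columns[i], row[i]));
            }
        }
        return cells;
    }

    // "groupBy(column) + collect_set(value)": one set of distinct values per column.
    static Map<String, Set<Integer>> distinctValues(List<Map.Entry<String, Integer>> cells) {
        Map<String, Set<Integer>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> cell : cells) {
            result.computeIfAbsent(cell.getKey(), k -> new LinkedHashSet<>())
                  .add(cell.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        String[] columns = {"a", "b", "c"};
        int[][] rows = {{1, 10, 123}, {2, 20, 124}, {3, 20, 123}, {4, 10, 123}};
        // prints {a=[1, 2, 3, 4], b=[10, 20], c=[123, 124]}
        System.out.println(distinctValues(unpivot(columns, rows)));
    }
}
```

Everything here lives in local memory, so it only illustrates the shape of the result. For large data the DataFrame version quoted above is the one to use.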
>>>> Note that unpivot only works if all columns have a "common" type. Then
>>>> all columns are cast to that common type. If you have incompatible types
>>>> like Integer and String, you would have to cast them all to String first:
>>>>
>>>> import org.apache.spark.sql.functions.col
>>>> import org.apache.spark.sql.types.StringType
>>>>
>>>> df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)
>>>>
>>>> If you want to preserve the type of the values and have multiple value
>>>> types, you cannot put everything into a DataFrame with one
>>>> distinct_values column. You could still have multiple DataFrames, one
>>>> per data type, and write those, or collect the DataFrame's values into 
>>>> Maps:
>>>>
>>>> import scala.collection.immutable
>>>>
>>>> import org.apache.spark.sql.DataFrame
>>>> import org.apache.spark.sql.functions.{col, collect_set}
>>>>
>>>> // if all your columns have the same type
>>>> def distinctValuesPerColumnOneType(df: DataFrame):
>>>> immutable.Map[String, immutable.Seq[Any]] = {
>>>>   df.unpivot(Array.empty, "column", "value")
>>>>     .groupBy("column")
>>>>     .agg(collect_set("value").as("distinct_values"))
>>>>     .collect()
>>>>     .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
>>>>     .toMap
>>>> }
>>>>
>>>>
>>>> // if your columns have different types
>>>> def distinctValuesPerColumn(df: DataFrame): immutable.Map[String,
>>>> immutable.Seq[Any]] = {
>>>>   df.schema.fields
>>>>     .groupBy(_.dataType)
>>>>     .mapValues(_.map(_.name))
>>>>     .par
>>>>     .map { case (dataType, columns) => df.select(columns.map(col): _*) }
>>>>     .map(distinctValuesPerColumnOneType)
>>>>     .flatten
>>>>     .toList
>>>>     .toMap
>>>> }
>>>>
>>>> val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10,
>>>> "one")).toDF("a", "b", "c")
>>>> distinctValuesPerColumn(df)
>>>>
>>>> The result is: (list values are of original type)
>>>> Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))
>>>>
>>>> Hope this helps,
>>>> Enrico
>>>>
>>>>
>>>> On 10.02.23 at 22:56, sam smith wrote:
>>>>
>>>> Hi Apostolos,
>>>> Can you suggest a better approach while keeping values within a
>>>> dataframe?
>>>>
>>>> On Fri, 10 Feb 2023 at 22:47, Apostolos N. Papadopoulos <
>>>> papad...@csd.auth.gr> wrote:
>>>>
>>>>> Dear Sam,
>>>>>
>>>>> you are assuming that the data fits in the memory of your local
>>>>> machine. You are using a dataframe as a basis, which can potentially be
>>>>> very large, and then you are storing the data in local lists. Keep in mind
>>>>> that the number of distinct elements in a column may be very large
>>>>> (depending on the app). I suggest working on a solution that assumes that
>>>>> the number of distinct values is also large. Thus, you should keep your
>>>>> data in dataframes or RDDs, and store them as CSV files, Parquet, etc.
>>>>>
>>>>> a.p.
>>>>>
>>>>>
>>>>> On 10/2/23 23:40, sam smith wrote:
>>>>>
>>>>> I want to get the distinct values of each column in a List (is it good
>>>>> practice to use List here?) that contains the column name as its first
>>>>> element and the column's distinct values as the remaining elements, so
>>>>> that for a dataset we get a list of lists. I do it this way (in my
>>>>> opinion not so fast):
>>>>>
>>>>> List<List<String>> finalList = new ArrayList<List<String>>();
>>>>> Dataset<Row> df = spark.read().format("csv").option("header",
>>>>> "true").load("/pathToCSV");
>>>>> String[] columnNames = df.columns();
>>>>> for (int i = 0; i < columnNames.length; i++) {
>>>>>     List<String> columnList = new ArrayList<String>();
>>>>>     columnList.add(columnNames[i]);
>>>>>
>>>>>     List<Row> columnValues =
>>>>> df.filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull()).select(columnNames[i]).distinct().collectAsList();
>>>>>     for (int j = 0; j < columnValues.size(); j++)
>>>>>         columnList.add(columnValues.get(j).apply(0).toString());
>>>>>
>>>>>     finalList.add(columnList);
>>>>> }
>>>>>
>>>>>
>>>>> How to improve this?
>>>>>
>>>>> Also, can I get the results in JSON format?
>>>>>
>>>>> --
>>>>> Apostolos N. Papadopoulos, Associate Professor
>>>>> Department of Informatics
>>>>> Aristotle University of Thessaloniki
>>>>> Thessaloniki, GREECE
>>>>> tel: ++0030312310991918
>>>>> email: papad...@csd.auth.gr
>>>>> twitter: @papadopoulos_ap
>>>>> web: http://datalab.csd.auth.gr/~apostol
>>>>>
>>>>>
>>>>
