That is unfortunate, but 3.4.0 is around the corner, really!
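
Until then, something similar can probably be done on 3.3.0 with the SQL `stack` generator instead of unpivot. The following is only a sketch: the class and method names (`StackUnpivot`, `stackExpr`) are made up for illustration, it assumes at least one column and column names without quotes or backticks, and it casts every column to string so that all values share one type.

```java
import java.util.StringJoiner;

public class StackUnpivot {
    // Hypothetical helper: builds a Spark SQL expression of the form
    //   stack(N, 'a', cast(`a` as string), ...) as (`column`, `value`)
    // Assumes a non-empty array and names without quotes or backticks.
    public static String stackExpr(String[] columns) {
        StringJoiner args = new StringJoiner(", ");
        for (String c : columns) {
            // one (name literal, value cast to string) pair per column
            args.add("'" + c + "', cast(`" + c + "` as string)");
        }
        return "stack(" + columns.length + ", " + args + ") as (`column`, `value`)";
    }

    public static void main(String[] args) {
        // Prints the expression for a three-column DataFrame.
        System.out.println(stackExpr(new String[] {"a", "b", "c"}));
        // Intended use with Spark (not executed here):
        //   df.selectExpr(StackUnpivot.stackExpr(df.columns()))
        //     .groupBy("column")
        //     .agg(collect_set("value").as("distinct_values"));
    }
}
```

The resulting string is meant to be passed to df.selectExpr(...), after which the groupBy / collect_set step is the same as with unpivot.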

Well, then based on your code, I'd suggest two improvements:
- cache your DataFrame after reading, so that you don't read the entire file for each column
- run your outer for loop in parallel, so that you have N parallel Spark jobs (this only helps if a single column does not already fully occupy your Spark cluster)
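
A minimal sketch of the second suggestion, assuming a parallel stream is an acceptable way to submit the per-column jobs from several threads. `ParallelColumns` and `perColumn` are hypothetical names, and the Spark calls appear only in comments so that the helper itself stays plain Java.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class ParallelColumns {
    // Hypothetical helper: runs one job per column name concurrently
    // and gathers the (non-null) results keyed by column name.
    public static Map<String, List<String>> perColumn(
            String[] columns, Function<String, List<String>> job) {
        Map<String, List<String>> result = new ConcurrentHashMap<>();
        Arrays.stream(columns).parallel().forEach(c -> result.put(c, job.apply(c)));
        return result;
    }

    public static void main(String[] args) {
        // Stand-in job; with Spark the lambda would be something like
        //   Dataset<Row> cached = df.cache();
        //   c -> cached.select(c).distinct().collectAsList().stream()
        //          .map(r -> String.valueOf(r.get(0)))
        //          .collect(Collectors.toList())
        Map<String, List<String>> out =
            perColumn(new String[] {"a", "b"}, c -> Arrays.asList(c + "1", c + "2"));
        System.out.println(out.get("a"));
    }
}
```

Each lambda call would trigger its own Spark job, and the cached DataFrame keeps those jobs from re-reading the CSV file.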

Your withColumn approach does not work because withColumn expects a Column as its second argument. df.select(columnName).distinct() is a new DataFrame, and .col(columnName) returns a column of *that* DataFrame, not a column of the DataFrame that you call withColumn on.

It should read:

Scala:
df.select(df.columns.map(column => collect_set(col(column)).as(column)): _*).show()

Java:
// needs java.util.Arrays and org.apache.spark.sql.Column
Column[] aggregates = Arrays.stream(df.columns())
    .map(c -> collect_set(col(c)).as(c))
    .toArray(Column[]::new);
df.select(aggregates).show();

Then you have a single DataFrame that computes all columns in a single Spark job.

But this pulls all distinct values into a single partition, so it has the same downside as using collect.

Cheers,
Enrico


On 12.02.23 at 18:05, sam smith wrote:
@Enrico Minack <mailto:enrico-min...@gmx.de> Thanks for "unpivot", but I am using version 3.3.0 (you are taking it way too far as usual :) )
@Sean Owen <mailto:sro...@gmail.com> Please show me then how it can be improved in code.

Also, why doesn't the following approach (using withColumn()) work?

for (String columnName : df.columns()) {
    df= df.withColumn(columnName, df.select(columnName).distinct().col(columnName));
}

On Sat, 11 Feb 2023 at 13:11, Enrico Minack <i...@enrico.minack.dev> wrote:

    You could do the entire thing in DataFrame world and write the
    result to disk. All you need is unpivot (to be released in Spark
    3.4.0, soon).

    Note this is Scala but should be straightforward to translate into
    Java:

    import org.apache.spark.sql.functions.collect_set

    val df = Seq((1, 10, 123), (2, 20, 124), (3, 20, 123), (4, 10, 123)).toDF("a", "b", "c")

    df.unpivot(Array.empty, "column", "value")
      .groupBy("column")
      .agg(collect_set("value").as("distinct_values"))

    The unpivot operation turns
    +---+---+---+
    |  a|  b|  c|
    +---+---+---+
    |  1| 10|123|
    |  2| 20|124|
    |  3| 20|123|
    |  4| 10|123|
    +---+---+---+

    into

    +------+-----+
    |column|value|
    +------+-----+
    |     a|    1|
    |     b|   10|
    |     c|  123|
    |     a|    2|
    |     b|   20|
    |     c|  124|
    |     a|    3|
    |     b|   20|
    |     c|  123|
    |     a|    4|
    |     b|   10|
    |     c|  123|
    +------+-----+

    The
    groupBy("column").agg(collect_set("value").as("distinct_values"))
    collects distinct values per column:
    +------+---------------+
    |column|distinct_values|
    +------+---------------+
    |     c|     [123, 124]|
    |     b|       [20, 10]|
    |     a|   [1, 2, 3, 4]|
    +------+---------------+

    Note that unpivot only works if all columns have a "common" type.
    Then all columns are cast to that common type. If you have
    incompatible types like Integer and String, you would have to cast
    them all to String first:

    import org.apache.spark.sql.types.StringType

    df.select(df.columns.map(col(_).cast(StringType)): _*).unpivot(...)

    If you want to preserve the type of the values and have multiple
    value types, you cannot put everything into a DataFrame with one
    distinct_values column. You could still have multiple DataFrames,
    one per data type, and write those, or collect the DataFrame's
    values into Maps:

    import scala.collection.immutable

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, collect_set}

    // if all your columns have the same type
    def distinctValuesPerColumnOneType(df: DataFrame): immutable.Map[String, immutable.Seq[Any]] = {
      df.unpivot(Array.empty, "column", "value")
        .groupBy("column")
        .agg(collect_set("value").as("distinct_values"))
        .collect()
        .map(row => row.getString(0) -> row.getSeq[Any](1).toList)
        .toMap
    }


    // if your columns have different types
    def distinctValuesPerColumn(df: DataFrame): immutable.Map[String, immutable.Seq[Any]] = {
      df.schema.fields
        .groupBy(_.dataType)
        .mapValues(_.map(_.name))
        .par
        .map { case (dataType, columns) => df.select(columns.map(col): _*) }
        .map(distinctValuesPerColumnOneType)
        .flatten
        .toList
        .toMap
    }

    val df = Seq((1, 10, "one"), (2, 20, "two"), (3, 20, "one"), (4, 10, "one")).toDF("a", "b", "c")
    distinctValuesPerColumn(df)

    The result is: (list values are of original type)
    Map(b -> List(20, 10), a -> List(1, 2, 3, 4), c -> List(one, two))

    Hope this helps,
    Enrico


    On 10.02.23 at 22:56, sam smith wrote:
    Hi Apostolos,
    Can you suggest a better approach while keeping values within a
    dataframe?

    On Fri, 10 Feb 2023 at 22:47, Apostolos N. Papadopoulos
    <papad...@csd.auth.gr> wrote:

        Dear Sam,

        you are assuming that the data fits in the memory of your
        local machine. You are using as a basis a dataframe, which
        potentially can be very large, and then you are storing the
        data in local lists. Keep in mind that the number of
        distinct elements in a column may be very large (depending on
        the app). I suggest working on a solution that assumes that
        the number of distinct values is also large. Thus, you should
        keep your data in dataframes or RDDs, and store them as csv
        files, parquet, etc.

        a.p.


        On 10/2/23 23:40, sam smith wrote:
        I want to get the distinct values of each column in a List
        (is it good practice to use List here?) that contains the
        column name as its first element, followed by the column's
        distinct values, so that for a dataset we get a list of
        lists. I do it this way (in my opinion not so fast):

        List<List<String>> finalList = new ArrayList<List<String>>();
        Dataset<Row> df = spark.read().format("csv").option("header", "true").load("/pathToCSV");
        String[] columnNames = df.columns();
        for (int i = 0; i < columnNames.length; i++) {
            List<String> columnList = new ArrayList<String>();
            columnList.add(columnNames[i]);
            List<Row> columnValues = df
                .filter(org.apache.spark.sql.functions.col(columnNames[i]).isNotNull())
                .select(columnNames[i])
                .distinct()
                .collectAsList();
            for (int j = 0; j < columnValues.size(); j++)
                columnList.add(columnValues.get(j).apply(0).toString());
            finalList.add(columnList);
        }

        How to improve this?

        Also, can I get the results in JSON format?

-- Apostolos N. Papadopoulos, Associate Professor
        Department of Informatics
        Aristotle University of Thessaloniki
        Thessaloniki, GREECE
        tel: ++0030312310991918
        email:papad...@csd.auth.gr
        twitter: @papadopoulos_ap
        web:http://datalab.csd.auth.gr/~apostol

