HyukjinKwon commented on code in PR #40092:
URL: https://github.com/apache/spark/pull/40092#discussion_r1112016388
##########
python/docs/source/getting_started/quickstart_connect.ipynb:
##########
@@ -0,0 +1,1118 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Quickstart: DataFrame with Spark Connect\n",
+ "\n",
+ "This is a short introduction and quickstart for the DataFrame with Spark
Connect. A DataFrame with Spark Connect is virtually, conceptually identical to
an existing [PySpark
DataFrame](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html?highlight=dataframe#pyspark.sql.DataFrame),
so most of the examples from 'Live Notebook: DataFrame' at [the quickstart
page](https://spark.apache.org/docs/latest/api/python/getting_started/index.html)
can be reused directly.\n",
+ "\n",
+ "However, it does not yet support some key features such as
[RDD](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html?highlight=rdd#pyspark.RDD)
and
[SparkSession.conf](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.conf.html#pyspark.sql.SparkSession.conf),
so you need to consider it when using DataFrame with Spark Connect.\n",
+ "\n",
+ "This notebook shows the basic usages of the DataFrame with Spark Connect
geared mainly for those new to Spark Connect, along with comments of which
features is not supported compare to the existing DataFrame.\n",
+ "\n",
+ "There is also other useful information in Apache Spark documentation
site, see the latest version of [Spark SQL and
DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html).\n",
+ "\n",
+ "PySpark applications start with initializing `SparkSession` which is the
entry point of PySpark as below. In case of running it in PySpark shell via
<code>pyspark</code> executable, the shell automatically creates the session in
the variable <code>spark</code> for users."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Spark Connect uses SparkSession from `pyspark.sql.connect.session`
instead of `pyspark.sql.SparkSession`.\n",
+ "from pyspark.sql.connect.session import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder.getOrCreate()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## DataFrame Creation\n",
+ "\n",
+ "A PySpark DataFrame with Spark Connect can be created via
`pyspark.sql.connect.session.SparkSession.createDataFrame` typically by passing
a list of lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and an RDD consisting of such a list.\n",
+ "`pyspark.sql.connect.session.SparkSession.createDataFrame` takes the
`schema` argument to specify the schema of the DataFrame. When it is omitted,
PySpark infers the corresponding schema by taking a sample from the data.\n",
+ "\n",
+ "Firstly, you can create a PySpark DataFrame from a list of rows"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "from datetime import datetime, date\n",
+ "import pandas as pd\n",
+ "from pyspark.sql import Row\n",
+ "\n",
+ "df = spark.createDataFrame([\n",
+ " Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1,
1, 12, 0)),\n",
+ " Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1,
2, 12, 0)),\n",
+ " Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1,
3, 12, 0))\n",
+ "])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame with an explicit schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))\n",
+ "], schema='a long, b double, c string, d date, e timestamp')\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from a pandas DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "pandas_df = pd.DataFrame({\n",
+ " 'a': [1, 2, 3],\n",
+ " 'b': [2., 3., 4.],\n",
+ " 'c': ['string1', 'string2', 'string3'],\n",
+ " 'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],\n",
+ " 'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0),
datetime(2000, 1, 3, 12, 0)]\n",
+ "})\n",
+ "df = spark.createDataFrame(pandas_df)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "**[NOT SUPPORTED YET]** Create a PySpark DataFrame from an RDD consisting
of a list of tuples.\n",
+ "\n",
+ "This is **NOT** supported with Spark Connect yet. If you want to use
[SparkContext](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.html?highlight=sparkcontext#pyspark.SparkContext),
use plain PySpark DataFrame instead of DataFrame with Spark Connect."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# rdd = spark.sparkContext.parallelize([\n",
+ "# (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12,
0)),\n",
+ "# (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12,
0)),\n",
+ "# (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12,
0))\n",
+ "# ])\n",
+ "# df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])\n",
+ "# df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The DataFrames created above all have the same results and schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n",
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# All DataFrames above result same.\n",
+ "df.show()\n",
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Viewing Data\n",
+ "\n",
+ "The top rows of a DataFrame can be displayed using `DataFrame.show()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "**[NOT SUPPORTED YET]** Alternatively, you can enable
`spark.sql.repl.eagerEval.enabled` configuration for the eager evaluation of
PySpark DataFrame in notebooks such as Jupyter. The number of rows to show can
be controlled via `spark.sql.repl.eagerEval.maxNumRows` configuration.\n",
+ "\n",
+ "This is **NOT** supported with Spark Connect yet. If you want to set
various [Spark
Configuration](https://spark.apache.org/docs/latest/configuration.html), use
plain PySpark DataFrame instead of DataFrame with Spark Connect."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# spark.conf.set('spark.sql.repl.eagerEval.enabled', True)\n",
+ "# df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The rows can also be shown vertically. This is useful when rows are too
long to show horizontally."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "-RECORD 0------------------\n",
+ " a | 1 \n",
+ " b | 2.0 \n",
+ " c | string1 \n",
+ " d | 2000-01-01 \n",
+ " e | 2000-01-01 12:00:00 \n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1, vertical=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "You can see the DataFrame's schema and column names as follows:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "['a', 'b', 'c', 'd', 'e']"
+ ]
+ },
+ "execution_count": 10,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.columns"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Show the summary of the DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+---+---+-------+\n",
+ "|summary| a| b| c|\n",
+ "+-------+---+---+-------+\n",
+ "| count| 3| 3| 3|\n",
+ "| mean|2.0|3.0| null|\n",
+ "| stddev|1.0|1.0| null|\n",
+ "| min| 1|2.0|string1|\n",
+ "| max| 3|4.0|string3|\n",
+ "+-------+---+---+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.select(\"a\", \"b\", \"c\").describe().show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "`DataFrame.collect()` collects the distributed data to the driver side as
the local data in Python. Note that this can throw an out-of-memory error when
the dataset is too large to fit in the driver side because it collects all the
data from executors to the driver side."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))]"
+ ]
+ },
+ "execution_count": 13,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.collect()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In order to avoid throwing an out-of-memory exception, use
`DataFrame.take()` or `DataFrame.tail()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0))]"
+ ]
+ },
+ "execution_count": 14,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.take(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "PySpark DataFrame also provides the conversion back to a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
to leverage pandas API. Note that `toPandas` also collects all data into the
driver side that can easily cause an out-of-memory-error when the data is too
large to fit into the driver side."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div>\n",
+ "<style scoped>\n",
+ " .dataframe tbody tr th:only-of-type {\n",
+ " vertical-align: middle;\n",
+ " }\n",
+ "\n",
+ " .dataframe tbody tr th {\n",
+ " vertical-align: top;\n",
+ " }\n",
+ "\n",
+ " .dataframe thead th {\n",
+ " text-align: right;\n",
+ " }\n",
+ "</style>\n",
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th>a</th>\n",
+ " <th>b</th>\n",
+ " <th>c</th>\n",
+ " <th>d</th>\n",
+ " <th>e</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>0</th>\n",
+ " <td>1</td>\n",
+ " <td>2.0</td>\n",
+ " <td>string1</td>\n",
+ " <td>2000-01-01</td>\n",
+ " <td>2000-01-01 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>1</th>\n",
+ " <td>2</td>\n",
+ " <td>3.0</td>\n",
+ " <td>string2</td>\n",
+ " <td>2000-02-01</td>\n",
+ " <td>2000-01-02 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>2</th>\n",
+ " <td>3</td>\n",
+ " <td>4.0</td>\n",
+ " <td>string3</td>\n",
+ " <td>2000-03-01</td>\n",
+ " <td>2000-01-03 12:00:00</td>\n",
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>\n",
+ "</div>"
+ ],
+ "text/plain": [
+ " a b c d e\n",
+ "0 1 2.0 string1 2000-01-01 2000-01-01 12:00:00\n",
+ "1 2 3.0 string2 2000-02-01 2000-01-02 12:00:00\n",
+ "2 3 4.0 string3 2000-03-01 2000-01-03 12:00:00"
+ ]
+ },
+ "execution_count": 15,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.toPandas()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Selecting and Accessing Data\n",
+ "\n",
+ "PySpark DataFrame is lazily evaluated and simply selecting a column does
not trigger the computation but it returns a `Column` instance."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "Column<'a'>"
+ ]
+ },
+ "execution_count": 16,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.a"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In fact, most of column-wise operations return `Column`s."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 17,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "True"
+ ]
+ },
+ "execution_count": 17,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "from pyspark.sql import Column\n",
+ "from pyspark.sql.functions import upper\n",
+ "\n",
+ "type(df.c) == type(upper(df.c)) == type(df.c.isNull())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "These `Column`s can be used to select the columns from a DataFrame. For
example, `DataFrame.select()` takes the `Column` instances that returns another
DataFrame."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+\n",
+ "| c|\n",
+ "+-------+\n",
+ "|string1|\n",
+ "|string2|\n",
+ "|string3|\n",
+ "+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.select(df.c).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Assign new `Column` instance."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "| a| b| c| d| e|upper_c|\n",
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|\n",
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.withColumn('upper_c', upper(df.c)).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "To select a subset of rows, use `DataFrame.filter()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.filter(df.a == 1).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Applying a Function\n",
+ "\n",
+ "PySpark supports various UDFs and APIs to allow users to execute Python
native functions. See also the latest [Pandas
UDFs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs)
and [Pandas Function
APIs](https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-function-apis).
For instance, the example below allows users to directly use the APIs in [a
pandas
Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html)
within Python native function."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+------------------+\n",
+ "|pandas_plus_one(a)|\n",
+ "+------------------+\n",
+ "| 2|\n",
+ "| 3|\n",
+ "| 4|\n",
+ "+------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "import pandas as pd\n",
+ "from pyspark.sql.functions import pandas_udf\n",
+ "\n",
+ "@pandas_udf('long')\n",
+ "def pandas_plus_one(series: pd.Series) -> pd.Series:\n",
+ " # Simply plus one by using pandas Series.\n",
+ " return series + 1\n",
+ "\n",
+ "df.select(pandas_plus_one(df.a)).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "**[NOT SUPPORTED YET]** Another example is
[mapInPandas](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.mapInPandas.html?highlight=mapinpandas#pyspark.sql.DataFrame.mapInPandas)
which allows users directly use the APIs in a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
without any restrictions such as the result length.\n",
+ "\n",
+ "This is **NOT** supported with Spark Connect yet. If you want to use
`mapInPandas`, use plain PySpark DataFrame instead of DataFrame with Spark
Connect."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# def pandas_filter_func(iterator):\n",
+ "# for pandas_df in iterator:\n",
+ "# yield pandas_df[pandas_df.a == 1]\n",
+ "\n",
+ "# df.mapInPandas(pandas_filter_func, schema=df.schema).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Grouping Data\n",
+ "\n",
+ "PySpark DataFrame also provides a way of handling grouped data by using
the common approach, split-apply-combine strategy.\n",
+ "It groups the data by a certain condition applies a function to each
group and then combines them back to the DataFrame."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 23,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+------+---+---+\n",
+ "|color| fruit| v1| v2|\n",
+ "+-----+------+---+---+\n",
+ "| red|banana| 1| 10|\n",
+ "| blue|banana| 2| 20|\n",
+ "| red|carrot| 3| 30|\n",
+ "| blue| grape| 4| 40|\n",
+ "| red|carrot| 5| 50|\n",
+ "|black|carrot| 6| 60|\n",
+ "| red|banana| 7| 70|\n",
+ "| red| grape| 8| 80|\n",
+ "+-----+------+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red',
'carrot', 3, 30],\n",
+ " ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black',
'carrot', 6, 60],\n",
+ " ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color',
'fruit', 'v1', 'v2'])\n",
+ "df.show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Grouping and then applying the `avg()` function to the resulting groups."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+-------+-------+\n",
+ "|color|avg(v1)|avg(v2)|\n",
+ "+-----+-------+-------+\n",
+ "| red| 4.8| 48.0|\n",
+ "| blue| 3.0| 30.0|\n",
+ "|black| 6.0| 60.0|\n",
+ "+-----+-------+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.groupby('color').avg().show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "**[NOT SUPPORTED YET]** You can also apply a Python native function
against each group by using pandas API.\n",
+ "\n",
+ "This is **NOT** supported with Spark Connect yet. If you want to use
[applyInPandas](file:///Users/haejoon.lee/Desktop/git_store/spark/python/docs/build/html/reference/pyspark.sql/api/pyspark.sql.GroupedData.applyInPandas.html?highlight=applyinpandas#pyspark.sql.GroupedData.applyInPandas),
use plain PySpark DataFrame instead of DataFrame with Spark Connect."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 25,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# def plus_mean(pandas_df):\n",
+ "# return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())\n",
+ "\n",
+ "# df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "**[NOT SUPPORTED YET]** Co-grouping and applying a function.\n",
+ "\n",
+ "This is **NOT** supported with Spark Connect yet. If you want to use
[applyInPandas](file:///Users/haejoon.lee/Desktop/git_store/spark/python/docs/build/html/reference/pyspark.sql/api/pyspark.sql.GroupedData.applyInPandas.html?highlight=applyinpandas#pyspark.sql.GroupedData.applyInPandas),
use plain PySpark DataFrame instead of DataFrame with Spark Connect."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 26,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# df1 = spark.createDataFrame(\n",
+ "# [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0),
(20000102, 2, 4.0)],\n",
+ "# ('time', 'id', 'v1'))\n",
+ "\n",
+ "# df2 = spark.createDataFrame(\n",
+ "# [(20000101, 1, 'x'), (20000101, 2, 'y')],\n",
+ "# ('time', 'id', 'v2'))\n",
+ "\n",
+ "# def merge_ordered(l, r):\n",
+ "# return pd.merge_ordered(l, r)\n",
+ "\n",
+ "# df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(\n",
+ "# merge_ordered, schema='time int, id int, v1 double, v2
string').show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Getting Data In/Out\n",
+ "\n",
+ "CSV is straightforward and easy to use. Parquet and ORC are efficient and
compact file formats to read and write faster.\n",
+ "\n",
+ "There are many other data sources available in PySpark such as JDBC,
text, binaryFile, Avro, etc. See also the latest [Spark SQL, DataFrames and
Datasets
Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) in
Apache Spark documentation."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### CSV"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 27,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+------+---+---+\n",
+ "|color| fruit| v1| v2|\n",
+ "+-----+------+---+---+\n",
+ "|black|carrot| 6| 60|\n",
+ "| blue|banana| 2| 20|\n",
+ "| blue| grape| 4| 40|\n",
+ "| red|banana| 7| 70|\n",
+ "| red|carrot| 3| 30|\n",
+ "| red|carrot| 5| 50|\n",
+ "| red|banana| 1| 10|\n",
+ "| red| grape| 8| 80|\n",
+ "+-----+------+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.write.csv('foo.csv', header=True)\n",
+ "spark.read.csv('foo.csv', header=True).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Parquet"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 28,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "23/02/20 14:23:05 WARN MemoryManager: Total allocation exceeds 95.00%
(1,020,054,720 bytes) of heap memory\n",
Review Comment:
Please remove these warnings, e.g., by setting `sc.setLogLevel("FATAL")` before you run this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]