nchammas commented on a change in pull request #29491:
URL: https://github.com/apache/spark/pull/29491#discussion_r474018689
##########
File path: dev/lint-python
##########
@@ -196,6 +196,22 @@ function sphinx_test {
return
fi
+ # TODO(SPARK-32666): Install nbsphinx in Jenkins machines
+ PYTHON_HAS_NBSPHINX=$("$PYTHON_EXECUTABLE" -c 'import importlib.util;
print(importlib.util.find_spec("sphinx") is not None)')
Review comment:
Is there anything downstream being driven by the value of this variable?
##########
File path: apt.txt
##########
@@ -0,0 +1 @@
+openjdk-8-jre
Review comment:
What does this do / where is it used?
##########
File path: python/docs/source/getting_started/quickstart.ipynb
##########
@@ -0,0 +1,1091 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Quickstart\n",
+ "\n",
+ "This is a short introduction and quickstart for PySpark DataFrame.
PySpark DataFrame is lazily evaludated and implemented on thetop of
[RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview).
When the data is
[transformed](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations),
it does not actually compute but plans how to compute later. When the
[actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)
such as `collect()` are explicitly called, the computation starts.\n",
+ "This notebook shows the basic usages of the DataFrame, geared mainly for
new users. You can run the latest version of these examples by yourself on a
live notebook
[here](https://mybinder.org/v2/gh/databricks/apache/master?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb).\n",
+ "\n",
+ "There are also other useful information in Apache Spark documentation
site, see the latest version of [Spark SQL and
DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html),
[RDD Programming
Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html),
[Structured Streaming Programming
Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html),
[Spark Streaming Programming
Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html)
and [Machine Learning Library (MLlib)
Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n",
+ "\n",
+ "Usually PySaprk applications start with initializing `SparkSession` which
is the entry point of PySpark as below. In case of running it in PySpark shell
via <code>pyspark</code> executable, the shell automatically creates the
session in the variable <code>spark</code> for users."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder.getOrCreate()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## DataFrame Creation\n",
+ "\n",
+ "A PySpark DataFrame can be created via
`pyspark.sql.SparkSession.createDataFrame` typically by passing a list of
lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and an RDD consisting of such a list.\n",
Review comment:
General comment on this:
> typically by passing a list of ... dictionaries
Inferring the schema from a list of dictionaries is deprecated, even though
I think that would be natural for Python users to think of and understand.
e.g.
```python
>>> spark.createDataFrame([{'a': 5}])
.../python/pyspark/sql/session.py:378: UserWarning: inferring schema from
dict is deprecated,please use pyspark.sql.Row instead
warnings.warn("inferring schema from dict is deprecated,"
DataFrame[a: bigint]
```
##########
File path: python/docs/source/getting_started/quickstart.ipynb
##########
@@ -0,0 +1,1091 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Quickstart\n",
+ "\n",
+ "This is a short introduction and quickstart for PySpark DataFrame.
PySpark DataFrame is lazily evaludated and implemented on thetop of
[RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview).
When the data is
[transformed](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations),
it does not actually compute but plans how to compute later. When the
[actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)
such as `collect()` are explicitly called, the computation starts.\n",
+ "This notebook shows the basic usages of the DataFrame, geared mainly for
new users. You can run the latest version of these examples by yourself on a
live notebook
[here](https://mybinder.org/v2/gh/databricks/apache/master?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb).\n",
+ "\n",
+ "There are also other useful information in Apache Spark documentation
site, see the latest version of [Spark SQL and
DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html),
[RDD Programming
Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html),
[Structured Streaming Programming
Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html),
[Spark Streaming Programming
Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html)
and [Machine Learning Library (MLlib)
Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n",
+ "\n",
+ "Usually PySaprk applications start with initializing `SparkSession` which
is the entry point of PySpark as below. In case of running it in PySpark shell
via <code>pyspark</code> executable, the shell automatically creates the
session in the variable <code>spark</code> for users."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder.getOrCreate()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## DataFrame Creation\n",
+ "\n",
+ "A PySpark DataFrame can be created via
`pyspark.sql.SparkSession.createDataFrame` typically by passing a list of
lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and an RDD consisting of such a list.\n",
+ "`pyspark.sql.SparkSession.createDataFrame` takes the `schema` argument to
specify the schema of the DataFrame. When it is omitted, PySpark infers the
corresponding schema by taking a sample from the data.\n",
+ "\n",
+ "The example below creates a PySpark DataFrame from a list of rows"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "import datetime\n",
+ "import pandas as pd\n",
+ "from pyspark.sql import Row\n",
+ "\n",
+ "df = spark.createDataFrame([\n",
+ " Row(a=1, b=2., c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3., c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=4, b=5., c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))\n",
+ "])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame with the schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "], schema='a long, b double, c string, d date, e timestamp')\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from a pandas DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "pandas_df = pd.DataFrame({\n",
+ " 'a': [1, 2, 3],\n",
+ " 'b': [2., 3., 4.],\n",
+ " 'c': ['string1', 'string2', 'string3'],\n",
+ " 'd': [datetime.date(2000, 1, 1), datetime.date(2000, 2, 1),
datetime.date(2000, 3, 1)],\n",
+ " 'e': [datetime.datetime(2000, 1, 1, 12, 0), datetime.datetime(2000,
1, 2, 12, 0), datetime.datetime(2000, 1, 3, 12, 0)]\n",
+ "})\n",
+ "df = spark.createDataFrame(pandas_df)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from an RDD consisting of a list of tuples."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "rdd = spark.sparkContext.parallelize([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "])\n",
+ "df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The DataFrames created above all have the same results and schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n",
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# All DataFrames above result same.\n",
+ "df.show()\n",
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Viewing Data\n",
+ "\n",
+ "The top rows of a DataFrame can be displayed using `DataFrame.show()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Alternatively, you can enable `spark.sql.repl.eagerEval.enabled`
configuration to enable the eager evaluation of PySpark DataFrame in notebooks
such as Jupyter."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<table border='1'>\n",
+ "<tr><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th></tr>\n",
+
"<tr><td>1</td><td>2.0</td><td>string1</td><td>2000-01-01</td><td>2000-01-01
12:00:00</td></tr>\n",
+
"<tr><td>2</td><td>3.0</td><td>string2</td><td>2000-02-01</td><td>2000-01-02
12:00:00</td></tr>\n",
+
"<tr><td>3</td><td>4.0</td><td>string3</td><td>2000-03-01</td><td>2000-01-03
12:00:00</td></tr>\n",
+ "</table>\n"
+ ],
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 8,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "spark.conf.set('spark.sql.repl.eagerEval.enabled', True)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The rows can also be shown vertically. This is useful when rows are too
long to show horizontally."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "-RECORD 0------------------\n",
+ " a | 1 \n",
+ " b | 2.0 \n",
+ " c | string1 \n",
+ " d | 2000-01-01 \n",
+ " e | 2000-01-01 12:00:00 \n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1, vertical=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Its schema and column names can be shown as below:"
Review comment:
Wording suggestion: "You can see the DataFrame's schema and column names
as follows:"
##########
File path: python/docs/source/getting_started/quickstart.ipynb
##########
@@ -0,0 +1,1091 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Quickstart\n",
+ "\n",
+ "This is a short introduction and quickstart for PySpark DataFrame.
PySpark DataFrame is lazily evaludated and implemented on thetop of
[RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview).
When the data is
[transformed](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations),
it does not actually compute but plans how to compute later. When the
[actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)
such as `collect()` are explicitly called, the computation starts.\n",
+ "This notebook shows the basic usages of the DataFrame, geared mainly for
new users. You can run the latest version of these examples by yourself on a
live notebook
[here](https://mybinder.org/v2/gh/databricks/apache/master?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb).\n",
+ "\n",
+ "There are also other useful information in Apache Spark documentation
site, see the latest version of [Spark SQL and
DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html),
[RDD Programming
Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html),
[Structured Streaming Programming
Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html),
[Spark Streaming Programming
Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html)
and [Machine Learning Library (MLlib)
Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n",
+ "\n",
+ "Usually PySaprk applications start with initializing `SparkSession` which
is the entry point of PySpark as below. In case of running it in PySpark shell
via <code>pyspark</code> executable, the shell automatically creates the
session in the variable <code>spark</code> for users."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder.getOrCreate()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## DataFrame Creation\n",
+ "\n",
+ "A PySpark DataFrame can be created via
`pyspark.sql.SparkSession.createDataFrame` typically by passing a list of
lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and an RDD consisting of such a list.\n",
+ "`pyspark.sql.SparkSession.createDataFrame` takes the `schema` argument to
specify the schema of the DataFrame. When it is omitted, PySpark infers the
corresponding schema by taking a sample from the data.\n",
+ "\n",
+ "The example below creates a PySpark DataFrame from a list of rows"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "import datetime\n",
+ "import pandas as pd\n",
+ "from pyspark.sql import Row\n",
+ "\n",
+ "df = spark.createDataFrame([\n",
+ " Row(a=1, b=2., c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3., c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=4, b=5., c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))\n",
+ "])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame with the schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "], schema='a long, b double, c string, d date, e timestamp')\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from a pandas DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "pandas_df = pd.DataFrame({\n",
+ " 'a': [1, 2, 3],\n",
+ " 'b': [2., 3., 4.],\n",
+ " 'c': ['string1', 'string2', 'string3'],\n",
+ " 'd': [datetime.date(2000, 1, 1), datetime.date(2000, 2, 1),
datetime.date(2000, 3, 1)],\n",
+ " 'e': [datetime.datetime(2000, 1, 1, 12, 0), datetime.datetime(2000,
1, 2, 12, 0), datetime.datetime(2000, 1, 3, 12, 0)]\n",
+ "})\n",
+ "df = spark.createDataFrame(pandas_df)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from an RDD consisting of a list of tuples."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "rdd = spark.sparkContext.parallelize([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "])\n",
+ "df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The DataFrames created above all have the same results and schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n",
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# All DataFrames above result same.\n",
+ "df.show()\n",
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Viewing Data\n",
+ "\n",
+ "The top rows of a DataFrame can be displayed using `DataFrame.show()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Alternatively, you can enable `spark.sql.repl.eagerEval.enabled`
configuration to enable the eager evaluation of PySpark DataFrame in notebooks
such as Jupyter."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<table border='1'>\n",
+ "<tr><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th></tr>\n",
+
"<tr><td>1</td><td>2.0</td><td>string1</td><td>2000-01-01</td><td>2000-01-01
12:00:00</td></tr>\n",
+
"<tr><td>2</td><td>3.0</td><td>string2</td><td>2000-02-01</td><td>2000-01-02
12:00:00</td></tr>\n",
+
"<tr><td>3</td><td>4.0</td><td>string3</td><td>2000-03-01</td><td>2000-01-03
12:00:00</td></tr>\n",
+ "</table>\n"
+ ],
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 8,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "spark.conf.set('spark.sql.repl.eagerEval.enabled', True)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The rows can also be shown vertically. This is useful when rows are too
long to show horizontally."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "-RECORD 0------------------\n",
+ " a | 1 \n",
+ " b | 2.0 \n",
+ " c | string1 \n",
+ " d | 2000-01-01 \n",
+ " e | 2000-01-01 12:00:00 \n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1, vertical=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Its schema and column names can be shown as below:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "['a', 'b', 'c', 'd', 'e']"
+ ]
+ },
+ "execution_count": 10,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.columns"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Show the summary of the DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+---+---+-------+\n",
+ "|summary| a| b| c|\n",
+ "+-------+---+---+-------+\n",
+ "| count| 3| 3| 3|\n",
+ "| mean|2.0|3.0| null|\n",
+ "| stddev|1.0|1.0| null|\n",
+ "| min| 1|2.0|string1|\n",
+ "| max| 3|4.0|string3|\n",
+ "+-------+---+---+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.describe().show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "`DataFrame.collect()` collects the distributed data to the driver side as
Python premitive representation. Note that this can throw out-of-memory error
when the dataset is too larget to fit in the driver side because it collects
all the data from executors to the driver side."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))]"
+ ]
+ },
+ "execution_count": 13,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.collect()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In order to avoid throwing an out-of-memory exception, use
`DataFrame.take()` or `DataFrame.tail()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0))]"
+ ]
+ },
+ "execution_count": 14,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.take(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "PySpark DataFrame also provides the conversion back to a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
in order to leverage pandas APIs."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div>\n",
+ "<style scoped>\n",
+ " .dataframe tbody tr th:only-of-type {\n",
+ " vertical-align: middle;\n",
+ " }\n",
+ "\n",
+ " .dataframe tbody tr th {\n",
+ " vertical-align: top;\n",
+ " }\n",
+ "\n",
+ " .dataframe thead th {\n",
+ " text-align: right;\n",
+ " }\n",
+ "</style>\n",
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th>a</th>\n",
+ " <th>b</th>\n",
+ " <th>c</th>\n",
+ " <th>d</th>\n",
+ " <th>e</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>0</th>\n",
+ " <td>1</td>\n",
+ " <td>2.0</td>\n",
+ " <td>string1</td>\n",
+ " <td>2000-01-01</td>\n",
+ " <td>2000-01-01 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>1</th>\n",
+ " <td>2</td>\n",
+ " <td>3.0</td>\n",
+ " <td>string2</td>\n",
+ " <td>2000-02-01</td>\n",
+ " <td>2000-01-02 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>2</th>\n",
+ " <td>3</td>\n",
+ " <td>4.0</td>\n",
+ " <td>string3</td>\n",
+ " <td>2000-03-01</td>\n",
+ " <td>2000-01-03 12:00:00</td>\n",
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>\n",
+ "</div>"
+ ],
+ "text/plain": [
+ " a b c d e\n",
+ "0 1 2.0 string1 2000-01-01 2000-01-01 12:00:00\n",
+ "1 2 3.0 string2 2000-02-01 2000-01-02 12:00:00\n",
+ "2 3 4.0 string3 2000-03-01 2000-01-03 12:00:00"
+ ]
+ },
+ "execution_count": 15,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.toPandas()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Selecting and Accessing Data\n",
+ "\n",
+ "PySpark DataFrame is lazily evaluated and simply selecting a column does
not trigger the computation but it returns a `Column` instance."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "Column<b'a'>"
+ ]
+ },
+ "execution_count": 16,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.a"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In fact, most of column-weise operations return `Column`s."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 17,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "True"
+ ]
+ },
+ "execution_count": 17,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "from pyspark.sql import Column\n",
+ "from pyspark.sql.functions import upper\n",
+ "\n",
+ "type(df.c) == type(upper(df.c)) == type(df.c.isNull())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "These `Column`s can be used to select the columns from a DataFrame. For
example, `DataFrame.select()` takes the `Column` instances that returns another
DataFrame."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+\n",
+ "| c|\n",
+ "+-------+\n",
+ "|string1|\n",
+ "|string2|\n",
+ "|string3|\n",
+ "+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.select(df.c).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Assign new `Column` instance."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "| a| b| c| d| e|upper_c|\n",
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|\n",
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.withColumn('upper_c', upper(df.c)).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In order to select rows instead of columns, use `DataFrame.filter()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.filter(df.a == 1).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Applying a Function\n",
+ "\n",
+ "PySpark supports various UDFs and APIs to allow users to execute Python
native functions. See also Pandas UDFs and Pandas Function APIs in User Guide.
For instance, the example below allows users to directly use the APIs in [a
pandas
Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html)
within Python native function."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+------------------+\n",
+ "|pandas_plus_one(a)|\n",
+ "+------------------+\n",
+ "| 2|\n",
+ "| 3|\n",
+ "| 4|\n",
+ "+------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "import pandas\n",
+ "from pyspark.sql.functions import pandas_udf\n",
+ "\n",
+ "@pandas_udf('long')\n",
+ "def pandas_plus_one(series: pd.Series) -> pd.Series:\n",
+ " # Simply plus one by using pandas Series.\n",
+ " return series + 1\n",
+ "\n",
+ "df.select(pandas_plus_one(df.a)).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Another example is `DataFrame.mapInPandas` which allows users directly
use the APIs in a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
without any restrictions such as the result length."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "def pandas_filter_func(iterator):\n",
+ " for pandas_df in iterator:\n",
+ " yield pandas_df[pandas_df.a == 1]\n",
+ "\n",
+ "df.mapInPandas(pandas_filter_func, schema=df.schema).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Grouping Data\n",
+ "\n",
+ "PySpark DataFrame also provides a way of handling grouped data by using
the common approach, split-apply-combine strategy.\n",
+ "It groups the data by a certain condition, applies a function to each
group, and then combines them back to the DataFrame."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 23,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+------+---+---+\n",
+ "|color| fruit| v1| v2|\n",
+ "+-----+------+---+---+\n",
+ "| red|banana| 1| 10|\n",
+ "| blue|banana| 2| 20|\n",
+ "| red|carrot| 3| 30|\n",
+ "| blue| grape| 4| 40|\n",
+ "| red|carrot| 5| 50|\n",
+ "|black|carrot| 6| 60|\n",
+ "| red|banana| 7| 70|\n",
+ "| red| grape| 8| 80|\n",
+ "+-----+------+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red',
'carrot', 3, 30],\n",
+ " ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black',
'carrot', 6, 60],\n",
+ " ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color',
'fruit', 'v1', 'v2'])\n",
+ "df.show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Grouping and then applying the avg() function to the resulting groups."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+-------+-------+\n",
+ "|color|avg(v1)|avg(v2)|\n",
+ "+-----+-------+-------+\n",
+ "| red| 4.8| 48.0|\n",
+ "|black| 6.0| 60.0|\n",
+ "| blue| 3.0| 30.0|\n",
+ "+-----+-------+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.groupby('color').avg().show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "You can also apply a Python native function against each group by using
pandas APIs."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 25,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+------+---+---+\n",
+ "|color| fruit| v1| v2|\n",
+ "+-----+------+---+---+\n",
+ "| red|banana| -3| 10|\n",
+ "| red|carrot| -1| 30|\n",
+ "| red|carrot| 0| 50|\n",
+ "| red|banana| 2| 70|\n",
+ "| red| grape| 3| 80|\n",
+ "|black|carrot| 0| 60|\n",
+ "| blue|banana| -1| 20|\n",
+ "| blue| grape| 1| 40|\n",
+ "+-----+------+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "def plus_mean(pandas_df):\n",
+ " return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())\n",
+ "\n",
+ "df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Co-grouping and applying a function."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 26,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------+---+---+---+\n",
+ "| time| id| v1| v2|\n",
+ "+--------+---+---+---+\n",
+ "|20000101| 1|1.0| x|\n",
+ "|20000102| 1|3.0| x|\n",
+ "|20000101| 2|2.0| y|\n",
+ "|20000102| 2|4.0| y|\n",
+ "+--------+---+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df1 = spark.createDataFrame(\n",
+ " [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0),
(20000102, 2, 4.0)],\n",
+ " ('time', 'id', 'v1'))\n",
+ "\n",
+ "df2 = spark.createDataFrame(\n",
+ " [(20000101, 1, 'x'), (20000101, 2, 'y')],\n",
+ " ('time', 'id', 'v2'))\n",
+ "\n",
+ "def asof_join(l, r):\n",
+ " return pd.merge_asof(l, r, on='time', by='id')\n",
+ "\n",
+ "df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(\n",
+ " asof_join, schema='time int, id int, v1 double, v2 string').show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Getting Data in/out\n",
+ "\n",
+ "CSV is straightforward and easy to use. Parquet and ORC are efficient and
compact file formats to read and write faster.\n",
+ "\n",
+ "There are many other data sources available in PySpark such as JDBC,
text, binaryFile, Avro, etc. See also \"Spark SQL, DataFrames and Datasets
Guide\" in Apache Spark documentation."
Review comment:
"See also..." should link to the relevant docs.
##########
File path: postBuild
##########
@@ -0,0 +1,21 @@
+#!/bin/bash
Review comment:
Where/how does this file get used?
##########
File path: python/docs/source/getting_started/quickstart.ipynb
##########
@@ -0,0 +1,1091 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Quickstart\n",
+ "\n",
+ "This is a short introduction and quickstart for PySpark DataFrame.
PySpark DataFrame is lazily evaludated and implemented on thetop of
[RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview).
When the data is
[transformed](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations),
it does not actually compute but plans how to compute later. When the
[actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)
such as `collect()` are explicitly called, the computation starts.\n",
Review comment:
Wording suggestions:
* "for the PySpark DataFrame API."
* "PySpark DataFrames are lazily evaluated. They are implemented on top of
RDDs"
* "When Spark transforms data, it does not immediately compute the
transformation but plans"
* "When actions such as"
##########
File path: docs/README.md
##########
@@ -63,7 +63,7 @@ See also https://github.com/sphinx-doc/sphinx/issues/7551.
-->
```sh
-$ sudo pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme
+$ sudo pip install 'sphinx<3.1.0' mkdocs numpy pydata_sphinx_theme ipython
nbsphinx
Review comment:
The need to repeat these new requirements highlights the utility of
#27928.
##########
File path: python/docs/source/getting_started/quickstart.ipynb
##########
@@ -0,0 +1,1091 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Quickstart\n",
+ "\n",
+ "This is a short introduction and quickstart for PySpark DataFrame.
PySpark DataFrame is lazily evaludated and implemented on thetop of
[RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview).
When the data is
[transformed](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations),
it does not actually compute but plans how to compute later. When the
[actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)
such as `collect()` are explicitly called, the computation starts.\n",
+ "This notebook shows the basic usages of the DataFrame, geared mainly for
new users. You can run the latest version of these examples by yourself on a
live notebook
[here](https://mybinder.org/v2/gh/databricks/apache/master?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb).\n",
+ "\n",
+ "There are also other useful information in Apache Spark documentation
site, see the latest version of [Spark SQL and
DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html),
[RDD Programming
Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html),
[Structured Streaming Programming
Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html),
[Spark Streaming Programming
Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html)
and [Machine Learning Library (MLlib)
Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n",
+ "\n",
+ "Usually PySaprk applications start with initializing `SparkSession` which
is the entry point of PySpark as below. In case of running it in PySpark shell
via <code>pyspark</code> executable, the shell automatically creates the
session in the variable <code>spark</code> for users."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder.getOrCreate()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## DataFrame Creation\n",
+ "\n",
+ "A PySpark DataFrame can be created via
`pyspark.sql.SparkSession.createDataFrame` typically by passing a list of
lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and an RDD consisting of such a list.\n",
+ "`pyspark.sql.SparkSession.createDataFrame` takes the `schema` argument to
specify the schema of the DataFrame. When it is omitted, PySpark infers the
corresponding schema by taking a sample from the data.\n",
+ "\n",
+ "The example below creates a PySpark DataFrame from a list of rows"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "import datetime\n",
+ "import pandas as pd\n",
+ "from pyspark.sql import Row\n",
+ "\n",
+ "df = spark.createDataFrame([\n",
+ " Row(a=1, b=2., c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3., c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=4, b=5., c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))\n",
+ "])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame with the schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "], schema='a long, b double, c string, d date, e timestamp')\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from a pandas DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "pandas_df = pd.DataFrame({\n",
+ " 'a': [1, 2, 3],\n",
+ " 'b': [2., 3., 4.],\n",
+ " 'c': ['string1', 'string2', 'string3'],\n",
+ " 'd': [datetime.date(2000, 1, 1), datetime.date(2000, 2, 1),
datetime.date(2000, 3, 1)],\n",
+ " 'e': [datetime.datetime(2000, 1, 1, 12, 0), datetime.datetime(2000,
1, 2, 12, 0), datetime.datetime(2000, 1, 3, 12, 0)]\n",
+ "})\n",
+ "df = spark.createDataFrame(pandas_df)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from an RDD consisting of a list of tuples."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "rdd = spark.sparkContext.parallelize([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "])\n",
+ "df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The DataFrames created above all have the same results and schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n",
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# All DataFrames above result same.\n",
+ "df.show()\n",
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Viewing Data\n",
+ "\n",
+ "The top rows of a DataFrame can be displayed using `DataFrame.show()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Alternatively, you can enable `spark.sql.repl.eagerEval.enabled`
configuration to enable the eager evaluation of PySpark DataFrame in notebooks
such as Jupyter."
Review comment:
Maybe add a note here that you wouldn't want to do this if you were
dealing with really large amounts of data?
##########
File path: python/docs/source/getting_started/quickstart.ipynb
##########
@@ -0,0 +1,1091 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Quickstart\n",
+ "\n",
+ "This is a short introduction and quickstart for PySpark DataFrame.
PySpark DataFrame is lazily evaludated and implemented on thetop of
[RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview).
When the data is
[transformed](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations),
it does not actually compute but plans how to compute later. When the
[actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)
such as `collect()` are explicitly called, the computation starts.\n",
+ "This notebook shows the basic usages of the DataFrame, geared mainly for
new users. You can run the latest version of these examples by yourself on a
live notebook
[here](https://mybinder.org/v2/gh/databricks/apache/master?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb).\n",
+ "\n",
+ "There are also other useful information in Apache Spark documentation
site, see the latest version of [Spark SQL and
DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html),
[RDD Programming
Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html),
[Structured Streaming Programming
Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html),
[Spark Streaming Programming
Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html)
and [Machine Learning Library (MLlib)
Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n",
+ "\n",
+ "Usually PySaprk applications start with initializing `SparkSession` which
is the entry point of PySpark as below. In case of running it in PySpark shell
via <code>pyspark</code> executable, the shell automatically creates the
session in the variable <code>spark</code> for users."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder.getOrCreate()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## DataFrame Creation\n",
+ "\n",
+ "A PySpark DataFrame can be created via
`pyspark.sql.SparkSession.createDataFrame` typically by passing a list of
lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and an RDD consisting of such a list.\n",
+ "`pyspark.sql.SparkSession.createDataFrame` takes the `schema` argument to
specify the schema of the DataFrame. When it is omitted, PySpark infers the
corresponding schema by taking a sample from the data.\n",
+ "\n",
+ "The example below creates a PySpark DataFrame from a list of rows"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "import datetime\n",
+ "import pandas as pd\n",
+ "from pyspark.sql import Row\n",
+ "\n",
+ "df = spark.createDataFrame([\n",
+ " Row(a=1, b=2., c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3., c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=4, b=5., c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))\n",
+ "])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame with the schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "], schema='a long, b double, c string, d date, e timestamp')\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from a pandas DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "pandas_df = pd.DataFrame({\n",
+ " 'a': [1, 2, 3],\n",
+ " 'b': [2., 3., 4.],\n",
+ " 'c': ['string1', 'string2', 'string3'],\n",
+ " 'd': [datetime.date(2000, 1, 1), datetime.date(2000, 2, 1),
datetime.date(2000, 3, 1)],\n",
+ " 'e': [datetime.datetime(2000, 1, 1, 12, 0), datetime.datetime(2000,
1, 2, 12, 0), datetime.datetime(2000, 1, 3, 12, 0)]\n",
+ "})\n",
+ "df = spark.createDataFrame(pandas_df)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from an RDD consisting of a list of tuples."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "rdd = spark.sparkContext.parallelize([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "])\n",
+ "df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The DataFrames created above all have the same results and schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n",
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# All DataFrames above result same.\n",
+ "df.show()\n",
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Viewing Data\n",
+ "\n",
+ "The top rows of a DataFrame can be displayed using `DataFrame.show()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Alternatively, you can enable `spark.sql.repl.eagerEval.enabled`
configuration to enable the eager evaluation of PySpark DataFrame in notebooks
such as Jupyter."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<table border='1'>\n",
+ "<tr><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th></tr>\n",
+
"<tr><td>1</td><td>2.0</td><td>string1</td><td>2000-01-01</td><td>2000-01-01
12:00:00</td></tr>\n",
+
"<tr><td>2</td><td>3.0</td><td>string2</td><td>2000-02-01</td><td>2000-01-02
12:00:00</td></tr>\n",
+
"<tr><td>3</td><td>4.0</td><td>string3</td><td>2000-03-01</td><td>2000-01-03
12:00:00</td></tr>\n",
+ "</table>\n"
+ ],
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 8,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "spark.conf.set('spark.sql.repl.eagerEval.enabled', True)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The rows can also be shown vertically. This is useful when rows are too
long to show horizontally."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "-RECORD 0------------------\n",
+ " a | 1 \n",
+ " b | 2.0 \n",
+ " c | string1 \n",
+ " d | 2000-01-01 \n",
+ " e | 2000-01-01 12:00:00 \n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1, vertical=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Its schema and column names can be shown as below:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "['a', 'b', 'c', 'd', 'e']"
+ ]
+ },
+ "execution_count": 10,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.columns"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Show the summary of the DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+---+---+-------+\n",
+ "|summary| a| b| c|\n",
+ "+-------+---+---+-------+\n",
+ "| count| 3| 3| 3|\n",
+ "| mean|2.0|3.0| null|\n",
+ "| stddev|1.0|1.0| null|\n",
+ "| min| 1|2.0|string1|\n",
+ "| max| 3|4.0|string3|\n",
+ "+-------+---+---+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.describe().show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "`DataFrame.collect()` collects the distributed data to the driver side as
Python premitive representation. Note that this can throw out-of-memory error
when the dataset is too larget to fit in the driver side because it collects
all the data from executors to the driver side."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))]"
+ ]
+ },
+ "execution_count": 13,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.collect()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In order to avoid throwing an out-of-memory exception, use
`DataFrame.take()` or `DataFrame.tail()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0))]"
+ ]
+ },
+ "execution_count": 14,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.take(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "PySpark DataFrame also provides the conversion back to a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
in order to leverage pandas APIs."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div>\n",
+ "<style scoped>\n",
+ " .dataframe tbody tr th:only-of-type {\n",
+ " vertical-align: middle;\n",
+ " }\n",
+ "\n",
+ " .dataframe tbody tr th {\n",
+ " vertical-align: top;\n",
+ " }\n",
+ "\n",
+ " .dataframe thead th {\n",
+ " text-align: right;\n",
+ " }\n",
+ "</style>\n",
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th>a</th>\n",
+ " <th>b</th>\n",
+ " <th>c</th>\n",
+ " <th>d</th>\n",
+ " <th>e</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>0</th>\n",
+ " <td>1</td>\n",
+ " <td>2.0</td>\n",
+ " <td>string1</td>\n",
+ " <td>2000-01-01</td>\n",
+ " <td>2000-01-01 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>1</th>\n",
+ " <td>2</td>\n",
+ " <td>3.0</td>\n",
+ " <td>string2</td>\n",
+ " <td>2000-02-01</td>\n",
+ " <td>2000-01-02 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>2</th>\n",
+ " <td>3</td>\n",
+ " <td>4.0</td>\n",
+ " <td>string3</td>\n",
+ " <td>2000-03-01</td>\n",
+ " <td>2000-01-03 12:00:00</td>\n",
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>\n",
+ "</div>"
+ ],
+ "text/plain": [
+ " a b c d e\n",
+ "0 1 2.0 string1 2000-01-01 2000-01-01 12:00:00\n",
+ "1 2 3.0 string2 2000-02-01 2000-01-02 12:00:00\n",
+ "2 3 4.0 string3 2000-03-01 2000-01-03 12:00:00"
+ ]
+ },
+ "execution_count": 15,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.toPandas()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Selecting and Accessing Data\n",
+ "\n",
+ "PySpark DataFrame is lazily evaluated and simply selecting a column does
not trigger the computation but it returns a `Column` instance."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "Column<b'a'>"
+ ]
+ },
+ "execution_count": 16,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.a"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In fact, most of column-weise operations return `Column`s."
Review comment:
Typo: -wise
##########
File path: python/docs/source/getting_started/quickstart.ipynb
##########
@@ -0,0 +1,1091 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Quickstart\n",
+ "\n",
+ "This is a short introduction and quickstart for PySpark DataFrame.
PySpark DataFrame is lazily evaludated and implemented on thetop of
[RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview).
When the data is
[transformed](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations),
it does not actually compute but plans how to compute later. When the
[actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)
such as `collect()` are explicitly called, the computation starts.\n",
+ "This notebook shows the basic usages of the DataFrame, geared mainly for
new users. You can run the latest version of these examples by yourself on a
live notebook
[here](https://mybinder.org/v2/gh/databricks/apache/master?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb).\n",
+ "\n",
+ "There are also other useful information in Apache Spark documentation
site, see the latest version of [Spark SQL and
DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html),
[RDD Programming
Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html),
[Structured Streaming Programming
Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html),
[Spark Streaming Programming
Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html)
and [Machine Learning Library (MLlib)
Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n",
+ "\n",
+ "Usually PySaprk applications start with initializing `SparkSession` which
is the entry point of PySpark as below. In case of running it in PySpark shell
via <code>pyspark</code> executable, the shell automatically creates the
session in the variable <code>spark</code> for users."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder.getOrCreate()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## DataFrame Creation\n",
+ "\n",
+ "A PySpark DataFrame can be created via
`pyspark.sql.SparkSession.createDataFrame` typically by passing a list of
lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and an RDD consisting of such a list.\n",
+ "`pyspark.sql.SparkSession.createDataFrame` takes the `schema` argument to
specify the schema of the DataFrame. When it is omitted, PySpark infers the
corresponding schema by taking a sample from the data.\n",
+ "\n",
+ "The example below creates a PySpark DataFrame from a list of rows"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "import datetime\n",
+ "import pandas as pd\n",
+ "from pyspark.sql import Row\n",
+ "\n",
+ "df = spark.createDataFrame([\n",
+ " Row(a=1, b=2., c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3., c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=4, b=5., c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))\n",
+ "])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame with the schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "], schema='a long, b double, c string, d date, e timestamp')\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from a pandas DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "pandas_df = pd.DataFrame({\n",
+ " 'a': [1, 2, 3],\n",
+ " 'b': [2., 3., 4.],\n",
+ " 'c': ['string1', 'string2', 'string3'],\n",
+ " 'd': [datetime.date(2000, 1, 1), datetime.date(2000, 2, 1),
datetime.date(2000, 3, 1)],\n",
+ " 'e': [datetime.datetime(2000, 1, 1, 12, 0), datetime.datetime(2000,
1, 2, 12, 0), datetime.datetime(2000, 1, 3, 12, 0)]\n",
+ "})\n",
+ "df = spark.createDataFrame(pandas_df)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from an RDD consisting of a list of tuples."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "rdd = spark.sparkContext.parallelize([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "])\n",
+ "df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The DataFrames created above all have the same results and schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n",
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# All DataFrames above result same.\n",
+ "df.show()\n",
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Viewing Data\n",
+ "\n",
+ "The top rows of a DataFrame can be displayed using `DataFrame.show()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Alternatively, you can enable `spark.sql.repl.eagerEval.enabled`
configuration to enable the eager evaluation of PySpark DataFrame in notebooks
such as Jupyter."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<table border='1'>\n",
+ "<tr><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th></tr>\n",
+
"<tr><td>1</td><td>2.0</td><td>string1</td><td>2000-01-01</td><td>2000-01-01
12:00:00</td></tr>\n",
+
"<tr><td>2</td><td>3.0</td><td>string2</td><td>2000-02-01</td><td>2000-01-02
12:00:00</td></tr>\n",
+
"<tr><td>3</td><td>4.0</td><td>string3</td><td>2000-03-01</td><td>2000-01-03
12:00:00</td></tr>\n",
+ "</table>\n"
+ ],
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 8,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "spark.conf.set('spark.sql.repl.eagerEval.enabled', True)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The rows can also be shown vertically. This is useful when rows are too
long to show horizontally."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "-RECORD 0------------------\n",
+ " a | 1 \n",
+ " b | 2.0 \n",
+ " c | string1 \n",
+ " d | 2000-01-01 \n",
+ " e | 2000-01-01 12:00:00 \n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1, vertical=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Its schema and column names can be shown as below:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "['a', 'b', 'c', 'd', 'e']"
+ ]
+ },
+ "execution_count": 10,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.columns"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Show the summary of the DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+---+---+-------+\n",
+ "|summary| a| b| c|\n",
+ "+-------+---+---+-------+\n",
+ "| count| 3| 3| 3|\n",
+ "| mean|2.0|3.0| null|\n",
+ "| stddev|1.0|1.0| null|\n",
+ "| min| 1|2.0|string1|\n",
+ "| max| 3|4.0|string3|\n",
+ "+-------+---+---+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.describe().show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "`DataFrame.collect()` collects the distributed data to the driver side as
Python premitive representation. Note that this can throw out-of-memory error
when the dataset is too larget to fit in the driver side because it collects
all the data from executors to the driver side."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))]"
+ ]
+ },
+ "execution_count": 13,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.collect()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In order to avoid throwing an out-of-memory exception, use
`DataFrame.take()` or `DataFrame.tail()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0))]"
+ ]
+ },
+ "execution_count": 14,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.take(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "PySpark DataFrame also provides the conversion back to a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
in order to leverage pandas APIs."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div>\n",
+ "<style scoped>\n",
+ " .dataframe tbody tr th:only-of-type {\n",
+ " vertical-align: middle;\n",
+ " }\n",
+ "\n",
+ " .dataframe tbody tr th {\n",
+ " vertical-align: top;\n",
+ " }\n",
+ "\n",
+ " .dataframe thead th {\n",
+ " text-align: right;\n",
+ " }\n",
+ "</style>\n",
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th>a</th>\n",
+ " <th>b</th>\n",
+ " <th>c</th>\n",
+ " <th>d</th>\n",
+ " <th>e</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>0</th>\n",
+ " <td>1</td>\n",
+ " <td>2.0</td>\n",
+ " <td>string1</td>\n",
+ " <td>2000-01-01</td>\n",
+ " <td>2000-01-01 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>1</th>\n",
+ " <td>2</td>\n",
+ " <td>3.0</td>\n",
+ " <td>string2</td>\n",
+ " <td>2000-02-01</td>\n",
+ " <td>2000-01-02 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>2</th>\n",
+ " <td>3</td>\n",
+ " <td>4.0</td>\n",
+ " <td>string3</td>\n",
+ " <td>2000-03-01</td>\n",
+ " <td>2000-01-03 12:00:00</td>\n",
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>\n",
+ "</div>"
+ ],
+ "text/plain": [
+ " a b c d e\n",
+ "0 1 2.0 string1 2000-01-01 2000-01-01 12:00:00\n",
+ "1 2 3.0 string2 2000-02-01 2000-01-02 12:00:00\n",
+ "2 3 4.0 string3 2000-03-01 2000-01-03 12:00:00"
+ ]
+ },
+ "execution_count": 15,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.toPandas()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Selecting and Accessing Data\n",
+ "\n",
+ "PySpark DataFrame is lazily evaluated and simply selecting a column does
not trigger the computation but it returns a `Column` instance."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "Column<b'a'>"
+ ]
+ },
+ "execution_count": 16,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.a"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In fact, most of column-weise operations return `Column`s."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 17,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "True"
+ ]
+ },
+ "execution_count": 17,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "from pyspark.sql import Column\n",
+ "from pyspark.sql.functions import upper\n",
+ "\n",
+ "type(df.c) == type(upper(df.c)) == type(df.c.isNull())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "These `Column`s can be used to select the columns from a DataFrame. For
example, `DataFrame.select()` takes the `Column` instances that returns another
DataFrame."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+\n",
+ "| c|\n",
+ "+-------+\n",
+ "|string1|\n",
+ "|string2|\n",
+ "|string3|\n",
+ "+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.select(df.c).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Assign new `Column` instance."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "| a| b| c| d| e|upper_c|\n",
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|\n",
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.withColumn('upper_c', upper(df.c)).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In order to select rows instead of columns, use `DataFrame.filter()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.filter(df.a == 1).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Applying a Function\n",
+ "\n",
+ "PySpark supports various UDFs and APIs to allow users to execute Python
native functions. See also Pandas UDFs and Pandas Function APIs in User Guide.
For instance, the example below allows users to directly use the APIs in [a
pandas
Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html)
within Python native function."
Review comment:
"See also ..."
This part should link to the relevant docs.
##########
File path: python/docs/source/getting_started/quickstart.ipynb
##########
@@ -0,0 +1,1091 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Quickstart\n",
+ "\n",
+ "This is a short introduction and quickstart for PySpark DataFrame.
PySpark DataFrame is lazily evaludated and implemented on thetop of
[RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview).
When the data is
[transformed](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations),
it does not actually compute but plans how to compute later. When the
[actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)
such as `collect()` are explicitly called, the computation starts.\n",
+ "This notebook shows the basic usages of the DataFrame, geared mainly for
new users. You can run the latest version of these examples by yourself on a
live notebook
[here](https://mybinder.org/v2/gh/databricks/apache/master?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb).\n",
+ "\n",
+ "There are also other useful information in Apache Spark documentation
site, see the latest version of [Spark SQL and
DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html),
[RDD Programming
Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html),
[Structured Streaming Programming
Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html),
[Spark Streaming Programming
Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html)
and [Machine Learning Library (MLlib)
Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n",
+ "\n",
+ "Usually PySaprk applications start with initializing `SparkSession` which
is the entry point of PySpark as below. In case of running it in PySpark shell
via <code>pyspark</code> executable, the shell automatically creates the
session in the variable <code>spark</code> for users."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder.getOrCreate()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## DataFrame Creation\n",
+ "\n",
+ "A PySpark DataFrame can be created via
`pyspark.sql.SparkSession.createDataFrame` typically by passing a list of
lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and an RDD consisting of such a list.\n",
+ "`pyspark.sql.SparkSession.createDataFrame` takes the `schema` argument to
specify the schema of the DataFrame. When it is omitted, PySpark infers the
corresponding schema by taking a sample from the data.\n",
+ "\n",
+ "The example below creates a PySpark DataFrame from a list of rows"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "import datetime\n",
+ "import pandas as pd\n",
+ "from pyspark.sql import Row\n",
+ "\n",
+ "df = spark.createDataFrame([\n",
+ " Row(a=1, b=2., c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3., c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=4, b=5., c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))\n",
+ "])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame with the schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "], schema='a long, b double, c string, d date, e timestamp')\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from a pandas DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "pandas_df = pd.DataFrame({\n",
+ " 'a': [1, 2, 3],\n",
+ " 'b': [2., 3., 4.],\n",
+ " 'c': ['string1', 'string2', 'string3'],\n",
+ " 'd': [datetime.date(2000, 1, 1), datetime.date(2000, 2, 1),
datetime.date(2000, 3, 1)],\n",
+ " 'e': [datetime.datetime(2000, 1, 1, 12, 0), datetime.datetime(2000,
1, 2, 12, 0), datetime.datetime(2000, 1, 3, 12, 0)]\n",
+ "})\n",
+ "df = spark.createDataFrame(pandas_df)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from an RDD consisting of a list of tuples."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "rdd = spark.sparkContext.parallelize([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "])\n",
+ "df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The DataFrames created above all have the same results and schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n",
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# All DataFrames above result same.\n",
+ "df.show()\n",
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Viewing Data\n",
+ "\n",
+ "The top rows of a DataFrame can be displayed using `DataFrame.show()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Alternatively, you can enable `spark.sql.repl.eagerEval.enabled`
configuration to enable the eager evaluation of PySpark DataFrame in notebooks
such as Jupyter."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<table border='1'>\n",
+ "<tr><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th></tr>\n",
+
"<tr><td>1</td><td>2.0</td><td>string1</td><td>2000-01-01</td><td>2000-01-01
12:00:00</td></tr>\n",
+
"<tr><td>2</td><td>3.0</td><td>string2</td><td>2000-02-01</td><td>2000-01-02
12:00:00</td></tr>\n",
+
"<tr><td>3</td><td>4.0</td><td>string3</td><td>2000-03-01</td><td>2000-01-03
12:00:00</td></tr>\n",
+ "</table>\n"
+ ],
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 8,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "spark.conf.set('spark.sql.repl.eagerEval.enabled', True)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The rows can also be shown vertically. This is useful when rows are too
long to show horizontally."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "-RECORD 0------------------\n",
+ " a | 1 \n",
+ " b | 2.0 \n",
+ " c | string1 \n",
+ " d | 2000-01-01 \n",
+ " e | 2000-01-01 12:00:00 \n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1, vertical=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Its schema and column names can be shown as below:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "['a', 'b', 'c', 'd', 'e']"
+ ]
+ },
+ "execution_count": 10,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.columns"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Show the summary of the DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+---+---+-------+\n",
+ "|summary| a| b| c|\n",
+ "+-------+---+---+-------+\n",
+ "| count| 3| 3| 3|\n",
+ "| mean|2.0|3.0| null|\n",
+ "| stddev|1.0|1.0| null|\n",
+ "| min| 1|2.0|string1|\n",
+ "| max| 3|4.0|string3|\n",
+ "+-------+---+---+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.describe().show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "`DataFrame.collect()` collects the distributed data to the driver side as
Python premitive representation. Note that this can throw out-of-memory error
when the dataset is too larget to fit in the driver side because it collects
all the data from executors to the driver side."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))]"
+ ]
+ },
+ "execution_count": 13,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.collect()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In order to avoid throwing an out-of-memory exception, use
`DataFrame.take()` or `DataFrame.tail()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0))]"
+ ]
+ },
+ "execution_count": 14,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.take(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "PySpark DataFrame also provides the conversion back to a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
in order to leverage pandas APIs."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div>\n",
+ "<style scoped>\n",
+ " .dataframe tbody tr th:only-of-type {\n",
+ " vertical-align: middle;\n",
+ " }\n",
+ "\n",
+ " .dataframe tbody tr th {\n",
+ " vertical-align: top;\n",
+ " }\n",
+ "\n",
+ " .dataframe thead th {\n",
+ " text-align: right;\n",
+ " }\n",
+ "</style>\n",
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th>a</th>\n",
+ " <th>b</th>\n",
+ " <th>c</th>\n",
+ " <th>d</th>\n",
+ " <th>e</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>0</th>\n",
+ " <td>1</td>\n",
+ " <td>2.0</td>\n",
+ " <td>string1</td>\n",
+ " <td>2000-01-01</td>\n",
+ " <td>2000-01-01 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>1</th>\n",
+ " <td>2</td>\n",
+ " <td>3.0</td>\n",
+ " <td>string2</td>\n",
+ " <td>2000-02-01</td>\n",
+ " <td>2000-01-02 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>2</th>\n",
+ " <td>3</td>\n",
+ " <td>4.0</td>\n",
+ " <td>string3</td>\n",
+ " <td>2000-03-01</td>\n",
+ " <td>2000-01-03 12:00:00</td>\n",
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>\n",
+ "</div>"
+ ],
+ "text/plain": [
+ " a b c d e\n",
+ "0 1 2.0 string1 2000-01-01 2000-01-01 12:00:00\n",
+ "1 2 3.0 string2 2000-02-01 2000-01-02 12:00:00\n",
+ "2 3 4.0 string3 2000-03-01 2000-01-03 12:00:00"
+ ]
+ },
+ "execution_count": 15,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.toPandas()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Selecting and Accessing Data\n",
+ "\n",
+ "PySpark DataFrame is lazily evaluated and simply selecting a column does
not trigger the computation but it returns a `Column` instance."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "Column<b'a'>"
+ ]
+ },
+ "execution_count": 16,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.a"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In fact, most of column-weise operations return `Column`s."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 17,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "True"
+ ]
+ },
+ "execution_count": 17,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "from pyspark.sql import Column\n",
+ "from pyspark.sql.functions import upper\n",
+ "\n",
+ "type(df.c) == type(upper(df.c)) == type(df.c.isNull())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "These `Column`s can be used to select the columns from a DataFrame. For
example, `DataFrame.select()` takes the `Column` instances that returns another
DataFrame."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+\n",
+ "| c|\n",
+ "+-------+\n",
+ "|string1|\n",
+ "|string2|\n",
+ "|string3|\n",
+ "+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.select(df.c).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Assign new `Column` instance."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "| a| b| c| d| e|upper_c|\n",
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|\n",
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.withColumn('upper_c', upper(df.c)).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In order to select rows instead of columns, use `DataFrame.filter()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.filter(df.a == 1).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Applying a Function\n",
+ "\n",
+ "PySpark supports various UDFs and APIs to allow users to execute Python
native functions. See also Pandas UDFs and Pandas Function APIs in User Guide.
For instance, the example below allows users to directly use the APIs in [a
pandas
Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html)
within Python native function."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+------------------+\n",
+ "|pandas_plus_one(a)|\n",
+ "+------------------+\n",
+ "| 2|\n",
+ "| 3|\n",
+ "| 4|\n",
+ "+------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "import pandas\n",
+ "from pyspark.sql.functions import pandas_udf\n",
+ "\n",
+ "@pandas_udf('long')\n",
+ "def pandas_plus_one(series: pd.Series) -> pd.Series:\n",
+ " # Simply plus one by using pandas Series.\n",
+ " return series + 1\n",
+ "\n",
+ "df.select(pandas_plus_one(df.a)).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Another example is `DataFrame.mapInPandas` which allows users directly
use the APIs in a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
without any restrictions such as the result length."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "def pandas_filter_func(iterator):\n",
+ " for pandas_df in iterator:\n",
+ " yield pandas_df[pandas_df.a == 1]\n",
+ "\n",
+ "df.mapInPandas(pandas_filter_func, schema=df.schema).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Grouping Data\n",
+ "\n",
+ "PySpark DataFrame also provides a way of handling grouped data by using
the common approach, split-apply-combine strategy.\n",
+ "It groups the data by a certain condition, applies a function to each
group, and then combines them back to the DataFrame."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 23,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+------+---+---+\n",
+ "|color| fruit| v1| v2|\n",
+ "+-----+------+---+---+\n",
+ "| red|banana| 1| 10|\n",
+ "| blue|banana| 2| 20|\n",
+ "| red|carrot| 3| 30|\n",
+ "| blue| grape| 4| 40|\n",
+ "| red|carrot| 5| 50|\n",
+ "|black|carrot| 6| 60|\n",
+ "| red|banana| 7| 70|\n",
+ "| red| grape| 8| 80|\n",
+ "+-----+------+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red',
'carrot', 3, 30],\n",
+ " ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black',
'carrot', 6, 60],\n",
+ " ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color',
'fruit', 'v1', 'v2'])\n",
+ "df.show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Grouping and then applying the avg() function to the resulting groups."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+-------+-------+\n",
+ "|color|avg(v1)|avg(v2)|\n",
+ "+-----+-------+-------+\n",
+ "| red| 4.8| 48.0|\n",
+ "|black| 6.0| 60.0|\n",
+ "| blue| 3.0| 30.0|\n",
+ "+-----+-------+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.groupby('color').avg().show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "You can also apply a Python native function against each group by using
pandas APIs."
Review comment:
Ew! 😄
Wouldn't it be better / more useful to show people how to use a plain Python
UDF without Pandas?
##########
File path: python/docs/source/getting_started/quickstart.ipynb
##########
@@ -0,0 +1,1091 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Quickstart\n",
+ "\n",
+ "This is a short introduction and quickstart for PySpark DataFrame.
PySpark DataFrame is lazily evaludated and implemented on thetop of
[RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#overview).
When the data is
[transformed](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations),
it does not actually compute but plans how to compute later. When the
[actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions)
such as `collect()` are explicitly called, the computation starts.\n",
+ "This notebook shows the basic usages of the DataFrame, geared mainly for
new users. You can run the latest version of these examples by yourself on a
live notebook
[here](https://mybinder.org/v2/gh/databricks/apache/master?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart.ipynb).\n",
+ "\n",
+ "There are also other useful information in Apache Spark documentation
site, see the latest version of [Spark SQL and
DataFrames](https://spark.apache.org/docs/latest/sql-programming-guide.html),
[RDD Programming
Guide](https://spark.apache.org/docs/latest/rdd-programming-guide.html),
[Structured Streaming Programming
Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html),
[Spark Streaming Programming
Guide](https://spark.apache.org/docs/latest/streaming-programming-guide.html)
and [Machine Learning Library (MLlib)
Guide](https://spark.apache.org/docs/latest/ml-guide.html).\n",
+ "\n",
+ "Usually PySaprk applications start with initializing `SparkSession` which
is the entry point of PySpark as below. In case of running it in PySpark shell
via <code>pyspark</code> executable, the shell automatically creates the
session in the variable <code>spark</code> for users."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pyspark.sql import SparkSession\n",
+ "\n",
+ "spark = SparkSession.builder.getOrCreate()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## DataFrame Creation\n",
+ "\n",
+ "A PySpark DataFrame can be created via
`pyspark.sql.SparkSession.createDataFrame` typically by passing a list of
lists, tuples, dictionaries and `pyspark.sql.Row`s, a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
and an RDD consisting of such a list.\n",
+ "`pyspark.sql.SparkSession.createDataFrame` takes the `schema` argument to
specify the schema of the DataFrame. When it is omitted, PySpark infers the
corresponding schema by taking a sample from the data.\n",
+ "\n",
+ "The example below creates a PySpark DataFrame from a list of rows"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 2,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "import datetime\n",
+ "import pandas as pd\n",
+ "from pyspark.sql import Row\n",
+ "\n",
+ "df = spark.createDataFrame([\n",
+ " Row(a=1, b=2., c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3., c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=4, b=5., c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))\n",
+ "])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame with the schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "], schema='a long, b double, c string, d date, e timestamp')\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from a pandas DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "pandas_df = pd.DataFrame({\n",
+ " 'a': [1, 2, 3],\n",
+ " 'b': [2., 3., 4.],\n",
+ " 'c': ['string1', 'string2', 'string3'],\n",
+ " 'd': [datetime.date(2000, 1, 1), datetime.date(2000, 2, 1),
datetime.date(2000, 3, 1)],\n",
+ " 'e': [datetime.datetime(2000, 1, 1, 12, 0), datetime.datetime(2000,
1, 2, 12, 0), datetime.datetime(2000, 1, 3, 12, 0)]\n",
+ "})\n",
+ "df = spark.createDataFrame(pandas_df)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Create a PySpark DataFrame from an RDD consisting of a list of tuples."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "rdd = spark.sparkContext.parallelize([\n",
+ " (1, 2., 'string1', datetime.date(2000, 1, 1), datetime.datetime(2000,
1, 1, 12, 0)),\n",
+ " (2, 3., 'string2', datetime.date(2000, 2, 1), datetime.datetime(2000,
1, 2, 12, 0)),\n",
+ " (3, 4., 'string3', datetime.date(2000, 3, 1), datetime.datetime(2000,
1, 3, 12, 0))\n",
+ "])\n",
+ "df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The DataFrames created above all have the same results and schema."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n",
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# All DataFrames above result same.\n",
+ "df.show()\n",
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Viewing Data\n",
+ "\n",
+ "The top rows of a DataFrame can be displayed using `DataFrame.show()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Alternatively, you can enable `spark.sql.repl.eagerEval.enabled`
configuration to enable the eager evaluation of PySpark DataFrame in notebooks
such as Jupyter."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<table border='1'>\n",
+ "<tr><th>a</th><th>b</th><th>c</th><th>d</th><th>e</th></tr>\n",
+
"<tr><td>1</td><td>2.0</td><td>string1</td><td>2000-01-01</td><td>2000-01-01
12:00:00</td></tr>\n",
+
"<tr><td>2</td><td>3.0</td><td>string2</td><td>2000-02-01</td><td>2000-01-02
12:00:00</td></tr>\n",
+
"<tr><td>3</td><td>4.0</td><td>string3</td><td>2000-03-01</td><td>2000-01-03
12:00:00</td></tr>\n",
+ "</table>\n"
+ ],
+ "text/plain": [
+ "DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]"
+ ]
+ },
+ "execution_count": 8,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "spark.conf.set('spark.sql.repl.eagerEval.enabled', True)\n",
+ "df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "The rows can also be shown vertically. This is useful when rows are too
long to show horizontally."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "-RECORD 0------------------\n",
+ " a | 1 \n",
+ " b | 2.0 \n",
+ " c | string1 \n",
+ " d | 2000-01-01 \n",
+ " e | 2000-01-01 12:00:00 \n",
+ "only showing top 1 row\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.show(1, vertical=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Its schema and column names can be shown as below:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "['a', 'b', 'c', 'd', 'e']"
+ ]
+ },
+ "execution_count": 10,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.columns"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "root\n",
+ " |-- a: long (nullable = true)\n",
+ " |-- b: double (nullable = true)\n",
+ " |-- c: string (nullable = true)\n",
+ " |-- d: date (nullable = true)\n",
+ " |-- e: timestamp (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Show the summary of the DataFrame"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+---+---+-------+\n",
+ "|summary| a| b| c|\n",
+ "+-------+---+---+-------+\n",
+ "| count| 3| 3| 3|\n",
+ "| mean|2.0|3.0| null|\n",
+ "| stddev|1.0|1.0| null|\n",
+ "| min| 1|2.0|string1|\n",
+ "| max| 3|4.0|string3|\n",
+ "+-------+---+---+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.describe().show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "`DataFrame.collect()` collects the distributed data to the driver side as
Python premitive representation. Note that this can throw out-of-memory error
when the dataset is too larget to fit in the driver side because it collects
all the data from executors to the driver side."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0)),\n",
+ " Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1),
e=datetime.datetime(2000, 1, 2, 12, 0)),\n",
+ " Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1),
e=datetime.datetime(2000, 1, 3, 12, 0))]"
+ ]
+ },
+ "execution_count": 13,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.collect()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In order to avoid throwing an out-of-memory exception, use
`DataFrame.take()` or `DataFrame.tail()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1),
e=datetime.datetime(2000, 1, 1, 12, 0))]"
+ ]
+ },
+ "execution_count": 14,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.take(1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "PySpark DataFrame also provides the conversion back to a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
in order to leverage pandas APIs."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<div>\n",
+ "<style scoped>\n",
+ " .dataframe tbody tr th:only-of-type {\n",
+ " vertical-align: middle;\n",
+ " }\n",
+ "\n",
+ " .dataframe tbody tr th {\n",
+ " vertical-align: top;\n",
+ " }\n",
+ "\n",
+ " .dataframe thead th {\n",
+ " text-align: right;\n",
+ " }\n",
+ "</style>\n",
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th>a</th>\n",
+ " <th>b</th>\n",
+ " <th>c</th>\n",
+ " <th>d</th>\n",
+ " <th>e</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>0</th>\n",
+ " <td>1</td>\n",
+ " <td>2.0</td>\n",
+ " <td>string1</td>\n",
+ " <td>2000-01-01</td>\n",
+ " <td>2000-01-01 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>1</th>\n",
+ " <td>2</td>\n",
+ " <td>3.0</td>\n",
+ " <td>string2</td>\n",
+ " <td>2000-02-01</td>\n",
+ " <td>2000-01-02 12:00:00</td>\n",
+ " </tr>\n",
+ " <tr>\n",
+ " <th>2</th>\n",
+ " <td>3</td>\n",
+ " <td>4.0</td>\n",
+ " <td>string3</td>\n",
+ " <td>2000-03-01</td>\n",
+ " <td>2000-01-03 12:00:00</td>\n",
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>\n",
+ "</div>"
+ ],
+ "text/plain": [
+ " a b c d e\n",
+ "0 1 2.0 string1 2000-01-01 2000-01-01 12:00:00\n",
+ "1 2 3.0 string2 2000-02-01 2000-01-02 12:00:00\n",
+ "2 3 4.0 string3 2000-03-01 2000-01-03 12:00:00"
+ ]
+ },
+ "execution_count": 15,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.toPandas()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Selecting and Accessing Data\n",
+ "\n",
+ "PySpark DataFrame is lazily evaluated and simply selecting a column does
not trigger the computation but it returns a `Column` instance."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "Column<b'a'>"
+ ]
+ },
+ "execution_count": 16,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "df.a"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In fact, most of column-weise operations return `Column`s."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 17,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "True"
+ ]
+ },
+ "execution_count": 17,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "from pyspark.sql import Column\n",
+ "from pyspark.sql.functions import upper\n",
+ "\n",
+ "type(df.c) == type(upper(df.c)) == type(df.c.isNull())"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "These `Column`s can be used to select the columns from a DataFrame. For
example, `DataFrame.select()` takes the `Column` instances that returns another
DataFrame."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+\n",
+ "| c|\n",
+ "+-------+\n",
+ "|string1|\n",
+ "|string2|\n",
+ "|string3|\n",
+ "+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.select(df.c).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Assign new `Column` instance."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "| a| b| c| d| e|upper_c|\n",
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|\n",
+ "| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|\n",
+ "| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|\n",
+ "+---+---+-------+----------+-------------------+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.withColumn('upper_c', upper(df.c)).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "In order to select rows instead of columns, use `DataFrame.filter()`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.filter(df.a == 1).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Applying a Function\n",
+ "\n",
+ "PySpark supports various UDFs and APIs to allow users to execute Python
native functions. See also Pandas UDFs and Pandas Function APIs in User Guide.
For instance, the example below allows users to directly use the APIs in [a
pandas
Series](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.html)
within Python native function."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+------------------+\n",
+ "|pandas_plus_one(a)|\n",
+ "+------------------+\n",
+ "| 2|\n",
+ "| 3|\n",
+ "| 4|\n",
+ "+------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "import pandas\n",
+ "from pyspark.sql.functions import pandas_udf\n",
+ "\n",
+ "@pandas_udf('long')\n",
+ "def pandas_plus_one(series: pd.Series) -> pd.Series:\n",
+ " # Simply plus one by using pandas Series.\n",
+ " return series + 1\n",
+ "\n",
+ "df.select(pandas_plus_one(df.a)).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Another example is `DataFrame.mapInPandas` which allows users directly
use the APIs in a [pandas
DataFrame](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html)
without any restrictions such as the result length."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+---+---+-------+----------+-------------------+\n",
+ "| a| b| c| d| e|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|\n",
+ "+---+---+-------+----------+-------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "def pandas_filter_func(iterator):\n",
+ " for pandas_df in iterator:\n",
+ " yield pandas_df[pandas_df.a == 1]\n",
+ "\n",
+ "df.mapInPandas(pandas_filter_func, schema=df.schema).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Grouping Data\n",
+ "\n",
+ "PySpark DataFrame also provides a way of handling grouped data by using
the common approach, split-apply-combine strategy.\n",
+ "It groups the data by a certain condition, applies a function to each
group, and then combines them back to the DataFrame."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 23,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+------+---+---+\n",
+ "|color| fruit| v1| v2|\n",
+ "+-----+------+---+---+\n",
+ "| red|banana| 1| 10|\n",
+ "| blue|banana| 2| 20|\n",
+ "| red|carrot| 3| 30|\n",
+ "| blue| grape| 4| 40|\n",
+ "| red|carrot| 5| 50|\n",
+ "|black|carrot| 6| 60|\n",
+ "| red|banana| 7| 70|\n",
+ "| red| grape| 8| 80|\n",
+ "+-----+------+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df = spark.createDataFrame([\n",
+ " ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red',
'carrot', 3, 30],\n",
+ " ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black',
'carrot', 6, 60],\n",
+ " ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color',
'fruit', 'v1', 'v2'])\n",
+ "df.show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Grouping and then applying the avg() function to the resulting groups."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+-------+-------+\n",
+ "|color|avg(v1)|avg(v2)|\n",
+ "+-----+-------+-------+\n",
+ "| red| 4.8| 48.0|\n",
+ "|black| 6.0| 60.0|\n",
+ "| blue| 3.0| 30.0|\n",
+ "+-----+-------+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.groupby('color').avg().show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "You can also apply a Python native function against each group by using
pandas APIs."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 25,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+------+---+---+\n",
+ "|color| fruit| v1| v2|\n",
+ "+-----+------+---+---+\n",
+ "| red|banana| -3| 10|\n",
+ "| red|carrot| -1| 30|\n",
+ "| red|carrot| 0| 50|\n",
+ "| red|banana| 2| 70|\n",
+ "| red| grape| 3| 80|\n",
+ "|black|carrot| 0| 60|\n",
+ "| blue|banana| -1| 20|\n",
+ "| blue| grape| 1| 40|\n",
+ "+-----+------+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "def plus_mean(pandas_df):\n",
+ " return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())\n",
+ "\n",
+ "df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "Co-grouping and applying a function."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 26,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------+---+---+---+\n",
+ "| time| id| v1| v2|\n",
+ "+--------+---+---+---+\n",
+ "|20000101| 1|1.0| x|\n",
+ "|20000102| 1|3.0| x|\n",
+ "|20000101| 2|2.0| y|\n",
+ "|20000102| 2|4.0| y|\n",
+ "+--------+---+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df1 = spark.createDataFrame(\n",
+ " [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0),
(20000102, 2, 4.0)],\n",
+ " ('time', 'id', 'v1'))\n",
+ "\n",
+ "df2 = spark.createDataFrame(\n",
+ " [(20000101, 1, 'x'), (20000101, 2, 'y')],\n",
+ " ('time', 'id', 'v2'))\n",
+ "\n",
+ "def asof_join(l, r):\n",
+ " return pd.merge_asof(l, r, on='time', by='id')\n",
+ "\n",
+ "df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(\n",
+ " asof_join, schema='time int, id int, v1 double, v2 string').show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Getting Data in/out\n",
+ "\n",
+ "CSV is straightforward and easy to use. Parquet and ORC are efficient and
compact file formats to read and write faster.\n",
+ "\n",
+ "There are many other data sources available in PySpark such as JDBC,
text, binaryFile, Avro, etc. See also \"Spark SQL, DataFrames and Datasets
Guide\" in Apache Spark documentation."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### CSV"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 27,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+------+---+---+\n",
+ "|color| fruit| v1| v2|\n",
+ "+-----+------+---+---+\n",
+ "| red|banana| 1| 10|\n",
+ "| blue|banana| 2| 20|\n",
+ "| red|carrot| 3| 30|\n",
+ "| blue| grape| 4| 40|\n",
+ "| red|carrot| 5| 50|\n",
+ "|black|carrot| 6| 60|\n",
+ "| red|banana| 7| 70|\n",
+ "| red| grape| 8| 80|\n",
+ "+-----+------+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.write.csv('foo.csv', header=True)\n",
+ "spark.read.csv('foo.csv', header=True).show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Parquet"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 28,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+------+---+---+\n",
+ "|color| fruit| v1| v2|\n",
+ "+-----+------+---+---+\n",
+ "| red|banana| 1| 10|\n",
+ "| blue|banana| 2| 20|\n",
+ "| red|carrot| 3| 30|\n",
+ "| blue| grape| 4| 40|\n",
+ "| red|carrot| 5| 50|\n",
+ "|black|carrot| 6| 60|\n",
+ "| red|banana| 7| 70|\n",
+ "| red| grape| 8| 80|\n",
+ "+-----+------+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.write.parquet('bar.parquet')\n",
+ "spark.read.parquet('bar.parquet').show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### ORC"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 29,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+------+---+---+\n",
+ "|color| fruit| v1| v2|\n",
+ "+-----+------+---+---+\n",
+ "| red|banana| 1| 10|\n",
+ "| blue|banana| 2| 20|\n",
+ "| red|carrot| 3| 30|\n",
+ "| blue| grape| 4| 40|\n",
+ "| red|carrot| 5| 50|\n",
+ "|black|carrot| 6| 60|\n",
+ "| red|banana| 7| 70|\n",
+ "| red| grape| 8| 80|\n",
+ "+-----+------+---+---+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df.write.orc('zoo.orc')\n",
+ "spark.read.orc('zoo.orc').show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "## Working with SQL\n",
Review comment:
Do you plan to expand this section?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]