GitHub user liancheng opened a pull request:
https://github.com/apache/spark/pull/829
[SPARK-1669 & SPARK-1379][SQL][WIP] Made SchemaRDD.cache() leverage
InMemoryColumnarTableScan
Corresponding JIRA issues:
[SPARK-1669](https://issues.apache.org/jira/browse/SPARK-1669),
[SPARK-1379](https://issues.apache.org/jira/browse/SPARK-1379)
(To @rxin @marmbrus: this change brings some subtleties that make me doubt
whether it's a good idea to have `SchemaRDD.cache()` leverage in-memory
columnar storage. See the "Open issues" section below. Maybe we should simply
throw an exception in `SchemaRDD.cache()` to tell people to use `cacheTable`
instead. Or maybe there are simpler and more intuitive approaches that I
failed to discover.)
In this PR, we override the cache-related RDD operations in `SchemaRDD` so
that `.cache()` leverages `InMemoryColumnarTableScan`, and make the table
caching operation idempotent (calling `cacheTable()` multiple times gives the
same result).
### Basic ideas
When `.cache()` or `.persist()` is called on a `SchemaRDD`, an
`InMemoryColumnarTableScan` is constructed, and the parent RDD of the
`SchemaRDD` is replaced with the cached-columns RDD returned by
`InMemoryColumnarTableScan.execute()`.
Notice that, from the viewpoint of the Spark runtime, the RDD being cached is
therefore the cached-columns RDD rather than the `SchemaRDD` itself.
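For illustration, a minimal usage sketch of the intended behavior (an existing
SparkContext `sc` and a registered table `src` are assumed; this is not code
from the patch):
```scala
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)  // sc: an existing SparkContext

val schemaRdd = sqlContext.sql("SELECT key, value FROM src")
schemaRdd.cache()  // constructs an InMemoryColumnarTableScan and swaps
                   // the parent RDD for the cached-columns RDD
schemaRdd.count()  // the first job materializes the columnar cache
schemaRdd.count()  // later jobs scan the in-memory columns instead
```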
### Updated APIs
- `SchemaRDD.persist(newLevel: StorageLevel)`
Builds the in-memory columnar cache and replaces the parent RDD if
`newLevel.useMemory` is true; otherwise falls back to the normal caching
mechanism. (See the sketch after this list.)
- `SchemaRDD.unpersist(blocking: Boolean)`
Unpersists the in-memory columnar cache and restores the parent RDD if
necessary.
- `SchemaRDD.logicalPlan`
Returns the in-memory columnar version of the logical plan if the `SchemaRDD`
is cached in memory.
- `SQLContext.cacheTable(tableName: String)`
Forwards to `SQLContext.persistTable(tableName: String, newLevel:
StorageLevel)` and leverages in-memory columnar storage if necessary.
- `SQLContext.uncacheTable(tableName: String)`
Forwards to `SQLContext.unpersistTable(tableName: String, blocking:
Boolean)`, and re-registers the uncached table if it's backed by an existing
RDD.
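A quick usage sketch of how the storage level selects between the two paths
(`schemaRdd` and `anotherSchemaRdd` are assumed to be existing `SchemaRDD`s;
not code from the patch):
```scala
import org.apache.spark.storage.StorageLevel

// newLevel.useMemory is true: the in-memory columnar cache is built
schemaRdd.persist(StorageLevel.MEMORY_ONLY)
schemaRdd.unpersist(blocking = true)  // releases the columns, restores the parent RDD

// newLevel.useMemory is false: falls back to normal RDD caching
anotherSchemaRdd.persist(StorageLevel.DISK_ONLY)
```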
### New APIs introduced
- `SQLContext.persistTable(tableName: String, newLevel: StorageLevel)`
Equivalent to
`table(tableName).persist(newLevel).registerAsTable(tableName)`. (See the
sketch after this list.)
- `SQLContext.unpersistTable(tableName: String, blocking: Boolean)`
Similar to `table(tableName).unpersist(blocking)`, except that it
re-registers the uncached table if necessary.
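Taken together, the forwarding described above amounts to roughly the
following inside `SQLContext` (a sketch reconstructed from the equivalences
stated in this section, with `MEMORY_ONLY` assumed as `cacheTable`'s default
level; not the actual patch):
```scala
import org.apache.spark.storage.StorageLevel

def persistTable(tableName: String, newLevel: StorageLevel): Unit =
  table(tableName).persist(newLevel).registerAsTable(tableName)

def cacheTable(tableName: String): Unit =
  persistTable(tableName, StorageLevel.MEMORY_ONLY)

def unpersistTable(tableName: String, blocking: Boolean): Unit = {
  table(tableName).unpersist(blocking)
  // ...plus re-registering the uncached table when it is backed by an
  // existing RDD, as described above
}

def uncacheTable(tableName: String): Unit =
  unpersistTable(tableName, blocking = true)
```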
### Open issues
Now we can cache / uncache a table in two styles. The plain old way:
```scala
val schemaRdd = ...
schemaRdd.registerAsTable("t")
// Caching
cacheTable("t")
// Uncaching
uncacheTable("t")
```
In this way, the name of the cached table is guaranteed to be the same as
that of the original table.
The new, more RDD-style way:
```scala
val schemaRdd = ...
schemaRdd.registerAsTable("t")
// Caching
schemaRdd.cache().registerAsTable("t_cached")
// Uncaching (plan A: OK)
schemaRdd.uncache()
// Uncaching (plan B: OK? A little complicated...)
table("t_cached").uncache()
// Uncaching (plan C: probably WRONG)
uncacheTable("t_cached")
```
A major difference here is that in the new way, the name of the cached
table is not necessarily the same as the original one. So plan C is not OK,
because `uncacheTable` tries to re-register the original RDD under the wrong
table name, and we end up with two uncached tables named `t` and `t_cached`
respectively.
In plan B, `table("t_cached")` returns a new RDD. Although the memory
occupied by the columnar storage is released, `schemaRdd` is left untouched
and will be cached again in subsequent jobs even though `.cache()` is never
called again.
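To make plan B's subtlety concrete, a sketch of the sequence of events (the
`count()` action is only illustrative):
```scala
val cached = table("t_cached")     // a new SchemaRDD, not schemaRdd itself
cached.unpersist(blocking = true)  // the columnar storage is released...
schemaRdd.count()                  // ...but schemaRdd still considers itself
                                   // cached, so this job rebuilds the cache
```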
Another source of confusion:
```scala
val productRdd: RDD[SomeCaseClass] = ...
productRdd.cache().registerAsTable("t")
```
Although this looks very similar to the second way of caching a `SchemaRDD`
table, no table is cached in this case, since `productRdd` is not a
`SchemaRDD` and plain `RDD.cache()` knows nothing about columnar storage.
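A sketch of the pattern that was presumably intended, assuming the
`createSchemaRDD` implicit conversion from `SQLContext` is in scope (as in
Spark 1.0's SQL programming guide):
```scala
import org.apache.spark.sql.SchemaRDD
import sqlContext.createSchemaRDD  // implicit RDD[A <: Product] => SchemaRDD

// Converting to a SchemaRDD first makes .cache() take the columnar path:
val schemaRdd: SchemaRDD = productRdd  // implicit conversion applies here
schemaRdd.cache().registerAsTable("t")
```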
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/liancheng/spark schemaRddCache
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/829.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #829
----
commit fe4ba9eb4c089ac20235af7e9980b94691c1fc0a
Author: Cheng Lian <[email protected]>
Date: 2014-05-16T13:01:35Z
SchemaRDD.cache() now leverages InMemoryColumnarTableScan
commit 919966769bed545c8efba25c66b4e52762b61417
Author: Cheng Lian <[email protected]>
Date: 2014-05-19T02:06:00Z
SchemaRDD.cache() now leverages InMemoryColumnarTableScan
----