GitHub user viirya opened a pull request:
https://github.com/apache/spark/pull/16998
[SPARK-19665][SQL] Improve constraint propagation
## What changes were proposed in this pull request?
If there are aliased expressions in the projection, we propagate constraints
by fully expanding the original constraints with the aliases. This expansion
becomes very expensive as the number of aliases grows.
Fully expanding all constraints all the time makes iterative ML workloads,
such as an ML pipeline with many stages, run very slowly. See #16785.
Another issue is that we usually don't need the additional constraints.
For example, suppose there is a constraint "a > b", and "a" is aliased to
"c" and "d". When we use this constraint for filtering, we don't need all of
"a > b", "c > b", and "d > b". "a > b" alone suffices, because if it is
false, all the other constraints are guaranteed to be false too.
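As a rough illustration of why the expansion is costly, the sketch below (a hypothetical helper of our own, not Spark's actual constraint-propagation code) counts the constraint variants produced by full alias expansion: each attribute in a constraint can be replaced by any of its known names, so the number of generated constraints is the product of the alias counts.

```scala
// Hypothetical sketch, not Spark's implementation: full alias expansion
// rewrites a constraint once per combination of alias choices, so the
// number of generated constraints is the product of the alias counts of
// the attributes the constraint references.
object ConstraintExpansion {
  // aliases maps an attribute to every name it is known by (itself included).
  def expansions(constraintAttrs: Seq[String],
                 aliases: Map[String, Seq[String]]): Long =
    constraintAttrs.map(a => aliases.getOrElse(a, Seq(a)).size.toLong).product

  def main(args: Array[String]): Unit = {
    // "a > b" where "a" is also known as "c" and "d": 3 * 1 = 3 variants
    println(expansions(Seq("a", "b"), Map("a" -> Seq("a", "c", "d")))) // 3
  }
}
```

With many aliased columns, as in the ML pipeline benchmark below, this product grows multiplicatively, which is why expanding only when needed helps.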
### Benchmark
Run the following code locally.
```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val df = (1 to 40).foldLeft(Seq((1, "foo"), (2, "bar"), (3, "baz")).toDF("id", "x0")) {
  (df, i) => df.withColumn(s"x$i", $"x0")
}

val indexers = df.columns.tail.map(c => new StringIndexer()
  .setInputCol(c)
  .setOutputCol(s"${c}_indexed")
  .setHandleInvalid("skip"))

val encoders = indexers.map(indexer => new OneHotEncoder()
  .setInputCol(indexer.getOutputCol)
  .setOutputCol(s"${indexer.getOutputCol}_encoded")
  .setDropLast(true))

val stages: Array[PipelineStage] = indexers ++ encoders
val pipeline = new Pipeline().setStages(stages)

val startTime = System.nanoTime
pipeline.fit(df).transform(df).show
// Convert nanoseconds to milliseconds to match the timings reported below.
val runningTime = (System.nanoTime - startTime) / 1000000
```
Before this patch: 1786001 ms
After this patch: 49972 ms
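For reference, the two timings above correspond to a roughly 36x speedup (simple arithmetic on the reported numbers):

```scala
// Speedup implied by the benchmark timings reported in this PR description.
object Speedup {
  def main(args: Array[String]): Unit = {
    val before = 1786001.0 // ms, before this patch
    val after  = 49972.0   // ms, after this patch
    println(f"${before / after}%.1fx speedup") // 35.7x speedup
  }
}
```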
## How was this patch tested?
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/viirya/spark-1
improve-constraints-generation-2
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/16998.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #16998
----
commit b4e514ade7ea478055db448bbf66f7a88caf3a86
Author: Liang-Chi Hsieh <[email protected]>
Date: 2017-02-03T07:08:47Z
Improve the code to generate constraints.
commit 8c98a5c3ab1477408988c8cb682733e65dd554fc
Author: Liang-Chi Hsieh <[email protected]>
Date: 2017-02-09T04:07:09Z
Use parallel collection to improve the function.
commit 1b9c5616633909ad4bdac4cff1534f0bee548a7a
Author: Liang-Chi Hsieh <[email protected]>
Date: 2017-02-17T03:18:21Z
Merge remote-tracking branch 'upstream/master' into
improve-constraints-generation
commit 278c31cf8aa27c71e0f5178bebcb426ec5fba6ce
Author: Liang-Chi Hsieh <[email protected]>
Date: 2017-02-17T03:23:39Z
Revert parallel collection approach. Reduce aliased constraints.
commit 24fb723207d80a7c6068fd113430488b89ed9d0b
Author: Liang-Chi Hsieh <[email protected]>
Date: 2017-02-20T08:32:53Z
Improve constraint propagation.
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]