[ 
https://issues.apache.org/jira/browse/SPARK-10764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928169#comment-15928169
 ] 

Sachin Tyagi commented on SPARK-10764:
--------------------------------------

Hi, I want to take a stab at it. Here's how I am trying to approach it.

A PipelineStage can be marked to persist its output DataFrame by calling 
persist(storageLevel, columnExprs). This results in two cases:
* For Transformers -- their output DataFrame should be marked to persist.
* For Estimators -- the output DataFrame of their fitted Models should be marked to persist.

For example,
{code:title=Example.scala|borderStyle=solid}
val tokenizer = ...
// The CountVectorizer estimator's Model should persist its output DF (and only
// the columns passed in the args to persist) so that the LDA iterations can run
// on the smaller persisted DataFrame.
val countVectorizer = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(1000)
  .persist(StorageLevel.MEMORY_AND_DISK, "doc_id", "features")
val lda = ...
val pipelineModel = new Pipeline().setStages(Array(tokenizer, countVectorizer, lda))
{code}
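To make the proposal concrete, here is a minimal, Spark-free sketch of how a stage could record its persist settings. The names PersistSpec, persistSpec, and FakeCountVectorizer are hypothetical stand-ins, not existing Spark API; the real change would live on PipelineStage and use StorageLevel.

{code:title=PersistSketch.scala|borderStyle=solid}
// Hypothetical holder for the persist arguments (sketch only).
case class PersistSpec(storageLevel: String, columns: Seq[String])

trait StageLike {
  // None means the stage's output is not marked for persistence.
  var persistSpec: Option[PersistSpec] = None

  // Returns this so the call can be chained, mirroring spark.ml setter style.
  def persist(storageLevel: String, columns: String*): this.type = {
    persistSpec = Some(PersistSpec(storageLevel, columns))
    this
  }
}

// Stand-in for an estimator such as CountVectorizer.
class FakeCountVectorizer extends StageLike
{code}

A stage marked this way would carry the spec through fit(), so the produced Model knows to persist only the listed columns of its output.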

Also, there should be a way to use the fitted pipeline to transform an already 
persisted DataFrame if that DataFrame was persisted as part of some stage 
during fit(). Otherwise we end up doing unnecessary work in some cases 
(re-tokenizing and re-count-vectorizing the input DataFrame just to get topic 
distributions, in the example above).

Instead, in such a case, only the necessary stages should be invoked by 
transform.

{code:title=Continue.scala|borderStyle=solid}
// The pipeline model should be able to identify whether the passed DF was
// persisted as part of some stage and then run only the needed stages. In this
// case, the model should run only the LDA stage.
pipelineModel.transform(countVectorizer.getCacheDF())

// This should run all stages.
pipelineModel.transform(unpersistedDF)
{code}

In my mind, this can be achieved by modifying the PipelineStage, Pipeline, and 
PipelineModel classes, specifically their transform and transformSchema 
methods, and by adding the appropriate persist() method(s) on PipelineStage.
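The stage-skipping behavior in PipelineModel.transform could be modeled roughly as below. This is a Spark-free sketch under stated assumptions: Dataset, Stage, and producedByStage are hypothetical stand-ins for Spark's DataFrame and transformers, and the real mechanism for recognizing a cached DF (e.g. via a column or a tag set during fit()) is an open design question.

{code:title=SkipStagesSketch.scala|borderStyle=solid}
// Stand-in for a DataFrame that remembers which stage's cached output it is.
case class Dataset(producedByStage: Option[String])

class Stage(val name: String) {
  def transform(ds: Dataset): Dataset = Dataset(Some(name))
}

class SimplePipelineModel(stages: Seq[Stage]) {
  // Returns the result plus the names of the stages actually run.
  def transform(ds: Dataset): (Dataset, Seq[String]) = {
    // If the input is the cached output of stage i, resume at stage i + 1;
    // an unrecognized or unpersisted input runs the whole pipeline.
    val start = ds.producedByStage
      .map(n => stages.indexWhere(_.name == n) + 1)
      .getOrElse(0)
    val toRun = stages.drop(start)
    (toRun.foldLeft(ds)((d, s) => s.transform(d)), toRun.map(_.name))
  }
}
{code}

In the LDA example, passing the CountVectorizer model's cached output would run only the LDA stage, while an unpersisted DataFrame would run all three stages.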

Please let me know your comments on this approach, specifically if you see any 
issues or things that need to be taken care of. I can submit a PR soon to see 
how it looks.

> Add optional caching to Pipelines
> ---------------------------------
>
>                 Key: SPARK-10764
>                 URL: https://issues.apache.org/jira/browse/SPARK-10764
>             Project: Spark
>          Issue Type: Sub-task
>          Components: ML
>            Reporter: Joseph K. Bradley
>
> We need to explore how to cache DataFrames during the execution of Pipelines. 
>  It's a hard problem in general to handle automatically or manually, so we 
> should start with some design discussions about:
> * How to control it manually
> * Whether & how to handle it automatically
> * API changes needed for each



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
