[ https://issues.apache.org/jira/browse/SPARK-10764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15928169#comment-15928169 ]
Sachin Tyagi commented on SPARK-10764:
--------------------------------------

Hi, I want to take a stab at it. Here's how I am trying to approach it. A PipelineStage can be marked to persist its output DataFrame by calling persist(storageLevel, columnExprs). This results in two cases:
* For Transformers -- their output DataFrame is marked to persist.
* For Estimators -- the output of their models is marked to persist.

For example,

{code:title=Example.scala|borderStyle=solid}
val tokenizer = ...

// The CountVectorizer estimator's Model should persist its output DataFrame
// (and only those columns passed as arguments to persist) so that the LDA
// iterations can run on the smaller persisted DataFrame.
val countVectorizer = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(1000)
  .persist(StorageLevel.MEMORY_AND_DISK, "doc_id", "features")

val lda = ...
val pipelineModel = new Pipeline().setStages(Array(tokenizer, countVectorizer, lda))
{code}

Also, there should be a way to use the fitted pipeline to transform an already persisted DataFrame if that DataFrame was persisted as part of some stage during fit(). Otherwise we end up doing unnecessary work in some cases (in the example above, tokenizing and count-vectorizing the input DataFrame again just to get topic distributions). In such a case, only the necessary stages should be invoked to transform.

{code:title=Continue.scala|borderStyle=solid}
// The pipeline model should be able to identify whether the passed DataFrame
// was persisted as part of some stage, and then run only the needed stages.
// In this case, the model should run only the LDA stage.
pipelineModel.transform(countVectorizer.getCacheDF())

// This should run all stages.
pipelineModel.transform(unpersistedDF)
{code}

In my mind, this can be achieved by modifying the PipelineStage, Pipeline and PipelineModel classes, specifically their transform and transformSchema methods, and by creating the appropriate persist() method(s) on PipelineStage.
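To make the skip-stages idea above concrete, here is a minimal, Spark-free Scala sketch. Everything in it (the Frame class, the persistOutput flag, the stage names) is invented for illustration and is not the Spark API; it only models the two behaviors proposed: a stage can be marked to persist (and tag) its output, and a fitted pipeline given such a tagged intermediate runs only the stages downstream of the one that produced it.

```scala
// Stand-in for a DataFrame: some data plus the name of the pipeline stage
// that persisted it, if any. (Hypothetical; not a Spark class.)
case class Frame(data: Vector[String], producedBy: Option[String] = None)

trait Stage {
  def name: String
  var persistOutput: Boolean = false
  // Mark this stage so its output frame is tagged as persisted by it.
  def persist(): this.type = { persistOutput = true; this }
  def transform(in: Frame): Frame = {
    val out = Frame(in.data.map(d => s"$name($d)"))
    if (persistOutput) out.copy(producedBy = Some(name)) else out
  }
}

class Tokenizer extends Stage { val name = "tok" }
class CountVec  extends Stage { val name = "cv"  }
class Lda       extends Stage { val name = "lda" }

// A fitted pipeline that can skip stages when handed a persisted intermediate.
class PipelineModel(stages: Seq[Stage]) {
  def transform(in: Frame): Frame = {
    // If the input was persisted by one of our stages, start after that stage;
    // otherwise run every stage from the beginning.
    val start = in.producedBy
      .map(n => stages.indexWhere(_.name == n) + 1)
      .filter(_ > 0)
      .getOrElse(0)
    stages.drop(start).foldLeft(in)((df, s) => s.transform(df))
  }
}

val tok   = new Tokenizer
val cv    = (new CountVec).persist()
val lda   = new Lda
val model = new PipelineModel(Seq(tok, cv, lda))

val raw    = Frame(Vector("doc"))
val cached = cv.transform(tok.transform(raw)) // the persisted intermediate

// Only the LDA stage runs on the cached frame; all three stages run on the
// raw frame, and both paths yield the same result.
println(model.transform(cached).data) // Vector(lda(cv(tok(doc))))
println(model.transform(raw).data)    // Vector(lda(cv(tok(doc))))
```

The key design point the sketch captures is that the cached frame carries enough provenance (which stage produced it) for the pipeline model to decide where to resume, which is what the proposed getCacheDF() hand-off would need in the real implementation.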
Please let me know your comments on this approach, specifically if you see any issues or things that need to be taken care of. I can submit a PR soon to see how it looks.

> Add optional caching to Pipelines
> ---------------------------------
>
>                 Key: SPARK-10764
>                 URL: https://issues.apache.org/jira/browse/SPARK-10764
>             Project: Spark
>          Issue Type: Sub-task
>          Components: ML
>            Reporter: Joseph K. Bradley
>
> We need to explore how to cache DataFrames during the execution of Pipelines.
> It's a hard problem in general to handle automatically or manually, so we
> should start with some design discussions about:
> * How to control it manually
> * Whether & how to handle it automatically
> * API changes needed for each

--
This message was sent by Atlassian JIRA (v6.3.15#6346)