There is a DStream.transform() that does exactly this.

On Tue, Aug 8, 2017 at 7:55 PM, Ashwin Raju <ther...@gmail.com> wrote:
> Hi,
>
> We've built a batch application on Spark 1.6.1. I'm looking into how to
> run the same code as a streaming (DStream-based) application. This is
> using pyspark.
>
> In the batch application, we have a sequence of transforms that read from
> file, do dataframe operations, then write to file. I was hoping to swap
> out the read from file with textFileStream, then use the dataframe
> operations as is. This would mean that if we change the batch pipeline,
> so long as it is a sequence of dataframe operations, the streaming
> version can just reuse the code.
>
> Looking at the sql_network_wordcount
> <https://github.com/apache/spark/blob/branch-1.6/examples/src/main/python/streaming/sql_network_wordcount.py>
> example, it looks like I'd have to do DStream.foreachRDD, convert the
> passed-in RDD into a dataframe, and then do my sequence of dataframe
> operations. However, that list of dataframe operations looks to be
> hardcoded into the process method. Is there any way to pass in a function
> that takes a dataframe as input and returns a dataframe?
>
> What I see from the example:
>
>     words.foreachRDD(process)
>
>     def process(time, rdd):
>         # create dataframe from RDD
>         # hardcoded operations on the dataframe
>
> What I would like to do instead:
>
>     def process(time, rdd):
>         # create dataframe from RDD - input_df
>         # output_df = dataframe_pipeline_fn(input_df)
>
> -ashwin
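The parameterization the asker wants is plain Python higher-order functions: build the `(time, rdd)` callback from the pipeline function instead of hardcoding the dataframe operations inside `process`. Below is a minimal sketch of that wiring; the names (`make_process`, `dataframe_pipeline_fn`, `create_df`, `sink`) are hypothetical, and stand-in lambdas replace the Spark calls so the sketch runs without a cluster. In real Spark 1.6 pyspark code, `create_df` would be something like `sqlContext.createDataFrame(rdd)` and `sink` would write `output_df` out.

```python
def make_process(create_df, pipeline_fn, sink):
    """Build a (time, rdd) callback suitable for DStream.foreachRDD.

    create_df:   converts the incoming RDD to a dataframe
    pipeline_fn: the reusable batch pipeline, dataframe -> dataframe
    sink:        consumes the resulting dataframe (e.g. write to file)
    """
    def process(time, rdd):
        input_df = create_df(rdd)           # RDD -> dataframe
        output_df = pipeline_fn(input_df)   # shared batch/streaming logic
        sink(output_df)                     # e.g. output_df.write...
    return process

# Stand-ins so the sketch is runnable here (hypothetical, not Spark APIs):
rows = []
process = make_process(
    create_df=lambda rdd: list(rdd),                # pretend the RDD is a list
    pipeline_fn=lambda df: [r.upper() for r in df], # "dataframe operations"
    sink=rows.extend,
)
process(0, ["a", "b"])
print(rows)  # -> ['A', 'B']
```

With Spark, the hookup would then be `words.foreachRDD(make_process(...))`, so changing the batch pipeline only means passing a different `pipeline_fn`.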