Re: Reusing dataframes for streaming (spark 1.6)

2017-08-09 Thread Tathagata Das
There is a DStream.transform() that does exactly this.
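
A minimal sketch of how that could look in pyspark (untested; here "lines"
is the input DStream, dataframe_pipeline_fn is your DataFrame-in,
DataFrame-out function, and the Row parsing is just a placeholder):

from pyspark.sql import SQLContext, Row

def apply_pipeline(rdd):
    # transform() is evaluated once per micro-batch on the driver,
    # so a DataFrame can be built here and the shared pipeline applied
    if rdd.isEmpty():
        return rdd  # skip empty micro-batches (schema inference would fail)
    sql_context = SQLContext(rdd.context)
    input_df = sql_context.createDataFrame(rdd.map(lambda l: Row(line=l)))
    # reuse the batch pipeline unchanged, then hand an RDD back to the DStream
    return dataframe_pipeline_fn(input_df).rdd

transformed = lines.transform(apply_pipeline)
transformed.pprint()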

On Tue, Aug 8, 2017 at 7:55 PM, Ashwin Raju wrote:

> Hi,
>
> We've built a batch application on Spark 1.6.1. I'm looking into how to
> run the same code as a streaming (DStream-based) application. This is using
> pyspark.
>
> In the batch application, we have a sequence of transforms that read from
> file, do DataFrame operations, then write to file. I was hoping to swap out
> the read-from-file step with textFileStream and then use the DataFrame
> operations as is. This would mean that if we change the batch pipeline, the
> streaming version can reuse the code as long as the pipeline remains a
> sequence of DataFrame operations.
>
> Looking at the sql_network_wordcount example, it looks like I'd have to use
> DStream.foreachRDD, convert the passed-in RDD into a DataFrame, and then run
> my sequence of DataFrame operations. However, that list of DataFrame
> operations appears to be hardcoded into the process method. Is there any way
> to pass in a function that takes a DataFrame as input and returns a
> DataFrame?
>
> What I see from the example:
>
> words.foreachRDD(process)
>
> def process(time, rdd):
>     # create a DataFrame from the RDD
>     # hardcoded operations on the DataFrame
>
> What I would like to do instead:
>
> def process(time, rdd):
>     # create a DataFrame from the RDD -> input_df
>     # output_df = dataframe_pipeline_fn(input_df)
>
> -ashwin


Reusing dataframes for streaming (spark 1.6)

2017-08-08 Thread Ashwin Raju
Hi,

We've built a batch application on Spark 1.6.1. I'm looking into how to run
the same code as a streaming (DStream-based) application. This is using
pyspark.

In the batch application, we have a sequence of transforms that read from
file, do DataFrame operations, then write to file. I was hoping to swap out
the read-from-file step with textFileStream and then use the DataFrame
operations as is. This would mean that if we change the batch pipeline, the
streaming version can reuse the code as long as the pipeline remains a
sequence of DataFrame operations.

Looking at the sql_network_wordcount example, it looks like I'd have to use
DStream.foreachRDD, convert the passed-in RDD into a DataFrame, and then run
my sequence of DataFrame operations. However, that list of DataFrame
operations appears to be hardcoded into the process method. Is there any way
to pass in a function that takes a DataFrame as input and returns a
DataFrame?

What I see from the example:

words.foreachRDD(process)

def process(time, rdd):
    # create a DataFrame from the RDD
    # hardcoded operations on the DataFrame

What I would like to do instead:

def process(time, rdd):
    # create a DataFrame from the RDD -> input_df
    # output_df = dataframe_pipeline_fn(input_df)
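
For example, I'm imagining something like this rough (untested) sketch,
where dataframe_pipeline_fn stands in for our shared batch pipeline and the
Row parsing is a placeholder:

from pyspark.sql import SQLContext, Row

def make_process(dataframe_pipeline_fn):
    # close over the pipeline function so foreachRDD still gets a
    # plain (time, rdd) callback
    def process(time, rdd):
        if rdd.isEmpty():
            return  # nothing to do for an empty micro-batch
        sql_context = SQLContext(rdd.context)
        # placeholder parsing: one column named "line"
        input_df = sql_context.createDataFrame(rdd.map(lambda l: Row(line=l)))
        output_df = dataframe_pipeline_fn(input_df)
        output_df.show()  # placeholder for the write-to-file step
    return process

words.foreachRDD(make_process(dataframe_pipeline_fn))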

-ashwin