I'm trying to set up a PySpark ETL job that takes in JSON log files and
spits out fact table files for upload to Redshift.  Is there an efficient
way to send different event types to different outputs without having to
just read the same cached RDD twice?  I have my first RDD which is just a
json parsed version of the input data, and I need to create a flattened
page views dataset off this based on eventType = 'INITIAL', and then a page
events dataset from the same RDD based on eventType  = 'ADDITIONAL'.
Ideally I'd like the output files for both these tables to be written at
the same time, so I'm picturing a function with one input RDD in and two
RDDs out, or a function utilising two CSV writers.  I'm using mapPartitions
at the moment to write to files like this:

def write_records(records):
    output = StringIO.StringIO()
    writer = vlad.CsvUnicodeWriter(output, dialect='excel')
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]

and I use this in the call to write the file as follows (pageviews and
events get created off the same json parsed RDD by filtering on INITIAL or
ADDITIONAL respectively):

pageviews.mapPartitions(writeRecords).saveAsTextFile('s3n://output/pageviews/')
events.mapPartitions(writeRecords).saveAsTextFile(''s3n://output/events/)

Is there a way to change this so that both are written in the same process?

Reply via email to