I would suggest you have a look at the updateStateByKey transformation in the Spark Streaming programming guide, which should fit your needs better than your update_state function.

On 22 Jun 2015 1:03 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
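[Editor's note: for readers following the thread, updateStateByKey takes an update function of the form f(new_values, running_state) and returns the new state for a key. The sketch below illustrates that contract in plain Python, without Spark, so the merge logic can be seen in isolation; the function name, the (fnames, count) pair shape, and the sample data are hypothetical, chosen to mirror the update_state code quoted later in the thread. In actual Spark Streaming code you would pass such a function to DStream.updateStateByKey.]

```python
def update_func(new_values, running_state):
    """Merge this batch's (fnames, count) pairs into the running state.

    new_values: list of (set_of_filenames, count) pairs seen this batch.
    running_state: the previous (out_files, total) state, or None on the
    first batch. Returns the updated (out_files, total) state.
    """
    out_files, total = running_state if running_state is not None else (set(), 0)
    for fnames, count in new_values:
        out_files = out_files | fnames  # accumulate output file names
        total += count                   # accumulate the running count
    return (out_files, total)

# Simulating two successive batches for one key:
state = None
state = update_func([({"a.rel"}, 3), ({"b.rel"}, 2)], state)
state = update_func([({"c.rel"}, 5)], state)
```

Because the state lives inside the DStream rather than in a driver-side list, it survives checkpoint restore, which is why it tends to be a better fit than mutating driver variables from foreachRDD.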
> Counts is a list (counts = []) in the driver, used to collect the results.
> It seems like it's also not the best way to be doing things, but I'm new
> to Spark and editing someone else's code, so still learning.
> Thanks!
>
> def update_state(out_files, counts, curr_rdd):
>     try:
>         for c in curr_rdd.collect():
>             fnames, count = c
>             counts.append(count)
>             out_files |= fnames
>     except Py4JJavaError as e:
>         print("EXCEPTION: %s" % str(e))
>
> --
>
> Shaanan Cohney
> PhD Student
> University of Pennsylvania
>
> shaan...@gmail.com
>
> On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet <benjamin.fra...@gmail.com> wrote:
>
>> What does "counts" refer to?
>>
>> Could you also paste the code of your "update_state" function?
>> On 22 Jun 2015 12:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>
>>> I'm receiving the SPARK-5063 error (RDD transformations and actions can
>>> only be invoked by the driver, not inside of other transformations)
>>> whenever I try to restore from a checkpoint in Spark Streaming in my app.
>>>
>>> I'm using python3 and my RDDs are inside a queueStream DStream.
>>>
>>> This is the little chunk of code causing issues:
>>>
>>> -----
>>>
>>> p_batches = [sc.parallelize(batch) for batch in task_batches]
>>>
>>> sieving_tasks = ssc.queueStream(p_batches)
>>> sieving_tasks.checkpoint(20)
>>> relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
>>> relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])).foreachRDD(lambda s: update_state(out_files, counts, s))
>>> ssc.checkpoint(s3n_path)
>>>
>>> -----
>>>
>>> Thanks again!
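[Editor's note: the reduce lambda in the quoted snippet combines pairs of (set_of_filenames, count): a[0] | b[0] unions the file-name sets and a[1] + b[1] sums the counts. The plain-Python illustration below applies the same lambda with functools.reduce instead of DStream.reduce; the sample data is hypothetical.]

```python
from functools import reduce

# Same merge function as in the quoted snippet: union the file sets,
# sum the counts.
merge = lambda a, b: (a[0] | b[0], a[1] + b[1])

# Hypothetical per-task results, each a (set_of_filenames, count) pair:
partials = [({"r0.dat"}, 4), ({"r1.dat"}, 1), ({"r2.dat"}, 7)]

files, total = reduce(merge, partials)
# files is the union of all file-name sets; total is the summed count.
```

Note that in the Spark version this reduction runs on the workers, which is why the subsequent foreachRDD callback must only touch driver-side state (counts, out_files) from the driver; invoking RDD operations inside another transformation is what triggers SPARK-5063.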