Thanks, I've updated my code to use updateStateByKey but am still getting these errors when I resume from a checkpoint.
One thought of mine was that I used sc.parallelize to generate the RDDs for the queue, but perhaps on resume it doesn't recreate the context needed?

--

Shaanan Cohney
PhD Student
University of Pennsylvania

shaan...@gmail.com

On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet <benjamin.fra...@gmail.com> wrote:

> I would suggest you have a look at the updateStateByKey transformation in
> the Spark Streaming programming guide, which should fit your needs better
> than your update_state function.
> On 22 Jun 2015 1:03 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>
>> Counts is a list (counts = []) in the driver, used to collect the results.
>> It seems like it's also not the best way to be doing things, but I'm new
>> to Spark and editing someone else's code, so I'm still learning.
>> Thanks!
>>
>> def update_state(out_files, counts, curr_rdd):
>>     try:
>>         for c in curr_rdd.collect():
>>             fnames, count = c
>>             counts.append(count)
>>             out_files |= fnames
>>     except Py4JJavaError as e:
>>         print("EXCEPTION: %s" % str(e))
>>
>> --
>>
>> Shaanan Cohney
>> PhD Student
>> University of Pennsylvania
>>
>> shaan...@gmail.com
>>
>> On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet <benjamin.fra...@gmail.com> wrote:
>>
>>> What does "counts" refer to?
>>>
>>> Could you also paste the code of your "update_state" function?
>>> On 22 Jun 2015 12:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>>
>>>> I'm receiving the SPARK-5063 error (RDD transformations and actions can
>>>> only be invoked by the driver, not inside of other transformations)
>>>> whenever I try to restore from a checkpoint in Spark Streaming in my app.
>>>>
>>>> I'm using Python 3, and my RDDs are inside a queueStream DStream.
>>>>
>>>> This is the little chunk of code causing issues:
>>>>
>>>> -----
>>>>
>>>> p_batches = [sc.parallelize(batch) for batch in task_batches]
>>>>
>>>> sieving_tasks = ssc.queueStream(p_batches)
>>>> sieving_tasks.checkpoint(20)
>>>> relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
>>>> relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])).foreachRDD(lambda s: update_state(out_files, counts, s))
>>>> ssc.checkpoint(s3n_path)
>>>>
>>>> -----
>>>>
>>>> Thanks again!
>>>>
>>>> --
>>>>
>>>> Shaanan Cohney
>>>> PhD Student
>>>> University of Pennsylvania
>>>>
>>>> shaan...@gmail.com
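
[Editor's note, for readers landing on this thread later: the key idea behind the updateStateByKey suggestion above is that the state-update function runs on the driver's behalf per key per batch, receiving the batch's new values plus the previous state and returning the new state, so there is no collect() inside a transformation, which is what triggers SPARK-5063. Below is a minimal plain-Python sketch of those semantics, runnable without Spark; the key/value shapes and the names update_func and apply_batch are illustrative, not taken from the code in the thread.]

```python
# Sketch of updateStateByKey semantics in plain Python (no Spark needed).
# In Spark Streaming, update_func would be passed to DStream.updateStateByKey
# and Spark would handle the per-key grouping and state storage; here we
# apply it by hand to a couple of fake batches to show the contract.

def update_func(new_values, last_state):
    # new_values: list of values seen for this key in the current batch.
    # last_state: previous running state for the key (None the first time).
    return (last_state or 0) + sum(new_values)

def apply_batch(state, batch):
    # Group the (key, value) pairs of one batch by key, then fold them
    # into the running state, mirroring what updateStateByKey does.
    grouped = {}
    for key, value in batch:
        grouped.setdefault(key, []).append(value)
    new_state = dict(state)
    for key, values in grouped.items():
        new_state[key] = update_func(values, state.get(key))
    return new_state

state = {}
for batch in [[("a", 1), ("b", 2)], [("a", 3)]]:
    state = apply_batch(state, batch)
print(state)  # {'a': 4, 'b': 2}
```

[In actual PySpark code this would look like `pairs_dstream.updateStateByKey(update_func)` with ssc.checkpoint(...) set, since stateful transformations require checkpointing; the state then survives a restart instead of being rebuilt via driver-side collect().]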