counts is a list (counts = []) in the driver, used to collect the results. It seems like it's also not the best way to be doing things, but I'm new to Spark and editing someone else's code, so I'm still learning. Thanks!
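For context, the reduce in the snippet below combines the per-task results element-wise: each result is a (set-of-filenames, count) pair, and pairs are merged by unioning the sets and summing the counts. A minimal pure-Python sketch of that merge (sample data is hypothetical; no Spark required):

```python
from functools import reduce

def merge(a, b):
    # Same logic as the lambda in the snippet:
    # lambda a, b: (a[0] | b[0], a[1] + b[1])
    # Union the filename sets, sum the relation counts.
    return (a[0] | b[0], a[1] + b[1])

# Hypothetical per-task results: (filenames produced, relations found)
results = [({"rel1.out"}, 10), ({"rel2.out"}, 7), ({"rel1.out", "rel3.out"}, 3)]

fnames, total = reduce(merge, results)
print(sorted(fnames))  # ['rel1.out', 'rel2.out', 'rel3.out']
print(total)           # 20
```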
def update_state(out_files, counts, curr_rdd):
    try:
        for c in curr_rdd.collect():
            fnames, count = c
            counts.append(count)
            out_files |= fnames
    except Py4JJavaError as e:
        print("EXCEPTION: %s" % str(e))

--

Shaanan Cohney
PhD Student
University of Pennsylvania

shaan...@gmail.com


On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet <benjamin.fra...@gmail.com> wrote:

> What does "counts" refer to?
>
> Could you also paste the code of your "update_state" function?
> On 22 Jun 2015 12:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>
>> I'm receiving the SPARK-5063 error (RDD transformations and actions can
>> only be invoked by the driver, not inside of other transformations)
>> whenever I try and restore from a checkpoint in Spark Streaming in my app.
>>
>> I'm using python3 and my RDDs are inside a queueStream DStream.
>>
>> This is the little chunk of code causing issues:
>>
>> -----
>>
>> p_batches = [sc.parallelize(batch) for batch in task_batches]
>>
>> sieving_tasks = ssc.queueStream(p_batches)
>> sieving_tasks.checkpoint(20)
>> relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly,
>> poly_path, fb_paths))
>> relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])
>> ).foreachRDD(lambda s: update_state(out_files, counts, s))
>> ssc.checkpoint(s3n_path)
>>
>> -----
>>
>> Thanks again!
>>
>>
>>
>> --
>>
>> Shaanan Cohney
>> PhD Student
>> University of Pennsylvania
>>
>>
>> shaan...@gmail.com
>>