counts is a list (counts = []) defined in the driver and used to collect the
results. It's probably not the best way to do this, but I'm new to Spark and
editing someone else's code, so I'm still learning.
Thanks!


from py4j.protocol import Py4JJavaError

def update_state(out_files, counts, curr_rdd):
    # Runs in the driver via foreachRDD: collect the RDD and fold its
    # (fnames, count) pairs into the driver-side state.
    try:
        for fnames, count in curr_rdd.collect():
            counts.append(count)
            out_files |= fnames
    except Py4JJavaError as e:
        print("EXCEPTION: %s" % str(e))
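In case it helps clarify, here's a minimal, Spark-free sketch of what
update_state does on the driver side. FakeRDD and the sample data are
assumptions for illustration, not part of pyspark; a real RDD's collect()
would likewise return the list of (fnames, count) pairs.

```python
# Spark-free sketch of the driver-side state update done by update_state.
# FakeRDD is a hypothetical stand-in for a pyspark RDD: its collect()
# simply returns a list of (fnames, count) pairs.
class FakeRDD:
    def __init__(self, data):
        self._data = data

    def collect(self):
        return self._data


def update_state(out_files, counts, curr_rdd):
    # Fold each (fnames, count) pair into driver-side state:
    # append the count and union the file-name set.
    for fnames, count in curr_rdd.collect():
        counts.append(count)
        out_files |= fnames


out_files = set()
counts = []
rdd = FakeRDD([({"a.out", "b.out"}, 3), ({"c.out"}, 5)])
update_state(out_files, counts, rdd)
print(sorted(out_files))  # ['a.out', 'b.out', 'c.out']
print(counts)             # [3, 5]
```

The point is that this mutation happens purely in the driver; the SPARK-5063
trouble comes from the closures serialized at checkpoint time, not from the
collect-and-append itself.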

--

Shaanan Cohney
PhD Student
University of Pennsylvania


shaan...@gmail.com

On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet <benjamin.fra...@gmail.com>
wrote:

> What does "counts" refer to?
>
> Could you also paste the code of your "update_state" function?
> On 22 Jun 2015 12:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>
>> I'm receiving the SPARK-5063 error (RDD transformations and actions can
>> only be invoked by the driver, not inside of other transformations)
>> whenever I try and restore from a checkpoint in spark streaming on my app.
>>
>> I'm using python3 and my RDDs are inside a queuestream DStream.
>>
>> This is the little chunk of code causing issues:
>>
>> -----
>>
>> p_batches = [sc.parallelize(batch) for batch in task_batches]
>>
>> sieving_tasks = ssc.queueStream(p_batches)
>> sieving_tasks.checkpoint(20)
>> relations = sieving_tasks.map(
>>     lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
>> relations.reduce(
>>     lambda a, b: (a[0] | b[0], a[1] + b[1])
>> ).foreachRDD(lambda s: update_state(out_files, counts, s))
>> ssc.checkpoint(s3n_path)
>>
>> -----
>>
>> Thanks again!
>>
>
