I would suggest you have a look at the updateStateByKey transformation in
the Spark Streaming programming guide which should fit your needs better
than your update_state function.
On 22 Jun 2015 1:03 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:

> Counts is a list (counts = []) in the driver, used to collect the results.
> It seems like it's also not the best way to be doing things, but I'm new
> to spark and editing someone else's code so still learning.
> Thanks!
>
>
> def update_state(out_files, counts, curr_rdd):
>     try:
>         for c in curr_rdd.collect():
>             fnames, count = c
>             counts.append(count)
>             out_files |= fnames
>     except Py4JJavaError as e:
>         print("EXCEPTION: %s" % str(e))
>
> --
>
> Shaanan Cohney
> PhD Student
> University of Pennsylvania
>
>
> shaan...@gmail.com
>
> On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet <
> benjamin.fra...@gmail.com> wrote:
>
>> What does "counts" refer to?
>>
>> Could you also paste the code of your "update_state" function?
>> On 22 Jun 2015 12:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>
>>> I'm receiving the SPARK-5063 error (RDD transformations and actions can
>>> only be invoked by the driver, not inside of other transformations)
>>> whenever I try and restore from a checkpoint in spark streaming on my app.
>>>
>>> I'm using python3 and my RDDs are inside a queuestream DStream.
>>>
>>> This is the little chunk of code causing issues:
>>>
>>> -----
>>>
>>> p_batches = [sc.parallelize(batch) for batch in task_batches]
>>>
>>> sieving_tasks = ssc.queueStream(p_batches)
>>> sieving_tasks.checkpoint(20)
>>> relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly,
>>> poly_path, fb_paths))
>>> relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])
>>> ).foreachRDD(lambda s: update_state(out_files, counts, s))
>>> ssc.checkpoint(s3n_path)
>>>
>>> -----
>>>
>>> Thanks again!
>>>
>>>
>>>
>>> --
>>>
>>> Shaanan Cohney
>>> PhD Student
>>> University of Pennsylvania
>>>
>>>
>>> shaan...@gmail.com
>>>
>>
>

Reply via email to