Thanks,

I've updated my code to use updateStateByKey but am still getting these
errors when I resume from a checkpoint.

One thought of mine was that I used sc.parallelize to generate the RDDs for
the queue, but perhaps on resume, it doesn't recreate the context needed?



--

Shaanan Cohney
PhD Student
University of Pennsylvania


shaan...@gmail.com

On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet <benjamin.fra...@gmail.com>
wrote:

> I would suggest you have a look at the updateStateByKey transformation in
> the Spark Streaming programming guide which should fit your needs better
> than your update_state function.
> On 22 Jun 2015 1:03 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>
>> Counts is a list (counts = []) in the driver, used to collect the results.
>> It seems like it's also not the best way to be doing things, but I'm new
>> to spark and editing someone else's code so still learning.
>> Thanks!
>>
>>
>> def update_state(out_files, counts, curr_rdd):
>>     try:
>>         for c in curr_rdd.collect():
>>             fnames, count = c
>>             counts.append(count)
>>             out_files |= fnames
>>     except Py4JJavaError as e:
>>         print("EXCEPTION: %s" % str(e))
>>
>> --
>>
>> Shaanan Cohney
>> PhD Student
>> University of Pennsylvania
>>
>>
>> shaan...@gmail.com
>>
>> On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet <
>> benjamin.fra...@gmail.com> wrote:
>>
>>> What does "counts" refer to?
>>>
>>> Could you also paste the code of your "update_state" function?
>>> On 22 Jun 2015 12:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>>
>>>> I'm receiving the SPARK-5063 error (RDD transformations and actions can
>>>> only be invoked by the driver, not inside of other transformations)
>>>> whenever I try and restore from a checkpoint in spark streaming on my app.
>>>>
>>>> I'm using python3 and my RDDs are inside a queuestream DStream.
>>>>
>>>> This is the little chunk of code causing issues:
>>>>
>>>> -----
>>>>
>>>> p_batches = [sc.parallelize(batch) for batch in task_batches]
>>>>
>>>> sieving_tasks = ssc.queueStream(p_batches)
>>>> sieving_tasks.checkpoint(20)
>>>> relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly,
>>>> poly_path, fb_paths))
>>>> relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])
>>>> ).foreachRDD(lambda s: update_state(out_files, counts, s))
>>>> ssc.checkpoint(s3n_path)
>>>>
>>>> -----
>>>>
>>>> Thanks again!
>>>>
>>>>
>>>>
>>>> --
>>>>
>>>> Shaanan Cohney
>>>> PhD Student
>>>> University of Pennsylvania
>>>>
>>>>
>>>> shaan...@gmail.com
>>>>
>>>
>>

Reply via email to