Queue stream does not support driver checkpoint recovery, since the RDDs in
the queue are arbitrarily generated by the user and it's hard for Spark
Streaming to keep track of the data in those RDDs (which is necessary for
recovering from a checkpoint). In any case, queue stream is meant for testing
and development, not for production, so the question of recovering the
driver does not arise in that case.
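
For anyone who does need recovery, here is a rough sketch of one alternative, not something taken from the code in this thread: write each batch of commands to a file and read the files through a directory-watching stream, building the context through StreamingContext.getOrCreate. The helper names (write_batch_files, make_context_factory), the directory arguments, the app name and the 20-second batch interval are all made up for illustration. The sketch also assumes the staging and incoming directories sit on the same filesystem so that the rename is atomic.

```python
import os


def write_batch_files(task_batches, incoming_dir, staging_dir):
    """Write each batch of shell commands to its own file, one command per line.

    Hypothetical helper: files are written under staging_dir and then renamed
    into incoming_dir, so a stream watching incoming_dir never sees a
    half-written file.
    """
    os.makedirs(incoming_dir, exist_ok=True)
    os.makedirs(staging_dir, exist_ok=True)
    written = []
    for i, batch in enumerate(task_batches):
        name = "batch-%05d.txt" % i
        tmp_path = os.path.join(staging_dir, name)
        with open(tmp_path, "w") as fh:
            fh.write("\n".join(batch) + "\n")
        final_path = os.path.join(incoming_dir, name)
        os.replace(tmp_path, final_path)  # rename into the watched directory
        written.append(final_path)
    return written


def make_context_factory(checkpoint_dir, incoming_dir, batch_seconds=20):
    """Return a zero-argument setup function for StreamingContext.getOrCreate."""
    def create_context():
        # Imported lazily so the file helper above works without Spark installed.
        from pyspark import SparkContext
        from pyspark.streaming import StreamingContext

        sc = SparkContext(appName="sieving")  # app name is an assumption
        ssc = StreamingContext(sc, batch_seconds)
        commands = ssc.textFileStream(incoming_dir)  # one record per line
        commands.pprint()  # placeholder for the real per-command processing
        ssc.checkpoint(checkpoint_dir)
        return ssc
    return create_context


# Typical driver usage (needs a Spark installation, so left commented out):
# from pyspark.streaming import StreamingContext
# ssc = StreamingContext.getOrCreate(
#     checkpoint_dir, make_context_factory(checkpoint_dir, incoming_dir))
# ssc.start()
# ssc.awaitTermination()
```

The point of the factory is that the whole DStream graph is defined inside the setup function, so on restart it can be rebuilt from the checkpoint instead of from driver-side objects.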

On Mon, Jun 22, 2015 at 8:10 AM, Shaanan Cohney <shaan...@gmail.com> wrote:

> It's a generated set of shell commands to run (the program they invoke is
> written in C and does highly optimized numerical computation), created from
> a set of user-provided parameters.
>
> The snippet above is:
>
>         task_outfiles_to_cmds = OrderedDict(run_sieving.leftover_tasks)
>         task_outfiles_to_cmds.update(generate_sieving_task_commands(
>             parameters, poly_path, fb_paths))
>
>         task_commands = list(task_outfiles_to_cmds.values())
>         task_path = os.path.join(utils.WORK_DIR, "sieving_tasks")
>         if not os.path.exists(task_path):
>             os.makedirs(task_path)
>
>         batch_size = utils.TOTAL_CORES
>         task_batches = [task_commands[c:c + batch_size]
>                         for c in range(0, len(task_commands), batch_size)]
>
> Which does not reference the SparkContext or StreamingContext at all.
>
>
>
> --
>
> Shaanan Cohney
> PhD Student
> University of Pennsylvania
>
>
> shaan...@gmail.com
>
> On Tue, Jun 23, 2015 at 1:05 AM, Benjamin Fradet <
> benjamin.fra...@gmail.com> wrote:
>
>> Where does "task_batches" come from?
>> On 22 Jun 2015 4:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>
>>> Thanks,
>>>
>>> I've updated my code to use updateStateByKey but am still getting these
>>> errors when I resume from a checkpoint.
>>>
>>> One thought of mine was that I used sc.parallelize to generate the RDDs
>>> for the queue, but perhaps on resume, it doesn't recreate the context
>>> needed?
>>>
>>>
>>>
>>>
>>> On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet <
>>> benjamin.fra...@gmail.com> wrote:
>>>
>>>> I would suggest you have a look at the updateStateByKey transformation
>>>> in the Spark Streaming programming guide which should fit your needs better
>>>> than your update_state function.
>>>> On 22 Jun 2015 1:03 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>>>
>>>>> counts is a list (counts = []) in the driver, used to collect the
>>>>> results.
>>>>> It seems like it's also not the best way to be doing things, but I'm
>>>>> new to Spark and editing someone else's code, so I'm still learning.
>>>>> Thanks!
>>>>>
>>>>>
>>>>> def update_state(out_files, counts, curr_rdd):
>>>>>     try:
>>>>>         for c in curr_rdd.collect():
>>>>>             fnames, count = c
>>>>>             counts.append(count)
>>>>>             out_files |= fnames
>>>>>     except Py4JJavaError as e:
>>>>>         print("EXCEPTION: %s" % str(e))
>>>>>
>>>>>
>>>>> On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet <
>>>>> benjamin.fra...@gmail.com> wrote:
>>>>>
>>>>>> What does "counts" refer to?
>>>>>>
>>>>>> Could you also paste the code of your "update_state" function?
>>>>>> On 22 Jun 2015 12:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm receiving the SPARK-5063 error (RDD transformations and actions
>>>>>>> can only be invoked by the driver, not inside of other transformations)
>>>>>>> whenever I try to restore from a checkpoint in Spark Streaming in my
>>>>>>> app.
>>>>>>>
>>>>>>> I'm using Python 3 and my RDDs are inside a queueStream DStream.
>>>>>>>
>>>>>>> This is the little chunk of code causing issues:
>>>>>>>
>>>>>>> -----
>>>>>>>
>>>>>>> p_batches = [sc.parallelize(batch) for batch in task_batches]
>>>>>>>
>>>>>>> sieving_tasks = ssc.queueStream(p_batches)
>>>>>>> sieving_tasks.checkpoint(20)
>>>>>>> relations = sieving_tasks.map(
>>>>>>>     lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
>>>>>>> relations.reduce(
>>>>>>>     lambda a, b: (a[0] | b[0], a[1] + b[1])
>>>>>>> ).foreachRDD(lambda s: update_state(out_files, counts, s))
>>>>>>> ssc.checkpoint(s3n_path)
>>>>>>>
>>>>>>> -----
>>>>>>>
>>>>>>> Thanks again!
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>
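
As a footnote to the updateStateByKey suggestion quoted above, here is a minimal sketch of what the update function might look like if the driver-side list and set were replaced by streaming state. It assumes each record is a (fnames, count) pair as in the quoted update_state, and that summing the counts is acceptable in place of appending them to a list. The name merge_sieving_state and the "sieving" key are hypothetical.

```python
def merge_sieving_state(new_values, state):
    """Fold one batch's (fnames, count) pairs into the running state.

    Follows the (new_values, previous_state) calling convention that
    updateStateByKey expects; previous_state is None the first time a key
    is seen.
    """
    out_files, total = state if state is not None else (frozenset(), 0)
    for fnames, count in new_values:
        out_files = out_files | frozenset(fnames)  # accumulate output files
        total += count                             # running total of counts
    return (out_files, total)


# Local dry run without Spark: feed three "batches" through the function.
state = None
for batch in ([({"a.out"}, 3)], [({"b.out"}, 4), ({"a.out"}, 1)], []):
    state = merge_sieving_state(batch, state)

# With Spark (commented out, needs a running StreamingContext with
# checkpointing enabled); every record is keyed under one constant key:
# totals = relations.map(lambda r: ("sieving", r)) \
#                   .updateStateByKey(merge_sieving_state)
```

Because the state lives in the DStream rather than in driver variables, it is saved with the checkpoint instead of being lost when the driver restarts.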
