It's a generated set of shell commands to run (they invoke highly
optimized numerical code written in C), which is created from a set of
user-provided parameters.

The code just above that snippet is:

        task_outfiles_to_cmds = OrderedDict(run_sieving.leftover_tasks)

        task_outfiles_to_cmds.update(
            generate_sieving_task_commands(parameters, poly_path, fb_paths))

        task_commands = list(task_outfiles_to_cmds.values())
        task_path = os.path.join(utils.WORK_DIR, "sieving_tasks")
        if not os.path.exists(task_path):
            os.makedirs(task_path)

        batch_size = utils.TOTAL_CORES
        task_batches = [task_commands[c:c + batch_size]
                        for c in range(0, len(task_commands), batch_size)]

None of this references the SparkContext or StreamingContext at all.
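
To make the shape of task_batches concrete, here is a minimal standalone sketch of the same batching logic. The file names, the "sieve --range" commands, and the batch size of 4 are made-up stand-ins (for run_sieving.leftover_tasks, the output of generate_sieving_task_commands, and utils.TOTAL_CORES respectively), and it assumes both sources map output files to command strings. It only uses the standard library, so no Spark objects are involved at this stage.

```python
from collections import OrderedDict

# Hypothetical stand-in for run_sieving.leftover_tasks:
# (output file, shell command) pairs left over from an earlier run.
leftover_tasks = [("out_0.rels", "sieve --range 0")]

# Hypothetical stand-in for generate_sieving_task_commands(...):
# an ordered mapping of output file -> shell command.
generated = OrderedDict(
    ("out_%d.rels" % i, "sieve --range %d" % i) for i in range(1, 6)
)

# Merge leftovers with the freshly generated tasks, preserving order.
task_outfiles_to_cmds = OrderedDict(leftover_tasks)
task_outfiles_to_cmds.update(generated)

task_commands = list(task_outfiles_to_cmds.values())

batch_size = 4  # assumption: stands in for utils.TOTAL_CORES

# Split the flat command list into consecutive chunks of batch_size;
# the last chunk may be shorter.
task_batches = [task_commands[c:c + batch_size]
                for c in range(0, len(task_commands), batch_size)]

print(task_batches)
```

Each element of task_batches is a plain list of command strings, which is what later gets passed to sc.parallelize.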



--

Shaanan Cohney
PhD Student
University of Pennsylvania


shaan...@gmail.com

On Tue, Jun 23, 2015 at 1:05 AM, Benjamin Fradet <benjamin.fra...@gmail.com>
wrote:

> Where does "task_batches" come from?
> On 22 Jun 2015 4:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>
>> Thanks,
>>
>> I've updated my code to use updateStateByKey but am still getting these
>> errors when I resume from a checkpoint.
>>
>> One thought of mine was that I used sc.parallelize to generate the RDDs
>> for the queue, but perhaps on resume, it doesn't recreate the context
>> needed?
>>
>> On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet <
>> benjamin.fra...@gmail.com> wrote:
>>
>>> I would suggest you have a look at the updateStateByKey transformation
>>> in the Spark Streaming programming guide which should fit your needs better
>>> than your update_state function.
>>> On 22 Jun 2015 1:03 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>>
>>>> Counts is a list (counts = []) in the driver, used to collect the
>>>> results.
>>>> It seems like it's also not the best way to be doing things, but I'm
>>>> new to spark and editing someone else's code so still learning.
>>>> Thanks!
>>>>
>>>>
>>>> def update_state(out_files, counts, curr_rdd):
>>>>     try:
>>>>         for c in curr_rdd.collect():
>>>>             fnames, count = c
>>>>             counts.append(count)
>>>>             out_files |= fnames
>>>>     except Py4JJavaError as e:
>>>>         print("EXCEPTION: %s" % str(e))
>>>>
>>>> On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet <
>>>> benjamin.fra...@gmail.com> wrote:
>>>>
>>>>> What does "counts" refer to?
>>>>>
>>>>> Could you also paste the code of your "update_state" function?
>>>>> On 22 Jun 2015 12:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>>>>
>>>>>> I'm receiving the SPARK-5063 error (RDD transformations and actions
>>>>>> can only be invoked by the driver, not inside of other transformations)
>>>>>> whenever I try and restore from a checkpoint in spark streaming on my 
>>>>>> app.
>>>>>>
>>>>>> I'm using python3 and my RDDs are inside a queuestream DStream.
>>>>>>
>>>>>> This is the little chunk of code causing issues:
>>>>>>
>>>>>> -----
>>>>>>
>>>>>> p_batches = [sc.parallelize(batch) for batch in task_batches]
>>>>>>
>>>>>> sieving_tasks = ssc.queueStream(p_batches)
>>>>>> sieving_tasks.checkpoint(20)
>>>>>> relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly,
>>>>>> poly_path, fb_paths))
>>>>>> relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])
>>>>>> ).foreachRDD(lambda s: update_state(out_files, counts, s))
>>>>>> ssc.checkpoint(s3n_path)
>>>>>>
>>>>>> -----
>>>>>>
>>>>>> Thanks again!
>>>>>>
>>>>>
>>>>
>>
