It's a generated set of shell commands to run (they invoke a highly optimized numerical program written in C), created from a set of user-provided parameters.
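Each entry in a batch is a single command string; run_sieving_command, mapped over them further down the thread, is what executes one on a worker. Its real body isn't in this thread, so here is a purely hypothetical sketch of its assumed shape -- it has to return the (output_files, relation_count) pairs that the reduce in the quoted code unions and sums:

-----

import subprocess

# Hypothetical sketch -- the real run_sieving_command is not shown in this
# thread. Assumed behavior: run one generated shell command, then report
# which output file it wrote and how many relations it found.
def run_sieving_command(cmd, poly, poly_path, fb_paths):
    subprocess.check_call(cmd, shell=True)    # invoke the C siever
    out_file = cmd.split()[-1]                # assumption: last arg is the output file
    with open(out_file) as f:
        count = sum(1 for _ in f)             # assumption: one relation per line
    return ({out_file}, count)

-----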
The snippet that generates task_batches is:

-----

task_outfiles_to_cmds = OrderedDict(run_sieving.leftover_tasks)
task_outfiles_to_cmds.update(generate_sieving_task_commands(parameters, poly_path, fb_paths))

task_commands = list(task_outfiles_to_cmds.values())

task_path = os.path.join(utils.WORK_DIR, "sieving_tasks")
if not os.path.exists(task_path):
    os.makedirs(task_path)

batch_size = utils.TOTAL_CORES
task_batches = [task_commands[c:c+batch_size]
                for c in range(0, len(task_commands), batch_size)]

-----

This does not reference the SparkContext or StreamingContext at all.

--

Shaanan Cohney
PhD Student
University of Pennsylvania

shaan...@gmail.com


On Tue, Jun 23, 2015 at 1:05 AM, Benjamin Fradet <benjamin.fra...@gmail.com> wrote:

> Where does "task_batches" come from?
>
> On 22 Jun 2015 4:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>
>> Thanks,
>>
>> I've updated my code to use updateStateByKey but am still getting these
>> errors when I resume from a checkpoint.
>>
>> One thought of mine was that I used sc.parallelize to generate the RDDs
>> for the queue, but perhaps on resume it doesn't recreate the context
>> needed?
>>
>> --
>>
>> Shaanan Cohney
>> PhD Student
>> University of Pennsylvania
>>
>> shaan...@gmail.com
>>
>> On Mon, Jun 22, 2015 at 9:27 PM, Benjamin Fradet <benjamin.fra...@gmail.com> wrote:
>>
>>> I would suggest you have a look at the updateStateByKey transformation
>>> in the Spark Streaming programming guide, which should fit your needs
>>> better than your update_state function.
>>>
>>> On 22 Jun 2015 1:03 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>>
>>>> counts is a list (counts = []) in the driver, used to collect the
>>>> results.
>>>> It seems like it's also not the best way to be doing things, but I'm
>>>> new to Spark and editing someone else's code, so still learning.
>>>> Thanks!
>>>>
>>>> def update_state(out_files, counts, curr_rdd):
>>>>     try:
>>>>         for c in curr_rdd.collect():
>>>>             fnames, count = c
>>>>             counts.append(count)
>>>>             out_files |= fnames
>>>>     except Py4JJavaError as e:
>>>>         print("EXCEPTION: %s" % str(e))
>>>>
>>>> --
>>>>
>>>> Shaanan Cohney
>>>> PhD Student
>>>> University of Pennsylvania
>>>>
>>>> shaan...@gmail.com
>>>>
>>>> On Mon, Jun 22, 2015 at 8:56 PM, Benjamin Fradet <benjamin.fra...@gmail.com> wrote:
>>>>
>>>>> What does "counts" refer to?
>>>>>
>>>>> Could you also paste the code of your "update_state" function?
>>>>>
>>>>> On 22 Jun 2015 12:48 pm, "Shaanan Cohney" <shaan...@gmail.com> wrote:
>>>>>
>>>>>> I'm receiving the SPARK-5063 error (RDD transformations and actions
>>>>>> can only be invoked by the driver, not inside of other transformations)
>>>>>> whenever I try to restore from a checkpoint in Spark Streaming in my app.
>>>>>>
>>>>>> I'm using Python 3 and my RDDs are inside a queueStream DStream.
>>>>>>
>>>>>> This is the little chunk of code causing issues:
>>>>>>
>>>>>> -----
>>>>>>
>>>>>> p_batches = [sc.parallelize(batch) for batch in task_batches]
>>>>>>
>>>>>> sieving_tasks = ssc.queueStream(p_batches)
>>>>>> sieving_tasks.checkpoint(20)
>>>>>> relations = sieving_tasks.map(lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
>>>>>> relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])).foreachRDD(
>>>>>>     lambda s: update_state(out_files, counts, s))
>>>>>> ssc.checkpoint(s3n_path)
>>>>>>
>>>>>> -----
>>>>>>
>>>>>> Thanks again!
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Shaanan Cohney
>>>>>> PhD Student
>>>>>> University of Pennsylvania
>>>>>>
>>>>>> shaan...@gmail.com
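P.S. The next thing I plan to try is the recovery pattern from the checkpointing section of the Spark Streaming programming guide: build the SparkContext, the queue RDDs, and the whole DStream graph inside a setup function passed to StreamingContext.getOrCreate, so that nothing recovered from the checkpoint holds a reference to a context from a previous run. A rough sketch only, reusing names from the snippets above and assuming a PySpark version that exposes StreamingContext.getOrCreate:

-----

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# task_batches, poly, poly_path, fb_paths, out_files, counts and s3n_path
# are the driver-side names from the snippets quoted above.

def create_context():
    # Build everything -- context, queue RDDs, DStream graph -- inside the
    # factory, so the checkpointed graph never points at a stale context.
    sc = SparkContext(appName="sieving")
    ssc = StreamingContext(sc, 20)  # assumption: 20s batch interval
    p_batches = [sc.parallelize(batch) for batch in task_batches]
    sieving_tasks = ssc.queueStream(p_batches)
    sieving_tasks.checkpoint(20)
    relations = sieving_tasks.map(
        lambda s: run_sieving_command(s, poly, poly_path, fb_paths))
    relations.reduce(lambda a, b: (a[0] | b[0], a[1] + b[1])).foreachRDD(
        lambda rdd: update_state(out_files, counts, rdd))
    ssc.checkpoint(s3n_path)
    return ssc

# First run: create_context() is called and the graph is built fresh.
# On restart: the context is reconstructed from the checkpoint data instead.
ssc = StreamingContext.getOrCreate(s3n_path, create_context)
ssc.start()
ssc.awaitTermination()

-----

I'm still not certain the queued RDDs themselves survive recovery, though -- as far as I can tell the queue contents aren't written to the checkpoint, so this may only fix the dead-context half of the problem.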