I'm trying to call cross_val_score from my own worker threads to take full
advantage of my processors - especially because I previously had problems
when setting cross_val_score's built-in n_jobs > 1. I have successfully
done something similar before, though in a slightly different way, so I'm
surprised I'm having trouble with it.

But I've troubleshot enough at this point that I'm convinced either
there's something weird happening inside cross_val_score that I don't know
about (I noticed sklearn has a lengthy parallelism-handling file) or
there's some nuance in my current approach that I'm simply overlooking.

Below is a shortened, concise version of the code causing problems,
followed by a slightly longer and better-commented (but still shortened)
version. If I set workers to 1, the code works just fine. As soon as I go
above 1, it crashes the Python kernel every time once it hits
cross_val_score.

I'm running this on Windows 7 with Anaconda and Python 2.7:
'2.7.6 |Anaconda 1.9.2 (64-bit)| (default, Nov 11 2013, 10:49:15) [MSC
v.1500 64 bit (AMD64)]'

Anyone know what the deal is? Here is the code in question:

import sys
import traceback
import numpy as np
from Queue import Queue
from threading import Thread
from numpy import mean
from scipy.sparse import csr_matrix
from sklearn import cross_validation
from sklearn.cross_validation import KFold
from sklearn.naive_bayes import MultinomialNB

# tsOutDir, x_csrmat, y_a, and nfolds are defined elsewhere in the real code

class Job(object):  # struct for holding job info
    pass

workers = 2
jobs_q = Queue()
results_q = Queue()

# pseudo code for loop here
job = Job()
job.estimator = MultinomialNB()
job.x_csrmat = csr_matrix(x_csrmat, copy=True)
job.y_a = np.array(y_a)
job.kfold = KFold(len(y_a), nfolds, random_state=6, shuffle=True)
jobs_q.put(job)
# end pseudo code for loop

def executeWorker():
    while True:
        job = jobs_q.get()
        if job is None:
            break
        # try/except never catches an error, and the code never proceeds beyond it
        try:
            cvScore = mean(cross_validation.cross_val_score(
                job.estimator, job.x_csrmat, y=job.y_a, cv=job.kfold))
        except Exception as e:
            with open(tsOutDir + 'error.txt', 'w') as f:
                f.write("".join(traceback.format_exception(*sys.exc_info()))
                        + "\n\nError: {0}".format(e))
            raise
        results_q.put(cvScore)

workers_list = [Thread(target=executeWorker) for _ in xrange(workers)]
for thread in workers_list:
    thread.daemon = True
    thread.start()

for _ in xrange(workers):
    # jobs_q is already full. Add None-s so workers know to stop
    jobs_q.put(None)

for thread in workers_list:
    thread.join()
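To help isolate whether the scaffolding or the sklearn call is at fault, here is the bare queue-and-sentinel worker pattern I'm using with the cross_val_score call replaced by a dummy computation (just a sketch, written to run under both Python 2 and 3; the names here are placeholders, not my real code):

```python
# Minimal sketch of the same queue + sentinel worker pattern, with the
# cross_val_score call stubbed out by a dummy computation.
try:
    from queue import Queue          # Python 3
except ImportError:
    from Queue import Queue          # Python 2
from threading import Thread

workers = 2
jobs_q = Queue()
results_q = Queue()

for i in range(5):
    jobs_q.put(i)                    # dummy "jobs"

def execute_worker():
    while True:
        job = jobs_q.get()
        if job is None:              # sentinel: no more work
            break
        results_q.put(job * job)     # stand-in for cross_val_score

workers_list = [Thread(target=execute_worker) for _ in range(workers)]
for thread in workers_list:
    thread.daemon = True
    thread.start()

for _ in range(workers):             # one sentinel per worker
    jobs_q.put(None)
for thread in workers_list:
    thread.join()

results = sorted(results_q.get() for _ in range(5))
print(results)                       # [0, 1, 4, 9, 16]
```

Because the sentinels are enqueued after all the jobs, every job is consumed before any worker sees a None, so all five results arrive before the joins return.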
And here is the longer, better-commented version:

class Job(object):  # struct for holding job info
    pass

workers = 2

jobs_q = Queue()
results_q = Queue()

# Each job is created in a *for loop like this...
job = Job()
# Create a new classifier. I've also tried cloning - neither solves the problem
job.estimator = MultinomialNB()
# Create copies of x_csrmat and y_a, which were created previously
job.x_csrmat = csr_matrix(x_csrmat, copy=True)
job.y_a = np.array(y_a)
# Create a new kfold iterator
job.kfold = KFold(len(y_a), nfolds, random_state=6, shuffle=True)

# ...and then put in a queue like this:
jobs_q.put(job)

# *I'm omitting the for loop b/c it's complex and it's not where the problem is occurring

def executeWorker():
    while True:
        job = jobs_q.get()
        if job is None:
            break
        start = time.time()
        # This portion successfully writes to file for each thread
        dictToCSV(job.settings_dict,
                  tsOutDir + str(time.time()) + "-"
                  + threading.current_thread().name + "-prior.csv")
        # try/except never catches an error, and the code never proceeds beyond it
        try:
            cvScore = mean(cross_validation.cross_val_score(
                job.estimator, job.x_csrmat, y=job.y_a, cv=job.kfold))
        except Exception as e:
            with open(tsOutDir + 'error.txt', 'w') as f:
                f.write("".join(traceback.format_exception(*sys.exc_info()))
                        + "\n\nError: {0}".format(e))
            raise
        elapsed = time.time() - start
        # job.settings_dict is just a dictionary attached to the job object
        # to report the settings used
        job.settings_dict["CVScore"] = [cvScore]
        job.settings_dict["Time (s)"] = [round(elapsed)]
        job.settings_dict["Time"] = [time.strftime("%H:%M:%S",
                                                   time.gmtime(elapsed))]
        results_df = pd.DataFrame(job.settings_dict)
        results_df.to_csv(tsOutDir + str(time.time()) + "-"
                          + threading.current_thread().name + "-post.csv")
        results_q.put(results_df)

workers_list = [Thread(target=executeWorker) for _ in xrange(workers)]
for thread in workers_list:
    # I've tried both with and without setting daemon to True
    thread.daemon = True    # make interrupting w/ ctrl+c easier
    thread.start()

for _ in xrange(workers):
    # jobs_q is already full. Add None-s so workers know to stop
    jobs_q.put(None)

for thread in workers_list:
    thread.join()
_______________________________________________
Scikit-learn-general mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/scikit-learn-general