I'm trying to run cross_val_score inside my own worker threads to take full
advantage of my processors, mainly because I previously ran into problems
when setting cross_val_score's built-in n_jobs > 1. I have successfully done
something similar before, though in a slightly different way, so I'm
surprised I'm having trouble with it.
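For reference, this is the kind of built-in call I mean. It's a minimal sketch with made-up data rather than my real script; on Windows the __main__ guard is needed because joblib launches worker processes that re-import the module:

import numpy as np
from sklearn import cross_validation
from sklearn.naive_bayes import MultinomialNB

if __name__ == '__main__':
    # toy data standing in for my real feature matrix and labels
    x = np.random.randint(0, 5, size=(100, 20))
    y = np.random.randint(0, 2, size=100)
    # n_jobs > 1 is the setting that gave me problems
    scores = cross_validation.cross_val_score(MultinomialNB(), x, y=y,
                                              cv=5, n_jobs=2)
    print(np.mean(scores))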
But I've done enough troubleshooting at this point that I'm convinced either
something is happening inside cross_val_score that I don't know about (I
noticed sklearn has a lengthy parallelism-handling file) or there's some
nuance in my current approach that I'm simply overlooking.
Below is a shortened, concise version of the code causing problems, followed
by a slightly longer and better-commented (but still shortened) version.
If I set workers to 1, the code works just fine. Once I go greater than 1, it
crashes the Python kernel every time as soon as it hits cross_val_score.
I'm running this on Windows 7 with Anaconda and Python 2.7:
'2.7.6 |Anaconda 1.9.2 (64-bit)| (default, Nov 11 2013, 10:49:15) [MSC v.1500 64 bit (AMD64)]'
Anyone know what the deal is? Here is the code in question:
import sys
import traceback
from Queue import Queue
from threading import Thread

import numpy as np
from scipy.sparse import csr_matrix
from sklearn import cross_validation
from sklearn.cross_validation import KFold
from sklearn.naive_bayes import MultinomialNB


class Job(object):  # struct for holding job info
    pass

workers = 2
jobs_q = Queue()
results_q = Queue()

# pseudo code for loop here
job = Job()
job.estimator = MultinomialNB()
job.x_csrmat = csr_matrix(x_csrmat, copy=True)  # x_csrmat created earlier
job.y_a = np.array(y_a)                         # y_a created earlier
job.kfold = KFold(len(y_a), nfolds, random_state=6, shuffle=True)
jobs_q.put(job)
# end pseudo code for loop

def executeWorker():
    while True:
        job = jobs_q.get()
        if job is None:
            break
        # try/except never catches an error, and the code never proceeds beyond it
        try:
            cvScore = np.mean(cross_validation.cross_val_score(
                job.estimator, job.x_csrmat, y=job.y_a, cv=job.kfold))
        except Exception as e:
            with open(tsOutDir + 'error.txt', 'w') as f:  # tsOutDir defined earlier
                f.write("".join(traceback.format_exception(*sys.exc_info()))
                        + "\n\n" + repr(e))
            raise
        results_q.put(cvScore)

workers_list = [Thread(target=executeWorker) for _ in xrange(workers)]
for thread in workers_list:
    thread.daemon = True
    thread.start()
for _ in xrange(workers):
    # jobs_q is already full. Add None-s so workers know to stop
    jobs_q.put(None)
for thread in workers_list:
    thread.join()
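For comparison, a process-based version of the same worker idea would look roughly like the sketch below. It's untested, uses toy data and a placeholder fold count rather than my real pipeline, and mainly illustrates that multiprocessing sidesteps the GIL but needs the __main__ guard on Windows and picklable arguments:

import numpy as np
from multiprocessing import Pool
from sklearn import cross_validation
from sklearn.cross_validation import KFold
from sklearn.naive_bayes import MultinomialNB

def run_job(seed):
    # toy data standing in for one job's copied x_csrmat / y_a
    rng = np.random.RandomState(seed)
    x = rng.randint(0, 5, size=(200, 20))
    y = rng.randint(0, 2, size=200)
    kfold = KFold(len(y), 5, random_state=6, shuffle=True)
    return np.mean(cross_validation.cross_val_score(MultinomialNB(), x, y=y,
                                                    cv=kfold))

if __name__ == '__main__':
    pool = Pool(processes=2)
    print(pool.map(run_job, range(4)))  # one score per job
    pool.close()
    pool.join()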
And here is the slightly longer, better-commented version:

# (same imports as in the shorter version above, plus:)
import time
import threading
import pandas as pd


class Job(object):  # struct for holding job info
    pass

workers = 2
jobs_q = Queue()
results_q = Queue()

# Each job is created in a for loop like this...
# (I'm omitting the for loop because it's complex and it's not where the
# problem is occurring.)
job = Job()
# Create a new classifier. I've also tried cloning; neither solves the problem
job.estimator = MultinomialNB()
# Create copies of x_csrmat and y_a, which were created previously
job.x_csrmat = csr_matrix(x_csrmat, copy=True)
job.y_a = np.array(y_a)
# Create a new kfold iterator
job.kfold = KFold(len(y_a), nfolds, random_state=6, shuffle=True)
# ...and then put in a queue like this:
jobs_q.put(job)

def executeWorker():
    while True:
        job = jobs_q.get()
        if job is None:
            break
        start = time.time()
        # This portion successfully writes to file for each thread
        dictToCSV(job.settings_dict, tsOutDir + str(time.time()) + "-"
                  + threading.current_thread().name + "-prior.csv")
        # try/except never catches an error, and the code never proceeds beyond it
        try:
            cvScore = np.mean(cross_validation.cross_val_score(
                job.estimator, job.x_csrmat, y=job.y_a, cv=job.kfold))
        except Exception as e:
            with open(tsOutDir + 'error.txt', 'w') as f:
                f.write("".join(traceback.format_exception(*sys.exc_info()))
                        + "\n\n" + repr(e))
            raise
        elapsed = time.time() - start
        # job.settings_dict is just a dictionary attached to the job object
        # to report the settings used
        job.settings_dict["CVScore"] = [cvScore]
        job.settings_dict["Time (s)"] = [round(elapsed)]
        job.settings_dict["Time"] = [time.strftime("%H:%M:%S",
                                                   time.gmtime(elapsed))]
        results_df = pd.DataFrame(job.settings_dict)
        results_df.to_csv(tsOutDir + str(time.time()) + "-"
                          + threading.current_thread().name + "-post.csv")
        results_q.put(results_df)

workers_list = [Thread(target=executeWorker) for _ in xrange(workers)]
for thread in workers_list:
    # I've tried both with and without setting daemon to True
    thread.daemon = True  # make interrupting with ctrl+c easier
    thread.start()
for _ in xrange(workers):
    # jobs_q is already full. Add None-s so workers know to stop
    jobs_q.put(None)
for thread in workers_list:
    thread.join()
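For completeness, results_q would get drained after the joins with something like this (a simplified sketch, not my exact code):

# each item in results_q is the one-row DataFrame a worker produced
results = []
while not results_q.empty():
    results.append(results_q.get())
if results:
    all_results_df = pd.concat(results, ignore_index=True)
    all_results_df.to_csv(tsOutDir + 'all_results.csv')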