Hello Alex,
Unfortunately, they aren't thread safe. Note for other readers: this isn't a
problem at all in the context of Scrapy, because the library is used there in a
single-threaded context. The library neither needs nor claims to be thread
safe.
Back to the threading issues. It was easy to confirm for FifoMemoryQueue
with a simple test (see the end of this e-mail); for the others, the problem
is visible by inspection:
FifoMemoryQueue is not thread safe because of this:
    return q.popleft()  <- 2. pops later; the queue might have been emptied in the meantime
        if q            <- 1. checks for emptiness
        else None
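If thread safety were wanted here, one way to close that window is to hold a lock across both the emptiness check and the pop. The following is only a minimal sketch under that assumption: LockedFifoMemoryQueue is a hypothetical name, not something queuelib provides, and it is built directly on collections.deque.

```python
import threading
from collections import deque


class LockedFifoMemoryQueue:
    """Hypothetical thread-safe FIFO; not part of queuelib."""

    def __init__(self):
        self.q = deque()
        self._lock = threading.Lock()

    def push(self, obj):
        with self._lock:
            self.q.append(obj)

    def pop(self):
        # The emptiness check and popleft() run under the same lock,
        # so no other thread can drain the queue between the two steps.
        with self._lock:
            return self.q.popleft() if self.q else None

    def __len__(self):
        with self._lock:
            return len(self.q)
```

An alternative that avoids the lock would be to call popleft() unconditionally and return None on IndexError, since the collections documentation describes deque appends and pops as thread safe.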
PriorityQueue is, unfortunately, certainly not thread safe.
For example, push() takes no lock, so if two different threads enter it at
the same time, two different queues might be created for the same priority:
T1:     def push(self, obj, priority=0):
T1:         if priority not in self.queues:
-----------
T2:     def push(self, obj, priority=0):
T2:         if priority not in self.queues:
T2:             self.queues[priority] = self.qfactory(priority)
T2:         q = self.queues[priority]
T2:         q.push(obj)  <---------- Here the object is added to the queue
-----------
T1:             self.queues[priority] = self.qfactory(priority)  <- a new queue is created and the object from T2 gets lost
T1:         q = self.queues[priority]
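The interleaving above can be replayed deterministically in a single thread, which makes the lost object easy to see. This is only an illustrative sketch: the names queues and qfactory mirror the attributes in the trace, while the deque-based factory and the string payloads are assumptions made for the example.

```python
from collections import deque

queues = {}  # stands in for PriorityQueue.queues


def qfactory(priority):
    # Hypothetical stand-in for self.qfactory: a fresh in-memory queue.
    return deque()


priority = 0

# T1: passes the membership check, then is preempted.
t1_saw_missing = priority not in queues

# T2: runs push() to completion.
if priority not in queues:
    queues[priority] = qfactory(priority)
queues[priority].append("from T2")

# T1: resumes after its (now stale) check and replaces T2's queue.
if t1_saw_missing:
    queues[priority] = qfactory(priority)
queues[priority].append("from T1")

print(list(queues[priority]))  # ['from T1']
```

Holding one lock across the check and the assignment would remove the window in which T1's check goes stale.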
FifoDiskQueue/LifoDiskQueue are not thread safe for a similar reason (shared
state is read and then updated without a lock):
T1:     def push(self, string):
T1:         ...
T1:         hnum, hpos = self.info['head']
T1:         ...
-----------
T2:     def push(self, string):
T2:         ...
T2:         hnum, hpos = self.info['head']
T2:
T2:         if hpos == self.chunksize:
T2:             hpos = 0  <---------- Moves to the next chunk
T2:             hnum += 1
-----------
T1:         if hpos == self.chunksize:
T1:             hpos = 0  <---------- Moves to the next chunk again
T1:             hnum += 1
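For the disk queues, probably the simplest remedy is to serialize access from the outside rather than change the queue code. The sketch below assumes only that the wrapped object offers push(), pop(), close() and len(); SynchronizedQueue is a hypothetical helper, not part of queuelib.

```python
import threading


class SynchronizedQueue:
    """Hypothetical wrapper (not part of queuelib): one lock around every call."""

    def __init__(self, queue):
        self._queue = queue
        self._lock = threading.Lock()

    def push(self, obj):
        with self._lock:
            self._queue.push(obj)

    def pop(self):
        with self._lock:
            return self._queue.pop()

    def close(self):
        with self._lock:
            self._queue.close()

    def __len__(self):
        with self._lock:
            return len(self._queue)
```

This is coarse: only one thread is inside the queue at any time, so it trades throughput for correctness, and it only helps if every thread goes through the wrapper.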
FifoSQLiteQueue is close, but unfortunately not thread safe either:
T1:     for id_, item in conn.execute(self._sql_pop):
-----------
T2:     for id_, item in conn.execute(self._sql_pop):
T2:         conn.execute(self._sql_del, (id_,))
T2:         return item  <---------- Returns item
-----------
T1:         conn.execute(self._sql_del, (id_,))  <---------- Doesn't delete anything, but that's OK
T1:         return item  <---------- Returns the same item again
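The double return can also be replayed in a single thread with the standard sqlite3 module. This is a sketch under assumptions: the table layout and the two SQL statements below are stand-ins chosen for the example and may differ from what _sql_pop and _sql_del actually contain.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed schema and statements, for illustration only.
conn.execute(
    "CREATE TABLE queue (id INTEGER PRIMARY KEY AUTOINCREMENT, item BLOB)"
)
conn.execute("INSERT INTO queue (item) VALUES (?)", (b"job",))
sql_pop = "SELECT id, item FROM queue ORDER BY id LIMIT 1"
sql_del = "DELETE FROM queue WHERE id = ?"

# T1: selects the head row, then is preempted before deleting it.
t1 = conn.execute(sql_pop).fetchone()

# T2: selects the same head row, deletes it and returns the item.
t2 = conn.execute(sql_pop).fetchone()
conn.execute(sql_del, (t2[0],))

# T1: resumes; its DELETE matches no row, yet it still returns the item.
cur = conn.execute(sql_del, (t1[0],))

print(cur.rowcount)    # 0
print(t1[1] == t2[1])  # True
```

Both "threads" end up holding the same item even though the table contained it only once.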
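==== Thread safety test ====
import threading
import unittest

from queuelib.queue import FifoMemoryQueue


class FifoMemoryThreadingTest(unittest.TestCase):

    def push_and_pop(self):
        # Each thread pushes itemcnt items, then drains the shared queue.
        for i in range(self.itemcnt):
            self.q.push(i)
        while True:
            i = self.q.pop()
            if i is None:
                break
            with self.sum_lock:  # "+=" on a shared attribute is not atomic
                self.sum += i

    def test_threading(self):
        self.q = FifoMemoryQueue()
        thread_cnt = 100
        self.itemcnt = 1000
        self.sum = 0
        self.sum_lock = threading.Lock()
        threads = []
        for i in range(thread_cnt):
            t = threading.Thread(target=self.push_and_pop)
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        # Every pushed value must have been popped exactly once.
        self.assertEqual(thread_cnt * self.itemcnt * (self.itemcnt - 1) // 2,
                         self.sum)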
On Saturday, March 19, 2016 at 11:13:59 PM UTC, Alex Railean wrote:
>
> Yes, still interested, thank you for the reaction.
>
> How can I help with the process?
>