jenkins-bot has submitted this change and it was merged.

Change subject: Intersect generators
......................................................................


Intersect generators

It is used in pagegenerators.py to intersect generators listed on the
command line.

Yield items only if they are yielded by all specified generators.
Threads (via ThreadedGenerator) are used in order to run generators
in parallel, so that items can be yielded before generators are
exhausted.

Threads are stopped when they are either exhausted or Ctrl-C is pressed.
Quitting before all generators are finished is attempted if
there is no more chance of finding an item in all queues.

Refactored ThreadList in tools.py and added method
ThreadList.stop_all().

Change-Id: I53eb1f747f46e18a3dd220feef1e496fd36853e2
---
M pywikibot/pagegenerators.py
M pywikibot/tools.py
M tests/pagegenerators_tests.py
M tests/thread_tests.py
4 files changed, 200 insertions(+), 13 deletions(-)

Approvals:
  John Vandenberg: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/pywikibot/pagegenerators.py b/pywikibot/pagegenerators.py
index 3e77643..319089e 100644
--- a/pywikibot/pagegenerators.py
+++ b/pywikibot/pagegenerators.py
@@ -26,9 +26,14 @@
 import itertools
 import re
 import time
+
 import pywikibot
 from pywikibot import date, config, i18n
-from pywikibot.tools import deprecated_args, DequeGenerator
+from pywikibot.tools import (
+    deprecated_args,
+    DequeGenerator,
+    intersect_generators,
+)
 from pywikibot.comms import http
 import pywikibot.data.wikidataquery as wdquery
 
@@ -36,11 +41,14 @@
     basestring = (str, )
     unicode = str
 
+_logger = "pagegenerators"
+
 # ported from version 1 for backwards-compatibility
 # most of these functions just wrap a Site or Page method that returns
 # a generator
 
 parameterHelp = u"""\
+
 -cat              Work on all pages which are in a specific category.
                   Argument can also be given as "-cat:categoryname" or
                   as "-cat:categoryname|fromtitle" (using # instead of |
@@ -199,6 +207,7 @@
                   Case insensitive regular expressions will be used and
                   dot matches any character, including a newline.
 
+-intersect        Work on the intersection of all the provided generators.
 """
 
 docuReplacements = {'&params;': parameterHelp}
@@ -233,6 +242,7 @@
         self.step = None
         self.limit = None
         self.articlefilter_list = []
+        self.intersect = False
         self._site = site
 
     @property
@@ -273,10 +283,18 @@
             return None
         elif len(self.gens) == 1:
             gensList = self.gens[0]
+            dupfiltergen = gensList
+            if self.intersect:
+                pywikibot.input(u'Only one generator. '
+                                u'Param "-intersect" has no meaning or effect.')
         else:
-            gensList = CombinedPageGenerator(self.gens)
-
-        dupfiltergen = DuplicateFilterPageGenerator(gensList)
+            if self.intersect:
+                gensList = intersect_generators(self.gens)
+                # By definition no duplicates are possible.
+                dupfiltergen = gensList
+            else:
+                gensList = CombinedPageGenerator(self.gens)
+                dupfiltergen = DuplicateFilterPageGenerator(gensList)
 
         if self.articlefilter_list:
             return RegexBodyFilterPageGenerator(
@@ -572,6 +590,9 @@
                 query = pywikibot.input(
                     u'Mysql query string:')
             gen = MySQLPageGenerator(query, site=self.site)
+        elif arg.startswith('-intersect'):
+            self.intersect = True
+            return True
 
         if gen:
             self.gens.append(gen)
diff --git a/pywikibot/tools.py b/pywikibot/tools.py
index c891c83..ad890eb 100644
--- a/pywikibot/tools.py
+++ b/pywikibot/tools.py
@@ -13,7 +13,7 @@
 import time
 import inspect
 import re
-from collections import Mapping, deque
+import collections
 from distutils.version import Version
 
 if sys.version_info[0] > 2:
@@ -256,17 +256,20 @@
 
     """
 
+    _logger = "threadlist"
+
     def __init__(self, limit=128, *args):
+        """Constructor."""
         self.limit = limit
-        list.__init__(self, *args)
-        for item in list(self):
+        super(ThreadList, self).__init__(*args)
+        for item in self:
             if not isinstance(threading.Thread, item):
                 raise TypeError("Cannot add '%s' to ThreadList" % type(item))
 
     def active_count(self):
         """Return the number of alive threads, and delete all non-alive ones."""
         count = 0
-        for item in list(self):
+        for item in self[:]:
             if item.isAlive():
                 count += 1
             else:
@@ -274,12 +277,92 @@
         return count
 
     def append(self, thd):
+        """Add a thread to the pool and start it."""
         if not isinstance(thd, threading.Thread):
             raise TypeError("Cannot append '%s' to ThreadList" % type(thd))
         while self.active_count() >= self.limit:
             time.sleep(2)
-        list.append(self, thd)
+        super(ThreadList, self).append(thd)
         thd.start()
+
+    def stop_all(self):
+        """Stop all threads the pool."""
+        if self:
+            debug(u'EARLY QUIT: Threads: %d' % len(self), ThreadList._logger)
+        for thd in self:
+            thd.stop()
+            debug(u'EARLY QUIT: Queue size left in %s: %s'
+                  % (thd, thd.queue.qsize()), ThreadList._logger)
+
+
+def intersect_generators(genlist):
+    """
+    Intersect generators listed in genlist.
+
+    Yield items only if they are yielded by all generators in genlist.
+    Threads (via ThreadedGenerator) are used in order to run generators
+    in parallel, so that items can be yielded before generators are
+    exhausted.
+
+    Threads are stopped when they are either exhausted or Ctrl-C is pressed.
+    Quitting before all generators are finished is attempted if
+    there is no more chance of finding an item in all queues.
+
+    @param genlist: list of page generators
+    @type genlist: list
+    """
+    _logger = ""
+
+    # Item is cached to check that it is found n_gen
+    # times before being yielded.
+    cache = collections.defaultdict(set)
+    n_gen = len(genlist)
+
+    # Class to keep track of alive threads.
+    # Start new threads and remove completed threads.
+    thrlist = ThreadList()
+
+    for source in genlist:
+        threaded_gen = ThreadedGenerator(name=repr(source), target=source)
+        thrlist.append(threaded_gen)
+        debug("INTERSECT: thread started: %r" % threaded_gen, _logger)
+
+    while True:
+        # Get items from queues in a round-robin way.
+        for t in thrlist:
+            try:
+                # TODO: evaluate if True and timeout is necessary.
+                item = t.queue.get(True, 0.1)
+
+                # Cache entry is a set of tuples (item, thread).
+                # Duplicates from same thread are not counted twice.
+                cache[item].add((item, t))
+                if len(cache[item]) == n_gen:
+                    yield item
+                    # Remove item from cache.
+                    # No chance of seeing it again (see later: early stop).
+                    cache.pop(item)
+
+                active = thrlist.active_count()
+                max_cache = n_gen
+                if cache.values():
+                    max_cache = max(len(v) for v in cache.values())
+                # No. of active threads is not enough to reach n_gen.
+                # We can quit even if some thread is still active.
+                # There could be an item in all generators which has not yet
+                # appeared from any generator. Only when we have lost one
+                # generator, then we can bail out early based on seen items.
+                if active < n_gen and n_gen - max_cache > active:
+                    thrlist.stop_all()
+                    return
+            except Queue.Empty:
+                pass
+            except KeyboardInterrupt:
+                thrlist.stop_all()
+            finally:
+                # All threads are done.
+                if thrlist.active_count() == 0:
+                    return
 
 
 class CombinedError(KeyError, IndexError):
@@ -287,7 +370,7 @@
     """An error that gets caught by both KeyError and IndexError."""
 
 
-class EmptyDefault(str, Mapping):
+class EmptyDefault(str, collections.Mapping):
 
     """
     A default for a not existing siteinfo property.
@@ -336,7 +419,7 @@
     """Unicode string with SelfCallMixin."""
 
 
-class DequeGenerator(deque):
+class DequeGenerator(collections.deque):
 
     """A generator that allows items to be added during generating."""
 
diff --git a/tests/pagegenerators_tests.py b/tests/pagegenerators_tests.py
index deeda2a..a816031 100755
--- a/tests/pagegenerators_tests.py
+++ b/tests/pagegenerators_tests.py
@@ -17,6 +17,7 @@
     WikidataTestCase,
     DefaultSiteTestCase,
 )
+from tests.thread_tests import GeneratorIntersectTestCase
 
 
 class TestPageGenerators(TestCase):
@@ -252,6 +253,41 @@
         self.assertPagesInNamespaces(gen, set([1, 3]))
 
 
+class PageGeneratorIntersectTestCase(DefaultSiteTestCase,
+                                     GeneratorIntersectTestCase):
+
+    """Page intersect_generators test cases."""
+
+    def test_intersect_newpages_twice(self):
+        site = self.get_site()
+        self.assertEqualItertools(
+            [pagegenerators.NewpagesPageGenerator(site=site, total=10),
+             pagegenerators.NewpagesPageGenerator(site=site, total=10)])
+
+    def test_intersect_newpages_and_recentchanges(self):
+        site = self.get_site()
+        self.assertEqualItertools(
+            [pagegenerators.NewpagesPageGenerator(site=site, total=50),
+             pagegenerators.RecentChangesPageGenerator(site=site, total=200)])
+
+
+class EnglishWikipediaPageGeneratorIntersectTestCase(GeneratorIntersectTestCase):
+
+    """Page intersect_generators test cases."""
+
+    family = 'wikipedia'
+    code = 'en'
+
+    def test_intersect_newpages_csd(self):
+        site = self.get_site()
+        self.assertEqualItertools(
+            [pagegenerators.NewpagesPageGenerator(site=site, total=10),
+             pagegenerators.CategorizedPageGenerator(
+                pywikibot.Category(site,
+                                   'Category:Candidates_for_speedy_deletion'))
+             ])
+
+
 if __name__ == "__main__":
     try:
         unittest.main()
diff --git a/tests/thread_tests.py b/tests/thread_tests.py
index 3d86632..3222a8f 100644
--- a/tests/thread_tests.py
+++ b/tests/thread_tests.py
@@ -7,9 +7,14 @@
 #
 __version__ = '$Id$'
 
+import itertools
+import sys
 
-from tests.aspects import TestCase
-from pywikibot.tools import ThreadedGenerator
+if sys.version_info[0] == 2:
+    from future_builtins import filter
+
+from tests.aspects import unittest, TestCase
+from pywikibot.tools import ThreadedGenerator, intersect_generators
 
 
 class BasicThreadedGeneratorTestCase(TestCase):
@@ -34,3 +39,45 @@
         thd_gen = ThreadedGenerator(target=self.gen_func)
         thd_gen.start()
         self.assertEqual(list(thd_gen), list(iterable))
+
+
+class GeneratorIntersectTestCase(TestCase):
+
+    """Base class for intersect_generators test cases."""
+
+    def assertEqualItertools(self, gens):
+        # If they are a generator, we need to convert to a list
+        # first otherwise the generator is empty the second time.
+        datasets = [list(gen) for gen in gens]
+
+        itertools_result = set(
+            [item[0] for item in filter(
+                lambda lst: all([x == lst[0] for x in lst]),
+                itertools.product(*datasets))
+             ])
+
+        result = list(intersect_generators(datasets))
+
+        self.assertEqual(len(set(result)), len(result))
+
+        self.assertCountEqual(result, itertools_result)
+
+
+class BasicGeneratorIntersectTestCase(GeneratorIntersectTestCase):
+
+    """Disconnected intersect_generators test cases."""
+
+    net = False
+
+    def test_intersect_basic(self):
+        self.assertEqualItertools(['abc', 'db', 'ba'])
+
+    def test_intersect_with_dups(self):
+        self.assertEqualItertools(['aabc', 'dddb', 'baa'])
+
+
+if __name__ == '__main__':
+    try:
+        unittest.main()
+    except SystemExit:
+        pass

-- 
To view, visit https://gerrit.wikimedia.org/r/170832
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I53eb1f747f46e18a3dd220feef1e496fd36853e2
Gerrit-PatchSet: 13
Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-Owner: Mpaa <[email protected]>
Gerrit-Reviewer: John Vandenberg <[email protected]>
Gerrit-Reviewer: Ladsgroup <[email protected]>
Gerrit-Reviewer: Merlijn van Deen <[email protected]>
Gerrit-Reviewer: Mpaa <[email protected]>
Gerrit-Reviewer: XZise <[email protected]>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
Pywikibot-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/pywikibot-commits

Reply via email to