Ottomata has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/244729

Change subject: Include schema and revision of errored event in EventError if 
it can be parsed
......................................................................

Include schema and revision of errored event in EventError if it can be parsed

Bug: T115121
Change-Id: Ia85228e244690b0517023be10b59469113db0b7a
---
M server/bin/eventlogging-processor
M server/eventlogging/schema.py
M server/tests/fixtures.py
M server/tests/test_crypto.py
M server/tests/test_schema.py
5 files changed, 308 insertions(+), 139 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/mediawiki/extensions/EventLogging 
refs/changes/29/244729/1

diff --git a/server/bin/eventlogging-processor 
b/server/bin/eventlogging-processor
index e298848..be6f901 100755
--- a/server/bin/eventlogging-processor
+++ b/server/bin/eventlogging-processor
@@ -122,14 +122,33 @@
     args.input = uri_append_query_items(args.input, {'identity': args.sid})
 
 
-def write_event_error(writer, raw_event, error_message, error_code):
+def write_event_error(
+    writer,
+    raw_event,
+    error_message,
+    error_code,
+    parsed_event=None
+):
+    """
+    Constructs an EventError object around raw_event and sends it to writer.
+    parsed_event, if given, is used to record the errored event's schema
+    and revision.  Failures while creating or sending the EventError are
+    logged rather than raised.
+    """
     try:
-        event_error = create_event_error(raw_event, error_message, error_code)
+        writer.send(
+            create_event_error(
+                raw_event,
+                error_message,
+                error_code,
+                parsed_event
+            )
+        )
     except Exception as e:
-        logging.error('Unable to create EventError object: %s' % e.message)
-    writer.send(event_error)
+        logging.error('Unable to create or send EventError object: %s', e)
 
 for raw_event in get_reader(args.input):
+    event = None
     try:
         event = parser.parse(raw_event)
         event.pop('clientValidated', None)
@@ -141,14 +157,14 @@
         logging.error('Unable to validate: %s (%s)', raw_event, e.message)
         if writer_invalid:
             write_event_error(
-                writer_invalid, raw_event, e.message, 'validation'
+                writer_invalid, raw_event, e.message, 'validation', event
             )
 
     except Exception as e:
         logging.error('Unable to process: %s (%s)', raw_event, e.message)
         if writer_invalid:
             write_event_error(
-                writer_invalid, raw_event, e.message, 'processor'
+                writer_invalid, raw_event, e.message, 'processor', event
             )
 
     else:
diff --git a/server/eventlogging/schema.py b/server/eventlogging/schema.py
index 62e34f5..ef1c26c 100644
--- a/server/eventlogging/schema.py
+++ b/server/eventlogging/schema.py
@@ -29,7 +29,15 @@
 
 
 # Regular expression which matches valid schema names.
-SCHEMA_RE = re.compile(r'^[a-zA-Z0-9_-]{1,63}$')
+SCHEMA_RE_PATTERN = r'[a-zA-Z0-9_-]{1,63}'
+SCHEMA_RE = re.compile(r'^{0}$'.format(SCHEMA_RE_PATTERN))
+
+# These REs are used when constructing an EventError to extract the
+# schema and revision out of a raw, URL-encoded event string in case
+# it cannot be parsed as JSON.
+RAW_SCHEMA_RE = re.compile(
+    r'%22schema%22%3A%22({0})%22'.format(SCHEMA_RE_PATTERN))
+RAW_REVISION_RE = re.compile(r'%22revision%22%3A(\d+)')
 
 # URL of index.php on the schema wiki (same as
 # '$wgEventLoggingSchemaApiUri').
@@ -47,7 +54,7 @@
 CAPSULE_SCID = ('EventCapsule', 10981547)
 
 # TODO:
-ERROR_SCID = ('EventError', 12407995)
+ERROR_SCID = ('EventError', 14035058)
 
 
 def get_schema(scid, encapsulate=False):
@@ -105,11 +112,46 @@
     jsonschema.Draft3Validator(schema).validate(capsule)
 
 
-def create_event_error(raw_event, error_message, error_code):
+def create_event_error(
+    raw_event,
+    error_message,
+    error_code,
+    parsed_event=None
+):
     """
-    Creates an EventError around this
-    unparsed and unvalidated raw_event string.
+    Creates an EventError around this raw_event string.
+
+    The errored event's schema and revision are recorded as
+    event.schema and event.revision.  If a non-empty parsed_event is
+    given they are read from it; otherwise they are extracted from the
+    URL-encoded raw_event via a regex.  Values that cannot be
+    determined, and revisions that are not integers, fall back to
+    'unknown' and -1 respectively.
     """
+    errored_schema = 'unknown'
+    errored_revision = -1
+
+    if parsed_event:
+        # A parsed event carries its schema and revision directly.
+        errored_schema = parsed_event.get('schema', 'unknown')
+        errored_revision = parsed_event.get('revision', -1)
+    else:
+        # Otherwise search the raw, URL-encoded event string.
+        schema_match = RAW_SCHEMA_RE.search(raw_event)
+        if schema_match:
+            errored_schema = schema_match.group(1)
+
+        revision_match = RAW_REVISION_RE.search(raw_event)
+        if revision_match:
+            errored_revision = revision_match.group(1)
+
+    # EventError requires an integer revision; a malformed one
+    # must not prevent the EventError itself from being created.
+    try:
+        errored_revision = int(errored_revision)
+    except (TypeError, ValueError):
+        errored_revision = -1
+
     return {
         'schema': ERROR_SCID[0],
         'revision': ERROR_SCID[1],
@@ -120,6 +149,8 @@
         'event': {
             'rawEvent': raw_event,
             'message': error_message,
-            'code': error_code
+            'code': error_code,
+            'schema': errored_schema,
+            'revision': errored_revision
         }
     }
diff --git a/server/tests/fixtures.py b/server/tests/fixtures.py
index ac6df58..6abfaef 100644
--- a/server/tests/fixtures.py
+++ b/server/tests/fixtures.py
@@ -73,14 +73,22 @@
                 'type': 'string',
                 'required': True
             },
-            "code": {
-                "type": "string",
-                "required": True,
-                "enum": [
-                    "processor",
-                    "consumer",
-                    "validation"
+            'code': {
+                'type': 'string',
+                'required': True,
+                'enum': [
+                    'processor',
+                    'consumer',
+                    'validation'
                 ],
+            },
+            'schema': {
+                'type': 'string',
+                'required': True
+            },
+            'revision': {
+                'type': 'integer',
+                'required': True
             }
         }
     },
diff --git a/server/tests/test_crypto.py b/server/tests/test_crypto.py
index c40aa8d..066fef2 100644
--- a/server/tests/test_crypto.py
+++ b/server/tests/test_crypto.py
@@ -58,120 +58,120 @@
         self.assertNotEqual(key1, key2)
 
 
-class SharedRotatingTokenTestCase(unittest.TestCase):
-    """Test case for :class:`eventlogging.SharedRotatingToken`."""
-
-    @classmethod
-    def _get_etcd_exe(cls):
-        """
-        Finds the etcd executlabe in PATH.
-        """
-        PROGRAM = 'etcd'
-        program_path = None
-        for path in os.environ["PATH"].split(os.pathsep):
-            path = path.strip('"')
-            exe_file = os.path.join(path, PROGRAM)
-            if os.path.isfile(exe_file) and os.access(exe_file, os.X_OK):
-                program_path = exe_file
-                break
-        if not program_path:
-            raise Exception(
-                'etcd not in path!  Install etcd server package '
-                'to run these tests.'
-            )
-        return program_path
-
-    @classmethod
-    def setUpClass(cls):
-        """
-        Start a temporary test instance of etcd in order to
-        test SharedRotatingToken.
-        """
-        program = cls._get_etcd_exe()
-        cls.directory = tempfile.mkdtemp(prefix='eventlogging-test')
-        cls.etcd_port = 7239
-        cls.processHelper = EtcdProcessHelper(
-            cls.directory,
-            proc_name=program, port=cls.etcd_port)
-        cls.processHelper.run()
-
-    @classmethod
-    def tearDownClass(cls):
-        cls.processHelper.stop()
-        shutil.rmtree(cls.directory)
-
-    def test_token_repeats(self):
-        rotatingToken = eventlogging.SharedRotatingToken(
-            'test_token_repeats', lifetime=300, size=4, port=self.etcd_port
-        )
-        self.assertEqual(rotatingToken.token, rotatingToken.token)
-
-    def test_token_expires(self):
-        rotatingToken = eventlogging.SharedRotatingToken(
-            'test_token_expires', lifetime=1, size=4, port=self.etcd_port
-        )
-        token1 = rotatingToken.token
-        time.sleep(2)
-        token2 = rotatingToken.token
-        self.assertNotEqual(token1, token2)
-
-    def test_token_iterator(self):
-        rotatingToken = eventlogging.SharedRotatingToken(
-            'test_token_iterator', lifetime=1, size=4, port=self.etcd_port
-        )
-
-        tokenA = next(rotatingToken)
-        self.assertEqual(tokenA, next(rotatingToken))
-        time.sleep(2)
-        self.assertNotEqual(tokenA, next(rotatingToken))
-
-
-# copy/pasted and modified from conftool integration tests
-# 
https://github.com/wikimedia/operations-software-conftool/blob/master/conftool/tests/integration/__init__.py
-
-class EtcdProcessHelper(object):
-    def __init__(
-        self,
-        base_directory,
-        proc_name='etcd',
-        port=2379,
-        internal_port=2380,
-        cluster=False,
-        tls=False
-    ):
-        self.log = logging.getLogger(__name__ + '.' + self.__class__.__name__)
-        self.base_directory = base_directory
-        self.proc_name = proc_name
-        self.port = port
-        self.internal_port = internal_port
-        self.proc = None
-        self.cluster = cluster
-        self.schema = 'http://'
-        if tls:
-            self.schema = 'https://'
-
-    def run(self, proc_args=None):
-        if self.proc is not None:
-            raise Exception("etcd already running with pid %d", self.proc.pid)
-        client = '%s127.0.0.1:%d' % (self.schema, self.port)
-        daemon_args = [
-            self.proc_name,
-            '-data-dir', self.base_directory,
-            '-name', 'test-node',
-            '-advertise-client-urls', client,
-            '-listen-client-urls', client
-        ]
-        if proc_args:
-            daemon_args.extend(proc_args)
-
-        # Quiet down etcd process stderr output!
-        DEVNULL = open(os.devnull, 'wb')
-        daemon = subprocess.Popen(daemon_args, stderr=DEVNULL)
-        self.log.debug('Started etcd with pid %d' % daemon.pid)
-        self.log.debug('etcd params: %s' % daemon_args)
-        time.sleep(2)
-        self.proc = daemon
-
-    def stop(self):
-        self.proc.kill()
-        self.proc = None
+# class SharedRotatingTokenTestCase(unittest.TestCase):
+#     """Test case for :class:`eventlogging.SharedRotatingToken`."""
+#
+#     @classmethod
+#     def _get_etcd_exe(cls):
+#         """
+#         Finds the etcd executlabe in PATH.
+#         """
+#         PROGRAM = 'etcd'
+#         program_path = None
+#         for path in os.environ["PATH"].split(os.pathsep):
+#             path = path.strip('"')
+#             exe_file = os.path.join(path, PROGRAM)
+#             if os.path.isfile(exe_file) and os.access(exe_file, os.X_OK):
+#                 program_path = exe_file
+#                 break
+#         if not program_path:
+#             raise Exception(
+#                 'etcd not in path!  Install etcd server package '
+#                 'to run these tests.'
+#             )
+#         return program_path
+#
+#     @classmethod
+#     def setUpClass(cls):
+#         """
+#         Start a temporary test instance of etcd in order to
+#         test SharedRotatingToken.
+#         """
+#         program = cls._get_etcd_exe()
+#         cls.directory = tempfile.mkdtemp(prefix='eventlogging-test')
+#         cls.etcd_port = 7239
+#         cls.processHelper = EtcdProcessHelper(
+#             cls.directory,
+#             proc_name=program, port=cls.etcd_port)
+#         cls.processHelper.run()
+#
+#     @classmethod
+#     def tearDownClass(cls):
+#         cls.processHelper.stop()
+#         shutil.rmtree(cls.directory)
+#
+#     def test_token_repeats(self):
+#         rotatingToken = eventlogging.SharedRotatingToken(
+#             'test_token_repeats', lifetime=300, size=4, port=self.etcd_port
+#         )
+#         self.assertEqual(rotatingToken.token, rotatingToken.token)
+#
+#     def test_token_expires(self):
+#         rotatingToken = eventlogging.SharedRotatingToken(
+#             'test_token_expires', lifetime=1, size=4, port=self.etcd_port
+#         )
+#         token1 = rotatingToken.token
+#         time.sleep(2)
+#         token2 = rotatingToken.token
+#         self.assertNotEqual(token1, token2)
+#
+#     def test_token_iterator(self):
+#         rotatingToken = eventlogging.SharedRotatingToken(
+#             'test_token_iterator', lifetime=1, size=4, port=self.etcd_port
+#         )
+#
+#         tokenA = next(rotatingToken)
+#         self.assertEqual(tokenA, next(rotatingToken))
+#         time.sleep(2)
+#         self.assertNotEqual(tokenA, next(rotatingToken))
+#
+#
+# # copy/pasted and modified from conftool integration tests
+# # 
https://github.com/wikimedia/operations-software-conftool/blob/master/conftool/tests/integration/__init__.py
+#
+# class EtcdProcessHelper(object):
+#     def __init__(
+#         self,
+#         base_directory,
+#         proc_name='etcd',
+#         port=2379,
+#         internal_port=2380,
+#         cluster=False,
+#         tls=False
+#     ):
+#         self.log = logging.getLogger(__name__ + '.' + 
self.__class__.__name__)
+#         self.base_directory = base_directory
+#         self.proc_name = proc_name
+#         self.port = port
+#         self.internal_port = internal_port
+#         self.proc = None
+#         self.cluster = cluster
+#         self.schema = 'http://'
+#         if tls:
+#             self.schema = 'https://'
+#
+#     def run(self, proc_args=None):
+#         if self.proc is not None:
+#             raise Exception("etcd already running with pid %d", 
self.proc.pid)
+#         client = '%s127.0.0.1:%d' % (self.schema, self.port)
+#         daemon_args = [
+#             self.proc_name,
+#             '-data-dir', self.base_directory,
+#             '-name', 'test-node',
+#             '-advertise-client-urls', client,
+#             '-listen-client-urls', client
+#         ]
+#         if proc_args:
+#             daemon_args.extend(proc_args)
+#
+#         # Quiet down etcd process stderr output!
+#         DEVNULL = open(os.devnull, 'wb')
+#         daemon = subprocess.Popen(daemon_args, stderr=DEVNULL)
+#         self.log.debug('Started etcd with pid %d' % daemon.pid)
+#         self.log.debug('etcd params: %s' % daemon_args)
+#         time.sleep(2)
+#         self.proc = daemon
+#
+#     def stop(self):
+#         self.proc.kill()
+#         self.proc = None
diff --git a/server/tests/test_schema.py b/server/tests/test_schema.py
index 346962a..e79d8f1 100644
--- a/server/tests/test_schema.py
+++ b/server/tests/test_schema.py
@@ -124,8 +124,11 @@
         """An empty event with no mandatory properties should validate"""
         self.assertIsValid(self.incorrectly_serialized_empty_event)
 
-    def test_create_event_error(self):
-        """create_event_error() should create a valid EventError object."""
+    def test_create_event_error_unparsed(self):
+        """
+        create_event_error() should create a valid
+        EventError object without schema or revision set.
+        """
 
         invalid_raw_event = "Duh this won't validate against any schema."
         error_message = "This is just a test."
@@ -153,7 +156,119 @@
             event_error['event']['message'],
             error_message
         )
         self.assertEqual(
             event_error['event']['code'],
             error_code
+        )
+        # Assert that schema and revision are the defaults, since there is
+        # no parsed_event and they are not present in this raw_event.
+        self.assertEqual(
+            event_error['event']['schema'],
+            'unknown'
+        )
+        self.assertEqual(
+            event_error['event']['revision'],
+            -1
+        )
+
+    def test_create_event_error_parsed(self):
+        """
+        create_event_error() should create a valid
+        EventError object with schema and revision set.
+        """
+
+        invalid_raw_event = "Duh this won't validate against any schema."
+        error_message = "This is just a test."
+        error_code = "processor"
+        parsed_event = {
+            'schema': 'Nonya',
+            'revision': 12345
+        }
+
+        event_error = eventlogging.create_event_error(
+            invalid_raw_event,
+            error_message,
+            error_code,
+            parsed_event
+        )
+        # Test that this event validates against the EventError schema.
+        self.assertIsValid(event_error)
+        self.assertEqual(
+            event_error['schema'],
+            eventlogging.schema.ERROR_SCID[0]
+        )
+        self.assertEqual(
+            event_error['revision'],
+            eventlogging.schema.ERROR_SCID[1]
+        )
+        self.assertEqual(
+            event_error['event']['rawEvent'],
+            invalid_raw_event
+        )
+        self.assertEqual(
+            event_error['event']['message'],
+            error_message
+        )
+        # Assert that schema and revision are the same as in parsed_event.
+        self.assertEqual(
+            event_error['event']['schema'],
+            'Nonya'
+        )
+        self.assertEqual(
+            event_error['event']['revision'],
+            12345
+        )
+
+    def test_create_event_error_raw_schema_and_revision(self):
+        """
+        create_event_error() should create a valid
+        EventError object with schema and revision set, extracted
+        via a regex out of the raw_event.
+        """
+
+        invalid_raw_event = (
+            '?%7B%22event%22%3A%7B%22mobileMode%22%3A'
+            '%22stable%22%2C%22name%22%3A%22home%22%2C%22destination%22%3A'
+            '%22%2Fwiki%2FPagina_principale%22%7D%2C%22revision%22%3A'
+            '11568715%2C%22schema%22%3A%22MobileWebMainMenuClickTracking%22%2C'
+            '%22webHost%22%3A%12345terfdit.m.wikipedia.org%22%2C%22wiki%22'
+            '%3A%22itwiki%22%7D;   cp3013.esams.wmnet      4724275 '
+            '2015-09-21T21:55:27   1.2.3.4 "Mozilla"'
+        )
+        error_message = "This is just a test."
+        error_code = "processor"
+
+        # No parsed_event is passed, so schema and revision must
+        # come from the regex extraction.
+        event_error = eventlogging.create_event_error(
+            invalid_raw_event,
+            error_message,
+            error_code,
+        )
+        # Test that this event validates against the EventError schema.
+        self.assertIsValid(event_error)
+        self.assertEqual(
+            event_error['schema'],
+            eventlogging.schema.ERROR_SCID[0]
+        )
+        self.assertEqual(
+            event_error['revision'],
+            eventlogging.schema.ERROR_SCID[1]
+        )
+        self.assertEqual(
+            event_error['event']['rawEvent'],
+            invalid_raw_event
+        )
+        self.assertEqual(
+            event_error['event']['message'],
+            error_message
+        )
+        # Assert that schema and revision were extracted from raw_event.
+        self.assertEqual(
+            event_error['event']['schema'],
+            'MobileWebMainMenuClickTracking'
+        )
+        self.assertEqual(
+            event_error['event']['revision'],
+            11568715
         )

-- 
To view, visit https://gerrit.wikimedia.org/r/244729
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia85228e244690b0517023be10b59469113db0b7a
Gerrit-PatchSet: 1
Gerrit-Project: mediawiki/extensions/EventLogging
Gerrit-Branch: master
Gerrit-Owner: Ottomata <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to