Changeset: 635c867ab029 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=635c867ab029
Added Files:
        clients/iotapi/tests/websocketclient.py
Modified Files:
        clients/iotapi/requirements.txt
        clients/iotapi/src/WebSockets/jsonschemas.py
        clients/iotapi/src/WebSockets/websockets.py
        clients/iotapi/tests/frontendtests.py
        clients/iotapi/tests/main.py
        clients/iotclient/requirements.txt
Branch: iot
Log Message:

Fixed front end test


diffs (truncated from 449 to 300 lines):

diff --git a/clients/iotapi/requirements.txt b/clients/iotapi/requirements.txt
--- a/clients/iotapi/requirements.txt
+++ b/clients/iotapi/requirements.txt
@@ -1,12 +1,12 @@
 git+https://github.com/dpallot/simple-websocket-server.git
-IPy==0.83
-jsonschema==2.5.1
-python-dateutil==2.5.3
-python-monetdb==11.24.0
-pytz==2016.4
-requests==2.10.0
-Sphinx==1.4.4
-sphinx-rtd-theme==0.1.9
-tzlocal==1.2.2
-watchdog==0.8.3
-websocket-client==0.37.0
+IPy>=0.83
+jsonschema>=2.5.1
+python-dateutil>=2.5.3
+python-monetdb>=11.19.3.2
+pytz>=2016.4
+requests>=2.10.0
+Sphinx>=1.4.4
+sphinx-rtd-theme>=0.1.9
+tornado>=3.2.2
+tzlocal>=1.2.2
+watchdog>=0.8.3
diff --git a/clients/iotapi/src/WebSockets/jsonschemas.py 
b/clients/iotapi/src/WebSockets/jsonschemas.py
--- a/clients/iotapi/src/WebSockets/jsonschemas.py
+++ b/clients/iotapi/src/WebSockets/jsonschemas.py
@@ -1,37 +1,35 @@
 SUBSCRIBE_OPTS = ["sub", "subscribe"]
 UNSUBSCRIBE_OPTS = ["unsub", "unsubscribe"]
 INFO_OPTS = ["info"]
-CONCAT_SUB_OPTS = SUBSCRIBE_OPTS + UNSUBSCRIBE_OPTS
 READ_OPTS = ["read"]
 
 CLIENTS_INPUTS_SCHEMA = {
-    "title": "JSON schema publish/subscribe streams",
+    "title": "JSON schema fro web api",
     "description": "Validate clients inputs",
     "$schema": "http://json-schema.org/draft-04/schema#",
     "type": "object",
-
     "anyOf": [{
         "properties": {
+            "request": {"type": "string", "enum": SUBSCRIBE_OPTS + 
UNSUBSCRIBE_OPTS + INFO_OPTS},
             "schema": {"type": "string"},
-            "stream": {"type": "string"},
-            "request": {"type": "string", "enum": CONCAT_SUB_OPTS + INFO_OPTS},
+            "stream": {"type": "string"}
         },
-        "required": ["schema", "stream", "request"],
+        "required": ["request", "schema", "stream"],
         "additionalProperties": False
     }, {
         "properties": {
+            "request": {"type": "string", "enum": READ_OPTS},
             "schema": {"type": "string"},
             "stream": {"type": "string"},
-            "request": {"type": "string", "enum": READ_OPTS},
-            "basket": {"type": "integer", "minimum": 1, "default": 1},
+            "basket": {"type": "integer", "minimum": 1},
             "limit": {"type": "integer", "minimum": 0, "default": 100},
             "offset": {"type": "integer", "minimum": 0, "default": 0}
         },
-        "required": ["schema", "stream", "request"],
+        "required": ["request", "schema", "stream", "basket"],
         "additionalProperties": False
     }, {
         "properties": {
-            "request": {"type": "string", "enum": INFO_OPTS}  # for all streams
+            "request": {"type": "string", "enum": INFO_OPTS}
         },
         "required": ["request"],
         "additionalProperties": False
diff --git a/clients/iotapi/src/WebSockets/websockets.py 
b/clients/iotapi/src/WebSockets/websockets.py
--- a/clients/iotapi/src/WebSockets/websockets.py
+++ b/clients/iotapi/src/WebSockets/websockets.py
@@ -68,10 +68,10 @@ class IOTAPI(WebSocket):
             elif input_schema['request'] in UNSUBSCRIBE_OPTS:
                 self.unsubscribe(input_schema['schema'], 
input_schema['stream'])
             elif input_schema['request'] in READ_OPTS:
-                basket = input_schema.get('basket', 1)
                 limit = input_schema.get('limit', 100)
                 offset = input_schema.get('offset', 0)
-                self.read_stream_batch(input_schema['schema'], 
input_schema['stream'], basket, limit, offset)
+                self.read_stream_batch(input_schema['schema'], 
input_schema['stream'],
+                                       input_schema['basket'], limit, offset)
             elif input_schema['request'] in INFO_OPTS:
                 if len(input_schema) == 1:  # get all streams information
                     self.get_streams_data()
@@ -88,7 +88,7 @@ class IOTAPI(WebSocket):
         self._subscriptions[concatenated_name] = stream
         self._subscriptions_locker.release()
         self.sendJSONMessage((('response', 'subscribed'), ('schema', 
schema_name), ('stream', stream_name)))
-        add_log(20, ''.join(['Client ', self.address[0], 'subscribed to stream 
', concatenated_name]))
+        add_log(20, ''.join(['Client ', self.address[0], ' subscribed to 
stream ', concatenated_name]))
 
     def unsubscribe(self, schema_name, stream_name):
         concatenated_name = IOTStreams.get_context_entry_name(schema_name, 
stream_name)
diff --git a/clients/iotapi/tests/frontendtests.py 
b/clients/iotapi/tests/frontendtests.py
--- a/clients/iotapi/tests/frontendtests.py
+++ b/clients/iotapi/tests/frontendtests.py
@@ -1,100 +1,107 @@
 import json
 import os
 import requests
-import websocket
 
 from distutils.dir_util import copy_tree
 from threading import Thread
 from time import sleep
+from tornado import ioloop
 from unittest import TestCase
+from websocketclient import WebSocketClient
 
 __all__ = ['NullablesTest']
 
 WEB_SOCKETS_THREADS_TIMEOUT = 15
 
 
-class BaseFrontEndTest(TestCase):
+class BaseFrontEndTest(TestCase, WebSocketClient):
 
-    def __init__(self, **kwargs):
-        super(BaseFrontEndTest, self).__init__()
-        self._web_server_baskets_location = 
os.path.join(kwargs['iot_client_path'], 'baskets')
-        self._web_api_baskets_location = os.path.join(kwargs['iot_api_path'], 
'baskets')
-        self.schema = "tests"
-
-    def export_inserts(self, schema, stream, basket):
-        input_dir = os.path.join(self._web_server_baskets_location, schema, 
stream, basket)
-        output_dir = os.path.join(self._web_api_baskets_location, schema, 
stream, basket)
-        copy_tree(input_dir, output_dir)
-
-
-class TestWebSocket(websocket.WebSocketApp):
-
-    def __init__(self, test, url, header=[], on_open=None, on_message=None, 
on_error=None, on_close=None, on_ping=None,
-                 on_pong=None, on_cont_message=None, keep_running=True, 
get_mask_key=None, cookie=None,
-                 subprotocols=None, on_data=None):
-        super(TestWebSocket, self).__init__(url, header, on_open, on_message, 
on_error, on_close, on_ping,
-                                            on_pong, on_cont_message, 
keep_running, get_mask_key, cookie,
-                                            subprotocols, on_data)
-        self.test = test
+    def __init__(self, iot_client_path, iot_api_path, stream_name):
+        TestCase.__init__(self)
+        WebSocketClient.__init__(self)
+        self._web_server_baskets_location = os.path.join(iot_client_path, 
'baskets')
+        self._web_api_baskets_location = os.path.join(iot_api_path, 'baskets')
+        self.schema_name = "tests"
+        self.stream_name = stream_name
         self.ws_state = 1
 
-
-def on_open(ws):
-    ws.send(json.dumps({"request": "sub", "schema": ws.test.schema, "stream":  
ws.test.stream}))
-
-
-def on_message(ws, message):
-    resp = json.loads(message)
-    if resp['response'] == 'error':
-        ws.test.fail(msg=resp['message'])
-    elif ws.ws_state == 1:
-        ws.test.assertDictEqual({"response": "subscribed", "schema": 
ws.test.schema, "stream": ws.test.stream}, resp)
-        ws.ws_state = 2
-    elif ws.ws_state == 2:
-        ws.test.assertDictEqual({"response": "notification", "schema": 
ws.test.schema, "stream": ws.test.stream,
-                                 "basket": 1, "count": 3}, resp)
-        ws.send(json.dumps({"request": "read", "schema": ws.test.schema, 
"stream": ws.test.stream,
-                            "basket": 1, "offset": 0, "limit": 3}))
-        ws.ws_state = 3
-    elif ws.ws_state == 3:
-        ws.test.assertIn('implicit_timestamp', resp['tuples'][0], 
msg='Timestamp not in stream')
-        del resp['tuples'][0]['implicit_timestamp']
-        del resp['tuples'][1]['implicit_timestamp']
-        del resp['tuples'][2]['implicit_timestamp']
-        res_dic = {'vala': None, "valb": None, "valc": None, "vald": None, 
"vale": None, "valf": None, "valg": None,
-                   "valh": None, "vali": None, "valj": None, "valk": None, 
"vall": None, "valm": None, "valn": None,
-                   "valo": None, "valp": None, "valq": None, "valr": None, 
"vals": None, "valt": None, "valu": None,
-                   "valv": None, "valw": None, "valx": None, "valy": None, 
"valz": None}
-        tuples_response = {"response": "read", "schema": ws.test.schema, 
"stream": ws.test.stream,
-                           "count": 3, "tuples": [res_dic, res_dic, res_dic]}
-        ws.test.assertDictEqual(tuples_response, resp)
-        ws.close()
-    elif ws.ws_state == 4:
-        ws.test.assertDictEqual({"response": "removed", "schema": 
ws.test.schema, "stream": ws.test.stream}, resp)
-        ws.close()
-
-
-def on_error(ws, error):
-    ws.close()
-    ws.test.fail(msg=error)
-
-
-def web_socket(test):
-    ws = TestWebSocket(test=test, url="ws://127.0.0.1:8002/", 
on_message=on_message, on_open=on_open, on_error=on_error)
-    test.ws = ws
-    ws.run_forever()
+    def export_inserts(self, basket):
+        input_dir = os.path.join(self._web_server_baskets_location, 
self.schema_name, self.stream_name, basket)
+        output_dir = os.path.join(self._web_api_baskets_location, 
self.schema_name, self.stream_name, basket)
+        copy_tree(input_dir, output_dir)
 
 
 class NullablesTest(BaseFrontEndTest):
 
-    def __init__(self, **kwargs):
-        super(NullablesTest, self).__init__(**kwargs)
-        self.stream = "nulls"
-        self.ws = None
+    def __init__(self, iot_client_path, iot_api_path):
+        super(NullablesTest, self).__init__(iot_client_path, iot_api_path, 
stream_name="nulls")
+        self._error = ""
+
+    def web_socket_cycle(self):
+        self.connect("ws://127.0.0.1:8002/")
+        try:
+            ioloop.IOLoop.instance().start()
+        except:
+            pass
+
+    def _on_connection_success(self):
+        self.send(''.join(['{"request": "sub", "schema": "', self.schema_name, 
'", "stream": "', self.stream_name,
+                           '"}']))
+
+    def _on_connection_error(self, exception):
+        self.set_error(exception)
+
+    def set_error(self, msg):
+        self._error = msg
+        self.close()
+        ioloop.IOLoop.instance().stop()
+
+    def _on_message(self, message):
+        resp = json.loads(message)
+        if resp['response'] == 'error':
+            self.set_error("Received error message!")
+        elif self.ws_state == 1:
+            if resp != {"response": "subscribed", "schema": self.schema_name, 
"stream": self.stream_name}:
+                self.set_error("Wrong subscribed response!")
+            else:
+                self.ws_state = 2
+        elif self.ws_state == 2:
+            correct_dic = {"response": "notification", "schema": 
self.schema_name, "stream": self.stream_name,
+                           "basket": 1, "count": 3}
+            if resp != correct_dic:
+                self.set_error("Wrong notification response!")
+            else:
+                self.send(''.join(['{"request": "read", "schema": "', 
self.schema_name, '","stream": "',
+                                   self.stream_name, '", "basket": 1, 
"offset": 0, "limit": 3}']))
+                self.ws_state = 3
+        elif self.ws_state == 3:
+            if 'implicit_timestamp' not in resp['tuples'][0]:
+                self.set_error('Timestamp not in result stream')
+            else:
+                del resp['tuples'][0]['implicit_timestamp']
+                del resp['tuples'][1]['implicit_timestamp']
+                del resp['tuples'][2]['implicit_timestamp']
+                res = {"vala": None, "valb": None, "valc": None, "vald": None, 
"vale": None, "valf": None, "valg": None,
+                       "valh": None, "vali": None, "valj": None, "valk": None, 
"vall": None, "valm": None, "valn": None,
+                       "valo": None, "valp": None, "valq": None, "valr": None, 
"vals": None, "valt": None, "valu": None,
+                       "valv": None, "valw": None, "valx": None, "valy": None, 
"valz": None}
+                tuples_response = {"response": "read", "schema": 
self.schema_name, "stream": self.stream_name,
+                                   "count": 3, "tuples": [res, res, res]}
+                if resp != tuples_response:
+                    self.set_error("Wrong notification response!")
+                else:
+                    self.ws_state = 4
+        elif self.ws_state == 4:
+            if resp != {"response": "removed", "schema": self.schema_name, 
"stream": self.stream_name}:
+                self.set_error("Wrong removed response!")
+            else:
+                self.close()
+                ioloop.IOLoop.instance().stop()
 
     def runTest(self):
-        json_str = {"schema": self.schema, "stream": self.stream, 
"has_hostname": False, "flushing": {"base": "auto"},
-                    "columns": [{"name": "vala", "type": "string", "nullable": 
True},
+        json_str = {"schema": self.schema_name, "stream": self.stream_name, 
"has_hostname": False,
+                    "flushing": {"base": "auto"}, "columns": [
+                                {"name": "vala", "type": "string", "nullable": 
True},
                                 {"name": "valb", "type": "uuid", "nullable": 
True},
                                 {"name": "valc", "type": "mac", "nullable": 
True},
                                 {"name": "vald", "type": "url", "nullable": 
True},
@@ -125,24 +132,28 @@ class NullablesTest(BaseFrontEndTest):
 
         self.assertEqual(resp.status_code, 201, msg=resp.text)
 
-        sleep(2)  # we need to sleep to check that the next poll happens
+        sleep(4)  # we need to sleep to check that the next poll happens
 
-        thread = Thread(target=web_socket, args=(self, ))
+        thread = Thread(target=self.web_socket_cycle)
         thread.start()
 
-        resp = requests.post("http://127.0.0.1:8000/stream/%s/%s" % 
(self.schema, self.stream), json=[{}, {}, {}])
+        resp = requests.post("http://127.0.0.1:8000/stream/%s/%s" % 
(self.schema_name, self.stream_name),
+                             json=[{}, {}, {}])
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to