Changeset: 8888dce31e77 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8888dce31e77
Added Files:
        clients/iotapi/tests/__init__.py
        clients/iotapi/tests/frontendtests.py
        clients/iotapi/tests/main.py
Modified Files:
        clients/iotapi/requirements.txt
        clients/iotclient/src/Streams/jsonschemas.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/tests/main.py
Branch: iot
Log Message:

Added first Front-End test


diffs (truncated from 364 to 300 lines):

diff --git a/clients/iotapi/requirements.txt b/clients/iotapi/requirements.txt
--- a/clients/iotapi/requirements.txt
+++ b/clients/iotapi/requirements.txt
@@ -4,7 +4,9 @@ jsonschema==2.5.1
 python-dateutil==2.5.3
 python-monetdb==11.24.0
 pytz==2016.4
+requests==2.10.0
 Sphinx==1.4.4
 sphinx-rtd-theme==0.1.9
 tzlocal==1.2.2
 watchdog==0.8.3
+websocket-client==0.37.0
diff --git a/clients/iotapi/tests/__init__.py b/clients/iotapi/tests/__init__.py
new file mode 100644
diff --git a/clients/iotapi/tests/frontendtests.py 
b/clients/iotapi/tests/frontendtests.py
new file mode 100644
--- /dev/null
+++ b/clients/iotapi/tests/frontendtests.py
@@ -0,0 +1,148 @@
+import json
+import os
+import requests
+import websocket
+
+from distutils.dir_util import copy_tree
+from threading import Thread
+from time import sleep
+from unittest import TestCase
+
+__all__ = ['NullablesTest']
+
+WEB_SOCKETS_THREADS_TIMEOUT = 15
+
+
+class BaseFrontEndTest(TestCase):
+
+    def __init__(self, **kwargs):
+        super(BaseFrontEndTest, self).__init__()
+        self._web_server_baskets_location = 
os.path.join(kwargs['iot_client_path'], 'baskets')
+        self._web_api_baskets_location = os.path.join(kwargs['iot_api_path'], 
'baskets')
+        self.schema = "tests"
+
+    def export_inserts(self, schema, stream, basket):
+        input_dir = os.path.join(self._web_server_baskets_location, schema, 
stream, basket)
+        output_dir = os.path.join(self._web_api_baskets_location, schema, 
stream, basket)
+        copy_tree(input_dir, output_dir)
+
+
+class TestWebSocket(websocket.WebSocketApp):
+
+    def __init__(self, test, url, header=[], on_open=None, on_message=None, 
on_error=None, on_close=None, on_ping=None,
+                 on_pong=None, on_cont_message=None, keep_running=True, 
get_mask_key=None, cookie=None,
+                 subprotocols=None, on_data=None):
+        super(TestWebSocket, self).__init__(url, header, on_open, on_message, 
on_error, on_close, on_ping,
+                                            on_pong, on_cont_message, 
keep_running, get_mask_key, cookie,
+                                            subprotocols, on_data)
+        self.test = test
+        self.ws_state = 1
+
+
+def on_open(ws):
+    ws.send(json.dumps({"request": "sub", "schema": ws.test.schema, "stream":  
ws.test.stream}))
+
+
+def on_message(ws, message):
+    resp = json.loads(message)
+    if resp['response'] == 'error':
+        ws.test.fail(msg=resp['message'])
+    elif ws.ws_state == 1:
+        ws.test.assertDictEqual({"response": "subscribed", "schema": 
ws.test.schema, "stream": ws.test.stream}, resp)
+        ws.ws_state = 2
+    elif ws.ws_state == 2:
+        ws.test.assertDictEqual({"response": "notification", "schema": 
ws.test.schema, "stream": ws.test.stream,
+                                 "basket": 1, "count": 3}, resp)
+        ws.send(json.dumps({"request": "read", "schema": ws.test.schema, 
"stream": ws.test.stream,
+                            "basket": 1, "offset": 0, "limit": 3}))
+        ws.ws_state = 3
+    elif ws.ws_state == 3:
+        ws.test.assertIn('implicit_timestamp', resp['tuples'][0], 
msg='Timestamp not in stream')
+        del resp['tuples'][0]['implicit_timestamp']
+        del resp['tuples'][1]['implicit_timestamp']
+        del resp['tuples'][2]['implicit_timestamp']
+        res_dic = {'vala': None, "valb": None, "valc": None, "vald": None, 
"vale": None, "valf": None, "valg": None,
+                   "valh": None, "vali": None, "valj": None, "valk": None, 
"vall": None, "valm": None, "valn": None,
+                   "valo": None, "valp": None, "valq": None, "valr": None, 
"vals": None, "valt": None, "valu": None,
+                   "valv": None, "valw": None, "valx": None, "valy": None, 
"valz": None}
+        tuples_response = {"response": "read", "schema": ws.test.schema, 
"stream": ws.test.stream,
+                           "count": 3, "tuples": [res_dic, res_dic, res_dic]}
+        ws.test.assertDictEqual(tuples_response, resp)
+        ws.close()
+    elif ws.ws_state == 4:
+        ws.test.assertDictEqual({"response": "removed", "schema": 
ws.test.schema, "stream": ws.test.stream}, resp)
+        ws.close()
+
+
+def on_error(ws, error):
+    ws.close()
+    ws.test.fail(msg=error)
+
+
+def web_socket(test):
+    ws = TestWebSocket(test=test, url="ws://127.0.0.1:8002/", 
on_message=on_message, on_open=on_open, on_error=on_error)
+    test.ws = ws
+    ws.run_forever()
+
+
+class NullablesTest(BaseFrontEndTest):
+
+    def __init__(self, **kwargs):
+        super(NullablesTest, self).__init__(**kwargs)
+        self.stream = "nulls"
+        self.ws = None
+
+    def runTest(self):
+        json_str = {"schema": self.schema, "stream": self.stream, 
"has_hostname": False, "flushing": {"base": "auto"},
+                    "columns": [{"name": "vala", "type": "string", "nullable": 
True},
+                                {"name": "valb", "type": "uuid", "nullable": 
True},
+                                {"name": "valc", "type": "mac", "nullable": 
True},
+                                {"name": "vald", "type": "url", "nullable": 
True},
+                                {"name": "vale", "type": "inet", "nullable": 
True},
+                                {"name": "valf", "type": "inet6", "nullable": 
True},
+                                {"name": "valg", "type": "regex", "regex": 
"a", "nullable": True},
+                                {"name": "valh", "type": "varchar", "limit": 
16, "nullable": True},
+                                {"name": "vali", "type": "enum", "values": 
["a", "b", "c"], "nullable": True},
+                                {"name": "valj", "type": "boolean", 
"nullable": True},
+                                {"name": "valk", "type": "tinyint", 
"nullable": True},
+                                {"name": "vall", "type": "smallint", 
"nullable": True},
+                                {"name": "valm", "type": "int", "nullable": 
True},
+                                {"name": "valn", "type": "bigint", "nullable": 
True},
+                                {"name": "valo", "type": "hugeint", 
"nullable": True},
+                                {"name": "valp", "type": "real", "nullable": 
True},
+                                {"name": "valq", "type": "float", "nullable": 
True},
+                                {"name": "valr", "type": "decimal", 
"precision": 12, "scale": 10, "nullable": True},
+                                {"name": "vals", "type": "decimal", 
"precision": 28, "scale": 20, "nullable": True},
+                                {"name": "valt", "type": "date", "nullable": 
True},
+                                {"name": "valu", "type": "time", "nullable": 
True},
+                                {"name": "valv", "type": "time with time 
zone", "nullable": True},
+                                {"name": "valw", "type": "timestamp", 
"nullable": True},
+                                {"name": "valx", "type": "timestamp with time 
zone", "nullable": True},
+                                {"name": "valy", "type": "interval month", 
"nullable": True},
+                                {"name": "valz", "type": "interval second", 
"nullable": True}]}
+
+        resp = requests.post("http://127.0.0.1:8001/context";, json=json_str)
+
+        self.assertEqual(resp.status_code, 201, msg=resp.text)
+
+        sleep(2)  # we need to sleep to check that the next poll happens
+
+        thread = Thread(target=web_socket, args=(self, ))
+        thread.start()
+
+        resp = requests.post("http://127.0.0.1:8000/stream/%s/%s"; % 
(self.schema, self.stream), json=[{}, {}, {}])
+
+        self.assertEqual(resp.status_code, 201, msg=resp.text)
+
+        self.export_inserts("tests", "nulls", "1")
+
+        sleep(2)
+
+        resp = requests.delete("http://127.0.0.1:8001/context";, 
json={"schema": self.schema, "stream": self.stream})
+
+        self.assertEqual(resp.status_code, 204, msg=resp.text)
+
+        thread.join(timeout=WEB_SOCKETS_THREADS_TIMEOUT)
+        if thread.isAlive():
+            self.ws.close()
+            self.fail(msg='The websockets tests timed out!')
diff --git a/clients/iotapi/tests/main.py b/clients/iotapi/tests/main.py
new file mode 100644
--- /dev/null
+++ b/clients/iotapi/tests/main.py
@@ -0,0 +1,95 @@
+import argparse
+import getpass
+import os
+import shutil
+import subprocess
+import signal
+import sys
+import time
+
+from unittest import TextTestRunner, TestSuite
+from frontendtests import NullablesTest
+
+
+def check_positive_int(value):
+    ivalue = int(value)
+    if ivalue <= 0:
+        raise argparse.ArgumentTypeError("%s is an invalid positive int value" 
% value)
+    return ivalue
+
+
+def check_path(value):
+    if not os.path.isabs(value):
+        raise argparse.ArgumentTypeError("%s is an invalid path" % value)
+    return value
+
+
+def main():
+    parser = argparse.ArgumentParser(description='IOT Front-End Test', 
add_help=False)
+    parser.add_argument('-n', '--number', type=check_positive_int, nargs='?', 
default=1000,
+                        help='Number of inserts (default: 1000)', 
metavar='NUMBER')
+    parser.add_argument('-f', '--filepath', type=check_path, nargs='?', 
default='/tmp',
+                        help='Temp file location (default: %s)' % '/tmp', 
metavar='FILE_PATH')
+    parser.add_argument('-h', '--host', nargs='?', default='127.0.0.1',
+                        help='MonetDB database host (default: 127.0.0.1)', 
metavar='HOST')
+    parser.add_argument('-p', '--port', type=check_positive_int, nargs='?', 
default=50000,
+                        help='Database listening port (default: 50000)', 
metavar='PORT')
+    parser.add_argument('-d', '--database', nargs='?', default='iotdb', 
help='Database name (default: iotdb)')
+    parser.add_argument('-u', '--user', nargs='?', default='monetdb', 
help='Database user (default: monetdb)')
+    parser.add_argument('-?', '--help', action='store_true', help='Display 
this help')
+
+    try:
+        args = vars(parser.parse_args())
+    except BaseException as ex:
+        print ex
+        sys.exit(1)
+
+    if args['help']:
+        parser.print_help()
+        sys.exit(0)
+
+    test_dir = os.path.join(args['filepath'], 'test_dir')
+    shutil.rmtree(test_dir, ignore_errors=True)
+    iot_client_log = os.path.join(test_dir, 'iotclient.log')
+    iot_api_log = os.path.join(test_dir, 'iotapi.log')
+
+    iot_client_path = os.path.join(test_dir, 'iotclient')
+    if not os.path.exists(iot_client_path):
+        os.makedirs(iot_client_path)
+    iot_api_path = os.path.join(test_dir, 'iotapi')
+    if not os.path.exists(iot_api_path):
+        os.makedirs(iot_api_path)
+
+    con_pass = getpass.getpass(prompt='Insert password for user ' + 
args['user'] + ':')
+    other_arguments = ["-h", args['host'], "-p", str(args['port']), "-d", 
args['database'], "-po", "1"]
+
+    head, _ = os.path.split(os.path.dirname(os.path.abspath(__file__)))  # get 
the iotapi path
+    iot_client_exec_dir = os.path.join(os.path.split(head)[0], "iotclient", 
"src", "main.py")
+    iot_api_exec_dir = os.path.join(head, "src", "main.py")
+
+    iot_client = subprocess.Popen([sys.executable, iot_client_exec_dir, "-f", 
iot_client_path,
+                                   "-l", iot_client_log] + other_arguments, 
stdin=subprocess.PIPE)
+    iot_api = subprocess.Popen([sys.executable, iot_api_exec_dir, "-f", 
iot_api_path,
+                                "-l", iot_api_log] + other_arguments, 
stdin=subprocess.PIPE)
+    iot_client.stdin.write(con_pass + os.linesep)
+    iot_client.stdin.flush()
+    iot_api.stdin.write(con_pass + os.linesep)
+    iot_api.stdin.flush()
+
+    time.sleep(5)
+
+    if iot_client.returncode is None and iot_api.returncode is None:
+        
TextTestRunner(verbosity=2).run(TestSuite(tests=[NullablesTest(iot_client_path=iot_client_path,
+                                                                       
iot_api_path=iot_api_path)]))
+    else:
+        print 'Processes finished', iot_client.returncode, iot_api.returncode
+        shutil.rmtree(test_dir, ignore_errors=True)
+        sys.exit(1)
+
+    iot_client.send_signal(signal.SIGINT)
+    iot_api.send_signal(signal.SIGINT)
+    time.sleep(1)
+    shutil.rmtree(test_dir, ignore_errors=True)
+
+if __name__ == '__main__':
+    main()
diff --git a/clients/iotclient/src/Streams/jsonschemas.py 
b/clients/iotclient/src/Streams/jsonschemas.py
--- a/clients/iotclient/src/Streams/jsonschemas.py
+++ b/clients/iotclient/src/Streams/jsonschemas.py
@@ -93,7 +93,7 @@ def init_create_streams_schema(add_hugei
                         "base": {"type": "string", "enum": 
[TUPLE_FLUSH_IDENTIFIER]},
                         "interval": {"type": "integer", "minimum": 1}
                     },
-                    "required": ["base", "number"],
+                    "required": ["base", "interval"],
                     "additionalProperties": False
                 }, {
                     "properties": {
diff --git a/clients/iotclient/src/Streams/streams.py 
b/clients/iotclient/src/Streams/streams.py
--- a/clients/iotclient/src/Streams/streams.py
+++ b/clients/iotclient/src/Streams/streams.py
@@ -64,7 +64,7 @@ class BaseIOTStream:
             if dirs:
                 for elem in dirs:  # for each directory found, flush it
                     dir_path = os.path.join(self._base_path, str(elem))
-                    mapi_flush_baskets(self._connection, self._schema_name, 
self._stream_name, dir_path)
+                    # mapi_flush_baskets(self._connection, self._schema_name, 
self._stream_name, dir_path)
                 self._baskets_counter = max(dirs) + 1  # increment current 
basket number
             else:
                 self._baskets_counter = 1
@@ -124,8 +124,8 @@ class BaseIOTStream:
         add_log(20, 'Stopped stream %s.%s' % (self._schema_name, 
self._stream_name))
 
     @abstractmethod
-    def get_flushing_dictionary(self):  # for information about the stream
-        return {}
+    def get_flushing_dictionary(self, number_tuples):  # for information about 
the stream
+        return ()
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to