Changeset: 6c4407fe078d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6c4407fe078d
Modified Files:
        clients/iotapi/src/Settings/filesystem.py
        clients/iotapi/src/Settings/mapiconnection.py
        clients/iotapi/src/Streams/datatypes.py
        clients/iotapi/src/Streams/streampolling.py
        clients/iotapi/src/Streams/streams.py
        clients/iotapi/src/Streams/streamscontext.py
        clients/iotapi/src/Utilities/customthreading.py
        clients/iotapi/src/Utilities/readwritelock.py
        clients/iotapi/src/WebSockets/websockets.py
        clients/iotapi/src/main.py
        clients/iotclient/src/Flask/app.py
        clients/iotclient/src/Flask/restresources.py
        clients/iotclient/src/Settings/filesystem.py
        clients/iotclient/src/Settings/iotlogger.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/jsonschemas.py
        clients/iotclient/src/Streams/streampolling.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscontext.py
        clients/iotclient/src/Streams/streamscreator.py
        clients/iotclient/src/Utilities/customthreading.py
        clients/iotclient/src/Utilities/readwritelock.py
        clients/iotclient/src/main.py
        clients/iotclient/tests/main.py
Branch: iot
Log Message:

Fixed imports, created unique mapi connections for each stream, fixed small bugs


diffs (truncated from 1302 to 300 lines):

diff --git a/clients/iotapi/src/Settings/filesystem.py 
b/clients/iotapi/src/Settings/filesystem.py
--- a/clients/iotapi/src/Settings/filesystem.py
+++ b/clients/iotapi/src/Settings/filesystem.py
@@ -1,7 +1,7 @@
 import os
 import sys
 
-from iotlogger import add_log
+from .iotlogger import add_log
 
 Baskets_Location = None
 
diff --git a/clients/iotapi/src/Settings/mapiconnection.py 
b/clients/iotapi/src/Settings/mapiconnection.py
--- a/clients/iotapi/src/Settings/mapiconnection.py
+++ b/clients/iotapi/src/Settings/mapiconnection.py
@@ -1,7 +1,7 @@
 import sys
 
-import pymonetdb
-from Settings.iotlogger import add_log
+from pymonetdb import connect
+from ..Settings.iotlogger import add_log
 
 Connection = None
 
@@ -10,9 +10,8 @@ def init_monetdb_connection(hostname, po
     global Connection
 
     try:  # the autocommit is set to true so each statement will be independent
-        Connection = pymonetdb.connect(hostname=hostname, port=port, 
username=user_name, password=user_password,
-                                       database=database, autocommit=True)
-        Connection.execute("SET SCHEMA iot;")
+        Connection = connect(hostname=hostname, port=port, username=user_name, 
password=user_password,
+                             database=database, autocommit=True)
         log_message = 'User %s connected successfully to database %s' % 
(user_name, database)
         print log_message
         add_log(20, log_message)
@@ -26,16 +25,26 @@ def close_monetdb_connection():
     Connection.close()
 
 
+def check_hugeint_type():
+    Connection.execute("START TRANSACTION")
+    cursor = Connection.cursor()
+    cursor.execute("SELECT COUNT(*) FROM sys.types WHERE sqlname='hugeint'")
+    result = cursor.fetchall()[0][0]
+    Connection.commit()
+    return result
+
+
 def fetch_streams():
-    try:  # TODO paginate results?
+    try:
         cursor = Connection.cursor()
         sql_string = """SELECT schemas."name" as schema, tables."name" as 
table, columns."name" as column,
              columns."type", columns."type_digits", columns."type_scale", 
columns."default", columns."null" FROM
              (SELECT "id", "name", "schema_id" FROM sys.tables WHERE type=4) 
AS tables INNER JOIN (SELECT "id", "name"
-             FROM sys.schemas) AS schemas ON (tables."schema_id"=schemas."id") 
INNER JOIN  (SELECT "table_id", "name",
+             FROM sys.schemas) AS schemas ON (tables."schema_id"=schemas."id") 
INNER JOIN (SELECT "table_id", "name",
              "type", "type_digits", "type_scale", "default", "null" FROM 
sys.columns) AS columns ON
-             (columns."table_id"=tables."id");""".replace('\n', ' ')  # 
important STREAM TABLES TYPE is 4
+             (columns."table_id"=tables."id")""".replace('\n', ' ')
         cursor.execute(sql_string)
         return cursor.fetchall()
     except BaseException as ex:
         add_log(50, ex)
+        raise
diff --git a/clients/iotapi/src/Streams/datatypes.py 
b/clients/iotapi/src/Streams/datatypes.py
--- a/clients/iotapi/src/Streams/datatypes.py
+++ b/clients/iotapi/src/Streams/datatypes.py
@@ -1,7 +1,7 @@
 import struct
+
 from abc import ABCMeta, abstractmethod
 from datetime import date, time, datetime
-
 from dateutil.relativedelta import relativedelta
 
 LITTLE_ENDIAN_ALIGNMENT = '<'  # for now is little-endian for Intel CPU's
diff --git a/clients/iotapi/src/Streams/streampolling.py 
b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -1,14 +1,13 @@
 from itertools import groupby
+from .datatypes import *
+from .streams import IOTStream
+from .streamscontext import Streams_Context
 from Settings.mapiconnection import fetch_streams
 from Utilities.customthreading import PeriodicalThread
-from datatypes import *
-from streams import IOTStream
-from streamscontext import Streams_Context
 
-SWITCHER = [{'types': ['clob', 'url'], 'class': 'TextType'},
+Switcher = [{'types': ['clob', 'url'], 'class': 'TextType'},
             {'types': ['char', 'varchar'], 'class': 'LimitedTextType'},
             {'types': ['tinyint', 'smallint', 'int', 'bigint'], 'class': 
'SmallIntegerType'},
-            {'types': ['hugeint'], 'class': 'HugeIntegerType'},
             {'types': ['real', 'double'], 'class': 'FloatType'},
             {'types': ['decimal'], 'class': 'DecimalType'},
             {'types': ['boolean'], 'class': 'BooleanType'},
@@ -19,6 +18,10 @@ SWITCHER = [{'types': ['clob', 'url'], '
             {'types': ['uuid'], 'class': 'UUIDType'}]
 
 
+def polling_add_hugeint_type():
+    Switcher.append({'types': ['hugeint'], 'class': 'HugeIntegerType'})
+
+
 def init_stream_polling_thread(interval):
     thread = PeriodicalThread(interval=interval, worker_func=stream_polling)
     thread.start()
@@ -30,15 +33,15 @@ def stream_polling():
     array = fetch_streams()  # TODO check whenever stream's columns are updated
     retained_streams = []
     new_streams = {}
-    current_streams = Streams_Context.get_existing_streams()
+    current_streams = get_streams_context().get_existing_streams()
 
     if array is not None:
-        for key, group in groupby(array, lambda x: 
Streams_Context.get_context_entry_name(x[0], x[1])):
+        for key, group in groupby(array, lambda x: 
get_streams_context().get_context_entry_name(x[0], x[1])):
             if key not in current_streams:
                 columns = {}
 
                 for elem in group:
-                    for entry in SWITCHER:  # allocate the proper type wrapper
+                    for entry in Switcher:  # allocate the proper type wrapper
                         if elem[3] in entry['types']:
                             reflection_class = globals()[entry['class']]  # 
import everything from datatypes!!!
                             new_column = reflection_class(*elem[2:])
diff --git a/clients/iotapi/src/Streams/streams.py 
b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -1,13 +1,13 @@
 import os
-import struct
 
-from datatypes import LITTLE_ENDIAN_ALIGNMENT
+from collections import OrderedDict
+from struct import unpack
+from watchdog.events import FileSystemEventHandler, DirCreatedEvent, 
DirDeletedEvent
+from watchdog.observers import Observer
+from .datatypes import LITTLE_ENDIAN_ALIGNMENT
 from Settings.filesystem import get_baskets_base_location
 from Utilities.readwritelock import RWLock
 from WebSockets.websockets import notify_stream_inserts_to_clients
-from watchdog.events import FileSystemEventHandler, DirCreatedEvent, 
DirDeletedEvent
-from watchdog.observers import Observer
-from collections import OrderedDict
 
 BASKETS_COUNT_FILE = 'count'
 
@@ -40,7 +40,7 @@ class StreamBasketsHandler(FileSystemEve
             self._stream.delete_basket(basket_string)
 
 
-class IOTStream(object):
+class IOTStream:
     """Representation of a stream"""
 
     def __init__(self, schema_name, stream_name, columns):
@@ -78,7 +78,7 @@ class IOTStream(object):
     def append_basket(self, path):
         if represents_int(path):
             with open(os.path.join(self._base_path, path, BASKETS_COUNT_FILE)) 
as f:
-                count = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + 'i', 
f.read(4))[0]
+                count = unpack(LITTLE_ENDIAN_ALIGNMENT + 'i', f.read(4))[0]
                 self._baskets_lock.acquire_write()
                 self._baskets[int(path)] = count
                 self._baskets_lock.release()
diff --git a/clients/iotapi/src/Streams/streamscontext.py 
b/clients/iotapi/src/Streams/streamscontext.py
--- a/clients/iotapi/src/Streams/streamscontext.py
+++ b/clients/iotapi/src/Streams/streamscontext.py
@@ -2,7 +2,7 @@ from Utilities.readwritelock import RWLo
 from WebSockets.websockets import unsubscribe_removed_streams
 
 
-class IOTStreams(object):
+class IOTStreams:
     """Streams context"""
 
     @classmethod
diff --git a/clients/iotapi/src/Utilities/customthreading.py 
b/clients/iotapi/src/Utilities/customthreading.py
--- a/clients/iotapi/src/Utilities/customthreading.py
+++ b/clients/iotapi/src/Utilities/customthreading.py
@@ -1,4 +1,4 @@
-import time
+from time import sleep
 from threading import Thread, Event
 
 
@@ -28,4 +28,4 @@ class PeriodicalThread(StoppableThread):
     def run(self):
         while not self.stop_event.is_set():
             self._worker_func()
-            time.sleep(self._interval)
+            sleep(self._interval)
diff --git a/clients/iotapi/src/Utilities/readwritelock.py 
b/clients/iotapi/src/Utilities/readwritelock.py
--- a/clients/iotapi/src/Utilities/readwritelock.py
+++ b/clients/iotapi/src/Utilities/readwritelock.py
@@ -1,11 +1,9 @@
 import threading
 
 
-#  Adapted from https://majid.info/blog/a-reader-writer-lock-for-python/
-
-
-class RWLock:
-    """A simple reader-writer lock Several readers can hold the lock 
simultaneously, XOR one writer. Write locks have priority over reads to prevent 
write starvation."""
+class RWLock:  # Adapted from 
https://majid.info/blog/a-reader-writer-lock-for-python/
+    """A simple reader-writer lock Several readers can hold the lock 
simultaneously, XOR one writer.
+    Write locks have priority over reads to prevent write starvation."""
 
     def __init__(self):
         self.rwlock = 0
@@ -33,7 +31,7 @@ class RWLock:
         self.monitor.release()
 
     def promote(self):
-        """Promote an already-acquired read lock to a write lock WARNING: it 
is very easy to deadlock with this method"""
+        """Promote an already-acquired read lock to a write lock WARNING:it is 
very easy to deadlock with this method"""
         self.monitor.acquire()
         self.rwlock -= 1
         while self.rwlock != 0:
diff --git a/clients/iotapi/src/WebSockets/websockets.py 
b/clients/iotapi/src/WebSockets/websockets.py
--- a/clients/iotapi/src/WebSockets/websockets.py
+++ b/clients/iotapi/src/WebSockets/websockets.py
@@ -1,12 +1,11 @@
-import json
 import sys
 
+from json import loads, dumps
+from jsonschema import Draft4Validator, FormatChecker
+from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
+from .jsonschemas import CLIENTS_INPUTS_SCHEMA, SUBSCRIBE_OPTS, 
UNSUBSCRIBE_OPTS, READ_OPTS, INFO_OPTS
 from Settings.iotlogger import add_log
-from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
 from Utilities.readwritelock import RWLock
-from jsonschema import Draft4Validator, FormatChecker
-
-from jsonschemas import CLIENTS_INPUTS_SCHEMA, SUBSCRIBE_OPTS, 
UNSUBSCRIBE_OPTS, READ_OPTS, INFO_OPTS
 
 Client_Messages_Validator = Draft4Validator(CLIENTS_INPUTS_SCHEMA, 
format_checker=FormatChecker())
 WebSocketServer = None
@@ -23,7 +22,7 @@ def unsubscribe_removed_streams(concaten
     for name in concatenated_names:
         add_log(20, ''.join(['Stream ', name, ' removed']))
 
-from Streams.streamscontext import Streams_Context, IOTStreams  # avoid 
circular dependency
+from ..Streams.streamscontext import Streams_Context, IOTStreams  # avoid 
circular dependency
 
 
 def notify_stream_inserts_to_clients(schema_name, stream_name, basket_number, 
count):
@@ -43,7 +42,7 @@ class IOTAPI(WebSocket):
 
     def sendJSONMessage(self, response, message):  # IMPORTANT always use this 
method to send messages to clients!!!!!
         message['response'] = response
-        super(IOTAPI, self).sendMessage(json.dumps(message))  # send JSON 
Strings to clients
+        super(IOTAPI, self).sendMessage(dumps(message))  # send JSON Strings 
to clients
 
     def handleConnected(self):  # overriden
         WebClientsLock.acquire_write()
@@ -61,7 +60,7 @@ class IOTAPI(WebSocket):
         if self.opcode != 0x1:  # TEXT frame
             self.sendJSONMessage(response="error", message={"message": "Only 
TEXT frames allowed!"})
         try:
-            input_schema = json.loads(self.data)
+            input_schema = loads(self.data)
             Client_Messages_Validator.validate(input_schema)
 
             if input_schema['request'] in SUBSCRIBE_OPTS:
diff --git a/clients/iotapi/src/main.py b/clients/iotapi/src/main.py
--- a/clients/iotapi/src/main.py
+++ b/clients/iotapi/src/main.py
@@ -9,7 +9,8 @@ from multiprocessing import Process
 from threading import Thread
 from Settings.filesystem import init_file_system, DEFAULT_FILESYSTEM
 from Settings.iotlogger import init_logging, add_log, DEFAULT_LOGGING
-from Settings.mapiconnection import init_monetdb_connection
+from Settings.mapiconnection import init_monetdb_connection, check_hugeint_type
+from Streams.streampolling import polling_add_hugeint_type
 from Streams.streampolling import init_stream_polling_thread
 from WebSockets.websockets import init_websockets
 
@@ -27,6 +28,10 @@ def start_process(polling_interval, file
     init_file_system(filesystem_location)  # init filesystem
     # init mapi connection
     init_monetdb_connection(connection_hostname, con_port, con_user, 
con_password, con_database)
+
+    if check_hugeint_type():
+        polling_add_hugeint_type()
+
     init_stream_polling_thread(polling_interval)  # start polling
 
     thread1 = Thread(target=init_websockets, args=(sockets_host, sockets_port))
diff --git a/clients/iotclient/src/Flask/app.py 
b/clients/iotclient/src/Flask/app.py
--- a/clients/iotclient/src/Flask/app.py
+++ b/clients/iotclient/src/Flask/app.py
@@ -1,6 +1,6 @@
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to