Changeset: ee944c8e9c97 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ee944c8e9c97
Removed Files:
        clients/iotclient/src/Streams/flushing.py
Modified Files:
        clients/iotapi/src/Settings/mapiconnection.py
        clients/iotapi/src/Streams/streampolling.py
        clients/iotapi/src/Streams/streams.py
        clients/iotapi/src/Streams/streamscontext.py
        clients/iotapi/src/WebSockets/jsonschemas.py
        clients/iotclient/src/Settings/filesystem.py
        clients/iotclient/src/Settings/mapiconnection.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscreator.py
        clients/iotclient/src/Utilities/filecreator.py
Branch: iot
Log Message:

Cleaned code


diffs (truncated from 578 to 300 lines):

diff --git a/clients/iotapi/src/Settings/mapiconnection.py 
b/clients/iotapi/src/Settings/mapiconnection.py
--- a/clients/iotapi/src/Settings/mapiconnection.py
+++ b/clients/iotapi/src/Settings/mapiconnection.py
@@ -12,9 +12,6 @@ def init_monetdb_connection(hostname, po
 
     user_password = getpass.getpass(prompt='Insert password for user ' + 
user_name + ':')
 
-    if user_password == '':
-        user_password = 'monetdb'
-
     try:  # the autocommit is set to true so each statement will be independent
         Connection = pymonetdb.connect(hostname=hostname, port=port, 
username=user_name, password=user_password,
                                        database=database, autocommit=True)
@@ -35,15 +32,12 @@ def close_monetdb_connection():
 def fetch_streams():
     try:  # TODO paginate results?
         cursor = Connection.cursor()
-        sql_string = """
-          SELECT storage."schema", storage."table", storage."column", 
storage."type", storage."location",
-          storage."typewidth"
+        sql_string = """SELECT storage."schema", storage."table", 
storage."column", storage."type", storage."typewidth"
           FROM (SELECT "schema", "table", "column", "type" FROM sys.storage) 
AS storage
           INNER JOIN (SELECT "name" FROM sys.tables WHERE type=4) AS tables ON 
(storage."table"=tables."name")
-          INNER JOIN (SELECT "name" FROM sys.schemas) AS schemas ON 
(storage."schema"=schemas."name");
-        """.replace('\n', ' ')
+          INNER JOIN (SELECT "name" FROM sys.schemas) AS schemas ON 
(storage."schema"=schemas."name");"""\
+            .replace('\n', ' ')
         cursor.execute(sql_string)
         return cursor.fetchall()
     except BaseException as ex:
-        print >> sys.stdout, ex
         add_log(50, ex)
diff --git a/clients/iotapi/src/Streams/streampolling.py 
b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -2,8 +2,9 @@ from itertools import groupby
 
 from Settings.mapiconnection import fetch_streams
 from Utilities.customthreading import PeriodicalThread
-
-from streamscontext import Streams_context, DataCellStream
+from datatypes import *
+from streams import IOTStream
+from streamscontext import Streams_context
 
 SWITCHER = [{'types': ['clob', 'char', 'varchar', 'url'], 'class': 'TextType'},
             {'types': ['tinyint', 'smallint', 'int', 'bigint'], 'class': 
'SmallIntegerType'},
@@ -23,7 +24,7 @@ def init_stream_polling_thread(interval)
     thread.start()
 
 
-# elem[0] is schema. elem[1] is name, elem[2] is column name, elem[3] is type, 
elem[4] is location, elem[5] is typewidth
+# elem[0] is schema. elem[1] is name, elem[2] is column name, elem[3] is type, 
elem[4] is typewidth
 def stream_polling():
     current_streams = Streams_context.get_existing_streams()
     retained_streams = []
@@ -35,11 +36,13 @@ def stream_polling():
             columns = {}
 
             for elem in group:
-                reflection_class = globals()[elem[3]]  # import everything 
from datatypes!!!
-                kwargs = {'name': elem[2], 'type': elem[3], 'location': 
elem[4], 'typewidth': elem[5]}
-                new_column = reflection_class(kwargs)
-                columns[elem[2]] = new_column
-                new_streams[key] = DataCellStream(key, columns)
+                for entry in SWITCHER:  # allocate the proper type wrapper
+                    if elem[3] in entry['types']:
+                        reflection_class = globals()[entry['class']]  # import 
everything from datatypes!!!
+                        new_column = reflection_class({'name': elem[2], 
'type': elem[3], 'typewidth': elem[4]})
+                        columns[elem[2]] = new_column
+                        new_streams[key] = IOTStream(key, columns)
+                    break
         else:
             retained_streams.append(key)
 
diff --git a/clients/iotapi/src/Streams/streams.py 
b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -1,6 +1,7 @@
 import os
 import struct
 
+from datatypes import LITTLE_ENDIAN_ALIGNMENT
 from Settings.filesystem import get_baskets_base_location
 from Utilities.readwritelock import RWLock
 from WebSockets.websockets import notify_clients
@@ -63,7 +64,7 @@ class IOTStream(object):
     def append_basket(self, path):
         if represents_int(path):
             with open(os.path.join(self._base_path, path)) as f:
-                count = struct.unpack('i', f.read(4))[0]
+                count = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + '1i', 
f.read(4))[0]
                 self._lock.acquire_write()
                 self._baskets[int(path)] = count
                 self._lock.release()
diff --git a/clients/iotapi/src/Streams/streamscontext.py 
b/clients/iotapi/src/Streams/streamscontext.py
--- a/clients/iotapi/src/Streams/streamscontext.py
+++ b/clients/iotapi/src/Streams/streamscontext.py
@@ -1,7 +1,5 @@
-import collections
-
 from Utilities.readwritelock import RWLock
-from WebSockets.websockets import desubscribe_removed_streams
+from WebSockets.websockets import unsubscribe_removed_streams
 
 
 class IOTStreams(object):
@@ -12,7 +10,7 @@ class IOTStreams(object):
         return schema_name + '.' + stream_name
 
     def __init__(self):
-        self._context = collections.OrderedDict()  # dictionary of schema_name 
+ '.' + stream_name -> DataCellStream
+        self._context = {}  # dictionary of schema_name + '.' + stream_name -> 
DataCellStream
         self._locker = RWLock()
 
     def get_existing_streams(self):
@@ -28,7 +26,7 @@ class IOTStreams(object):
             del self._context[k]
         self._context.update(new_streams)
         self._locker.release()
-        desubscribe_removed_streams(removed_streams)
+        unsubscribe_removed_streams(removed_streams)
 
     def get_existing_stream(self, concatenated_name):
         self._locker.acquire_read()
diff --git a/clients/iotapi/src/WebSockets/jsonschemas.py 
b/clients/iotapi/src/WebSockets/jsonschemas.py
--- a/clients/iotapi/src/WebSockets/jsonschemas.py
+++ b/clients/iotapi/src/WebSockets/jsonschemas.py
@@ -19,17 +19,11 @@ CLIENTS_INPUTS_SCHEMA = {
         "additionalProperties": False
     }, {
         "properties": {
-            "action": {"type": "string", "enum": ["info"]}
-        },
-        "required": ["action"],
-        "additionalProperties": False
-    }, {
-        "properties": {
             "schema": {"type": "string"},
             "stream": {"type": "string"},
             "action": {"type": "string", "enum": READ_OPTS},
             "basket": {"type": "integer", "minimum": 1, "default": 1},
-            "limit": {"type": "integer", "minimum": 0, "default": 0},
+            "limit": {"type": "integer", "minimum": 0, "default": 100},
             "offset": {"type": "integer", "minimum": 0, "default": 0}
         },
         "required": ["schema", "stream", "action"],
diff --git a/clients/iotclient/src/Settings/filesystem.py 
b/clients/iotclient/src/Settings/filesystem.py
--- a/clients/iotclient/src/Settings/filesystem.py
+++ b/clients/iotclient/src/Settings/filesystem.py
@@ -32,10 +32,10 @@ def init_file_system(host_identifier=Non
             os.makedirs(Baskets_Location)
 
         if new_configfile_location is not None:
-            Config_File_Location = 
create_file_if_not_exists(new_configfile_location, hidden=False, init_text='[]')
+            Config_File_Location = 
create_file_if_not_exists(new_configfile_location, init_text='[]')
         else:
             Config_File_Location = create_file_if_not_exists(
-                os.path.join(Filesystem_Location, CONFIG_FILE_DEFAULT_NAME), 
hidden=False, init_text='[]')
+                os.path.join(Filesystem_Location, CONFIG_FILE_DEFAULT_NAME), 
init_text='[]')
 
         Host_Identifier = host_identifier
     except (Exception, OSError) as ex:
diff --git a/clients/iotclient/src/Settings/mapiconnection.py 
b/clients/iotclient/src/Settings/mapiconnection.py
--- a/clients/iotclient/src/Settings/mapiconnection.py
+++ b/clients/iotclient/src/Settings/mapiconnection.py
@@ -12,9 +12,6 @@ def init_monetdb_connection(hostname, po
 
     user_password = getpass.getpass(prompt='Insert password for user ' + 
user_name + ':')
 
-    if user_password == '':
-        user_password = 'monetdb'
-
     try:  # the autocommit is set to true so each statement will be independent
         Connection = pymonetdb.connect(hostname=hostname, port=port, 
username=user_name, password=user_password,
                                        database=database, autocommit=True)
diff --git a/clients/iotclient/src/Streams/datatypes.py 
b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -541,7 +541,7 @@ class DecimalType(NumberBaseType):
     def check_value_precision(self, value, text):
         number_digits = int(math.ceil(math.log10(abs(value))))
         if number_digits > self._precision:
-            raise Exception('Too many digits on %s value: %s > %s!' % (text, 
number_digits, self._precision))
+            raise Exception('Too many digits on %s: %s > %s!' % (text, 
number_digits, self._precision))
 
     def add_json_schema_entry(self, schema):
         super(DecimalType, self).add_json_schema_entry(schema)
diff --git a/clients/iotclient/src/Streams/flushing.py 
b/clients/iotclient/src/Streams/flushing.py
deleted file mode 100644
--- a/clients/iotclient/src/Streams/flushing.py
+++ /dev/null
@@ -1,52 +0,0 @@
-from Utilities.customthreading import PeriodicalThread
-from abc import ABCMeta, abstractmethod
-
-
-class StreamFlushingMethod(object):
-    """Base class for flushing"""
-
-    __metaclass__ = ABCMeta
-
-    def __init__(self):
-        pass
-
-    @abstractmethod
-    def get_dictionary_info(self):
-        pass
-
-
-class TimeBasedFlushing(StreamFlushingMethod):
-    """Time based flushing"""
-
-    def __init__(self, interval, time_unit):
-        super(TimeBasedFlushing, self).__init__()
-        self._interval = interval
-        self._time_unit = time_unit
-        self._local_thread = None
-
-    def init_local_thread(self, stream):
-        if self._time_unit == "s":
-            interval = self._interval
-        elif self._time_unit == "m":
-            interval = self._interval * 60
-        else:
-            interval = self._interval * 3600
-        self._local_thread = PeriodicalThread(interval, 
stream.time_based_flush)
-        self._local_thread.start()
-
-    def stop_local_thread(self):
-        self._local_thread.stop()
-
-    def get_dictionary_info(self):
-        return {'base': 'time', 'unit': self._time_unit, 'interval': 
self._interval}
-
-
-class TupleBasedFlushing(StreamFlushingMethod):
-    """Tuple based flushing"""
-
-    def __init__(self, limit):
-        super(TupleBasedFlushing, self).__init__()
-        self.limit = limit
-
-    def get_dictionary_info(self):
-        return {'base': 'tuple', 'number': self.limit}
diff --git a/clients/iotclient/src/Streams/streams.py 
b/clients/iotclient/src/Streams/streams.py
--- a/clients/iotclient/src/Streams/streams.py
+++ b/clients/iotclient/src/Streams/streams.py
@@ -1,14 +1,24 @@
 import os
+import struct
+from abc import ABCMeta, abstractmethod
 from collections import defaultdict, OrderedDict
 
 from Settings.filesystem import get_baskets_base_location, get_host_identifier
 from Settings.iotlogger import add_log
 from Settings.mapiconnection import mapi_create_stream, mapi_flush_baskets
-from Utilities.filecreator import create_file_if_not_exists, 
get_hidden_file_name
+from Utilities.filecreator import create_file_if_not_exists
 from Utilities.readwritelock import RWLock
 
-from datatypes import TimestampType, TextType, DataValidationException
-from flushing import TimeBasedFlushing, TupleBasedFlushing
+from Utilities.customthreading import PeriodicalThread
+from datatypes import TimestampType, TextType, DataValidationException, 
LITTLE_ENDIAN_ALIGNMENT
+
+IMPLICIT_TIMESTAMP_COLUMN_NAME = 'implicit_timestamp'
+Timestamps_Handler = TimestampType(name=IMPLICIT_TIMESTAMP_COLUMN_NAME, 
type="timestamp")  # timestamp
+Extra_columns_SQL = [Timestamps_Handler.create_stream_sql()]  # array for SQL 
creation
+
+HOST_IDENTIFIER_COLUMN_NAME = 'host_identifier'
+Hostname_Bin_Value = None
+BASKETS_COUNT_FILE = 'count'
 
 
 def represents_int(s):
@@ -18,13 +28,6 @@ def represents_int(s):
     except ValueError:
         return False
 
-IMPLICIT_TIMESTAMP_COLUMN_NAME = 'implicit_timestamp'
-Timestamps_Handler = TimestampType(name=IMPLICIT_TIMESTAMP_COLUMN_NAME, 
type="timestamp")  # timestamp
-Extra_columns_SQL = [Timestamps_Handler.create_stream_sql()]  # array for SQL 
creation
-
-HOST_IDENTIFIER_COLUMN_NAME = 'host_identifier'
-Hostname_Bin_Value = None
-
 
 def init_streams_hosts():
     global Hostname_Bin_Value
@@ -43,14 +46,14 @@ class StreamException(Exception):
         self.message = message  # dictionary of column -> list of error 
messages
 
 
-class IOTStream(object):
-    """Representation of the stream for validation"""
+class BaseIOTStream(object):
+    """Representation of a stream for validation"""
+    __metaclass__ = ABCMeta
 
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to