Changeset: b53bfefff4b3 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b53bfefff4b3
Modified Files:
        clients/iotapi/src/Streams/streampolling.py
        clients/iotapi/src/Streams/streams.py
        clients/iotapi/src/WebSockets/websockets.py
        clients/iotclient/src/Streams/datatypes.py
        clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:

Corrected basket searching


diffs (243 lines):

diff --git a/clients/iotapi/src/Streams/streampolling.py 
b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -5,15 +5,17 @@ from Utilities.customthreading import Pe
 
 from streamscontext import Streams_context, DataCellStream
 
-SWITCHER = [{'types': ['clob', 'char', 'varchar'], 'class': 'TextType'},
-            {'types': ['boolean'], 'class': 'BooleanType'},
+SWITCHER = [{'types': ['clob', 'char', 'varchar', 'url'], 'class': 'TextType'},
             {'types': ['tinyint', 'smallint', 'int', 'bigint'], 'class': 
'SmallIntegerType'},
             {'types': ['hugeint'], 'class': 'HugeIntegerType'},
             {'types': ['real', 'double'], 'class': 'FloatType'},
             {'types': ['decimal'], 'class': 'DecimalType'},
+            {'types': ['boolean'], 'class': 'BooleanType'},
             {'types': ['date'], 'class': 'DateType'},
             {'types': ['time'], 'class': 'TimeType'},
-            {'types': ['timestamp'], 'class': 'TimestampType'}]
+            {'types': ['timestamp'], 'class': 'TimestampType'},
+            {'types': ['inet'], 'class': 'INetType'},
+            {'types': ['uuid'], 'class': 'UUIDType'}]
 
 
 def init_stream_polling_thread(interval):
diff --git a/clients/iotapi/src/Streams/streams.py 
b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -2,12 +2,14 @@ import struct
 
 import os
 from Settings.filesystem import get_baskets_base_location
+from Utilities.readwritelock import RWLock
 from WebSockets.websockets import notify_clients
 from watchdog.events import FileSystemEventHandler
 from watchdog.observers import Observer
 
 BASKETS_COUNT_FILE = 'count'
 
+
 def represents_int(s):
     try:
         int(s)
@@ -26,56 +28,95 @@ class StreamBasketsHandler(FileSystemEve
     def on_created(self, event):  # whenever a basket directory is created, 
notify to subscribed clients
         if isinstance(event, 'DirCreatedEvent'):
             basket_string = os.path.basename(os.path.normpath(event.src_path))
-            self._stream.baskets.append_basket(basket_string)
-            notify_clients(self._stream.schema_name, self._stream.stream_name)
+            self._stream.append_basket(basket_string)
+            notify_clients(self._stream.get_schema_name(), 
self._stream.get_stream_name())
 
+    def on_deleted(self, event):
+        if isinstance(event, 'DirDeletedEvent'):
+            basket_string = os.path.basename(os.path.normpath(event.src_path))
+            self._stream.delete_basket(basket_string)
 
-class DataCellStream(object):
+
+class IOTStream(object):
     """Representation of a stream"""
 
     def __init__(self, schema_name, stream_name, columns):
-        self.schema_name = schema_name  # name of the schema
-        self.stream_name = stream_name  # name of the stream
+        self._schema_name = schema_name  # name of the schema
+        self._stream_name = stream_name  # name of the stream
         self._columns = columns  # dictionary of name -> data_types
         self._base_path = os.path.join(get_baskets_base_location(), 
schema_name, stream_name)
-        self.baskets = {}  # dictionary of basket_number -> total_tuples
+        self._baskets = {}  # dictionary of basket_number -> total_tuples
         for name in os.listdir(self._base_path):
             self.append_basket(name)
+        self._lock = RWLock()
         self._observer = Observer()
         self._observer.schedule(StreamBasketsHandler(stream=self), 
self._base_path, recursive=False)
         self._observer.start()
 
+    def get_schema_name(self):
+        return self._schema_name
+
+    def get_stream_name(self):
+        return self._stream_name
+
     def append_basket(self, path):
         if represents_int(path):
             with open(os.path.join(self._base_path, path)) as f:
                 count = struct.unpack('i', f.read(4))[0]
-                self.baskets[int(path)] = count
+                self._lock.acquire_write()
+                self._baskets[int(path)] = count
+                self._lock.release()
 
-    # TODO add delete basket!!!!
+    def delete_basket(self, path):
+        if represents_int(path):
+            number = int(path)
+            self._lock.acquire_write()
+            if number in self._baskets:
+                del self._baskets[number]
+            self._lock.release()
+
+    def get_next_basket_number_tuple(self, basket_number):
+        self._lock.acquire_read()
+        if basket_number in self._baskets:
+            self._lock.release()
+            return basket_number, self._baskets[basket_number]
+        else:
+            filtered = filter(lambda x: x > basket_number, 
self._baskets.keys())
+            if len(filtered) > 0:
+                min_basket_number = min(filtered)
+                min_basket_tuples = self._baskets[min_basket_number]
+                self._lock.release()
+                return min_basket_number, min_basket_tuples
+            else:
+                self._lock.release()
+                return None, None
 
     def read_tuples(self, basket_number, limit, offset):
         results = {column: [] for column in self._columns.keys()}
-        current_basket = int(basket_number)
+        current_basket_number = int(basket_number)
         read_tuples = 0
+        skipped_tuples = 0
         finished = False
 
         while True:
-            if current_basket not in self.baskets:
+            current_basket_number, current_tuple_number = 
self.get_next_basket_number_tuple(current_basket_number)
+            if current_basket_number is None:
                 finished = True
                 break
-            offset -= self.baskets[current_basket]
-            if offset < 0:
+            if skipped_tuples + current_tuple_number > offset:
+                offset = offset - skipped_tuples
                 break
-            current_basket += 1
+            skipped_tuples += current_tuple_number
+            current_basket_number += 1
 
         if not finished:
-            offset = abs(offset)
+            while True:
+                current_basket_number, current_tuple_number = 
self.get_next_basket_number_tuple(current_basket_number)
+                if current_basket_number is None or read_tuples >= limit:
+                    break
 
-            while True:
-                if current_basket not in self.baskets:
-                    break
-                next_path = os.path.join(self._base_path, str(current_basket))
-                next_read_size = min(self.baskets[current_basket], limit)
+                next_path = os.path.join(self._base_path, 
str(current_basket_number))
+                next_read_size = min(limit - read_tuples, 
current_tuple_number) - offset
 
                 for key, column in self._columns.iteritems():
                     next_file_name = os.path.join(next_path, key)
@@ -85,12 +126,11 @@ class DataCellStream(object):
                     file_pointer = open(next_file_name, open_string)
                     results[key].append(column.read_next_batch(file_pointer, 
offset, next_read_size))
 
+                read_tuples += next_read_size
                 offset = 0
-                current_basket += 1
-                read_tuples += next_read_size
-                limit -= self.baskets[current_basket]
-                if limit <= 0:
-                    break
+                current_basket_number += 1
 
         # TODO check if this is viable, it could be 1000 tuples!!!!
-        return {'total': read_tuples, 'tuples': zip(*results)}  # TODO not 
done this way!!!
+        keys = results.keys()
+        tuples = [dict(zip(keys, values)) for values in zip(*(results[k] for k 
in keys))]
+        return {'total': read_tuples, 'tuples': tuples}
diff --git a/clients/iotapi/src/WebSockets/websockets.py 
b/clients/iotapi/src/WebSockets/websockets.py
--- a/clients/iotapi/src/WebSockets/websockets.py
+++ b/clients/iotapi/src/WebSockets/websockets.py
@@ -37,7 +37,7 @@ def desubscribe_removed_streams(concaten
 class IOTAPI(WebSocket):
     def __init__(self):
         super(IOTAPI, self).__init__()
-        self._subscriptions = {}
+        self._subscriptions = {}  # dictionary of schema + '.' + stream -> 
IOTStream
         self._locker = RWLock()
 
     def handleMessage(self):
diff --git a/clients/iotclient/src/Streams/datatypes.py 
b/clients/iotclient/src/Streams/datatypes.py
--- a/clients/iotclient/src/Streams/datatypes.py
+++ b/clients/iotclient/src/Streams/datatypes.py
@@ -259,14 +259,14 @@ class EnumType(TextType):
         return ''.join(array)
 
 
-class INetSix(TextType):
+class INetSixType(TextType):
     """Covers: Inet6"""
 
     def __init__(self, **kwargs):
-        super(INetSix, self).__init__(**kwargs)
+        super(INetSixType, self).__init__(**kwargs)
 
     def add_json_schema_entry(self, schema):
-        super(INetSix, self).add_json_schema_entry(schema)
+        super(INetSixType, self).add_json_schema_entry(schema)
         schema[self._column_name]['format'] = 'ipv6'
 
     # 
http://stackoverflow.com/questions/166132/maximum-length-of-the-textual-representation-of-an-ipv6-address
@@ -274,17 +274,17 @@ class INetSix(TextType):
         array[2] = 'char(45)'
 
 
-class INet(StreamDataType):
+class INetType(StreamDataType):
     """Covers: Inet"""
 
     def __init__(self, **kwargs):
-        super(INet, self).__init__(**kwargs)
+        super(INetType, self).__init__(**kwargs)
 
     def get_nullable_constant(self):
         return "0"  # has to trick because it is impossible to get a null 
value from a valid IPv4 address in MonetDB
 
     def add_json_schema_entry(self, schema):
-        super(INet, self).add_json_schema_entry(schema)
+        super(INetType, self).add_json_schema_entry(schema)
         schema[self._column_name]['pattern'] = IPV4_REGEX
 
     def process_next_value(self, entry, counter, parameters, errors):
diff --git a/clients/iotclient/src/Streams/streamscreator.py 
b/clients/iotclient/src/Streams/streamscreator.py
--- a/clients/iotclient/src/Streams/streamscreator.py
+++ b/clients/iotclient/src/Streams/streamscreator.py
@@ -15,8 +15,8 @@ SWITCHER = [{'types': ['text', 'string',
             {'types': ['uuid'], 'class': 'UUIDType'},
             {'types': ['mac'], 'class': 'MACType'},
             {'types': ['url'], 'class': 'URLType'},
-            {'types': ['inet'], 'class': 'INet'},
-            {'types': ['inet6'], 'class': 'INetSix'},
+            {'types': ['inet'], 'class': 'INetType'},
+            {'types': ['inet6'], 'class': 'INetSixType'},
             {'types': ['regex'], 'class': 'RegexType'},
             {'types': ['char', 'character', 'varchar', 'character varying'], 
'class': 'LimitedTextType'},
             {'types': ['enum'], 'class': 'EnumType'},
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to