Changeset: b53bfefff4b3 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b53bfefff4b3
Modified Files:
	clients/iotapi/src/Streams/streampolling.py
	clients/iotapi/src/Streams/streams.py
	clients/iotapi/src/WebSockets/websockets.py
	clients/iotclient/src/Streams/datatypes.py
	clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:
Corrected basket searching diffs (243 lines): diff --git a/clients/iotapi/src/Streams/streampolling.py b/clients/iotapi/src/Streams/streampolling.py --- a/clients/iotapi/src/Streams/streampolling.py +++ b/clients/iotapi/src/Streams/streampolling.py @@ -5,15 +5,17 @@ from Utilities.customthreading import Pe from streamscontext import Streams_context, DataCellStream -SWITCHER = [{'types': ['clob', 'char', 'varchar'], 'class': 'TextType'}, - {'types': ['boolean'], 'class': 'BooleanType'}, +SWITCHER = [{'types': ['clob', 'char', 'varchar', 'url'], 'class': 'TextType'}, {'types': ['tinyint', 'smallint', 'int', 'bigint'], 'class': 'SmallIntegerType'}, {'types': ['hugeint'], 'class': 'HugeIntegerType'}, {'types': ['real', 'double'], 'class': 'FloatType'}, {'types': ['decimal'], 'class': 'DecimalType'}, + {'types': ['boolean'], 'class': 'BooleanType'}, {'types': ['date'], 'class': 'DateType'}, {'types': ['time'], 'class': 'TimeType'}, - {'types': ['timestamp'], 'class': 'TimestampType'}] + {'types': ['timestamp'], 'class': 'TimestampType'}, + {'types': ['inet'], 'class': 'INetType'}, + {'types': ['uuid'], 'class': 'UUIDType'}] def init_stream_polling_thread(interval): diff --git a/clients/iotapi/src/Streams/streams.py b/clients/iotapi/src/Streams/streams.py --- a/clients/iotapi/src/Streams/streams.py +++ b/clients/iotapi/src/Streams/streams.py @@ -2,12 +2,14 @@ import struct import os from Settings.filesystem import get_baskets_base_location +from Utilities.readwritelock import RWLock from WebSockets.websockets import notify_clients from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer BASKETS_COUNT_FILE = 'count' + def represents_int(s): try: int(s) @@ -26,56 +28,95 @@ class StreamBasketsHandler(FileSystemEve def on_created(self, event): # whenever a basket directory is created, notify to subscribed clients if isinstance(event, 'DirCreatedEvent'): basket_string = os.path.basename(os.path.normpath(event.src_path)) - 
self._stream.baskets.append_basket(basket_string) - notify_clients(self._stream.schema_name, self._stream.stream_name) + self._stream.append_basket(basket_string) + notify_clients(self._stream.get_schema_name(), self._stream.get_stream_name()) + def on_deleted(self, event): + if isinstance(event, 'DirDeletedEvent'): + basket_string = os.path.basename(os.path.normpath(event.src_path)) + self._stream.delete_basket(basket_string) -class DataCellStream(object): + +class IOTStream(object): """Representation of a stream""" def __init__(self, schema_name, stream_name, columns): - self.schema_name = schema_name # name of the schema - self.stream_name = stream_name # name of the stream + self._schema_name = schema_name # name of the schema + self._stream_name = stream_name # name of the stream self._columns = columns # dictionary of name -> data_types self._base_path = os.path.join(get_baskets_base_location(), schema_name, stream_name) - self.baskets = {} # dictionary of basket_number -> total_tuples + self._baskets = {} # dictionary of basket_number -> total_tuples for name in os.listdir(self._base_path): self.append_basket(name) + self._lock = RWLock() self._observer = Observer() self._observer.schedule(StreamBasketsHandler(stream=self), self._base_path, recursive=False) self._observer.start() + def get_schema_name(self): + return self._schema_name + + def get_stream_name(self): + return self._stream_name + def append_basket(self, path): if represents_int(path): with open(os.path.join(self._base_path, path)) as f: count = struct.unpack('i', f.read(4))[0] - self.baskets[int(path)] = count + self._lock.acquire_write() + self._baskets[int(path)] = count + self._lock.release() - # TODO add delete basket!!!! 
+ def delete_basket(self, path): + if represents_int(path): + number = int(path) + self._lock.acquire_write() + if number in self._baskets: + del self._baskets[number] + self._lock.release() + + def get_next_basket_number_tuple(self, basket_number): + self._lock.acquire_read() + if basket_number in self._baskets: + self._lock.release() + return basket_number, self._baskets[basket_number] + else: + filtered = filter(lambda x: x > basket_number, self._baskets.keys()) + if len(filtered) > 0: + min_basket_number = min(filtered) + min_basket_tuples = self._baskets[min_basket_number] + self._lock.release() + return min_basket_number, min_basket_tuples + else: + self._lock.release() + return None, None def read_tuples(self, basket_number, limit, offset): results = {column: [] for column in self._columns.keys()} - current_basket = int(basket_number) + current_basket_number = int(basket_number) read_tuples = 0 + skipped_tuples = 0 finished = False while True: - if current_basket not in self.baskets: + current_basket_number, current_tuple_number = self.get_next_basket_number_tuple(current_basket_number) + if current_basket_number is None: finished = True break - offset -= self.baskets[current_basket] - if offset < 0: + if skipped_tuples + current_tuple_number > offset: + offset = offset - skipped_tuples break - current_basket += 1 + skipped_tuples += current_tuple_number + current_basket_number += 1 if not finished: - offset = abs(offset) + while True: + current_basket_number, current_tuple_number = self.get_next_basket_number_tuple(current_basket_number) + if current_basket_number is None or read_tuples >= limit: + break - while True: - if current_basket not in self.baskets: - break - next_path = os.path.join(self._base_path, str(current_basket)) - next_read_size = min(self.baskets[current_basket], limit) + next_path = os.path.join(self._base_path, str(current_basket_number)) + next_read_size = min(limit - read_tuples, current_tuple_number) - offset for key, column in 
self._columns.iteritems(): next_file_name = os.path.join(next_path, key) @@ -85,12 +126,11 @@ class DataCellStream(object): file_pointer = open(next_file_name, open_string) results[key].append(column.read_next_batch(file_pointer, offset, next_read_size)) + read_tuples += next_read_size offset = 0 - current_basket += 1 - read_tuples += next_read_size - limit -= self.baskets[current_basket] - if limit <= 0: - break + current_basket_number += 1 # TODO check if this is viable, it could be 1000 tuples!!!! - return {'total': read_tuples, 'tuples': zip(*results)} # TODO not done this way!!! + keys = results.keys() + tuples = [dict(zip(keys, values)) for values in zip(*(results[k] for k in keys))] + return {'total': read_tuples, 'tuples': tuples} diff --git a/clients/iotapi/src/WebSockets/websockets.py b/clients/iotapi/src/WebSockets/websockets.py --- a/clients/iotapi/src/WebSockets/websockets.py +++ b/clients/iotapi/src/WebSockets/websockets.py @@ -37,7 +37,7 @@ def desubscribe_removed_streams(concaten class IOTAPI(WebSocket): def __init__(self): super(IOTAPI, self).__init__() - self._subscriptions = {} + self._subscriptions = {} # dictionary of schema + '.' 
+ stream -> IOTStream self._locker = RWLock() def handleMessage(self): diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py --- a/clients/iotclient/src/Streams/datatypes.py +++ b/clients/iotclient/src/Streams/datatypes.py @@ -259,14 +259,14 @@ class EnumType(TextType): return ''.join(array) -class INetSix(TextType): +class INetSixType(TextType): """Covers: Inet6""" def __init__(self, **kwargs): - super(INetSix, self).__init__(**kwargs) + super(INetSixType, self).__init__(**kwargs) def add_json_schema_entry(self, schema): - super(INetSix, self).add_json_schema_entry(schema) + super(INetSixType, self).add_json_schema_entry(schema) schema[self._column_name]['format'] = 'ipv6' # http://stackoverflow.com/questions/166132/maximum-length-of-the-textual-representation-of-an-ipv6-address @@ -274,17 +274,17 @@ class INetSix(TextType): array[2] = 'char(45)' -class INet(StreamDataType): +class INetType(StreamDataType): """Covers: Inet""" def __init__(self, **kwargs): - super(INet, self).__init__(**kwargs) + super(INetType, self).__init__(**kwargs) def get_nullable_constant(self): return "0" # has to trick because it is impossible to get a null value from a valid IPv4 address in MonetDB def add_json_schema_entry(self, schema): - super(INet, self).add_json_schema_entry(schema) + super(INetType, self).add_json_schema_entry(schema) schema[self._column_name]['pattern'] = IPV4_REGEX def process_next_value(self, entry, counter, parameters, errors): diff --git a/clients/iotclient/src/Streams/streamscreator.py b/clients/iotclient/src/Streams/streamscreator.py --- a/clients/iotclient/src/Streams/streamscreator.py +++ b/clients/iotclient/src/Streams/streamscreator.py @@ -15,8 +15,8 @@ SWITCHER = [{'types': ['text', 'string', {'types': ['uuid'], 'class': 'UUIDType'}, {'types': ['mac'], 'class': 'MACType'}, {'types': ['url'], 'class': 'URLType'}, - {'types': ['inet'], 'class': 'INet'}, - {'types': ['inet6'], 'class': 'INetSix'}, + {'types': 
['inet'], 'class': 'INetType'}, + {'types': ['inet6'], 'class': 'INetSixType'}, {'types': ['regex'], 'class': 'RegexType'}, {'types': ['char', 'character', 'varchar', 'character varying'], 'class': 'LimitedTextType'}, {'types': ['enum'], 'class': 'EnumType'}, _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list