Changeset: 86b955aa08b7 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=86b955aa08b7
Modified Files:
        clients/iotapi/src/Streams/datatypes.py
        clients/iotapi/src/Streams/streampolling.py
        clients/iotapi/src/Streams/streams.py
        clients/iotclient/documentation/iot_server_arguments.rst
        clients/iotclient/documentation/restful_resources.rst
        clients/iotclient/documentation/streams_data_types.rst
        clients/iotclient/requirements.txt
        clients/iotclient/src/Flask/restresources.py
        clients/iotclient/src/Streams/streams.py
Branch: iot
Log Message:

First version of web api working. Updated some documentation. Fixed small 
errors.


diffs (230 lines):

diff --git a/clients/iotapi/src/Streams/datatypes.py 
b/clients/iotapi/src/Streams/datatypes.py
--- a/clients/iotapi/src/Streams/datatypes.py
+++ b/clients/iotapi/src/Streams/datatypes.py
@@ -119,12 +119,17 @@ class UUIDType(StreamDataType):
         iterator = iter(array)
 
         for _ in xrange(limit):
-            next_uuid = ''.join(map(lambda x: "%02x" % x, [next(iterator) for 
_ in xrange(16)]))
-            next_uuid = ''.join([next_uuid[:8], '-', next_uuid[8:12], '-', 
next_uuid[12:16], '-', next_uuid[16:20],
-                                 '-', next_uuid[20:]])
-            if next_uuid == NIL_UUID:
-                next_uuid = None
-            results.append(next_uuid)
+            next_uuid = []
+            for i in xrange(20):
+                if i in (4, 7, 10, 13):
+                    next_uuid.append("-")
+                else:
+                    next_uuid.append("%02x" % next(iterator))
+
+            built_uuid = ''.join(next_uuid)
+            if built_uuid == NIL_UUID:
+                built_uuid = None
+            results.append(built_uuid)
         return results
 
 
@@ -178,7 +183,7 @@ class HugeIntegerType(StreamDataType):
         results = []
         iterator = iter(array)  # has to iterate two values at once, so use 
iterator
         for value in iterator:
-            next_huge = value + (next(iterator) << 64)
+            next_huge = next(iterator) + (value << 64)
             if next_huge == self._nullable_constant:
                 results.append(None)
             else:
@@ -229,7 +234,7 @@ class DecimalType(StreamDataType):
             results = []
             iterator = iter(array)  # has to iterate two values at once, so 
use iterator
             for value in iterator:
-                next_huge_decimal = value + (next(iterator) << 64)
+                next_huge_decimal = next(iterator) + (value << 64)
                 if next_huge_decimal == self._nullable_constant:
                     results.append(None)
                 else:
@@ -302,8 +307,8 @@ class TimestampType(StreamDataType):  # 
             if value == INT32_MIN and second_value == 0:
                 results.append(None)
             else:  # dates in python start on year 1, so we must subtract one 
year
-                read_date = date.fromordinal(value) - relativedelta(years=1)
-                div1, milliseconds = divmod(second_value, 1000)
+                read_date = date.fromordinal(second_value) - 
relativedelta(years=1)
+                div1, milliseconds = divmod(value, 1000)
                 div2, second = divmod(div1, 60)
                 hour, minute = divmod(div2, 60)
                 results.append(datetime.combine(read_date, time(hour=hour, 
minute=minute, second=second,
diff --git a/clients/iotapi/src/Streams/streampolling.py 
b/clients/iotapi/src/Streams/streampolling.py
--- a/clients/iotapi/src/Streams/streampolling.py
+++ b/clients/iotapi/src/Streams/streampolling.py
@@ -40,9 +40,9 @@ def stream_polling():
                     if elem[3] in entry['types']:
                         reflection_class = globals()[entry['class']]  # import 
everything from datatypes!!!
                         new_column = reflection_class(**{'name': elem[2], 
'type': elem[3], 'typewidth': elem[4]})
-                        columns[elem[2]] = new_column
-                        new_streams[key] = IOTStream(elem[0], elem[1], 
**columns)
-                    break
+                        columns[elem[2]] = new_column  # add new column to the 
dictionary
+                        break
+            new_streams[key] = IOTStream(schema_name=elem[0], 
stream_name=elem[1], columns=columns)
         else:
             retained_streams.append(key)
 
diff --git a/clients/iotapi/src/Streams/streams.py 
b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -5,7 +5,7 @@ from datatypes import LITTLE_ENDIAN_ALIG
 from Settings.filesystem import get_baskets_base_location
 from Utilities.readwritelock import RWLock
 from WebSockets.websockets import notify_stream_inserts_to_clients
-from watchdog.events import FileSystemEventHandler
+from watchdog.events import FileSystemEventHandler, DirCreatedEvent, 
DirDeletedEvent
 from watchdog.observers import Observer
 
 BASKETS_COUNT_FILE = 'count'
@@ -27,13 +27,13 @@ class StreamBasketsHandler(FileSystemEve
         self._stream = stream
 
     def on_created(self, event):  # whenever a basket directory is created, 
notify to subscribed clients
-        if isinstance(event, 'DirCreatedEvent'):
+        if isinstance(event, DirCreatedEvent):
             basket_string = os.path.basename(os.path.normpath(event.src_path))
             count = self._stream.append_basket(basket_string)
             notify_stream_inserts_to_clients(self._stream.get_schema_name(), 
self._stream.get_stream_name(), count)
 
     def on_deleted(self, event):
-        if isinstance(event, 'DirDeletedEvent'):
+        if isinstance(event, DirDeletedEvent):
             basket_string = os.path.basename(os.path.normpath(event.src_path))
             self._stream.delete_basket(basket_string)
 
@@ -46,11 +46,10 @@ class IOTStream(object):
         self._stream_name = stream_name  # name of the stream
         self._columns = columns  # dictionary of name -> data_types
         self._base_path = os.path.join(get_baskets_base_location(), 
schema_name, stream_name)
+        self._lock = RWLock()
         self._baskets = {}  # dictionary of basket_number -> total_tuples
         for name in os.listdir(self._base_path):
             self.append_basket(name)
-        self._lock = RWLock()
-
         self._observer = Observer()
         self._observer.schedule(StreamBasketsHandler(stream=self), 
self._base_path, recursive=False)
         self._observer.start()
@@ -63,8 +62,8 @@ class IOTStream(object):
 
     def append_basket(self, path):
         if represents_int(path):
-            with open(os.path.join(self._base_path, path)) as f:
-                count = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + '1i', 
f.read(4))[0]
+            with open(os.path.join(self._base_path, path, BASKETS_COUNT_FILE)) 
as f:
+                count = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + 'i', 
f.read(4))[0]
                 self._lock.acquire_write()
                 self._baskets[int(path)] = count
                 self._lock.release()
@@ -124,7 +123,7 @@ class IOTStream(object):
 
                 for key, column in self._columns.iteritems():
                     next_file_name = os.path.join(next_path, key)
-                    
results[key].append(column.read_next_tuples(next_file_name, offset, 
next_read_size))
+                    results[key] += column.read_next_tuples(next_file_name, 
offset, next_read_size)
 
                 read_tuples += next_read_size
                 offset = 0
diff --git a/clients/iotclient/documentation/iot_server_arguments.rst 
b/clients/iotclient/documentation/iot_server_arguments.rst
--- a/clients/iotclient/documentation/iot_server_arguments.rst
+++ b/clients/iotclient/documentation/iot_server_arguments.rst
@@ -32,10 +32,6 @@ Set the filesystem directory where the b
 
 Location of logfile. On the logfile is reported when streams are created or 
removed, when tuples are inserted and when the baskets are flushed. By default 
in UNIX systems is :code:`/var/log/iot/iot.log`, while on Windows is the 
:code:`iot.log` on the directory where the :code:`main.py` script was called.
 
-**-c  - -config=**
-
-Location of the JSON file where the information of existing streams on the 
server will be stored. By default is the :code:`config.json` file on filesystem 
directory.
-
 Host Identifier
 ---------------
 
diff --git a/clients/iotclient/documentation/restful_resources.rst 
b/clients/iotclient/documentation/restful_resources.rst
--- a/clients/iotclient/documentation/restful_resources.rst
+++ b/clients/iotclient/documentation/restful_resources.rst
@@ -53,7 +53,7 @@ Returns a JSON file with details about a
 
 **POST**
 
-Creates a stream using a pre-defined JSON schema. The JSON must include the 
stream's schema, the stream's name, the flushing method which can be either 
time or tuple based and the stream's columns. For tuple based flushing, the 
number of tuples to flush must be provided using the :code:`number` field. In 
time based flushing, the :code:`interval` field tells the time units between 
flushes and the :code:`unit` field must be "s", "m" or "h" for seconds, minutes 
or hours respectively. For columns `see data types for details 
<streams_data_types.html#data_types>`__.
+Creates a stream using a pre-defined JSON schema. The JSON must include the 
stream's schema, the stream's name, the stream's columns and the flushing 
method. The flushing can be either time based, tuple based or automatic 
(:code:`auto`). For tuple based flushing, the number of tuples to flush must be 
provided using the :code:`number` field. In time based flushing, the 
:code:`interval` field tells the time units between flushes and the 
:code:`unit` field must be "s", "m" or "h" for seconds, minutes or hours 
respectively. In automatic flushing, the baskets are flushed whenver a new 
batch is inserted. For columns `see data types for details 
<streams_data_types.html#data_types>`__.
 
 Bellow is the JSON used to create the stream in streams_:
 
diff --git a/clients/iotclient/documentation/streams_data_types.rst 
b/clients/iotclient/documentation/streams_data_types.rst
--- a/clients/iotclient/documentation/streams_data_types.rst
+++ b/clients/iotclient/documentation/streams_data_types.rst
@@ -43,7 +43,7 @@ An *Uniform Resource Locator* as a speci
 Inet
 ----
 
-An *IPv4* address. The insertion as a JSON string is validated against the 
regular expression :code:`^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$` with further 
semantic validation.
+An *IPv4* address. The insertion as a JSON string is validated against the 
regular expression 
:code:`^(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}(?:\/[0-2]\d|\/3[0-2])?$`.
 
 InetSix
 -------
diff --git a/clients/iotclient/requirements.txt 
b/clients/iotclient/requirements.txt
--- a/clients/iotclient/requirements.txt
+++ b/clients/iotclient/requirements.txt
@@ -3,7 +3,7 @@ jsonschema==2.5.1
 pymonetdb==0.1.1
 python-dateutil==2.5.3
 pytz==2016.4
-rfc3987==1.3.5
+rfc3987==1.3.6
 Sphinx==1.4.1
 sphinx-rtd-theme==0.1.9
 strict-rfc3339==0.7
diff --git a/clients/iotclient/src/Flask/restresources.py 
b/clients/iotclient/src/Flask/restresources.py
--- a/clients/iotclient/src/Flask/restresources.py
+++ b/clients/iotclient/src/Flask/restresources.py
@@ -84,8 +84,6 @@ class StreamsHandling(Resource):
             add_log(50, ex)
             return ex, 400
         else:
-            add_log(20, ''.join['The stream ', schema_to_validate['schema'], 
'.', schema_to_validate['stream'],
-                                ' was created'])
             return 'The stream was created with success!', 201
 
     def delete(self):
diff --git a/clients/iotclient/src/Streams/streams.py 
b/clients/iotclient/src/Streams/streams.py
--- a/clients/iotclient/src/Streams/streams.py
+++ b/clients/iotclient/src/Streams/streams.py
@@ -84,9 +84,17 @@ class BaseIOTStream(object):
         if Use_Host_Identifier:
             create_file_if_not_exists(os.path.join(self._current_base_path, 
HOST_IDENTIFIER_COLUMN_NAME))
 
+        # WARNING DELETE this code afterwards is for debugging purposes for now
+        basket_counter_file_pointer = 
open(os.path.join(self._current_base_path, BASKETS_COUNT_FILE), "w+b")
+        basket_counter_file_pointer.write(struct.pack(LITTLE_ENDIAN_ALIGNMENT 
+ "i",0))
+        basket_counter_file_pointer.flush()
+        basket_counter_file_pointer.close()
+        # END WARNING
+
         if created:  # when the stream is reloaded from the config file, the 
create SQL statement is not sent
             sql_array = [column.create_stream_sql() for column in 
self._columns.values()]
             mapi_create_stream(self._schema_name, self._stream_name, ', 
'.join(sql_array + Extra_columns_SQL))
+            add_log(20, ''.join(['The stream ', self._schema_name, '.', 
self._stream_name, ' was created']))
 
     def get_schema_name(self):
         return self._schema_name
@@ -136,6 +144,13 @@ class BaseIOTStream(object):
             self._current_base_path = os.path.join(self._base_path, 
str(self._baskets_counter))
             os.makedirs(self._current_base_path)
 
+            # WARNING DELETE this code afterwards is for debugging purposes 
for now
+            basket_counter_file_pointer = 
open(os.path.join(self._current_base_path, BASKETS_COUNT_FILE), "w+b")
+            
basket_counter_file_pointer.write(struct.pack(LITTLE_ENDIAN_ALIGNMENT + "i", 0))
+            basket_counter_file_pointer.flush()
+            basket_counter_file_pointer.close()
+            # END WARNING
+
             for key in self._columns.keys():
                 
create_file_if_not_exists(os.path.join(self._current_base_path, key))
             create_file_if_not_exists(os.path.join(self._current_base_path, 
IMPLICIT_TIMESTAMP_COLUMN_NAME))
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to