Changeset: a91cdecbf145 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a91cdecbf145
Modified Files:
        clients/iotapi/src/Streams/datatypes.py
        clients/iotapi/src/Streams/streams.py
        clients/iotclient/src/Streams/streamscreator.py
Branch: iot
Log Message:

Finished data reconstruction


diffs (truncated from 334 to 300 lines):

diff --git a/clients/iotapi/src/Streams/datatypes.py 
b/clients/iotapi/src/Streams/datatypes.py
--- a/clients/iotapi/src/Streams/datatypes.py
+++ b/clients/iotapi/src/Streams/datatypes.py
@@ -1,12 +1,13 @@
 import struct
+from abc import ABCMeta, abstractmethod
 from datetime import date, time, datetime
 
-from abc import ABCMeta, abstractmethod
 from dateutil.relativedelta import relativedelta
 
-ALIGNMENT = '<'  # for now is little-endian for Intel CPU's
+LITTLE_ENDIAN_ALIGNMENT = '<'  # for now is little-endian for Intel CPU's
 
 NIL_STRING = "\200\n"  # added newline for performance
+NIL_UUID = "00000000-0000-0000-0000-000000000000"
 
 INT8_MIN = 0x80
 INT16_MIN = 0x8000
@@ -20,24 +21,40 @@ DOUBLE_NAN = struct.unpack('d', '\xff\xf
 
 
 class StreamDataType(object):
-    """MonetDB's data types for validation base class"""
+    """MonetDB's data types for reading base class"""
     __metaclass__ = ABCMeta
 
     def __init__(self, **kwargs):
         self._column_name = kwargs['name']  # name of the column
         self._data_type = kwargs['type']  # SQL name of the type
-        # self._location = kwargs['location'] + '.tail'  # Location of the file
 
     def is_file_mode_binary(self):
         return True
 
     @abstractmethod
-    def read_next_batch(self, file_pointer, count):
-        return []
+    def skip_tuples(self, file_pointer, offset):
+        pass
+
+    @abstractmethod
+    def read_next_batch(self, file_pointer, limit):
+        pass
+
+    def read_next_tuples(self, file_name, offset, read_size):
+        open_string = 'r'
+        if not self.is_file_mode_binary():
+            open_string += 'u'
+        file_pointer = open(file_name, open_string)
+
+        if offset > 0:
+            self.skip_tuples(file_pointer, offset)
+
+        results = self.read_next_batch(file_pointer, read_size)
+        file_pointer.close()
+        return results
 
 
 class TextType(StreamDataType):
-    """Covers: CHAR, VARCHAR, CLOB"""
+    """Covers: CHAR, VARCHAR, CLOB and URL"""
 
     def __init__(self, **kwargs):
         super(TextType, self).__init__(**kwargs)
@@ -46,9 +63,69 @@ class TextType(StreamDataType):
     def is_file_mode_binary(self):
         return False
 
-    def read_next_batch(self, file_pointer, count):
-        array = file_pointer.readlines()
-        return map(lambda x: None if x == self._nullable_constant else x[:-1], 
array)
+    def skip_tuples(self, file_pointer, offset):
+        for _ in xrange(offset):
+            next(file_pointer)
+
+    def read_next_batch(self, file_pointer, limit):
+        array = []
+        for _ in xrange(limit):
+            next_line = next(file_pointer)
+            if next_line == self._nullable_constant:
+                array.append(None)
+            else:
+                array.append(next_line[:-1])  # remove newline
+        return array
+
+
+class INetType(StreamDataType):
+    """Covers: Inet"""
+
+    def __init__(self, **kwargs):
+        super(INetType, self).__init__(**kwargs)
+
+    def skip_tuples(self, file_pointer, offset):
+        file_pointer.seek(offset << 3)
+
+    def read_next_batch(self, file_pointer, limit):
+        results = []
+        read_size = limit << 3
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', 
file_pointer.read(read_size))
+        iterator = iter(array)
+
+        for _ in xrange(limit):
+            next_ipv4 = [next(iterator) for _ in xrange(8)]
+            if next_ipv4[7] == 1:  # check nil value
+                results.append(None)
+            else:
+                parsed_ip = '.'.join([str(next_ipv4[0]), str(next_ipv4[1]), 
str(next_ipv4[2]), str(next_ipv4[3])])
+                results.append(parsed_ip + '/' + str(next_ipv4[4]))
+        return results
+
+
+class UUIDType(StreamDataType):
+    """Covers: UUID"""
+
+    def __init__(self, **kwargs):
+        super(UUIDType, self).__init__(**kwargs)
+
+    def skip_tuples(self, file_pointer, offset):
+        file_pointer.seek(offset << 4)
+
+    def read_next_batch(self, file_pointer, limit):
+        results = []
+        read_size = limit << 4
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(read_size) + 'B', 
file_pointer.read(read_size))
+        iterator = iter(array)
+
+        for _ in xrange(limit):
+            next_uuid = ''.join(map(lambda x: "%02x" % x, [next(iterator) for 
_ in xrange(16)]))
+            next_uuid = ''.join([next_uuid[:8], '-', next_uuid[8:12], '-', 
next_uuid[12:16], '-', next_uuid[16:20],
+                                 '-', next_uuid[20:]])
+            if next_uuid == NIL_UUID:
+                next_uuid = None
+            results.append(next_uuid)
+        return results
 
 
 class BooleanType(StreamDataType):
@@ -58,8 +135,11 @@ class BooleanType(StreamDataType):
         super(BooleanType, self).__init__(**kwargs)
         self._nullable_constant = INT8_MIN
 
-    def read_next_batch(self, file_pointer, count):
-        array = struct.unpack(ALIGNMENT + str(count) + 'b', 
file_pointer.read(count))
+    def skip_tuples(self, file_pointer, offset):
+        file_pointer.seek(offset)
+
+    def read_next_batch(self, file_pointer, limit):
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'b', 
file_pointer.read(limit))
         return map(lambda x: None if x == self._nullable_constant else 
bool(x), array)
 
 
@@ -74,8 +154,12 @@ class SmallIntegerType(StreamDataType):
         self._nullable_constant = {'tinyint': INT8_MIN, 'smallint': INT16_MIN, 
'int': INT32_MIN, 'integer': INT32_MIN,
                                    'bigint': INT64_MIN}.get(self._data_type)
 
-    def read_next_batch(self, file_pointer, count):
-        array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, 
file_pointer.read(count * self._size))
+    def skip_tuples(self, file_pointer, offset):
+        file_pointer.seek(offset * self._size)
+
+    def read_next_batch(self, file_pointer, limit):
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 
self._pack_sym,
+                              file_pointer.read(limit * self._size))
         return map(lambda x: None if x == self._nullable_constant else int(x), 
array)
 
 
@@ -86,8 +170,11 @@ class HugeIntegerType(StreamDataType):
         super(HugeIntegerType, self).__init__(**kwargs)
         self._nullable_constant = INT128_MIN
 
-    def read_next_batch(self, file_pointer, count):  # [entry & INT64_MAX, 
(entry >> 64) & INT64_MAX]
-        array = struct.unpack(ALIGNMENT + str(count << 1) + 'Q', 
file_pointer.read(count << 3))
+    def skip_tuples(self, file_pointer, offset):
+        file_pointer.seek(offset << 4)
+
+    def read_next_batch(self, file_pointer, limit):  # [entry & INT64_MAX, 
(entry >> 64) & INT64_MAX]
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit << 1) + 'Q', 
file_pointer.read(limit << 4))
         results = []
         iterator = iter(array)  # has to iterate two values at once, so use 
iterator
         for value in iterator:
@@ -108,8 +195,12 @@ class FloatType(StreamDataType):
         self._size = struct.calcsize(self._pack_sym)
         self._nullable_constant = {'real': FLOAT_NAN, 'float': DOUBLE_NAN, 
'double': DOUBLE_NAN}.get(self._data_type)
 
-    def read_next_batch(self, file_pointer, count):
-        array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, 
file_pointer.read(count * self._size))
+    def skip_tuples(self, file_pointer, offset):
+        file_pointer.seek(offset * self._size)
+
+    def read_next_batch(self, file_pointer, limit):
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 
self._pack_sym,
+                              file_pointer.read(limit * self._size))
         return map(lambda x: None if x == self._nullable_constant else 
float(x), array)
 
 
@@ -126,20 +217,24 @@ class DecimalType(StreamDataType):
         if self._pack_sym == 'Q':
             self._size <<= 1  # has to read two values at once
 
-    def read_next_batch(self, file_pointer, count):
-        array = struct.unpack(ALIGNMENT + str(count) + self._pack_sym, 
file_pointer.read(count * self._size))
+    def skip_tuples(self, file_pointer, offset):
+        file_pointer.seek(offset * self._size)
+
+    def read_next_batch(self, file_pointer, limit):
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 
self._pack_sym,
+                              file_pointer.read(limit * self._size))
         if self._pack_sym != 'Q':
             return map(lambda x: None if x == self._nullable_constant else 
float(x), array)
-
-        results = []
-        iterator = iter(array)  # has to iterate two values at once, so use 
iterator
-        for value in iterator:
-            next_huge_decimal = value + (next(iterator) << 64)
-            if next_huge_decimal == self._nullable_constant:
-                results.append(None)
-            else:
-                results.append(next_huge_decimal)
-        return results
+        else:
+            results = []
+            iterator = iter(array)  # has to iterate two values at once, so 
use iterator
+            for value in iterator:
+                next_huge_decimal = value + (next(iterator) << 64)
+                if next_huge_decimal == self._nullable_constant:
+                    results.append(None)
+                else:
+                    results.append(next_huge_decimal)
+            return results
 
 
 class DateType(StreamDataType):  # Stored as an uint with the number of days 
since day 1 of month 1 (Jan) from year 0
@@ -149,8 +244,11 @@ class DateType(StreamDataType):  # Store
         super(DateType, self).__init__(**kwargs)
         self._nullable_constant = INT32_MIN
 
-    def read_next_batch(self, file_pointer, count):
-        array = struct.unpack(ALIGNMENT + str(count) + 'I', 
file_pointer.read(count << 2))
+    def skip_tuples(self, file_pointer, offset):
+        file_pointer.seek(offset << 2)
+
+    def read_next_batch(self, file_pointer, limit):
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'I', 
file_pointer.read(limit << 2))
         results = []
         for value in array:
             if value == self._nullable_constant:
@@ -167,8 +265,11 @@ class TimeType(StreamDataType):  # Store
         super(TimeType, self).__init__(**kwargs)
         self._nullable_constant = INT32_MIN
 
-    def read_next_batch(self, file_pointer, count):
-        array = struct.unpack(ALIGNMENT + str(count) + 'I', 
file_pointer.read(count << 2))
+    def skip_tuples(self, file_pointer, offset):
+        file_pointer.seek(offset << 2)
+
+    def read_next_batch(self, file_pointer, limit):
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit) + 'I', 
file_pointer.read(limit << 2))
         results = []
         for value in array:
             if value == self._nullable_constant:
@@ -188,8 +289,11 @@ class TimestampType(StreamDataType):  # 
     def __init__(self, **kwargs):
         super(TimestampType, self).__init__(**kwargs)
 
-    def read_next_batch(self, file_pointer, count):
-        array = struct.unpack(ALIGNMENT + str(count << 1) + 'I', 
file_pointer.read(count << 3))
+    def skip_tuples(self, file_pointer, offset):
+        file_pointer.seek(offset << 3)
+
+    def read_next_batch(self, file_pointer, limit):
+        array = struct.unpack(LITTLE_ENDIAN_ALIGNMENT + str(limit << 1) + 'I', 
file_pointer.read(limit << 3))
         results = []
         iterator = iter(array)  # has to iterate two values at once, so use 
iterator
 
diff --git a/clients/iotapi/src/Streams/streams.py 
b/clients/iotapi/src/Streams/streams.py
--- a/clients/iotapi/src/Streams/streams.py
+++ b/clients/iotapi/src/Streams/streams.py
@@ -1,6 +1,6 @@
+import os
 import struct
 
-import os
 from Settings.filesystem import get_baskets_base_location
 from Utilities.readwritelock import RWLock
 from WebSockets.websockets import notify_clients
@@ -120,11 +120,7 @@ class IOTStream(object):
 
                 for key, column in self._columns.iteritems():
                     next_file_name = os.path.join(next_path, key)
-                    open_string = 'r'
-                    if not column.is_file_mode_binary():
-                        open_string += 'u'
-                    file_pointer = open(next_file_name, open_string)
-                    results[key].append(column.read_next_batch(file_pointer, 
offset, next_read_size))
+                    
results[key].append(column.read_next_tuples(next_file_name, offset, 
next_read_size))
 
                 read_tuples += next_read_size
                 offset = 0
diff --git a/clients/iotclient/src/Streams/streamscreator.py 
b/clients/iotclient/src/Streams/streamscreator.py
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to