Changeset: e13cdaed7ea2 for MonetDB URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e13cdaed7ea2 Modified Files: clients/iotclient/src/Streams/datatypes.py clients/iotclient/src/Streams/streamscreator.py Branch: iot Log Message:
More bugfixes diffs (202 lines): diff --git a/clients/iotclient/src/Streams/datatypes.py b/clients/iotclient/src/Streams/datatypes.py --- a/clients/iotclient/src/Streams/datatypes.py +++ b/clients/iotclient/src/Streams/datatypes.py @@ -406,7 +406,7 @@ class SmallIntegerType(NumberBaseType): return int(entry) def pack_parsed_values(self, extracted_values, counter, parameters): - return struct.pack(ALIGNMENT + str(counter) + self._pack_sym, extracted_values) + return struct.pack(ALIGNMENT + str(counter) + self._pack_sym, *extracted_values) class HugeIntegerType(NumberBaseType): @@ -540,7 +540,9 @@ class BaseDateTimeType(StreamDataType): self._maximum = self.parse_entry(kwargs['maximum']) if hasattr(self, '_minimum') and hasattr(self, '_maximum') and self._minimum > self._maximum: raise Exception('The minimum value is higher than the maximum!') - self._default_value_text = None # needed later for the SQL creation statement + + def get_nullable_constant(self): + return "0" @abstractmethod def parse_entry(self, entry): @@ -558,14 +560,15 @@ class BaseDateTimeType(StreamDataType): raise Exception('The default value is out of range: %s < %s!' % (default_value, self._minimum_text)) elif hasattr(self, '_maximum') and hasattr(self, '_minimum') and parsed_val > self._maximum: raise Exception('The default value is out of range: %s > %s!' % (default_value, self._maximum_text)) - self._default_value = parsed_val - self._default_value_text = default_value + self._default_value = default_value @abstractmethod def pack_next_value(self, parsed, counter, parameters, errors): pass def process_next_value(self, entry, counter, parameters, errors): + if entry == self.get_nullable_constant(): # have to do this trick due to Python's datetime limitations + return self.pack_next_value(None, counter, parameters, errors) parsed = self.parse_entry(entry) if hasattr(self, '_minimum') and not hasattr(self, '_maximum') and parsed < self._minimum: errors[counter] = 'The value is higher than the minimum: %s < %s!' % (self._minimum_text, parsed) @@ -577,31 +580,16 @@ class BaseDateTimeType(StreamDataType): errors[counter] = 'The value is out of range: %s > %s!' % (parsed, self._maximum_text) return self.pack_next_value(parsed, counter, parameters, errors) - def add_json_schema_entry(self, schema): - super(BaseDateTimeType, self).add_json_schema_entry(schema) - schema[self._column_name]['default'] = self._default_value_text - def to_json_representation(self): json_value = super(BaseDateTimeType, self).to_json_representation() - json_value['type'] = 'string' - if self._default_value is not None: - json_value['default'] = self._default_value_text if hasattr(self, '_minimum'): json_value['minimum'] = self._minimum_text if hasattr(self, '_maximum'): json_value['maximum'] = self._maximum_text return json_value - def create_stream_sql(self): - array = [self._column_name, " ", self._data_type] - if self._default_value is not None: - array.extend([" DEFAULT '", str(self._default_value_text), "'"]) - if not self._is_nullable: - array.append(" NOT NULL") - return ''.join(array) - -class DateType(BaseDateTimeType): # Stored as an uint with the number of days since day 0 of month 1 (Jan) from year 0 +class DateType(BaseDateTimeType): # Stored as an uint with the number of days since day 1 of month 1 (Jan) from year 0 """Covers: DATE""" def __init__(self, **kwargs): @@ -611,15 +599,15 @@ class DateType(BaseDateTimeType): # Sto super(DateType, self).add_json_schema_entry(schema) schema[self._column_name]['format'] = 'date' - def get_nullable_constant(self): - return INT32_MIN # Checked from MonetDB's source code - def parse_entry(self, entry): return datetime.datetime.strptime(str(entry), "%Y-%m-%d") def pack_next_value(self, parsed, counter, parameters, errors): + if parsed is None: + return INT32_MIN day0 = copy.deepcopy(parsed).replace(year=1, month=1, day=1) - return (parsed - day0).days + 366 # the mindate in python is 1, but for the representation is 0, so why the add + # the minyear in python is 1, but for the representation is 0, so why the add + return int((parsed - day0).days) + 366 def pack_parsed_values(self, extracted_values, counter, parameters): return struct.pack(ALIGNMENT + str(counter) + 'I', *extracted_values) @@ -635,9 +623,6 @@ class TimeType(BaseDateTimeType): # Sto else: self._has_timezone = True - def get_nullable_constant(self): - return INT32_MIN # Checked from MonetDB's source code - def add_json_schema_entry(self, schema): super(TimeType, self).add_json_schema_entry(schema) schema[self._column_name]['pattern'] = TIME_REGEX @@ -649,8 +634,11 @@ class TimeType(BaseDateTimeType): # Sto return parsed_time def pack_next_value(self, parsed, counter, parameters, errors): + if parsed is None: + return INT32_MIN hour0 = copy.deepcopy(parsed).replace(hour=0, minute=0, second=0, microsecond=0) - return (parsed - hour0).seconds * 1000 + delta = parsed - hour0 + return int(delta.total_seconds()) * 1000 + int(delta.microseconds) / 1000 def pack_parsed_values(self, extracted_values, counter, parameters): return struct.pack(ALIGNMENT + str(counter) + 'I', *extracted_values) @@ -666,9 +654,6 @@ class TimestampType(BaseDateTimeType): else: self._has_timezone = True - def get_nullable_constant(self): - return [0, INT32_MIN] # Checked from MonetDB's source code - def add_json_schema_entry(self, schema): super(TimestampType, self).add_json_schema_entry(schema) schema[self._column_name]['format'] = 'date-time' @@ -680,10 +665,13 @@ class TimestampType(BaseDateTimeType): return parsed_timestamp def pack_next_value(self, parsed, counter, parameters, errors): + if parsed is None: + return [0, INT32_MIN] hour0 = copy.deepcopy(parsed).replace(hour=0, minute=0, second=0, microsecond=0) day0 = copy.deepcopy(parsed).replace(year=1, month=1, day=1) - days = (parsed - day0).days + 366 # the mindate in python is 1, but for the representation is 0, so why the add - milliseconds = (parsed - hour0).seconds * 1000 + days = int((parsed - day0).days) + 366 + delta = parsed - hour0 + milliseconds = int(delta.total_seconds()) * 1000 + int(delta.microseconds) / 1000 return [milliseconds, days] def pack_parsed_values(self, extracted_values, counter, parameters): diff --git a/clients/iotclient/src/Streams/streamscreator.py b/clients/iotclient/src/Streams/streamscreator.py --- a/clients/iotclient/src/Streams/streamscreator.py +++ b/clients/iotclient/src/Streams/streamscreator.py @@ -11,24 +11,23 @@ class ColumnsValidationException(Excepti super(ColumnsValidationException, self).__init__() self.message = error_messages # dictionary of name -> error message - -SWITCHER = {('text', 'string', 'clob', 'character large object'): 'TextType', - ('uuid'): 'UUIDType', - ('mac'): 'MACType', - ('url'): 'URLType', - ('inet'): 'INet', - ('inet6'): 'INetSix', - ('regex'): 'RegexType', - ('char', 'character', 'varchar', 'character varying'): 'LimitedTextType', - ('enum'): 'EnumType', - ('boolean'): 'BooleanType', - ('tinyint', 'smallint', 'int', 'integer', 'bigint'): 'SmallIntegerType', - ('hugeint'): 'HugeIntegerType', - ('real', 'float', 'double'): 'FloatType', - ('decimal', 'numeric'): 'DecimalType', - ('date'): 'DateType', - ('time'): 'TimeType', - ('timestamp'): 'TimestampType'} +SWITCHER = [{'types': ['text', 'string', 'clob', 'character large object'], 'class': 'TextType'}, + {'types': ['uuid'], 'class': 'UUIDType'}, + {'types': ['mac'], 'class': 'MACType'}, + {'types': ['url'], 'class': 'URLType'}, + {'types': ['inet'], 'class': 'INet'}, + {'types': ['inet6'], 'class': 'INetSix'}, + {'types': ['regex'], 'class': 'RegexType'}, + {'types': ['char', 'character', 'varchar', 'character varying'], 'class': 'LimitedTextType'}, + {'types': ['enum'], 'class': 'EnumType'}, + {'types': ['boolean'], 'class': 'BooleanType'}, + {'types': ['tinyint', 'smallint', 'int', 'integer', 'bigint'], 'class': 'SmallIntegerType'}, + {'types': ['hugeint'], 'class': 'HugeIntegerType'}, + {'types': ['real', 'float', 'double'], 'class': 'FloatType'}, + {'types': ['decimal', 'numeric'], 'class': 'DecimalType'}, + {'types': ['date'], 'class': 'DateType'}, + {'types': ['time'], 'class': 'TimeType'}, + {'types': ['timestamp'], 'class': 'TimestampType'}] def validate_schema_and_create_stream(schema, created=False): @@ -43,10 +42,10 @@ def validate_schema_and_create_stream(sc errors[next_name] = 'The column ' + next_name + ' is duplicated!' continue - for tuple_keys, class_name in SWITCHER.iteritems(): # allocate the proper type wrapper - if next_type in tuple_keys: + for entry in SWITCHER: # allocate the proper type wrapper + if next_type in entry['types']: try: - reflection_class = globals()[class_name] # import everything from datatypes!!! + reflection_class = globals()[entry['class']] # import everything from datatypes!!! new_column = reflection_class(**column) # pass the json entry as kwargs if 'default' in column: new_column.set_default_value(column['default']) _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list