KUDU-1614 - [python] Enable Set/Get of unixtime_micros

Currently, the Python client in Kudu does not support setting or getting columns of the unixtime_micros type. This patch adds that capability and includes a unit test. This patch also fixes a minor bug in write operations that address columns by index (KUDU-1615): PartialRow.set_loc(), the by-index setter, was previously an empty stub. The unit test associated with this patch exercises this fix.
Change-Id: Id428cbd072b7de7a75e58b66e4de89acd381fdca Reviewed-on: http://gerrit.cloudera.org:8080/4417 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <t...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1c668742 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1c668742 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1c668742 Branch: refs/heads/master Commit: 1c668742679bb00b5c851c80071c3232aa68441a Parents: bad9101 Author: Jordan Birdsell <jordantbirds...@gmail.com> Authored: Wed Sep 14 20:05:18 2016 -0400 Committer: Todd Lipcon <t...@apache.org> Committed: Thu Sep 22 07:33:34 2016 +0000 ---------------------------------------------------------------------- python/kudu/client.pyx | 58 +++++++++++++++++-- python/kudu/libkudu_client.pxd | 17 +++--- python/kudu/tests/common.py | 1 + python/kudu/tests/test_client.py | 13 ++++- python/kudu/tests/test_scanner.py | 41 ++++++-------- python/kudu/tests/test_scantoken.py | 51 ++++++++--------- python/kudu/tests/util.py | 97 ++++++++++++++++++++++++++++++++ python/kudu/util.py | 73 ++++++++++++++++++++++++ python/requirements.txt | 1 + python/setup.py | 2 +- 10 files changed, 287 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/client.pyx ---------------------------------------------------------------------- diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx index 5394410..800a620 100644 --- a/python/kudu/client.pyx +++ b/python/kudu/client.pyx @@ -25,15 +25,29 @@ cimport cpython from cython.operator cimport dereference as deref from libkudu_client cimport * - from kudu.compat import tobytes, frombytes from kudu.schema cimport Schema, ColumnSchema from kudu.errors cimport check_status +from kudu.util import to_unixtime_micros, from_unixtime_micros from errors import KuduException import six +cdef 
dict _type_names = { + KUDU_INT8 : "KUDU_INT8", + KUDU_INT16 : "KUDU_INT16", + KUDU_INT32 : "KUDU_INT32", + KUDU_INT64 : "KUDU_INT64", + KUDU_STRING : "KUDU_STRING", + KUDU_BOOL : "KUDU_BOOL", + KUDU_FLOAT : "KUDU_FLOAT", + KUDU_DOUBLE : "KUDU_DOUBLE", + KUDU_BINARY : "KUDU_BINARY", + KUDU_UNIXTIME_MICROS : "KUDU_UNIXTIME_MICROS" +} + + cdef class TimeDelta: """ Wrapper interface for kudu MonoDelta class, which is used to specify @@ -513,6 +527,14 @@ cdef class StringVal(RawValue): def __dealloc__(self): del self.val +cdef class UnixtimeMicrosVal(RawValue): + cdef: + int64_t val + + def __cinit__(self, obj): + self.val = to_unixtime_micros(obj) + self.data = &self.val + #---------------------------------------------------------------------- cdef class TabletServer: """ @@ -1030,6 +1052,11 @@ cdef class Row: return cpython.PyBytes_FromStringAndSize(<char*> val.mutable_data(), val.size()) + cdef inline get_unixtime_micros(self, int i): + cdef int64_t val + check_status(self.row.GetUnixTimeMicros(i, &val)) + return val + cdef inline get_slot(self, int i): cdef: Status s @@ -1051,8 +1078,11 @@ cdef class Row: return self.get_float(i) elif t == KUDU_STRING: return frombytes(self.get_string(i)) + elif t == KUDU_UNIXTIME_MICROS: + return from_unixtime_micros(self.get_unixtime_micros(i)) else: - raise TypeError(t) + raise TypeError("Cannot get kudu type <{0}>" + .format(_type_names[t])) cdef inline bint is_null(self, int i): return self.row.IsNull(i) @@ -1712,6 +1742,11 @@ cdef class PartialRow: cpdef set_field(self, key, value): cdef: int i = self.table.schema.get_loc(key) + + self.set_loc(i, value) + + cpdef set_loc(self, int i, value): + cdef: DataType t = self.table.schema.loc_type(i) cdef Slice* slc @@ -1746,9 +1781,18 @@ cdef class PartialRow: # Not safe to take a reference to PyBytes data for now self.row.SetStringCopy(i, deref(slc)) del slc - - cpdef set_loc(self, int i, value): - pass + elif t == KUDU_UNIXTIME_MICROS: + # String with custom format + # eg: 
("2016-01-01", "%Y-%m-%d") + if type(value) is tuple: + self.row.SetUnixTimeMicros(i, <int64_t> + to_unixtime_micros(value[0], value[1])) + # datetime.datetime input or string with default format + else: + self.row.SetUnixTimeMicros(i, <int64_t> + to_unixtime_micros(value)) + else: + raise TypeError("Cannot set kudu type <{0}>.".format(_type_names[t])) cpdef set_field_null(self, key): pass @@ -1839,5 +1883,7 @@ cdef inline cast_pyvalue(DataType t, object o): return FloatVal(o) elif t == KUDU_STRING: return StringVal(o) + elif t == KUDU_UNIXTIME_MICROS: + return UnixtimeMicrosVal(o) else: - raise TypeError(t) + raise TypeError("Cannot cast kudu type <{0}>".format(_type_names[t])) http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/libkudu_client.pxd ---------------------------------------------------------------------- diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd index 546b2be..11bc78d 100644 --- a/python/kudu/libkudu_client.pxd +++ b/python/kudu/libkudu_client.pxd @@ -206,21 +206,24 @@ cdef extern from "kudu/client/scan_batch.h" namespace "kudu::client" nogil: # the value is unset, or the value is NULL. Otherwise they return # the current set value in *val. 
Status GetBool(Slice& col_name, c_bool* val) + Status GetBool(int col_idx, c_bool* val) Status GetInt8(Slice& col_name, int8_t* val) + Status GetInt8(int col_idx, int8_t* val) + Status GetInt16(Slice& col_name, int16_t* val) + Status GetInt16(int col_idx, int16_t* val) + Status GetInt32(Slice& col_name, int32_t* val) + Status GetInt32(int col_idx, int32_t* val) + Status GetInt64(Slice& col_name, int64_t* val) + Status GetInt64(int col_idx, int64_t* val) Status GetUnixTimeMicros(const Slice& col_name, int64_t* micros_since_utc_epoch) - - Status GetBool(int col_idx, c_bool* val) - - Status GetInt8(int col_idx, int8_t* val) - Status GetInt16(int col_idx, int16_t* val) - Status GetInt32(int col_idx, int32_t* val) - Status GetInt64(int col_idx, int64_t* val) + Status GetUnixTimeMicros(int col_idx, + int64_t* micros_since_utc_epoch) Status GetString(Slice& col_name, Slice* val) Status GetString(int col_idx, Slice* val) http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/tests/common.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/common.py b/python/kudu/tests/common.py index 75a6ca6..f39c074 100644 --- a/python/kudu/tests/common.py +++ b/python/kudu/tests/common.py @@ -164,6 +164,7 @@ class KuduTestBase(object): builder.add_column('key', kudu.int32, nullable=False) builder.add_column('int_val', kudu.int32) builder.add_column('string_val', kudu.string) + builder.add_column('unixtime_micros_val', kudu.unixtime_micros) builder.set_primary_keys(['key']) return builder.build() http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/tests/test_client.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py index 9dcb0d7..e900fd7 100644 --- a/python/kudu/tests/test_client.py +++ b/python/kudu/tests/test_client.py @@ -20,6 +20,8 @@ from kudu.compat import unittest, long from kudu.tests.common 
import KuduTestBase from kudu.client import Partitioning import kudu +import datetime +from pytz import utc class TestClient(KuduTestBase, unittest.TestCase): @@ -37,7 +39,7 @@ class TestClient(KuduTestBase, unittest.TestCase): table = self.client.table(self.ex_table) cols = [(table['key'], 'key', 'int32'), (table[1], 'int_val', 'int32'), - (table[-1], 'string_val', 'string')] + (table[-1], 'unixtime_micros_val', 'unixtime_micros')] for col, name, type in cols: assert col.name == bytes(name) @@ -166,6 +168,9 @@ class TestClient(KuduTestBase, unittest.TestCase): op['key'] = 1 op['int_val'] = 111 op['string_val'] = 'updated' + # Insert datetime without timezone specified, will be assumed + # to be UTC + op['unixtime_micros_val'] = datetime.datetime(2016, 10, 30, 10, 12) session.apply(op) op = table.new_upsert() @@ -178,8 +183,10 @@ class TestClient(KuduTestBase, unittest.TestCase): scanner = table.scanner().open() rows = dict((t[0], t) for t in scanner.read_all_tuples()) assert len(rows) == nrows - assert rows[1] == (1, 111, 'updated') - assert rows[2] == (2, 222, 'upserted') + assert rows[1] == (1, 111, 'updated', + datetime.datetime(2016, 10, 30, 10, 12) + .replace(tzinfo=utc)) + assert rows[2] == (2, 222, 'upserted', None) # Delete the rows we just wrote for i in range(nrows): http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/tests/test_scanner.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py index 3938d3c..3cfe80e 100644 --- a/python/kudu/tests/test_scanner.py +++ b/python/kudu/tests/test_scanner.py @@ -19,36 +19,17 @@ from __future__ import division from kudu.compat import unittest +from kudu.tests.util import TestScanBase from kudu.tests.common import KuduTestBase import kudu +import datetime -class TestScanner(KuduTestBase, unittest.TestCase): +class TestScanner(TestScanBase): @classmethod - def setUpClass(cls): - super(TestScanner, 
cls).setUpClass() - - cls.nrows = 100 - table = cls.client.table(cls.ex_table) - session = cls.client.new_session() - - tuples = [] - for i in range(cls.nrows): - op = table.new_insert() - tup = i, i * 2, 'hello_%d' % i if i % 2 == 0 else None - op['key'] = tup[0] - op['int_val'] = tup[1] - if i % 2 == 0: - op['string_val'] = tup[2] - elif i % 3 == 0: - op['string_val'] = None - session.apply(op) - tuples.append(tup) - session.flush() - - cls.table = table - cls.tuples = tuples + def setUpClass(self): + super(TestScanner, self).setUpClass() def setUp(self): pass @@ -161,3 +142,15 @@ class TestScanner(KuduTestBase, unittest.TestCase): tuples.extend(batch.as_tuples()) self.assertEqual(sorted(tuples), self.tuples[10:90]) + + def test_unixtime_micros(self): + """ + Test setting and getting unixtime_micros fields + """ + # Insert new rows + self.insert_new_unixtime_micros_rows() + + # Validate results + scanner = self.table.scanner() + scanner.set_fault_tolerant().open() + self.assertEqual(sorted(self.tuples), scanner.read_all_tuples()) http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/tests/test_scantoken.py ---------------------------------------------------------------------- diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py index d855569..415c949 100644 --- a/python/kudu/tests/test_scantoken.py +++ b/python/kudu/tests/test_scantoken.py @@ -17,9 +17,11 @@ # under the License. 
from kudu.compat import unittest +from kudu.tests.util import TestScanBase from kudu.tests.common import KuduTestBase import kudu from multiprocessing import Pool +import datetime def _get_scan_token_results(input): client = kudu.Client("{0}:{1}".format(input[1], input[2])) @@ -27,37 +29,12 @@ def _get_scan_token_results(input): scanner.open() return scanner.read_all_tuples() -class TestScanToken(KuduTestBase, unittest.TestCase): +class TestScanToken(TestScanBase): @classmethod def setUpClass(self): - """ - Stolen from the the test scanner given the similarity in - functionality. - """ super(TestScanToken, self).setUpClass() - self.nrows = 100 - table = self.client.table(self.ex_table) - session = self.client.new_session() - - tuples = [] - for i in range(self.nrows): - op = table.new_insert() - tup = i, i * 2, 'hello_%d' % i if i % 2 == 0 else None - op['key'] = tup[0] - op['int_val'] = tup[1] - if i % 2 == 0: - op['string_val'] = tup[2] - elif i % 3 == 0: - op['string_val'] = None - session.apply(op) - tuples.append(tup) - session.flush() - - self.table = table - self.tuples = tuples - def setUp(self): pass @@ -160,3 +137,25 @@ class TestScanToken(KuduTestBase, unittest.TestCase): tuples.extend(batch.as_tuples()) self.assertEqual(sorted(tuples), self.tuples[10:90]) + + def test_unixtime_micros(self): + """ + Test setting and getting unixtime_micros fields + """ + # Insert new rows + self.insert_new_unixtime_micros_rows() + + # Validate results + builder = self.table.scan_token_builder() + tokens = builder.set_fault_tolerant().build() + + tuples = [] + for token in tokens: + scanner = token.into_kudu_scanner() + scanner.open() + + while scanner.has_more_rows(): + batch = scanner.next_batch() + tuples.extend(batch.as_tuples()) + + self.assertEqual(sorted(self.tuples), tuples) http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/tests/util.py ---------------------------------------------------------------------- diff --git 
a/python/kudu/tests/util.py b/python/kudu/tests/util.py new file mode 100644 index 0000000..39520e4 --- /dev/null +++ b/python/kudu/tests/util.py @@ -0,0 +1,97 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from kudu.compat import unittest +from kudu.tests.common import KuduTestBase +import kudu +import datetime +import pytz + + +class TestScanBase(KuduTestBase, unittest.TestCase): + + @classmethod + def setUpClass(self): + """ + Parent class for both the Scan tests and the + Scan Token tests + """ + super(TestScanBase, self).setUpClass() + + self.nrows = 100 + table = self.client.table(self.ex_table) + session = self.client.new_session() + + tuples = [] + for i in range(self.nrows): + op = table.new_insert() + tup = i, \ + i * 2, \ + 'hello_%d' % i if i % 2 == 0 else None, \ + datetime.datetime.utcnow().replace(tzinfo=pytz.utc) + op['key'] = tup[0] + op['int_val'] = tup[1] + if i % 2 == 0: + op['string_val'] = tup[2] + elif i % 3 == 0: + op['string_val'] = None + op['unixtime_micros_val'] = tup[3] + session.apply(op) + tuples.append(tup) + session.flush() + + self.table = table + self.tuples = tuples + + def setUp(self): + pass + + def insert_new_unixtime_micros_rows(self): + # Insert new rows + # Also test a timezone 
other than UTC to confirm that + # conversion to UTC is properly applied + eastern = datetime.datetime.now()\ + .replace(tzinfo=pytz.timezone("America/New_York")) + rows = [[100, "2016-09-14T23:11:32.432019"], + [101, ("2016-09-15", "%Y-%m-%d")], + [102, eastern]] + session = self.client.new_session() + for row in rows: + op = self.table.new_insert() + list = [row[0], + row[0]*2, + 'hello_%d' % row[0] if row[0] % 2 == 0 else None, + row[1]] + for i, val in enumerate(list): + op[i] = val + session.apply(op) + # convert datetime if needed to validate rows + if not isinstance(list[3], datetime.datetime): + if type(list[3]) is tuple: + list[3] = datetime.datetime \ + .strptime(list[3][0], list[3][1]) + else: + list[3] = datetime.datetime \ + .strptime(list[3], "%Y-%m-%dT%H:%M:%S.%f") + else: + # Convert Eastern Time datetime to UTC for confirmation + list[3] -= list[3].utcoffset() + # Apply timezone + list[3] = list[3].replace(tzinfo=pytz.utc) + self.tuples.append(tuple(list)) + session.flush() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/util.py ---------------------------------------------------------------------- diff --git a/python/kudu/util.py b/python/kudu/util.py index a2b65cf..603e0e0 100644 --- a/python/kudu/util.py +++ b/python/kudu/util.py @@ -15,7 +15,80 @@ # specific language governing permissions and limitations # under the License. +import datetime +import six +from pytz import utc + + +def _epoch(): + """ + Return the unix epoch in datetime.datetime form for the + timezone provided. 
+ + Returns + ------- + epoch : datetime.datetime + """ + return datetime.datetime.fromtimestamp(0, utc) + def indent(text, spaces): block = ' ' * spaces return '\n'.join(block + x for x in text.split('\n')) + + +def to_unixtime_micros(timestamp, format = "%Y-%m-%dT%H:%M:%S.%f"): + """ + Convert incoming datetime value to a integer representing + the number of microseconds since the unix epoch + + Parameters + --------- + timestamp : datetime.datetime or string + If a string is provided, a format must be provided as well. + Timezones provided in the string are not supported at this + time. UTC unless provided in a datetime object. + format : Required if a string timestamp is provided + Uses the C strftime() function, see strftime(3) documentation. + + Returns + ------- + int : Microseconds since unix epoch + """ + # Validate input + if isinstance(timestamp, datetime.datetime): + pass + elif isinstance(timestamp, six.string_types): + timestamp = datetime.datetime.strptime(timestamp, format) + else: + raise ValueError("Invalid timestamp type. " + + "You must provide a datetime.datetime or a string.") + + # If datetime has a valid timezone assigned, convert it to UTC. + if timestamp.tzinfo and timestamp.utcoffset(): + timestamp = timestamp.astimezone(utc) + # If datetime has no timezone, it is assumed to be UTC + else: + timestamp = timestamp.replace(tzinfo=utc) + + # Return the unixtime_micros for the provided datetime and locale + return int((timestamp - _epoch()).total_seconds() * 1000000) + +def from_unixtime_micros(unixtime_micros): + """ + Convert the input unixtime_micros value to a datetime in UTC. + + Parameters + ---------- + unixtime_micros : int + Number of microseconds since the unix epoch. + + Returns + ------- + timestamp : datetime.datetime in UTC + """ + if isinstance(unixtime_micros, int): + return _epoch() + datetime.timedelta(microseconds=unixtime_micros) + else: + raise ValueError("Invalid unixtime_micros value." 
+ + "You must provide an integer value.") http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/requirements.txt ---------------------------------------------------------------------- diff --git a/python/requirements.txt b/python/requirements.txt index 7d36257..72d6c68 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -3,3 +3,4 @@ cython >= 0.21 setuptools >= 0.8 six unittest2 +pytz http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/setup.py ---------------------------------------------------------------------- diff --git a/python/setup.py b/python/setup.py index b3dcda4..91147a8 100644 --- a/python/setup.py +++ b/python/setup.py @@ -155,7 +155,7 @@ setup( }, setup_requires=['pytest-runner'], tests_require=['pytest', 'multiprocessing'], - install_requires=['cython >= 0.21'], + install_requires=['cython >= 0.21', 'pytz', 'six'], description=DESCRIPTION, long_description=LONG_DESCRIPTION, license='Apache License, Version 2.0',