KUDU-1614 - [python] Enable Set/Get of unixtime_micros

Currently, the python client in Kudu does not support setting or
getting columns with the unixtime_micros type. This patch enables
this capability and includes a unit test. This patch also fixes
a minor bug with write operations using column indexes (KUDU-1615).
This fix is reflected in the unit test associated with this patch.

Change-Id: Id428cbd072b7de7a75e58b66e4de89acd381fdca
Reviewed-on: http://gerrit.cloudera.org:8080/4417
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <t...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/1c668742
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/1c668742
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/1c668742

Branch: refs/heads/master
Commit: 1c668742679bb00b5c851c80071c3232aa68441a
Parents: bad9101
Author: Jordan Birdsell <jordantbirds...@gmail.com>
Authored: Wed Sep 14 20:05:18 2016 -0400
Committer: Todd Lipcon <t...@apache.org>
Committed: Thu Sep 22 07:33:34 2016 +0000

----------------------------------------------------------------------
 python/kudu/client.pyx              | 58 +++++++++++++++++--
 python/kudu/libkudu_client.pxd      | 17 +++---
 python/kudu/tests/common.py         |  1 +
 python/kudu/tests/test_client.py    | 13 ++++-
 python/kudu/tests/test_scanner.py   | 41 ++++++--------
 python/kudu/tests/test_scantoken.py | 51 ++++++++---------
 python/kudu/tests/util.py           | 97 ++++++++++++++++++++++++++++++++
 python/kudu/util.py                 | 73 ++++++++++++++++++++++++
 python/requirements.txt             |  1 +
 python/setup.py                     |  2 +-
 10 files changed, 287 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 5394410..800a620 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -25,15 +25,29 @@ cimport cpython
 from cython.operator cimport dereference as deref
 
 from libkudu_client cimport *
-
 from kudu.compat import tobytes, frombytes
 from kudu.schema cimport Schema, ColumnSchema
 from kudu.errors cimport check_status
+from kudu.util import to_unixtime_micros, from_unixtime_micros
 from errors import KuduException
 
 import six
 
 
+cdef dict _type_names = {
+    KUDU_INT8 : "KUDU_INT8",
+    KUDU_INT16 : "KUDU_INT16",
+    KUDU_INT32 : "KUDU_INT32",
+    KUDU_INT64 : "KUDU_INT64",
+    KUDU_STRING : "KUDU_STRING",
+    KUDU_BOOL : "KUDU_BOOL",
+    KUDU_FLOAT : "KUDU_FLOAT",
+    KUDU_DOUBLE : "KUDU_DOUBLE",
+    KUDU_BINARY : "KUDU_BINARY",
+    KUDU_UNIXTIME_MICROS : "KUDU_UNIXTIME_MICROS"
+}
+
+
 cdef class TimeDelta:
     """
     Wrapper interface for kudu MonoDelta class, which is used to specify
@@ -513,6 +527,14 @@ cdef class StringVal(RawValue):
     def __dealloc__(self):
         del self.val
 
+cdef class UnixtimeMicrosVal(RawValue):
+    cdef:
+        int64_t val
+
+    def __cinit__(self, obj):
+        self.val = to_unixtime_micros(obj)
+        self.data = &self.val
+
 #----------------------------------------------------------------------
 cdef class TabletServer:
     """
@@ -1030,6 +1052,11 @@ cdef class Row:
         return cpython.PyBytes_FromStringAndSize(<char*> val.mutable_data(),
                                                  val.size())
 
+    cdef inline get_unixtime_micros(self, int i):
+        cdef int64_t val
+        check_status(self.row.GetUnixTimeMicros(i, &val))
+        return val
+
     cdef inline get_slot(self, int i):
         cdef:
             Status s
@@ -1051,8 +1078,11 @@ cdef class Row:
             return self.get_float(i)
         elif t == KUDU_STRING:
             return frombytes(self.get_string(i))
+        elif t == KUDU_UNIXTIME_MICROS:
+            return from_unixtime_micros(self.get_unixtime_micros(i))
         else:
-            raise TypeError(t)
+            raise TypeError("Cannot get kudu type <{0}>"
+                                .format(_type_names[t]))
 
     cdef inline bint is_null(self, int i):
         return self.row.IsNull(i)
@@ -1712,6 +1742,11 @@ cdef class PartialRow:
     cpdef set_field(self, key, value):
         cdef:
             int i = self.table.schema.get_loc(key)
+
+        self.set_loc(i, value)
+
+    cpdef set_loc(self, int i, value):
+        cdef:
             DataType t = self.table.schema.loc_type(i)
             cdef Slice* slc
 
@@ -1746,9 +1781,18 @@ cdef class PartialRow:
             # Not safe to take a reference to PyBytes data for now
             self.row.SetStringCopy(i, deref(slc))
             del slc
-
-    cpdef set_loc(self, int i, value):
-        pass
+        elif t == KUDU_UNIXTIME_MICROS:
+            # String with custom format
+            #  eg: ("2016-01-01", "%Y-%m-%d")
+            if type(value) is tuple:
+                self.row.SetUnixTimeMicros(i, <int64_t>
+                    to_unixtime_micros(value[0], value[1]))
+                # datetime.datetime input or string with default format
+            else:
+                self.row.SetUnixTimeMicros(i, <int64_t>
+                    to_unixtime_micros(value))
+        else:
+            raise TypeError("Cannot set kudu type <{0}>.".format(_type_names[t]))
 
     cpdef set_field_null(self, key):
         pass
@@ -1839,5 +1883,7 @@ cdef inline cast_pyvalue(DataType t, object o):
         return FloatVal(o)
     elif t == KUDU_STRING:
         return StringVal(o)
+    elif t == KUDU_UNIXTIME_MICROS:
+        return UnixtimeMicrosVal(o)
     else:
-        raise TypeError(t)
+        raise TypeError("Cannot cast kudu type <{0}>".format(_type_names[t]))

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index 546b2be..11bc78d 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -206,21 +206,24 @@ cdef extern from "kudu/client/scan_batch.h" namespace "kudu::client" nogil:
         # the value is unset, or the value is NULL. Otherwise they return
         # the current set value in *val.
         Status GetBool(Slice& col_name, c_bool* val)
+        Status GetBool(int col_idx, c_bool* val)
 
         Status GetInt8(Slice& col_name, int8_t* val)
+        Status GetInt8(int col_idx, int8_t* val)
+
         Status GetInt16(Slice& col_name, int16_t* val)
+        Status GetInt16(int col_idx, int16_t* val)
+
         Status GetInt32(Slice& col_name, int32_t* val)
+        Status GetInt32(int col_idx, int32_t* val)
+
         Status GetInt64(Slice& col_name, int64_t* val)
+        Status GetInt64(int col_idx, int64_t* val)
 
         Status GetUnixTimeMicros(const Slice& col_name,
                             int64_t* micros_since_utc_epoch)
-
-        Status GetBool(int col_idx, c_bool* val)
-
-        Status GetInt8(int col_idx, int8_t* val)
-        Status GetInt16(int col_idx, int16_t* val)
-        Status GetInt32(int col_idx, int32_t* val)
-        Status GetInt64(int col_idx, int64_t* val)
+        Status GetUnixTimeMicros(int col_idx,
+                            int64_t* micros_since_utc_epoch)
 
         Status GetString(Slice& col_name, Slice* val)
         Status GetString(int col_idx, Slice* val)

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/tests/common.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/common.py b/python/kudu/tests/common.py
index 75a6ca6..f39c074 100644
--- a/python/kudu/tests/common.py
+++ b/python/kudu/tests/common.py
@@ -164,6 +164,7 @@ class KuduTestBase(object):
         builder.add_column('key', kudu.int32, nullable=False)
         builder.add_column('int_val', kudu.int32)
         builder.add_column('string_val', kudu.string)
+        builder.add_column('unixtime_micros_val', kudu.unixtime_micros)
         builder.set_primary_keys(['key'])
 
         return builder.build()

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/tests/test_client.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index 9dcb0d7..e900fd7 100644
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -20,6 +20,8 @@ from kudu.compat import unittest, long
 from kudu.tests.common import KuduTestBase
 from kudu.client import Partitioning
 import kudu
+import datetime
+from pytz import utc
 
 
 class TestClient(KuduTestBase, unittest.TestCase):
@@ -37,7 +39,7 @@ class TestClient(KuduTestBase, unittest.TestCase):
         table = self.client.table(self.ex_table)
         cols = [(table['key'], 'key', 'int32'),
                 (table[1], 'int_val', 'int32'),
-                (table[-1], 'string_val', 'string')]
+                (table[-1], 'unixtime_micros_val', 'unixtime_micros')]
 
         for col, name, type in cols:
             assert col.name == bytes(name)
@@ -166,6 +168,9 @@ class TestClient(KuduTestBase, unittest.TestCase):
         op['key'] = 1
         op['int_val'] = 111
         op['string_val'] = 'updated'
+        # Insert datetime without timezone specified, will be assumed
+        # to be UTC
+        op['unixtime_micros_val'] = datetime.datetime(2016, 10, 30, 10, 12)
         session.apply(op)
 
         op = table.new_upsert()
@@ -178,8 +183,10 @@ class TestClient(KuduTestBase, unittest.TestCase):
         scanner = table.scanner().open()
         rows = dict((t[0], t) for t in scanner.read_all_tuples())
         assert len(rows) == nrows
-        assert rows[1] == (1, 111, 'updated')
-        assert rows[2] == (2, 222, 'upserted')
+        assert rows[1] == (1, 111, 'updated',
+                           datetime.datetime(2016, 10, 30, 10, 12)
+                           .replace(tzinfo=utc))
+        assert rows[2] == (2, 222, 'upserted', None)
 
         # Delete the rows we just wrote
         for i in range(nrows):

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/tests/test_scanner.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py
index 3938d3c..3cfe80e 100644
--- a/python/kudu/tests/test_scanner.py
+++ b/python/kudu/tests/test_scanner.py
@@ -19,36 +19,17 @@
 from __future__ import division
 
 from kudu.compat import unittest
+from kudu.tests.util import TestScanBase
 from kudu.tests.common import KuduTestBase
 import kudu
+import datetime
 
 
-class TestScanner(KuduTestBase, unittest.TestCase):
+class TestScanner(TestScanBase):
 
     @classmethod
-    def setUpClass(cls):
-        super(TestScanner, cls).setUpClass()
-
-        cls.nrows = 100
-        table = cls.client.table(cls.ex_table)
-        session = cls.client.new_session()
-
-        tuples = []
-        for i in range(cls.nrows):
-            op = table.new_insert()
-            tup = i, i * 2, 'hello_%d' % i if i % 2 == 0 else None
-            op['key'] = tup[0]
-            op['int_val'] = tup[1]
-            if i % 2 == 0:
-                op['string_val'] = tup[2]
-            elif i % 3 == 0:
-                op['string_val'] = None
-            session.apply(op)
-            tuples.append(tup)
-        session.flush()
-
-        cls.table = table
-        cls.tuples = tuples
+    def setUpClass(self):
+        super(TestScanner, self).setUpClass()
 
     def setUp(self):
         pass
@@ -161,3 +142,15 @@ class TestScanner(KuduTestBase, unittest.TestCase):
             tuples.extend(batch.as_tuples())
 
         self.assertEqual(sorted(tuples), self.tuples[10:90])
+
+    def test_unixtime_micros(self):
+        """
+        Test setting and getting unixtime_micros fields
+        """
+        # Insert new rows
+        self.insert_new_unixtime_micros_rows()
+
+        # Validate results
+        scanner = self.table.scanner()
+        scanner.set_fault_tolerant().open()
+        self.assertEqual(sorted(self.tuples), scanner.read_all_tuples())

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py
index d855569..415c949 100644
--- a/python/kudu/tests/test_scantoken.py
+++ b/python/kudu/tests/test_scantoken.py
@@ -17,9 +17,11 @@
 # under the License.
 
 from kudu.compat import unittest
+from kudu.tests.util import TestScanBase
 from kudu.tests.common import KuduTestBase
 import kudu
 from multiprocessing import Pool
+import datetime
 
 def _get_scan_token_results(input):
     client = kudu.Client("{0}:{1}".format(input[1], input[2]))
@@ -27,37 +29,12 @@ def _get_scan_token_results(input):
     scanner.open()
     return scanner.read_all_tuples()
 
-class TestScanToken(KuduTestBase, unittest.TestCase):
+class TestScanToken(TestScanBase):
 
     @classmethod
     def setUpClass(self):
-        """
-        Stolen from the the test scanner given the similarity in
-        functionality.
-        """
         super(TestScanToken, self).setUpClass()
 
-        self.nrows = 100
-        table = self.client.table(self.ex_table)
-        session = self.client.new_session()
-
-        tuples = []
-        for i in range(self.nrows):
-            op = table.new_insert()
-            tup = i, i * 2, 'hello_%d' % i if i % 2 == 0 else None
-            op['key'] = tup[0]
-            op['int_val'] = tup[1]
-            if i % 2 == 0:
-                op['string_val'] = tup[2]
-            elif i % 3 == 0:
-                op['string_val'] = None
-            session.apply(op)
-            tuples.append(tup)
-        session.flush()
-
-        self.table = table
-        self.tuples = tuples
-
     def setUp(self):
         pass
 
@@ -160,3 +137,25 @@ class TestScanToken(KuduTestBase, unittest.TestCase):
                 tuples.extend(batch.as_tuples())
 
         self.assertEqual(sorted(tuples), self.tuples[10:90])
+
+    def test_unixtime_micros(self):
+        """
+        Test setting and getting unixtime_micros fields
+        """
+        # Insert new rows
+        self.insert_new_unixtime_micros_rows()
+
+        # Validate results
+        builder = self.table.scan_token_builder()
+        tokens = builder.set_fault_tolerant().build()
+
+        tuples = []
+        for token in tokens:
+            scanner = token.into_kudu_scanner()
+            scanner.open()
+
+            while scanner.has_more_rows():
+                batch = scanner.next_batch()
+                tuples.extend(batch.as_tuples())
+
+        self.assertEqual(sorted(self.tuples), tuples)

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/tests/util.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/util.py b/python/kudu/tests/util.py
new file mode 100644
index 0000000..39520e4
--- /dev/null
+++ b/python/kudu/tests/util.py
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from kudu.compat import unittest
+from kudu.tests.common import KuduTestBase
+import kudu
+import datetime
+import pytz
+
+
+class TestScanBase(KuduTestBase, unittest.TestCase):
+
+    @classmethod
+    def setUpClass(self):
+        """
+        Parent class for both the Scan tests and the
+        Scan Token tests
+        """
+        super(TestScanBase, self).setUpClass()
+
+        self.nrows = 100
+        table = self.client.table(self.ex_table)
+        session = self.client.new_session()
+
+        tuples = []
+        for i in range(self.nrows):
+            op = table.new_insert()
+            tup = i, \
+                  i * 2, \
+                  'hello_%d' % i if i % 2 == 0 else None, \
+                  datetime.datetime.utcnow().replace(tzinfo=pytz.utc)
+            op['key'] = tup[0]
+            op['int_val'] = tup[1]
+            if i % 2 == 0:
+                op['string_val'] = tup[2]
+            elif i % 3 == 0:
+                op['string_val'] = None
+            op['unixtime_micros_val'] = tup[3]
+            session.apply(op)
+            tuples.append(tup)
+        session.flush()
+
+        self.table = table
+        self.tuples = tuples
+
+    def setUp(self):
+        pass
+
+    def insert_new_unixtime_micros_rows(self):
+        # Insert new rows
+        # Also test a timezone other than UTC to confirm that
+        # conversion to UTC is properly applied
+        eastern = datetime.datetime.now()\
+            .replace(tzinfo=pytz.timezone("America/New_York"))
+        rows = [[100, "2016-09-14T23:11:32.432019"],
+                [101, ("2016-09-15", "%Y-%m-%d")],
+                [102, eastern]]
+        session = self.client.new_session()
+        for row in rows:
+            op = self.table.new_insert()
+            list = [row[0],
+                    row[0]*2,
+                    'hello_%d' % row[0] if row[0] % 2 == 0 else None,
+                    row[1]]
+            for i, val in enumerate(list):
+                op[i] = val
+            session.apply(op)
+            # convert datetime if needed to validate rows
+            if not isinstance(list[3], datetime.datetime):
+                if type(list[3]) is tuple:
+                    list[3] = datetime.datetime \
+                        .strptime(list[3][0], list[3][1])
+                else:
+                    list[3] = datetime.datetime \
+                        .strptime(list[3], "%Y-%m-%dT%H:%M:%S.%f")
+            else:
+                # Convert Eastern Time datetime to UTC for confirmation
+                list[3] -= list[3].utcoffset()
+            # Apply timezone
+            list[3] = list[3].replace(tzinfo=pytz.utc)
+            self.tuples.append(tuple(list))
+        session.flush()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/kudu/util.py
----------------------------------------------------------------------
diff --git a/python/kudu/util.py b/python/kudu/util.py
index a2b65cf..603e0e0 100644
--- a/python/kudu/util.py
+++ b/python/kudu/util.py
@@ -15,7 +15,80 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import datetime
+import six
+from pytz import utc
+
+
+def _epoch():
+    """
+    Return the unix epoch in datetime.datetime form for the
+    timezone provided.
+
+    Returns
+    -------
+    epoch : datetime.datetime
+    """
+    return datetime.datetime.fromtimestamp(0, utc)
+
 
 def indent(text, spaces):
     block = ' ' * spaces
     return '\n'.join(block + x for x in text.split('\n'))
+
+
+def to_unixtime_micros(timestamp, format = "%Y-%m-%dT%H:%M:%S.%f"):
+    """
+    Convert incoming datetime value to a integer representing
+    the number of microseconds since the unix epoch
+
+    Parameters
+    ---------
+    timestamp : datetime.datetime or string
+      If a string is provided, a format must be provided as well.
+      Timezones provided in the string are not supported at this
+      time. UTC unless provided in a datetime object.
+    format : Required if a string timestamp is provided
+      Uses the C strftime() function, see strftime(3) documentation.
+
+    Returns
+    -------
+    int : Microseconds since unix epoch
+    """
+    # Validate input
+    if isinstance(timestamp, datetime.datetime):
+        pass
+    elif isinstance(timestamp, six.string_types):
+        timestamp = datetime.datetime.strptime(timestamp, format)
+    else:
+        raise ValueError("Invalid timestamp type. " +
+                         "You must provide a datetime.datetime or a string.")
+
+    # If datetime has a valid timezone assigned, convert it to UTC.
+    if timestamp.tzinfo and timestamp.utcoffset():
+        timestamp = timestamp.astimezone(utc)
+    # If datetime has no timezone, it is assumed to be UTC
+    else:
+        timestamp = timestamp.replace(tzinfo=utc)
+
+    # Return the unixtime_micros for the provided datetime and locale
+    return int((timestamp - _epoch()).total_seconds() * 1000000)
+
+def from_unixtime_micros(unixtime_micros):
+    """
+    Convert the input unixtime_micros value to a datetime in UTC.
+
+    Parameters
+    ----------
+    unixtime_micros : int
+      Number of microseconds since the unix epoch.
+
+    Returns
+    -------
+    timestamp : datetime.datetime in UTC
+    """
+    if isinstance(unixtime_micros, int):
+        return _epoch() + datetime.timedelta(microseconds=unixtime_micros)
+    else:
+        raise ValueError("Invalid unixtime_micros value." +
+                         "You must provide an integer value.")

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/requirements.txt
----------------------------------------------------------------------
diff --git a/python/requirements.txt b/python/requirements.txt
index 7d36257..72d6c68 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -3,3 +3,4 @@ cython >= 0.21
 setuptools >= 0.8
 six
 unittest2
+pytz

http://git-wip-us.apache.org/repos/asf/kudu/blob/1c668742/python/setup.py
----------------------------------------------------------------------
diff --git a/python/setup.py b/python/setup.py
index b3dcda4..91147a8 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -155,7 +155,7 @@ setup(
     },
     setup_requires=['pytest-runner'],
     tests_require=['pytest', 'multiprocessing'],
-    install_requires=['cython >= 0.21'],
+    install_requires=['cython >= 0.21', 'pytz', 'six'],
     description=DESCRIPTION,
     long_description=LONG_DESCRIPTION,
     license='Apache License, Version 2.0',

Reply via email to