This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c7b09f  ARROW-1643: [Python] Accept hdfs:// prefixes in parquet.read_table and attempt to connect to HDFS
7c7b09f is described below

commit 7c7b09f6e52d8c582ef00d201ec4e783484d3180
Author: Ehsan Totoni <ehsan.tot...@intel.com>
AuthorDate: Tue Mar 13 09:53:33 2018 -0400

    ARROW-1643: [Python] Accept hdfs:// prefixes in parquet.read_table and attempt to connect to HDFS
    
    This patch enables pq.read_table, pq.write_table, and pq.ParquetFile to accept HDFS URIs and connect to HDFS.
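
    A minimal usage sketch (the host, port, and file paths below are hypothetical,
    and a reachable HDFS cluster with the libhdfs driver configured is assumed):

        import pyarrow as pa
        import pyarrow.parquet as pq

        # Build a tiny table to round-trip through HDFS.
        table = pa.Table.from_arrays([pa.array([1, 2, 3])], names=['x'])

        # With this patch, passing an hdfs:// URI makes pyarrow connect to HDFS
        # instead of defaulting to the local filesystem.
        pq.write_table(table, 'hdfs://namenode:8020/tmp/example.parquet')
        result = pq.read_table('hdfs://namenode:8020/tmp/example.parquet')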
    
    Author: Ehsan Totoni <ehsan.tot...@intel.com>
    Author: Wes McKinney <wes.mckin...@twosigma.com>
    
    Closes #1668 from ehsantn/ARROW-1643 and squashes the following commits:
    
    91c3c7f6 <Wes McKinney> Close automatically-opened file handle in ParquetWriter.close rather than relying on gc cycle to run
    e93256e9 <Ehsan Totoni> extra blank line for flake8 fix
    9a859b76 <Ehsan Totoni> add test for hdfs uri read of multiple files
    72fd145e <Ehsan Totoni> remove schema check
    60272283 <Ehsan Totoni> handle pathlib.Path for Parquet file names
    46a4e23c <Ehsan Totoni> use IOError instead of FileNotFoundError for Python 2.7 compatibility
    01d301cc <Wes McKinney> Fix unit test
    9090e93c <Wes McKinney> Factor out string-to-file coercion
    f12711fc <Ehsan Totoni> ARROW-1643  Accept hdfs:// prefixes in parquet.read_table and attempt to connect to HDFS
---
 python/pyarrow/parquet.py         | 87 +++++++++++++++++++++++++++++++++------
 python/pyarrow/tests/test_hdfs.py | 71 +++++++++++++++++++++++++++-----
 2 files changed, 135 insertions(+), 23 deletions(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 42c558b..fd9c740 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -21,6 +21,13 @@ import inspect
 import json
 import re
 import six
+from six.moves.urllib.parse import urlparse
+# pathlib might not be available in Python 2
+try:
+    import pathlib
+    _has_pathlib = True
+except ImportError:
+    _has_pathlib = False
 
 import numpy as np
 
@@ -53,6 +60,7 @@ class ParquetFile(object):
     """
     def __init__(self, source, metadata=None, common_metadata=None):
         self.reader = ParquetReader()
+        source = _ensure_file(source)
         self.reader.open(source, metadata=metadata)
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -279,8 +287,20 @@ schema : arrow Schema
             self.schema_changed = False
 
         self.schema = schema
+        self.where = where
+
+        # If we open a file via an implied filesystem, keep the handle so it
+        # can be closed explicitly rather than relying on garbage collection
+        self.file_handle = None
+
+        if is_path(where):
+            fs = _get_fs_from_path(where)
+            sink = self.file_handle = fs.open(where, 'wb')
+        else:
+            sink = where
+
         self.writer = _parquet.ParquetWriter(
-            where, schema,
+            sink, schema,
             version=version,
             compression=compression,
             use_dictionary=use_dictionary,
@@ -310,6 +330,8 @@ schema : arrow Schema
         if self.is_open:
             self.writer.close()
             self.is_open = False
+        if self.file_handle is not None:
+            self.file_handle.close()
 
 
 def _get_pandas_index_columns(keyvalues):
@@ -559,8 +581,9 @@ class ParquetPartitions(object):
         return self.levels[level].get_index(key)
 
 
-def is_string(x):
-    return isinstance(x, six.string_types)
+def is_path(x):
+    return (isinstance(x, six.string_types)
+            or (_has_pathlib and isinstance(x, pathlib.Path)))
 
 
 class ParquetManifest(object):
@@ -569,7 +592,7 @@ class ParquetManifest(object):
     """
     def __init__(self, dirpath, filesystem=None, pathsep='/',
                  partition_scheme='hive'):
-        self.filesystem = filesystem or LocalFileSystem.get_instance()
+        self.filesystem = filesystem or _get_fs_from_path(dirpath)
         self.pathsep = pathsep
         self.dirpath = dirpath
         self.partition_scheme = partition_scheme
@@ -692,7 +715,10 @@ class ParquetDataset(object):
     def __init__(self, path_or_paths, filesystem=None, schema=None,
                  metadata=None, split_row_groups=False, validate_schema=True):
         if filesystem is None:
-            self.fs = LocalFileSystem.get_instance()
+            a_path = path_or_paths
+            if isinstance(a_path, list):
+                a_path = a_path[0]
+            self.fs = _get_fs_from_path(a_path)
         else:
             self.fs = _ensure_filesystem(filesystem)
 
@@ -851,7 +877,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
         # Dask passes a directory as a list of length 1
         path_or_paths = path_or_paths[0]
 
-    if is_string(path_or_paths) and fs.isdir(path_or_paths):
+    if is_path(path_or_paths) and fs.isdir(path_or_paths):
         manifest = ParquetManifest(path_or_paths, filesystem=fs,
                                    pathsep=fs.pathsep)
         common_metadata_path = manifest.common_metadata_path
@@ -904,11 +930,11 @@ Returns
 
 def read_table(source, columns=None, nthreads=1, metadata=None,
                use_pandas_metadata=False):
-    if is_string(source):
-        fs = LocalFileSystem.get_instance()
+    if is_path(source):
+        fs = _get_fs_from_path(source)
+
         if fs.isdir(source):
-            return fs.read_parquet(source, columns=columns,
-                                   metadata=metadata)
+            return fs.read_parquet(source, columns=columns, metadata=metadata)
 
     pf = ParquetFile(source, metadata=metadata)
     return pf.read(columns=columns, nthreads=nthreads,
@@ -957,7 +983,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
                 **kwargs) as writer:
             writer.write_table(table, row_group_size=row_group_size)
     except Exception:
-        if isinstance(where, six.string_types):
+        if is_path(where):
             try:
                 os.remove(where)
             except os.error:
@@ -1026,7 +1052,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
     )
 
     if filesystem is None:
-        fs = LocalFileSystem.get_instance()
+        fs = _get_fs_from_path(root_path)
     else:
         fs = _ensure_filesystem(filesystem)
 
@@ -1113,3 +1139,40 @@ def read_schema(where):
     schema : pyarrow.Schema
     """
     return ParquetFile(where).schema.to_arrow_schema()
+
+
+def _ensure_file(source):
+    if is_path(source):
+        fs = _get_fs_from_path(source)
+        try:
+            return fs.open(source)
+        except IOError as e:
+            raise lib.ArrowIOError("failed to open file {}, {}"
+                                   .format(source, e))
+    elif not hasattr(source, 'seek'):
+        raise ValueError('Source does not appear file-like')
+    else:
+        return source
+
+
+def _get_fs_from_path(path):
+    """
+    Return a filesystem from a path, which may be an HDFS URI.
+    """
+    # input can be hdfs URI such as hdfs://host:port/myfile.parquet
+    if _has_pathlib and isinstance(path, pathlib.Path):
+        path = str(path)
+    parsed_uri = urlparse(path)
+    if parsed_uri.scheme == 'hdfs':
+        netloc_split = parsed_uri.netloc.split(':')
+        host = netloc_split[0]
+        if host == '':
+            host = 'default'
+        port = 0
+        if len(netloc_split) == 2 and netloc_split[1].isnumeric():
+            port = int(netloc_split[1])
+        fs = pa.hdfs.connect(host=host, port=port)
+    else:
+        fs = LocalFileSystem.get_instance()
+
+    return fs
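
As an illustration of the URI handling above, a small sketch of how an hdfs://
URI is split into connection parameters (the URI is hypothetical, and
pa.hdfs.connect requires a reachable cluster):

    from six.moves.urllib.parse import urlparse

    uri = 'hdfs://namenode:8020/data/example.parquet'  # hypothetical URI
    parsed = urlparse(uri)                    # parsed.netloc == 'namenode:8020'
    host, _, port = parsed.netloc.partition(':')
    host = host or 'default'                  # empty host falls back to 'default'
    port = int(port) if port.isdigit() else 0
    # _get_fs_from_path would then call pa.hdfs.connect(host=host, port=port);
    # any other scheme falls back to LocalFileSystem.get_instance()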
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index 885272b..4840aee 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -272,19 +272,11 @@ class HdfsTestCases(object):
 
         assert result == data
 
-    @test_parquet.parquet
-    def test_read_multiple_parquet_files(self):
+    def _write_multiple_hdfs_pq_files(self, tmpdir):
         import pyarrow.parquet as pq
-
         nfiles = 10
         size = 5
-
-        tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid())
-
-        self.hdfs.mkdir(tmpdir)
-
         test_data = []
-        paths = []
         for i in range(nfiles):
             df = test_parquet._test_dataframe(size, seed=i)
 
@@ -300,16 +292,61 @@ class HdfsTestCases(object):
                 pq.write_table(table, f)
 
             test_data.append(table)
-            paths.append(path)
 
-        result = self.hdfs.read_parquet(tmpdir)
         expected = pa.concat_tables(test_data)
+        return expected
+
+    @test_parquet.parquet
+    def test_read_multiple_parquet_files(self):
+
+        tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid())
+
+        self.hdfs.mkdir(tmpdir)
+
+        expected = self._write_multiple_hdfs_pq_files(tmpdir)
+        result = self.hdfs.read_parquet(tmpdir)
 
         pdt.assert_frame_equal(result.to_pandas()
                                .sort_values(by='index').reset_index(drop=True),
                                expected.to_pandas())
 
     @test_parquet.parquet
+    def test_read_multiple_parquet_files_with_uri(self):
+        import pyarrow.parquet as pq
+
+        tmpdir = pjoin(self.tmp_path, 'multi-parquet-uri-' + guid())
+
+        self.hdfs.mkdir(tmpdir)
+
+        expected = self._write_multiple_hdfs_pq_files(tmpdir)
+        path = _get_hdfs_uri(tmpdir)
+        result = pq.read_table(path)
+
+        pdt.assert_frame_equal(result.to_pandas()
+                               .sort_values(by='index').reset_index(drop=True),
+                               expected.to_pandas())
+
+    @test_parquet.parquet
+    def test_read_write_parquet_files_with_uri(self):
+        import pyarrow.parquet as pq
+
+        tmpdir = pjoin(self.tmp_path, 'uri-parquet-' + guid())
+        self.hdfs.mkdir(tmpdir)
+        path = _get_hdfs_uri(pjoin(tmpdir, 'test.parquet'))
+
+        size = 5
+        df = test_parquet._test_dataframe(size, seed=0)
+        # Hack so that we don't have a dtype cast in v1 files
+        df['uint32'] = df['uint32'].astype(np.int64)
+        table = pa.Table.from_pandas(df, preserve_index=False)
+
+        pq.write_table(table, path)
+
+        result = pq.read_table(path).to_pandas()
+
+        pdt.assert_frame_equal(result, df)
+
+    @test_parquet.parquet
     def test_read_common_metadata_files(self):
         tmpdir = pjoin(self.tmp_path, 'common-metadata-' + guid())
         self.hdfs.mkdir(tmpdir)
@@ -357,3 +394,15 @@ class TestLibHdfs3(HdfsTestCases, unittest.TestCase):
     def check_driver(cls):
         if not pa.have_libhdfs3():
             pytest.skip('No libhdfs3 available on system')
+
+
+def _get_hdfs_uri(path):
+    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
+    try:
+        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0))
+    except ValueError:
+        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
+                         'an integer')
+    uri = "hdfs://{}:{}{}".format(host, port, path)
+
+    return uri
