This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git

The following commit(s) were added to refs/heads/master by this push:
     new 7c7b09f  ARROW-1643: [Python] Accept hdfs:// prefixes in parquet.read_table and attempt to connect to HDFS
7c7b09f is described below

commit 7c7b09f6e52d8c582ef00d201ec4e783484d3180
Author: Ehsan Totoni <ehsan.tot...@intel.com>
AuthorDate: Tue Mar 13 09:53:33 2018 -0400

    ARROW-1643: [Python] Accept hdfs:// prefixes in parquet.read_table and attempt to connect to HDFS

    This patch enables pq.read_table, pq.write_table, and pq.ParquetFile to
    accept HDFS URIs and connect to HDFS.

    Author: Ehsan Totoni <ehsan.tot...@intel.com>
    Author: Wes McKinney <wes.mckin...@twosigma.com>

    Closes #1668 from ehsantn/ARROW-1643 and squashes the following commits:

    91c3c7f6 <Wes McKinney> Close automatically-opened file handle in ParquetWriter.close rather than relying on gc cycle to run
    e93256e9 <Ehsan Totoni> extra blank line for flake8 fix
    9a859b76 <Ehsan Totoni> add test for hdfs uri read of multiple files
    72fd145e <Ehsan Totoni> remove schema check
    60272283 <Ehsan Totoni> handle pathlib.Path for Parquet file names
    46a4e23c <Ehsan Totoni> use IOError instead of FileNotFoundError for Python 2.7 compatibility
    01d301cc <Wes McKinney> Fix unit test
    9090e93c <Wes McKinney> Factor out string-to-file coercion
    f12711fc <Ehsan Totoni> ARROW-1643 Accept hdfs:// prefixes in parquet.read_table and attempt to connect to HDFS
---
 python/pyarrow/parquet.py         | 87 +++++++++++++++++++++++++++++++++------
 python/pyarrow/tests/test_hdfs.py | 71 +++++++++++++++++++++++++++-----
 2 files changed, 135 insertions(+), 23 deletions(-)

diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 42c558b..fd9c740 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -21,6 +21,13 @@ import inspect
 import json
 import re
 import six
+from six.moves.urllib.parse import urlparse
+# pathlib might not be available in Python 2
+try:
+    import pathlib
+    _has_pathlib = True
+except ImportError:
+    _has_pathlib = False
 
 import numpy as np
 
@@ -53,6 +60,7 @@ class ParquetFile(object):
     """
     def __init__(self, source, metadata=None, common_metadata=None):
         self.reader = ParquetReader()
+        source = _ensure_file(source)
         self.reader.open(source, metadata=metadata)
         self.common_metadata = common_metadata
         self._nested_paths_by_prefix = self._build_nested_paths()
@@ -279,8 +287,20 @@ schema : arrow Schema
         self.schema_changed = False
 
         self.schema = schema
+        self.where = where
+
+        # If we open a file using an implied filesystem, so it can be assured
+        # to be closed
+        self.file_handle = None
+
+        if is_path(where):
+            fs = _get_fs_from_path(where)
+            sink = self.file_handle = fs.open(where, 'wb')
+        else:
+            sink = where
+
         self.writer = _parquet.ParquetWriter(
-            where, schema,
+            sink, schema,
             version=version,
             compression=compression,
             use_dictionary=use_dictionary,
@@ -310,6 +330,8 @@ schema : arrow Schema
         if self.is_open:
             self.writer.close()
             self.is_open = False
+        if self.file_handle is not None:
+            self.file_handle.close()
 
 
 def _get_pandas_index_columns(keyvalues):
@@ -559,8 +581,9 @@ class ParquetPartitions(object):
             return self.levels[level].get_index(key)
 
 
-def is_string(x):
-    return isinstance(x, six.string_types)
+def is_path(x):
+    return (isinstance(x, six.string_types)
+            or (_has_pathlib and isinstance(x, pathlib.Path)))
 
 
 class ParquetManifest(object):
@@ -569,7 +592,7 @@ class ParquetManifest(object):
     """
     def __init__(self, dirpath, filesystem=None, pathsep='/',
                  partition_scheme='hive'):
-        self.filesystem = filesystem or LocalFileSystem.get_instance()
+        self.filesystem = filesystem or _get_fs_from_path(dirpath)
         self.pathsep = pathsep
         self.dirpath = dirpath
         self.partition_scheme = partition_scheme
@@ -692,7 +715,10 @@ class ParquetDataset(object):
     def __init__(self, path_or_paths, filesystem=None, schema=None,
                  metadata=None, split_row_groups=False, validate_schema=True):
         if filesystem is None:
-            self.fs = LocalFileSystem.get_instance()
+            a_path = path_or_paths
+            if isinstance(a_path, list):
+                a_path = a_path[0]
+            self.fs = _get_fs_from_path(a_path)
         else:
             self.fs = _ensure_filesystem(filesystem)
 
@@ -851,7 +877,7 @@ def _make_manifest(path_or_paths, fs, pathsep='/'):
         # Dask passes a directory as a list of length 1
         path_or_paths = path_or_paths[0]
 
-    if is_string(path_or_paths) and fs.isdir(path_or_paths):
+    if is_path(path_or_paths) and fs.isdir(path_or_paths):
         manifest = ParquetManifest(path_or_paths, filesystem=fs,
                                    pathsep=fs.pathsep)
         common_metadata_path = manifest.common_metadata_path
@@ -904,11 +930,11 @@ Returns
 
 def read_table(source, columns=None, nthreads=1, metadata=None,
                use_pandas_metadata=False):
-    if is_string(source):
-        fs = LocalFileSystem.get_instance()
+    if is_path(source):
+        fs = _get_fs_from_path(source)
+
         if fs.isdir(source):
-            return fs.read_parquet(source, columns=columns,
-                                   metadata=metadata)
+            return fs.read_parquet(source, columns=columns, metadata=metadata)
 
     pf = ParquetFile(source, metadata=metadata)
     return pf.read(columns=columns, nthreads=nthreads,
@@ -957,7 +983,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
                            **kwargs) as writer:
             writer.write_table(table, row_group_size=row_group_size)
     except Exception:
-        if isinstance(where, six.string_types):
+        if is_path(where):
             try:
                 os.remove(where)
             except os.error:
@@ -1026,7 +1052,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
     )
 
     if filesystem is None:
-        fs = LocalFileSystem.get_instance()
+        fs = _get_fs_from_path(root_path)
     else:
         fs = _ensure_filesystem(filesystem)
 
@@ -1113,3 +1139,40 @@ def read_schema(where):
     schema : pyarrow.Schema
     """
     return ParquetFile(where).schema.to_arrow_schema()
+
+
+def _ensure_file(source):
+    if is_path(source):
+        fs = _get_fs_from_path(source)
+        try:
+            return fs.open(source)
+        except IOError as e:
+            raise lib.ArrowIOError("failed to open file {}, {}"
+                                   .format(source, e))
+    elif not hasattr(source, 'seek'):
+        raise ValueError('Source does not appear file-like')
+    else:
+        return source
+
+
+def _get_fs_from_path(path):
+    """
+    return filesystem from path which could be an HDFS URI
+    """
+    # input can be hdfs URI such as hdfs://host:port/myfile.parquet
+    if _has_pathlib and isinstance(path, pathlib.Path):
+        path = str(path)
+    parsed_uri = urlparse(path)
+    if parsed_uri.scheme == 'hdfs':
+        netloc_split = parsed_uri.netloc.split(':')
+        host = netloc_split[0]
+        if host == '':
+            host = 'default'
+        port = 0
+        if len(netloc_split) == 2 and netloc_split[1].isnumeric():
+            port = int(netloc_split[1])
+        fs = pa.hdfs.connect(host=host, port=port)
+    else:
+        fs = LocalFileSystem.get_instance()
+
+    return fs
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index 885272b..4840aee 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -272,19 +272,11 @@ class HdfsTestCases(object):
 
         assert result == data
 
-    @test_parquet.parquet
-    def test_read_multiple_parquet_files(self):
+    def _write_multiple_hdfs_pq_files(self, tmpdir):
         import pyarrow.parquet as pq
-
         nfiles = 10
         size = 5
-
-        tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid())
-
-        self.hdfs.mkdir(tmpdir)
-
         test_data = []
-        paths = []
         for i in range(nfiles):
             df = test_parquet._test_dataframe(size, seed=i)
@@ -300,16 +292,61 @@ class HdfsTestCases(object):
             pq.write_table(table, f)
 
             test_data.append(table)
-            paths.append(path)
 
-        result = self.hdfs.read_parquet(tmpdir)
         expected = pa.concat_tables(test_data)
+        return expected
+
+    @test_parquet.parquet
+    def test_read_multiple_parquet_files(self):
+
+        tmpdir = pjoin(self.tmp_path, 'multi-parquet-' + guid())
+
+        self.hdfs.mkdir(tmpdir)
+
+        expected = self._write_multiple_hdfs_pq_files(tmpdir)
+        result = self.hdfs.read_parquet(tmpdir)
 
         pdt.assert_frame_equal(result.to_pandas()
                                .sort_values(by='index').reset_index(drop=True),
                                expected.to_pandas())
 
     @test_parquet.parquet
+    def test_read_multiple_parquet_files_with_uri(self):
+        import pyarrow.parquet as pq
+
+        tmpdir = pjoin(self.tmp_path, 'multi-parquet-uri-' + guid())
+
+        self.hdfs.mkdir(tmpdir)
+
+        expected = self._write_multiple_hdfs_pq_files(tmpdir)
+        path = _get_hdfs_uri(tmpdir)
+        result = pq.read_table(path)
+
+        pdt.assert_frame_equal(result.to_pandas()
+                               .sort_values(by='index').reset_index(drop=True),
+                               expected.to_pandas())
+
+    @test_parquet.parquet
+    def test_read_write_parquet_files_with_uri(self):
+        import pyarrow.parquet as pq
+
+        tmpdir = pjoin(self.tmp_path, 'uri-parquet-' + guid())
+        self.hdfs.mkdir(tmpdir)
+        path = _get_hdfs_uri(pjoin(tmpdir, 'test.parquet'))
+
+        size = 5
+        df = test_parquet._test_dataframe(size, seed=0)
+        # Hack so that we don't have a dtype cast in v1 files
+        df['uint32'] = df['uint32'].astype(np.int64)
+        table = pa.Table.from_pandas(df, preserve_index=False)
+
+        pq.write_table(table, path)
+
+        result = pq.read_table(path).to_pandas()
+
+        pdt.assert_frame_equal(result, df)
+
+    @test_parquet.parquet
     def test_read_common_metadata_files(self):
         tmpdir = pjoin(self.tmp_path, 'common-metadata-' + guid())
         self.hdfs.mkdir(tmpdir)
@@ -357,3 +394,15 @@ class TestLibHdfs3(HdfsTestCases, unittest.TestCase):
     def check_driver(cls):
         if not pa.have_libhdfs3():
             pytest.skip('No libhdfs3 available on system')
+
+
+def _get_hdfs_uri(path):
+    host = os.environ.get('ARROW_HDFS_TEST_HOST', 'localhost')
+    try:
+        port = int(os.environ.get('ARROW_HDFS_TEST_PORT', 0))
+    except ValueError:
+        raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
+                         'an integer')
+    uri = "hdfs://{}:{}{}".format(host, port, path)
+
+    return uri
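
For context, the behavior this patch adds can be exercised roughly as follows. This is a minimal sketch, not part of the commit: the NameNode host/port and file path are placeholders, and it assumes a reachable HDFS cluster with libhdfs available for pa.hdfs.connect.

    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    # Placeholder NameNode coordinates; substitute your cluster's host/port.
    uri = 'hdfs://localhost:8020/tmp/example.parquet'

    table = pa.Table.from_pandas(pd.DataFrame({'a': [1, 2, 3]}))

    # With this patch, write_table sees a path-like argument, resolves the
    # hdfs:// scheme via _get_fs_from_path, opens the file through
    # pa.hdfs.connect(host, port), and closes the handle in
    # ParquetWriter.close.
    pq.write_table(table, uri)

    # read_table resolves the same URI; if it names a directory, the whole
    # directory is read as a dataset via fs.read_parquet.
    result = pq.read_table(uri)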
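The URI handling itself can be sanity-checked without a cluster. The following sketch mirrors the parsing logic of _get_fs_from_path above; the URIs are hypothetical:

    from six.moves.urllib.parse import urlparse

    for uri in ['hdfs://namenode:8020/data/t.parquet',
                'hdfs:///data/t.parquet']:
        parsed = urlparse(uri)
        netloc_split = parsed.netloc.split(':')
        # An empty host falls back to 'default' and port 0, which lets
        # pa.hdfs.connect defer to the local HDFS configuration.
        host = netloc_split[0] or 'default'
        port = (int(netloc_split[1])
                if len(netloc_split) == 2 and netloc_split[1].isnumeric()
                else 0)
        print(host, port, parsed.path)
    # prints: namenode 8020 /data/t.parquet
    #         default 0 /data/t.parquet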