[jira] [Created] (ARROW-15716) [Dataset][Python] Parse a list of fragment paths to gather filters
Lance Dacey created ARROW-15716:
-----------------------------------

             Summary: [Dataset][Python] Parse a list of fragment paths to gather filters
                 Key: ARROW-15716
                 URL: https://issues.apache.org/jira/browse/ARROW-15716
             Project: Apache Arrow
          Issue Type: Wish
    Affects Versions: 7.0.0
            Reporter: Lance Dacey


Is it possible for partitioning.parse() to be updated to parse a list of paths instead of just a single path?

I am passing the .paths from file_visitor to downstream tasks to process data which was recently saved, but I can run into problems with this if I overwrite data with delete_matching in order to consolidate small files, since those paths will no longer exist.

Here is the output of my current approach, which builds filters instead of reading the paths directly:

{code:java}
# Fragments saved during write_dataset
['dev/dataset/fragments/date_id=20210813/data-0.parquet',
 'dev/dataset/fragments/date_id=20210114/data-2.parquet',
 'dev/dataset/fragments/date_id=20210114/data-1.parquet',
 'dev/dataset/fragments/date_id=20210114/data-0.parquet']

# Run partitioning.parse() on each fragment
[, , , ]

# Format those expressions into a list of tuples
[('date_id', 'in', [20210114, 20210813])]

# Convert to an expression which is used as a filter in .to_table()
is_in(date_id, {value_set=int64:[ 20210114, 20210813 ], skip_nulls=false})
{code}

And here is how I am creating the filter from a list of .paths (perhaps there is a better way?):

{code:python}
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet

partitioning = ds.HivePartitioning(partition_schema)

expressions = []
for file in paths:
    expressions.append(partitioning.parse(file))

values = []
filters = []
for expression in expressions:
    partitions = ds._get_partition_keys(expression)
    if len(partitions.keys()) > 1:
        # multiple partition keys: keep one list of "==" tuples per unique combination
        element = [(k, "==", v) for k, v in partitions.items()]
        if element not in filters:
            filters.append(element)
    else:
        # single partition key: collect the distinct values into one "in" filter
        for k, v in partitions.items():
            if v not in values:
                values.append(v)
        filters = [(k, "in", sorted(values))]

filt_exp = pa.parquet._filters_to_expression(filters)

dataset.to_table(filter=filt_exp)
{code}

My hope would be to do something like filt_exp = partitioning.parse(paths), which would return a dataset expression.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
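As a point of comparison, here is a minimal sketch of building the same is_in filter directly as a dataset expression, assuming a single Hive partition key named date_id, a list of values already collected from the parsed paths, and the dataset object from the snippet above:

{code:python}
import pyarrow.dataset as ds

# hypothetical values gathered from the parsed fragment paths
date_ids = [20210114, 20210813]

# Expression.isin() produces the same is_in(date_id, ...) filter without going
# through pyarrow.parquet._filters_to_expression
filt_exp = ds.field("date_id").isin(date_ids)
table = dataset.to_table(filter=filt_exp)
{code}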
[jira] [Created] (ARROW-15474) [Python] Possibility of a table.drop_duplicates() function?
Lance Dacey created ARROW-15474:
-----------------------------------

             Summary: [Python] Possibility of a table.drop_duplicates() function?
                 Key: ARROW-15474
                 URL: https://issues.apache.org/jira/browse/ARROW-15474
             Project: Apache Arrow
          Issue Type: Wish
    Affects Versions: 6.0.1
            Reporter: Lance Dacey
             Fix For: 8.0.0


I noticed that there is a group_by() and a sort_by() function in the 7.0.0 branch. Would it be possible to include a drop_duplicates() function as well?

||id||updated_at||
|1|2022-01-01 04:23:57|
|2|2022-01-01 07:19:21|
|2|2022-01-10 22:14:01|

Something like the call below, which would return a table without the second row in the example above, would be great.

I am usually reading an append-only dataset and then need to report on the latest version of each row. To drop duplicates, I temporarily convert the append-only table to a pandas DataFrame, then convert it back to a table and save a separate "latest-version" dataset.

{code:python}
table.sort_by(sorting=[("id", "ascending"), ("updated_at", "ascending")]).drop_duplicates(subset=["id"], keep="last")
{code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
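For reference, a minimal sketch of one way to emulate the wished-for behavior today without the pandas round trip; it assumes pyarrow >= 7.0 (for Table.sort_by) and a helper, named drop_duplicates here for illustration, that keeps the last row per id after sorting by updated_at:

{code:python}
import numpy as np
import pyarrow as pa

def drop_duplicates(table: pa.Table, key: str, sort_col: str) -> pa.Table:
    """Keep only the last row per key after sorting, i.e. the latest version."""
    sorted_tbl = table.sort_by([(key, "ascending"), (sort_col, "ascending")])
    keys = sorted_tbl.column(key).to_numpy()
    # np.unique on the reversed keys returns the first occurrence per key in
    # reverse order, which is the last occurrence in the sorted order
    _, rev_first = np.unique(keys[::-1], return_index=True)
    last_idx = len(keys) - 1 - rev_first
    return sorted_tbl.take(np.sort(last_idx))

latest = drop_duplicates(table, key="id", sort_col="updated_at")
{code}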
[jira] [Created] (ARROW-12365) [Python] [Dataset] Add partition_filename_cb to ds.write_dataset()
Lance Dacey created ARROW-12365:
-----------------------------------

             Summary: [Python] [Dataset] Add partition_filename_cb to ds.write_dataset()
                 Key: ARROW-12365
                 URL: https://issues.apache.org/jira/browse/ARROW-12365
             Project: Apache Arrow
          Issue Type: Wish
          Components: Python
    Affects Versions: 3.0.0
         Environment: Ubuntu 18.04
            Reporter: Lance Dacey


I need to use the legacy pq.write_to_dataset() in order to guarantee that a file within a partition will have a specific name. My use case is that I need to report on the final version of data, and our visualization tool (Power BI) connects directly to our parquet files on Azure Blob.

1) Download data every hour based on an updated_at timestamp (this data is partitioned by date).
2) Transform the data which was just downloaded and save it into a "staging" dataset (this has all versions of the data, and there will be many files within each partition; in this case, up to 24 files within a single date partition since we download hourly).
3) Filter the transformed data and read a subset of columns, sort it by the updated_at timestamp and drop duplicates on the unique constraint, then partition and save it with partition_filename_cb. In the example below, if I partition by the "date_id" column, then my dataset structure will be "/date_id=20210413/20210413.parquet".

{code:java}
use_legacy_dataset=True,
partition_filename_cb=lambda x: str(x[-1]) + ".parquet",
{code}

Ultimately, I am sure that this final dataset has exactly one file per partition and that I only have the latest version of each row based on the maximum updated_at timestamp. My visualization tool can safely connect to and incrementally refresh from this dataset.

An alternative solution would be to allow us to overwrite anything in an existing partition. I do not care about the file names so much as I want to ensure that I am fully replacing any data which might already exist in my partition, and I want to limit the number of physical files.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
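A minimal sketch of the full legacy call behind the keyword fragment above; the root_path and filesystem are placeholders, and the rest mirrors the report:

{code:python}
import pyarrow.parquet as pq

pq.write_to_dataset(
    table=table,
    root_path="dev/final",                  # placeholder output path
    partition_cols=["date_id"],
    use_legacy_dataset=True,
    # the callback receives the partition key values; naming the single file
    # after the key means each partition holds exactly one predictably named file
    partition_filename_cb=lambda keys: str(keys[-1]) + ".parquet",
    filesystem=fs,                          # placeholder fsspec filesystem
)
{code}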
[jira] [Created] (ARROW-12364) [Python] [Dataset] Add metadata_collector option to ds.write_dataset()
Lance Dacey created ARROW-12364:
-----------------------------------

             Summary: [Python] [Dataset] Add metadata_collector option to ds.write_dataset()
                 Key: ARROW-12364
                 URL: https://issues.apache.org/jira/browse/ARROW-12364
             Project: Apache Arrow
          Issue Type: Wish
          Components: Parquet, Python
    Affects Versions: 3.0.0
         Environment: Ubuntu 18.04
            Reporter: Lance Dacey


The legacy pq.write_to_dataset() has an option to save metadata to a list when writing partitioned data.

{code:python}
collector = []
pq.write_to_dataset(
    table=table,
    root_path=output_path,
    use_legacy_dataset=True,
    metadata_collector=collector,
)

fragments = []
for piece in collector:
    fragments.append(filesystem.sep.join([output_path, piece.row_group(0).column(0).file_path]))
{code}

This allows me to save a list of the specific parquet files which were created when writing the partitions to storage. I use this when scheduling tasks with Airflow:

Task A downloads data and partitions it --> Task B reads the file fragments which were just saved and transforms them --> Task C creates a list of dataset filters from the file fragments, reads each filter into a table, processes the data further (normally dropping duplicates or selecting a subset of the columns), and saves it for visualization.

{code:java}
fragments = ['dev/date_id=20180111/transform-split-20210301013200-68.parquet',
             'dev/date_id=20180114/transform-split-20210301013200-69.parquet',
             'dev/date_id=20180128/transform-split-20210301013200-57.parquet',
            ]
{code}

I can use this list downstream to do two things:

1) I can read the list of fragments directly as a new dataset and transform the data:

{code:java}
ds.dataset(fragments)
{code}

2) I can generate filters from the fragment paths which were saved, using ds._get_partition_keys(). This allows me to query the dataset and retrieve all fragments within the partition. For example, if I partition by date and I process data every 30 minutes, I might have 48 individual file fragments within a single partition. I need to query the *entire* partition instead of reading a single fragment.

{code:java}
def consolidate_filters(fragments):
    """Retrieves the partition_expressions from a list of dataset fragments
    to build a list of unique filters"""
    filters = []
    for frag in fragments:
        partitions = ds._get_partition_keys(frag.partition_expression)
        filter = [(k, "==", v) for k, v in partitions.items()]
        if filter not in filters:
            filters.append(filter)
    return filters

filter_expression = pq._filters_to_expression(
    filters=consolidate_filters(fragments=fragments)
)
{code}

My current problem is that when I use ds.write_dataset(), I do not have a convenient way to generate a list of the file fragments I just saved. My only choice is to use basename_template and fs.glob() to find the files matching the basename_template pattern. This is much slower and wastes file-listing calls on blob storage.

[Related stackoverflow question with the basis of the approach I am using now|https://stackoverflow.com/questions/66252660/pyarrow-identify-the-fragments-written-or-filters-used-when-writing-a-parquet/66266585#66266585]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
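For context, later pyarrow releases added a file_visitor argument to ds.write_dataset() (it is what ARROW-15716 above relies on); a minimal sketch of collecting the written paths with it, assuming pyarrow >= 4.0 and the table, partitioning, and filesystem objects from the report:

{code:python}
import pyarrow.dataset as ds

visited_paths = []

def file_visitor(written_file):
    # written_file.path is the full path of the fragment that was just written;
    # written_file.metadata holds its parquet FileMetaData
    visited_paths.append(written_file.path)

ds.write_dataset(
    data=table,
    base_dir=output_path,
    format="parquet",
    partitioning=partitioning,
    filesystem=filesystem,
    file_visitor=file_visitor,
)
{code}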
[jira] [Created] (ARROW-11453) [Python] [Dataset] Unable to use write_dataset() to Azure Blob with adlfs 0.6.0
Lance Dacey created ARROW-11453:
-----------------------------------

             Summary: [Python] [Dataset] Unable to use write_dataset() to Azure Blob with adlfs 0.6.0
                 Key: ARROW-11453
                 URL: https://issues.apache.org/jira/browse/ARROW-11453
             Project: Apache Arrow
          Issue Type: Bug
          Components: Python
    Affects Versions: 3.0.0
         Environment: This environment results in an error:
adlfs v0.6.0
fsspec 0.8.5
azure.storage.blob 12.6.0
adal 1.2.6
pandas 1.2.1
pyarrow 3.0.0
            Reporter: Lance Dacey


https://github.com/dask/adlfs/issues/171

I am unable to save data to Azure Blob using ds.write_dataset() with pyarrow 3.0 and adlfs 0.6.0. Reverting to adlfs 0.5.9 fixes the issue, but I am not sure what the cause is. Posting this here in case there were recent filesystem changes in pyarrow which are incompatible with changes made in adlfs.

{code:java}
  File "pyarrow/_dataset.pyx", line 2343, in pyarrow._dataset._filesystemdataset_write
  File "pyarrow/_fs.pyx", line 1032, in pyarrow._fs._cb_create_dir
  File "/opt/conda/lib/python3.8/site-packages/pyarrow/fs.py", line 259, in create_dir
    self.fs.mkdir(path, create_parents=recursive)
  File "/opt/conda/lib/python3.8/site-packages/fsspec/asyn.py", line 121, in wrapper
    return maybe_sync(func, self, *args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/fsspec/asyn.py", line 100, in maybe_sync
    return sync(loop, func, *args, **kwargs)
  File "/opt/conda/lib/python3.8/site-packages/fsspec/asyn.py", line 71, in sync
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/fsspec/asyn.py", line 55, in f
    result[0] = await future
  File "/opt/conda/lib/python3.8/site-packages/adlfs/spec.py", line 1033, in _mkdir
    raise FileExistsError(
FileExistsError: Cannot overwrite existing Azure container -- dev already exists.
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
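A minimal reproduction sketch of the call path suggested by the traceback, with hypothetical credentials and path; pyarrow's create_dir() delegates to fs.mkdir(path, create_parents=True), which is where adlfs 0.6.0 raises:

{code:python}
import fsspec

fs = fsspec.filesystem("abfs", account_name="...", account_key="...")

# write_dataset() calls create_dir() for every partition directory, which in
# turn calls mkdir() with create_parents=True; with adlfs 0.6.0 this raises
# FileExistsError because the "dev" container already exists
fs.mkdir("dev/dataset/date_id=20210101", create_parents=True)
{code}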
[jira] [Created] (ARROW-11390) [Python] pyarrow 3.0 issues with turbodbc
Lance Dacey created ARROW-11390:
-----------------------------------

             Summary: [Python] pyarrow 3.0 issues with turbodbc
                 Key: ARROW-11390
                 URL: https://issues.apache.org/jira/browse/ARROW-11390
             Project: Apache Arrow
          Issue Type: Bug
          Components: Python
    Affects Versions: 3.0.0
         Environment: pyarrow 3.0.0
fsspec 0.8.4
adlfs v0.5.9
pandas 1.2.1
numpy 1.19.5
turbodbc 4.1.1
            Reporter: Lance Dacey


This is more of a turbodbc issue I think, but perhaps someone here would have some idea of what changed to cause potential issues.

{code:java}
cursor = connection.cursor()
cursor.execute("select top 10 * from dbo.tickets")
table = cursor.fetchallarrow()
{code}

I am able to run table.num_rows and it will print out 10. If I run table.to_pandas() or table.schema, or try to write the table to a dataset, my kernel dies with no explanation. I reverted back to pyarrow 2.0 and the same code works again.

https://github.com/blue-yonder/turbodbc/issues/289



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (ARROW-11250) [Python] Inconsistent behavior calling ds.dataset()
Lance Dacey created ARROW-11250:
-----------------------------------

             Summary: [Python] Inconsistent behavior calling ds.dataset()
                 Key: ARROW-11250
                 URL: https://issues.apache.org/jira/browse/ARROW-11250
             Project: Apache Arrow
          Issue Type: Bug
          Components: Python
    Affects Versions: 2.0.0
         Environment: Ubuntu 18.04
adal                    1.2.5      pyh9f0ad1d_0        conda-forge
adlfs                   0.5.9      pyhd8ed1ab_0        conda-forge
apache-airflow          1.10.14    pypi_0              pypi
azure-common            1.1.24     py_0                conda-forge
azure-core              1.9.0      pyhd3deb0d_0        conda-forge
azure-datalake-store    0.0.51     pyh9f0ad1d_0        conda-forge
azure-identity          1.5.0      pyhd8ed1ab_0        conda-forge
azure-nspkg             3.0.2      py_0                conda-forge
azure-storage-blob      12.6.0     pyhd3deb0d_0        conda-forge
azure-storage-common    2.1.0      py37hc8dfbb8_3      conda-forge
fsspec                  0.8.5      pyhd8ed1ab_0        conda-forge
jupyterlab_pygments     0.1.2      pyh9f0ad1d_0        conda-forge
pandas                  1.2.0      py37ha9443f7_0
pyarrow                 2.0.0      py37h4935f41_6_cpu  conda-forge
            Reporter: Lance Dacey
             Fix For: 3.0.0


In a Jupyter notebook, I have noticed that sometimes I am not able to read a dataset which certainly exists on Azure Blob.

{code:java}
fs = fsspec.filesystem(protocol="abfs", account_name=base.login, account_key=base.password)
{code}

One example of this is reading a dataset in one cell:

{code:java}
ds.dataset("dev/test-split", partitioning="hive", filesystem=fs)
{code}

Then in another cell I try to read the same dataset:

{code:java}
ds.dataset("dev/test-split", partitioning="hive", filesystem=fs)

---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
in
----> 1 ds.dataset("dev/test-split", partitioning="hive", filesystem=fs)

/opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in dataset(source, schema, format, filesystem, partitioning, partition_base_dir, exclude_invalid_files, ignore_prefixes)
    669     # TODO(kszucs): support InMemoryDataset for a table input
    670     if _is_path_like(source):
--> 671         return _filesystem_dataset(source, **kwargs)
    672     elif isinstance(source, (tuple, list)):
    673         if all(_is_path_like(elem) for elem in source):

/opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in _filesystem_dataset(source, schema, filesystem, partitioning, format, partition_base_dir, exclude_invalid_files, selector_ignore_prefixes)
    426         fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
    427     else:
--> 428         fs, paths_or_selector = _ensure_single_source(source, filesystem)
    429
    430     options = FileSystemFactoryOptions(

/opt/conda/lib/python3.8/site-packages/pyarrow/dataset.py in _ensure_single_source(path, filesystem)
    402         paths_or_selector = [path]
    403     else:
--> 404         raise FileNotFoundError(path)
    405
    406     return filesystem, paths_or_selector

FileNotFoundError: dev/test-split
{code}

If I reset the kernel, it works again.
It also works if I change the path slightly, for example adding a "/" at the end (so basically it just does not work if I read the same dataset twice):

{code:java}
ds.dataset("dev/test-split/", partitioning="hive", filesystem=fs)
{code}

The other strange behavior I have noticed is that reading a dataset inside my Jupyter notebook is fast:

{code:java}
%%time
dataset = ds.dataset("dev/test-split",
                     partitioning=ds.partitioning(pa.schema([("date", pa.date32())]), flavor="hive"),
                     filesystem=fs,
                     exclude_invalid_files=False)

CPU times: user 1.98 s, sys: 0 ns, total: 1.98 s
Wall time: 2.58 s
{code}

Now, on the exact same server, when I try to run the same code against the same dataset in Airflow it takes over 3 minutes (comparing the timestamps in my logs between right before I read the dataset and immediately after the dataset is available to filter):

{code:java}
[2021-01-14 03:52:04,011] INFO - Reading dev/test-split
[2021-01-14 03:55:17,360] INFO - Processing dataset in batches
{code}

This is probably not a pyarrow issue, but what are some potential causes that I can look into? I have one example where it takes 9 seconds to read the dataset in Jupyter, but then 11 *minutes* in Airflow. I don't know what to investigate; as I mentioned, the Jupyter notebook and Airflow are on the same server and both are deployed using Docker. Airflow is using the CeleryExecutor.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Created] (ARROW-10694) [Python] ds.write_dataset() generates empty files for each final partition
Lance Dacey created ARROW-10694:
-----------------------------------

             Summary: [Python] ds.write_dataset() generates empty files for each final partition
                 Key: ARROW-10694
                 URL: https://issues.apache.org/jira/browse/ARROW-10694
             Project: Apache Arrow
          Issue Type: Bug
    Affects Versions: 2.0.0
         Environment: Ubuntu 18.04
Python 3.8.6
adlfs master branch
            Reporter: Lance Dacey


ds.write_dataset() is generating empty files for the final partition folder, which causes errors when reading the dataset or converting a dataset to a table.

I believe this may be caused by fs.mkdir(). Without the final slash in the path, an empty file is created in the "dev" container:

{code:java}
fs = fsspec.filesystem(protocol='abfs', account_name=base.login, account_key=base.password)
fs.mkdir("dev/test2")
{code}

If the final slash is added, a proper folder is created:

{code:java}
fs.mkdir("dev/test2/")
{code}

Here is a full example of what happens with ds.write_dataset():

{code:java}
schema = pa.schema(
    [
        ("year", pa.int16()),
        ("month", pa.int8()),
        ("day", pa.int8()),
        ("report_date", pa.date32()),
        ("employee_id", pa.string()),
        ("designation", pa.dictionary(index_type=pa.int16(), value_type=pa.string())),
    ]
)

part = DirectoryPartitioning(pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int8())]))

ds.write_dataset(data=table,
                 base_dir="dev/test-dataset",
                 basename_template="test-{i}.parquet",
                 format="parquet",
                 partitioning=part,
                 schema=schema,
                 filesystem=fs)

# construct the dataset afterwards and list its files
dataset = ds.dataset("dev/test-dataset", format="parquet", partitioning=part, filesystem=fs)
dataset.files
# sample printed below, note the empty files
[
 'dev/test-dataset/2018/1/1/test-0.parquet',
 'dev/test-dataset/2018/10/1',
 'dev/test-dataset/2018/10/1/test-27.parquet',
 'dev/test-dataset/2018/3/1',
 'dev/test-dataset/2018/3/1/test-6.parquet',
 'dev/test-dataset/2020/1/1',
 'dev/test-dataset/2020/1/1/test-2.parquet',
 'dev/test-dataset/2020/10/1',
 'dev/test-dataset/2020/10/1/test-29.parquet',
 'dev/test-dataset/2020/11/1',
 'dev/test-dataset/2020/11/1/test-32.parquet',
 'dev/test-dataset/2020/2/1',
 'dev/test-dataset/2020/2/1/test-5.parquet',
 'dev/test-dataset/2020/7/1',
 'dev/test-dataset/2020/7/1/test-20.parquet',
 'dev/test-dataset/2020/8/1',
 'dev/test-dataset/2020/8/1/test-23.parquet',
 'dev/test-dataset/2020/9/1',
 'dev/test-dataset/2020/9/1/test-26.parquet'
]
{code}

As you can see, there is an empty file for each "day" partition. I was not even able to read the dataset at all until I manually deleted the first empty file in the dataset (2018/1/1). I then get an error when I try to use the to_table() method:

{code:java}
OSError                                   Traceback (most recent call last)
in
----> 1 dataset.to_table()

/opt/conda/lib/python3.8/site-packages/pyarrow/_dataset.pyx in pyarrow._dataset.Dataset.to_table()
/opt/conda/lib/python3.8/site-packages/pyarrow/_dataset.pyx in pyarrow._dataset.Scanner.to_table()
/opt/conda/lib/python3.8/site-packages/pyarrow/error.pxi in pyarrow.lib.pyarrow_internal_check_status()
/opt/conda/lib/python3.8/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

OSError: Could not open parquet input source 'dev/test-dataset/2018/10/1': Invalid: Parquet file size is 0 bytes
{code}

If I manually delete the empty file, I can then use the to_table() function:

{code:java}
dataset.to_table(filter=(ds.field("year") == 2020) & (ds.field("month") == 10)).to_pandas()
{code}

Is this a bug with pyarrow, adlfs, or fsspec?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
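A hedged workaround sketch, assuming the fsspec filesystem and partitioning objects from the report: list the dataset tree, delete the zero-byte placeholder entries, then build the dataset again:

{code:python}
# remove the zero-byte placeholder "files" that shadow each partition directory
for path in fs.find("dev/test-dataset"):
    if fs.size(path) == 0:
        fs.rm(path)

dataset = ds.dataset("dev/test-dataset", format="parquet", partitioning=part, filesystem=fs)
table = dataset.to_table(filter=(ds.field("year") == 2020) & (ds.field("month") == 10))
{code}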
[jira] [Created] (ARROW-10517) [Python] Unable to read/write Parquet datasets with fsspec on Azure Blob
Lance Dacey created ARROW-10517:
-----------------------------------

             Summary: [Python] Unable to read/write Parquet datasets with fsspec on Azure Blob
                 Key: ARROW-10517
                 URL: https://issues.apache.org/jira/browse/ARROW-10517
             Project: Apache Arrow
          Issue Type: Bug
          Components: Python
    Affects Versions: 2.0.0
         Environment: Ubuntu 18.04
            Reporter: Lance Dacey


If I downgrade adlfs to 0.2.5 and azure-storage-blob to 2.1, and then upgrade fsspec (0.6.2 has errors with a detail kwarg, so I need to upgrade it), writing a dataset fails:

{code:java}
pa.dataset.write_dataset(data=table,
                         base_dir="test/test7",
                         basename_template=None,
                         format="parquet",
                         partitioning=DirectoryPartitioning(pa.schema([("year", pa.int64()), ("month", pa.int16()), ("day", pa.int16())])),
                         schema=table.schema,
                         filesystem=blob_fs)
{code}

{code:java}
    226     def create_dir(self, path, recursive):
    227         # mkdir also raises FileNotFoundError when base directory is not found
--> 228         self.fs.mkdir(path, create_parents=recursive)
{code}

It does not look like there is a mkdir option in that adlfs version. However, the output of fs.find() returns a dictionary as expected:

{code:java}
selected_files = blob_fs.find(
    "test/test6", maxdepth=None, withdirs=True, detail=True
)
{code}

Now if I install the latest version of adlfs, it upgrades my blob SDK to 12.5 (unfortunately, I cannot use this in production since Airflow requires 2.1, so this is only for testing purposes):

{code:java}
Successfully installed adlfs-0.5.5 azure-storage-blob-12.5.0
{code}

Now fs.find() returns a list, but I am able to use fs.mkdir():

{code:java}
['test/test6/year=2020',
 'test/test6/year=2020/month=11',
 'test/test6/year=2020/month=11/day=1',
 'test/test6/year=2020/month=11/day=1/8ee6c66320ca47908c37f112f0cffd6c.parquet',
 'test/test6/year=2020/month=11/day=1/ef753f016efc44b7b0f0800c35d084fc.parquet',]
{code}

This causes issues later when I try to read a dataset (the code is still expecting a dictionary):

{code:java}
dataset = ds.dataset("test/test5", filesystem=blob_fs, format="parquet")
{code}

{code:java}
--> 221         for path, info in selected_files.items():
    222             infos.append(self._create_file_info(path, info))
    223

AttributeError: 'list' object has no attribute 'items'
{code}

I am still able to read individual files:

{code:java}
dataset = ds.dataset("test/test4/year=2020/month=11/2020-11.parquet", filesystem=blob_fs, format="parquet")
{code}

And I can read the dataset if I pass in a list of blob names "manually":

{code:java}
blobs = wasb.list_blobs(container_name="test", prefix="test4")

dataset = ds.dataset(source=["test/" + blob.name for blob in blobs],
                     format="parquet",
                     partitioning="hive",
                     filesystem=blob_fs)
{code}

For all of my examples, blob_fs is defined by:

{code:java}
blob_fs = fsspec.filesystem(
    protocol="abfs", account_name=base.login, account_key=base.password
)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
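A small sketch of the incompatibility described above, assuming the blob_fs object from the report: pyarrow's selector path iterates find(..., detail=True) with .items(), so checking the return type of the installed adlfs shows which behavior you are getting:

{code:python}
info = blob_fs.find("test/test6", maxdepth=None, withdirs=True, detail=True)

# adlfs 0.2.5 returns a dict of {path: info}, which pyarrow can iterate with
# .items(); adlfs 0.5.5 returns a plain list of paths, which triggers the
# AttributeError shown above
print(type(info))
{code}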
[jira] [Created] (ARROW-9682) [Python] Unable to specify the partition style with pq.write_to_dataset
Lance Dacey created ARROW-9682:
----------------------------------

             Summary: [Python] Unable to specify the partition style with pq.write_to_dataset
                 Key: ARROW-9682
                 URL: https://issues.apache.org/jira/browse/ARROW-9682
             Project: Apache Arrow
          Issue Type: Improvement
    Affects Versions: 1.0.0
         Environment: Ubuntu 18.04
Python 3.7
            Reporter: Lance Dacey


I am able to import and test DirectoryPartitioning, but I am not able to figure out a way to write a dataset using this feature. It seems like write_to_dataset defaults to the "hive" style. Is there a way to test this?

{code:java}
import pyarrow as pa
from pyarrow.dataset import DirectoryPartitioning

partitioning = DirectoryPartitioning(pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int8())]))

print(partitioning.parse("/2009/11/3"))
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
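For reference, a minimal sketch of writing directory-style partitions through the dataset API instead of pq.write_to_dataset(), mirroring the ds.write_dataset() call used in ARROW-10694 above; the table, base_dir, and filesystem here are placeholders:

{code:python}
import pyarrow as pa
import pyarrow.dataset as ds

part = ds.DirectoryPartitioning(
    pa.schema([("year", pa.int16()), ("month", pa.int8()), ("day", pa.int8())])
)

# the dataset writer takes the partitioning object directly, so the layout is
# /2009/11/3/... rather than hive-style year=2009/month=11/day=3 directories
ds.write_dataset(
    data=table,                    # placeholder pyarrow Table
    base_dir="dev/test-dataset",   # placeholder output path
    format="parquet",
    partitioning=part,
    filesystem=fs,                 # placeholder fsspec filesystem
)
{code}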
[jira] [Created] (ARROW-9514) The new Dataset API will not work with files on Azure Blob (pq.read_table() does work and so does Dask)
Lance Dacey created ARROW-9514:
----------------------------------

             Summary: The new Dataset API will not work with files on Azure Blob (pq.read_table() does work and so does Dask)
                 Key: ARROW-9514
                 URL: https://issues.apache.org/jira/browse/ARROW-9514
             Project: Apache Arrow
          Issue Type: Improvement
          Components: Python
    Affects Versions: 0.17.1
         Environment: Ubuntu 18.04
            Reporter: Lance Dacey
             Fix For: 0.17.1


I tried using pyarrow.dataset and pq.ParquetDataset(use_legacy_dataset=False), and my connection to Azure Blob fails. I know the documentation says only HDFS and S3 are implemented, but I have been using Azure Blob by passing an fsspec filesystem when reading and writing parquet files/datasets with pyarrow (with use_legacy_dataset=True). Dask also works with storage_options.

I am hoping that Azure Blob will be supported, because I'd really like to try out the new row filtering and non-hive partitioning schemes.

This is what I use for the filesystem when using read_table() or write_to_dataset():

{code:java}
fs = fsspec.filesystem(protocol='abfs', account_name=base.login, account_key=base.password)
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
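For context, a minimal sketch of the call that fails here; the same pattern (an fsspec abfs filesystem passed to ds.dataset()) is what the other reports above, such as ARROW-11250, rely on once it works. The dataset path is a placeholder:

{code:python}
import fsspec
import pyarrow.dataset as ds

fs = fsspec.filesystem(protocol="abfs", account_name=base.login, account_key=base.password)

# later pyarrow releases accept an fsspec filesystem here and wrap it internally
dataset = ds.dataset("dev/some-dataset", format="parquet", partitioning="hive", filesystem=fs)  # placeholder path
table = dataset.to_table()
{code}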