Thanks, I did try a few things with pyarrow.compute. However, the pyarrow.compute.filter interface indicates that it takes a boolean mask to do the filtering: https://arrow.apache.org/docs/python/generated/pyarrow.compute.filter.html

But it doesn't actually help me create the mask. I'm back to iterating through the rows, and now I would need to build a boolean array of size (num_rows) for every unique value of `name`. I saw some discussion of Expressions in the dataset docs (https://arrow.apache.org/docs/python/dataset.html), such as `ds.field("name") == "Xander"`. However, I don't see a way of computing such an expression without loading the entire dataset into memory with `dataset.to_table()`, which doesn't work here because my dataset is many times larger than RAM. Can an Expression be computed on a RecordBatch? (Sketches of both ideas below.)

It's also hard to foresee how applying filter for each unique value of `name` would be more computationally efficient. The loop I posted above is O(num_rows), whereas applying filter once per name would be O(num_rows * num_unique_names). It could still be faster if my loop is poorly implemented or if filter is multi-threaded.
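To make the Expression question concrete, this is roughly what I was hoping the dataset API would let me do. The path and column name are made up, and I'm assuming (but haven't verified) that the `filter` argument to `to_batches` streams rather than materializing the whole dataset:

```
import pyarrow.dataset as ds

# Hypothetical directory; mine holds ~7TB of .parquet files.
dataset = ds.dataset("/path/to/parquet_dir", format="parquet")

# Ideally the expression is pushed down into the scan, so only
# matching rows are ever materialized, one RecordBatch at a time.
for batch in dataset.to_batches(filter=ds.field("name") == "Xander"):
    ...  # process the filtered RecordBatch
```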
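And for the per-name filter approach, here's a rough, untested sketch of what I have in mind with `unique` + `equal` + `filter`; the toy batch just stands in for one CRecordBatch off the scanner:

```
import pyarrow as pa
import pyarrow.compute as pc

# Toy stand-in for one batch from the scanner.
batch = pa.RecordBatch.from_pydict({
    "name": ["ab", "Xander", "ab", "Xander"],
    "value": [1.0, 2.0, 3.0, 4.0],
    "checksum": [11, 22, 33, 44],
})

# One pass to find the distinct names in this batch...
for name in pc.unique(batch.column("name")):
    # ...then a full-batch mask and filter per name, which is where
    # the O(num_rows * num_unique_names) cost comes from.
    mask = pc.equal(batch.column("name"), name)
    subset = pc.filter(batch, mask)
```

Each kernel call is vectorized C++ rather than my per-row Cython loop, so it might still win despite the extra passes, but I haven't measured it.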
On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield <[email protected]> wrote:

> Have you looked at the pyarrow compute functions [1][2]? Unique and filter seem like they would help.
>
> [1] https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
> [2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
>
> On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <[email protected]> wrote:
>
>> Thanks Weston,
>>
>> Performance is paramount here, I'm streaming through 7TB of data.
>>
>> I actually need to separate the data based on the value of the `name` column. For every unique value of `name`, I need a batch of those rows. I tried using gandiva's filter function but can't get gandiva installed on Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on Ubuntu?" on this mailing list).
>>
>> Aside from that, I'm not sure of a way to separate the data faster than iterating through every row and placing the values into a map keyed on `name`:
>>
>> ```
>> cdef struct myUpdateStruct:
>>     double value
>>     int64_t checksum
>>
>> cdef iterate_dataset():
>>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>>     cdef shared_ptr[CRecordBatch] batch  # Populated by a scanner of .parquet files
>>     cdef int64_t batch_row_index = 0
>>     while batch_row_index < batch.get().num_rows():
>>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>             GetScalar(batch_row_index)).get()).value
>>         name = <char *>name_buffer.get().data()
>>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>>             GetScalar(batch_row_index)).get()).value
>>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>>             GetScalar(batch_row_index)).get()).value
>>         newUpdate = myUpdateStruct(value, checksum)
>>         if myUpdates.count(name) <= 0:
>>             myUpdates[name] = deque[myUpdateStruct]()
>>         # Keep only the 1024 most recent updates per name
>>         myUpdates[name].push_front(newUpdate)
>>         if myUpdates[name].size() > 1024:
>>             myUpdates[name].pop_back()
>>         batch_row_index += 1
>> ```
>>
>> This takes 107 minutes to iterate through the first 290GB of data. Without accessing or filtering the data in any way, it takes only 12 minutes to read all the .parquet files into RecordBatches and place them into Plasma.
>>
>> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <[email protected]> wrote:
>>> If you don't need the performance, you could stay in python (use to_pylist() for the array or as_py() for scalars).
>>>
>>> If you do need the performance then you're probably better served getting the buffers and operating on them directly. Or, even better, making use of the compute kernels:
>>>
>>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>>> desired = pa.array(['Xander'], pa.string())
>>> pc.any(pc.is_in(arr, value_set=desired)).as_py()  # True
>>>
>>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <[email protected]> wrote:
>>>
>>>> This works for getting a c string out of the CScalar:
>>>>
>>>> ```
>>>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>     GetScalar(batch_row_index)).get()).value
>>>> name = <char *>name_buffer.get().data()
>>>> ```
>>>>
>>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <[email protected]> wrote:
>>>>
>>>>> Here is an example code snippet from a .pyx file that successfully iterates through a CRecordBatch and ensures that the timestamps are ascending:
>>>>>
>>>>> ```
>>>>> while batch_row_index < batch.get().num_rows():
>>>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>>     current_timestamp = timestamps[name]
>>>>>     if current_timestamp > new_timestamp.value:
>>>>>         abort()
>>>>>     batch_row_index += 1
>>>>> ```
>>>>>
>>>>> However, I'm having difficulty operating on the values in a column of string type. Unlike CTimestampScalar, there is no CStringScalar. Although there is a StringScalar type in C++, it isn't defined in the Cython interface. There is a `CStringType` and a `c_string` type.
>>>>>
>>>>> ```
>>>>> while batch_row_index < batch.get().num_rows():
>>>>>     name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>>     name_string = <CStringType*>name.get()  # This is wrong
>>>>>     printf("%s\n", name_string)  # This prints garbage
>>>>>     if name_string == b"Xander":  # Doesn't work
>>>>>         print("found it")
>>>>>     batch_row_index += 1
>>>>> ```
>>>>>
>>>>> How do I get the string value as a C type and compare it to other strings?
>>>>>
>>>>> Thanks,
>>>>> Xander
