Thanks, I did try a few things with pyarrow.compute. However, the 
pyarrow.compute.filter interface indicates that it takes a boolean mask to do 
the filtering: 
https://arrow.apache.org/docs/python/generated/pyarrow.compute.filter.html

But it doesn't actually help me create the mask. That puts me back to
iterating through the rows, and I would need to build a boolean array of size
num_rows for every unique value of `name`.
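
I gather `pc.equal` could produce such a mask without a Python-level loop over
the rows. A minimal sketch of what I mean for a single value (assuming
`pc.equal` broadcasts a Python scalar against the column and that the `filter`
kernel accepts a RecordBatch):

```
import pyarrow as pa
import pyarrow.compute as pc

batch = pa.record_batch([
    pa.array(["Xander", "abc", "Xander"]),
    pa.array([1.0, 2.0, 3.0]),
], names=["name", "value"])

mask = pc.equal(batch.column("name"), "Xander")  # boolean array, length num_rows
subset = pc.filter(batch, mask)                  # rows where name == "Xander"
```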

I saw some discussion of Expressions in the dataset docs
( https://arrow.apache.org/docs/python/dataset.html ), such as
`ds.field("name") == "Xander"`. However, I don't see a way to evaluate such an
expression without loading the entire dataset into memory with
`dataset.to_table()`, which doesn't work for my dataset because it's many
times larger than RAM. Can an Expression be evaluated against a RecordBatch?
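
If the scanner can push the expression down while streaming, something like
the following would solve my problem, but I haven't confirmed it (untested
sketch; the path is a placeholder, and the `filter` kwarg on `to_batches` is
my reading of the scanner docs):

```
import pyarrow.dataset as ds

dataset = ds.dataset("/path/to/parquet_dir", format="parquet")
expr = ds.field("name") == "Xander"

# Stream filtered batches without materializing the whole table
for batch in dataset.to_batches(filter=expr):
    ...  # each RecordBatch would contain only rows where name == "Xander"
```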

It's also hard to see how applying filter once per unique value of `name`
would be more computationally efficient. The loop I posted earlier is
O(num_rows), whereas applying filter for each name would be
O(num_rows * num_unique_names). It could still be faster in practice if my
loop code is poorly implemented or if filter is multi-threaded.
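
Concretely, the per-name approach I'm estimating would look something like
this (sketch; `pc.unique` per the compute docs, plus the same mask-and-filter
pattern as above):

```
import pyarrow.compute as pc

# One full pass over the column for every unique name:
for name in pc.unique(batch.column("name")):
    mask = pc.equal(batch.column("name"), name)
    rows_for_name = pc.filter(batch, mask)
```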

On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield <[email protected]> wrote:

> 
> Have you looked at the pyarrow compute functions [1][2]?
> 
> Unique and filter seem like they would help.
> 
> [1] https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
> [2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
> 
> On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <xander@xander.ai> wrote:
> 
> 
>> Thanks Weston,
>> 
>> Performance is paramount here; I'm streaming through 7TB of data.
>> 
>> I actually need to separate the data based on the value of the `name`
>> column. For every unique value of `name`, I need a batch of those rows. I
>> tried using gandiva's filter function but can't get gandiva installed on
>> Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on
>> Ubuntu?" on this mailing list).
>> 
>> Aside from that, I'm not sure of a way to separate the data faster than
>> iterating through every row and placing the values into a map keyed on
>> `name`:
>> 
>> ```
>> cdef struct myUpdateStruct:
>>     double value
>>     int64_t checksum
>> 
>> cdef iterate_dataset():
>>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>>     # batch is populated by a scanner of .parquet files; names, values,
>>     # and checksums below are its columns (bound elsewhere)
>>     cdef shared_ptr[CRecordBatch] batch
>>     cdef int64_t batch_row_index = 0
>>     while batch_row_index < batch.get().num_rows():
>>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(
>>             names.get().GetScalar(batch_row_index)).get()).value
>>         name = <char *>name_buffer.get().data()
>>         value = (<CDoubleScalar*>GetResultValue(
>>             values.get().GetScalar(batch_row_index)).get()).value
>>         checksum = (<CInt64Scalar*>GetResultValue(
>>             checksums.get().GetScalar(batch_row_index)).get()).value
>>         newUpdate = myUpdateStruct(value, checksum)
>>         if myUpdates.count(name) <= 0:
>>             myUpdates[name] = deque[myUpdateStruct]()
>>         # keep only the most recent 1024 updates per name
>>         myUpdates[name].push_front(newUpdate)
>>         if myUpdates[name].size() > 1024:
>>             myUpdates[name].pop_back()
>>         batch_row_index += 1
>> ```
>> 
>> This takes 107 minutes to iterate through the first 290GB of data. Without
>> accessing or filtering the data in any way, it takes only 12 minutes to
>> read all the .parquet files into RecordBatches and place them into Plasma.
>> 
>> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <weston.pace@gmail.com> wrote:
>> 
>>> If you don't need the performance, you could stay in Python (use
>>> to_pylist() for the array or as_py() for scalars).
>>> 
>>> If you do need the performance then you're probably better served getting
>>> the buffers and operating on them directly. Or, even better, making use
>>> of the compute kernels:
>>> 
>>> import pyarrow as pa
>>> import pyarrow.compute as pc
>>> 
>>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>>> desired = pa.array(['Xander'], pa.string())
>>> pc.any(pc.is_in(arr, value_set=desired)).as_py() # True
>>> 
>>> 
>>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <xander@xander.ai> wrote:
>>> 
>>> 
>>>> This works for getting a C string out of the CScalar:
>>>> ```
>>>> name_buffer = (<CBaseBinaryScalar*>GetResultValue(
>>>>     names.get().GetScalar(batch_row_index)).get()).value
>>>> name = <char *>name_buffer.get().data()
>>>> ```
>>>> 
>>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <xander@xander.ai> wrote:
>>>> 
>>>>> Here is an example code snippet from a .pyx file that successfully
>>>>> iterates through a CRecordBatch and ensures that the timestamps are
>>>>> ascending:
>>>>> 
>>>>> ```
>>>>> while batch_row_index < batch.get().num_rows():
>>>>>     timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>>     new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>>     current_timestamp = timestamps[name]
>>>>>     if current_timestamp > new_timestamp.value:
>>>>>         abort()
>>>>>     batch_row_index += 1
>>>>> ```
>>>>> 
>>>>> However, I'm having difficulty operating on the values in a column of
>>>>> string type. Unlike CTimestampScalar, there is no CStringScalar. Although
>>>>> there is a StringScalar type in C++, it isn't defined in the Cython
>>>>> interface. There is a `CStringType` and a `c_string` type.
>>>>> 
>>>>> ```
>>>>> while batch_row_index < batch.get().num_rows():
>>>>>     name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>>     name_string = <CStringType*>name.get()  # This is wrong
>>>>>     printf("%s\n", name_string)  # This prints garbage
>>>>>     if name_string == b"Xander":  # Doesn't work
>>>>>         print("found it")
>>>>>     batch_row_index += 1
>>>>> ```
>>>>> 
>>>>> How do I get the string value as a C type and compare it to other strings?
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Xander
>>>>> 