Correct, the "group by" operation you're looking for doesn't quite exist
(externally) yet (others can correct me if I'm wrong here). ARROW-3978
<https://issues.apache.org/jira/browse/ARROW-3978> sometimes gets brought
up in reference to this.  There are some things in the works (e.g. the C++
query execution engine
<https://docs.google.com/document/d/1AyTdLU-RxA-Gsb9EsYnrQrmqPMOYMfPlWwxRi1Is1tQ/edit?usp=sharing>)
which would provide this.  There is also an internal implementation
(arrow::compute::internal::Grouper) that is used for computing partitions,
but I believe it was intentionally kept internal; others may be able to
explain the reasoning.

Expressions are (or will soon be) built on compute, so using them won't
provide much benefit over what is already in compute.  I want to say the
best you can do with what is in compute for 4.0.0 is O(num_rows *
num_unique_names).  To create the mask you would use the equals function.
So the whole operation (sketched in code after the list below) would be...

1) Use unique to get the possible string values
2) For each string value
  a) Use equals to get a mask
  b) Use filter to get a subarray
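
Roughly, in C++, that might look like the following.  This is just a
sketch, not tested code: it assumes Arrow 4.0.0, that the grouping column
is a string column named "name" (adjust for your schema), and it elides
error handling beyond the Result macros.

```
#include <arrow/api.h>
#include <arrow/compute/api.h>

// Split one batch into per-name sub-batches via unique / equal / filter.
arrow::Status SplitByName(const std::shared_ptr<arrow::RecordBatch>& batch) {
  namespace cp = arrow::compute;
  std::shared_ptr<arrow::Array> names = batch->GetColumnByName("name");

  // 1) Get the distinct string values.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> uniques,
                        cp::Unique(names));
  auto unique_names = std::static_pointer_cast<arrow::StringArray>(uniques);

  for (int64_t i = 0; i < unique_names->length(); ++i) {
    // 2a) Use equals to get a mask: names == unique_names[i]
    auto target =
        std::make_shared<arrow::StringScalar>(unique_names->GetString(i));
    ARROW_ASSIGN_OR_RAISE(
        arrow::Datum mask,
        cp::CallFunction("equal", {names, arrow::Datum(target)}));
    // 2b) Use filter to get the sub-batch (all columns come along).
    ARROW_ASSIGN_OR_RAISE(arrow::Datum sub_batch, cp::Filter(batch, mask));
    // ... process sub_batch.record_batch() ...
  }
  return arrow::Status::OK();
}
```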

So what you have may be a pretty reasonable workaround.  I'd recommend
benchmarking it against the compute approach above just to see how the two
compare.

There are also a few minor optimizations you can make that shouldn't be too
much harder.  You want to avoid GetScalar if you can, as it will make an
allocation / copy for every item you access.  Grab the column from the
record batch and cast it to the appropriate typed array (this is only easy
because it appears you have a fairly rigid schema).  This will allow you to
access values directly without wrapping them in a scalar.  For example, in
C++ (I'll leave the cython to you :)) it would look like...

```
auto arr =
    std::dynamic_pointer_cast<arrow::DoubleArray>(record_batch->column(0));
std::cout << arr->Value(0) << std::endl;
```

For the string array I believe it is...

```
auto str_arr =
    std::dynamic_pointer_cast<arrow::StringArray>(record_batch->column(0));
arrow::util::string_view view = str_arr->GetView(0);
```

It may take a slight bit of finesse to figure out how to get
arrow::util::string_view to work with map, but it should be doable (one
option is sketched below).  There is also GetString, which returns a
std::string and should only be slightly more expensive, and GetValue, which
returns a uint8_t* and writes the length into an out parameter.
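
For example, one simple option is to copy the view into a std::string only
when touching the map.  Again just a sketch under assumptions: the map type
here is a stand-in for your deque-of-structs map, and the string copy per
row is the price of giving the map a key it can own.

```
#include <cstdint>
#include <map>
#include <memory>
#include <string>

#include <arrow/api.h>
#include <arrow/util/string_view.h>

// Use the zero-copy view from the array, and materialize a std::string
// only when inserting/looking up the key in the map.
void ProcessRow(const std::shared_ptr<arrow::StringArray>& names,
                int64_t row, std::map<std::string, int64_t>* counts) {
  arrow::util::string_view view = names->GetView(row);
  // std::map<std::string, ...> can't be probed with a string_view directly
  // (no heterogeneous lookup by default), so copy into an owning key.
  std::string key(view.data(), view.size());
  (*counts)[key] += 1;
}
```

If that per-row copy shows up in profiling, a transparent comparator
(std::less<>) may let you look up by view without copying, but that depends
on which comparison operators arrow::util::string_view provides against
std::string, so measure first.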

On Wed, Apr 14, 2021 at 3:15 PM Xander Dunn <[email protected]> wrote:

> Thanks, I did try a few things with pyarrow.compute. However, the
> pyarrow.compute.filter interface indicates that it takes a boolean mask to
> do the filtering:
> https://arrow.apache.org/docs/python/generated/pyarrow.compute.filter.html
>
> But it doesn't actually help me create the mask? I'm back to iterating
> through the rows and now I would need to create a boolean array of size
> (num_rows) for every unique value of `name`.
>
> I saw in the dataset docs (
> https://arrow.apache.org/docs/python/dataset.html) some discussion on
> Expressions, such as `ds.field("name") == "Xander"`. However, I don't see a
> way of computing such an expression without loading the entire dataset into
> memory with `dataset.to_table()`, which doesn't work for my dataset because
> it's many times larger than RAM. Can an Expression be computed on a
> RecordBatch?
>
> But it's also hard to foresee how applying filter for each unique value of
> `name` will be more computationally efficient. The loop I posted above is
> O(num_rows), whereas applying filter for each name would be O(num_rows *
> num_unique_names). It could still be faster if my loop code is poorly
> implemented or if filter is multi-threaded.
>
>
> On Wed, Apr 14, 2021 at 4:45 PM, Micah Kornfield <[email protected]>
> wrote:
>
>> Have you looked at the pyarrow compute functions [1][2]?
>>
>> Unique and filter seem like they would help.
>>
>> [1]
>> https://arrow.apache.org/docs/python/compute.html?highlight=pyarrow%20compute
>> [2] https://arrow.apache.org/docs/cpp/compute.html#compute-function-list
>>
>> On Wed, Apr 14, 2021 at 2:02 PM Xander Dunn <[email protected]> wrote:
>>
>>> Thanks Weston,
>>>
>>> Performance is paramount here; I'm streaming through 7TB of data.
>>>
>>> I actually need to separate the data based on the value of the `name`
>>> column. For every unique value of `name`, I need a batch of those rows. I
>>> tried using gandiva's filter function but can't get gandiva installed on
>>> Ubuntu (see my earlier thread "[Python] pyarrow.gandiva unavailable on
>>> Ubuntu?" on this mailing list).
>>>
>>> Aside from that, I'm not sure of a way to separate the data faster than
>>> iterating through every row and placing the values into a map keyed on
>>> `name`:
>>> ```
>>> cdef struct myUpdateStruct:
>>>     double value
>>>     int64_t checksum
>>>
>>> cdef iterate_dataset():
>>>     cdef map[c_string, deque[myUpdateStruct]] myUpdates
>>> cdef shared_ptr[CRecordBatch] batch # This is populated by a scanner of .parquet files
>>>     cdef int64_t batch_row_index = 0
>>>     while batch_row_index < batch.get().num_rows():
>>>         name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>                 GetScalar(batch_row_index)).get()).value
>>>         name = <char *>name_buffer.get().data()
>>>         value = (<CDoubleScalar*>GetResultValue(values.get().\
>>>                 GetScalar(batch_row_index)).get()).value
>>>         checksum = (<CInt64Scalar*>GetResultValue(checksums.get().\
>>>                 GetScalar(batch_row_index)).get()).value
>>>         newUpdate = myUpdateStruct(value, checksum)
>>>         if myUpdates.count(name) <= 0:
>>>             myUpdates[name] = deque[myUpdateStruct]()
>>>         myUpdates[name].push_front(newUpdate)
>>>         if myUpdates[name].size() > 1024:
>>>             myUpdates[name].pop_back()
>>>         batch_row_index += 1
>>> ```
>>> This takes 107 minutes to iterate through the first 290GB of data.
>>> Without accessing or filtering the data in any way, it takes only 12
>>> minutes to read all the .parquet files into RecordBatches and place them
>>> into Plasma.
>>>
>>>
>>> On Wed, Apr 14, 2021 at 12:57 PM, Weston Pace <[email protected]>
>>> wrote:
>>>
>>>> If you don't need the performance, you could stay in python (use
>>>> to_pylist() for the array or as_py() for scalars).
>>>>
>>>> If you do need the performance then you're probably better served
>>>> getting the buffers and operating on them directly.  Or, even better,
>>>> making use of the compute kernels:
>>>>
>>>> arr = pa.array(['abc', 'ab', 'Xander', None], pa.string())
>>>> desired = pa.array(['Xander'], pa.string())
>>>> pc.any(pc.is_in(arr, value_set=desired)).as_py() # True
>>>>
>>>> On Wed, Apr 14, 2021 at 6:29 AM Xander Dunn <[email protected]> wrote:
>>>>
>>>>> This works for getting a c string out of the CScalar:
>>>>> ```
>>>>>                 name_buffer = (<CBaseBinaryScalar*>GetResultValue(names.get().\
>>>>>                         GetScalar(batch_row_index)).get()).value
>>>>>                 name = <char *>name_buffer.get().data()
>>>>> ```
>>>>>
>>>>>
>>>>> On Tue, Apr 13, 2021 at 10:43 PM, Xander Dunn <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Here is an example code snippet from a .pyx file that successfully
>>>>>> iterates through a CRecordBatch and ensures that the timestamps are
>>>>>> ascending:
>>>>>> ```
>>>>>>             while batch_row_index < batch.get().num_rows():
>>>>>>                 timestamp = GetResultValue(times.get().GetScalar(batch_row_index))
>>>>>>                 new_timestamp = <CTimestampScalar*>timestamp.get()
>>>>>>                 current_timestamp = timestamps[name]
>>>>>>                 if current_timestamp > new_timestamp.value:
>>>>>>                     abort()
>>>>>>                 batch_row_index += 1
>>>>>> ```
>>>>>>
>>>>>> However, I'm having difficulty operating on the values in a column of
>>>>>> string type. Unlike CTimestampScalar, there is no CStringScalar. Although
>>>>>> there is a StringScalar type in C++, it isn't defined in the Cython
>>>>>> interface. There is a `CStringType` and a `c_string` type.
>>>>>> ```
>>>>>>     while batch_row_index < batch.get().num_rows():
>>>>>>         name = GetResultValue(names.get().GetScalar(batch_row_index))
>>>>>>         name_string = <CStringType*>name.get() # This is wrong
>>>>>>         printf("%s\n", name_string) # This prints garbage
>>>>>>         if name_string == b"Xander": # Doesn't work
>>>>>>             print("found it")
>>>>>>         batch_row_index += 1
>>>>>> ```
>>>>>> How do I get the string value as a C type and compare it to other
>>>>>> strings?
>>>>>>
>>>>>> Thanks,
>>>>>> Xander
>>>>>>
>>>>>
>
