Hm, to_parquet does have a `partition_cols` argument [1] which we pass
through [2]. It would be interesting to see what  `partition_cols='key1'`
does - I suspect it won't work perfectly though.

Do you have any thoughts here Robert?

[1]
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
[2]
https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525

On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai <[email protected]>
wrote:

> Hi Robert and Brian,
>
> I tried groupby in my case. Here is my pipeline code. I do see all the
> data in the final parquet file are sorted in each group. However, I'd like
> to write each partition (group) to an individual file, how can I achieve
> it? In addition, I am using the master of Apache Beam SDK, how can I test
> the pipeline with DataflowRunner considering there is no dataflow worker
> image available?
>
> data = [
> {
> "key1": 1000 + i % 10,
> "key2": randrange(10000),
> "feature_1": "somestring{}".format(i)
> } for i in range(10000)
> ]
>
> class TestRow(typing.NamedTuple):
> key1: int
> key2: int
> feature_1: str
>
> with beam.Pipeline() as p:
> pcoll = (
> p
> | beam.Create(data)
> | beam.Map(lambda x:x).with_output_types(TestRow)
> )
>
> df = to_dataframe(pcoll)
> sorted_df = df.groupby('key1').apply(lambda df: df.sort_values(by='key2')
> sorted_df.to_parquet('test_beam_dataframe{}.parquet'.format(str
> (uuid.uuid4())[:8]), engine='pyarrow', index=False)
>
> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai <[email protected]>
> wrote:
>
>> Thank you, Robert and Brian.
>>
>> I'd like to try this out. I am trying to distribute my dataset to nodes,
>> sort each partition by some key and then store each partition to its own
>> file.
>>
>> Wenbing
>>
>> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette <[email protected]> wrote:
>>
>>> Note groupby.apply [1] in particular should be able to do what you want,
>>> something like:
>>>
>>>   df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>>
>>> But as Robert noted we don't make any guarantees about preserving this
>>> ordering later in the pipeline. For this reason I actually just sent a PR
>>> to disallow sort_values on the entire dataset [2].
>>>
>>> Brian
>>>
>>> [1] https://github.com/apache/beam/pull/13843
>>> [2] https://github.com/apache/beam/pull/14324
>>>
>>> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> Thanks for trying this out.
>>>>
>>>> Better support for groupby (e.g.
>>>> https://github.com/apache/beam/pull/13843 ,
>>>> https://github.com/apache/beam/pull/13637) will be available in the
>>>> next Beam release (2.29, in progress, but you could try out head if you
>>>> want). Note, however, that Beam PCollections are by definition unordered,
>>>> so unless you sort a partition and immediately do something with it that
>>>> ordering may not be preserved. If you could let us know what you're trying
>>>> to do with this ordering that would be helpful.
>>>>
>>>> - Robert
>>>>
>>>>
>>>> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Beam users,
>>>>>
>>>>> I have a user case to partition my PCollection by some key, and then
>>>>> sort my rows within the same partition by some other key.
>>>>>
>>>>> I feel Beam Dataframe could be a candidate solution, but I cannot
>>>>> figure out how to make it work. Specifically, I tried df.groupby where I
>>>>> expect my data will be distributed to different nodes. I also tried
>>>>> df.sort_values, but it will sort my whole dataset, which is not what I 
>>>>> need.
>>>>>
>>>>> Can someone shed some light on this?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Wenbing Bai
>>>>>
>>>>> Senior Software Engineer
>>>>>
>>>>> Data Infrastructure, Cruise
>>>>>
>>>>> Pronouns: She/Her
>>>>>
>>>>>
>>>>>
>>>>> *Confidentiality Note:* We care about protecting our proprietary
>>>>> information, confidential material, and trade secrets. This message
>>>>> may contain some or all of those things. Cruise will suffer material harm
>>>>> if anyone other than the intended recipient disseminates or takes any
>>>>> action based on this message. If you have received this message (including
>>>>> any attachments) in error, please delete it immediately and notify the
>>>>> sender promptly.
>>>>
>>>>
>>
>> --
>>
>>
>>
>>
>>
>> Wenbing Bai
>>
>> Senior Software Engineer
>>
>> Data Infrastructure, Cruise
>>
>> Pronouns: She/Her
>>
>>
>
> --
>
>
>
>
>
> Wenbing Bai
>
> Senior Software Engineer
>
> Data Infrastructure, Cruise
>
> Pronouns: She/Her
>
>
>
> *Confidentiality Note:* We care about protecting our proprietary
> information, confidential material, and trade secrets. This message may
> contain some or all of those things. Cruise will suffer material harm if
> anyone other than the intended recipient disseminates or takes any action
> based on this message. If you have received this message (including any
> attachments) in error, please delete it immediately and notify the sender
> promptly.

Reply via email to