Thank you, Brian. I tried `partition_cols`, but it is not working. The same
call does work in pure pandas, so I am not sure whether something is wrong on
the Beam side.
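
To make the goal concrete, the output being asked for is one file per `key1`
value, with rows sorted by `key2` inside each file. Below is a minimal
plain-Python sketch of that outcome using only the standard library. It is not
Beam or pandas code; the helper name `write_sorted_partitions`, the CSV format,
and the `key1=<value>.csv` file naming are assumptions made up for
illustration.

```python
import csv
import os
import tempfile
from collections import defaultdict
from operator import itemgetter
from random import randrange


def write_sorted_partitions(rows, out_dir, partition_key="key1", sort_key="key2"):
    """Group rows by partition_key, sort each group by sort_key, and write
    every group to its own CSV file. Returns {partition value: file path}.

    Hypothetical helper for illustration only; not part of Beam or pandas.
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_key]].append(row)

    os.makedirs(out_dir, exist_ok=True)
    paths = {}
    for key, group in groups.items():
        # Sort within the partition, then write it out immediately.
        group.sort(key=itemgetter(sort_key))
        path = os.path.join(out_dir, f"{partition_key}={key}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=list(group[0].keys()))
            writer.writeheader()
            writer.writerows(group)
        paths[key] = path
    return paths


# Same shape as the test data quoted in this thread, just smaller.
data = [
    {"key1": 1000 + i % 10, "key2": randrange(10000), "feature_1": f"somestring{i}"}
    for i in range(100)
]
out_dir = tempfile.mkdtemp()
paths = write_sorted_partitions(data, out_dir)
```

Each group is sorted and written in the same step, so the ordering does not
have to survive any later stage.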

Wenbing

On Wed, Apr 7, 2021 at 2:56 PM Brian Hulette <[email protected]> wrote:

> Hm, to_parquet does have a `partition_cols` argument [1] which we pass
> through [2]. It would be interesting to see what `partition_cols='key1'`
> does - I suspect it won't work perfectly though.
>
> Do you have any thoughts here Robert?
>
> [1]
> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
> [2]
> https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525
>
> On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai <[email protected]>
> wrote:
>
>> Hi Robert and Brian,
>>
>> I tried groupby in my case. Here is my pipeline code. I do see that the
>> data in the final parquet file is sorted within each group. However, I'd
>> like to write each partition (group) to an individual file; how can I
>> achieve that? In addition, I am using the master branch of the Apache Beam
>> SDK; how can I test the pipeline with DataflowRunner, considering there is
>> no Dataflow worker image available?
>>
>> import typing
>> import uuid
>> from random import randrange
>>
>> import apache_beam as beam
>> from apache_beam.dataframe.convert import to_dataframe
>>
>> data = [
>>     {
>>         "key1": 1000 + i % 10,
>>         "key2": randrange(10000),
>>         "feature_1": "somestring{}".format(i)
>>     } for i in range(10000)
>> ]
>>
>> class TestRow(typing.NamedTuple):
>>     key1: int
>>     key2: int
>>     feature_1: str
>>
>> with beam.Pipeline() as p:
>>     pcoll = (
>>         p
>>         | beam.Create(data)
>>         | beam.Map(lambda x: x).with_output_types(TestRow)
>>     )
>>
>>     df = to_dataframe(pcoll)
>>     # Sort rows by key2 within each key1 group.
>>     sorted_df = df.groupby('key1').apply(
>>         lambda df: df.sort_values(by='key2'))
>>     sorted_df.to_parquet(
>>         'test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]),
>>         engine='pyarrow', index=False)
>>
>> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai <[email protected]>
>> wrote:
>>
>>> Thank you, Robert and Brian.
>>>
>>> I'd like to try this out. I am trying to distribute my dataset to nodes,
>>> sort each partition by some key and then store each partition to its own
>>> file.
>>>
>>> Wenbing
>>>
>>> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette <[email protected]>
>>> wrote:
>>>
>>>> Note groupby.apply [1] in particular should be able to do what you
>>>> want, something like:
>>>>
>>>>   df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>>>
>>>> But as Robert noted we don't make any guarantees about preserving this
>>>> ordering later in the pipeline. For this reason I actually just sent a PR
>>>> to disallow sort_values on the entire dataset [2].
>>>>
>>>> Brian
>>>>
>>>> [1] https://github.com/apache/beam/pull/13843
>>>> [2] https://github.com/apache/beam/pull/14324
>>>>
>>>> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks for trying this out.
>>>>>
>>>>> Better support for groupby (e.g.
>>>>> https://github.com/apache/beam/pull/13843 ,
>>>>> https://github.com/apache/beam/pull/13637) will be available in the
>>>>> next Beam release (2.29, in progress, but you could try out head if you
>>>>> want). Note, however, that Beam PCollections are by definition unordered,
>>>>> so unless you sort a partition and immediately do something with it, that
>>>>> ordering may not be preserved. If you could let us know what you're trying
>>>>> to do with this ordering, that would be helpful.
>>>>>
>>>>> - Robert
>>>>>
>>>>>
>>>>> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Beam users,
>>>>>>
>>>>>> I have a use case where I need to partition my PCollection by some
>>>>>> key, and then sort the rows within each partition by some other key.
>>>>>>
>>>>>> I feel Beam Dataframe could be a candidate solution, but I cannot
>>>>>> figure out how to make it work. Specifically, I tried df.groupby, where I
>>>>>> expect my data to be distributed to different nodes. I also tried
>>>>>> df.sort_values, but it sorts my whole dataset, which is not what I need.
>>>>>>
>>>>>> Can someone shed some light on this?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Wenbing Bai
>>>>>>
>>>>>> Senior Software Engineer
>>>>>>
>>>>>> Data Infrastructure, Cruise
>>>>>>
>>>>>> Pronouns: She/Her
>>>>>>
>>>>>>
>>>>>>
>>>>>> *Confidentiality Note:* We care about protecting our proprietary
>>>>>> information, confidential material, and trade secrets. This message
>>>>>> may contain some or all of those things. Cruise will suffer material harm
>>>>>> if anyone other than the intended recipient disseminates or takes any
>>>>>> action based on this message. If you have received this message 
>>>>>> (including
>>>>>> any attachments) in error, please delete it immediately and notify the
>>>>>> sender promptly.
>>>>>
>>>>>
>>>
>>
>
>

