Hm, to_parquet does have a `partition_cols` argument [1] which we pass through [2]. It would be interesting to see what `partition_cols='key1'` does - I suspect it won't work perfectly though.
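
For reference, in plain pandas with the pyarrow engine, `partition_cols='key1'` writes a directory containing one `key1=<value>` subdirectory per distinct value rather than a single file. Whether the Beam DataFrame API reproduces that layout is the open question here. A minimal plain-Python sketch of that target layout, combined with the per-group sort by `key2`, might look like the following. It uses only the standard library and writes CSV in place of parquet. `write_partitioned` and the `part-0.csv` file name are hypothetical and are not part of pandas or Beam.

```python
import csv
import tempfile
from collections import defaultdict
from pathlib import Path
from random import Random


def write_partitioned(rows, root, partition_key, sort_key):
    """Write one CSV file per distinct partition_key value, sorted by sort_key.

    Hypothetical helper: a stand-in for a partitioned parquet write.
    """
    # Bucket the rows by the partition key.
    groups = defaultdict(list)
    for row in rows:
        groups[row[partition_key]].append(row)

    written = {}
    for value, group in groups.items():
        # Hive-style directory name, e.g. "key1=1003".
        part_dir = Path(root) / f"{partition_key}={value}"
        part_dir.mkdir(parents=True, exist_ok=True)
        path = part_dir / "part-0.csv"
        # The partition value lives in the directory name, so the column
        # itself is left out of the file.
        fields = [name for name in group[0] if name != partition_key]
        with path.open("w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fields, extrasaction="ignore")
            writer.writeheader()
            # Sort within the partition immediately before writing.
            writer.writerows(sorted(group, key=lambda r: r[sort_key]))
        written[value] = path
    return written


# Same shape as the data in the quoted pipeline, with a seeded generator.
rng = Random(0)
data = [
    {"key1": 1000 + i % 10, "key2": rng.randrange(10000), "feature_1": f"somestring{i}"}
    for i in range(10000)
]

out_dir = tempfile.mkdtemp()
files = write_partitioned(data, out_dir, "key1", "key2")
print(sorted(path.parent.name for path in files.values()))
```

Sorting inside the same step that writes the file sidesteps the ordering caveat raised in the quoted replies, because nothing downstream has a chance to reorder the rows.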
Do you have any thoughts here Robert?

[1] https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html
[2] https://github.com/apache/beam/blob/a8cd05932bed9b2480316fb8518409636cb2733b/sdks/python/apache_beam/dataframe/io.py#L525

On Wed, Apr 7, 2021 at 2:22 PM Wenbing Bai wrote:

> Hi Robert and Brian,
>
> I tried groupby in my case. Here is my pipeline code. I do see that all
> the data in the final parquet file is sorted within each group. However,
> I'd like to write each partition (group) to an individual file. How can I
> achieve that? In addition, I am using the master branch of the Apache Beam
> SDK. How can I test the pipeline with DataflowRunner, considering there is
> no Dataflow worker image available?
>
> import typing
> import uuid
> from random import randrange
>
> import apache_beam as beam
> from apache_beam.dataframe.convert import to_dataframe
>
> data = [
>     {
>         "key1": 1000 + i % 10,
>         "key2": randrange(10000),
>         "feature_1": "somestring{}".format(i)
>     } for i in range(10000)
> ]
>
> class TestRow(typing.NamedTuple):
>     key1: int
>     key2: int
>     feature_1: str
>
> with beam.Pipeline() as p:
>     pcoll = (
>         p
>         | beam.Create(data)
>         | beam.Map(lambda x: x).with_output_types(TestRow)
>     )
>
>     df = to_dataframe(pcoll)
>     # Sort by key2 within each key1 group.
>     sorted_df = df.groupby('key1').apply(lambda df: df.sort_values(by='key2'))
>     sorted_df.to_parquet(
>         'test_beam_dataframe{}.parquet'.format(str(uuid.uuid4())[:8]),
>         engine='pyarrow', index=False)
>
> On Fri, Apr 2, 2021 at 10:00 AM Wenbing Bai wrote:
>
>> Thank you, Robert and Brian.
>>
>> I'd like to try this out. I am trying to distribute my dataset to nodes,
>> sort each partition by some key and then store each partition to its own
>> file.
>>
>> Wenbing
>>
>> On Fri, Apr 2, 2021 at 9:23 AM Brian Hulette wrote:
>>
>>> Note groupby.apply [1] in particular should be able to do what you want,
>>> something like:
>>>
>>>   df.groupby('key1').apply(lambda df: df.sort_values('key2'))
>>>
>>> But as Robert noted we don't make any guarantees about preserving this
>>> ordering later in the pipeline.
>>> For this reason I actually just sent a PR to disallow sort_values on the
>>> entire dataset [2].
>>>
>>> Brian
>>>
>>> [1] https://github.com/apache/beam/pull/13843
>>> [2] https://github.com/apache/beam/pull/14324
>>>
>>> On Fri, Apr 2, 2021 at 9:15 AM Robert Bradshaw wrote:
>>>
>>>> Thanks for trying this out.
>>>>
>>>> Better support for groupby (e.g.
>>>> https://github.com/apache/beam/pull/13843 ,
>>>> https://github.com/apache/beam/pull/13637) will be available in the
>>>> next Beam release (2.29, in progress, but you could try out head if you
>>>> want). Note, however, that Beam PCollections are by definition unordered,
>>>> so unless you sort a partition and immediately do something with it, that
>>>> ordering may not be preserved. If you could let us know what you're trying
>>>> to do with this ordering, that would be helpful.
>>>>
>>>> - Robert
>>>>
>>>> On Thu, Apr 1, 2021 at 7:31 PM Wenbing Bai wrote:
>>>>
>>>>> Hi Beam users,
>>>>>
>>>>> I have a use case to partition my PCollection by some key, and then
>>>>> sort my rows within the same partition by some other key.
>>>>>
>>>>> I feel Beam Dataframe could be a candidate solution, but I cannot
>>>>> figure out how to make it work. Specifically, I tried df.groupby, where I
>>>>> expect my data to be distributed to different nodes. I also tried
>>>>> df.sort_values, but it sorts my whole dataset, which is not what I
>>>>> need.
>>>>>
>>>>> Can someone shed some light on this?
>>>>>
>>>>> Wenbing Bai
>>>>> Senior Software Engineer
>>>>> Data Infrastructure, Cruise
>>>>> Pronouns: She/Her
>>>>>
>>>>> *Confidentiality Note:* We care about protecting our proprietary
>>>>> information, confidential material, and trade secrets. This message
>>>>> may contain some or all of those things.
>>>>> Cruise will suffer material harm if anyone other than the intended
>>>>> recipient disseminates or takes any action based on this message. If you
>>>>> have received this message (including any attachments) in error, please
>>>>> delete it immediately and notify the sender promptly.
