Sounds like a good addition to the Beam patterns page Reza :)
On Fri, Apr 24, 2020 at 3:22 AM Aniruddh Sharma <asharma...@gmail.com> wrote:
>
> Thanks Robert,
>
> This is a life saver and it's a great help :). It works like a charm.
>
> Thanks
> Aniruddh
>
> On Thu, Apr 23, 2020 at 4:45 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>> I may have misinterpreted your email; I thought you didn't have a need for
>> keys at all. If this is actually the case, you don't need a GroupByKey. Just
>> have your DoFn take Rows as input and emit List<Row> as output. That is,
>> it's a DoFn<Row, List<Row>>.
>>
>> You can buffer multiple Rows in an instance variable between process element
>> calls. For example,
>>
>> class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
>>   // Elements buffered since the last emitted batch.
>>   List<T> buffer = new ArrayList<>();
>>   @ProcessElement public void processElement(
>>       @Element T elt, OutputReceiver<List<T>> out) {
>>     buffer.add(elt);
>>     if (buffer.size() > 100) {
>>       out.output(buffer);
>>       buffer = new ArrayList<>();
>>     }
>>   }
>>   // Flush the remainder; assumes all elements are in the global window.
>>   @FinishBundle public void finishBundle(FinishBundleContext c) {
>>     c.output(buffer, GlobalWindow.INSTANCE.maxTimestamp(), GlobalWindow.INSTANCE);
>>     buffer = new ArrayList<>();
>>   }
>> }
>>
>> See
>> https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html
>> for more information on the lifetime of DoFns.
>>
>> As for why your GBK is taking so long, yes, this can be a bottleneck.
>> However, it should be noted that Dataflow (like most other runners) executes
>> this step in conjunction with other steps as part of a "fused stage." So if
>> your pipeline looks like
>>
>> Read -> DoFnA -> GBK -> DoFnB -> Write
>>
>> then Read, DoFnA, and GBK[part1] will execute concurrently (all starting up
>> almost immediately), one element at a time, and when that's finished,
>> GBK[part2], DoFnB, and Write will execute concurrently, one element at a
>> time. So you can't just look at the last unfinished stage to determine where
>> the bottleneck is. (One helpful tool, however, is looking at the amount of
>> time spent on each step in the UI.)
>>
>> Hopefully that helps.
>>
>> - Robert
>>
>>
>> On Thu, Apr 23, 2020 at 12:43 PM Aniruddh Sharma <asharma...@gmail.com>
>> wrote:
>>>
>>> Thanks Robert and Luke
>>>
>>> This approach seems good to me. I am trying it, but I have to include a
>>> GroupBy to make an Iterable<Row> available to the ParDo function. Now the
>>> GroupBy is a bottleneck: it has been running for the last 2 hours and has
>>> processed only 40 GB of data (still waiting for the remaining hundreds of
>>> GB).
>>>
>>> Currently I use GroupByKey.create()
>>>
>>> Which keys are recommended to make it execute faster: the same key for all
>>> rows, a different key for each row, or the same key for a group of rows?
>>>
>>> Thanks
>>> Aniruddh
>>>
>>> On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>> As Robert suggested, what prevents you from doing:
>>>> ReadFromBQ -> ParDo(BatchInMemory) -> DLP
>>>> where BatchInMemory stores elements in the @ProcessElement method in an
>>>> in-memory list and produces output every time the list is large enough,
>>>> with a final output in the @FinishBundle method?
>>>>
>>>> On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma <asharma...@gmail.com>
>>>> wrote:
>>>>>
>>>>> Hi Luke
>>>>>
>>>>> Sorry, I forgot to mention the functions. Dataflow adds the following
>>>>> function, and ["PartitionKeys", new GroupByKeyAndSortValuesOnly] is super
>>>>> slow. How should I choose keys to make it faster?
>>>>>
>>>>> .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
>>>>> .setCoder(
>>>>>     KvCoder.of(
>>>>>         keyCoder,
>>>>>         KvCoder.of(InstantCoder.of(),
>>>>>             WindowedValue.getFullCoder(kvCoder, windowCoder))))
>>>>>
>>>>> // Group by key and sort by timestamp, dropping windows as they are reified
>>>>> .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())
>>>>>
>>>>> // The GBKO sets the windowing strategy to the global default
>>>>> .setWindowingStrategyInternal(inputWindowingStrategy);
>>>>>
>>>>> Thanks
>>>>> Aniruddh
>>>>>
>>>>> On 2020/04/23 16:35:58, Aniruddh Sharma <asharma...@gmail.com> wrote:
>>>>> > Thanks Luke for your response.
>>>>> >
>>>>> > My use case is the following.
>>>>> > a) I read data from BQ (TableRow).
>>>>> > b) I convert it into (Table.Row) for DLP calls.
>>>>> > c) I have to batch the Table.Row collection up to a max size of 512 KB
>>>>> > (i.e. fit many rows from BQ into a single DLP table) and call DLP.
>>>>> >
>>>>> > Functionally, I don't need a key or a window. I just want to fit rows
>>>>> > into a DLP table up to a max size.
>>>>> >
>>>>> > In batch mode, when I call the stateful API, it adds a
>>>>> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step, and this
>>>>> > step is super slow. It has been running on a 50-node cluster for 800 GB
>>>>> > of data for the last 10 hours.
>>>>> >
>>>>> > This step is not added when I run Dataflow in streaming mode. But I
>>>>> > can't run it in streaming mode for other reasons.
>>>>> >
>>>>> > So I am trying to understand the following:
>>>>> > a) Can I somehow give the Dataflow runner a hint not to add the
>>>>> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at all?
>>>>> > Then I don't have any issues.
>>>>> > b) If it adds this step, how should I choose my ARTIFICIALLY created
>>>>> > keys so that the step executes as fast as possible? It sorts the
>>>>> > records by timestamp.
>>>>> > As I don't have any functional key requirement, should I choose the
>>>>> > same key for all rows, a random key for some rows, or a random key for
>>>>> > each row? And what timestamps should I add to make this function work
>>>>> > faster: the same one for all rows?
>>>>> >
>>>>> > Thanks
>>>>> > Aniruddh
>>>>> >
>>>>> > On 2020/04/23 16:15:44, Luke Cwik <lc...@google.com> wrote:
>>>>> > > Stateful & timely operations are always per key and window, which is
>>>>> > > why the GbkBeforeStatefulParDo is being added. Do you not need your
>>>>> > > stateful & timely operation to be done per key and window? If so, can
>>>>> > > you explain further?
>>>>> > >
>>>>> > > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma <asharma...@gmail.com>
>>>>> > > wrote:
>>>>> > >
>>>>> > > > Hi Kenn
>>>>> > > >
>>>>> > > > Thanks for your guidance. I understand that batch mode waits for the
>>>>> > > > previous stage. But that is not the only issue in this particular
>>>>> > > > case.
>>>>> > > >
>>>>> > > > The Dataflow runner automatically adds a step,
>>>>> > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which not only
>>>>> > > > waits for the previous stage but waits for a very, very long time.
>>>>> > > > Is there a way to give the Dataflow runner a hint not to add this
>>>>> > > > step, as in my case I functionally do not require it?
>>>>> > > >
>>>>> > > > Thanks for your suggestion; I will create another thread to
>>>>> > > > understand the BQ options.
>>>>> > > >
>>>>> > > > Thanks
>>>>> > > > Aniruddh
>>>>> > > >
>>>>> > > > On 2020/04/23 03:51:31, Kenneth Knowles <k...@apache.org> wrote:
>>>>> > > > > The definition of batch mode for Dataflow is this: completely
>>>>> > > > > compute the result of one stage of computation before starting
>>>>> > > > > the next stage. There is no way around this. It does not have to
>>>>> > > > > do with using state and timers.
>>>>> > > > >
>>>>> > > > > If you are working with state & timers & triggers, and you are
>>>>> > > > > hoping for output before the pipeline is completely terminated,
>>>>> > > > > then you most likely want streaming mode. Perhaps it is best to
>>>>> > > > > investigate the BQ read performance issue.
>>>>> > > > >
>>>>> > > > > Kenn
>>>>> > > > >
>>>>> > > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma
>>>>> > > > > <asharma...@gmail.com> wrote:
>>>>> > > > >
>>>>> > > > > > Hi
>>>>> > > > > >
>>>>> > > > > > I am reading a bounded collection from BQ.
>>>>> > > > > >
>>>>> > > > > > I have to use a Stateful & Timely operation.
>>>>> > > > > >
>>>>> > > > > > a) I am invoking the job in batch mode. The Dataflow runner adds
>>>>> > > > > > a step "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo"
>>>>> > > > > > which has a partitionBy. This partitionBy waits for all the data
>>>>> > > > > > to arrive and becomes a bottleneck. From its documentation, it
>>>>> > > > > > seems its objective is to be added when there are no windows.
>>>>> > > > > >
>>>>> > > > > > I tried adding windows and triggering them before the stateful
>>>>> > > > > > step, but everything still comes to this partitionBy step and
>>>>> > > > > > waits until all the data is there.
>>>>> > > > > >
>>>>> > > > > > Is there a way to write the code differently (with windows,
>>>>> > > > > > etc.) or give Dataflow a hint not to add this step?
>>>>> > > > > >
>>>>> > > > > > b) I don't want to run this job in streaming mode. When I run it
>>>>> > > > > > in streaming mode, the Dataflow runner does not add this step,
>>>>> > > > > > but in streaming mode the BQ read becomes a bottleneck.
>>>>> > > > > >
>>>>> > > > > > So either I have to work out how to read from BQ faster if I
>>>>> > > > > > run the job in streaming mode, or how to bypass this partitionBy
>>>>> > > > > > from "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" if I
>>>>> > > > > > invoke the job in batch mode?
>>>>> > > > > >
>>>>> > > > > > Thanks
>>>>> > > > > > Aniruddh
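
For anyone who finds this thread later: the use case described above batches rows up to a maximum payload size (512 KB per DLP table) rather than a fixed count. Below is a minimal plain-Java sketch of that variation on the buffering idea. It is not Beam or DLP API: SizeBoundedBatcher, add, flush, and the sizeOf estimator are hypothetical names made up for illustration. In a real DoFn, add would correspond to the @ProcessElement method and flush to @FinishBundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

// Hypothetical helper (not part of the Beam SDK): groups elements into
// batches whose estimated total size stays at or below a byte limit.
class SizeBoundedBatcher<T> {
  private final long maxBytes;
  private final ToLongFunction<T> sizeOf; // caller-supplied size estimate
  private List<T> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  SizeBoundedBatcher(long maxBytes, ToLongFunction<T> sizeOf) {
    this.maxBytes = maxBytes;
    this.sizeOf = sizeOf;
  }

  // Plays the role of @ProcessElement: emit the current batch first if
  // adding this element would push it over the limit, then buffer it.
  void add(T element, Consumer<List<T>> out) {
    long size = sizeOf.applyAsLong(element);
    if (!buffer.isEmpty() && bufferedBytes + size > maxBytes) {
      flush(out);
    }
    // Note: a single element larger than maxBytes still ends up in a
    // batch of its own; this sketch does not split or reject it.
    buffer.add(element);
    bufferedBytes += size;
  }

  // Plays the role of @FinishBundle: emit whatever is still buffered.
  void flush(Consumer<List<T>> out) {
    if (buffer.isEmpty()) {
      return;
    }
    out.accept(buffer);
    buffer = new ArrayList<>();
    bufferedBytes = 0;
  }
}
```

Checking the limit before appending (rather than after, as in the count-based example) keeps every emitted batch within the limit as long as no single element exceeds it. How to estimate the size of a row is left to the caller and would depend on how the rows are serialized.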