Great idea!

On Fri, 24 Apr 2020, 22:33 Ismaël Mejía, <ieme...@gmail.com> wrote:

> Sounds like a good addition to the Beam patterns page Reza :)
>
> On Fri, Apr 24, 2020 at 3:22 AM Aniruddh Sharma <asharma...@gmail.com>
> wrote:
> >
> > Thanks Robert,
> >
> > This is a life saver and it's a great help :). It works like a charm.
> >
> > Thanks
> > Aniruddh
> >
> > On Thu, Apr 23, 2020 at 4:45 PM Robert Bradshaw <rober...@google.com>
> wrote:
> >>
> >> I may have misinterpreted your email; I thought you didn't have a need
> for keys at all. If that's actually the case, you don't need a GroupByKey:
> just have your DoFn take Rows as input and emit List<Row> as output. That
> is, it's a DoFn<Row, List<Row>>.
> >>
> >> You can buffer multiple Rows in an instance variable between process
> element calls. For example,
> >>
> >> class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
> >>   List<T> buffer = new ArrayList<>();
> >>   @ProcessElement public void processElement(@Element T elt,
> OutputReceiver<List<T>> out) {
> >>     buffer.add(elt);
> >>     if (buffer.size() >= 100) {
> >>       out.output(buffer);
> >>       buffer = new ArrayList<>();
> >>     }
> >>   }
> >>   @FinishBundle public void finishBundle(FinishBundleContext c) {
> >>     if (!buffer.isEmpty()) {
> >>       c.output(buffer, Instant.now(), GlobalWindow.INSTANCE);
> >>     }
> >>     buffer = new ArrayList<>();
> >>   }
> >> }
> >>
> >> See
> https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html
> for more information on the lifetime of DoFns.
> >>
> >> As for why your GBK is taking so long, yes, this can be a bottleneck.
> However, it should be noted that Dataflow (like most other runners)
> executes this step in conjunction with other steps as part of a "fused
> stage." So if your pipeline looks like
> >>
> >>     Read -> DoFnA -> GBK -> DoFnB -> Write
> >>
> >> then Read, DoFnA, and GBK[part1] will execute concurrently (all
> starting up almost immediately), one element at a time, and when that's
> finished, GBK[part2], DoFnB, and Write will execute concurrently, one
> element at a time, so you can't just look at the last unfinished stage to
> determine where the bottleneck is. (One helpful tool, however, is looking
> at the amount of time spent on each step in the UI.)
> >>
> >> Hopefully that helps.
> >>
> >> - Robert
> >>
> >>
> >> On Thu, Apr 23, 2020 at 12:43 PM Aniruddh Sharma <asharma...@gmail.com>
> wrote:
> >>>
> >>> Thanks Robert and Luke
> >>>
> >>> This approach seems good to me. I am trying it; I had to include a
> GroupBy to make an Iterable<Row> available to the ParDo function. Now the
> GroupBy is a bottleneck: it has been running for the last 2 hours and has
> processed only 40 GB of data (still waiting for the rest of the hundreds
> of GB of data).
> >>>
> >>> Currently I used GroupByKey.create().
> >>>
> >>> What's the recommended way to choose a key to make it execute faster:
> the same key for all rows, a different key for each row, or the same key
> for a group of rows?
> >>>
> >>> Thanks
> >>> Aniruddh
> >>>
> >>> On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik <lc...@google.com> wrote:
> >>>>
> >>>> As Robert suggested, what prevents you from doing:
> >>>> ReadFromBQ -> ParDo(BatchInMemory) -> DLP
> >>>> where BatchInMemory stores elements in the @ProcessElement method in
> an in-memory list and produces output every time the list is large enough,
> with a final output in the @FinishBundle method?
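The batching rule that BatchInMemory would apply can be sketched without any Beam or DLP dependencies. This is a minimal, illustrative sketch only, assuming the 512 KB cap from the DLP table limit mentioned later in the thread; the class and method names here are made up for illustration and are not part of any Beam or DLP API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SizeCappedBatcher {
  // Assumed cap, per the DLP table size limit discussed in this thread.
  static final int MAX_BATCH_BYTES = 512 * 1024;

  // Splits rows into consecutive batches, starting a new batch whenever
  // adding the next row would push the current batch past maxBytes.
  // (A single oversized row still gets its own batch.)
  static List<List<String>> batchBySize(List<String> rows, int maxBytes) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    int currentBytes = 0;
    for (String row : rows) {
      int rowBytes = row.getBytes(StandardCharsets.UTF_8).length;
      if (!current.isEmpty() && currentBytes + rowBytes > maxBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(row);
      currentBytes += rowBytes;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

In a DoFn this logic would live in @ProcessElement, with the trailing non-empty batch emitted from @FinishBundle, as described above.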
> >>>>
> >>>> On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma <asharma...@gmail.com>
> wrote:
> >>>>>
> >>>>> Hi Luke
> >>>>>
> >>>>> Sorry, I forgot to mention the functions. Dataflow adds the
> following steps, and ["PartitionKeys", new GroupByKeyAndSortValuesOnly] is
> super slow. How do I choose keys to make it faster?
> >>>>>
> >>>>>  .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
> >>>>>           .setCoder(
> >>>>>               KvCoder.of(
> >>>>>                   keyCoder,
> >>>>>                   KvCoder.of(InstantCoder.of(),
> WindowedValue.getFullCoder(kvCoder, windowCoder))))
> >>>>>
> >>>>>           // Group by key and sort by timestamp, dropping windows as
> they are reified
> >>>>>           .apply("PartitionKeys", new
> GroupByKeyAndSortValuesOnly<>())
> >>>>>
> >>>>>           // The GBKO sets the windowing strategy to the global
> default
> >>>>>           .setWindowingStrategyInternal(inputWindowingStrategy);
> >>>>>
> >>>>> Thanks
> >>>>> Aniruddh
> >>>>>
> >>>>> On 2020/04/23 16:35:58, Aniruddh Sharma <asharma...@gmail.com>
> wrote:
> >>>>> > Thanks Luke for your response.
> >>>>> >
> >>>>> > My use case is following.
> >>>>> > a) I read data from BQ (TableRow)
> >>>>> > b) Convert it into (Table.Row) for DLP calls.
> >>>>> > c) Have to batch the Table.Row collection up to a max size of 512
> KB (i.e., fit as many rows from BQ as possible into a single DLP table)
> and call DLP.
> >>>>> >
> >>>>> > Functionally, I don't have a need of key and window. As I just
> want to fit rows in DLP table up to a max size.
> >>>>> >
> >>>>> > In batch mode, when I call the stateful API, it adds a
> "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step, and this
> step is super slow: it has been running on a 50-node cluster for 800 GB of
> data for the last 10 hours.
> >>>>> >
> >>>>> > This step is not added when I call Dataflow in streaming mode. But
> I can't call it in Streaming mode for other reasons.
> >>>>> >
> >>>>> > So I am trying to understand the following:
> >>>>> > a) Can I somehow give the Dataflow runner a hint not to add the
> "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at all?
> Then I don't have any issues.
> >>>>> > b) If it does add this step, how should I choose my artificially
> created keys so that the step executes as fast as possible? It does a sort
> by timestamp on the records. As I don't have any functional key
> requirement, should I choose the same key for all rows, a random key for
> some rows, or a random key for each row? And what timestamps should I
> assign (e.g., the same for all rows) to make this step run faster?
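When no functional key exists, one common pattern is to spread rows over a small fixed pool of artificial shard keys: a single key for every row serializes the grouping onto one worker, while a distinct key per row makes grouping pointless, so a modest shard count is the usual middle ground. The sketch below is illustrative only; the class name, method name, and shard count are assumptions, not anything Dataflow requires.

```java
// Illustrative sketch: round-robin rows into a fixed number of shard keys
// so the grouping step can run in parallel across workers.
class ShardKeyAssigner {
  private final int numShards;
  private long counter = 0;

  ShardKeyAssigner(int numShards) {
    this.numShards = numShards;
  }

  // Round-robin assignment: consecutive rows land on consecutive shards,
  // cycling back to shard 0 after numShards rows.
  int nextKey() {
    return (int) (counter++ % numShards);
  }
}
```

A hash of some stable row attribute modulo the shard count works equally well when deterministic assignment matters.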
> >>>>> >
> >>>>> > Thanks
> >>>>> > Aniruddh
> >>>>> >
> >>>>> > On 2020/04/23 16:15:44, Luke Cwik <lc...@google.com> wrote:
> >>>>> > > Stateful & timely operations are always per key and window,
> >>>>> > > which is why the GbkBeforeStatefulParDo is being added. Do you
> >>>>> > > not need your stateful & timely operation to be done per key and
> >>>>> > > window? If so, can you explain further?
> >>>>> > >
> >>>>> > > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma <
> asharma...@gmail.com>
> >>>>> > > wrote:
> >>>>> > >
> >>>>> > > > Hi Kenn
> >>>>> > > >
> >>>>> > > > Thanks for your guidance. I understand that batch mode waits
> >>>>> > > > for the previous stage, but the real issue in this particular
> >>>>> > > > case is not only that.
> >>>>> > > >
> >>>>> > > > The Dataflow runner automatically adds a step,
> >>>>> > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which
> >>>>> > > > not only waits for the previous stage but waits for a very,
> >>>>> > > > very long time. Is there a way to give the Dataflow runner a
> >>>>> > > > hint not to add this step, as in my case I functionally do not
> >>>>> > > > require it?
> >>>>> > > >
> >>>>> > > > Thanks for your suggestion, will create another thread to
> understand BQ
> >>>>> > > > options
> >>>>> > > >
> >>>>> > > > Thanks
> >>>>> > > > Aniruddh
> >>>>> > > >
> >>>>> > > > On 2020/04/23 03:51:31, Kenneth Knowles <k...@apache.org>
> wrote:
> >>>>> > > > > The definition of batch mode for Dataflow is this:
> completely compute the
> >>>>> > > > > result of one stage of computation before starting the next
> stage. There
> >>>>> > > > is
> >>>>> > > > > no way around this. It does not have to do with using state
> and timers.
> >>>>> > > > >
> >>>>> > > > > If you are working with state & timers & triggers, and you
> are hoping for
> >>>>> > > > > output before the pipeline is completely terminated, then
> you most likely
> >>>>> > > > > want streaming mode. Perhaps it is best to investigate the
> BQ read
> >>>>> > > > > performance issue.
> >>>>> > > > >
> >>>>> > > > > Kenn
> >>>>> > > > >
> >>>>> > > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma <
> asharma...@gmail.com>
> >>>>> > > > > wrote:
> >>>>> > > > >
> >>>>> > > > > > Hi
> >>>>> > > > > >
> >>>>> > > > > > I am reading a bounded collection from BQ.
> >>>>> > > > > >
> >>>>> > > > > > I have to use a Stateful & Timely operation.
> >>>>> > > > > >
> >>>>> > > > > > a) I am invoking the job in batch mode. The Dataflow
> >>>>> > > > > > runner adds a step,
> >>>>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo",
> >>>>> > > > > > which has a partitionBy. This partitionBy waits for all
> >>>>> > > > > > the data to arrive and becomes a bottleneck. When I read
> >>>>> > > > > > its documentation, it seems it is meant to be added when
> >>>>> > > > > > there are no windows.
> >>>>> > > > > >
> >>>>> > > > > > I tried adding windows and triggering them before the
> >>>>> > > > > > stateful step, but everything still arrives at this
> >>>>> > > > > > partitionBy step and waits until all the data is here.
> >>>>> > > > > >
> >>>>> > > > > > Is there a way to write the code (with windows, etc.) or
> >>>>> > > > > > give Dataflow a hint not to add this step?
> >>>>> > > > > >
> >>>>> > > > > > b) I don't want to call this job in streaming mode. When
> >>>>> > > > > > I call it in streaming mode, the Dataflow runner does not
> >>>>> > > > > > add this step, but in streaming mode the BQ read becomes a
> >>>>> > > > > > bottleneck.
> >>>>> > > > > >
> >>>>> > > > > > So either I have to figure out how to read from BQ faster
> >>>>> > > > > > if I call the job in streaming mode, or how to bypass the
> >>>>> > > > > > partitionBy in
> >>>>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" if I
> >>>>> > > > > > invoke the job in batch mode?
> >>>>> > > > > >
> >>>>> > > > > > Thanks
> >>>>> > > > > > Aniruddh
> >>>>> > > > > >
> >>>>> > > > > >
> >>>>> > > > > >
> >>>>> > > > > >
> >>>>> > > > >
> >>>>> > > >
> >>>>> > >
> >>>>> >
>
