Thanks Robert,

This is a life saver and it's a great help :). It works like a charm.

Thanks
Aniruddh

On Thu, Apr 23, 2020 at 4:45 PM Robert Bradshaw <rober...@google.com> wrote:

> I may have misinterpreted your email; I thought you didn't need keys at
> all. If that is actually the case, you don't need a GroupByKey: just have
> your DoFn take Rows as input and emit List<Row> as output. That is, it's a
> DoFn<Row, List<Row>>.
>
> You can buffer multiple Rows in an instance variable between process
> element calls. For example,
>
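> import java.util.ArrayList;
> import java.util.List;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
>
> class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
>   private static final int MAX_BATCH_SIZE = 100;
>   private transient List<T> buffer;
>   @StartBundle public void startBundle() {
>     buffer = new ArrayList<>();
>   }
>   @ProcessElement public void processElement(@Element T elt,
>       OutputReceiver<List<T>> out) {
>     buffer.add(elt);
>     if (buffer.size() >= MAX_BATCH_SIZE) {
>       out.output(buffer);
>       buffer = new ArrayList<>();
>     }
>   }
>   // @FinishBundle has no OutputReceiver parameter, so emit the remainder via
>   // the context with an explicit timestamp and window (global window assumed).
>   @FinishBundle public void finishBundle(FinishBundleContext c) {
>     if (!buffer.isEmpty()) {
>       c.output(buffer, GlobalWindow.INSTANCE.maxTimestamp(),
>           GlobalWindow.INSTANCE);
>     }
>     buffer = new ArrayList<>();
>   }
> }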
>
> See https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html
> for more information on the lifetime of DoFns.
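A plain-Java sketch (no Beam dependency) of the same buffering lifecycle for a single bundle makes the flushing behavior easy to check: full batches leave from processElement, and the remainder leaves from finishBundle. The class BundleBufferSketch and its methods are hypothetical stand-ins for the @StartBundle/@ProcessElement/@FinishBundle methods, not Beam API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the buffering DoFn over one bundle.
// Not Beam API: the methods only mirror the DoFn lifecycle annotations.
class BundleBufferSketch<T> {
  private final int maxBatchSize;
  private final List<List<T>> emitted = new ArrayList<>();
  private List<T> buffer;

  BundleBufferSketch(int maxBatchSize) {
    this.maxBatchSize = maxBatchSize;
  }

  // Mirrors @StartBundle: each bundle starts with an empty buffer.
  void startBundle() {
    buffer = new ArrayList<>();
  }

  // Mirrors @ProcessElement: buffer the element, emit once the batch is full.
  void processElement(T elt) {
    buffer.add(elt);
    if (buffer.size() >= maxBatchSize) {
      emitted.add(buffer);
      buffer = new ArrayList<>();
    }
  }

  // Mirrors @FinishBundle: flush the leftover elements so none are dropped.
  void finishBundle() {
    if (!buffer.isEmpty()) {
      emitted.add(buffer);
    }
    buffer = new ArrayList<>();
  }

  List<List<T>> emitted() {
    return emitted;
  }

  // Runs one simulated bundle and returns the size of each emitted batch.
  static List<Integer> batchSizes(int elementCount, int maxBatchSize) {
    BundleBufferSketch<Integer> fn = new BundleBufferSketch<>(maxBatchSize);
    fn.startBundle();
    for (int i = 0; i < elementCount; i++) {
      fn.processElement(i);
    }
    fn.finishBundle();
    List<Integer> sizes = new ArrayList<>();
    for (List<Integer> batch : fn.emitted()) {
      sizes.add(batch.size());
    }
    return sizes;
  }
}
```

For example, a bundle of 250 elements with a limit of 100 yields batches of 100, 100, and 50; without the finishBundle flush, the last 50 elements would never be emitted.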
>
> As for why your GBK is taking so long: yes, this can be a bottleneck.
> However, note that Dataflow (like most other runners) executes this step
> together with other steps as part of a "fused stage." So if your pipeline
> looks like
>
>     Read -> DoFnA -> GBK -> DoFnB -> Write
>
> then Read, DoFnA, and GBK[part1] will execute concurrently (all starting up
> almost immediately), one element at a time, and when that's finished,
> GBK[part2], DoFnB, and Write will execute concurrently, one element at a
> time. So you can't just look at the last unfinished stage to determine
> where the bottleneck is. (One helpful tool, however, is the amount of time
> spent on each step, shown in the UI.)
>
> Hopefully that helps.
>
> - Robert
>
>
> On Thu, Apr 23, 2020 at 12:43 PM Aniruddh Sharma <asharma...@gmail.com>
> wrote:
>
>> Thanks Robert and Luke
>>
>> This approach seems good to me, and I am trying it. I had to include a
>> GroupBy to make an Iterable<Row> available to the ParDo function. Now the
>> GroupBy is the bottleneck: it has been running for the last 2 hours and
>> has processed only 40 GB of data (still waiting for the remaining hundreds
>> of GB).
>>
>> Currently I use GroupByKey.create().
>>
>> What is the recommended way to choose the key so that it executes faster:
>> the same key for all rows, a different key for each row, or the same key
>> for a group of rows?
>>
>> Thanks
>> Aniruddh
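If a GroupByKey is kept at all, one common way to pick an artificial key is a fixed number of shards: a single key for every row funnels all rows through one key, so the downstream work for it cannot be parallelized, while a unique key per row produces groups of one element and therefore no batching. The plain-Java sketch below hashes each row into one of numShards keys; ShardKeySketch, shardFor, and hashing the row's string form are assumptions for illustration (a random int in [0, numShards) would serve the same purpose).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: assigns each row one of numShards artificial keys.
class ShardKeySketch {
  // Hash the row into [0, numShards); floorMod keeps negative hashes in range.
  static int shardFor(String row, int numShards) {
    return Math.floorMod(row.hashCode(), numShards);
  }

  // Returns shard key -> number of rows that would be grouped under that key.
  static Map<Integer, Integer> groupSizes(List<String> rows, int numShards) {
    Map<Integer, Integer> sizes = new TreeMap<>();
    for (String row : rows) {
      sizes.merge(shardFor(row, numShards), 1, Integer::sum);
    }
    return sizes;
  }
}
```

With numShards = 1 every row lands in one group; with a moderate shard count the rows spread over that many groups, each still large enough to batch.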
>>
>> On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik <lc...@google.com> wrote:
>>
>>> As Robert suggested, what prevents you from doing:
>>> ReadFromBQ -> ParDo(BatchInMemory) -> DLP
>>> where BatchInMemory stores elements in an in-memory list in the
>>> @ProcessElement method and produces output every time the list is large
>>> enough, with a final output in the @FinishBundle method?
>>>
>>> On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma <asharma...@gmail.com>
>>> wrote:
>>>
>>>> Hi Luke
>>>>
>>>> Sorry, I forgot to mention the functions. Dataflow adds the following
>>>> code, and the ["PartitionKeys", new GroupByKeyAndSortValuesOnly] step is
>>>> very slow. How should I choose keys to make it faster?
>>>>
>>>>  .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
>>>>           .setCoder(
>>>>               KvCoder.of(
>>>>                   keyCoder,
>>>>                   KvCoder.of(InstantCoder.of(),
>>>>                       WindowedValue.getFullCoder(kvCoder, windowCoder))))
>>>>
>>>>           // Group by key and sort by timestamp, dropping windows as they are reified
>>>>           .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())
>>>>
>>>>           // The GBKO sets the windowing strategy to the global default
>>>>           .setWindowingStrategyInternal(inputWindowingStrategy);
>>>>
>>>> Thanks
>>>> Aniruddh
>>>>
>>>> On 2020/04/23 16:35:58, Aniruddh Sharma <asharma...@gmail.com> wrote:
>>>> > Thanks Luke for your response.
>>>> >
>>>> > My use case is the following:
>>>> > a) I read data from BQ (TableRow).
>>>> > b) I convert it into Table.Row for DLP calls.
>>>> > c) I have to batch the Table.Row collection up to a max size of 512 KB
>>>> > (i.e., fit many rows from BQ into a single DLP table) and call DLP.
>>>> >
>>>> > Functionally, I don't need a key or a window; I just want to fit rows
>>>> > into a DLP table up to a maximum size.
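The 512 KB limit in (c) suggests batching by estimated size rather than by element count. A minimal plain-Java sketch, assuming each element's size can be estimated up front (for a protobuf message such as DLP's Table.Row, getSerializedSize() is one plausible estimate, and leaving some headroom below 512 KB for request overhead seems prudent): the current batch is flushed before adding an element that would push it over the limit. SizeBatcher and its sizer parameter are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

// Hypothetical size-bounded batcher: batches stay at or under maxBytes,
// except that a single element larger than maxBytes is emitted on its own.
class SizeBatcher<T> {
  private final long maxBytes;
  private final ToIntFunction<T> sizer;
  private final List<List<T>> emitted = new ArrayList<>();
  private List<T> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  SizeBatcher(long maxBytes, ToIntFunction<T> sizer) {
    this.maxBytes = maxBytes;
    this.sizer = sizer;
  }

  // Like @ProcessElement: flush first if this element would not fit.
  void add(T elt) {
    int size = sizer.applyAsInt(elt);
    if (!buffer.isEmpty() && bufferedBytes + size > maxBytes) {
      flush();
    }
    buffer.add(elt);
    bufferedBytes += size;
  }

  // Like @FinishBundle: emit whatever remains in the buffer.
  void flush() {
    if (!buffer.isEmpty()) {
      emitted.add(buffer);
      buffer = new ArrayList<>();
      bufferedBytes = 0;
    }
  }

  List<List<T>> emitted() {
    return emitted;
  }
}
```

In a DoFn, add() would run in @ProcessElement and flush() in @FinishBundle, in the same way as the count-based buffering DoFn earlier in the thread.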
>>>> >
>>>> > In batch mode, when I use the stateful API, Dataflow adds a
>>>> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step, and
>>>> > this step is very slow: it has been running on a 50-node cluster over
>>>> > 800 GB of data for the last 10 hours.
>>>> >
>>>> > This step is not added when I run Dataflow in streaming mode, but I
>>>> > can't use streaming mode for other reasons.
>>>> >
>>>> > So I am trying to understand the following:
>>>> > a) Can I somehow hint to the Dataflow runner not to add the
>>>> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at all?
>>>> > Then I wouldn't have any issues.
>>>> > b) If it does add this step, how should I choose my ARTIFICIALLY
>>>> > created keys so that the step executes as fast as possible? It sorts
>>>> > records by timestamp. As I have no functional key requirement, should I
>>>> > use the same key for all rows, a random key for groups of rows, or a
>>>> > random key for each row? And what timestamps should I assign (the same
>>>> > for all rows?) to make this step run faster?
>>>> >
>>>> > Thanks
>>>> > Aniruddh
>>>> >
>>>> > On 2020/04/23 16:15:44, Luke Cwik <lc...@google.com> wrote:
>>>> > > Stateful & timely operations are always per key and window, which is
>>>> > > why the GbkBeforeStatefulParDo is being added. Do you not need your
>>>> > > stateful & timely operation to be done per key and window? If so, can
>>>> > > you explain further?
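Luke's point that state is scoped per key and window can be illustrated with a toy model: each (key, window) pair gets its own independent state cell, so all elements sharing a key and window must reach the same place, which in batch mode is what the runner's added GroupByKey arranges. The plain-Java KeyedStateSketch below is a hypothetical model, not how Dataflow implements state; the counter stands in for something like a ValueState<Integer>.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical toy model of state scoped per (key, window).
class KeyedStateSketch {
  // Identifies one state cell: the same key in different windows is separate.
  static final class StateKey {
    final String key;
    final String window;

    StateKey(String key, String window) {
      this.key = key;
      this.window = window;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof StateKey)) {
        return false;
      }
      StateKey other = (StateKey) o;
      return key.equals(other.key) && window.equals(other.window);
    }

    @Override
    public int hashCode() {
      return Objects.hash(key, window);
    }
  }

  // One counter per (key, window), standing in for a ValueState<Integer>.
  private final Map<StateKey, Integer> counters = new HashMap<>();

  // Processes one element and returns the updated count for its (key, window).
  int process(String key, String window) {
    return counters.merge(new StateKey(key, window), 1, Integer::sum);
  }
}
```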
>>>> > >
>>>> > > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma <asharma...@gmail.com>
>>>> > > wrote:
>>>> > >
>>>> > > > Hi Kenn
>>>> > > >
>>>> > > > Thanks for your guidance. I understand that batch mode waits for
>>>> > > > the previous stage, but that is not the only issue in this
>>>> > > > particular case.
>>>> > > >
>>>> > > > The Dataflow runner automatically adds a
>>>> > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" step, which not
>>>> > > > only waits for the previous stage but also takes a very, very long
>>>> > > > time. Is there a way to hint to the Dataflow runner not to add this
>>>> > > > step, since in my case I functionally do not require it?
>>>> > > >
>>>> > > > Thanks for your suggestion; I will start another thread about the
>>>> > > > BQ options.
>>>> > > >
>>>> > > > Thanks
>>>> > > > Aniruddh
>>>> > > >
>>>> > > > On 2020/04/23 03:51:31, Kenneth Knowles <k...@apache.org> wrote:
>>>> > > > > The definition of batch mode for Dataflow is this: completely
>>>> > > > > compute the result of one stage of computation before starting the
>>>> > > > > next stage. There is no way around this. It does not have to do
>>>> > > > > with using state and timers.
>>>> > > > >
>>>> > > > > If you are working with state & timers & triggers, and you are
>>>> > > > > hoping for output before the pipeline is completely terminated,
>>>> > > > > then you most likely want streaming mode. Perhaps it is best to
>>>> > > > > investigate the BQ read performance issue.
>>>> > > > >
>>>> > > > > Kenn
>>>> > > > >
>>>> > > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma <asharma...@gmail.com>
>>>> > > > > wrote:
>>>> > > > >
>>>> > > > > > Hi
>>>> > > > > >
>>>> > > > > > I am reading a bounded collection from BQ.
>>>> > > > > >
>>>> > > > > > I have to use a Stateful & Timely operation.
>>>> > > > > >
>>>> > > > > > a) I am running the job in batch mode. The Dataflow runner adds
>>>> > > > > > a "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" step,
>>>> > > > > > which has a partitionBy. This partitionBy waits for all the data
>>>> > > > > > to arrive and becomes a bottleneck. From its documentation, it
>>>> > > > > > seems this step is meant to be added when there are no windows.
>>>> > > > > >
>>>> > > > > > I tried adding windows and triggering them before the stateful
>>>> > > > > > step, but everything still arrives at this partitionBy step and
>>>> > > > > > waits until all the data is there.
>>>> > > > > >
>>>> > > > > > Is there a way to write the code differently (using windows,
>>>> > > > > > etc.) or to give Dataflow a hint not to add this step?
>>>> > > > > >
>>>> > > > > > b) I don't want to run this job in streaming mode. In streaming
>>>> > > > > > mode the Dataflow runner does not add this step, but the BQ read
>>>> > > > > > becomes a bottleneck.
>>>> > > > > >
>>>> > > > > > So how can I either read BQ faster when running the job in
>>>> > > > > > streaming mode, or bypass this partitionBy from
>>>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" when running
>>>> > > > > > the job in batch mode?
>>>> > > > > >
>>>> > > > > > Thanks
>>>> > > > > > Aniruddh
