Sounds like a good addition to the Beam patterns page, Reza :)

On Fri, Apr 24, 2020 at 3:22 AM Aniruddh Sharma <asharma...@gmail.com> wrote:
>
> Thanks Robert,
>
> This is a lifesaver and it's a great help :). It works like a charm.
>
> Thanks
> Aniruddh
>
> On Thu, Apr 23, 2020 at 4:45 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>> I may have misinterpreted your email; I thought you didn't have a need for
>> keys at all. If that is actually the case, you don't need a GroupByKey: just
>> have your DoFn take Rows as input and emit List<Row> as output. That is,
>> it's a DoFn<Row, List<Row>>.
>>
>> You can buffer multiple Rows in an instance variable between processElement
>> calls. For example:
>>
>> import java.util.ArrayList;
>> import java.util.List;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
>> import org.joda.time.Instant;
>>
>> class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
>>   // Buffer carried across processElement calls within a bundle.
>>   private List<T> buffer = new ArrayList<>();
>>
>>   @ProcessElement
>>   public void processElement(@Element T elt, OutputReceiver<List<T>> out) {
>>     buffer.add(elt);
>>     if (buffer.size() > 100) {
>>       out.output(buffer);
>>       buffer = new ArrayList<>();
>>     }
>>   }
>>
>>   @FinishBundle
>>   public void finishBundle(FinishBundleContext ctx) {
>>     // Flush whatever is left at the end of the bundle; FinishBundleContext
>>     // needs an explicit timestamp and window for each output.
>>     if (!buffer.isEmpty()) {
>>       ctx.output(buffer, Instant.now(), GlobalWindow.INSTANCE);
>>       buffer = new ArrayList<>();
>>     }
>>   }
>> }
>>
>> See 
>> https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html
>>  for more information on the lifetime of DoFns.
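>>
>> For illustration, here is a rough sketch of wiring this in, assuming an
>> existing PCollection<Row> named rows and a Coder<Row> named rowCoder (both
>> hypothetical); the transform name is arbitrary:
>>
>> PCollection<List<Row>> batched =
>>     rows.apply("BatchRows", ParDo.of(new MyBufferingDoFn<Row>()))
>>         // If the output coder can't be inferred, set it explicitly.
>>         .setCoder(ListCoder.of(rowCoder));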
>>
>> As for why your GBK is taking so long, yes, this can be a bottleneck. 
>> However, it should be noted that Dataflow (like most other runners) executes 
>> this step in conjunction with other steps as part of a "fused stage." So if 
>> your pipeline looks like
>>
>>     Read -> DoFnA -> GBK -> DoFnB -> Write
>>
>> then Read, DoFnA, and GBK[part1] will execute concurrently (all starting up 
>> almost immediately), one element at a time, and when that's finished,
>> GBK[part2], DoFnB, and Write will execute concurrently, one element at a time, so
>> you can't just look at the last unfinished stage to determine where the 
>> bottleneck is. (One helpful tool, however, is looking at the amount of time 
>> spent on each step in the UI.)
>>
>> Hopefully that helps.
>>
>> - Robert
>>
>>
>> On Thu, Apr 23, 2020 at 12:43 PM Aniruddh Sharma <asharma...@gmail.com> 
>> wrote:
>>>
>>> Thanks Robert and Luke
>>>
>>> This approach seems good to me. I am trying it; I had to include a GroupByKey
>>> to make an Iterable<Row> available to the ParDo that does this. Now the
>>> GroupByKey is the bottleneck: it has been running for the last 2 hours and has
>>> processed only 40 GB of data (still waiting for the remaining hundreds of GB).
>>>
>>> Currently I used GroupByKey.create().
>>>
>>> What is the recommended way to choose keys so it executes faster: the same key
>>> for all rows, a different key for each row, or the same key for a group of
>>> rows?
>>>
>>> Thanks
>>> Aniruddh
>>>
>>> On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik <lc...@google.com> wrote:
>>>>
>>>> As Robert suggested, what prevents you from doing:
>>>> ReadFromBQ -> ParDo(BatchInMemory) -> DLP
>>>> where BatchInMemory stores elements in an in-memory list in the @ProcessElement
>>>> method, produces output every time the list is large enough, and emits a final
>>>> output in the @FinishBundle method?
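>>>>
>>>> Roughly, a minimal sketch of that shape (assuming an existing Pipeline p;
>>>> BatchInMemoryFn, CallDlpFn, and the table name are placeholders):
>>>>
>>>> p.apply("ReadFromBQ", BigQueryIO.readTableRows().from("project:dataset.table"))
>>>>     .apply("BatchInMemory", ParDo.of(new BatchInMemoryFn()))
>>>>     .apply("CallDLP", ParDo.of(new CallDlpFn()));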
>>>>
>>>> On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma <asharma...@gmail.com> 
>>>> wrote:
>>>>>
>>>>> Hi Luke
>>>>>
>>>>> Sorry, I forgot to mention the functions. Dataflow adds the following
>>>>> transforms, and ["PartitionKeys", new GroupByKeyAndSortValuesOnly] is super
>>>>> slow. How should I choose keys to make it faster?
>>>>>
>>>>>  .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
>>>>>           .setCoder(
>>>>>               KvCoder.of(
>>>>>                   keyCoder,
>>>>>                   KvCoder.of(InstantCoder.of(),
>>>>>                       WindowedValue.getFullCoder(kvCoder, windowCoder))))
>>>>>
>>>>>           // Group by key and sort by timestamp, dropping windows as they are reified
>>>>>           .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())
>>>>>
>>>>>           // The GBKO sets the windowing strategy to the global default
>>>>>           .setWindowingStrategyInternal(inputWindowingStrategy);
>>>>>
>>>>> Thanks
>>>>> Aniruddh
>>>>>
>>>>> On 2020/04/23 16:35:58, Aniruddh Sharma <asharma...@gmail.com> wrote:
>>>>> > Thanks Luke for your response.
>>>>> >
>>>>> > My use case is the following:
>>>>> > a) I read data from BQ (TableRow).
>>>>> > b) I convert it into (Table.Row) for DLP calls.
>>>>> > c) I have to batch the Table.Row collection up to a max size of 512 KB
>>>>> > (i.e. fit many rows from BQ into a single DLP table) and then call DLP.
>>>>> >
>>>>> > Functionally, I don't need a key or a window, as I just want to fit rows
>>>>> > into a DLP table up to a max size.
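>>>>> >
>>>>> > Roughly, what I have in mind is a size-based batcher like the sketch below
>>>>> > (the 500 KB threshold and names are illustrative, and the serialized proto
>>>>> > size is only a rough proxy for the DLP request size):
>>>>> >
>>>>> > import com.google.privacy.dlp.v2.Table;
>>>>> > import java.util.ArrayList;
>>>>> > import java.util.List;
>>>>> > import org.apache.beam.sdk.transforms.DoFn;
>>>>> > import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
>>>>> > import org.joda.time.Instant;
>>>>> >
>>>>> > class BatchBySizeFn extends DoFn<Table.Row, List<Table.Row>> {
>>>>> >   private static final long MAX_BATCH_BYTES = 500_000; // stay under 512 KB
>>>>> >   private List<Table.Row> buffer = new ArrayList<>();
>>>>> >   private long bufferBytes = 0;
>>>>> >
>>>>> >   @ProcessElement
>>>>> >   public void processElement(@Element Table.Row row, OutputReceiver<List<Table.Row>> out) {
>>>>> >     long rowBytes = row.getSerializedSize();
>>>>> >     // Flush before adding a row that would push the batch over the limit.
>>>>> >     if (!buffer.isEmpty() && bufferBytes + rowBytes > MAX_BATCH_BYTES) {
>>>>> >       out.output(buffer);
>>>>> >       buffer = new ArrayList<>();
>>>>> >       bufferBytes = 0;
>>>>> >     }
>>>>> >     buffer.add(row);
>>>>> >     bufferBytes += rowBytes;
>>>>> >   }
>>>>> >
>>>>> >   @FinishBundle
>>>>> >   public void finishBundle(FinishBundleContext ctx) {
>>>>> >     // Emit whatever is left at the end of the bundle.
>>>>> >     if (!buffer.isEmpty()) {
>>>>> >       ctx.output(buffer, Instant.now(), GlobalWindow.INSTANCE);
>>>>> >       buffer = new ArrayList<>();
>>>>> >       bufferBytes = 0;
>>>>> >     }
>>>>> >   }
>>>>> > }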
>>>>> >
>>>>> > In batch mode, when I use the stateful API, it adds a
>>>>> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step, and this
>>>>> > step is super slow: it has been running on a 50-node cluster against
>>>>> > 800 GB of data for the last 10 hours.
>>>>> >
>>>>> > This step is not added when I run Dataflow in streaming mode, but I
>>>>> > can't use streaming mode for other reasons.
>>>>> >
>>>>> > So I am trying to understand the following:
>>>>> > a) Can I somehow hint to the Dataflow runner not to add the
>>>>> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at all? If
>>>>> > so, I don't have any issues.
>>>>> > b) If it does add this step, how should I choose my artificially created
>>>>> > keys so that the step executes as fast as possible? It sorts the records
>>>>> > by timestamp. Since I don't have any functional key requirement, should I
>>>>> > choose the same key for all rows, a random key shared by some rows, or a
>>>>> > random key for each row? And what timestamps should I assign (e.g. the
>>>>> > same for all rows) to make this step faster?
>>>>> >
>>>>> > Thanks
>>>>> > Aniruddh
>>>>> >
>>>>> > On 2020/04/23 16:15:44, Luke Cwik <lc...@google.com> wrote:
>>>>> > > Stateful & timely operations are always per key and window, which is why
>>>>> > > the GbkBeforeStatefulParDo is being added. Do you not need your stateful &
>>>>> > > timely operation to be done per key and window? If so, can you explain
>>>>> > > further?
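>>>>> > >
>>>>> > > For instance, a stateful DoFn has to consume keyed input; a minimal sketch
>>>>> > > (the String key, TableRow value, and counter state are just illustrative):
>>>>> > >
>>>>> > > import com.google.api.services.bigquery.model.TableRow;
>>>>> > > import org.apache.beam.sdk.coders.VarLongCoder;
>>>>> > > import org.apache.beam.sdk.state.StateSpec;
>>>>> > > import org.apache.beam.sdk.state.StateSpecs;
>>>>> > > import org.apache.beam.sdk.state.ValueState;
>>>>> > > import org.apache.beam.sdk.transforms.DoFn;
>>>>> > > import org.apache.beam.sdk.values.KV;
>>>>> > >
>>>>> > > class CountPerKeyFn extends DoFn<KV<String, TableRow>, KV<String, Long>> {
>>>>> > >   // The runner partitions this state per key and per window.
>>>>> > >   @StateId("count")
>>>>> > >   private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value(VarLongCoder.of());
>>>>> > >
>>>>> > >   @ProcessElement
>>>>> > >   public void processElement(
>>>>> > >       @Element KV<String, TableRow> elt,
>>>>> > >       @StateId("count") ValueState<Long> count,
>>>>> > >       OutputReceiver<KV<String, Long>> out) {
>>>>> > >     Long prev = count.read();
>>>>> > >     long current = (prev == null ? 0L : prev) + 1;
>>>>> > >     count.write(current);
>>>>> > >     out.output(KV.of(elt.getKey(), current));
>>>>> > >   }
>>>>> > > }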
>>>>> > >
>>>>> > > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma <asharma...@gmail.com>
>>>>> > > wrote:
>>>>> > >
>>>>> > > > Hi Kenn
>>>>> > > >
>>>>> > > > Thanks for your guidance. I understand that batch mode waits for the
>>>>> > > > previous stage, but the real issue in this particular case is not only that.
>>>>> > > >
>>>>> > > > The Dataflow runner automatically adds a
>>>>> > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" step which not only
>>>>> > > > waits for the previous stage but also takes a very, very long time. Is
>>>>> > > > there a way to hint to the Dataflow runner not to add this step, since in
>>>>> > > > my case I functionally do not require it?
>>>>> > > >
>>>>> > > > Thanks for your suggestion; I will create another thread to understand
>>>>> > > > the BQ options.
>>>>> > > >
>>>>> > > > Thanks
>>>>> > > > Aniruddh
>>>>> > > >
>>>>> > > > On 2020/04/23 03:51:31, Kenneth Knowles <k...@apache.org> wrote:
>>>>> > > > > The definition of batch mode for Dataflow is this: completely 
>>>>> > > > > compute the
>>>>> > > > > result of one stage of computation before starting the next stage.
>>>>> > > > > There is no way around this. It does not have to do with using state
>>>>> > > > > and timers.
>>>>> > > > >
>>>>> > > > > If you are working with state & timers & triggers, and you are 
>>>>> > > > > hoping for
>>>>> > > > > output before the pipeline is completely terminated, then you 
>>>>> > > > > most likely
>>>>> > > > > want streaming mode. Perhaps it is best to investigate the BQ read
>>>>> > > > > performance issue.
>>>>> > > > >
>>>>> > > > > Kenn
>>>>> > > > >
>>>>> > > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma 
>>>>> > > > > <asharma...@gmail.com>
>>>>> > > > > wrote:
>>>>> > > > >
>>>>> > > > > > Hi
>>>>> > > > > >
>>>>> > > > > > I am reading a bounded collection from BQ.
>>>>> > > > > >
>>>>> > > > > > I have to use a Stateful & Timely operation.
>>>>> > > > > >
>>>>> > > > > > a) I am invoking the job in batch mode. The Dataflow runner adds a
>>>>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" step, which has a
>>>>> > > > > > partitionBy. This partitionBy waits for all the data to arrive and
>>>>> > > > > > becomes a bottleneck. When I read its documentation, it seems its
>>>>> > > > > > purpose is to be added when there are no windows.
>>>>> > > > > >
>>>>> > > > > > I tried adding windows and triggering them before the stateful step,
>>>>> > > > > > but everything comes to this partitionBy step and waits until all the
>>>>> > > > > > data is here.
>>>>> > > > > >
>>>>> > > > > > Is there a way to write the code differently (e.g. with windows) or
>>>>> > > > > > give Dataflow a hint not to add this step?
>>>>> > > > > >
>>>>> > > > > > b) I don't want to run this job in streaming mode. When I run it in
>>>>> > > > > > streaming mode, the Dataflow runner does not add this step, but then
>>>>> > > > > > the BQ read becomes a bottleneck.
>>>>> > > > > >
>>>>> > > > > > So either I have to figure out how to read from BQ faster if I run the
>>>>> > > > > > job in streaming mode, or how to bypass the partitionBy from
>>>>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" if I invoke the
>>>>> > > > > > job in batch mode.
>>>>> > > > > >
>>>>> > > > > > Thanks
>>>>> > > > > > Aniruddh
>>>>> > > > > >
>>>>> > > > > >
>>>>> > > > > >
>>>>> > > > > >
>>>>> > > > >
>>>>> > > >
>>>>> > >
>>>>> >