Re: Stateful & Timely Call

2020-04-27 Thread Reza Rokni
Great idea!


Re: Stateful & Timely Call

2020-04-24 Thread Ismaël Mejía
Sounds like a good addition to the Beam patterns page Reza :)


Re: Stateful & Timely Call

2020-04-23 Thread Aniruddh Sharma
Thanks Robert,

This is a life saver and its a great help :). It works like a charm.

Thanks
Aniruddh


Re: Stateful & Timely Call

2020-04-23 Thread Reza Rokni
In general, for others who may read the thread: if there is no natural key
in your data and you would like to make use of state and timers, a simple
pattern to use is:

WithKeys: x -> randomInt(1000)

This allows the work to go parallel.
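To make the idea concrete, here is a plain-Java sketch of the keying step (outside Beam; the class and method names are illustrative assumptions, and the bucket count of 1000 follows the example above and would be tuned in practice). Each element gets a random key from a small fixed range, which caps the number of distinct keys a per-key stateful step must track while still spreading elements across workers:

```java
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustrative sketch (not Beam code): assign every element a random key
// drawn from [0, numKeys). numKeys caps the number of distinct keys (and
// hence how far a per-key stateful step can fan out), while the random
// choice spreads load roughly evenly across those keys.
class RandomKeying {
  static <T> Map<Integer, List<T>> keyRandomly(List<T> elements, int numKeys, Random rng) {
    return elements.stream()
        .collect(Collectors.groupingBy(e -> rng.nextInt(numKeys)));
  }
}
```

In an actual Beam pipeline the same idea would be expressed with something like a WithKeys transform ahead of the stateful ParDo, trading a small amount of key skew for parallelism.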



Re: Stateful & Timely Call

2020-04-23 Thread Robert Bradshaw
I may have misinterpreted your email; I thought you didn't have a need for
keys at all. If this is actually the case, you don't need a GroupByKey:
just have your DoFn take Rows as input and emit List<Row> as output. That
is, it's a DoFn<Row, List<Row>>.

You can buffer multiple Rows in an instance variable between process
element calls. For example,

class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
  List<T> buffer = new ArrayList<>();
  @ProcessElement
  public void processElement(@Element T elt, OutputReceiver<List<T>> out) {
    buffer.add(elt);  // add the element to the in-memory buffer
    if (buffer.size() > 100) {
      out.output(buffer);
      buffer = new ArrayList<>();
    }
  }
  @FinishBundle
  public void finishBundle(OutputReceiver<List<T>> out) {
    out.output(buffer);
    buffer = new ArrayList<>();
  }
}

See
https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html
for more information on the lifetime of DoFns.
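Stripped of the Beam annotations, the buffer-and-flush pattern in the DoFn above amounts to the following (a plain-Java sketch; "Batcher" and its names are illustrative, not a Beam API, and the threshold mirrors the 100-element example):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch of the buffer-and-flush pattern used by the DoFn above.
class Batcher<T> {
  private final int threshold;
  private final Consumer<List<T>> downstream;
  private List<T> buffer = new ArrayList<>();

  Batcher(int threshold, Consumer<List<T>> downstream) {
    this.threshold = threshold;
    this.downstream = downstream;
  }

  // Mirrors @ProcessElement: buffer each element, emit a batch when full.
  void process(T elt) {
    buffer.add(elt);
    if (buffer.size() >= threshold) {
      flush();
    }
  }

  // Mirrors @FinishBundle: emit whatever is left over.
  void flush() {
    if (!buffer.isEmpty()) {
      downstream.accept(buffer);
      buffer = new ArrayList<>();
    }
  }
}
```

The final flush is what prevents the tail of each bundle from being dropped, which is why the DoFn above needs the @FinishBundle method.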

As for why your GBK is taking so long, yes, this can be a bottleneck.
However, it should be noted that Dataflow (like most other runners)
executes this step in conjunction with other steps as part of a "fused
stage." So if your pipeline looks like

Read -> DoFnA -> GBK -> DoFnB -> Write

then Read, DoFnA, and GBK[part1] will execute concurrently (all starting up
almost immediately), one element at a time, and when that's finished,
GBK[part2], DoFnB, and Write will execute concurrently, one element at a time,
so you can't just look at the last unfinished stage to determine where the
bottleneck is. (One helpful tool, however, is looking at the amount of time
spent on each step in the UI.)

Hopefully that helps.

- Robert



Re: Stateful & Timely Call

2020-04-23 Thread Aniruddh Sharma
Thanks Robert and Luke

This approach seems good to me. I am trying it; I had to include a
GroupBy to make an Iterable available to the ParDo that does the same.
Now the GroupBy is a bottleneck: it has been running for the last 2 hours
and has processed only 40 GB of data (still waiting for the rest of the
100s of GB).

Currently I used GroupByKey.create().

What's the recommended way to choose keys to make it execute faster: the
same key for all rows, a different key for each row, or the same key for
a group of rows?

Thanks
Aniruddh


Re: Stateful & Timely Call

2020-04-23 Thread Luke Cwik
As Robert suggested, what prevents you from doing:
ReadFromBQ -> ParDo(BatchInMemory) -> DLP
where BatchInMemory stores elements in an in-memory list in the
@ProcessElement method, produces output every time the list is large
enough, and emits a final output in the @FinishBundle method?
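Since the goal in this thread is to batch Table.Row elements up to 512 KB, "large enough" can be measured in bytes rather than element count. Here is a plain-Java sketch of that variant (the class and the sizeOf function are illustrative assumptions, not Beam or DLP APIs; a real pipeline would measure the element's actual serialized size):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.ToIntFunction;

// Illustrative sketch: batch elements so each emitted batch stays under a
// byte budget, e.g. the 512 KB DLP request limit discussed in this thread.
// sizeOf stands in for however the real element's serialized size is found.
class SizeBoundedBatcher<T> {
  private final int maxBytes;
  private final ToIntFunction<T> sizeOf;
  private final Consumer<List<T>> downstream;
  private List<T> batch = new ArrayList<>();
  private int batchBytes = 0;

  SizeBoundedBatcher(int maxBytes, ToIntFunction<T> sizeOf,
      Consumer<List<T>> downstream) {
    this.maxBytes = maxBytes;
    this.sizeOf = sizeOf;
    this.downstream = downstream;
  }

  void add(T elt) {
    int size = sizeOf.applyAsInt(elt);
    // Flush the current batch first if this element would push it over
    // budget. (An element bigger than the whole budget still goes out as
    // a batch of one; real code would have to reject or split it.)
    if (!batch.isEmpty() && batchBytes + size > maxBytes) {
      flush();
    }
    batch.add(elt);
    batchBytes += size;
  }

  // Called at the end of a bundle, like @FinishBundle above.
  void flush() {
    if (!batch.isEmpty()) {
      downstream.accept(batch);
      batch = new ArrayList<>();
      batchBytes = 0;
    }
  }
}
```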

On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma 
wrote:

> Hi Luke
>
> Sorry, I forgot to mention the functions. Dataflow adds the following
> step, ["PartitionKeys", new GroupByKeyAndSortValuesOnly], and it is super
> slow. How do I choose keys to make it faster?
>
>  .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
>   .setCoder(
>   KvCoder.of(
>   keyCoder,
>   KvCoder.of(InstantCoder.of(),
> WindowedValue.getFullCoder(kvCoder, windowCoder
>
>   // Group by key and sort by timestamp, dropping windows as they
> are reified
>   .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())
>
>   // The GBKO sets the windowing strategy to the global default
>   .setWindowingStrategyInternal(inputWindowingStrategy);
>
> Thanks
> Aniruddh
>
> On 2020/04/23 16:35:58, Aniruddh Sharma  wrote:
> > Thanks Luke for your response.
> >
> > My use case is the following:
> > a) I read data from BQ (TableRow).
> > b) Convert it into Table.Row for DLP calls.
> > c) I have to batch the Table.Row collection up to a max size of 512 KB
> > (i.e. fit many rows from BQ into a single DLP table) and call DLP.
> >
> > Functionally, I don't have a need for a key or a window, as I just
> > want to fit rows into a DLP table up to a max size.
> >
> > In batch mode, when I call the stateful API, it adds a
> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step, and
> > this step is super slow: it has been running on a 50-node cluster for
> > 800 GB of data for the last 10 hours.
> >
> > This step is not added when I call Dataflow in streaming mode. But I
> > can't call it in streaming mode for other reasons.
> >
> > So I am trying to understand the following:
> > a) Can I somehow give the Dataflow runner a hint not to add the
> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at all?
> > Then I don't have any issues.
> > b) If it does add this step, how should I choose my artificially
> > created keys so that the step executes as fast as possible? It does a
> > sort by timestamp on the records. As I don't have any functional key
> > requirement, should I choose the same key for all rows, a random key
> > for some rows, or a random key for each row? And what timestamp should
> > I use (the same for all rows?) to make this step faster.
> >
> > Thanks
> > Aniruddh
> >
> > On 2020/04/23 16:15:44, Luke Cwik  wrote:
> > > Stateful & timely operations are always per key and window, which is
> > > why the GbkBeforeStatefulParDo step is being added. Do you not need
> > > your stateful & timely operation to be done per key and window? If
> > > so, can you explain further?
> > >
> > > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma wrote:
> > >
> > > > Hi Kenn
> > > >
> > > > Thanks for your guidance. I understand that batch mode waits for
> > > > the previous stage, but the real issue in this particular case is
> > > > not only that.
> > > >
> > > > Dataflow runner adds a step automatically
> > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" which not only
> waits
> > > > for previous stage but it waits for a very very very long time. Is
> there a
> > > > way to give hint to Dataflow runner not to add this step, as in my
> case I
> > > > functionally do not require this step.
> > > >
> > > > Thanks for your suggestion, will create another thread to understand
> BQ
> > > > options
> > > >
> > > > Thanks
> > > > Aniruddh
> > > >
> > > > On 2020/04/23 03:51:31, Kenneth Knowles  wrote:
> > > > > The definition of batch mode for Dataflow is this: completely
> compute the
> > > > > result of one stage of computation before starting the next stage.
> There
> > > > is
> > > > > no way around this. It does not have to do with using state and
> timers.
> > > > >
> > > > > If you are working with state & timers & triggers, and you are
> hoping for
> > > > > output before the pipeline is completely terminated, then you most
> likely
> > > > > want streaming mode. Perhaps it is best to investigate the BQ read
> > > > > performance issue.
> > > > >
> > > > > Kenn
> > > > >
> > > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma <
> asharma...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi
> > > > > >
> > > > > > I am reading a bounded collection from BQ.
> > > > > >
> > > > > > I have to use a Stateful & Timely operation.
> > > > > >
> > > > > > a) I am invoking job in batch mode. Dataflow runner adds a step
> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" which has
> > > > partitionBy.
> > > > > > This partitionBy waits for all the data to come and becomes a
> > > > bottleneck.
> > > > > > when I read about its documentation it seems its objective it to
> be
> > > > added
> > > > > > when there are no 

Re: Stateful & Timely Call

2020-04-23 Thread Aniruddh Sharma
Hi Luke

Sorry, I forgot to mention the functions. Dataflow adds the following transform, and the
"PartitionKeys" step (new GroupByKeyAndSortValuesOnly) is super slow. How should I
choose keys to make it faster?

  .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
  .setCoder(
      KvCoder.of(
          keyCoder,
          KvCoder.of(InstantCoder.of(),
              WindowedValue.getFullCoder(kvCoder, windowCoder))))

  // Group by key and sort by timestamp, dropping windows as they are reified
  .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())

  // The GBKO sets the windowing strategy to the global default
  .setWindowingStrategyInternal(inputWindowingStrategy);

Thanks
Aniruddh
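The key-choice question above ("How to choose keys to make it faster?") is essentially about sharding: since there is no functional key, the artificial key only controls how the runner-inserted GroupByKeyAndSortValuesOnly step distributes work. A minimal, hedged sketch of one common approach, salting each row with a random shard id bounded by the desired parallelism (the class name and `numShards` knob are illustrative, not from the thread):

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch: pick a random shard id per row so the grouping step can spread
// records across workers instead of funneling everything to one key.
// A single shared key would serialize the GBK; one key per row defeats batching.
class KeySalter {
  static int shardKeyFor(int numShards) {
    return ThreadLocalRandom.current().nextInt(numShards);
  }
}
```

In a pipeline this value would become the key of a `KV<Integer, Row>` before the stateful step, with `numShards` roughly matched to worker count.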


Re: Stateful & Timely Call

2020-04-23 Thread Robert Bradshaw
If you're using State solely to batch elements together, I would recommend
avoiding state altogether. Instead, have a DoFn that holds a List as a member
variable: in your ProcessElement method, add each element to it and emit the
list once it reaches the threshold, and also emit any remaining batch in
FinishBundle.
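The buffering pattern Robert describes can be sketched in plain Java, independent of Beam (the class and method names here are illustrative; in a real pipeline the `add`/`flush` calls would live in a DoFn's `@ProcessElement` and `@FinishBundle` methods):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of count-based buffering: accumulate elements in a member variable,
// hand back a full batch at the threshold, and flush the remainder at the end.
class ElementBatcher<T> {
  private final int maxBatchSize;
  private List<T> buffer = new ArrayList<>();

  ElementBatcher(int maxBatchSize) {
    this.maxBatchSize = maxBatchSize;
  }

  /** Adds an element; returns a completed batch at the threshold, else null. */
  List<T> add(T element) {
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      return flush();
    }
    return null;
  }

  /** Emits whatever is buffered (the FinishBundle analogue, so nothing is lost). */
  List<T> flush() {
    List<T> out = buffer;
    buffer = new ArrayList<>();
    return out;
  }
}
```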



Re: Stateful & Timely Call

2020-04-23 Thread Aniruddh Sharma
Thanks Luke for your response.

My use case is the following:
a) I read data from BQ (TableRow).
b) Convert it into Table.Row for DLP calls.
c) Have to batch the Table.Row collection up to a max size of 512 KB (i.e., fit many
rows from BQ into a single DLP table) and call DLP.

Functionally, I don't have a need for a key or window; I just want to fit rows
into a DLP table up to a max size.

In batch mode, when I call the stateful API,
it adds a "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step, and
this step is super slow: it has been running on a 50-node cluster for 800 GB of
data for the last 10 hours.

This step is not added when I call Dataflow in streaming mode, but I can't use
streaming mode for other reasons.

So I am trying to understand the following:
a) Can I somehow give the Dataflow runner a hint not to add the
"BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at all? Then I don't
have any issues.
b) If it does add this step, how should I choose my ARTIFICIALLY created keys
so that the step executes as fast as possible? It does a sort on record
timestamps. Since I have no functional key requirement, should I use the same
key for all rows, a random key shared by some rows, or a random key per row?
And what timestamps should I assign (e.g., the same for all rows) to make this
step run faster?

Thanks
Aniruddh
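The 512 KB constraint in step (c) above suggests batching by serialized size rather than by element count. A hedged sketch of that idea in plain Java (the class is illustrative and uses UTF-8 string length as a stand-in size estimate; a real pipeline would measure the encoded `Table.Row` size):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch: close out a batch when adding the next row would push the
// accumulated byte size past the cap (e.g., DLP's 512 KB table limit).
class SizeCappedBatcher {
  private final long maxBytes;
  private long currentBytes = 0;
  private List<String> batch = new ArrayList<>();

  SizeCappedBatcher(long maxBytes) {
    this.maxBytes = maxBytes;
  }

  /** Adds a row; returns the completed batch when the cap would be exceeded. */
  List<String> add(String row) {
    long rowBytes = row.getBytes(StandardCharsets.UTF_8).length;
    List<String> completed = null;
    if (!batch.isEmpty() && currentBytes + rowBytes > maxBytes) {
      completed = flush();
    }
    batch.add(row);
    currentBytes += rowBytes;
    return completed;
  }

  /** Emits any remaining rows (the FinishBundle analogue). */
  List<String> flush() {
    List<String> out = batch;
    batch = new ArrayList<>();
    currentBytes = 0;
    return out;
  }
}
```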



Re: Stateful & Timely Call

2020-04-23 Thread Luke Cwik
Stateful & timely operations are always per key and window, which is why the
GbkBeforeStatefulParDo is being added. Do you actually not need your stateful &
timely operation to be done per key and window? If so, can you explain
further?



Re: Stateful & Timely Call

2020-04-23 Thread Aniruddh Sharma
Hi Kenn

Thanks for your guidance. I understand that batch mode waits for the previous
stage, but that is not the only issue in this particular case.

The Dataflow runner automatically adds a step,
"BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which not only waits for the
previous stage but also takes a very long time itself. Is there a way to hint to the
Dataflow runner not to add this step, since in my case I functionally do not require it?

Thanks for your suggestion; I will create another thread to understand the BQ options.

Thanks
Aniruddh

On 2020/04/23 03:51:31, Kenneth Knowles  wrote: 
> The definition of batch mode for Dataflow is this: completely compute the
> result of one stage of computation before starting the next stage. There is
> no way around this. It does not have to do with using state and timers.
> 
> If you are working with state & timers & triggers, and you are hoping for
> output before the pipeline is completely terminated, then you most likely
> want streaming mode. Perhaps it is best to investigate the BQ read
> performance issue.
> 
> Kenn


Stateful & Timely Call

2020-04-22 Thread Aniruddh Sharma
Hi 

I am reading a bounded collection from BQ. 

I have to use a Stateful & Timely operation. 

a) I am invoking the job in batch mode. The Dataflow runner adds a step,
"BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which has a partitionBy.
This partitionBy waits for all the data to arrive and becomes a bottleneck. When I
read its documentation, it seems it is meant to be added when there are no windows.

I tried adding windows and triggering them before the stateful step, but everything
still comes to this partitionBy step and waits until all the data is here.

Is there a way to write the code (with windows, etc.) or give Dataflow a
hint so that this step is not added?

b) I don't want to run this job in streaming mode. When I do, the Dataflow
runner does not add this step, but in streaming mode the BQ read
becomes a bottleneck.

So I either have to figure out how to read BQ faster if I run the job in streaming
mode, or how to bypass the partitionBy from
"BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" if I invoke the job in
batch mode.

Thanks
Aniruddh