Re: Any recomendation for key for GroupIntoBatches

2024-04-28 Thread Wiśniowski Piotr

Hi,

I might be late to the discussion, but here is another option (I don't 
think it was mentioned yet, unless I missed it). Take a look at 
[BatchElements](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements), 
as I think it is precisely what you want to achieve.


Compared to the other answers:

- it is elastic, so it fits any downstream use case

- no custom code - it is a native Beam transform

- no shuffling of the data is required, since elements are batched on 
the worker that already holds them (but pay attention to your runner's 
maximum message size limit) - shuffling would be required when creating 
artificial random-looking keys.


Note that the above is Python, but I would bet there is a Java 
counterpart (or it is at least easy to implement).


Best

Wiśniowski Piotr


On 15.04.2024 19:14, Reuven Lax via user wrote:
There are various strategies. Here is an example of how Beam does it 
(taken from Reshuffle.viaRandomKey().withNumBuckets(N)).


Note that this does some extra hashing to work around issues with the 
Spark runner. If you don't care about that, you could implement 
something simpler (e.g. initialize shard to a random number in 
StartBundle, and increment it mod numBuckets in each processElement call).

public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
   private int shard;
   private @Nullable Integer numBuckets;

   public AssignShardFn(@Nullable Integer numBuckets) {
     this.numBuckets = numBuckets;
   }

   @Setup
   public void setup() {
     shard = ThreadLocalRandom.current().nextInt();
   }

   @ProcessElement
   public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
     ++shard;
     // Smear the shard into something more random-looking, to avoid issues
     // with runners that don't properly hash the key being shuffled, but rely
     // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
     // which for Integer is a no-op and it is an issue:
     // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-spark.html
     // This hashing strategy is copied from
     // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
     int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);

     if (numBuckets != null) {
       UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
       hashOfShard = UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
     }
     r.output(KV.of(hashOfShard, element));
   }
}

On Mon, Apr 15, 2024 at 10:01 AM Damon Douglas 
 wrote:


Good day, Ruben,

Would you be able to compute a shasum on the group of IDs to use
as the key?

Best,

Damon

On 2024/04/12 19:22:45 Ruben Vargas wrote:
> Hello guys
>
> Maybe this question was already answered, but I cannot find it  and
> want some more input on this topic.
>
> I have some messages that don't have any particular key candidate,
> except the ID,  but I don't want to use it because the idea is to
> group multiple IDs in the same batch.
>
> This is my use case:
>
> I have an endpoint where I'm gonna send the message ID, this
endpoint
> is gonna return me certain information which I will use to enrich my
> message. In order to avoid fetching the endpoint per message I
want to
> batch it in 100 and send the 100 IDs in one request ( the endpoint
> supports it) . I was thinking on using GroupIntoBatches.
>
> - If I choose the ID as the key, my understanding is that it won't
> work in the way I want (because it will form batches of the same
ID).
> - Use a constant will be a problem for parallelism, is that correct?
>
> Then my question is, what should I use as a key? Maybe something
> regarding the timestamp? so I can have groups of messages that
arrive
> at a certain second?
>
> Any suggestions would be appreciated
>
> Thanks.
>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Robert Bradshaw via user
On Fri, Apr 12, 2024 at 1:39 PM Ruben Vargas 
wrote:

> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >
> > Here is an example from a book that I'm reading now and it may be
> applicable.
> >
> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > PYTHON - ord(id[0]) % 100
>

or abs(hash(id)) % 100, in case the first character of your id is not well
distributed.
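A pure-Python sketch of this keying idea (the constant and function names are mine, not from the thread):

```python
NUM_KEYS = 100  # number of distinct keys, i.e. the max parallelism of the batching stage

def bucket_key(element_id: str) -> int:
    # abs(hash(...)) spreads ids across buckets even when the first
    # character of the id is skewed. Note: Python salts str hashes per
    # process (PYTHONHASHSEED), which is fine here because the key only
    # needs to look random, not be stable across workers.
    return abs(hash(element_id)) % NUM_KEYS

# In a pipeline this would then be used roughly as:
# keyed   = messages | beam.Map(lambda m: (bucket_key(m["id"]), m))
# batches = keyed | beam.GroupIntoBatches(100)
```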


> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>
> >
> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
> wrote:
> >>
> >> How about just keeping track of a buffer and flush the buffer after 100
> messages and if there is a buffer on finish_bundle as well?
> >>
> >>
>
> If this is in memory, It could lead to potential loss of data. That is
> why the state is used or at least that is my understanding. but maybe
> there is a way to do this in the state?
>

Bundles are the unit of commitment in Beam [1], so finish_bundle won't drop
any data. A possible downside is that, especially in streaming, bundles may
be small, which would cap the amount of batching you get.

[1] https://beam.apache.org/documentation/runtime/model/#bundling-and-persistence




Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Reuven Lax via user
There are various strategies. Here is an example of how Beam does it (taken
from Reshuffle.viaRandomKey().withNumBuckets(N)).

Note that this does some extra hashing to work around issues with the Spark
runner. If you don't care about that, you could implement something simpler
(e.g. initialize shard to a random number in StartBundle, and increment it
mod numBuckets in each processElement call).

public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
  private int shard;
  private @Nullable Integer numBuckets;

  public AssignShardFn(@Nullable Integer numBuckets) {
this.numBuckets = numBuckets;
  }

  @Setup
  public void setup() {
shard = ThreadLocalRandom.current().nextInt();
  }

  @ProcessElement
  public void processElement(@Element T element,
OutputReceiver<KV<Integer, T>> r) {
++shard;
// Smear the shard into something more random-looking, to avoid issues
// with runners that don't properly hash the key being shuffled, but rely
// on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
// which for Integer is a no-op and it is an issue:
// http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-spark.html
// This hashing strategy is copied from
// org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
if (numBuckets != null) {
  UnsignedInteger unsignedNumBuckets =
UnsignedInteger.fromIntBits(numBuckets);
  hashOfShard =
UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
}
r.output(KV.of(hashOfShard, element));
  }
}



On Mon, Apr 15, 2024 at 10:01 AM Damon Douglas 
wrote:

> Good day, Ruben,
>
> Would you be able to compute a shasum on the group of IDs to use as the
> key?
>
> Best,
>
> Damon
>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Damon Douglas
Good day, Ruben,

Would you be able to compute a shasum on the group of IDs to use as the key?

Best,

Damon



Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
Yeah, unfortunately the data on the endpoint could change at any point
in time, and I need to make sure I have the latest version :/

That limits my options here. But I also have other sources that can
benefit from this caching :)

Thank you very much!

On Mon, Apr 15, 2024 at 9:37 AM XQ Hu  wrote:
>
> I am not sure you still need to do batching since Web API can handle caching.
>
> If you really need it, I think GoupIntoBatches is a good way to go.
>
> On Mon, Apr 15, 2024 at 11:30 AM Ruben Vargas  wrote:
>>
>> Is there a way to do batching in that transformation? I'm assuming for
>> now no. or may be using in conjuntion with GoupIntoBatches
>>
>> On Mon, Apr 15, 2024 at 9:29 AM Ruben Vargas  wrote:
>> >
>> > Interesting
>> >
>> > I think the cache feature could be interesting for some use cases I have.
>> >
>> > On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
>> > >
>> > > For the new web API IO, the page lists these features:
>> > >
>> > > developers provide minimal code that invokes Web API endpoint
>> > > delegate to the transform to handle request retries and exponential 
>> > > backoff
>> > > optional caching of request and response associations
>> > > optional metrics
>> > >
>> > >
>> > > On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas  
>> > > wrote:
>> > >>
>> > >> That one looks interesting
>> > >>
>> > >> What is not clear to me is what are the advantages of using it? Is
>> > >> only the error/retry handling? anything in terms of performance?
>> > >>
>> > >> My PCollection is unbounded but I was thinking of sending my messages
>> > >> in batches to the external API in order to gain some performance
>> > >> (don't expect to send 1 http request per message).
>> > >>
>> > >> Thank you very much for all your responses!
>> > >>
>> > >>
>> > >> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  
>> > >> wrote:
>> > >> >
>> > >> > To enrich your data, have you checked 
>> > >> > https://cloud.google.com/dataflow/docs/guides/enrichment?
>> > >> >
>> > >> > This transform is built on top of 
>> > >> > https://beam.apache.org/documentation/io/built-in/webapis/
>> > >> >
>> > >> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas 
>> > >> >  wrote:
>> > >> >>
>> > >> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  
>> > >> >> wrote:
>> > >> >> >
>> > >> >> > Here is an example from a book that I'm reading now and it may be 
>> > >> >> > applicable.
>> > >> >> >
>> > >> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
>> > >> >> > PYTHON - ord(id[0]) % 100
>> > >> >>
>> > >> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>> > >> >>
>> > >> >> >
>> > >> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
>> > >> >> >  wrote:
>> > >> >> >>
>> > >> >> >> How about just keeping track of a buffer and flush the buffer 
>> > >> >> >> after 100 messages and if there is a buffer on finish_bundle as 
>> > >> >> >> well?
>> > >> >> >>
>> > >> >> >>
>> > >> >>
>> > >> >> If this is in memory, It could lead to potential loss of data. That 
>> > >> >> is
>> > >> >> why the state is used or at least that is my understanding. but maybe
>> > >> >> there is a way to do this in the state?
>> > >> >>
>> > >> >>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread XQ Hu via user
I am not sure you still need to do batching, since the Web API IO can
handle caching.

If you really need it, I think GroupIntoBatches is a good way to go.

On Mon, Apr 15, 2024 at 11:30 AM Ruben Vargas 
wrote:

> Is there a way to do batching in that transformation? I'm assuming for
> now no. or may be using in conjuntion with GoupIntoBatches
>
> On Mon, Apr 15, 2024 at 9:29 AM Ruben Vargas 
> wrote:
> >
> > Interesting
> >
> > I think the cache feature could be interesting for some use cases I have.
> >
> > On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
> > >
> > > For the new web API IO, the page lists these features:
> > >
> > > developers provide minimal code that invokes Web API endpoint
> > > delegate to the transform to handle request retries and exponential
> backoff
> > > optional caching of request and response associations
> > > optional metrics
> > >
> > >
> > > On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas 
> wrote:
> > >>
> > >> That one looks interesting
> > >>
> > >> What is not clear to me is what are the advantages of using it? Is
> > >> only the error/retry handling? anything in terms of performance?
> > >>
> > >> My PCollection is unbounded but I was thinking of sending my messages
> > >> in batches to the external API in order to gain some performance
> > >> (don't expect to send 1 http request per message).
> > >>
> > >> Thank you very much for all your responses!
> > >>
> > >>
> > >> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user 
> wrote:
> > >> >
> > >> > To enrich your data, have you checked
> https://cloud.google.com/dataflow/docs/guides/enrichment?
> > >> >
> > >> > This transform is built on top of
> https://beam.apache.org/documentation/io/built-in/webapis/
> > >> >
> > >> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas <
> ruben.var...@metova.com> wrote:
> > >> >>
> > >> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim 
> wrote:
> > >> >> >
> > >> >> > Here is an example from a book that I'm reading now and it may
> be applicable.
> > >> >> >
> > >> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > >> >> > PYTHON - ord(id[0]) % 100
> > >> >>
> > >> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
> > >> >>
> > >> >> >
> > >> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian <
> ged1...@gmail.com> wrote:
> > >> >> >>
> > >> >> >> How about just keeping track of a buffer and flush the buffer
> after 100 messages and if there is a buffer on finish_bundle as well?
> > >> >> >>
> > >> >> >>
> > >> >>
> > >> >> If this is in memory, It could lead to potential loss of data.
> That is
> > >> >> why the state is used or at least that is my understanding. but
> maybe
> > >> >> there is a way to do this in the state?
> > >> >>
> > >> >>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
Is there a way to do batching in that transformation? I'm assuming for
now no, or maybe it can be used in conjunction with GroupIntoBatches.

On Mon, Apr 15, 2024 at 9:29 AM Ruben Vargas  wrote:
>
> Interesting
>
> I think the cache feature could be interesting for some use cases I have.
>
> On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
> >
> > For the new web API IO, the page lists these features:
> >
> > developers provide minimal code that invokes Web API endpoint
> > delegate to the transform to handle request retries and exponential backoff
> > optional caching of request and response associations
> > optional metrics
> >
> >
> > On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas  
> > wrote:
> >>
> >> That one looks interesting
> >>
> >> What is not clear to me is what are the advantages of using it? Is
> >> only the error/retry handling? anything in terms of performance?
> >>
> >> My PCollection is unbounded but I was thinking of sending my messages
> >> in batches to the external API in order to gain some performance
> >> (don't expect to send 1 http request per message).
> >>
> >> Thank you very much for all your responses!
> >>
> >>
> >> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  
> >> wrote:
> >> >
> >> > To enrich your data, have you checked 
> >> > https://cloud.google.com/dataflow/docs/guides/enrichment?
> >> >
> >> > This transform is built on top of 
> >> > https://beam.apache.org/documentation/io/built-in/webapis/
> >> >
> >> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas  
> >> > wrote:
> >> >>
> >> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >> >> >
> >> >> > Here is an example from a book that I'm reading now and it may be 
> >> >> > applicable.
> >> >> >
> >> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> >> >> > PYTHON - ord(id[0]) % 100
> >> >>
> >> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
> >> >>
> >> >> >
> >> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  
> >> >> > wrote:
> >> >> >>
> >> >> >> How about just keeping track of a buffer and flush the buffer after 
> >> >> >> 100 messages and if there is a buffer on finish_bundle as well?
> >> >> >>
> >> >> >>
> >> >>
> >> >> If this is in memory, It could lead to potential loss of data. That is
> >> >> why the state is used or at least that is my understanding. but maybe
> >> >> there is a way to do this in the state?
> >> >>
> >> >>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
Interesting

I think the cache feature could be interesting for some use cases I have.

On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
>
> For the new web API IO, the page lists these features:
>
> developers provide minimal code that invokes Web API endpoint
> delegate to the transform to handle request retries and exponential backoff
> optional caching of request and response associations
> optional metrics
>
>
> On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas  wrote:
>>
>> That one looks interesting
>>
>> What is not clear to me is what are the advantages of using it? Is
>> only the error/retry handling? anything in terms of performance?
>>
>> My PCollection is unbounded but I was thinking of sending my messages
>> in batches to the external API in order to gain some performance
>> (don't expect to send 1 http request per message).
>>
>> Thank you very much for all your responses!
>>
>>
>> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  wrote:
>> >
>> > To enrich your data, have you checked 
>> > https://cloud.google.com/dataflow/docs/guides/enrichment?
>> >
>> > This transform is built on top of 
>> > https://beam.apache.org/documentation/io/built-in/webapis/
>> >
>> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas  
>> > wrote:
>> >>
>> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
>> >> >
>> >> > Here is an example from a book that I'm reading now and it may be 
>> >> > applicable.
>> >> >
>> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
>> >> > PYTHON - ord(id[0]) % 100
>> >>
>> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>> >>
>> >> >
>> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  
>> >> > wrote:
>> >> >>
>> >> >> How about just keeping track of a buffer and flush the buffer after 
>> >> >> 100 messages and if there is a buffer on finish_bundle as well?
>> >> >>
>> >> >>
>> >>
>> >> If this is in memory, It could lead to potential loss of data. That is
>> >> why the state is used or at least that is my understanding. but maybe
>> >> there is a way to do this in the state?
>> >>
>> >>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread XQ Hu via user
For the new web API IO, the page lists these features:

   - developers provide minimal code that invokes Web API endpoint
   - delegate to the transform to handle request retries and exponential
   backoff
   - optional caching of request and response associations
   - optional metrics


On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas 
wrote:

> That one looks interesting
>
> What is not clear to me is what are the advantages of using it? Is
> only the error/retry handling? anything in terms of performance?
>
> My PCollection is unbounded but I was thinking of sending my messages
> in batches to the external API in order to gain some performance
> (don't expect to send 1 http request per message).
>
> Thank you very much for all your responses!
>
>
> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user 
> wrote:
> >
> > To enrich your data, have you checked
> https://cloud.google.com/dataflow/docs/guides/enrichment?
> >
> > This transform is built on top of
> https://beam.apache.org/documentation/io/built-in/webapis/
> >
> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas 
> wrote:
> >>
> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >> >
> >> > Here is an example from a book that I'm reading now and it may be
> applicable.
> >> >
> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> >> > PYTHON - ord(id[0]) % 100
> >>
> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
> >>
> >> >
> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
> wrote:
> >> >>
> >> >> How about just keeping track of a buffer and flush the buffer after
> 100 messages and if there is a buffer on finish_bundle as well?
> >> >>
> >> >>
> >>
> >> If this is in memory, It could lead to potential loss of data. That is
> >> why the state is used or at least that is my understanding. but maybe
> >> there is a way to do this in the state?
> >>
> >>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
That one looks interesting

What is not clear to me is the advantage of using it. Is it
only the error/retry handling? Anything in terms of performance?

My PCollection is unbounded but I was thinking of sending my messages
in batches to the external API in order to gain some performance
(don't expect to send 1 http request per message).

Thank you very much for all your responses!


On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  wrote:
>
> To enrich your data, have you checked 
> https://cloud.google.com/dataflow/docs/guides/enrichment?
>
> This transform is built on top of 
> https://beam.apache.org/documentation/io/built-in/webapis/
>
> On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas  wrote:
>>
>> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
>> >
>> > Here is an example from a book that I'm reading now and it may be 
>> > applicable.
>> >
>> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
>> > PYTHON - ord(id[0]) % 100
>>
>> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>>
>> >
>> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  
>> > wrote:
>> >>
>> >> How about just keeping track of a buffer and flush the buffer after 100 
>> >> messages and if there is a buffer on finish_bundle as well?
>> >>
>> >>
>>
>> If this is in memory, it could lead to potential loss of data. That is
>> why state is used, or at least that is my understanding. But maybe
>> there is a way to do this with state?
>>
>>
>> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  
>> >> wrote:
>> >>>
>> >>> Hello guys
>> >>>
>> >>> Maybe this question was already answered, but I cannot find it  and
>> >>> want some more input on this topic.
>> >>>
>> >>> I have some messages that don't have any particular key candidate,
>> >>> except the ID,  but I don't want to use it because the idea is to
>> >>> group multiple IDs in the same batch.
>> >>>
>> >>> This is my use case:
>> >>>
>> >>> I have an endpoint where I'm gonna send the message ID, this endpoint
>> >>> is gonna return me certain information which I will use to enrich my
>> >>> message. In order to avoid fetching the endpoint per message I want to
>> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
>> >>> supports it) . I was thinking on using GroupIntoBatches.
>> >>>
>> >>> - If I choose the ID as the key, my understanding is that it won't
>> >>> work in the way I want (because it will form batches of the same ID).
>> >>> - Use a constant will be a problem for parallelism, is that correct?
>> >>>
>> >>> Then my question is, what should I use as a key? Maybe something
>> >>> regarding the timestamp? so I can have groups of messages that arrive
>> >>> at a certain second?
>> >>>
>> >>> Any suggestions would be appreciated
>> >>>
>> >>> Thanks.


Re: Any recommendation for key for GroupIntoBatches

2024-04-14 Thread XQ Hu via user
To enrich your data, have you checked
https://cloud.google.com/dataflow/docs/guides/enrichment?

This transform is built on top of
https://beam.apache.org/documentation/io/built-in/webapis/

On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas 
wrote:

> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >
> > Here is an example from a book that I'm reading now and it may be
> applicable.
> >
> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > PYTHON - ord(id[0]) % 100
>
> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>
> >
> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
> wrote:
> >>
> >> How about just keeping track of a buffer and flush the buffer after 100
> messages and if there is a buffer on finish_bundle as well?
> >>
> >>
>
> If this is in memory, it could lead to potential loss of data. That is
> why state is used, or at least that is my understanding. But maybe
> there is a way to do this with state?
>
>
> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
> wrote:
> >>>
> >>> Hello guys
> >>>
> >>> Maybe this question was already answered, but I cannot find it  and
> >>> want some more input on this topic.
> >>>
> >>> I have some messages that don't have any particular key candidate,
> >>> except the ID,  but I don't want to use it because the idea is to
> >>> group multiple IDs in the same batch.
> >>>
> >>> This is my use case:
> >>>
> >>> I have an endpoint where I'm gonna send the message ID, this endpoint
> >>> is gonna return me certain information which I will use to enrich my
> >>> message. In order to avoid fetching the endpoint per message I want to
> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
> >>> supports it) . I was thinking on using GroupIntoBatches.
> >>>
> >>> - If I choose the ID as the key, my understanding is that it won't
> >>> work in the way I want (because it will form batches of the same ID).
> >>> - Use a constant will be a problem for parallelism, is that correct?
> >>>
> >>> Then my question is, what should I use as a key? Maybe something
> >>> regarding the timestamp? so I can have groups of messages that arrive
> >>> at a certain second?
> >>>
> >>> Any suggestions would be appreciated
> >>>
> >>> Thanks.
>


Re: Any recommendation for key for GroupIntoBatches

2024-04-12 Thread Ruben Vargas
On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
>
> Here is an example from a book that I'm reading now and it may be applicable.
>
> JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> PYTHON - ord(id[0]) % 100

Maybe this is what I'm looking for. I'll give it a try. Thanks!

>
> On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  wrote:
>>
>> How about just keeping track of a buffer and flush the buffer after 100 
>> messages and if there is a buffer on finish_bundle as well?
>>
>>

If this is in memory, it could lead to potential loss of data. That is
why state is used, or at least that is my understanding. But maybe
there is a way to do this with state?


>> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  wrote:
>>>
>>> Hello guys
>>>
>>> Maybe this question was already answered, but I cannot find it  and
>>> want some more input on this topic.
>>>
>>> I have some messages that don't have any particular key candidate,
>>> except the ID,  but I don't want to use it because the idea is to
>>> group multiple IDs in the same batch.
>>>
>>> This is my use case:
>>>
>>> I have an endpoint where I'm gonna send the message ID, this endpoint
>>> is gonna return me certain information which I will use to enrich my
>>> message. In order to avoid fetching the endpoint per message I want to
>>> batch it in 100 and send the 100 IDs in one request ( the endpoint
>>> supports it) . I was thinking on using GroupIntoBatches.
>>>
>>> - If I choose the ID as the key, my understanding is that it won't
>>> work in the way I want (because it will form batches of the same ID).
>>> - Use a constant will be a problem for parallelism, is that correct?
>>>
>>> Then my question is, what should I use as a key? Maybe something
>>> regarding the timestamp? so I can have groups of messages that arrive
>>> at a certain second?
>>>
>>> Any suggestions would be appreciated
>>>
>>> Thanks.


Re: Any recommendation for key for GroupIntoBatches

2024-04-12 Thread Jaehyeon Kim
Here is an example from a book that I'm reading now; it may be
applicable.

JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
PYTHON - ord(id[0]) % 100
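One caveat worth noting: `ord(id[0]) % 100` only looks at the first character, so it can skew badly when IDs share a small leading alphabet (hex UUIDs have at most 16 distinct first characters, so at most 16 of the 100 keys ever get used). Hashing the whole ID spreads keys more evenly. A sketch of both (the Java expression masks off the sign bit with `& Integer.MAX_VALUE` for the same non-negative effect; in Python, `zlib.crc32` is used here because the built-in `hash()` is salted per process and so is not stable across workers):

```python
import zlib

NUM_KEYS = 100

def key_first_char(msg_id: str) -> int:
    # Python analog of the book's example: only the first character matters.
    return ord(msg_id[0]) % NUM_KEYS

def key_stable_hash(msg_id: str) -> int:
    # Hash the whole ID; crc32 is deterministic across processes/workers.
    return zlib.crc32(msg_id.encode("utf-8")) % NUM_KEYS

# Hypothetical pipeline fragment feeding GroupIntoBatches:
#   | beam.Map(lambda m: (key_stable_hash(m["id"]), m))
#   | beam.GroupIntoBatches(100)
```

NUM_KEYS bounds the parallelism, so pick it relative to your worker count rather than hardcoding 100.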

On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  wrote:

> How about just keeping track of a buffer and flush the buffer after 100
> messages and if there is a buffer on finish_bundle as well?
>
>
> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
> wrote:
>
>> Hello guys
>>
>> Maybe this question was already answered, but I cannot find it  and
>> want some more input on this topic.
>>
>> I have some messages that don't have any particular key candidate,
>> except the ID,  but I don't want to use it because the idea is to
>> group multiple IDs in the same batch.
>>
>> This is my use case:
>>
>> I have an endpoint where I'm gonna send the message ID, this endpoint
>> is gonna return me certain information which I will use to enrich my
>> message. In order to avoid fetching the endpoint per message I want to
>> batch it in 100 and send the 100 IDs in one request ( the endpoint
>> supports it) . I was thinking on using GroupIntoBatches.
>>
>> - If I choose the ID as the key, my understanding is that it won't
>> work in the way I want (because it will form batches of the same ID).
>> - Use a constant will be a problem for parallelism, is that correct?
>>
>> Then my question is, what should I use as a key? Maybe something
>> regarding the timestamp? so I can have groups of messages that arrive
>> at a certain second?
>>
>> Any suggestions would be appreciated
>>
>> Thanks.
>>
>


Re: Any recommendation for key for GroupIntoBatches

2024-04-12 Thread George Dekermenjian
How about just keeping track of a buffer, flushing it after 100
messages, and also flushing whatever remains in finish_bundle?
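As a sketch of that idea outside Beam (plain Python; in a DoFn the flush-on-close would live in finish_bundle, and, as noted downthread, a purely in-memory buffer can drop data on failure unless it is backed by state). `send_batch` is a hypothetical callable standing in for the real batched HTTP request:

```python
class BufferedSender:
    """Accumulate items and flush them in batches of `max_size`."""

    def __init__(self, send_batch, max_size=100):
        self._send_batch = send_batch
        self._max_size = max_size
        self._buffer = []

    def add(self, item):
        self._buffer.append(item)
        if len(self._buffer) >= self._max_size:
            self.flush()

    def flush(self):
        # Analogous to flushing leftovers in finish_bundle.
        if self._buffer:
            self._send_batch(self._buffer)
            self._buffer = []
```

GroupIntoBatches implements essentially this pattern, but on top of per-key state and timers so the buffer survives worker failures.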


On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  wrote:

> Hello guys
>
> Maybe this question was already answered, but I cannot find it  and
> want some more input on this topic.
>
> I have some messages that don't have any particular key candidate,
> except the ID,  but I don't want to use it because the idea is to
> group multiple IDs in the same batch.
>
> This is my use case:
>
> I have an endpoint where I'm gonna send the message ID, this endpoint
> is gonna return me certain information which I will use to enrich my
> message. In order to avoid fetching the endpoint per message I want to
> batch it in 100 and send the 100 IDs in one request ( the endpoint
> supports it) . I was thinking on using GroupIntoBatches.
>
> - If I choose the ID as the key, my understanding is that it won't
> work in the way I want (because it will form batches of the same ID).
> - Use a constant will be a problem for parallelism, is that correct?
>
> Then my question is, what should I use as a key? Maybe something
> regarding the timestamp? so I can have groups of messages that arrive
> at a certain second?
>
> Any suggestions would be appreciated
>
> Thanks.
>