Hi,

I might be late to the discussion, but here is another option, as I don't think it has been mentioned yet (or I missed it). Take a look at [BatchElements](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements), as I think it is precisely what you want to achieve.
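
For reference, here is a minimal, hypothetical sketch of how this could look for your 100-ID enrichment case (the Create source and enrich_batch are only placeholders for your real input and HTTP call):

import apache_beam as beam

def enrich_batch(ids):
    # Placeholder for one call to the enrichment endpoint with up to 100 IDs.
    # Replace with the real HTTP request; here we just echo the IDs back.
    return [{"id": i, "enriched": True} for i in ids]

with beam.Pipeline() as p:
    _ = (
        p
        | "ReadIds" >> beam.Create([f"id-{n}" for n in range(1000)])  # stand-in source
        | "Batch100" >> beam.BatchElements(min_batch_size=100, max_batch_size=100)
        | "Enrich" >> beam.FlatMap(enrich_batch)
        | "Print" >> beam.Map(print)
    )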

Compared to other answers:

- it is elastic, so it can fit any downstream use case

- no custom code - it is a native Beam transform

- no shuffling of the data is required, since elements are batched on the worker that already holds them (but pay attention to your runner's max message size limit); shuffling would only be required when creating artificial random-looking keys, as in the keyed alternative sketched after this list.
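
For comparison, the keyed alternative mentioned in the last point could look roughly like this (again only a sketch; NUM_SHARDS = 4 is an arbitrary assumption), and it forces a shuffle on the artificial key before GroupIntoBatches:

import random

import apache_beam as beam

NUM_SHARDS = 4  # assumption: tune to the parallelism you need

def assign_random_shard(element):
    # Artificial random-looking key, only so GroupIntoBatches has something to group on.
    return (random.randrange(NUM_SHARDS), element)

with beam.Pipeline() as p:
    _ = (
        p
        | "ReadIds" >> beam.Create([f"id-{n}" for n in range(1000)])  # stand-in source
        | "KeyByRandomShard" >> beam.Map(assign_random_shard)
        | "Batch100" >> beam.GroupIntoBatches(100)  # shuffles by the artificial key
        | "Print" >> beam.Map(print)
    )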

Note that the above is Python, but I would bet there is a Java counterpart (or at least one that is easy to implement).

Best

Wiśniowski Piotr


On 15.04.2024 19:14, Reuven Lax via user wrote:
There are various strategies. Here is an example of how Beam does it, taken from Reshuffle.viaRandomKey().withNumBuckets(N):

Note that this does some extra hashing to work around issues with the Spark runner. If you don't care about that, you could implement something simpler (e.g. initialize shard to a random number in StartBundle, and increment it mod numBuckets in each processElement call).
public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
  private int shard;
  private @Nullable Integer numBuckets;

  public AssignShardFn(@Nullable Integer numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Setup
  public void setup() {
    shard = ThreadLocalRandom.current().nextInt();
  }

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
    ++shard;
    // Smear the shard into something more random-looking, to avoid issues
    // with runners that don't properly hash the key being shuffled, but rely
    // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
    // which for Integer is a no-op and it is an issue:
    // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-spark.html
    // This hashing strategy is copied from
    // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
    int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
    if (numBuckets != null) {
      UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
      hashOfShard = UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
    }
    r.output(KV.of(hashOfShard, element));
  }
}

On Mon, Apr 15, 2024 at 10:01 AM Damon Douglas <damondoug...@apache.org> wrote:

    Good day, Ruben,

    Would you be able to compute a shasum on the group of IDs to use
    as the key?

    Best,

    Damon

    On 2024/04/12 19:22:45 Ruben Vargas wrote:
    > Hello guys
    >
    > Maybe this question was already answered, but I cannot find it and
    > want some more input on this topic.
    >
    > I have some messages that don't have any particular key candidate,
    > except the ID,  but I don't want to use it because the idea is to
    > group multiple IDs in the same batch.
    >
    > This is my use case:
    >
    > I have an endpoint where I'm gonna send the message ID, this endpoint
    > is gonna return me certain information which I will use to enrich my
    > message. In order to avoid fetching the endpoint per message I want to
    > batch it in 100 and send the 100 IDs in one request (the endpoint
    > supports it). I was thinking of using GroupIntoBatches.
    >
    > - If I choose the ID as the key, my understanding is that it won't
    > work in the way I want (because it will form batches of the same ID).
    > - Use a constant will be a problem for parallelism, is that correct?
    >
    > Then my question is, what should I use as a key? Maybe something
    > regarding the timestamp? so I can have groups of messages that arrive
    > at a certain second?
    >
    > Any suggestions would be appreciated
    >
    > Thanks.
    >
