There are various strategies. Here is an example of how Beam does it (taken
from Reshuffle.viaRandomKey().withNumBuckets(N)):

Note that this does some extra hashing to work around issues with the Spark
runner. If you don't care about that, you could implement something simpler
(e.g. initialize shard to a random number in StartBundle, and increment it
mod numBuckets in each processElement call).

public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
  private int shard;
  private @Nullable Integer numBuckets;

  public AssignShardFn(@Nullable Integer numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Setup
  public void setup() {
    shard = ThreadLocalRandom.current().nextInt();
  }

  @ProcessElement
  public void processElement(
      @Element T element, OutputReceiver<KV<Integer, T>> r) {
    ++shard;
    // Smear the shard into something more random-looking, to avoid issues
    // with runners that don't properly hash the key being shuffled, but rely
    // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
    // which for Integer is a no-op and it is an issue:
    // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-spark.html
    // This hashing strategy is copied from
    // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
    int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
    if (numBuckets != null) {
      UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
      hashOfShard =
          UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
    }
    r.output(KV.of(hashOfShard, element));
  }
}
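
For the simpler alternative mentioned above (random start per bundle, then
increment mod numBuckets), here is a minimal Beam-free sketch of just the key
logic. The class and method names are hypothetical; in a real DoFn,
startBundle() would be the body of the @StartBundle method and nextShard()
would be called from @ProcessElement to build KV.of(key, element). Note that
it skips the smearing step, so the Spark caveat still applies.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of Beam: round-robin shard keys per bundle.
class RoundRobinShardAssigner {
  private final int numBuckets;
  private int shard;

  RoundRobinShardAssigner(int numBuckets) {
    if (numBuckets <= 0) {
      throw new IllegalArgumentException("numBuckets must be positive");
    }
    this.numBuckets = numBuckets;
  }

  // Would run in @StartBundle: pick a random starting shard so that
  // concurrent bundles do not all start filling bucket 0.
  void startBundle() {
    shard = ThreadLocalRandom.current().nextInt(numBuckets);
  }

  // Would run in @ProcessElement: return the current shard, then advance
  // it modulo numBuckets.
  int nextShard() {
    int key = shard;
    shard = (shard + 1) % numBuckets;
    return key;
  }

  public static void main(String[] args) {
    RoundRobinShardAssigner assigner = new RoundRobinShardAssigner(4);
    assigner.startBundle();
    for (String id : new String[] {"id-1", "id-2", "id-3", "id-4", "id-5"}) {
      System.out.println(assigner.nextShard() + " -> " + id);
    }
  }
}
```

The resulting keys are always in [0, numBuckets), so the keyed elements could
then be fed to something like GroupIntoBatches, with numBuckets bounding the
parallelism.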

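To see what the smear-and-mod step in AssignShardFn does to consecutive shard
values, here is a small standalone sketch that uses only the JDK. It assumes
Integer.remainderUnsigned is an acceptable stand-in for the vendored Guava
UnsignedInteger.mod call; the class and method names are made up for
illustration.

```java
// Hypothetical demo class, not part of Beam.
class SmearDemo {
  // Same constants and rotation as the hashOfShard line in AssignShardFn.
  static int smear(int shard) {
    return 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
  }

  // Treat the smeared value as unsigned before taking the remainder, so the
  // bucket is always in [0, numBuckets), even when the smeared int is negative.
  static int bucketFor(int shard, int numBuckets) {
    return Integer.remainderUnsigned(smear(shard), numBuckets);
  }

  public static void main(String[] args) {
    int numBuckets = 8;
    // Consecutive shard values, including negative ones, since the shard
    // counter starts from an arbitrary random int.
    for (int shard = -3; shard <= 3; shard++) {
      System.out.println(shard + " -> " + bucketFor(shard, numBuckets));
    }
  }
}
```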


On Mon, Apr 15, 2024 at 10:01 AM Damon Douglas <[email protected]>
wrote:

> Good day, Ruben,
>
> Would you be able to compute a shasum on the group of IDs to use as the
> key?
>
> Best,
>
> Damon
>
> On 2024/04/12 19:22:45 Ruben Vargas wrote:
> > Hello guys
> >
> > Maybe this question was already answered, but I cannot find it and
> > want some more input on this topic.
> >
> > I have some messages that don't have any particular key candidate,
> > except the ID, but I don't want to use it because the idea is to
> > group multiple IDs in the same batch.
> >
> > This is my use case:
> >
> > I have an endpoint where I'm gonna send the message ID, this endpoint
> > is gonna return me certain information which I will use to enrich my
> > message. In order to avoid fetching the endpoint per message I want to
> > batch it in 100 and send the 100 IDs in one request (the endpoint
> > supports it). I was thinking of using GroupIntoBatches.
> >
> > - If I choose the ID as the key, my understanding is that it won't
> > work in the way I want (because it will form batches of the same ID).
> > - Using a constant will be a problem for parallelism, is that correct?
> >
> > Then my question is, what should I use as a key? Maybe something
> > regarding the timestamp? so I can have groups of messages that arrive
> > at a certain second?
> >
> > Any suggestions would be appreciated
> >
> > Thanks.
> >
>
