Below is code from Google for grouping streaming messages into shards. If 100 messages come into the window and num_shards is 5, you get roughly:

(0, [~20 messages]), (1, [~20 messages]), ..., (4, [~20 messages])

(The keys run from 0 to num_shards - 1 because they come from random.randint(0, self.num_shards - 1), and since the keys are assigned randomly, each shard holds approximately, not exactly, 20 messages.)

What is the purpose of this sharding? As I understand it, it is to parallelize the data properly. A subsequent transformation that calls a ParDo will invoke it 5 times, once per shard, so each call sees a bounded number of messages. For example, if you had to upsert the messages to Postgres, you could upsert about 20 at a time rather than overloading the database. (This particular question on upserting is for another thread.) Am I understanding this correctly?

Here is the gist link: https://gist.github.com/paulhtremblay/d7e1080262fcd0e78feff42363bec208.js

I have posted the code below as well, but I know it will not format correctly. What is the correct protocol for posting code?

    import random

    import apache_beam
    from apache_beam import PTransform, WindowInto, WithKeys, GroupByKey
    from apache_beam.transforms.window import FixedWindows


    class GroupMessagesByFixedWindows(PTransform):
        """A composite transform that groups Kafka messages based on publish
        time and outputs a list of tuples, each containing a message.
        """

        def __init__(self, window_size: float, num_shards: int = 5):
            # Window size is in seconds.
            self.window_size = int(window_size)
            self.num_shards = num_shards

        def expand(self, pcoll: apache_beam.pvalue.PCollection) -> apache_beam.pvalue.PCollection:
            """Group messages into shards, each shard containing a list of messages.

            :param pcoll: an Apache Beam PCollection
            """
            return (
                pcoll
                | "Window into fixed intervals" >> WindowInto(FixedWindows(self.window_size))
                # Assign a random key in [0, num_shards - 1] to spread messages across shards.
                | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
                | "Group by key" >> GroupByKey()
            )
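To make the parallelism concrete, here is a minimal sketch (not part of the Google code) of the kind of downstream step I have in mind; UpsertShard and its print placeholder stand in for whatever real Postgres upsert logic would go there:

    import apache_beam as beam

    class UpsertShard(beam.DoFn):
        """Invoked once per (shard_key, messages) element emitted by
        GroupMessagesByFixedWindows, so each call handles roughly
        total_messages / num_shards records.
        """

        def process(self, element):
            shard_key, messages = element
            # Placeholder for a real batched upsert (e.g. psycopg2's
            # executemany); here we just report the batch size.
            print("shard %d: upserting %d messages" % (shard_key, len(messages)))
            yield shard_key, len(messages)

    # Hypothetical pipeline fragment showing where the ParDo fits:
    # results = (
    #     pcoll
    #     | GroupMessagesByFixedWindows(window_size=60)
    #     | "Upsert per shard" >> beam.ParDo(UpsertShard())
    # )

--
Henry Tremblay
Data Engineer, Best Buy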
(1, [<20 messages>]) (2, [<20 messages>]) .... (5, [<20 messages>]) What is the purpose of this sharding? If I am to understand this, it is to properly parallelize data. For example, a subsequent transformation that calls a ParDo, will call the ParDo 5 times. You then have a limited number of messages. So for example, if you had to upsert them to Postgres, you could upsert 20 at a time, rather than overloading the DB. (This particular question on upserting is for another thread.) Am I understanding this correctly? Here is the gist link. I have posted the code below, but I know it will not format correctly. What is the correct protocol for posting code? https://gist.github.com/paulhtremblay/d7e1080262fcd0e78feff42363bec208.js class GroupMessagesByFixedWindows(PTransform): """A composite transform that groups Kafka messages based on publish time and outputs a list of tuples, each containing a message. """ def __init__(self, window_size:float, num_shards:int=5): self.window_size = int(window_size ) self.num_shards = num_shards def expand(self, pcoll:apache_beam.pvalue.PCollection) -> apache_beam.pvalue.PCollection : """ Group messages into shards, with each shard containing a list of topics :param: pcol, apache beam pcollection """ return ( pcoll | "Window into fixed intervals" >> WindowInto(FixedWindows(self.window_size)) #random key added | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1)) | "Group by key" >> GroupByKey() ) -- Henry Tremblay Data Engineer, Best Buy