Below is code from Google for grouping streaming messages into shards,
each tagged with a shard number. If 100 messages come into the window, and
you have num_shards set to 5, then you get roughly:

(0, [<~20 messages>])
(1, [<~20 messages>])
....
(4, [<~20 messages>])
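As a sanity check, here is a quick plain-Python simulation (no Beam needed;
the seed and message names are my own illustration) of how the random shard
keys spread 100 messages across 5 shards. The per-shard counts come out
roughly, not exactly, 20:

```python
# Plain-Python sanity check of the random shard-key assignment.
import random
from collections import defaultdict

random.seed(42)  # fixed seed so the example is repeatable

NUM_SHARDS = 5
messages = ["msg-%d" % i for i in range(100)]

shards = defaultdict(list)
for msg in messages:
    # mirrors WithKeys(lambda _: random.randint(0, num_shards - 1))
    shards[random.randint(0, NUM_SHARDS - 1)].append(msg)

for key in sorted(shards):
    print(key, len(shards[key]))
```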

What is the purpose of this sharding? As I understand it, it is to
parallelize the data properly. For example, a subsequent transform that
applies a ParDo will invoke the ParDo five times, once per shard, each
time with a limited number of messages. So, for example, if you had to
upsert them to Postgres, you could upsert about 20 at a time rather than
overloading the DB. (This particular question on upserting is for another
thread.)

Am I understanding this correctly?
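To make the question concrete, here is a plain-Python sketch (the names are
mine, not from the Google sample, and write_batch is a hypothetical stand-in
for a DB upsert) of the kind of downstream step I have in mind: a function
with the shape of a DoFn's process method that receives one
(shard_key, messages) pair per shard and handles the whole batch at once:

```python
# Plain-Python sketch of a downstream per-shard step; `write_batch`
# is a hypothetical stand-in for whatever the DB upsert would do.
def process_shard(element, write_batch=print):
    """Receives one (shard_key, [messages]) pair, as produced by a
    GroupByKey, and handles the whole batch in a single call instead
    of one message at a time."""
    shard_key, messages = element
    write_batch("shard %s: %d messages" % (shard_key, len(messages)))
    return len(messages)

# With num_shards=5 and ~100 messages per window, this would be
# invoked 5 times, each with roughly 20 messages.
print(process_shard((0, ["a", "b", "c"])))
```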

Here is the gist link. I have also posted the code below, but I know it
will not format correctly. What is the correct protocol for posting code?

https://gist.github.com/paulhtremblay/d7e1080262fcd0e78feff42363bec208.js

import random

import apache_beam
from apache_beam import GroupByKey, PTransform, WindowInto, WithKeys
from apache_beam.transforms.window import FixedWindows


class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Kafka messages based on publish
    time and outputs tuples, each containing a shard key and a list of
    messages.
    """

    def __init__(self, window_size: float, num_shards: int = 5):
        # window_size is given in seconds
        self.window_size = int(window_size)
        self.num_shards = num_shards

    def expand(
        self, pcoll: apache_beam.pvalue.PCollection
    ) -> apache_beam.pvalue.PCollection:
        """Group messages into shards, with each shard containing a list
        of messages.

        :param pcoll: an Apache Beam PCollection
        """
        return (
            pcoll
            | "Window into fixed intervals" >> WindowInto(FixedWindows(self.window_size))
            # attach a random shard key in [0, num_shards - 1]
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            | "Group by key" >> GroupByKey()
        )


-- 
Henry Tremblay
Data Engineer, Best Buy