Let's clarify what NextOp is.
Is it operating on a keyed stream, or is something like a map function that has some internal state based on keys of the input elements (e.g., it has something like a Map<Key, SomeState> that it queries/modifies for each input element)?

If NextOp operators on a keyed stream then it's (keyed) state will be redistributed. If it is not a keyed state then, in your example, it will never receive another element with key A.

On 2/22/2021 12:07 PM, Tripathi,Vikash wrote:

Just needed more clarity in terms of a processing scenario.

Say, I was having records of key ‘A’ on a parallel instance ‘Op1’ of operator ‘Op’ and the next operator ‘NextOp’ in the sequence of transformation was getting records of key ‘A’ on it’s parallel instance ‘NextOp2’ at the time when the savepoint was made.

Now, the application has been rescaled to a parallelism level of say 4 as against 2 which was the case at the time of savepoint.

Now, let’s say key ‘A’ records land up to ‘NextOp4’, parallel instance of ‘NextOp’ operator after re-scaling but the operations being performed in this ‘NextOp’ operator demands a windowing event based on event time processing that has still not been finished even after restarting the application from previous savepoint. Some records of the same key ‘A’ lie together to be processed in parallel instance ‘NextOp2’ as was the case during savepoint and the new set of records for the same key now happen to be redirected together, for being processed on the parallel instance ‘NextOp4’ of the same operator. However, to generate a consistent result, the event time window needs to do calculations that take into account both the record sets for key ‘A’ which are present on different instances of the same operator ‘NextOp’.

How will flink runtime handle such a situation?

*From:*Chesnay Schepler <ches...@apache.org>
*Sent:* Friday, February 19, 2021 12:52 AM
*To:* yidan zhao <hinobl...@gmail.com>; Tripathi,Vikash <vikash.tripa...@cerner.com>
*Cc:* user <user@flink.apache.org>
*Subject:* Re: Sharding of Operators

When you change the parallelism then keys are re-distributed across operators instances.

/However/, this re-distribution is limited to the set /maxParallelism /(set via the ExecutionConfig), which by default is 128 if no operators exceeded the parallelism on the first submission.

This *cannot be changed* after the job was run without discarding state.

See https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/production_ready.html#set-an-explicit-max-parallelism <https://nam10.safelinks.protection.outlook.com/?url=https%3A%2F%2Fci.apache.org%2Fprojects%2Fflink%2Fflink-docs-release-1.12%2Fops%2Fproduction_ready.html%23set-an-explicit-max-parallelism&data=04%7C01%7CVikash.Tripathi%40cerner.com%7C2cd9743df82c4ece5a1608d8d44280de%7Cfbc493a80d244454a815f4ca58e8c09d%7C0%7C0%7C637492729385864292%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=gR3eEI3KVcKcgDXTYYx5OveHP1fLk2LJmjxWeAsoxbs%3D&reserved=0>

On 2/18/2021 8:29 AM, yidan zhao wrote:

    Actually, we only need to ensure all records belonging to the same
    key will be forwarded to the same operator instance(i), and we do
    not need to guarantee that 'i' is the same with the 'i' in
    previous savepoints. When the job is restarted, the rule 'same
    key's record will be in together' is guaranteed and more slots
    will be surely useful, since each slot(operator instance) will be
    responsible for less keys, leading to less records.

    Tripathi,Vikash <vikash.tripa...@cerner.com
    <mailto:vikash.tripa...@cerner.com>> 于2021年2月18日周四
    上午12:09写道:

        Hi there,

        I wanted to know how re-partitioning of keys per operator
        instance would happen when the current operator instances are
        scaled up or down and we are restarting our job from a
        previous savepoint which had a different number of parallel
        instances of the same operator.

        My main concern is whether the re-distribution would lead to
        mapping of same keys to same operator instances as was done
        earlier but if this happens then there would be no added
        advantage of adding new task slots for the same operator
        because they would remain less used or not used at all if all
        possible key values have been seen earlier and if we go by the
        other way around of evenly distributing out keys (based on the
        hash function) to the new parallel slots as well, won't this
        cause issues in terms of processing consistent results based
        on the state of operator as was provided by previous savepoint
        of application.

        Is there a guarantee given by the hash function as in attached
        snippet, that same keys which landed earlier on an operator
        instance will land back again to the same operator instance
        once the job is restarted with new set of parallelism
        configuration?

        Thanks,

        Vikash

        CONFIDENTIALITY NOTICE This message and any included
        attachments are from Cerner Corporation and are intended only
        for the addressee. The information contained in this message
        is confidential and may constitute inside or non-public
        information under international, federal, or state securities
        laws. Unauthorized forwarding, printing, copying,
        distribution, or use of such information is strictly
        prohibited and may be unlawful. If you are not the addressee,
        please promptly delete this message and notify the sender of
        the delivery error by e-mail or you may call Cerner's
        corporate offices in Kansas City, Missouri, U.S.A at (+1)
        (816)221-1024.


Reply via email to