State is per key, and keys are distributed across workers. Two workers should not be working on the same key's state at the same time.
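To make "keys are distributed across workers" concrete, here is a small hypothetical model in plain Java. It is not the Beam API and not how any particular runner is implemented. Each element is routed to a worker by hashing its key. Each worker owns the seen-state for its keys and processes them on a single thread. Because every duplicate of a key lands on the same worker, the read-check-write sequence from the quoted DoFn body never races with itself. `KeyPartitionedDedup`, `submit`, and `finish` are illustrative names. A real runner's routing, rebalancing, and state backend are more involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of key-partitioned stateful processing; NOT the Beam API.
class KeyPartitionedDedup {
    private final int numWorkers;
    // Each "worker" is a single thread, so its tasks run one at a time.
    private final List<ExecutorService> workers = new ArrayList<>();
    // One seen-state table per worker; only that worker's thread touches it.
    private final List<Map<String, Boolean>> seenStatePerWorker = new ArrayList<>();
    // Shared output sink; synchronized because workers emit concurrently.
    private final List<String> output = Collections.synchronizedList(new ArrayList<>());

    KeyPartitionedDedup(int numWorkers) {
        this.numWorkers = numWorkers;
        for (int i = 0; i < numWorkers; i++) {
            workers.add(Executors.newSingleThreadExecutor());
            seenStatePerWorker.add(new HashMap<>());
        }
    }

    // Deterministic routing: every element with the same key goes to the same worker.
    private int workerFor(String key) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    void submit(String element) {
        int w = workerFor(element);
        Map<String, Boolean> seenState = seenStatePerWorker.get(w);
        workers.get(w).execute(() -> {
            // Same read-check-write shape as the quoted DoFn body.
            Boolean seen = seenState.get(element);
            if (seen == null) {
                seenState.put(element, true);
                output.add(element);
            }
        });
    }

    // Waits for all workers to drain their queues and returns what was emitted.
    List<String> finish() {
        for (ExecutorService worker : workers) {
            worker.shutdown();
        }
        try {
            for (ExecutorService worker : workers) {
                worker.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(output);
    }

    public static void main(String[] args) {
        KeyPartitionedDedup dedup = new KeyPartitionedDedup(4);
        String[] input = {"a", "b", "a", "c", "b", "a", "d"};
        for (String e : input) {
            dedup.submit(e);
        }
        List<String> out = dedup.finish();
        Collections.sort(out);
        System.out.println(out); // prints [a, b, c, d]
    }
}
```

The workers run in parallel, but only across different keys. If the routing step were replaced by, say, round-robin dispatch against a shared map, two workers could both read null for the same key. That is exactly the scenario asked about below.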
On Thu, Mar 2, 2023 at 10:48 AM Binh Nguyen Van <binhn...@gmail.com> wrote:

> Thank you Ankur,
>
> This is the current source code of the Deduplicate transform:
>
>     Boolean seen = seenState.read();
>     // Seen state is either set or not set so if it has been set then it must be true.
>     if (seen == null) {
>       // We don't want the expiry timer to hold up watermarks.
>       expiryTimer.offset(duration).withNoOutputTimestamp().setRelative();
>       seenState.write(true);
>       receiver.output(element);
>     }
>
> Could you please explain the synchronization for the following scenario?
>
> - There are two workers.
> - Both workers read the same state at the same time, and the state has
>   not been set yet. In this case, both will get null in the response (I
>   believe).
> - Both of them will try to set the state and send the output out.
>
> What will happen in this scenario?
>
> Thank you
> -Binh
>
> On Thu, Mar 2, 2023 at 10:29 AM Ankur Goenka <ankurgoe...@gmail.com>
> wrote:
>
>> Hi Binh,
>>
>> The Deduplicate transform uses the state API to do the de-duplication,
>> which should do the needful operations to work across multiple
>> concurrent workers.
>>
>> Thanks,
>> Ankur
>>
>> On Thu, 2 Mar 2023 at 10:18, Binh Nguyen Van <binhn...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am writing a pipeline and want to apply deduplication. I looked at
>>> the Deduplicate transform that Beam provides and wonder about its usage.
>>> Do I need to shuffle the input collection by key before calling this
>>> transformation? I looked at its source code and it doesn’t do any
>>> shuffle, so I wonder how it works when, let’s say, there are duplicates
>>> and the duplicated elements are processed concurrently on multiple
>>> workers.
>>>
>>> Thank you
>>> -Binh