Your code looks like what I was describing. My only comment would be to use a deterministic hashing function that is stable across JVM versions and JVM instances, as that will help keep your pipeline consistent across different runs/environments.
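For example, here is a minimal sketch of the keying step, assuming Guava is on your classpath and reusing the MyElement/getKey() names from your snippet below - Murmur3 is only one option for a hash that is stable across JVM versions and instances, and Math.floorMod avoids negative shard numbers when the hash comes out negative:

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

final int numShards = 4;  // must match the shard count used later by Partition

PCollection<KV<Integer, MyElement>> keyed = elements.apply(MapElements
    .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
        TypeDescriptor.of(MyElement.class)))
    .via((MyElement e) -> KV.of(
        // Murmur3 produces the same value on every JVM, unlike the default
        // Object.hashCode() on arbitrary key types.
        Math.floorMod(
            Hashing.murmur3_32()
                .hashString(e.getKey().toString(), StandardCharsets.UTF_8)
                .asInt(),
            numShards),
        e)));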
Parallelizing across 8 instances instead of 4 would break the contract around GroupByKey (the elements for a given key would no longer all be grouped and processed together). Also, each element is the smallest unit of work, and in your pipeline you have chosen to reduce all your elements into 4 logical elements (each containing some proportion of your original data). A rough sketch of what the reworked MyDofn could look like is at the bottom of this message.

On Tue, Jun 6, 2017 at 9:37 AM, Josh <jof...@gmail.com> wrote:

> Thanks for the reply, Lukasz.
>
> What I meant was that I want to shard my data by a "shard key", and be
> sure that any two elements with the same "shard key" are processed by the
> same thread on the same worker. (Or if that's not possible, by the same
> worker JVM with no thread guarantee would be good enough.) It doesn't
> actually matter to me whether there's 1 or 4 or 100 DoFn instances
> processing the data.
>
> It sounds like what you suggested will work for this, with the downside of
> me needing to choose a number of shards/DoFns (e.g. 4).
>
> It seems a bit long and messy, but am I right in thinking it would look
> like this? ...
>
> PCollection<MyElement> elements = ...;
>
> elements
>     .apply(MapElements
>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>             TypeDescriptor.of(MyElement.class)))
>         .via((MyElement e) -> KV.of(
>             e.getKey().toString().hashCode() % 4, e)))
>     .apply(GroupByKey.create())
>     .apply(Partition.of(4,
>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
>             kv.getKey()))
>     .apply(ParDo.of(new MyDofn()));
>
> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>>
> // as input instead of just a MyElement
>
> I was wondering, is there a guarantee that the runner won't parallelise the
> final MyDofn across e.g. 8 instances instead of 4? If there are two input
> elements with the same key, are they actually guaranteed to be processed on
> the same instance?
>
> Thanks,
> Josh
>
> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> I think this is what you're asking for, but your statement about 4 instances
>> is unclear as to whether that is 4 copies of the same DoFn or 4 completely
>> different DoFns. It's also unclear what you mean by instance/thread; I'm
>> assuming that you want at most 4 instances of a DoFn, each being processed
>> by a single thread.
>>
>> This is a bad idea because you limit your parallelism, but it is similar
>> to what the default file sharding logic does. In Apache Beam the smallest
>> unit of output for a GroupByKey is a single key+iterable pair. We exploit
>> this by assigning all our values to a fixed number of keys and then
>> performing a GroupByKey. This is the same trick that powers the file
>> sharding logic in AvroIO/TextIO/...
>>
>> Your pipeline would look like (fixed width font diagram):
>> your data -> apply shard key -> GroupByKey -> partition by key -> your dofn #1
>>                                                                \> your dofn #2
>>                                                                \> ...
>> a / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>
>> This is not exactly the same as processing a single DoFn instance/thread
>> because it relies on the Runner to be able to schedule each key to be
>> processed on a different machine. For example, a Runner may choose to
>> process 1,[a,c] and 2,[b,d] sequentially on the same machine or may
>> choose to distribute them.
>>
>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jof...@gmail.com> wrote:
>>
>>> Hey Lukasz,
>>>
>>> I have a follow-up question about this -
>>>
>>> What if I want to do something very similar, but instead of with 4
>>> instances of AvroIO following the partition transform, I want 4 instances
>>> of a DoFn that I've written. I want to ensure that each partition is
>>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>>
>>> Thanks,
>>> Josh
>>>
>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Ahh I see - OK, I'll try out this solution then. Thanks Lukasz!
>>>>
>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>> sharding occurs if you don't explicitly set a numShards value.
>>>>>
>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jof...@gmail.com> wrote:
>>>>>
>>>>>> Hi Lukasz,
>>>>>>
>>>>>> Thanks for the example. That sounds like a nice solution.
>>>>>> I am running on Dataflow though, which dynamically sets numShards -
>>>>>> so if I set numShards to 1 on each of those AvroIO writers, I can't be
>>>>>> sure that Dataflow isn't going to override my setting, right? I guess
>>>>>> this should work fine as long as I partition my stream into a large
>>>>>> enough number of partitions so that Dataflow won't override numShards.
>>>>>>
>>>>>> Josh
>>>>>>
>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>
>>>>>>> Since you're using a small number of shards, add a Partition transform
>>>>>>> which uses a deterministic hash of the key to choose one of 4 partitions.
>>>>>>> Write each partition with a single shard.
>>>>>>>
>>>>>>> (Fixed width diagram below)
>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>> Becomes:
>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jof...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>
>>>>>>>> I would like to partition the stream by some key in the element, so
>>>>>>>> that all elements with the same key will get processed by the same
>>>>>>>> shard writer, and therefore written to the same file. Is there a way
>>>>>>>> to do this? Note that in my stream the number of keys is very large
>>>>>>>> (most elements have a unique key, while a few elements share a key).
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Josh
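As promised above, a rough sketch of what the reworked MyDofn might look like once its input is a KV<Integer, Iterable<MyElement>> instead of a single MyElement - the output type and the body of the loop are placeholders, keep whatever your current DoFn does per element:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class MyDofn extends DoFn<KV<Integer, Iterable<MyElement>>, MyElement> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    int shard = c.element().getKey();  // the shard key assigned upstream
    // One call now receives every element that was mapped to this shard key.
    for (MyElement e : c.element().getValue()) {
      // ... your existing per-element processing goes here ...
      c.output(e);
    }
  }
}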