Hmm, OK - I don't quite get why what I want to do isn't supported in Beam ... I don't actually have a limited-parallelism requirement; I just want to be able to partition my unbounded stream by a key determined from the elements, so that any two elements with the same key are routed to the same worker. I want to do this because my DoFn keeps some in-memory cached state for each key (which I was planning to store at either the DoFn or JVM level). Does this sound like a bad idea?
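For context, this is roughly the kind of per-key cached state I have in mind - just a sketch, where CachedState is a placeholder for my own state class and the map is keyed on the same key I'd partition by:

import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.transforms.DoFn;

public class MyDofn extends DoFn<MyElement, MyElement> {

  // JVM-level cache: static, so it is shared by every instance of this DoFn
  // running in the same worker JVM (a non-static field would be DoFn-level).
  private static final ConcurrentHashMap<String, CachedState> CACHE =
      new ConcurrentHashMap<>();

  @ProcessElement
  public void processElement(ProcessContext c) {
    MyElement e = c.element();
    // Look up (or create) the in-memory state for this element's key.
    CachedState state =
        CACHE.computeIfAbsent(e.getKey().toString(), k -> new CachedState());
    state.update(e);  // placeholder method on my state class
    c.output(e);
  }
}

This only helps if all elements for a given key end up in the same JVM, which is what I'm trying to arrange with the partitioning above.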
> On 6 Jun 2017, at 19:14, Lukasz Cwik <lc...@google.com> wrote:
>
> You're right, the window acts as a secondary key within GroupByKey (KeyA,Window1 != KeyA,Window2), which means that each of those two composite keys can be scheduled to execute at the same time.
>
> At this point I think you should challenge your limited-parallelism requirement, as you'll need to build something outside of Apache Beam to provide these parallelization limits across windows (e.g. a lock within the same process when limiting yourself to a single machine, or a distributed lock service when dealing with multiple machines).
>
> The backlog of data is either going to grow infinitely at the GroupByKey or grow infinitely at the source if your pipeline can't keep up. It is up to the Runner to be smart and not produce a giant backlog at the GroupByKey, since it knows how fast work is being completed (unfortunately I don't know of any Runner that is yet smart enough to push the backlog up to the source).
>
>> On Tue, Jun 6, 2017 at 11:03 AM, Josh <jof...@gmail.com> wrote:
>> I see, thanks for the tips!
>>
>> Last question about this! How could this be adapted to work in an unbounded/streaming job? To work in an unbounded job, I need to put a Window.into with a trigger before the GroupByKey.
>> I guess this would mean that the "shard gets processed by a single thread in MyDofn" guarantee will only apply to messages within a single window, and would not apply across windows?
>> If this is the case, is there a better solution? I would like to avoid buffering data in windows, and want the shard guarantee to apply across windows.
>>
>>> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lc...@google.com> wrote:
>>> Your code looks like what I was describing. My only comment would be to use a deterministic hashing function which is stable across JVM versions and JVM instances, as that will help in making your pipeline consistent across different runs/environments.
>>>
>>> Parallelizing across 8 instances instead of 4 would break the contract around GroupByKey (since it wouldn't have grouped all the elements for a key correctly). Also, each element is the smallest unit of work, and in your pipeline you have specifically chosen to reduce all your elements into 4 logical elements (each containing some proportion of your original data).
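For illustration, a deterministic shard-key function along the lines Lukasz describes might look like the sketch below. It assumes Guava's Hashing utility is available on the classpath, and uses Math.floorMod so that a negative hash can't produce a negative partition index (a plain hashCode() % 4, as in the snippet below, can go negative):

import java.nio.charset.StandardCharsets;
import com.google.common.hash.Hashing;

static int shardKeyFor(MyElement e, int numShards) {
  // Murmur3 is stable across JVM versions and machines, unlike
  // Object.hashCode() on arbitrary types.
  int hash = Hashing.murmur3_32()
      .hashString(e.getKey().toString(), StandardCharsets.UTF_8)
      .asInt();
  // floorMod keeps the result in [0, numShards) even when hash is negative.
  return Math.floorMod(hash, numShards);
}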
>>>> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jof...@gmail.com> wrote:
>>>> Thanks for the reply, Lukasz.
>>>>
>>>> What I meant was that I want to shard my data by a "shard key", and be sure that any two elements with the same "shard key" are processed by the same thread on the same worker. (Or, if that's not possible, the same worker JVM with no thread guarantee would be good enough.) It doesn't actually matter to me whether there are 1 or 4 or 100 DoFn instances processing the data.
>>>>
>>>> It sounds like what you suggested will work for this, with the downside of me needing to choose a number of shards/DoFns (e.g. 4).
>>>> It seems a bit long and messy, but am I right in thinking it would look like this?
>>>>
>>>> PCollection<MyElement> elements = ...;
>>>> elements
>>>>     .apply(MapElements
>>>>         .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>>>             TypeDescriptor.of(MyElement.class)))
>>>>         .via((MyElement e) -> KV.of(
>>>>             e.getKey().toString().hashCode() % 4, e)))
>>>>     .apply(GroupByKey.create())
>>>>     .apply(Partition.of(4,
>>>>         (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>)
>>>>             (kv, i) -> kv.getKey()))
>>>>     .apply(ParDo.of(new MyDofn()));
>>>> // Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>>
>>>> // as input instead of just a MyElement
>>>>
>>>> I was wondering, is there a guarantee that the runner won't parallelize the final MyDofn across e.g. 8 instances instead of 4? If there are two input elements with the same key, are they actually guaranteed to be processed on the same instance?
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>> I think this is what you're asking for, but your statement about 4 instances is unclear as to whether that is 4 copies of the same DoFn or 4 completely different DoFns. Also it's unclear what you mean by instance/thread; I'm assuming that you want at most 4 instances of a DoFn, each being processed by a single thread.
>>>>>
>>>>> This is a bad idea because you limit your parallelism, but it is similar to what the default file sharding logic does. In Apache Beam the smallest unit of output for a GroupByKey is a single key+iterable pair. We exploit this by assigning all our values to a fixed number of keys and then performing a GroupByKey. This is the same trick that powers the file sharding logic in AvroIO/TextIO/...
>>>>>
>>>>> Your pipeline would look like (fixed width font diagram):
>>>>>
>>>>> your data -> apply shard key -> GroupByKey -> partition by key -> your dofn #1
>>>>>                                                                 \> your dofn #2
>>>>>                                                                 \> ...
>>>>> a / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>>>>
>>>>> This is not exactly the same as processing a single DoFn instance/thread because it relies on the Runner to be able to schedule each key to be processed on a different machine. For example, a Runner may choose to process values 1,[a,c] and 2,[b,d] sequentially on the same machine, or may choose to distribute them.
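For illustration, the reworked MyDofn that the snippet above refers to - one that consumes a KV<Integer, Iterable<MyElement>> after the GroupByKey rather than a single MyElement - might look something like this sketch (MyGroupedDofn is a made-up name):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class MyGroupedDofn extends DoFn<KV<Integer, Iterable<MyElement>>, MyElement> {

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Every element assigned this shard key arrives together in a single
    // iterable, so one worker handles the whole group.
    for (MyElement e : c.element().getValue()) {
      // ... per-element logic from the original MyDofn goes here ...
      c.output(e);
    }
  }
}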
>>>>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jof...@gmail.com> wrote:
>>>>>> Hey Lukasz,
>>>>>>
>>>>>> I have a follow-up question about this -
>>>>>>
>>>>>> What if I want to do something very similar, but instead of 4 instances of AvroIO following the partition transform, I want 4 instances of a DoFn that I've written? I want to ensure that each partition is processed by a single DoFn instance/thread. Is this possible with Beam?
>>>>>>
>>>>>> Thanks,
>>>>>> Josh
>>>>>>
>>>>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jof...@gmail.com> wrote:
>>>>>>> Ahh I see - OK, I'll try out this solution then. Thanks Lukasz!
>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>> Google Cloud Dataflow won't override your setting. The dynamic sharding occurs only if you don't explicitly set a numShards value.
>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jof...@gmail.com> wrote:
>>>>>>>>> Hi Lukasz,
>>>>>>>>>
>>>>>>>>> Thanks for the example. That sounds like a nice solution -
>>>>>>>>> I am running on Dataflow though, which dynamically sets numShards - so if I set numShards to 1 on each of those AvroIO writers, I can't be sure that Dataflow isn't going to override my setting, right? I guess this should work fine as long as I partition my stream into a large enough number of partitions so that Dataflow won't override numShards.
>>>>>>>>>
>>>>>>>>> Josh
>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>> Since you're using a small number of shards, add a Partition transform which uses a deterministic hash of the key to choose one of 4 partitions. Write each partition with a single shard.
>>>>>>>>>>
>>>>>>>>>> (Fixed width diagram below)
>>>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>>>> Becomes:
>>>>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>>>
>>>>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jof...@gmail.com> wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>>>
>>>>>>>>>>> I would like to partition the stream by some key in the element, so that all elements with the same key will get processed by the same shard writer, and therefore written to the same file. Is there a way to do this? Note that in my stream the number of keys is very large (most elements have a unique key, while a few elements share a key).
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Josh
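For illustration, the Partition-into-single-shard-writers layout in Lukasz's diagram could be wired up roughly as in the sketch below. This is only a sketch: the output path is made up, shardKeyFor is the hypothetical helper sketched earlier, MyElement is assumed to be Avro-serializable, and AvroIO's builder methods differ slightly between Beam releases:

import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

int numPartitions = 4;
PCollection<MyElement> elements = ...;

// Deterministically route each element to one of the fixed partitions.
PCollectionList<MyElement> partitions = elements.apply(
    Partition.of(numPartitions,
        (Partition.PartitionFn<MyElement>) (e, n) -> shardKeyFor(e, n)));

// Write each partition with exactly one shard, so all elements that share a
// shard key land in the same file for a given window.
for (int i = 0; i < numPartitions; i++) {
  partitions.get(i).apply("WriteShard" + i,
      AvroIO.write(MyElement.class)
          .to("gs://my-bucket/output/shard-" + i + "/part")  // made-up path
          .withWindowedWrites()
          .withNumShards(1));
}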