Your code looks like what I was describing. My only comment would be to use
a deterministic hashing function which is stable across JVM versions and
JVM instances as it will help in making your pipeline consistent across
different runs/environments.

Parallelizing across 8 instances instead of 4 would break the contract
around GroupByKey (since it didn't group all the elements for a key
correctly). Also, each element is the smallest unit of work and
specifically in your pipeline you have chosen to reduce all your elements
into 4 logical elements (each containing some proportion of your original
data).

On Tue, Jun 6, 2017 at 9:37 AM, Josh <jof...@gmail.com> wrote:

> Thanks for the reply, Lukasz.
>
>
> What I meant was that I want to shard my data by a "shard key", and be
> sure that any two elements with the same "shard key" are processed by the
> same thread on the same worker. (Or if that's not possible, by the same
> worker JVM with no thread guarantee would be good enough). It doesn't
> actually matter to me whether there's 1 or 4 or 100 DoFn instances
> processing the data.
>
>
> It sounds like what you suggested will work for this, with the downside of
> me needing to choose a number of shards/DoFns (e.g. 4).
>
> It seems a bit long and messy but am I right in thinking it would look
> like this? ...
>
>
> PCollection<MyElement> elements = ...;
>
> elements
>
> .apply(MapElements
>
> .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
> TypeDescriptor.of(MyElement.class)))
>
> .via((MyElement e) -> KV.of(
>
> e.getKey().toString().hashCode() % 4, e)))
>
> .apply(GroupByKey.create())
>
> .apply(Partition.of(4,
>
> (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
> kv.getKey()))
>
> .apply(ParDo.of(new MyDofn()));
>
> // Where MyDofn must be changed to handle a KV<Integer,
> Iterable<MyElement>> as input instead of just a MyElement
>
>
> I was wondering is there a guarantee that the runner won't parallelise the
> final MyDofn across e.g. 8 instances instead of 4? If there are two input
> elements with the same key are they actually guaranteed to be processed on
> the same instance?
>
>
> Thanks,
>
> Josh
>
>
>
>
> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> I think this is what your asking for but your statement about 4 instances
>> is unclear as to whether that is 4 copies of the same DoFn or 4 completely
>> different DoFns. Also its unclear what you mean by instance/thread, I'm
>> assuming that you want at most 4 instances of a DoFn each being processed
>> by a single thread.
>>
>> This is a bad idea because you limit your parallelism but this is similar
>> to what the default file sharding logic does. In Apache Beam the smallest
>> unit of output for a GroupByKey is a single key+iterable pair. We exploit
>> this by assigning all our values to a fixed number of keys and then
>> performing a GroupByKey. This is the same trick that powers the file
>> sharding logic in AvroIO/TextIO/...
>>
>> Your pipeline would look like (fixed width font diagram):
>> your data      -> apply shard key       -> GroupByKey        -> partition
>> by key -> your dofn #1
>>
>>        \> your dofn #2
>>
>>        \> ...
>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>>
>> This is not exactly the same as processing a single DoFn instance/thread
>> because it relies on the Runner to be able to schedule each key to be
>> processed on a different machine. For example a Runner may choose to
>> process value 1,[a,c] and 2,[b,d] sequentially on the same machine or may
>> choose to distribute them.
>>
>>
>>
>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jof...@gmail.com> wrote:
>>
>>> Hey Lukasz,
>>>
>>> I have a follow up question about this -
>>>
>>> What if I want to do something very similar, but instead of with 4
>>> instances of AvroIO following the partition transform, I want 4 instances
>>> of a DoFn that I've written. I want to ensure that each partition is
>>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>>
>>> Thanks,
>>> Josh
>>>
>>>
>>>
>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>
>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>> sharding occurs if you don't explicitly set a numShard value.
>>>>>
>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jof...@gmail.com> wrote:
>>>>>
>>>>>> Hi Lukasz,
>>>>>>
>>>>>> Thanks for the example. That sounds like a nice solution -
>>>>>> I am running on Dataflow though, which dynamically sets numShards -
>>>>>> so if I set numShards to 1 on each of those AvroIO writers, I can't be 
>>>>>> sure
>>>>>> that Dataflow isn't going to override my setting right? I guess this 
>>>>>> should
>>>>>> work fine as long as I partition my stream into a large enough number of
>>>>>> partitions so that Dataflow won't override numShards.
>>>>>>
>>>>>> Josh
>>>>>>
>>>>>>
>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Since your using a small number of shards, add a Partition transform
>>>>>>> which uses a deterministic hash of the key to choose one of 4 
>>>>>>> partitions.
>>>>>>> Write each partition with a single shard.
>>>>>>>
>>>>>>> (Fixed width diagram below)
>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>> Becomes:
>>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jof...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>
>>>>>>>> I would like to partition the stream by some key in the element, so
>>>>>>>> that all elements with the same key will get processed by the same 
>>>>>>>> shard
>>>>>>>> writer, and therefore written to the same file. Is there a way to do 
>>>>>>>> this?
>>>>>>>> Note that in my stream the number of keys is very large (most elements 
>>>>>>>> have
>>>>>>>> a unique key, while a few elements share a key).
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Josh
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to