Thanks for the reply, Lukasz.

What I meant was that I want to shard my data by a "shard key", and be sure
that any two elements with the same "shard key" are processed by the same
thread on the same worker. (Or, if that's not possible, the same worker JVM
with no thread guarantee would be good enough.) It doesn't actually matter
to me whether there are 1, 4 or 100 DoFn instances processing the data.


It sounds like what you suggested will work for this, with the downside of
me needing to choose a number of shards/DoFns (e.g. 4).
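
To check that I have the idea right, here is a minimal plain-Java sketch (no
Beam involved) of the "assign a fixed number of shard keys, then group" step,
using the a/b/c/d example from the message quoted below. The names
ShardGrouping and groupByShard are made up for illustration, and the shard
assignment is hard-coded to match the quoted diagram; in a real pipeline the
GroupByKey would do the grouping.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical helper, not part of Beam: simulates "apply shard key -> GroupByKey".
final class ShardGrouping {
    /** Groups elements by the shard id that shardFn assigns to each one. */
    static <T> Map<Integer, List<T>> groupByShard(List<T> elements, Function<T, Integer> shardFn) {
        Map<Integer, List<T>> groups = new TreeMap<>();
        for (T element : elements) {
            // All elements that map to the same shard id end up in the same list.
            groups.computeIfAbsent(shardFn.apply(element), k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Shard assignment copied from the quoted diagram: a->1, b->2, c->1, d->2.
        Map<Integer, List<String>> groups = groupByShard(
            Arrays.asList("a", "b", "c", "d"),
            s -> (s.equals("a") || s.equals("c")) ? 1 : 2);
        System.out.println(groups); // prints {1=[a, c], 2=[b, d]}
    }
}
```

Each resulting group corresponds to one key+iterable pair coming out of the
GroupByKey; whether two groups are then processed on the same machine or on
different ones is still up to the runner.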

It seems a bit long and messy, but am I right in thinking it would look
like this?


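PCollection<MyElement> elements = ...;

PCollectionList<KV<Integer, Iterable<MyElement>>> partitions = elements
    .apply(MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
            TypeDescriptor.of(MyElement.class)))
        // floorMod rather than %, because hashCode() can be negative
        .via((MyElement e) -> KV.of(
            Math.floorMod(e.getKey().toString().hashCode(), 4), e)))
    .apply(GroupByKey.create())
    .apply(Partition.of(4,
        (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>) (kv, i) ->
            kv.getKey()));

// Partition returns a PCollectionList, so the ParDo is applied per partition
for (int i = 0; i < partitions.size(); i++) {
    partitions.get(i).apply("MyDofn" + i, ParDo.of(new MyDofn()));
}

// Where MyDofn must be changed to handle a KV<Integer, Iterable<MyElement>>
// as input instead of just a MyElement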
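
One thing to watch in the shard-key computation: Java's % operator keeps the
sign of the dividend, so a negative hashCode() gives a negative shard index,
while Math.floorMod always lands in 0..3. A minimal plain-Java sketch of the
difference (ShardIndex, shardFor and shardForKey are hypothetical names, not
Beam API):

```java
// Hypothetical helper, not part of Beam: maps a hash or key to a shard index.
final class ShardIndex {
    /** Maps a hash to a shard in [0, numShards), even when the hash is negative. */
    static int shardFor(int hash, int numShards) {
        return Math.floorMod(hash, numShards);
    }

    /** Same mapping, starting from a key's string form as in the snippet above. */
    static int shardForKey(Object key, int numShards) {
        return shardFor(key.toString().hashCode(), numShards);
    }

    public static void main(String[] args) {
        System.out.println(-7 % 4);          // prints -3, not a valid partition index
        System.out.println(shardFor(-7, 4)); // prints 1
        // The same key always maps to the same shard, since the hash is deterministic.
        System.out.println(shardForKey("user-42", 4) == shardForKey("user-42", 4)); // prints true
    }
}
```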


I was wondering: is there a guarantee that the runner won't parallelise the
final MyDofn across e.g. 8 instances instead of 4? If two input elements
have the same key, are they actually guaranteed to be processed on the same
instance?


Thanks,

Josh




On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lc...@google.com> wrote:

> I think this is what you're asking for, but your statement about 4 instances
> is unclear as to whether that is 4 copies of the same DoFn or 4 completely
> different DoFns. Also, it's unclear what you mean by instance/thread; I'm
> assuming that you want at most 4 instances of a DoFn, each being processed
> by a single thread.
>
> This is a bad idea because you limit your parallelism but this is similar
> to what the default file sharding logic does. In Apache Beam the smallest
> unit of output for a GroupByKey is a single key+iterable pair. We exploit
> this by assigning all our values to a fixed number of keys and then
> performing a GroupByKey. This is the same trick that powers the file
> sharding logic in AvroIO/TextIO/...
>
> Your pipeline would look like (fixed width font diagram):
> your data      -> apply shard key       -> GroupByKey        -> partition by key -> your dofn #1
>                                                                                  \> your dofn #2
>                                                                                  \> ...
> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] -> ???
>
> This is not exactly the same as processing a single DoFn instance/thread
> because it relies on the Runner to be able to schedule each key to be
> processed on a different machine. For example a Runner may choose to
> process value 1,[a,c] and 2,[b,d] sequentially on the same machine or may
> choose to distribute them.
>
>
>
> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jof...@gmail.com> wrote:
>
>> Hey Lukasz,
>>
>> I have a follow up question about this -
>>
>> What if I want to do something very similar, but instead of with 4
>> instances of AvroIO following the partition transform, I want 4 instances
>> of a DoFn that I've written. I want to ensure that each partition is
>> processed by a single DoFn instance/thread. Is this possible with Beam?
>>
>> Thanks,
>> Josh
>>
>>
>>
>> On Wed, May 24, 2017 at 6:15 PM, Josh <jof...@gmail.com> wrote:
>>
>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>
>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Google Cloud Dataflow won't override your setting. The dynamic sharding
>>>> occurs if you don't explicitly set a numShard value.
>>>>
>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jof...@gmail.com> wrote:
>>>>
>>>>> Hi Lukasz,
>>>>>
>>>>> Thanks for the example. That sounds like a nice solution -
>>>>> I am running on Dataflow though, which dynamically sets numShards - so
>>>>> if I set numShards to 1 on each of those AvroIO writers, I can't be sure
>>>>> that Dataflow isn't going to override my setting, right? I guess this
>>>>> should work fine as long as I partition my stream into a large enough
>>>>> number of partitions so that Dataflow won't override numShards.
>>>>>
>>>>> Josh
>>>>>
>>>>>
>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Since you're using a small number of shards, add a Partition transform
>>>>>> which uses a deterministic hash of the key to choose one of 4 partitions.
>>>>>> Write each partition with a single shard.
>>>>>>
>>>>>> (Fixed width diagram below)
>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>> Becomes:
>>>>>> Pipeline -> Partition --> AvroIO(numShards = 1)
>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>
>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jof...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am using a FileBasedSink (AvroIO.write) on an unbounded stream
>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>
>>>>>>> I would like to partition the stream by some key in the element, so
>>>>>>> that all elements with the same key will get processed by the same
>>>>>>> shard writer, and therefore written to the same file. Is there a way
>>>>>>> to do this? Note that in my stream the number of keys is very large
>>>>>>> (most elements have a unique key, while a few elements share a key).
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Josh
>>>>>>>
