Re: Shuffling on shardnum, is it necessary?

2019-09-27 Thread Shannon Duncan
Yes we do have a use case for specifying number of shards, but
unfortunately I can't share it with the group.

Shannon



Re: Shuffling on shardnum, is it necessary?

2019-09-27 Thread Reuven Lax
Is there a reason that you need to explicitly specify the number of shards?
If you don't, then this extra shuffle will not be performed.

Reuven
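
A minimal sketch of that suggestion (illustration only, not from the original
message; assumes the Beam Java SDK and a hypothetical output path): leaving
withNumShards() unset lets the runner choose the sharding, so no shard key or
extra GroupByKey is added for the write.

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

class RunnerChosenSharding {
  // No withNumShards(...): the runner decides how many output files to
  // produce, and the shard-assignment + shuffle step is skipped.
  static PDone write(PCollection<String> lines) {
    return lines.apply(
        TextIO.write()
            .to("gs://my-bucket/output/part")  // hypothetical output prefix
            .withSuffix(".txt"));
  }
}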




Re: Shuffling on shardnum, is it necessary?

2019-09-27 Thread Shannon Duncan
Interesting. Right now we are only doing batch processing, so I hadn't
thought about the windowing aspect.



Re: Shuffling on shardnum, is it necessary?

2019-09-27 Thread Reuven Lax
Are you doing this in streaming with windowed writes? Window grouping does
not "happen" in Beam until a GroupByKey, so you do need the GroupByKey in
that case.

If you are not windowing but want a specific number of shards (though the
general suggestion in that case is to not pick a specific number of shards,
but let the runner pick it for you), your approach could work. However the
implementation would be more complicated than you suggest. The problem is
that every file writer has a buffer, and when you force many of them to be
in memory in a map you risk running out of memory. If you look at the
spilledFiles code in WriteFiles.java, it was written to handle exactly this
case.

Reuven
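
A rough sketch of the windowed-write case described here (illustration only,
not from the original message; window size and output path are hypothetical):
the window grouping declared on the PCollection only materializes at a
GroupByKey, such as the one the sharded file write performs.

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class WindowedShardedWrite {
  static void write(PCollection<String> lines) {
    lines
        // Declares the windowing; grouping by window actually happens at a
        // GroupByKey downstream (here, inside the sharded file write).
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply(
            TextIO.write()
                .to("gs://my-bucket/windowed/part")  // hypothetical prefix
                .withWindowedWrites()                // one set of shards per window
                .withNumShards(10));
  }
}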



Re: Shuffling on shardnum, is it necessary?

2019-09-27 Thread Lukasz Cwik
Using a state variable to store the shard key introduces a GroupByKey
within Dataflow to ensure that there is a strict ordering on state. Other
runners insert similar materializations to guarantee this as well.

Also, a sufficiently powerful execution engine could do state processing
for the same key in parallel, as long as it was able to resolve state
write conflicts.
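
A small sketch of the stateful-DoFn shape under discussion (illustration
only, assuming the Beam Java SDK): because the "buffer" state is scoped per
key and window, a runner has to colocate all elements for a key before
running this ParDo, which is where the GroupByKey-like materialization comes
from.

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class BufferPerShardKey extends DoFn<KV<Integer, String>, Void> {
  // State is addressed by (key, window), so every element with the same
  // shard key must be routed to the same place to read and write it.
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @ProcessElement
  public void processElement(
      ProcessContext c, @StateId("buffer") BagState<String> buffer) {
    buffer.add(c.element().getValue());
  }
}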



Re: Shuffling on shardnum, is it necessary?

2019-09-27 Thread Shannon Duncan
Yes, specifically TextIO withNumShards().
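
For reference, a minimal sketch of that write (illustration only; the output
path is hypothetical). The explicit withNumShards(...) call is what makes
Beam assign a shard key and group by it before writing.

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

class FixedShardWrite {
  // Forces exactly 10 output files; omitting withNumShards(...) would let
  // the runner choose the sharding instead.
  static PDone write(PCollection<String> lines) {
    return lines.apply(
        TextIO.write()
            .to("gs://my-bucket/output/part")  // hypothetical output prefix
            .withSuffix(".txt")
            .withNumShards(10));
  }
}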



Re: Shuffling on shardnum, is it necessary?

2019-09-27 Thread Reuven Lax
I'm not sure what you mean by "write out to a specific shard number." Are
you talking about FileIO sinks?

Reuven

On Fri, Sep 27, 2019 at 7:41 AM Shannon Duncan wrote:

> So when Beam writes out to a specific shard number, as I understand it, it
> does a few things:
>
> - Assigns a shard key to each record (reduces parallelism)
> - Shuffles and groups by the shard key to colocate all records
> - Writes out to each shard file within a single DoFn per key...
>
> When thinking about this, I believe we might be able to eliminate the
> GroupByKey and write each file's records out with only a DoFn after the
> shard key is assigned.
>
> As long as the shard key is the actual key of the PCollection, could we
> use a state variable to force all records with the same key to share state
> with each other?
>
> On a DoFn, can we use setup to hold a Map of the files being written to
> within bundles on that instance, and close all files in the map on
> teardown?
>
> If this is the case, does it reduce the need for a shuffle and allow a DoFn
> to safely write out, in append mode, to a file, batch, etc. held in state?
>
> It doesn't really decrease parallelism after the key is assigned, since it
> can parallelize over each key within its state window, which is the same
> level of parallelism we achieve by doing a GroupByKey and a for loop over
> the result. So performance shouldn't be impacted if this holds true.
>
> It's kind of like combining both the shuffle and the data write in the
> same step?
>
> This does, however, offer a significant cost reduction by eliminating a
> compute-based shuffle, and also a Dataflow Shuffle service call if the
> shuffle service is enabled.
>
> Thoughts?
>
> Thanks,
> Shannon Duncan
>
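
A rough sketch of the no-GroupByKey approach described above (illustration
only, not code from the thread; file paths are hypothetical and local NIO
files stand in for a real filesystem): a plain DoFn keeps one append-mode
writer per shard key on each worker and closes them on teardown. As the rest
of the thread points out, the open writers' buffers and the per-key state
semantics are the catch.

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class AppendToShardFn extends DoFn<KV<Integer, String>, Void> {
  private final String outputDir;  // hypothetical output directory
  private transient Map<Integer, BufferedWriter> writers;

  AppendToShardFn(String outputDir) {
    this.outputDir = outputDir;
  }

  @Setup
  public void setup() {
    writers = new HashMap<>();
  }

  @ProcessElement
  public void processElement(@Element KV<Integer, String> element) throws IOException {
    int shard = element.getKey();
    // Lazily open one append-mode writer per shard key seen on this worker.
    BufferedWriter writer =
        writers.computeIfAbsent(
            shard,
            s -> {
              try {
                return Files.newBufferedWriter(
                    Paths.get(outputDir, "shard-" + s + ".txt"),
                    StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE,
                    StandardOpenOption.APPEND);
              } catch (IOException e) {
                throw new RuntimeException(e);
              }
            });
    writer.write(element.getValue());
    writer.newLine();
  }

  @Teardown
  public void teardown() {
    // Close every file this worker instance wrote to.
    for (BufferedWriter writer : writers.values()) {
      try {
        writer.close();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
}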