Re: Shuffling on shardnum, is it necessary?

2019-09-27 Thread Shannon Duncan
Yes, we do have a use case for specifying the number of shards, but
unfortunately I can't share it with the group.

Shannon

On Fri, Sep 27, 2019 at 2:14 PM Reuven Lax  wrote:

> Is there a reason that you need to explicitly specify the number of
> shards? If you don't, then this extra shuffle will not be performed.
>
> Reuven
>
> On Fri, Sep 27, 2019 at 12:12 PM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
>
>> Interesting. Right now we are only doing batch processing so I hadn't
>> thought about the windowing aspect.
>>
>> On Fri, Sep 27, 2019 at 12:10 PM Reuven Lax  wrote:
>>
>>> Are you doing this in streaming with windowed writes? Window grouping
>>> does not "happen" in Beam until a GroupByKey, so you do need the GroupByKey
>>> in that case.
>>>
>>> If you are not windowing but want a specific number of shards (though
>>> the general suggestion in that case is to not pick a specific number of
>>> shards, but let the runner pick it for you), your approach could work.
>>> However the implementation would be more complicated than you suggest. The
>>> problem is that every file writer has a buffer, and when you force many of
>>> them to be in memory in a map you risk running out of memory. If you look
>>> at the spilledFiles code in WriteFiles.java, it was written to handle
>>> exactly this case.
>>>
>>> Reuven
>>>
>>> On Fri, Sep 27, 2019 at 8:47 AM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
>>>> Yes, Specifically TextIO withNumShards().
>>>>
>>>> On Fri, Sep 27, 2019 at 10:45 AM Reuven Lax  wrote:
>>>>
>>>>> I'm not sure what you mean by "write out to a specific shard number."
>>>>> Are you talking about FileIO sinks?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Sep 27, 2019 at 7:41 AM Shannon Duncan <
>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>
>>>>>> So when beam writes out to a specific shard number, as I understand
>>>>>> it does a few things:
>>>>>>
>>>>>> - Assigns a shard key to each record (reduces parallelism)
>>>>>> - Shuffles and Groups by the shard key to colocate all records
>>>>>> - Writes out to each shard file within a single DoFn per key...
>>>>>>
>>>>>> When thinking about this, I believe we might be able to eliminate the
>>>>>> GroupByKey to go ahead and write out to each file with its records with
>>>>>> only a DoFn after the shard key is assigned.
>>>>>>
>>>>>> As long as the shard key is the actual key of the PCollection, then
>>>>>> could we use a state variable to force all keys that are the same to
>>>>>> process to share state with each other?
>>>>>>
>>>>>> On a DoFn can we use the setup to hold a Map of files being written
>>>>>> to within bundles on that instance, and on teardown can we close all 
>>>>>> files
>>>>>> within the map?
>>>>>>
>>>>>> If this is the case does it reduce the need for a shuffle and allow a
>>>>>> DoFn to safely write out in append mode to a file, batch, etc held in
>>>>>> state?
>>>>>>
>>>>>> It doesn't really decrease parallelism after the key is assigned
>>>>>> since it can parallelize over each key within its state window. Which is
>>>>>> the same level of parallelism we achieve by doing a GroupByKey and doing 
>>>>>> a
>>>>>> for loop over the result. So performance shouldn't be impacted if this
>>>>>> holds true.
>>>>>>
>>>>>> It's kind of like combining both the shuffle and the data write in
>>>>>> the same step?
>>>>>>
>>>>>> This does however have a significant cost reduction by eliminating a
>>>>>> compute based shuffle and also eliminating a Dataflow shuffle service 
>>>>>> call
>>>>>> if shuffle service is enabled.
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> Thanks,
>>>>>> Shannon Duncan
>>>>>>
>>>>>


Re: Shuffling on shardnum, is it necessary?

2019-09-27 Thread Shannon Duncan
Interesting. Right now we are only doing batch processing so I hadn't
thought about the windowing aspect.

On Fri, Sep 27, 2019 at 12:10 PM Reuven Lax  wrote:

> Are you doing this in streaming with windowed writes? Window grouping does
> not "happen" in Beam until a GroupByKey, so you do need the GroupByKey in
> that case.
>
> If you are not windowing but want a specific number of shards (though the
> general suggestion in that case is to not pick a specific number of shards,
> but let the runner pick it for you), your approach could work. However the
> implementation would be more complicated than you suggest. The problem is
> that every file writer has a buffer, and when you force many of them to be
> in memory in a map you risk running out of memory. If you look at the
> spilledFiles code in WriteFiles.java, it was written to handle exactly this
> case.
>
> Reuven
>
> On Fri, Sep 27, 2019 at 8:47 AM Shannon Duncan 
> wrote:
>
>> Yes, Specifically TextIO withNumShards().
>>
>> On Fri, Sep 27, 2019 at 10:45 AM Reuven Lax  wrote:
>>
>>> I'm not sure what you mean by "write out to a specific shard number."
>>> Are you talking about FileIO sinks?
>>>
>>> Reuven
>>>
>>> On Fri, Sep 27, 2019 at 7:41 AM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
>>>> So when beam writes out to a specific shard number, as I understand it
>>>> does a few things:
>>>>
>>>> - Assigns a shard key to each record (reduces parallelism)
>>>> - Shuffles and Groups by the shard key to colocate all records
>>>> - Writes out to each shard file within a single DoFn per key...
>>>>
>>>> When thinking about this, I believe we might be able to eliminate the
>>>> GroupByKey to go ahead and write out to each file with its records with
>>>> only a DoFn after the shard key is assigned.
>>>>
>>>> As long as the shard key is the actual key of the PCollection, then
>>>> could we use a state variable to force all keys that are the same to
>>>> process to share state with each other?
>>>>
>>>> On a DoFn can we use the setup to hold a Map of files being written to
>>>> within bundles on that instance, and on teardown can we close all files
>>>> within the map?
>>>>
>>>> If this is the case does it reduce the need for a shuffle and allow a
>>>> DoFn to safely write out in append mode to a file, batch, etc held in
>>>> state?
>>>>
>>>> It doesn't really decrease parallelism after the key is assigned since
>>>> it can parallelize over each key within its state window. Which is the same
>>>> level of parallelism we achieve by doing a GroupByKey and doing a for loop
>>>> over the result. So performance shouldn't be impacted if this holds true.
>>>>
>>>> It's kind of like combining both the shuffle and the data write in the
>>>> same step?
>>>>
>>>> This does however have a significant cost reduction by eliminating a
>>>> compute based shuffle and also eliminating a Dataflow shuffle service call
>>>> if shuffle service is enabled.
>>>>
>>>> Thoughts?
>>>>
>>>> Thanks,
>>>> Shannon Duncan
>>>>
>>>


Re: Shuffling on shardnum, is it necessary?

2019-09-27 Thread Shannon Duncan
Yes, specifically TextIO's withNumShards().
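
For reference, the write we're doing looks roughly like this (a sketch; the
path and shard count are illustrative, not our real values):

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

class WriteExample {
  // lines is whatever PCollection<String> we end up with; forcing the shard
  // count here is what triggers the extra shuffle being discussed.
  static void write(PCollection<String> lines) {
    lines.apply(
        "WriteLines",
        TextIO.write().to("gs://my-bucket/output/part").withNumShards(100));
  }
}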

On Fri, Sep 27, 2019 at 10:45 AM Reuven Lax  wrote:

> I'm not sure what you mean by "write out to a specific shard number." Are
> you talking about FileIO sinks?
>
> Reuven
>
> On Fri, Sep 27, 2019 at 7:41 AM Shannon Duncan 
> wrote:
>
>> So when beam writes out to a specific shard number, as I understand it
>> does a few things:
>>
>> - Assigns a shard key to each record (reduces parallelism)
>> - Shuffles and Groups by the shard key to colocate all records
>> - Writes out to each shard file within a single DoFn per key...
>>
>> When thinking about this, I believe we might be able to eliminate the
>> GroupByKey to go ahead and write out to each file with its records with
>> only a DoFn after the shard key is assigned.
>>
>> As long as the shard key is the actual key of the PCollection, then could
>> we use a state variable to force all keys that are the same to process to
>> share state with each other?
>>
>> On a DoFn can we use the setup to hold a Map of files being written to
>> within bundles on that instance, and on teardown can we close all files
>> within the map?
>>
>> If this is the case does it reduce the need for a shuffle and allow a
>> DoFn to safely write out in append mode to a file, batch, etc held in
>> state?
>>
>> It doesn't really decrease parallelism after the key is assigned since it
>> can parallelize over each key within its state window. Which is the same
>> level of parallelism we achieve by doing a GroupByKey and doing a for loop
>> over the result. So performance shouldn't be impacted if this holds true.
>>
>> It's kind of like combining both the shuffle and the data write in the
>> same step?
>>
>> This does however have a significant cost reduction by eliminating a
>> compute based shuffle and also eliminating a Dataflow shuffle service call
>> if shuffle service is enabled.
>>
>> Thoughts?
>>
>> Thanks,
>> Shannon Duncan
>>
>


Re: Multiple iterations after GroupByKey with SparkRunner

2019-09-27 Thread Shannon Duncan
I see two main options here.

1. Create an in-memory Iterable as you do your first iteration (a poor
implementation, IMO).

2. Separate your iterations into separate DoFns and apply them separately to
the PCollection output of the shuffle. There are many different paths, but
finding the most parallel one is probably the best.
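
For example, something along these lines (a minimal sketch with made-up SumFn
and CountFn; the point is just that each branch gets its own single pass over
the grouped Iterable):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class BranchAfterGroupByKey {
  // First branch: sum the values for each key (iterates the Iterable once).
  static class SumFn extends DoFn<KV<String, Iterable<Integer>>, KV<String, Integer>> {
    @ProcessElement
    public void process(
        @Element KV<String, Iterable<Integer>> e, OutputReceiver<KV<String, Integer>> out) {
      int sum = 0;
      for (int v : e.getValue()) {
        sum += v;
      }
      out.output(KV.of(e.getKey(), sum));
    }
  }

  // Second branch: count the values for each key (its own independent pass).
  static class CountFn extends DoFn<KV<String, Iterable<Integer>>, KV<String, Long>> {
    @ProcessElement
    public void process(
        @Element KV<String, Iterable<Integer>> e, OutputReceiver<KV<String, Long>> out) {
      long count = 0;
      for (int ignored : e.getValue()) {
        count++;
      }
      out.output(KV.of(e.getKey(), count));
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KV<String, Iterable<Integer>>> grouped =
        p.apply(
                Create.of(KV.of("a", 1), KV.of("a", 2), KV.of("b", 3))
                    .withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())))
            .apply(GroupByKey.create());

    // Two branches over the same grouped output, instead of two iterations in one DoFn.
    grouped.apply("SumValues", ParDo.of(new SumFn()));
    grouped.apply("CountValues", ParDo.of(new CountFn()));

    p.run().waitUntilFinish();
  }
}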

- Shannon

On Fri, Sep 27, 2019 at 5:04 AM Jan Lukavský  wrote:

> +dev  
>
> Lukasz, why do you think that users expect to be able to iterate the
> grouped elements multiple times? Besides that, it obviously suggests the 'Iterable'?
> The way that Spark behaves is pretty much analogous to how MapReduce used
> to work - in certain cases it calls repartitionAndSortWithinPartitions and
> then does mapPartition, which accepts an Iterator - that is because internally
> it merge-sorts pre-sorted segments. This approach makes it possible to GroupByKey
> data sets that are too big to fit into memory (per key).
>
> If multiple iterations should be expected by users, we probably should:
>
>  a) include that in @ValidatesRunner tests
>
>  b) store values in memory on spark, which will break for certain pipelines
>
> Because of (b) I think that it would be much better to remove this
> "expectation" and clearly document that the Iterable is not supposed to be
> iterated multiple times.
>
> Jan
> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>
> I pretty much think so, because that is how Spark works. The Iterable
> inside is really an Iterator, which cannot be iterated multiple times.
>
> Jan
> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>
> Jan, in Beam users expect to be able to iterate the GBK output multiple
> times even from within the same ParDo.
> Is this something that Beam on Spark Runner never supported?
>
> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský  wrote:
>
>> Hi Gershi,
>>
>> could you please outline the pipeline you are trying to execute?
>> Basically, you cannot iterate the Iterable multiple times in single ParDo.
>> It should be possible, though, to apply multiple ParDos to output from
>> GroupByKey.
>>
>> Jan
>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>
>> Hi,
>>
>>
>>
>> I want to iterate multiple times on the Iterable (the output of
>> GroupByKey transformation)
>>
>> When my Runner is SparkRunner, I get an exception:
>>
>>
>>
>> Caused by: java.lang.IllegalStateException: ValueIterator can't be
>> iterated more than once,otherwise there could be data lost
>>
>> at
>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>
>> at java.lang.Iterable.spliterator(Iterable.java:101)
>>
>>
>>
>>
>>
>> I understood I can branch the pipeline after GroupByKey into multiple
>> transformation and iterate in each of them once on the Iterable.
>>
>>
>>
>> Is there a better way for that?
>>


Shuffling on shardnum, is it necessary?

2019-09-27 Thread Shannon Duncan
So when Beam writes out to a specific number of shards, as I understand it,
it does a few things:

- Assigns a shard key to each record (reduces parallelism)
- Shuffles and groups by the shard key to colocate all records
- Writes out to each shard file within a single DoFn per key...

Thinking about this, I believe we might be able to eliminate the GroupByKey
and write out each file with its records using only a DoFn after the shard
key is assigned.

As long as the shard key is the actual key of the PCollection, could we use a
state variable so that all records with the same key share state with each
other?

In a DoFn, can we use setup to hold a Map of the files being written to
within bundles on that instance, and close all files in the map on teardown?

If so, does this remove the need for a shuffle and allow a DoFn to safely
write out in append mode to a file, batch, etc. held in state?

It doesn't really decrease parallelism after the key is assigned, since it
can parallelize over each key within its state window, which is the same
level of parallelism we achieve by doing a GroupByKey and looping over the
result. So performance shouldn't be impacted if this holds true.

It's kind of like combining the shuffle and the data write into the same
step?

This does, however, offer a significant cost reduction by eliminating a
compute-based shuffle and also eliminating a Dataflow Shuffle service call if
the shuffle service is enabled.
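
Roughly what I'm picturing, as a minimal sketch (ShardedWriterFn and the file
naming are made up, this is not Beam's actual WriteFiles code, and temp-file
finalization and cleanup of retried bundles are hand-waved):

import java.io.FileWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class ShardedWriterFn extends DoFn<KV<Integer, String>, Void> {
  // One open writer per shard key seen by this DoFn instance.
  private transient Map<Integer, Writer> writers;

  @Setup
  public void setup() {
    writers = new HashMap<>();
  }

  @ProcessElement
  public void processElement(@Element KV<Integer, String> record) throws IOException {
    Writer writer =
        writers.computeIfAbsent(
            record.getKey(),
            shard -> {
              try {
                // Unique file per instance and shard; a real version would write to a
                // temp location and rename/finalize afterwards.
                return new FileWriter("shard-" + shard + "-" + UUID.randomUUID() + ".txt");
              } catch (IOException e) {
                throw new UncheckedIOException(e);
              }
            });
    writer.write(record.getValue() + "\n");
  }

  @Teardown
  public void teardown() throws IOException {
    for (Writer writer : writers.values()) {
      writer.close();
    }
  }
}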

Thoughts?

Thanks,
Shannon Duncan


Re: Prevent Shuffling on Writing Files

2019-09-19 Thread Shannon Duncan
As a follow-up: the pricing being the number of bytes written to the shuffle
plus the bytes read back is confirmed (which matches the earlier stage, where
roughly 29.32 GB going into the shuffle showed up as 58.66 GB of total
shuffle data).

However, we were able to figure out a way to lower shuffle costs and things
are right in the world again.

Thanks, y'all!
Shannon

On Wed, Sep 18, 2019 at 4:52 PM Reuven Lax  wrote:

> I believe that the Total shuffle data process counter counts the number of
> bytes written to shuffle + the number of bytes read. So if you shuffle 1GB
> of data, you should expect to see 2GB on the counter.
>
> On Wed, Sep 18, 2019 at 2:39 PM Shannon Duncan 
> wrote:
>
>> Ok just ran the job on a small input and did not specify numShards. so
>> it's literally just:
>>
>> .apply("WriteLines", TextIO.write().to(options.getOutput()));
>>
>> Output of map for join:
>> [image: image.png]
>>
>> Details of Shuffle:
>> [image: image.png]
>>
>> Reported Bytes Shuffled:
>> [image: image.png]
>>
>>
>> On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax  wrote:
>>
>>>
>>>
>>> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
>>>> I will attempt to do without sharding (though I believe we did do a run
>>>> without shards and it incurred the extra shuffle costs).
>>>>
>>>
>>> It shouldn't. There will be a shuffle, but that shuffle should contain a
>>> small amount of data (essentially a list of filenames).
>>>
>>>>
>>>> Pipeline is simple.
>>>>
>>>> The only shuffle that is explicitly defined is the shuffle after
>>>> merging files together into a single PCollection (Flatten Transform).
>>>>
>>>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected
>>>> to pay for shuffles on the middle shuffle but were surprised to see that
>>>> the output data from the Flatten was quadrupled in the reflected shuffled
>>>> GB shown in Dataflow. Which lead me down this path of finding things.
>>>>
>>>> [image: image.png]
>>>>
>>>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax  wrote:
>>>>
>>>>> In that case you should be able to leave sharding unspecified, and you
>>>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>>>> necessary only for streaming.
>>>>>
>>>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>
>>>>>> batch on dataflowRunner.
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> Are you using streaming or batch? Also which runner are you using?
>>>>>>>
>>>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>>>
>>>>>>>> So I followed up on why TextIO shuffles and dug into the code some.
>>>>>>>> It is using the shards and getting all the values into a keyed group to
>>>>>>>> write to a single file.
>>>>>>>>
>>>>>>>> However... I wonder if there is way to just take the records that
>>>>>>>> are on a worker and write them out. Thus not needing a shard number and
>>>>>>>> doing this. Closer to how hadoop handle's writes.
>>>>>>>>
>>>>>>>> Maybe just a regular pardo and on bundleSetup it creates a writer
>>>>>>>> and processElement reuses that writter to write to the same file for 
>>>>>>>> all
>>>>>>>> elements within a bundle?
>>>>>>>>
>>>>>>>> I feel like this goes beyond scope of simple user mailing list so
>>>>>>>> I'm expanding it to dev as well.
>>>>>>>> +dev 
>>>>>>>>
>>>>>>>> Finding a solution that prevents quadrupling shuffle costs when
>>>>>>>> simply writing out a file is a necessity for large scale jobs that work
>>>>>>>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Shannon Duncan
>>>>>>>>
>>>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>>>>
>>>>>>>>> We have been using Beam for a bit now. However we just turned on
>>>>>>>>> the dataflow shuffle service and were very surprised that the 
>>>>>>>>> shuffled data
>>>>>>>>> amounts were quadruple the amounts we expected.
>>>>>>>>>
>>>>>>>>> Turns out that the file writing TextIO is doing shuffles within
>>>>>>>>> itself.
>>>>>>>>>
>>>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Shannon Duncan
>>>>>>>>>
>>>>>>>>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Shannon Duncan
Sorry, I missed a part of the map output for the flatten:

[image: image.png]

However, the shuffle details do show only 29.32 GB going into it, while the
reported Total shuffled data is 58.66 GB.

[image: image.png]

On Wed, Sep 18, 2019 at 4:39 PM Shannon Duncan 
wrote:

> Ok just ran the job on a small input and did not specify numShards. so
> it's literally just:
>
> .apply("WriteLines", TextIO.write().to(options.getOutput()));
>
> Output of map for join:
> [image: image.png]
>
> Details of Shuffle:
> [image: image.png]
>
> Reported Bytes Shuffled:
> [image: image.png]
>
>
> On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax  wrote:
>
>>
>>
>> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> I will attempt to do without sharding (though I believe we did do a run
>>> without shards and it incurred the extra shuffle costs).
>>>
>>
>> It shouldn't. There will be a shuffle, but that shuffle should contain a
>> small amount of data (essentially a list of filenames).
>>
>>>
>>> Pipeline is simple.
>>>
>>> The only shuffle that is explicitly defined is the shuffle after merging
>>> files together into a single PCollection (Flatten Transform).
>>>
>>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected
>>> to pay for shuffles on the middle shuffle but were surprised to see that
>>> the output data from the Flatten was quadrupled in the reflected shuffled
>>> GB shown in Dataflow. Which lead me down this path of finding things.
>>>
>>> [image: image.png]
>>>
>>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax  wrote:
>>>
>>>> In that case you should be able to leave sharding unspecified, and you
>>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>>> necessary only for streaming.
>>>>
>>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>>>> joseph.dun...@liveramp.com> wrote:
>>>>
>>>>> batch on dataflowRunner.
>>>>>
>>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:
>>>>>
>>>>>> Are you using streaming or batch? Also which runner are you using?
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>>
>>>>>>> So I followed up on why TextIO shuffles and dug into the code some.
>>>>>>> It is using the shards and getting all the values into a keyed group to
>>>>>>> write to a single file.
>>>>>>>
>>>>>>> However... I wonder if there is way to just take the records that
>>>>>>> are on a worker and write them out. Thus not needing a shard number and
>>>>>>> doing this. Closer to how hadoop handle's writes.
>>>>>>>
>>>>>>> Maybe just a regular pardo and on bundleSetup it creates a writer
>>>>>>> and processElement reuses that writter to write to the same file for all
>>>>>>> elements within a bundle?
>>>>>>>
>>>>>>> I feel like this goes beyond scope of simple user mailing list so
>>>>>>> I'm expanding it to dev as well.
>>>>>>> +dev 
>>>>>>>
>>>>>>> Finding a solution that prevents quadrupling shuffle costs when
>>>>>>> simply writing out a file is a necessity for large scale jobs that work
>>>>>>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Shannon Duncan
>>>>>>>
>>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>>>
>>>>>>>> We have been using Beam for a bit now. However we just turned on
>>>>>>>> the dataflow shuffle service and were very surprised that the shuffled 
>>>>>>>> data
>>>>>>>> amounts were quadruple the amounts we expected.
>>>>>>>>
>>>>>>>> Turns out that the file writing TextIO is doing shuffles within
>>>>>>>> itself.
>>>>>>>>
>>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Shannon Duncan
>>>>>>>>
>>>>>>>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Shannon Duncan
OK, I just ran the job on a small input and did not specify numShards, so
it's literally just:

.apply("WriteLines", TextIO.write().to(options.getOutput()));

Output of map for join:
[image: image.png]

Details of Shuffle:
[image: image.png]

Reported Bytes Shuffled:
[image: image.png]


On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax  wrote:

>
>
> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan 
> wrote:
>
>> I will attempt to do without sharding (though I believe we did do a run
>> without shards and it incurred the extra shuffle costs).
>>
>
> It shouldn't. There will be a shuffle, but that shuffle should contain a
> small amount of data (essentially a list of filenames).
>
>>
>> Pipeline is simple.
>>
>> The only shuffle that is explicitly defined is the shuffle after merging
>> files together into a single PCollection (Flatten Transform).
>>
>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected to
>> pay for shuffles on the middle shuffle but were surprised to see that the
>> output data from the Flatten was quadrupled in the reflected shuffled GB
>> shown in Dataflow. Which lead me down this path of finding things.
>>
>> [image: image.png]
>>
>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax  wrote:
>>
>>> In that case you should be able to leave sharding unspecified, and you
>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>> necessary only for streaming.
>>>
>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
>>>> batch on dataflowRunner.
>>>>
>>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:
>>>>
>>>>> Are you using streaming or batch? Also which runner are you using?
>>>>>
>>>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>
>>>>>> So I followed up on why TextIO shuffles and dug into the code some.
>>>>>> It is using the shards and getting all the values into a keyed group to
>>>>>> write to a single file.
>>>>>>
>>>>>> However... I wonder if there is way to just take the records that are
>>>>>> on a worker and write them out. Thus not needing a shard number and doing
>>>>>> this. Closer to how hadoop handle's writes.
>>>>>>
>>>>>> Maybe just a regular pardo and on bundleSetup it creates a writer and
>>>>>> processElement reuses that writter to write to the same file for all
>>>>>> elements within a bundle?
>>>>>>
>>>>>> I feel like this goes beyond scope of simple user mailing list so I'm
>>>>>> expanding it to dev as well.
>>>>>> +dev 
>>>>>>
>>>>>> Finding a solution that prevents quadrupling shuffle costs when
>>>>>> simply writing out a file is a necessity for large scale jobs that work
>>>>>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>>>>>
>>>>>> Thanks,
>>>>>> Shannon Duncan
>>>>>>
>>>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>>
>>>>>>> We have been using Beam for a bit now. However we just turned on the
>>>>>>> dataflow shuffle service and were very surprised that the shuffled data
>>>>>>> amounts were quadruple the amounts we expected.
>>>>>>>
>>>>>>> Turns out that the file writing TextIO is doing shuffles within
>>>>>>> itself.
>>>>>>>
>>>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Shannon Duncan
>>>>>>>
>>>>>>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Shannon Duncan
I will attempt to do it without sharding (though I believe we did do a run
without shards and it still incurred the extra shuffle costs).

The pipeline is simple.

The only shuffle that is explicitly defined is the shuffle after merging the
files together into a single PCollection (Flatten transform).

So it's Read > Flatten > Shuffle > Map (Format) > Write. We expected to pay
for the middle shuffle, but were surprised to see that the output data from
the Flatten was quadrupled in the shuffled GB reported in Dataflow, which led
me down this path of finding things.
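
In code terms the shape is essentially this (a simplified sketch; paths are
illustrative, Reshuffle stands in for our explicit shuffle step, and the real
format step is more involved):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFlattenShuffleWrite {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read the inputs to be merged (paths illustrative).
    PCollection<String> a = p.apply("ReadA", TextIO.read().from("gs://bucket/inputA/*"));
    PCollection<String> b = p.apply("ReadB", TextIO.read().from("gs://bucket/inputB/*"));

    PCollectionList.of(a)
        .and(b)
        .apply("Flatten", Flatten.pCollections())
        // The one shuffle we expect (and are happy) to pay for.
        .apply("Shuffle", Reshuffle.viaRandomKey())
        .apply("Format", MapElements.into(TypeDescriptors.strings()).via((String line) -> line.trim()))
        .apply("Write", TextIO.write().to("gs://bucket/output/part"));

    p.run().waitUntilFinish();
  }
}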

[image: image.png]

On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax  wrote:

> In that case you should be able to leave sharding unspecified, and you
> won't incur the extra shuffle. Specifying explicit sharding is generally
> necessary only for streaming.
>
> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan 
> wrote:
>
>> batch on dataflowRunner.
>>
>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:
>>
>>> Are you using streaming or batch? Also which runner are you using?
>>>
>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
>>>> So I followed up on why TextIO shuffles and dug into the code some. It
>>>> is using the shards and getting all the values into a keyed group to write
>>>> to a single file.
>>>>
>>>> However... I wonder if there is way to just take the records that are
>>>> on a worker and write them out. Thus not needing a shard number and doing
>>>> this. Closer to how hadoop handle's writes.
>>>>
>>>> Maybe just a regular pardo and on bundleSetup it creates a writer and
>>>> processElement reuses that writter to write to the same file for all
>>>> elements within a bundle?
>>>>
>>>> I feel like this goes beyond scope of simple user mailing list so I'm
>>>> expanding it to dev as well.
>>>> +dev 
>>>>
>>>> Finding a solution that prevents quadrupling shuffle costs when simply
>>>> writing out a file is a necessity for large scale jobs that work with 100+
>>>> TB of data. If anyone has any ideas I'd love to hear them.
>>>>
>>>> Thanks,
>>>> Shannon Duncan
>>>>
>>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>>> joseph.dun...@liveramp.com> wrote:
>>>>
>>>>> We have been using Beam for a bit now. However we just turned on the
>>>>> dataflow shuffle service and were very surprised that the shuffled data
>>>>> amounts were quadruple the amounts we expected.
>>>>>
>>>>> Turns out that the file writing TextIO is doing shuffles within
>>>>> itself.
>>>>>
>>>>> Is there a way to prevent shuffling in the writing phase?
>>>>>
>>>>> Thanks,
>>>>> Shannon Duncan
>>>>>
>>>>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Shannon Duncan
Batch, on the DataflowRunner.

On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:

> Are you using streaming or batch? Also which runner are you using?
>
> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan 
> wrote:
>
>> So I followed up on why TextIO shuffles and dug into the code some. It is
>> using the shards and getting all the values into a keyed group to write to
>> a single file.
>>
>> However... I wonder if there is way to just take the records that are on
>> a worker and write them out. Thus not needing a shard number and doing
>> this. Closer to how hadoop handle's writes.
>>
>> Maybe just a regular pardo and on bundleSetup it creates a writer and
>> processElement reuses that writter to write to the same file for all
>> elements within a bundle?
>>
>> I feel like this goes beyond scope of simple user mailing list so I'm
>> expanding it to dev as well.
>> +dev 
>>
>> Finding a solution that prevents quadrupling shuffle costs when simply
>> writing out a file is a necessity for large scale jobs that work with 100+
>> TB of data. If anyone has any ideas I'd love to hear them.
>>
>> Thanks,
>> Shannon Duncan
>>
>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> We have been using Beam for a bit now. However we just turned on the
>>> dataflow shuffle service and were very surprised that the shuffled data
>>> amounts were quadruple the amounts we expected.
>>>
>>> Turns out that the file writing TextIO is doing shuffles within itself.
>>>
>>> Is there a way to prevent shuffling in the writing phase?
>>>
>>> Thanks,
>>> Shannon Duncan
>>>
>>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Shannon Duncan
So I followed up on why TextIO shuffles and dug into the code some. It is
using the shards and getting all the values into a keyed group to write to
a single file.

However... I wonder if there is a way to just take the records that are on a
worker and write them out, thus not needing a shard number at all. Closer to
how Hadoop handles writes.

Maybe just a regular ParDo that creates a writer at bundle setup, with
processElement reusing that writer to write to the same file for all
elements within a bundle?
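
Something along these lines is what I mean (a hypothetical sketch, not an
existing Beam transform; the file naming is made up, and retried bundles
would leave orphan files without extra cleanup):

import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.UUID;
import org.apache.beam.sdk.transforms.DoFn;

class BundleFileWriteFn extends DoFn<String, Void> {
  private transient Writer writer;

  @StartBundle
  public void startBundle() throws IOException {
    // One output file per bundle, with a random suffix so bundles don't collide.
    writer = new FileWriter("output-" + UUID.randomUUID() + ".txt");
  }

  @ProcessElement
  public void processElement(@Element String line) throws IOException {
    writer.write(line + "\n");
  }

  @FinishBundle
  public void finishBundle() throws IOException {
    // Close at bundle finish so the file is flushed before the bundle commits.
    writer.close();
  }
}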

I feel like this goes beyond the scope of the user mailing list, so I'm
expanding it to dev as well.
+dev 

Finding a solution that prevents quadrupling shuffle costs when simply
writing out a file is a necessity for large-scale jobs that work with 100+ TB
of data. If anyone has any ideas, I'd love to hear them.

Thanks,
Shannon Duncan

On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan 
wrote:

> We have been using Beam for a bit now. However we just turned on the
> dataflow shuffle service and were very surprised that the shuffled data
> amounts were quadruple the amounts we expected.
>
> Turns out that the file writing TextIO is doing shuffles within itself.
>
> Is there a way to prevent shuffling in the writing phase?
>
> Thanks,
> Shannon Duncan
>


Re: [Python] Read Hadoop Sequence File?

2019-07-16 Thread Shannon Duncan
I am still having the problem that the local file system (DirectRunner) will
not accept a local glob string as a file source. I have tried both relative
and fully qualified paths.

I can confirm the same input glob returns data on a simple cat command, so I
know the glob is good.

Error: "java.io.FileNotFoundException: No files matched spec:
/Users//github//io/sequenceFile/part-*/data"

Any assistance would be greatly appreciated. This is on the Java SDK.

I tested this with TextIO.read().from(ValueProvider<String>); still the
same.
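
For reference, this is the kind of standalone check I've been running to see
whether the glob itself expands locally (a sketch with an illustrative path,
assuming FileSystems.match is the same entry point FileBasedSource uses, per
Igor's note below):

import java.io.IOException;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class GlobCheck {
  public static void main(String[] args) throws IOException {
    // Register the standard file systems (local file system, plus gs:// etc. if
    // the corresponding modules are on the classpath).
    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());

    // metadata() will throw if the match failed outright.
    MatchResult result = FileSystems.match("/tmp/sequenceFiles/part-*/data");
    List<MatchResult.Metadata> files = result.metadata();
    System.out.println("Matched " + files.size() + " file(s)");
    for (MatchResult.Metadata m : files) {
      System.out.println(m.resourceId());
    }
  }
}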

Thanks,
Shannon

On Fri, Jul 12, 2019 at 2:14 PM Igor Bernstein 
wrote:

> I'm not sure to be honest. The pattern expansion happens in
> FileBasedSource via FileSystems.match(), so it should follow the same
> expansion rules other file based sinks like TextIO. Maybe someone with more
> beam experience can help?
>
> On Fri, Jul 12, 2019 at 2:55 PM Shannon Duncan 
> wrote:
>
>> Clarification on previous message. Only happens on local file system
>> where it is unable to match a pattern string. Via a `gs://` link it
>> is able to do multiple file matching.
>>
>> On Fri, Jul 12, 2019 at 1:36 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> Awesome. I got it working for a single file, but for a structure of:
>>>
>>> /part-0001/index
>>> /part-0001/data
>>> /part-0002/index
>>> /part-0002/data
>>>
>>> I tried to do /part-*  and /part-*/data
>>>
>>> It does not find the multipart files. However if I just do
>>> /part-0001/data it will find it and read it.
>>>
>>> Any ideas why?
>>>
>>> I am using this to generate the source:
>>>
>>> static SequenceFileSource createSource(
>>> ValueProvider sourcePattern) {
>>> return new SequenceFileSource(
>>> sourcePattern,
>>> Text.class,
>>> WritableSerialization.class,
>>> Text.class,
>>> WritableSerialization.class,
>>> SequenceFile.SYNC_INTERVAL);
>>> }
>>>
>>> On Wed, Jul 10, 2019 at 10:52 AM Igor Bernstein <
>>> igorbernst...@google.com> wrote:
>>>
>>>> It should be fairly straight forward:
>>>> 1. Copy SequenceFileSource.java
>>>> <https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSource.java>
>>>>  to
>>>> your project
>>>> 2. Add the source to your pipeline, configuring it with appropriate
>>>> serializers. See here
>>>> <https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ImportJob.java#L159-L173>
>>>> for an example for hbase Results
>>>>
>>>> On Wed, Jul 10, 2019 at 10:58 AM Shannon Duncan <
>>>> joseph.dun...@liveramp.com> wrote:
>>>>
>>>>> If I wanted to go ahead and include this within a new Java Pipeline,
>>>>> what would I be looking at for level of work to integrate?
>>>>>
>>>>> On Wed, Jul 3, 2019 at 3:54 AM Ismaël Mejía  wrote:
>>>>>
>>>>>> That's great. I can help whenever you need. We just need to choose its
>>>>>> destination. Both the `hadoop-format` and `hadoop-file-system` modules
>>>>>> are good candidates, I would even feel inclined to put it in its own
>>>>>> module `sdks/java/extensions/sequencefile` to make it more easy to
>>>>>> discover by the final users.
>>>>>>
>>>>>> A thing to consider is the SeekableByteChannel adapters, we can move
>>>>>> that into hadoop-common if needed and refactor the modules to share
>>>>>> code. Worth to take a look at
>>>>>>
>>>>>> org.apache.beam.sdk.io.hdfs.HadoopFileSystem.HadoopSeekableByteChannel#HadoopSeekableByteChannel
>>>>>> to see if some of it could be useful.
>>>>>>
>>>>>> On Tue, Jul 2, 2019 at 11:46 PM Igor Bernstein <
>>>>>> igorbernst...@google.com> wrote:
>>>>>> >
>>>>>> > Hi all,
>>>>>> >
>>>>>> > I wrote those classes with the intention of upstreaming them to
>>>>>> Beam. I can try to make some time this quarter to clean them up. I would
>>>>>> need a bit of guidance from a beam expert in how to make them coexist
>>>>>> with HadoopFormatIO though.

Re: [Python] Read Hadoop Sequence File?

2019-07-12 Thread Shannon Duncan
Clarification on the previous message: this only happens on the local file
system, where it is unable to match a pattern string. Via a `gs://` path it
is able to do multiple-file matching.

On Fri, Jul 12, 2019 at 1:36 PM Shannon Duncan 
wrote:

> Awesome. I got it working for a single file, but for a structure of:
>
> /part-0001/index
> /part-0001/data
> /part-0002/index
> /part-0002/data
>
> I tried to do /part-*  and /part-*/data
>
> It does not find the multipart files. However if I just do /part-0001/data
> it will find it and read it.
>
> Any ideas why?
>
> I am using this to generate the source:
>
> static SequenceFileSource createSource(
> ValueProvider sourcePattern) {
> return new SequenceFileSource(
> sourcePattern,
> Text.class,
> WritableSerialization.class,
> Text.class,
> WritableSerialization.class,
> SequenceFile.SYNC_INTERVAL);
> }
>
> On Wed, Jul 10, 2019 at 10:52 AM Igor Bernstein 
> wrote:
>
>> It should be fairly straight forward:
>> 1. Copy SequenceFileSource.java
>> <https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSource.java>
>>  to
>> your project
>> 2. Add the source to your pipeline, configuring it with appropriate
>> serializers. See here
>> <https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ImportJob.java#L159-L173>
>> for an example for hbase Results
>>
>> On Wed, Jul 10, 2019 at 10:58 AM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> If I wanted to go ahead and include this within a new Java Pipeline,
>>> what would I be looking at for level of work to integrate?
>>>
>>> On Wed, Jul 3, 2019 at 3:54 AM Ismaël Mejía  wrote:
>>>
>>>> That's great. I can help whenever you need. We just need to choose its
>>>> destination. Both the `hadoop-format` and `hadoop-file-system` modules
>>>> are good candidates, I would even feel inclined to put it in its own
>>>> module `sdks/java/extensions/sequencefile` to make it more easy to
>>>> discover by the final users.
>>>>
>>>> A thing to consider is the SeekableByteChannel adapters, we can move
>>>> that into hadoop-common if needed and refactor the modules to share
>>>> code. Worth to take a look at
>>>>
>>>> org.apache.beam.sdk.io.hdfs.HadoopFileSystem.HadoopSeekableByteChannel#HadoopSeekableByteChannel
>>>> to see if some of it could be useful.
>>>>
>>>> On Tue, Jul 2, 2019 at 11:46 PM Igor Bernstein <
>>>> igorbernst...@google.com> wrote:
>>>> >
>>>> > Hi all,
>>>> >
>>>> > I wrote those classes with the intention of upstreaming them to Beam.
>>>> I can try to make some time this quarter to clean them up. I would need a
>>>> bit of guidance from a beam expert in how to make them coexist with
>>>> HadoopFormatIO though.
>>>> >
>>>> >
>>>> > On Tue, Jul 2, 2019 at 10:55 AM Solomon Duskis 
>>>> wrote:
>>>> >>
>>>> >> +Igor Bernstein who wrote the Cloud Bigtable Sequence File classes.
>>>> >>
>>>> >> Solomon Duskis | Google Cloud clients | sdus...@google.com |
>>>> 914-462-0531
>>>> >>
>>>> >>
>>>> >> On Tue, Jul 2, 2019 at 4:57 AM Ismaël Mejía 
>>>> wrote:
>>>> >>>
>>>> >>> (Adding dev@ and Solomon Duskis to the discussion)
>>>> >>>
>>>> >>> I was not aware of these thanks for sharing David. Definitely it
>>>> would
>>>> >>> be a great addition if we could have those donated as an extension
>>>> in
>>>> >>> the Beam side. We can even evolve them in the future to be more
>>>> FileIO
>>>> >>> like. Any chance this can happen? Maybe Solomon and his team?
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> On Tue, Jul 2, 2019 at 9:39 AM David Morávek 
>>>> wrote:
>>>> >>> >
>>>> >>> > Hi, you can use SequenceFileSink and Source, from a BigTable
>>>> client. Those works nice with FileIO.
>>>> >>> >
>>>> >>> >
>>>>

Re: [Python] Read Hadoop Sequence File?

2019-07-12 Thread Shannon Duncan
Awesome. I got it working for a single file, but for a structure of:

/part-0001/index
/part-0001/data
/part-0002/index
/part-0002/data

I tried to do /part-*  and /part-*/data

It does not find the multipart files. However, if I just do /part-0001/data,
it will find it and read it.

Any ideas why?

I am using this to generate the source:

static SequenceFileSource<Text, Text> createSource(
    ValueProvider<String> sourcePattern) {
  return new SequenceFileSource<>(
      sourcePattern,
      Text.class,
      WritableSerialization.class,
      Text.class,
      WritableSerialization.class,
      SequenceFile.SYNC_INTERVAL);
}

On Wed, Jul 10, 2019 at 10:52 AM Igor Bernstein 
wrote:

> It should be fairly straight forward:
> 1. Copy SequenceFileSource.java
> <https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSource.java>
>  to
> your project
> 2. Add the source to your pipeline, configuring it with appropriate
> serializers. See here
> <https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ImportJob.java#L159-L173>
> for an example for hbase Results
>
> On Wed, Jul 10, 2019 at 10:58 AM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
>
>> If I wanted to go ahead and include this within a new Java Pipeline, what
>> would I be looking at for level of work to integrate?
>>
>> On Wed, Jul 3, 2019 at 3:54 AM Ismaël Mejía  wrote:
>>
>>> That's great. I can help whenever you need. We just need to choose its
>>> destination. Both the `hadoop-format` and `hadoop-file-system` modules
>>> are good candidates, I would even feel inclined to put it in its own
>>> module `sdks/java/extensions/sequencefile` to make it more easy to
>>> discover by the final users.
>>>
>>> A thing to consider is the SeekableByteChannel adapters, we can move
>>> that into hadoop-common if needed and refactor the modules to share
>>> code. Worth to take a look at
>>>
>>> org.apache.beam.sdk.io.hdfs.HadoopFileSystem.HadoopSeekableByteChannel#HadoopSeekableByteChannel
>>> to see if some of it could be useful.
>>>
>>> On Tue, Jul 2, 2019 at 11:46 PM Igor Bernstein 
>>> wrote:
>>> >
>>> > Hi all,
>>> >
>>> > I wrote those classes with the intention of upstreaming them to Beam.
>>> I can try to make some time this quarter to clean them up. I would need a
>>> bit of guidance from a beam expert in how to make them coexist with
>>> HadoopFormatIO though.
>>> >
>>> >
>>> > On Tue, Jul 2, 2019 at 10:55 AM Solomon Duskis 
>>> wrote:
>>> >>
>>> >> +Igor Bernstein who wrote the Cloud Bigtable Sequence File classes.
>>> >>
>>> >> Solomon Duskis | Google Cloud clients | sdus...@google.com |
>>> 914-462-0531
>>> >>
>>> >>
>>> >> On Tue, Jul 2, 2019 at 4:57 AM Ismaël Mejía 
>>> wrote:
>>> >>>
>>> >>> (Adding dev@ and Solomon Duskis to the discussion)
>>> >>>
>>> >>> I was not aware of these thanks for sharing David. Definitely it
>>> would
>>> >>> be a great addition if we could have those donated as an extension in
>>> >>> the Beam side. We can even evolve them in the future to be more
>>> FileIO
>>> >>> like. Any chance this can happen? Maybe Solomon and his team?
>>> >>>
>>> >>>
>>> >>>
>>> >>> On Tue, Jul 2, 2019 at 9:39 AM David Morávek 
>>> wrote:
>>> >>> >
>>> >>> > Hi, you can use SequenceFileSink and Source, from a BigTable
>>> client. Those works nice with FileIO.
>>> >>> >
>>> >>> >
>>> https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSink.java
>>> >>> >
>>> https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSource.java
>>> >>> >
>>> >>> > It would be really cool to move these into Beam, but that's up to
>>> Googlers to decide, whether they want to donate this.
>>> >>> >
>>> >>> > D.
>>> >>>

Re: [Java] Using a complex datastructure as Key for KV

2019-07-12 Thread Shannon Duncan
I tried to pass an ArrayList in and it wouldn't generalize it to List. It
required me to convert my ArrayLists to Lists.
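
For reference, this is roughly the shape that ended up working once
everything was typed as List (a simplified sketch; Create and the made-up
values just stand in for our real input):

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class ListKeyExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Keys declared as List<Integer> (not ArrayList) so ListCoder can be used directly.
    List<Integer> k1 = Arrays.asList(1, 2);
    List<Integer> k2 = Arrays.asList(3, 4);

    PCollection<KV<List<Integer>, String>> keyed =
        p.apply(
            Create.of(KV.of(k1, "a"), KV.of(k1, "b"), KV.of(k2, "c"))
                .withCoder(KvCoder.of(ListCoder.of(VarIntCoder.of()), StringUtf8Coder.of())));

    // ListCoder is deterministic when its element coder is, so GroupByKey accepts the key coder.
    keyed.apply(GroupByKey.create());

    p.run().waitUntilFinish();
  }
}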

On Fri, Jul 12, 2019 at 10:20 AM Lukasz Cwik  wrote:

> Additional coders would be useful. Note that we usually don't have coders
> for specific collection types like ArrayList but prefer to have Coders for
> their general counterparts like List, Map, Iterable, 
>
> There has been discussion in the past to make the MapCoder a deterministic
> coder when a coder is required to be deterministic. There are a few people
> working on schema support within Apache Beam that might be able to provide
> guidance (+Reuven Lax  +Brian Hulette
> ).
>
> On Fri, Jul 12, 2019 at 11:05 AM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
>
>> I have a working TreeMapCoder now. Got it all setup and done, and the
>> GroupByKey is accepting it.
>>
>> Thanks for all the help. I need to read up more on contributing
>> guidelines then I'll PR the coder into the SDK. Also willing to write
>> coders for things such as ArrayList etc if people want them.
>>
>> On Fri, Jul 12, 2019 at 9:31 AM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> Aha, makes sense. Thanks!
>>>
>>> On Fri, Jul 12, 2019 at 9:26 AM Lukasz Cwik  wrote:
>>>
>>>> TreeMapCoder.of(StringUtf8Coder.of(), ListCoder.of(VarIntCoder.of()));
>>>>
>>>> On Fri, Jul 12, 2019 at 10:22 AM Shannon Duncan <
>>>> joseph.dun...@liveramp.com> wrote:
>>>>
>>>>> So I have my custom coder created for TreeMap and I'm ready to set
>>>>> it...
>>>>>
>>>>> So my Type is "TreeMap>"
>>>>>
>>>>> What do I put for ".setCoder(TreeMapCoder.of(???, ???))"
>>>>>
>>>>> On Thu, Jul 11, 2019 at 8:21 PM Rui Wang  wrote:
>>>>>
>>>>>> Hi Shannon,  [1] will be a good start on coder in Java SDK.
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://beam.apache.org/documentation/programming-guide/#data-encoding-and-type-safety
>>>>>>
>>>>>> Rui
>>>>>>
>>>>>> On Thu, Jul 11, 2019 at 3:08 PM Shannon Duncan <
>>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>>
>>>>>>> Was able to get it to use ArrayList by doing List>
>>>>>>> result = new ArrayList>();
>>>>>>>
>>>>>>> Then storing my keys in a separate array that I'll pass in as a side
>>>>>>> input to key for the list of lists.
>>>>>>>
>>>>>>> Thanks for the help, lemme know more in the future about how coders
>>>>>>> work and instantiate and I'd love to help contribute by adding some new
>>>>>>> coders.
>>>>>>>
>>>>>>> - Shannon
>>>>>>>
>>>>>>> On Thu, Jul 11, 2019 at 4:59 PM Shannon Duncan <
>>>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>>>
>>>>>>>> Will do. Thanks. A new coder for deterministic Maps would be great
>>>>>>>> in the future. Thank you!
>>>>>>>>
>>>>>>>> On Thu, Jul 11, 2019 at 4:58 PM Rui Wang  wrote:
>>>>>>>>
>>>>>>>>> I think Mike refers to ListCoder
>>>>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/ListCoder.java>
>>>>>>>>>  which
>>>>>>>>> is deterministic if its element is the same. Maybe you can search the 
>>>>>>>>> repo
>>>>>>>>> for examples of ListCoder?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Rui
>>>>>>>>>
>>>>>>>>> On Thu, Jul 11, 2019 at 2:55 PM Shannon Duncan <
>>>>>>>>> joseph.dun...@liveramp.com> wrote:
>>>>>>>>>
>>>>>>>>>> So ArrayList doesn't work either, so just a standard List?
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 11, 2019 at 4:53 PM Rui Wang 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Shannon, I agree with Mike on

Re: Python Utilities

2019-07-10 Thread Shannon Duncan
So it seems that the Java SDK has two different Join libraries?

With schemas:
https://github.com/apache/beam/tree/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms
And another one:
https://github.com/apache/beam/blob/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java

So how does it handle that?
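
For context, the extensions join library is the key/value style I had in
mind, roughly like this (a sketch with made-up types, assuming the
innerJoin(left, right) signature in the linked Join.java), whereas the
schema-based one joins on schema fields. That's why I'm wondering how the two
are meant to coexist:

import org.apache.beam.sdk.extensions.joinlibrary.Join;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class JoinSketch {
  // Inner join on the common String key; result values are pairs of the two sides.
  static PCollection<KV<String, KV<Integer, Integer>>> join(
      PCollection<KV<String, Integer>> left, PCollection<KV<String, Integer>> right) {
    return Join.innerJoin(left, right);
  }
}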

On Mon, Jul 8, 2019 at 12:39 PM Shannon Duncan 
wrote:

> Yeah these are for local testing right now. I was hoping to gain insight
> on better naming.
>
> I was thinking of creating an "extras" module.
>
> On Mon, Jul 8, 2019, 12:28 PM Robin Qiu  wrote:
>
>> Hi Shannon,
>>
>> Thanks for sharing the repo! I took a quick look and I have a concern
>> with the naming of the transforms.
>>
>> Currently, Beam Java already have "Select" and "Join" transforms.
>> However, they work on schemas, a feature that is not yet implemented in
>> Beam Python. (See
>> https://github.com/apache/beam/tree/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms
>> )
>>
>> To maintain consistency between SDKs, I think it is good to avoid having
>> two different transforms with the same name but different functions. So
>> maybe you can consider renaming the transforms or/and putting it in an
>> extension Python module, instead of the main ones?
>>
>> Best,
>> Robin
>>
>> On Mon, Jul 8, 2019 at 9:19 AM Shannon Duncan 
>> wrote:
>>
>>> As a follow up. Here is the repo that contains the utilities for now.
>>> https://github.com/shadowcodex/apache-beam-utilities. Will put together
>>> a proper PR as code gets closer to production quality.
>>>
>>> - Shannon
>>>
>>> On Mon, Jul 8, 2019 at 9:20 AM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
>>>> Thanks Frederik,
>>>>
>>>> That's exactly where I was looking. I did get permission to open source
>>>> the utilities module. So I'm going to throw them up on my personal github
>>>> soon and share with the email group for a look over.
>>>>
>>>> I'm going to work on the utilities there because it's a quick dev
>>>> environment and then once they are ready for proper PR I'll begin working
>>>> them into the actual SDK for a PR.
>>>>
>>>> I also joined the slack #beam and #beam-python channels, I was unsure
>>>> of where most collaborators discussed items.
>>>>
>>>> - Shannon
>>>>
>>>> On Mon, Jul 8, 2019 at 9:09 AM Frederik Bode 
>>>> wrote:
>>>>
>>>>> Hi Shannon,
>>>>>
>>>>> This is probably a good starting point:
>>>>> https://github.com/apache/beam/blob/2d5e493abf39ee6fc89831bb0b7ec9fee592b9c5/sdks/python/apache_beam/transforms/combiners.py#L68
>>>>> .
>>>>>
>>>>> Frederik
>>>>>

Re: [Python] Read Hadoop Sequence File?

2019-07-10 Thread Shannon Duncan
If I wanted to go ahead and include this within a new Java Pipeline, what
would I be looking at for level of work to integrate?

On Wed, Jul 3, 2019 at 3:54 AM Ismaël Mejía  wrote:

> That's great. I can help whenever you need. We just need to choose its
> destination. Both the `hadoop-format` and `hadoop-file-system` modules
> are good candidates, I would even feel inclined to put it in its own
> module `sdks/java/extensions/sequencefile` to make it more easy to
> discover by the final users.
>
> A thing to consider is the SeekableByteChannel adapters, we can move
> that into hadoop-common if needed and refactor the modules to share
> code. Worth to take a look at
>
> org.apache.beam.sdk.io.hdfs.HadoopFileSystem.HadoopSeekableByteChannel#HadoopSeekableByteChannel
> to see if some of it could be useful.
>
> On Tue, Jul 2, 2019 at 11:46 PM Igor Bernstein 
> wrote:
> >
> > Hi all,
> >
> > I wrote those classes with the intention of upstreaming them to Beam. I
> can try to make some time this quarter to clean them up. I would need a bit
> of guidance from a beam expert in how to make them coexist with
> HadoopFormatIO though.
> >
> >
> > On Tue, Jul 2, 2019 at 10:55 AM Solomon Duskis 
> wrote:
> >>
> >> +Igor Bernstein who wrote the Cloud Bigtable Sequence File classes.
> >>
> >> Solomon Duskis | Google Cloud clients | sdus...@google.com |
> 914-462-0531
> >>
> >>
> >> On Tue, Jul 2, 2019 at 4:57 AM Ismaël Mejía  wrote:
> >>>
> >>> (Adding dev@ and Solomon Duskis to the discussion)
> >>>
> >>> I was not aware of these thanks for sharing David. Definitely it would
> >>> be a great addition if we could have those donated as an extension in
> >>> the Beam side. We can even evolve them in the future to be more FileIO
> >>> like. Any chance this can happen? Maybe Solomon and his team?
> >>>
> >>>
> >>>
> >>> On Tue, Jul 2, 2019 at 9:39 AM David Morávek  wrote:
> >>> >
> >>> > Hi, you can use SequenceFileSink and Source, from a BigTable client.
> Those works nice with FileIO.
> >>> >
> >>> >
> https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSink.java
> >>> >
> https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSource.java
> >>> >
> >>> > It would be really cool to move these into Beam, but that's up to
> Googlers to decide, whether they want to donate this.
> >>> >
> >>> > D.
> >>> >
> >>> > On Tue, Jul 2, 2019 at 2:07 AM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
> >>> >>
> >>> >> It's not outside the realm of possibilities. For now I've created
> an intermediary step of a hadoop job that converts from sequence to text
> file.
> >>> >>
> >>> >> Looking into better options.
> >>> >>
> >>> >> On Mon, Jul 1, 2019, 5:50 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>> >>>
> >>> >>> Java SDK has a HadoopInputFormatIO using which you should be able
> to read Sequence files:
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
> >>> >>> I don't think there's a direct alternative for this for Python.
> >>> >>>
> >>> >>> Is it possible to write to a well-known format such as Avro
> instead of a Hadoop specific format which will allow you to read from both
> Dataproc/Hadoop and Beam Python SDK ?
> >>> >>>
> >>> >>> Thanks,
> >>> >>> Cham
> >>> >>>
> >>> >>> On Mon, Jul 1, 2019 at 3:37 PM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
> >>> >>>>
> >>> >>>> That's a pretty big hole for a missing source/sink when looking
> at transitioning from Dataproc to Dataflow using GCS as storage buffer
> instead of a traditional hdfs.
> >>> >>>>
> >>> >>>> From what I've been able to tell from source code and
> documentation, Java is able to but not Python?
> >>> >>>>
> >>> >>>> T

Re: Python Utilities

2019-07-08 Thread Shannon Duncan
Yeah, these are for local testing right now. I was hoping to gain insight
into better naming.

I was thinking of creating an "extras" module.

On Mon, Jul 8, 2019, 12:28 PM Robin Qiu  wrote:

> Hi Shannon,
>
> Thanks for sharing the repo! I took a quick look and I have a concern with
> the naming of the transforms.
>
> Currently, Beam Java already have "Select" and "Join" transforms. However,
> they work on schemas, a feature that is not yet implemented in Beam Python.
> (See
> https://github.com/apache/beam/tree/77b295b1c2b0a206099b8f50c4d3180c248e252c/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms
> )
>
> To maintain consistency between SDKs, I think it is good to avoid having
> two different transforms with the same name but different functions. So
> maybe you can consider renaming the transforms or/and putting it in an
> extension Python module, instead of the main ones?
>
> Best,
> Robin
>
> On Mon, Jul 8, 2019 at 9:19 AM Shannon Duncan 
> wrote:
>
>> As a follow up. Here is the repo that contains the utilities for now.
>> https://github.com/shadowcodex/apache-beam-utilities. Will put together
>> a proper PR as code gets closer to production quality.
>>
>> - Shannon
>>
>> On Mon, Jul 8, 2019 at 9:20 AM Shannon Duncan 
>> wrote:
>>
>>> Thanks Frederik,
>>>
>>> That's exactly where I was looking. I did get permission to open source
>>> the utilities module. So I'm going to throw them up on my personal github
>>> soon and share with the email group for a look over.
>>>
>>> I'm going to work on the utilities there because it's a quick dev
>>> environment and then once they are ready for proper PR I'll begin working
>>> them into the actual SDK for a PR.
>>>
>>> I also joined the slack #beam and #beam-python channels, I was unsure of
>>> where most collaborators discussed items.
>>>
>>> - Shannon
>>>
>>> On Mon, Jul 8, 2019 at 9:09 AM Frederik Bode 
>>> wrote:
>>>
>>>> Hi Shannon,
>>>>
>>>> This is probably a good starting point:
>>>> https://github.com/apache/beam/blob/2d5e493abf39ee6fc89831bb0b7ec9fee592b9c5/sdks/python/apache_beam/transforms/combiners.py#L68
>>>> .
>>>>
>>>> Frederik
>>>>
>>>> On Mon, 8 Jul 2019 at 15:40, Shannon Duncan 
>>>> wrote:
>>>>
>>>>> I'm sure I could use some of the existing aggregations as a guide on
>>>>> how to make aggregations to fill the gap of missing ones. Such as creating
>>>>> Sum/Max/Min.
>>>>>
>>>>> GroupBy is really already handled with GroupByKey and CoGroupByKey
> >>>>> unless you are thinking of a different type of GroupBy?

Re: Python Utilities

2019-07-08 Thread Shannon Duncan
As a follow up. Here is the repo that contains the utilities for now.
https://github.com/shadowcodex/apache-beam-utilities. Will put together a
proper PR as code gets closer to production quality.

- Shannon

On Mon, Jul 8, 2019 at 9:20 AM Shannon Duncan 
wrote:

> Thanks Frederik,
>
> That's exactly where I was looking. I did get permission to open source
> the utilities module. So I'm going to throw them up on my personal github
> soon and share with the email group for a look over.
>
> I'm going to work on the utilities there because it's a quick dev
> environment and then once they are ready for proper PR I'll begin working
> them into the actual SDK for a PR.
>
> I also joined the slack #beam and #beam-python channels, I was unsure of
> where most collaborators discussed items.
>
> - Shannon
>
> On Mon, Jul 8, 2019 at 9:09 AM Frederik Bode  wrote:
>
>> Hi Shannon,
>>
>> This is probably a good starting point:
>> https://github.com/apache/beam/blob/2d5e493abf39ee6fc89831bb0b7ec9fee592b9c5/sdks/python/apache_beam/transforms/combiners.py#L68
>> .
>>
>> Frederik
>>
>> On Mon, 8 Jul 2019 at 15:40, Shannon Duncan 
>> wrote:
>>
>>> I'm sure I could use some of the existing aggregations as a guide on how
>>> to make aggregations to fill the gap of missing ones. Such as creating
>>> Sum/Max/Min.
>>>
>>> GroupBy is really already handled with GroupByKey and CoGroupByKey
>>> unless you are thinking of a different type of GroupBy?
>>>
>>> - Shannon
>>>
>>> On Sun, Jul 7, 2019 at 10:47 PM Rui Wang  wrote:
>>>
>>>> Maybe also adding Aggregation/GroupBy as utilities?
>>>>
>>>>
>>>> -Rui
>>>>
>>>> On Sun, Jul 7, 2019 at 1:46 PM Shannon Duncan <
>>>> joseph.dun...@liveramp.com> wrote:
>>>>
>>>>> Thanks Valentyn,
>>>>>
>>>>> I'll outline the utilities and accept any suggestions to add / modify.
>>>>> These are really just shortcut PTransforms that I am working on to 
>>>>> simplify
>>>>> creating pipelines.
>>>>>
>>>>> Currently the utilities contain the following PTransforms:
>>>>>
>>>>> - Inner Join
>>>>> - Left Outer Join
>>>>> - Right Outer Join
>>>>> - Full Outer Join
>>>>> - PrepareKey (For selecting items in a dictionary to act as a key for
>>>>> the joins)
>>>>> - Select (very simple filter that returns only items you want from the
>>>>> dictionary) (allows for defining a default nullValue)
>>>>>
>>>>> Currently these operations only work with dictionaries, but I'd be
>>>>> interested to see how it would work for  tuples.
>>>>>
>>>>> I'm new to python so they may not be optimized or the best way, but
> >>>>> from my understanding these seem to be the best way to do these types
> >>>>> of operations.

Re: Python Utilities

2019-07-08 Thread Shannon Duncan
Thanks Frederik,

That's exactly where I was looking. I did get permission to open source the
utilities module. So I'm going to throw them up on my personal github soon
and share with the email group for a look over.

I'm going to work on the utilities there because it's a quick dev
environment, and once they are ready I'll begin working them into the
actual SDK for a proper PR.

I also joined the Slack #beam and #beam-python channels; I was unsure of
where most collaborators discussed items.

- Shannon

On Mon, Jul 8, 2019 at 9:09 AM Frederik Bode  wrote:

> Hi Shannon,
>
> This is probably a good starting point:
> https://github.com/apache/beam/blob/2d5e493abf39ee6fc89831bb0b7ec9fee592b9c5/sdks/python/apache_beam/transforms/combiners.py#L68
> .
>
> Frederik
>
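
To make that concrete, a toy combiner written loosely in the style of that
module might look like the sketch below (MaxFn is just an illustration, not
proposed API):

import apache_beam as beam

class MaxFn(beam.CombineFn):
    """Toy max combiner sketched after the built-ins in combiners.py."""

    def create_accumulator(self):
        return None

    def add_input(self, accumulator, element):
        return element if accumulator is None else max(accumulator, element)

    def merge_accumulators(self, accumulators):
        present = [a for a in accumulators if a is not None]
        return max(present) if present else None

    def extract_output(self, accumulator):
        return accumulator

# e.g. values | beam.CombineGlobally(MaxFn())
#  or  keyed_values | beam.CombinePerKey(MaxFn())
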
> On Mon, 8 Jul 2019 at 15:40, Shannon Duncan 
> wrote:
>
>> I'm sure I could use some of the existing aggregations as a guide on how
>> to make aggregations to fill the gap of missing ones. Such as creating
>> Sum/Max/Min.
>>
>> GroupBy is really already handled with GroupByKey and CoGroupByKey unless
>> you are thinking of a different type of GroupBy?
>>
>> - Shannon
>>
>> On Sun, Jul 7, 2019 at 10:47 PM Rui Wang  wrote:
>>
>>> Maybe also adding Aggregation/GroupBy as utilities?
>>>
>>>
>>> -Rui
>>>
>>> On Sun, Jul 7, 2019 at 1:46 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
>>>> Thanks Valentyn,
>>>>
>>>> I'll outline the utilities and accept any suggestions to add / modify.
>>>> These are really just shortcut PTransforms that I am working on to simplify
>>>> creating pipelines.
>>>>
>>>> Currently the utilities contain the following PTransforms:
>>>>
>>>> - Inner Join
>>>> - Left Outer Join
>>>> - Right Outer Join
>>>> - Full Outer Join
>>>> - PrepareKey (For selecting items in a dictionary to act as a key for
>>>> the joins)
>>>> - Select (very simple filter that returns only items you want from the
>>>> dictionary) (allows for defining a default nullValue)
>>>>
>>>> Currently these operations only work with dictionaries, but I'd be
>>>> interested to see how it would work for  tuples.
>>>>
>>>> I'm new to python so they may not be optimized or the best way, but
>>>> from my understanding these seem to be the best way to do these types of
>>>> operations. Essentially I created a pipeline to be able to convert a simple
>>>> sql query into a flow of these utilities. Using prepareKey to define your
>>>> joining key, joining, and then selecting from the join allows you to do a
>>>> lot of powerful manipulation in a simple / familiar way.
>>>>
>>>> If this is something that we'd like to add to the Beam SDK I don't mind
>>>> looking at the contributor license agreement, and conversing more on how to
>>>> get them in.
>>>>
>>>> Thanks,
>>>> Shannon
>

Re: Python Utilities

2019-07-08 Thread Shannon Duncan
I'm sure I could use some of the existing aggregations as a guide on how to
make aggregations to fill the gap of missing ones. Such as creating
Sum/Max/Min.
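
For what it's worth, the per-key versions of those mostly fall out of the
existing combiner support already; a minimal sketch with toy data (labels
and values are illustrative only):

import apache_beam as beam

with beam.Pipeline() as p:
    scores = p | beam.Create([('a', 3), ('a', 5), ('b', 2)])
    # Per-key Sum/Max/Min using plain callables wrapped by CombinePerKey.
    sums = scores | 'SumPerKey' >> beam.CombinePerKey(sum)
    maxes = scores | 'MaxPerKey' >> beam.CombinePerKey(max)
    mins = scores | 'MinPerKey' >> beam.CombinePerKey(min)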

GroupBy is really already handled with GroupByKey and CoGroupByKey unless
you are thinking of a different type of GroupBy?

- Shannon

On Sun, Jul 7, 2019 at 10:47 PM Rui Wang  wrote:

> Maybe also adding Aggregation/GroupBy as utilities?
>
>
> -Rui
>
> On Sun, Jul 7, 2019 at 1:46 PM Shannon Duncan 
> wrote:
>
>> Thanks Valentyn,
>>
>> I'll outline the utilities and accept any suggestions to add / modify.
>> These are really just shortcut PTransforms that I am working on to simplify
>> creating pipelines.
>>
>> Currently the utilities contain the following PTransforms:
>>
>> - Inner Join
>> - Left Outer Join
>> - Right Outer Join
>> - Full Outer Join
>> - PrepareKey (For selecting items in a dictionary to act as a key for the
>> joins)
>> - Select (very simple filter that returns only items you want from the
>> dictionary) (allows for defining a default nullValue)
>>
>> Currently these operations only work with dictionaries, but I'd be
>> interested to see how it would work for  tuples.
>>
>> I'm new to python so they may not be optimized or the best way, but from
>> my understanding these seem to be the best way to do these types of
>> operations. Essentially I created a pipeline to be able to convert a simple
>> sql query into a flow of these utilities. Using prepareKey to define your
>> joining key, joining, and then selecting from the join allows you to do a
>> lot of powerful manipulation in a simple / familiar way.
>>
>> If this is something that we'd like to add to the Beam SDK I don't mind
>> looking at the contributor license agreement, and conversing more on how to
>> get them in.
>>
>> Thanks,
>> Shannon
>>
>>
>>
>> On Wed, Jul 3, 2019 at 5:16 PM Valentyn Tymofieiev 
>> wrote:
>>
>>> Hi Shannon,
>>>
>>> Thanks for considering a contribution to Beam Python SDK. With a direct
>>> contribution to Beam SDK, your change will reach larger audience of users,
>>> and you will not have to maintain a separate project and keep it up to date
>>> with new releases of Beam.
>>>
>>> I encourage you to take a look at https://beam.apache.org/contribute/ for
>>> general advice on how to get started. To echo some points mentioned in the
>>> guide:
>>>
>>> - If your change is large or it is your first change, it is a good idea
>>> to discuss it on the dev@ mailing list
>>> - For large changes create a design doc (template, examples) and email
>>> it to the dev@ mailing list.
>>>
>>> Thanks,
>>> Valentyn
>>>
>>> On Wed, Jul 3, 2019 at 3:04 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
>>>> I have been writing a bunch of utilities for the python SDK such as
>>>> joins, selections, composite transforms, etc...
>>>>
>>>> I am working with my company to see if I can open source the utilities.
>>>> Would it be best to post them on a separate PyPi project, or to PR them
>>>> into the beam SDK? I assume if they let me open source it they will want
>>>> some attribution or something like that.
>>>>
>>>> Thanks,
>>>> Shannon
>>>>
>>>


Re: Python Utilities

2019-07-07 Thread Shannon Duncan
Thanks Valentyn,

I'll outline the utilities and accept any suggestions to add / modify.
These are really just shortcut PTransforms that I am working on to simplify
creating pipelines.

Currently the utilities contain the following PTransforms:

- Inner Join
- Left Outer Join
- Right Outer Join
- Full Outer Join
- PrepareKey (For selecting items in a dictionary to act as a key for the
joins)
- Select (very simple filter that returns only items you want from the
dictionary) (allows for defining a default nullValue)

Currently these operations only work with dictionaries, but I'd be
interested to see how it would work for tuples.

I'm new to Python so they may not be optimized or the best way, but from my
understanding these seem to be the best way to do these types of
operations. Essentially I created a pipeline to be able to convert a simple
SQL query into a flow of these utilities. Using prepareKey to define your
joining key, joining, and then selecting from the join allows you to do a
lot of powerful manipulation in a simple / familiar way.
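
As a rough sketch of that flow (not the code from the repo itself; the
PrepareKey and InnerJoin names here are placeholders built on CoGroupByKey):

import apache_beam as beam

class PrepareKey(beam.PTransform):
    """Keys each dict by the chosen field(s) so it can feed a join."""

    def __init__(self, *fields):
        self._fields = fields

    def expand(self, pcoll):
        fields = self._fields
        return pcoll | beam.Map(
            lambda row: (tuple(row[f] for f in fields), row))

class InnerJoin(beam.PTransform):
    """Inner-joins two keyed PCollections of dicts via CoGroupByKey."""

    def __init__(self, right):
        self._right = right

    def expand(self, left):
        def merge(element):
            _, groups = element
            for l in groups['left']:
                for r in groups['right']:
                    joined = dict(l)
                    joined.update(r)
                    yield joined

        return ({'left': left, 'right': self._right}
                | beam.CoGroupByKey()
                | beam.FlatMap(merge))

The outer variants would only differ in emitting a row padded with the
nullValue defaults when one side of the grouping is empty.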

If this is something that we'd like to add to the Beam SDK I don't mind
looking at the contributor license agreement, and conversing more on how to
get them in.

Thanks,
Shannon



On Wed, Jul 3, 2019 at 5:16 PM Valentyn Tymofieiev 
wrote:

> Hi Shannon,
>
> Thanks for considering a contribution to Beam Python SDK. With a direct
> contribution to Beam SDK, your change will reach larger audience of users,
> and you will not have to maintain a separate project and keep it up to date
> with new releases of Beam.
>
> I encourage you to take a look at https://beam.apache.org/contribute/ for
> general advice on how to get started. To echo some points mentioned in the
> guide:
>
> - If your change is large or it is your first change, it is a good idea to
> discuss it on the dev@ mailing list
> - For large changes create a design doc (template, examples) and email it
> to the dev@ mailing list.
>
> Thanks,
> Valentyn
>
> On Wed, Jul 3, 2019 at 3:04 PM Shannon Duncan 
> wrote:
>
>> I have been writing a bunch of utilities for the python SDK such as
>> joins, selections, composite transforms, etc...
>>
>> I am working with my company to see if I can open source the utilities.
>> Would it be best to post them on a separate PyPi project, or to PR them
>> into the beam SDK? I assume if they let me open source it they will want
>> some attribution or something like that.
>>
>> Thanks,
>> Shannon
>>
>


Python Utilities

2019-07-03 Thread Shannon Duncan
I have been writing a bunch of utilities for the python SDK such as joins,
selections, composite transforms, etc...
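
To give a flavour of what one of those shortcuts might look like as a
composite transform, here is a rough sketch along the lines of the Select
utility discussed elsewhere in this thread (the name and the nullValue
handling are assumptions, not the actual code):

import apache_beam as beam

class Select(beam.PTransform):
    """Keeps only the requested fields from each dict, filling missing
    fields with a default nullValue."""

    def __init__(self, fields, null_value=None):
        self._fields = fields
        self._null_value = null_value

    def expand(self, pcoll):
        fields, null_value = self._fields, self._null_value
        return pcoll | beam.Map(
            lambda row: {f: row.get(f, null_value) for f in fields})

# e.g. rows | Select(['user_id', 'email'], null_value='')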

I am working with my company to see if I can open source the utilities.
Would it be best to post them on a separate PyPI project, or to PR them
into the Beam SDK? I assume if they let me open source it they will want
some attribution or something like that.

Thanks,
Shannon


Re: [Python] Read Hadoop Sequence File?

2019-07-02 Thread Shannon Duncan
It would be great if it was available for both Java and Python.

On Tue, Jul 2, 2019, 3:57 AM Ismaël Mejía  wrote:

> (Adding dev@ and Solomon Duskis to the discussion)
>
> I was not aware of these, thanks for sharing, David. It would definitely
> be a great addition if we could have those donated as an extension on the
> Beam side. We can even evolve them in the future to be more FileIO-like.
> Any chance this can happen? Maybe Solomon and his team?
>
>
>
> On Tue, Jul 2, 2019 at 9:39 AM David Morávek  wrote:
> >
> > Hi, you can use SequenceFileSink and Source from the BigTable client.
> Those work nicely with FileIO.
> >
> >
> https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSink.java
> >
> https://github.com/googleapis/cloud-bigtable-client/blob/master/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/SequenceFileSource.java
> >
> > It would be really cool to move these into Beam, but that's up to
> Googlers to decide whether they want to donate this.
> >
> > D.
> >
> > On Tue, Jul 2, 2019 at 2:07 AM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
> >>
> >> It's not outside the realm of possibilities. For now I've created an
> intermediary step of a hadoop job that converts from sequence to text file.
> >>
> >> Looking into better options.
> >>
> >> On Mon, Jul 1, 2019, 5:50 PM Chamikara Jayalath 
> wrote:
> >>>
> >>> Java SDK has a HadoopInputFormatIO using which you should be able to
> read Sequence files:
> https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-format/src/main/java/org/apache/beam/sdk/io/hadoop/format/HadoopFormatIO.java
> >>> I don't think there's a direct alternative for this for Python.
> >>>
> >>> Is it possible to write to a well-known format such as Avro instead of
> a Hadoop specific format which will allow you to read from both
> Dataproc/Hadoop and Beam Python SDK ?
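
If the Hadoop/Dataproc side can be switched to Avro, the Python half of
that suggestion is roughly the sketch below (the path is a placeholder):

import apache_beam as beam

# ReadFromAvro yields each Avro record as a dict.
with beam.Pipeline() as p:
    records = p | 'ReadAvro' >> beam.io.ReadFromAvro(
        'gs://my-bucket/exports/*.avro')
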
> >>>
> >>> Thanks,
> >>> Cham
> >>>
> >>> On Mon, Jul 1, 2019 at 3:37 PM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
> >>>>
> >>>> That's a pretty big hole for a missing source/sink when looking at
> transitioning from Dataproc to Dataflow using GCS as storage buffer instead
> of a traditional hdfs.
> >>>>
> >>>> From what I've been able to tell from source code and documentation,
> Java is able to but not Python?
> >>>>
> >>>> Thanks,
> >>>> Shannon
> >>>>
> >>>> On Mon, Jul 1, 2019 at 5:29 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>>>>
> >>>>> I don't think we have a source/sink for reading Hadoop sequence
> files. Your best bet currently will probably be to use FileSystem
> abstraction to create a file from a ParDo and read directly from there
> using a library that can read sequence files.
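
A hedged sketch of that workaround, where the actual sequence-file parsing
is left to whatever third-party reader you choose (the record_reader
callable below is an assumption, not something the Beam Python SDK
provides):

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems

class ReadWithFileSystems(beam.DoFn):
    """Opens each path via Beam's FileSystems abstraction and delegates
    parsing to a user-supplied reader callable."""

    def __init__(self, record_reader):
        # record_reader: callable taking a file-like handle and yielding
        # (key, value) pairs; supplied by a sequence-file library of your
        # choice, since the Python SDK has no built-in one.
        self._record_reader = record_reader

    def process(self, file_path):
        handle = FileSystems.open(file_path)
        try:
            for key, value in self._record_reader(handle):
                yield key, value
        finally:
            handle.close()
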
> >>>>>
> >>>>> Thanks,
> >>>>> Cham
> >>>>>
> >>>>> On Mon, Jul 1, 2019 at 8:42 AM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
> >>>>>>
> >>>>>> I'm wanting to read a Sequence/Map file from Hadoop stored on
> Google Cloud Storage at " gs://bucket/link/SequenceFile-* " via the
> Python SDK.
> >>>>>>
> >>>>>> I cannot locate any good adapters for this, and the one Hadoop
> Filesystem reader seems to only read from a "hdfs://" url.
> >>>>>>
> >>>>>> I'm wanting to use Dataflow and GCS exclusively to start mixing in
> Beam pipelines with our current Hadoop Pipelines.
> >>>>>>
> >>>>>> Is this a feature that is supported or will be supported in the
> future?
> >>>>>> Does anyone have any good suggestions for this that is performant?
> >>>>>>
> >>>>>> I'd also like to be able to write back out to a SequenceFile if
> possible.
> >>>>>>
> >>>>>> Thanks!
> >>>>>>
>