Re: Using Beam Built-in I/O Transforms with an external framework.

2019-09-18 Thread Pulasthi Supun Wickramasinghe
Hi Chamikara, Chad

Thanks for your replies.

@Chamikara
I am already working on a Beam runner for Twister2; the runner is
functional for the most part, even though I have not fully tested it. What
I was thinking was to find a way to translate just the sources, using the
Read primitive from the Twister2 Beam runner, since that primitive
translates between Beam sources and Twister2 sources. The missing part is a
way to do that translation without having to use a Beam pipeline. I will
look into this a little more to get a better idea.

@Chad
As I mentioned, we are already working on a runner for Twister2. Since
Beam supports a rich set of data sources, it would be great to get them to
work with the Twister2 framework directly, in addition to supporting Beam
as a runner.

Best Regards,
Pulasthi

On Wed, Sep 18, 2019 at 5:27 PM Chad Dombrova  wrote:

> Hi Pulasthi,
> Just to mirror what Cham said, it would be a non-starter to try to use a
> Beam IO source in another framework: to make them work, you'd have to build
> something that executes them with their expected protocol, and that would
> look an awful lot like a Beam runner.  It makes more sense to think about
> the problem in reverse:  how do you make a transform (or suite of
> transforms) that calls out to your Twister2 framework.  That's something
> that's hard to answer without knowing more about the framework, but
> splittable DoFns may be a good place to start.  But honestly, if I were you
> I'd just invest the time in porting to Beam.  We've recently gone through a
> similar process and it's well worth the effort.
>
> -chad
>
>
>
>
>
>
>
>
> On Wed, Sep 18, 2019 at 2:17 PM Chamikara Jayalath 
> wrote:
>
>> Hi Pulasthi,
>>
>> This might be possible but I don't know if anybody has done this. API of
>> Beam sources are no different from other Beam PTransforms and we highly
>> recommend hiding away various implementations of source framework related
>> abstractions in a composite transform [1]. So what you are looking for is a
>> way to use a Beam transform in a separate system.
>>
>> Thanks,
>> Cham
>>
>> [1]
>> https://beam.apache.org/contribute/ptransform-style-guide/#exposing-a-ptransform-vs-something-else
>>
>> On Wed, Sep 18, 2019 at 1:52 PM Pulasthi Supun Wickramasinghe <
>> pulasthi...@gmail.com> wrote:
>>
>>> Hi Dev's
>>>
>>> We have a big data processing framework named Twister2, and wanted to
>>> know if there is any way we could leverage the I/O Transforms that are
>>> built into Apache Beam externally. That is rather than using it in a Beam
>>> pipeline just use them as data sources in our project. Just wanted to check
>>> with the dev's if such an approach has been done by anyone before, so we
>>> could get some pointers on how this could be done. Any pointers in the
>>> right direction would be highly appreciated.
>>>
>>> Best Regards,
>>> Pulasthi
>>>
>>> --
>>> Pulasthi S. Wickramasinghe
>>> PhD Candidate  | Research Assistant
>>> School of Informatics and Computing | Digital Science Center
>>> Indiana University, Bloomington
>>> cell: 224-386-9035 <(224)%20386-9035>
>>>
>>

-- 
Pulasthi S. Wickramasinghe
PhD Candidate  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035


Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-18 Thread Ahmet Altay
I believe the flag was never relevant for PortableRunner, though I might be
wrong. The flag affects a few bits of the core code, which is why the
solution cannot be as simple as setting the flag only in the Dataflow
runner; it requires some amount of cleanup. I agree that it would be good
to clean this up, and I also agree not to rush it, especially if it is not
currently impacting users.

Ahmet

On Wed, Sep 18, 2019 at 12:56 PM Maximilian Michels  wrote:

> > I disagree that this flag is obsolete. It is still serving a purpose for
> batch users using dataflow runner and that is decent chunk of beam python
> users.
>
> It is obsolete for the PortableRunner. If the Dataflow Runner needs this
> flag, couldn't we simply add it there? As far as I know Dataflow users
> do not use the PortableRunner. I might be wrong.
>
> As Kyle mentioned, he already fixed the issue. The fix is only present
> in the 2.16.0 release though. This flag has repeatedly caused friction
> for users and that's why I want to get rid of it.
>
> There is of course no need to rush this but it would be great to tackle
> this for the next release. Filed a JIRA:
> https://jira.apache.org/jira/browse/BEAM-8274
>
> Cheers,
> Max
>
> On 17.09.19 15:39, Kyle Weaver wrote:
> > Actually, the reported issues are already fixed on head. We're just
> > trying to prevent similar issues in the future.
> >
> > Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> >
> >
> > On Tue, Sep 17, 2019 at 3:38 PM Ahmet Altay wrote:
> >
> >
> >
> > On Tue, Sep 17, 2019 at 2:26 PM Maximilian Michels wrote:
> >
> >  > Is not this flag set automatically for the portable runner
> >
> > Yes, the flag is set automatically, but it has been broken before and
> > likely will be again. It just adds additional complexity to portable
> > Runners. There is no other portability API than the Fn API. This flag
> > historically had its justification, but seems obsolete now.
> >
> >
> > I disagree that this flag is obsolete. It is still serving a purpose
> > for batch users using dataflow runner and that is decent chunk of
> > beam python users.
> >
> > I agree with switching the default. I would like to give enough time
> > to decouple the flag from the core code. (With a quick search I saw
> > two instances related to Read and Create.) Have time to test changes
> > and then switch the default.
> >
> >
> > An isinstance check might be smarter, but does not get rid of the root
> > of the problem.
> >
> >
> > I might be wrong, IIUC, it will temporarily resolve the reported
> > issues. Is this not accurate?
> >
> >
> > -Max
> >
> > On 17.09.19 14:20, Ahmet Altay wrote:
> >  > Could you make that change and see if it would have addressed
> >  > the issue here?
> >  >
> >  > On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver wrote:
> >  >
> >  >     The flag is automatically set, but not in a smart way. Taking
> >  >     another look at the code, a more resilient fix would be to just
> >  >     check if the runner isinstance of PortableRunner.
> >  >
> >  >     Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> >  >
> >  >     On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay wrote:
> >  >
> >  >         Is not this flag set automatically for the portable runner here
> >  >         [1] ?
> >  >
> >  >         [1]
> >  >         https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
> >  >
> >  >         On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw wrote:
> >  >
> >  >             On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise wrote:
> >  >              >
> >  >              > +1 for making --experiments=beam_fn_api default.
> >  >              >
> >  >              > Can the Dataflow runner driver just remove the setting if
> >  >              > it is not compatible?

Re: using avro instead of json for BigQueryIO.Write

2019-09-18 Thread Pablo Estrada
Thanks for offering to work on this! It would be awesome to have it. I can
say that we don't have that for Python ATM.
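
For anyone picking this up, here is a minimal sketch of the kind of
TableRow-to-Avro conversion this would need (the schema and field names are
hypothetical, and this is not an existing BigQueryIO API):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

class TableRowToAvro {
  // Hypothetical Avro schema mirroring a simple two-column BigQuery table.
  static final Schema SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
          + "{\"name\":\"user_id\",\"type\":\"long\"},"
          + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

  static GenericRecord toAvro(TableRow row) {
    // JSON (TableRow) and Avro do not share a type system, so each field needs
    // an explicit conversion; this only handles the happy path.
    return new GenericRecordBuilder(SCHEMA)
        .set("user_id", Long.parseLong(String.valueOf(row.get("user_id"))))
        .set("name", (String) row.get("name"))
        .build();
  }
}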

On Mon, Sep 16, 2019 at 10:56 AM Steve Niemitz  wrote:

> Our experience has actually been that avro is more efficient than even
> parquet, but that might also be skewed from our datasets.
>
> I might try to take a crack at this, I found
> https://issues.apache.org/jira/browse/BEAM-2879 tracking it (which
> coincidentally references my thread from a couple years ago on the read
> side of this :) ).
>
> On Mon, Sep 16, 2019 at 1:38 PM Reuven Lax  wrote:
>
>> It's been talked about, but nobody's done anything. There are some
>> difficulties related to type conversion (json and avro don't support the
>> same types), but if those are overcome then an avro version would be much
>> more efficient. I believe Parquet files would be even more efficient if you
>> wanted to go that path, but there might be more code to write (as we
>> already have some code in the codebase to convert between TableRows and
>> Avro).
>>
>> Reuven
>>
>> On Mon, Sep 16, 2019 at 10:33 AM Steve Niemitz 
>> wrote:
>>
>>> Has anyone investigated using avro rather than json to load data into
>>> BigQuery using BigQueryIO (+ FILE_LOADS)?
>>>
>>> I'd be interested in enhancing it to support this, but I'm curious if
>>> there's any prior work here.
>>>
>>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Reuven Lax
I believe that the Total shuffle data process counter counts the number of
bytes written to shuffle + the number of bytes read. So if you shuffle 1GB
of data, you should expect to see 2GB on the counter.

On Wed, Sep 18, 2019 at 2:39 PM Shannon Duncan 
wrote:

> Ok just ran the job on a small input and did not specify numShards. so
> it's literally just:
>
> .apply("WriteLines", TextIO.write().to(options.getOutput()));
>
> Output of map for join:
> [image: image.png]
>
> Details of Shuffle:
> [image: image.png]
>
> Reported Bytes Shuffled:
> [image: image.png]
>
>
> On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax  wrote:
>
>>
>>
>> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> I will attempt to do without sharding (though I believe we did do a run
>>> without shards and it incurred the extra shuffle costs).
>>>
>>
>> It shouldn't. There will be a shuffle, but that shuffle should contain a
>> small amount of data (essentially a list of filenames).
>>
>>>
>>> Pipeline is simple.
>>>
>>> The only shuffle that is explicitly defined is the shuffle after merging
>>> files together into a single PCollection (Flatten Transform).
>>>
>>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected
>>> to pay for shuffles on the middle shuffle but were surprised to see that
>>> the output data from the Flatten was quadrupled in the reflected shuffled
>>> GB shown in Dataflow. Which lead me down this path of finding things.
>>>
>>> [image: image.png]
>>>
>>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax  wrote:
>>>
 In that case you should be able to leave sharding unspecified, and you
 won't incur the extra shuffle. Specifying explicit sharding is generally
 necessary only for streaming.

 On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
 joseph.dun...@liveramp.com> wrote:

> batch on dataflowRunner.
>
> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:
>
>> Are you using streaming or batch? Also which runner are you using?
>>
>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> So I followed up on why TextIO shuffles and dug into the code some.
>>> It is using the shards and getting all the values into a keyed group to
>>> write to a single file.
>>>
>>> However... I wonder if there is way to just take the records that
>>> are on a worker and write them out. Thus not needing a shard number and
>>> doing this. Closer to how hadoop handle's writes.
>>>
>>> Maybe just a regular pardo and on bundleSetup it creates a writer
>>> and processElement reuses that writter to write to the same file for all
>>> elements within a bundle?
>>>
>>> I feel like this goes beyond scope of simple user mailing list so
>>> I'm expanding it to dev as well.
>>> +dev 
>>>
>>> Finding a solution that prevents quadrupling shuffle costs when
>>> simply writing out a file is a necessity for large scale jobs that work
>>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>>
>>> Thanks,
>>> Shannon Duncan
>>>
>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
 We have been using Beam for a bit now. However we just turned on
 the dataflow shuffle service and were very surprised that the shuffled 
 data
 amounts were quadruple the amounts we expected.

 Turns out that the file writing TextIO is doing shuffles within
 itself.

 Is there a way to prevent shuffling in the writing phase?

 Thanks,
 Shannon Duncan

>>>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Shannon Duncan
Sorry, I missed a part of the map output for the flatten:

[image: image.png]

However, the shuffle does show only 29.32 GB going into it, but the
reported Total Shuffled data is 58.66 GB.

[image: image.png]

On Wed, Sep 18, 2019 at 4:39 PM Shannon Duncan 
wrote:

> Ok just ran the job on a small input and did not specify numShards. so
> it's literally just:
>
> .apply("WriteLines", TextIO.write().to(options.getOutput()));
>
> Output of map for join:
> [image: image.png]
>
> Details of Shuffle:
> [image: image.png]
>
> Reported Bytes Shuffled:
> [image: image.png]
>
>
> On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax  wrote:
>
>>
>>
>> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> I will attempt to do without sharding (though I believe we did do a run
>>> without shards and it incurred the extra shuffle costs).
>>>
>>
>> It shouldn't. There will be a shuffle, but that shuffle should contain a
>> small amount of data (essentially a list of filenames).
>>
>>>
>>> Pipeline is simple.
>>>
>>> The only shuffle that is explicitly defined is the shuffle after merging
>>> files together into a single PCollection (Flatten Transform).
>>>
>>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected
>>> to pay for shuffles on the middle shuffle but were surprised to see that
>>> the output data from the Flatten was quadrupled in the reflected shuffled
>>> GB shown in Dataflow. Which lead me down this path of finding things.
>>>
>>> [image: image.png]
>>>
>>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax  wrote:
>>>
 In that case you should be able to leave sharding unspecified, and you
 won't incur the extra shuffle. Specifying explicit sharding is generally
 necessary only for streaming.

 On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
 joseph.dun...@liveramp.com> wrote:

> batch on dataflowRunner.
>
> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:
>
>> Are you using streaming or batch? Also which runner are you using?
>>
>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> So I followed up on why TextIO shuffles and dug into the code some.
>>> It is using the shards and getting all the values into a keyed group to
>>> write to a single file.
>>>
>>> However... I wonder if there is way to just take the records that
>>> are on a worker and write them out. Thus not needing a shard number and
>>> doing this. Closer to how hadoop handle's writes.
>>>
>>> Maybe just a regular pardo and on bundleSetup it creates a writer
>>> and processElement reuses that writter to write to the same file for all
>>> elements within a bundle?
>>>
>>> I feel like this goes beyond scope of simple user mailing list so
>>> I'm expanding it to dev as well.
>>> +dev 
>>>
>>> Finding a solution that prevents quadrupling shuffle costs when
>>> simply writing out a file is a necessity for large scale jobs that work
>>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>>
>>> Thanks,
>>> Shannon Duncan
>>>
>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
 We have been using Beam for a bit now. However we just turned on
 the dataflow shuffle service and were very surprised that the shuffled 
 data
 amounts were quadruple the amounts we expected.

 Turns out that the file writing TextIO is doing shuffles within
 itself.

 Is there a way to prevent shuffling in the writing phase?

 Thanks,
 Shannon Duncan

>>>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Shannon Duncan
OK, I just ran the job on a small input and did not specify numShards, so
it's literally just:

.apply("WriteLines", TextIO.write().to(options.getOutput()));

Output of map for join:
[image: image.png]

Details of Shuffle:
[image: image.png]

Reported Bytes Shuffled:
[image: image.png]


On Wed, Sep 18, 2019 at 4:24 PM Reuven Lax  wrote:

>
>
> On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan 
> wrote:
>
>> I will attempt to do without sharding (though I believe we did do a run
>> without shards and it incurred the extra shuffle costs).
>>
>
> It shouldn't. There will be a shuffle, but that shuffle should contain a
> small amount of data (essentially a list of filenames).
>
>>
>> Pipeline is simple.
>>
>> The only shuffle that is explicitly defined is the shuffle after merging
>> files together into a single PCollection (Flatten Transform).
>>
>> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected to
>> pay for shuffles on the middle shuffle but were surprised to see that the
>> output data from the Flatten was quadrupled in the reflected shuffled GB
>> shown in Dataflow. Which lead me down this path of finding things.
>>
>> [image: image.png]
>>
>> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax  wrote:
>>
>>> In that case you should be able to leave sharding unspecified, and you
>>> won't incur the extra shuffle. Specifying explicit sharding is generally
>>> necessary only for streaming.
>>>
>>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
 batch on dataflowRunner.

 On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:

> Are you using streaming or batch? Also which runner are you using?
>
> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
>
>> So I followed up on why TextIO shuffles and dug into the code some.
>> It is using the shards and getting all the values into a keyed group to
>> write to a single file.
>>
>> However... I wonder if there is way to just take the records that are
>> on a worker and write them out. Thus not needing a shard number and doing
>> this. Closer to how hadoop handle's writes.
>>
>> Maybe just a regular pardo and on bundleSetup it creates a writer and
>> processElement reuses that writter to write to the same file for all
>> elements within a bundle?
>>
>> I feel like this goes beyond scope of simple user mailing list so I'm
>> expanding it to dev as well.
>> +dev 
>>
>> Finding a solution that prevents quadrupling shuffle costs when
>> simply writing out a file is a necessity for large scale jobs that work
>> with 100+ TB of data. If anyone has any ideas I'd love to hear them.
>>
>> Thanks,
>> Shannon Duncan
>>
>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> We have been using Beam for a bit now. However we just turned on the
>>> dataflow shuffle service and were very surprised that the shuffled data
>>> amounts were quadruple the amounts we expected.
>>>
>>> Turns out that the file writing TextIO is doing shuffles within
>>> itself.
>>>
>>> Is there a way to prevent shuffling in the writing phase?
>>>
>>> Thanks,
>>> Shannon Duncan
>>>
>>


Re: Using Beam Built-in I/O Transforms with an external framework.

2019-09-18 Thread Chad Dombrova
Hi Pulasthi,
Just to mirror what Cham said, it would be a non-starter to try to use a
Beam IO source in another framework: to make them work, you'd have to build
something that executes them with their expected protocol, and that would
look an awful lot like a Beam runner.  It makes more sense to think about
the problem in reverse:  how do you make a transform (or suite of
transforms) that calls out to your Twister2 framework.  That's something
that's hard to answer without knowing more about the framework, but
splittable DoFns may be a good place to start.  But honestly, if I were you
I'd just invest the time in porting to Beam.  We've recently gone through a
similar process and it's well worth the effort.

-chad








On Wed, Sep 18, 2019 at 2:17 PM Chamikara Jayalath 
wrote:

> Hi Pulasthi,
>
> This might be possible but I don't know if anybody has done this. API of
> Beam sources are no different from other Beam PTransforms and we highly
> recommend hiding away various implementations of source framework related
> abstractions in a composite transform [1]. So what you are looking for is a
> way to use a Beam transform in a separate system.
>
> Thanks,
> Cham
>
> [1]
> https://beam.apache.org/contribute/ptransform-style-guide/#exposing-a-ptransform-vs-something-else
>
> On Wed, Sep 18, 2019 at 1:52 PM Pulasthi Supun Wickramasinghe <
> pulasthi...@gmail.com> wrote:
>
>> Hi Dev's
>>
>> We have a big data processing framework named Twister2, and wanted to
>> know if there is any way we could leverage the I/O Transforms that are
>> built into Apache Beam externally. That is rather than using it in a Beam
>> pipeline just use them as data sources in our project. Just wanted to check
>> with the dev's if such an approach has been done by anyone before, so we
>> could get some pointers on how this could be done. Any pointers in the
>> right direction would be highly appreciated.
>>
>> Best Regards,
>> Pulasthi
>>
>> --
>> Pulasthi S. Wickramasinghe
>> PhD Candidate  | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>> cell: 224-386-9035 <(224)%20386-9035>
>>
>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Reuven Lax
On Wed, Sep 18, 2019 at 2:12 PM Shannon Duncan 
wrote:

> I will attempt to do without sharding (though I believe we did do a run
> without shards and it incurred the extra shuffle costs).
>

It shouldn't. There will be a shuffle, but that shuffle should contain a
small amount of data (essentially a list of filenames).

>
> Pipeline is simple.
>
> The only shuffle that is explicitly defined is the shuffle after merging
> files together into a single PCollection (Flatten Transform).
>
> So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected to
> pay for shuffles on the middle shuffle but were surprised to see that the
> output data from the Flatten was quadrupled in the reflected shuffled GB
> shown in Dataflow. Which lead me down this path of finding things.
>
> [image: image.png]
>
> On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax  wrote:
>
>> In that case you should be able to leave sharding unspecified, and you
>> won't incur the extra shuffle. Specifying explicit sharding is generally
>> necessary only for streaming.
>>
>> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> batch on dataflowRunner.
>>>
>>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:
>>>
 Are you using streaming or batch? Also which runner are you using?

 On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
 joseph.dun...@liveramp.com> wrote:

> So I followed up on why TextIO shuffles and dug into the code some. It
> is using the shards and getting all the values into a keyed group to write
> to a single file.
>
> However... I wonder if there is way to just take the records that are
> on a worker and write them out. Thus not needing a shard number and doing
> this. Closer to how hadoop handle's writes.
>
> Maybe just a regular pardo and on bundleSetup it creates a writer and
> processElement reuses that writter to write to the same file for all
> elements within a bundle?
>
> I feel like this goes beyond scope of simple user mailing list so I'm
> expanding it to dev as well.
> +dev 
>
> Finding a solution that prevents quadrupling shuffle costs when simply
> writing out a file is a necessity for large scale jobs that work with 100+
> TB of data. If anyone has any ideas I'd love to hear them.
>
> Thanks,
> Shannon Duncan
>
> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
> joseph.dun...@liveramp.com> wrote:
>
>> We have been using Beam for a bit now. However we just turned on the
>> dataflow shuffle service and were very surprised that the shuffled data
>> amounts were quadruple the amounts we expected.
>>
>> Turns out that the file writing TextIO is doing shuffles within
>> itself.
>>
>> Is there a way to prevent shuffling in the writing phase?
>>
>> Thanks,
>> Shannon Duncan
>>
>


Re: Using Beam Built-in I/O Transforms with an external framework.

2019-09-18 Thread Chamikara Jayalath
Hi Pulasthi,

This might be possible, but I don't know if anybody has done this. The API
of Beam sources is no different from that of other Beam PTransforms, and we
highly recommend hiding the various source-framework-related abstractions
away in a composite transform [1]. So what you are looking for is a way to
use a Beam transform in a separate system.

Thanks,
Cham

[1]
https://beam.apache.org/contribute/ptransform-style-guide/#exposing-a-ptransform-vs-something-else
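
To make [1] a bit more concrete, here is a minimal sketch of that
composite-transform pattern; the transform name and the wrapped TextIO source
are purely illustrative:

import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical composite transform that hides the underlying source.
public class ReadMyData extends PTransform<PBegin, PCollection<String>> {
  private final String path;

  public ReadMyData(String path) {
    this.path = path;
  }

  @Override
  public PCollection<String> expand(PBegin input) {
    // Callers only see a PCollection<String>; the source implementation
    // details stay behind this expand() and can change freely.
    return input.apply("ReadFiles", TextIO.read().from(path));
  }
}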

On Wed, Sep 18, 2019 at 1:52 PM Pulasthi Supun Wickramasinghe <
pulasthi...@gmail.com> wrote:

> Hi Dev's
>
> We have a big data processing framework named Twister2, and wanted to know
> if there is any way we could leverage the I/O Transforms that are built
> into Apache Beam externally. That is rather than using it in a Beam
> pipeline just use them as data sources in our project. Just wanted to check
> with the dev's if such an approach has been done by anyone before, so we
> could get some pointers on how this could be done. Any pointers in the
> right direction would be highly appreciated.
>
> Best Regards,
> Pulasthi
>
> --
> Pulasthi S. Wickramasinghe
> PhD Candidate  | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> cell: 224-386-9035 <(224)%20386-9035>
>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Shannon Duncan
I will attempt to do without sharding (though I believe we did do a run
without shards and it incurred the extra shuffle costs).

Pipeline is simple.

The only shuffle that is explicitly defined is the shuffle after merging
files together into a single PCollection (Flatten Transform).

So it's a Read > Flatten > Shuffle > Map (Format) > Write. We expected to
pay for the middle shuffle, but were surprised to see that the output data
from the Flatten was quadrupled in the shuffled GB reported by Dataflow,
which led me down this path of investigation.

[image: image.png]
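
For reference, here is a minimal sketch of that pipeline shape in Java
(assuming the usual org.apache.beam.sdk imports; the inputs, the format step,
and all names are placeholders, not our actual job):

PCollection<String> partA = p.apply("ReadA", TextIO.read().from("gs://bucket/a/*"));
PCollection<String> partB = p.apply("ReadB", TextIO.read().from("gs://bucket/b/*"));

PCollectionList.of(partA).and(partB)
    .apply("Flatten", Flatten.pCollections())
    .apply("Shuffle", Reshuffle.viaRandomKey())
    .apply("Format", MapElements
        .into(TypeDescriptors.strings())
        .via((String line) -> line))        // stand-in for the real formatting
    .apply("WriteLines", TextIO.write().to(options.getOutput()));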

On Wed, Sep 18, 2019 at 4:08 PM Reuven Lax  wrote:

> In that case you should be able to leave sharding unspecified, and you
> won't incur the extra shuffle. Specifying explicit sharding is generally
> necessary only for streaming.
>
> On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan 
> wrote:
>
>> batch on dataflowRunner.
>>
>> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:
>>
>>> Are you using streaming or batch? Also which runner are you using?
>>>
>>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
 So I followed up on why TextIO shuffles and dug into the code some. It
 is using the shards and getting all the values into a keyed group to write
 to a single file.

 However... I wonder if there is way to just take the records that are
 on a worker and write them out. Thus not needing a shard number and doing
 this. Closer to how hadoop handle's writes.

 Maybe just a regular pardo and on bundleSetup it creates a writer and
 processElement reuses that writter to write to the same file for all
 elements within a bundle?

 I feel like this goes beyond scope of simple user mailing list so I'm
 expanding it to dev as well.
 +dev 

 Finding a solution that prevents quadrupling shuffle costs when simply
 writing out a file is a necessity for large scale jobs that work with 100+
 TB of data. If anyone has any ideas I'd love to hear them.

 Thanks,
 Shannon Duncan

 On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
 joseph.dun...@liveramp.com> wrote:

> We have been using Beam for a bit now. However we just turned on the
> dataflow shuffle service and were very surprised that the shuffled data
> amounts were quadruple the amounts we expected.
>
> Turns out that the file writing TextIO is doing shuffles within
> itself.
>
> Is there a way to prevent shuffling in the writing phase?
>
> Thanks,
> Shannon Duncan
>



Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Reuven Lax
In that case you should be able to leave sharding unspecified, and you
won't incur the extra shuffle. Specifying explicit sharding is generally
necessary only for streaming.

On Wed, Sep 18, 2019 at 2:06 PM Shannon Duncan 
wrote:

> batch on dataflowRunner.
>
> On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:
>
>> Are you using streaming or batch? Also which runner are you using?
>>
>> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> So I followed up on why TextIO shuffles and dug into the code some. It
>>> is using the shards and getting all the values into a keyed group to write
>>> to a single file.
>>>
>>> However... I wonder if there is way to just take the records that are on
>>> a worker and write them out. Thus not needing a shard number and doing
>>> this. Closer to how hadoop handle's writes.
>>>
>>> Maybe just a regular pardo and on bundleSetup it creates a writer and
>>> processElement reuses that writter to write to the same file for all
>>> elements within a bundle?
>>>
>>> I feel like this goes beyond scope of simple user mailing list so I'm
>>> expanding it to dev as well.
>>> +dev 
>>>
>>> Finding a solution that prevents quadrupling shuffle costs when simply
>>> writing out a file is a necessity for large scale jobs that work with 100+
>>> TB of data. If anyone has any ideas I'd love to hear them.
>>>
>>> Thanks,
>>> Shannon Duncan
>>>
>>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>>> joseph.dun...@liveramp.com> wrote:
>>>
 We have been using Beam for a bit now. However we just turned on the
 dataflow shuffle service and were very surprised that the shuffled data
 amounts were quadruple the amounts we expected.

 Turns out that the file writing TextIO is doing shuffles within itself.

 Is there a way to prevent shuffling in the writing phase?

 Thanks,
 Shannon Duncan

>>>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Shannon Duncan
batch on dataflowRunner.

On Wed, Sep 18, 2019 at 4:05 PM Reuven Lax  wrote:

> Are you using streaming or batch? Also which runner are you using?
>
> On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan 
> wrote:
>
>> So I followed up on why TextIO shuffles and dug into the code some. It is
>> using the shards and getting all the values into a keyed group to write to
>> a single file.
>>
>> However... I wonder if there is way to just take the records that are on
>> a worker and write them out. Thus not needing a shard number and doing
>> this. Closer to how hadoop handle's writes.
>>
>> Maybe just a regular pardo and on bundleSetup it creates a writer and
>> processElement reuses that writter to write to the same file for all
>> elements within a bundle?
>>
>> I feel like this goes beyond scope of simple user mailing list so I'm
>> expanding it to dev as well.
>> +dev 
>>
>> Finding a solution that prevents quadrupling shuffle costs when simply
>> writing out a file is a necessity for large scale jobs that work with 100+
>> TB of data. If anyone has any ideas I'd love to hear them.
>>
>> Thanks,
>> Shannon Duncan
>>
>> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan <
>> joseph.dun...@liveramp.com> wrote:
>>
>>> We have been using Beam for a bit now. However we just turned on the
>>> dataflow shuffle service and were very surprised that the shuffled data
>>> amounts were quadruple the amounts we expected.
>>>
>>> Turns out that the file writing TextIO is doing shuffles within itself.
>>>
>>> Is there a way to prevent shuffling in the writing phase?
>>>
>>> Thanks,
>>> Shannon Duncan
>>>
>>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Chamikara Jayalath
Are you specifying the number of shards to write to:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L859

If so, this will incur an additional shuffle to re-distribute data written
by all workers into the given number of shards before writing.

In addition to that, I think we also run Reshuffle transforms on the set of
files to break fusion when finalizing files, but that cost should not be
that significant.

Posting a sketch of your pipeline would probably be helpful.
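
For illustration, the two write configurations look like this (a sketch only,
assuming a PCollection<String> named lines and an options object with a
getOutput() method, as in your snippets):

// Explicit sharding: adds a shuffle to redistribute all records into exactly 10 files.
lines.apply("WriteSharded",
    TextIO.write().to(options.getOutput()).withNumShards(10));

// Runner-chosen sharding (batch): numShards left unset, so bundles are written
// directly and only the resulting file names are shuffled for finalization.
lines.apply("WriteUnsharded",
    TextIO.write().to(options.getOutput()));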

Thanks,
Cham

On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan 
wrote:

> So I followed up on why TextIO shuffles and dug into the code some. It is
> using the shards and getting all the values into a keyed group to write to
> a single file.
>
> However... I wonder if there is way to just take the records that are on a
> worker and write them out. Thus not needing a shard number and doing this.
> Closer to how hadoop handle's writes.
>
> Maybe just a regular pardo and on bundleSetup it creates a writer and
> processElement reuses that writter to write to the same file for all
> elements within a bundle?
>
> I feel like this goes beyond scope of simple user mailing list so I'm
> expanding it to dev as well.
> +dev 
>
> Finding a solution that prevents quadrupling shuffle costs when simply
> writing out a file is a necessity for large scale jobs that work with 100+
> TB of data. If anyone has any ideas I'd love to hear them.
>
> Thanks,
> Shannon Duncan
>
> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan 
> wrote:
>
>> We have been using Beam for a bit now. However we just turned on the
>> dataflow shuffle service and were very surprised that the shuffled data
>> amounts were quadruple the amounts we expected.
>>
>> Turns out that the file writing TextIO is doing shuffles within itself.
>>
>> Is there a way to prevent shuffling in the writing phase?
>>
>> Thanks,
>> Shannon Duncan
>>
>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Reuven Lax
Are you using streaming or batch? Also which runner are you using?

On Wed, Sep 18, 2019 at 1:57 PM Shannon Duncan 
wrote:

> So I followed up on why TextIO shuffles and dug into the code some. It is
> using the shards and getting all the values into a keyed group to write to
> a single file.
>
> However... I wonder if there is way to just take the records that are on a
> worker and write them out. Thus not needing a shard number and doing this.
> Closer to how hadoop handle's writes.
>
> Maybe just a regular pardo and on bundleSetup it creates a writer and
> processElement reuses that writter to write to the same file for all
> elements within a bundle?
>
> I feel like this goes beyond scope of simple user mailing list so I'm
> expanding it to dev as well.
> +dev 
>
> Finding a solution that prevents quadrupling shuffle costs when simply
> writing out a file is a necessity for large scale jobs that work with 100+
> TB of data. If anyone has any ideas I'd love to hear them.
>
> Thanks,
> Shannon Duncan
>
> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan 
> wrote:
>
>> We have been using Beam for a bit now. However we just turned on the
>> dataflow shuffle service and were very surprised that the shuffled data
>> amounts were quadruple the amounts we expected.
>>
>> Turns out that the file writing TextIO is doing shuffles within itself.
>>
>> Is there a way to prevent shuffling in the writing phase?
>>
>> Thanks,
>> Shannon Duncan
>>
>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Jeff Klukas
What you propose with a writer per bundle is definitely possible, but I
expect the blocker is that in most cases the runner has control of bundle
sizes and there's nothing exposed to the user to control that. I've wanted
to do something similar, but found average bundle sizes in my case on Dataflow to be
so small that it wasn't feasible to write out a separate file/object per
bundle.

On Wed, Sep 18, 2019 at 4:57 PM Shannon Duncan 
wrote:

> So I followed up on why TextIO shuffles and dug into the code some. It is
> using the shards and getting all the values into a keyed group to write to
> a single file.
>
> However... I wonder if there is way to just take the records that are on a
> worker and write them out. Thus not needing a shard number and doing this.
> Closer to how hadoop handle's writes.
>
> Maybe just a regular pardo and on bundleSetup it creates a writer and
> processElement reuses that writter to write to the same file for all
> elements within a bundle?
>
> I feel like this goes beyond scope of simple user mailing list so I'm
> expanding it to dev as well.
> +dev 
>
> Finding a solution that prevents quadrupling shuffle costs when simply
> writing out a file is a necessity for large scale jobs that work with 100+
> TB of data. If anyone has any ideas I'd love to hear them.
>
> Thanks,
> Shannon Duncan
>
> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan 
> wrote:
>
>> We have been using Beam for a bit now. However we just turned on the
>> dataflow shuffle service and were very surprised that the shuffled data
>> amounts were quadruple the amounts we expected.
>>
>> Turns out that the file writing TextIO is doing shuffles within itself.
>>
>> Is there a way to prevent shuffling in the writing phase?
>>
>> Thanks,
>> Shannon Duncan
>>
>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Shannon Duncan
So I followed up on why TextIO shuffles and dug into the code some. It is
using the shards and getting all the values into a keyed group in order to
write to a single file.

However... I wonder if there is a way to just take the records that are on
a worker and write them out, thus not needing a shard number, closer to how
Hadoop handles writes.

Maybe just a regular ParDo that creates a writer on bundle setup, and
processElement reuses that writer to write to the same file for all
elements within a bundle?

I feel like this goes beyond the scope of the user mailing list, so I'm
expanding it to dev as well.
+dev 

Finding a solution that prevents quadrupling shuffle costs when simply
writing out a file is a necessity for large scale jobs that work with 100+
TB of data. If anyone has any ideas I'd love to hear them.
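
A rough sketch of that per-bundle writer idea (hypothetical names; note that
without the finalize/rename step the current sink performs, retried bundles
could leave behind duplicate or partial files):

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical sketch: one output file per bundle, no shard-assignment shuffle.
public class WritePerBundleFn extends DoFn<String, Void> {
  private final String outputDir;  // e.g. "gs://bucket/output/" (assumed directory URI)
  private transient WritableByteChannel channel;

  public WritePerBundleFn(String outputDir) {
    this.outputDir = outputDir;
  }

  @StartBundle
  public void startBundle() throws IOException {
    // Open one file per bundle; the random name avoids collisions across workers.
    ResourceId file = FileSystems.matchNewResource(outputDir, true)
        .resolve(UUID.randomUUID() + ".txt", StandardResolveOptions.RESOLVE_FILE);
    channel = FileSystems.create(file, "text/plain");
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    channel.write(StandardCharsets.UTF_8.encode(c.element() + "\n"));
  }

  @FinishBundle
  public void finishBundle() throws IOException {
    channel.close();
  }
}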

Thanks,
Shannon Duncan

On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan 
wrote:

> We have been using Beam for a bit now. However we just turned on the
> dataflow shuffle service and were very surprised that the shuffled data
> amounts were quadruple the amounts we expected.
>
> Turns out that the file writing TextIO is doing shuffles within itself.
>
> Is there a way to prevent shuffling in the writing phase?
>
> Thanks,
> Shannon Duncan
>


Using Beam Built-in I/O Transforms with an external framework.

2019-09-18 Thread Pulasthi Supun Wickramasinghe
Hi Devs,

We have a big data processing framework named Twister2, and we wanted to
know if there is any way we could leverage the I/O Transforms that are
built into Apache Beam externally; that is, rather than using them in a
Beam pipeline, use them directly as data sources in our project. I just
wanted to check with the devs whether such an approach has been tried by
anyone before, so we could get some pointers on how this could be done. Any
pointers in the right direction would be highly appreciated.

Best Regards,
Pulasthi

-- 
Pulasthi S. Wickramasinghe
PhD Candidate  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035


Re: Beam Summit Videos in youtube

2019-09-18 Thread Maximilian Michels

Hi Rahul,

The Beam Summit committee is working on this at the moment. Stay tuned.

Thanks,
Max

On 18.09.19 11:39, rahul patwari wrote:

Hi,

The videos of Beam Summit that has happened recently have disappeared 
from YouTube Apache Beam channel.


Is uploading the videos a WIP?

Thanks,
Rahul




Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-18 Thread Maximilian Michels

I disagree that this flag is obsolete. It is still serving a purpose for batch 
users using dataflow runner and that is decent chunk of beam python users.


It is obsolete for the PortableRunner. If the Dataflow Runner needs this 
flag, couldn't we simply add it there? As far as I know Dataflow users 
do not use the PortableRunner. I might be wrong.


As Kyle mentioned, he already fixed the issue. The fix is only present 
in the 2.16.0 release though. This flag has repeatedly caused friction 
for users and that's why I want to get rid of it.


There is of course no need to rush this but it would be great to tackle 
this for the next release. Filed a JIRA: 
https://jira.apache.org/jira/browse/BEAM-8274


Cheers,
Max

On 17.09.19 15:39, Kyle Weaver wrote:
Actually, the reported issues are already fixed on head. We're just 
trying to prevent similar issues in the future.


Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Tue, Sep 17, 2019 at 3:38 PM Ahmet Altay wrote:




On Tue, Sep 17, 2019 at 2:26 PM Maximilian Michels wrote:

 > Is not this flag set automatically for the portable runner

Yes, the flag is set automatically, but it has been broken before and
likely will be again. It just adds additional complexity to portable
Runners. There is no other portability API than the Fn API. This flag
historically had its justification, but seems obsolete now.


I disagree that this flag is obsolete. It is still serving a purpose
for batch users using dataflow runner and that is decent chunk of
beam python users.

I agree with switching the default. I would like to give enough time
to decouple the flag from the core code. (With a quick search I saw
two instances related to Read and Create.) Have time to test changes
and then switch the default.


An isinstance check might be smarter, but does not get rid of the root
of the problem.


I might be wrong, IIUC, it will temporarily resolve the reported
issues. Is this not accurate?


-Max

On 17.09.19 14:20, Ahmet Altay wrote:
 > Could you make that change and see if it would have addressed
 > the issue here?
 >
 > On Tue, Sep 17, 2019 at 2:18 PM Kyle Weaver wrote:
 >
 >     The flag is automatically set, but not in a smart way. Taking
 >     another look at the code, a more resilient fix would be to just
 >     check if the runner isinstance of PortableRunner.
 >
 >     Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
 >
 >     On Tue, Sep 17, 2019 at 2:14 PM Ahmet Altay wrote:
 >
 >         Is not this flag set automatically for the portable runner here
 >         [1] ?
 >
 >         [1]
 >         https://github.com/apache/beam/blob/f0aa877b8703eed4143957b4cd212aa026238a6e/sdks/python/apache_beam/pipeline.py#L160
 >
 >         On Tue, Sep 17, 2019 at 2:07 PM Robert Bradshaw wrote:
 >
 >             On Tue, Sep 17, 2019 at 1:43 PM Thomas Weise wrote:
 >              >
 >              > +1 for making --experiments=beam_fn_api default.
 >              >
 >              > Can the Dataflow runner driver just remove the setting if
 >              > it is not compatible?
 >
 >             The tricky bit would be undoing the differences in graph
 >             construction due to this flag flip. But I would be in favor of
 >             changing the default (probably just removing the flag) and
 >             moving the non-portability parts into the dataflow runner
 >             itself. (It looks like the key differences here are for the
 >             Create and Read transforms.)
 >
 >              > On Tue, Sep 17, 2019 at 11:33 AM Maximilian Michels

Re: Pointers on Contributing to Structured Streaming Spark Runner

2019-09-18 Thread rahul patwari
Hi,

I would love to join the call.
Can you also share the meeting invitation with me?

Thanks,
Rahul

On Wed 18 Sep, 2019, 11:48 PM Xinyu Liu,  wrote:

> Alexey and Etienne: I'm very happy to join the sync-up meeting. Please
> forward the meeting info to me. I am based in California, US and hopefully
> the time will work :).
>
> Thanks,
> Xinyu
>
> On Wed, Sep 18, 2019 at 6:39 AM Etienne Chauchot 
> wrote:
>
>> Hi Xinyu,
>>
>> Thanks for offering help ! My comments are inline:
>>
>> On Friday, September 13, 2019 at 12:16 -0700, Xinyu Liu wrote:
>>
>> Hi, Etienne,
>>
>> The slides are very informative! Thanks for sharing the details about how
>> the Beam API are mapped into Spark Structural Streaming.
>>
>>
>> Thanks !
>>
>> We (LinkedIn) are also interested in trying the new SparkRunner to run
>> Beam pipeine in batch, and contribute to it too. From my understanding,
>> seems the functionality on batch side is mostly complete and covers quite a
>> large percentage of the tests (a few missing pieces like state and timer in
>> ParDo and SDF).
>>
>>
>> Correct, it passes 89% of the tests, but there is more than SDF, state
>> and timer missing, there is also ongoing encoders work that I would like to
>> commit/push before merging.
>>
>> If so, is it possible to merge the new runner sooner into master so it's
>> much easier for us to pull it in (we have an internal fork) and contribute
>> back?
>>
>>
>> Sure, see my other mail on this thread. As Alexey mentioned, please join
>> the sync meeting we have, the more the merrier !
>>
>>
>> Also curious about the scheme part in the runner. Seems we can leverage
>> the schema-aware work in PCollection and translate from Beam schema to
>> Spark, so it can be optimized in the planner layer. It will be great to
>> hear back your plans on that.
>>
>>
>> Well, it is not designed yet but, if you remember my talk, we need to
>> store beam windowing information with the data itself, so ending up having
>> a dataset . One lead that was discussed is to store it as a
>> Spark schema such as this:
>>
>> 1. field1: binary data for beam windowing information (cannot be mapped
>> to fields because beam windowing info is complex structure)
>>
>> 2. fields of data as defined in the Beam schema if there is one
>>
>>
>> Congrats on this great work!
>>
>> Thanks !
>>
>> Best,
>>
>> Etienne
>>
>> Thanks,
>> Xinyu
>>
>> On Wed, Sep 11, 2019 at 6:02 PM Rui Wang  wrote:
>>
>> Hello Etienne,
>>
>> Your slide mentioned that streaming mode development is blocked because
>> Spark lacks supporting multiple-aggregations in its streaming mode but
>> design is ongoing. Do you have a link or something else to their design
>> discussion/doc?
>>
>>
>> -Rui
>>
>> On Wed, Sep 11, 2019 at 5:10 PM Etienne Chauchot 
>> wrote:
>>
>> Hi Rahul,
>> Sure, and great ! Thanks for proposing !
>> If you want details, here is the presentation I did 30 mins ago at the
>> apachecon. You will find the video on youtube shortly but in the meantime,
>> here is my presentation slides.
>>
>> And here is the structured streaming branch. I'll be happy to review your
>> PRs, thanks !
>>
>> 
>> https://github.com/apache/beam/tree/spark-runner_structured-streaming
>>
>> Best
>> Etienne
>>
>> On Wednesday, September 11, 2019 at 16:37 +0530, rahul patwari wrote:
>>
>> Hi Etienne,
>>
>> I came to know about the work going on in Structured Streaming Spark
>> Runner from Apache Beam Wiki - Works in Progress.
>> I have contributed to BeamSql earlier. And I am working on supporting
>> PCollectionView in BeamSql.
>>
>> I would love to understand the Runner's side of Apache Beam and
>> contribute to the Structured Streaming Spark Runner.
>>
>> Can you please point me in the right direction?
>>
>> Thanks,
>> Rahul
>>
>>


Beam Summit Videos in youtube

2019-09-18 Thread rahul patwari
Hi,

The videos of the Beam Summit that happened recently have disappeared from
the YouTube Apache Beam channel.

Is uploading the videos a WIP?

Thanks,
Rahul


Re: Pointers on Contributing to Structured Streaming Spark Runner

2019-09-18 Thread Xinyu Liu
Alexey and Etienne: I'm very happy to join the sync-up meeting. Please
forward the meeting info to me. I am based in California, US and hopefully
the time will work :).

Thanks,
Xinyu

On Wed, Sep 18, 2019 at 6:39 AM Etienne Chauchot 
wrote:

> Hi Xinyu,
>
> Thanks for offering help ! My comments are inline:
>
> On Friday, September 13, 2019 at 12:16 -0700, Xinyu Liu wrote:
>
> Hi, Etienne,
>
> The slides are very informative! Thanks for sharing the details about how
> the Beam API are mapped into Spark Structural Streaming.
>
>
> Thanks !
>
> We (LinkedIn) are also interested in trying the new SparkRunner to run
> Beam pipeine in batch, and contribute to it too. From my understanding,
> seems the functionality on batch side is mostly complete and covers quite a
> large percentage of the tests (a few missing pieces like state and timer in
> ParDo and SDF).
>
>
> Correct, it passes 89% of the tests, but there is more than SDF, state and
> timer missing, there is also ongoing encoders work that I would like to
> commit/push before merging.
>
> If so, is it possible to merge the new runner sooner into master so it's
> much easier for us to pull it in (we have an internal fork) and contribute
> back?
>
>
> Sure, see my other mail on this thread. As Alexey mentioned, please join
> the sync meeting we have, the more the merrier !
>
>
> Also curious about the scheme part in the runner. Seems we can leverage
> the schema-aware work in PCollection and translate from Beam schema to
> Spark, so it can be optimized in the planner layer. It will be great to
> hear back your plans on that.
>
>
> Well, it is not designed yet but, if you remember my talk, we need to
> store beam windowing information with the data itself, so ending up having
> a dataset . One lead that was discussed is to store it as a
> Spark schema such as this:
>
> 1. field1: binary data for beam windowing information (cannot be mapped to
> fields because beam windowing info is complex structure)
>
> 2. fields of data as defined in the Beam schema if there is one
>
>
> Congrats on this great work!
>
> Thanks !
>
> Best,
>
> Etienne
>
> Thanks,
> Xinyu
>
> On Wed, Sep 11, 2019 at 6:02 PM Rui Wang  wrote:
>
> Hello Etienne,
>
> Your slide mentioned that streaming mode development is blocked because
> Spark lacks supporting multiple-aggregations in its streaming mode but
> design is ongoing. Do you have a link or something else to their design
> discussion/doc?
>
>
> -Rui
>
> On Wed, Sep 11, 2019 at 5:10 PM Etienne Chauchot 
> wrote:
>
> Hi Rahul,
> Sure, and great ! Thanks for proposing !
> If you want details, here is the presentation I did 30 mins ago at the
> apachecon. You will find the video on youtube shortly but in the meantime,
> here is my presentation slides.
>
> And here is the structured streaming branch. I'll be happy to review your
> PRs, thanks !
>
> 
> https://github.com/apache/beam/tree/spark-runner_structured-streaming
>
> Best
> Etienne
>
> On Wednesday, September 11, 2019 at 16:37 +0530, rahul patwari wrote:
>
> Hi Etienne,
>
> I came to know about the work going on in Structured Streaming Spark
> Runner from Apache Beam Wiki - Works in Progress.
> I have contributed to BeamSql earlier. And I am working on supporting
> PCollectionView in BeamSql.
>
> I would love to understand the Runner's side of Apache Beam and
> contribute to the Structured Streaming Spark Runner.
>
> Can you please point me in the right direction?
>
> Thanks,
> Rahul
>
>


Re: Pointers on Contributing to Structured Streaming Spark Runner

2019-09-18 Thread Etienne Chauchot
Hi Xinyu,
Thanks for offering help ! My comments are inline:
On Friday, September 13, 2019 at 12:16 -0700, Xinyu Liu wrote:
> Hi, Etienne,
> The slides are very informative! Thanks for sharing the details about how the 
> Beam API are mapped into Spark
> Structural Streaming. 

Thanks !
> We (LinkedIn) are also interested in trying the new SparkRunner to run Beam 
> pipeine in batch, and contribute to it
> too. From my understanding, seems the functionality on batch side is mostly 
> complete and covers quite a large
> percentage of the tests (a few missing pieces like state and timer in ParDo 
> and SDF). 

Correct, it passes 89% of the tests, but there is more than SDF, state and 
timer missing, there is also ongoing encoders
work that I would like to commit/push before merging.
> If so, is it possible to merge the new runner sooner into master so it's much 
> easier for us to pull it in (we have an
> internal fork) and contribute back?

Sure, see my other mail on this thread. As Alexey mentioned, please join the 
sync meeting we have, the more the merrier
!
> Also curious about the scheme part in the runner. Seems we can leverage the 
> schema-aware work in PCollection and
> translate from Beam schema to Spark, so it can be optimized in the planner 
> layer. It will be great to hear back your
> plans on that.

Well, it is not designed yet but, if you remember my talk, we need to store 
beam windowing information with the data
itself, so ending up having a dataset . One lead that was 
discussed is to store it as a Spark schema such
as this:
1. field1: binary data for beam windowing information (cannot be mapped to 
fields  because beam windowing info is
complex structure)
2. fields of data as defined in the Beam schema if there is one 
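
As a rough illustration only (field names are hypothetical, nothing is decided
yet), such a Spark schema could look like:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// 1. one binary field carrying the serialized Beam windowing information,
// 2. followed by the user's fields as declared in the Beam schema.
StructType rowSchema = new StructType(new StructField[] {
    new StructField("beamWindowingInfo", DataTypes.BinaryType, false, Metadata.empty()),
    new StructField("user_id", DataTypes.LongType, false, Metadata.empty()),
    new StructField("name", DataTypes.StringType, true, Metadata.empty())
});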

> Congrats on this great work!
Thanks !
Best,
Etienne
> Thanks,
> Xinyu
> On Wed, Sep 11, 2019 at 6:02 PM Rui Wang  wrote:
> > Hello Etienne,
> > Your slide mentioned that streaming mode development is blocked because 
> > Spark lacks supporting multiple-aggregations 
> > in its streaming mode but design is ongoing. Do you have a link or 
> > something else to their design discussion/doc?
> > 
> > 
> > -Rui  
> > On Wed, Sep 11, 2019 at 5:10 PM Etienne Chauchot  
> > wrote:
> > > Hi Rahul,
> > > Sure, and great ! Thanks for proposing !
> > > If you want details, here is the presentation I did 30 mins ago at the
> > > apachecon. You will find the video on youtube shortly but in the
> > > meantime, here is my presentation slides.
> > > And here is the structured streaming branch. I'll be happy to review your
> > > PRs, thanks !
> > > https://github.com/apache/beam/tree/spark-runner_structured-streaming
> > > Best
> > > Etienne
> > > Le mercredi 11 septembre 2019 à 16:37 +0530, rahul patwari a écrit :
> > > > Hi Etienne,
> > > > 
> > > > I came to know about the work going on in Structured Streaming Spark 
> > > > Runner from Apache Beam Wiki - Works in
> > > > Progress.
> > > > I have contributed to BeamSql earlier. And I am working on supporting 
> > > > PCollectionView in BeamSql.
> > > > 
> > > > I would love to understand the Runner's side of Apache Beam and 
> > > > contribute to the Structured Streaming Spark
> > > > Runner.
> > > > 
> > > > Can you please point me in the right direction?
> > > > 
> > > > Thanks,
> > > > Rahul


Re: Pointers on Contributing to Structured Streaming Spark Runner

2019-09-18 Thread Etienne Chauchot
Hi Rui,

Thanks for proposing to contribute to this new runner !

Here are the pointers:
- SS runner branch:
https://github.com/apache/beam/tree/spark-runner_structured-streaming
- spark design doc for multiple watermarks support:
https://docs.google.com/document/d/1IAH9UQJPUiUCLd7H6dazRK2k1szDX38SnM6GVNZYvUo/edit#t#
There is also a good discussion in this Spark PR branch:
https://github.com/apache/spark/pull/23576

As Alexey mentioned in this thread, the SS runner feature branch will be merged
into master when the runner is in good shape. I think we will not wait for the
streaming part as it requires a deep change in the spark core + impl of the
streaming part of the Beam runner, so it would take too long. IMHO we need to
get batch mode of the new runner in a stable state (encoders ongoing work, fix
bad perf of the 2 nexmark queries, ...) before merging.

Best,
Etienne
On Wednesday, September 11, 2019 at 18:02 -0700, Rui Wang wrote:
> Hello Etienne,
> Your slide mentioned that streaming mode development is blocked because Spark 
> lacks supporting multiple-aggregations
> in its streaming mode but design is ongoing. Do you have a link or something 
> else to their design discussion/doc?
> 
> 
> -Rui  
> On Wed, Sep 11, 2019 at 5:10 PM Etienne Chauchot  wrote:
> > Hi Rahul,
> > Sure, and great ! Thanks for proposing !
> > If you want details, here is the presentation I did 30 mins ago at the
> > apachecon. You will find the video on youtube shortly but in the
> > meantime, here is my presentation slides.
> > And here is the structured streaming branch. I'll be happy to review your
> > PRs, thanks !
> > https://github.com/apache/beam/tree/spark-runner_structured-streaming
> > Best
> > Etienne
> > On Wednesday, September 11, 2019 at 16:37 +0530, rahul patwari wrote:
> > > Hi Etienne,
> > > 
> > > I came to know about the work going on in Structured Streaming Spark 
> > > Runner from Apache Beam Wiki - Works in
> > > Progress.
> > > I have contributed to BeamSql earlier. And I am working on supporting 
> > > PCollectionView in BeamSql.
> > > 
> > > I would love to understand the Runner's side of Apache Beam and 
> > > contribute to the Structured Streaming Spark
> > > Runner.
> > > 
> > > Can you please point me in the right direction?
> > > 
> > > Thanks,
> > > Rahul