Re: Do I have any control over bundle sizes?

2019-04-04 Thread Ismaël Mejía
It seems you can 'hack' it with the State API. See the discussion on
this ticket:
https://issues.apache.org/jira/browse/BEAM-6886
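For reference, the State API "hack" might look roughly like the sketch below: buffer elements in a BagState keyed per key and emit a batch once a counter reaches a target size. All names here (BatchingFn, BATCH_SIZE) are illustrative, not taken from the ticket, and a real version would also need a timer to flush stragglers that never reach the threshold.

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical stateful DoFn: accumulates values per key and outputs a
// whole batch every BATCH_SIZE elements, independent of runner bundling.
class BatchingFn extends DoFn<KV<String, String>, Iterable<String>> {
  private static final long BATCH_SIZE = 10_000L;

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @StateId("count")
  private final StateSpec<ValueState<Long>> countSpec =
      StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("buffer") BagState<String> buffer,
      @StateId("count") ValueState<Long> count) {
    buffer.add(ctx.element().getValue());
    Long stored = count.read();
    long n = (stored == null ? 0L : stored) + 1;
    if (n >= BATCH_SIZE) {
      ctx.output(buffer.read()); // emit the accumulated batch downstream
      buffer.clear();
      count.clear();
    } else {
      count.write(n);
    }
  }
}
```

Beam's built-in GroupIntoBatches transform wraps essentially this pattern, so it is worth trying before hand-rolling the state handling.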

On Thu, Apr 4, 2019 at 9:42 PM Jeff Klukas  wrote:
>
> As far as I can tell, Beam expects runners to have full control over 
> separation of individual elements into bundles and this is something users 
> have no control over. Is that true? Or are there any ways that I might exert 
> some influence over bundle sizes?
>
> My main interest at the moment is investigating lighter-weight alternatives 
> to FileIO for a simple but high-throughput Dataflow job that batches up 
> messages from Pub/Sub and sinks them to GCS. I'm imagining a ParDo that 
> buffers incoming messages and then writes them all as an object to GCS in a 
> @FinishBundle method, avoiding the multiple GroupByKey operations needed 
> for writing sharded output from FileIO.
>
> The problem is that bundles in practice look to be far too small to make this 
> feasible. I deployed a 4-node test job that simply reads ~30k messages per 
> second from Pub/Sub and passes them to a transform that publishes some 
> metrics about the bundles passing through. I found a mean bundle size of ~500 
> elements corresponding to ~10 MB of data, which is too small for the proposed 
> approach to be feasible. Are there any tricks I could use to coerce Dataflow 
> to increase the size of bundles?
>
> I realize this is basically an abuse of the Beam programming model, but the 
> alternative I'm looking at is having to write a custom application using the 
> google-cloud APIs and deploying it on Kubernetes.


Do I have any control over bundle sizes?

2019-04-04 Thread Jeff Klukas
As far as I can tell, Beam expects runners to have full control over
separation of individual elements into bundles and this is something users
have no control over. Is that true? Or are there any ways that I might
exert some influence over bundle sizes?

My main interest at the moment is investigating lighter-weight alternatives
to FileIO for a simple but high-throughput Dataflow job that batches up
messages from Pub/Sub and sinks them to GCS. I'm imagining a ParDo that
buffers incoming messages and then writes them all as an object to GCS in a
@FinishBundle method, avoiding the multiple GroupByKey operations needed
for writing sharded output from FileIO.

The problem is that bundles in practice look to be far too small to make
this feasible. I deployed a 4-node test job that simply reads ~30k messages
per second from Pub/Sub and passes them to a transform that publishes some
metrics about the bundles passing through. I found a mean bundle size of
~500 elements corresponding to ~10 MB of data, which is too small for the
proposed approach to be feasible. Are there any tricks I could use to
coerce Dataflow to increase the size of bundles?

I realize this is basically an abuse of the Beam programming model, but the
alternative I'm looking at is having to write a custom application using
the google-cloud APIs and deploying it on Kubernetes.
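For concreteness, the buffering approach described above might be sketched as below. This is illustrative only: the writeObject helper stands in for an actual GCS client call, and, as the thread discusses, bundle boundaries (and therefore object sizes) remain entirely under the runner's control.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn that buffers every element seen in a bundle and writes
// the whole buffer as one object when the bundle finishes.
class BundleSinkFn extends DoFn<String, Void> {
  private transient List<String> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void process(@Element String element) {
    buffer.add(element);
  }

  @FinishBundle
  public void finishBundle() {
    if (!buffer.isEmpty()) {
      // One object per bundle; object size equals whatever the runner
      // happened to put in this bundle (~500 elements in the test above).
      writeObject(String.join("\n", buffer));
      buffer.clear();
    }
  }

  // Placeholder: a real job would invoke the GCS client here.
  private void writeObject(String contents) {
    /* upload to GCS */
  }
}
```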


Re: Is AvroCoder the right coder for me?

2019-04-04 Thread Ryan Skraba
Hello Augusto!

I just took a look.  The behaviour that you're seeing looks like it's set
in Avro ReflectData -- to avoid doing expensive reflection calls for each
serialization/deserialization, it uses a cache per-class AND access is
synchronized [1].  Only one thread in your executor JVM is accessing the
cached ClassAccessorData at a time, and so it's "normal" that the others
are waiting...  Of course, this doesn't mean that only one thread in the
executor is running at a time, just that they always need to wait their
turn before passing through that one method.

You could have more executors with fewer cores per executor.  That might
shed some light, but it's not really a workaround or solution.

We've had really good results with AvroCoder.of(Schema), which uses
GenericData underneath.   We already knew the schema we wanted, so it was
ok to lose the "magic" of ReflectData and its automatic schema inference,
etc.   I'm a bit surprised that this hasn't come up as a bottleneck before
in Avro, but I didn't find an existing JIRA.
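A sketch of the schema-based variant described above: pinning the coder to a known schema keeps serialization on the GenericData path and avoids the synchronized ReflectData accessor cache entirely. The record type and schema string here are invented for illustration.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.values.PCollection;

class SchemaCoderExample {
  // Hypothetical schema; in practice this is the schema you already know.
  static final Schema EVENT_SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
    + "{\"name\":\"id\",\"type\":\"string\"},"
    + "{\"name\":\"count\",\"type\":\"long\"}]}");

  static void applyCoder(PCollection<GenericRecord> events) {
    // GenericData path: no per-class reflection, no synchronized cache.
    events.setCoder(AvroCoder.of(EVENT_SCHEMA));
  }
}
```

The trade-off is working with GenericRecord instead of your POJOs, which is the loss of ReflectData "magic" mentioned above.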

If Avro serialization isn't important to you, you might want to check out
the custom Coder route.  I'd love to hear if you see a big gain in perf!

I hope this helps, Ryan

[1]
https://github.com/apache/avro/blame/branch-1.8/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java#L262

On Tue, Apr 2, 2019 at 5:52 PM Maximilian Michels  wrote:

> Hey Augusto,
>
> I haven't used @DefaultCoder, but it could be the problem here.
>
> What if you specify the coder directly for your PCollection? For example:
>
>pCol.setCoder(AvroCoder.of(YourClazz.class));
>
>
> Thanks,
> Max
>
> On 01.04.19 17:52, Augusto Ribeiro wrote:
> > Hi Max,
> >
> > I tried to run the job again in a cluster, this is a thread dump from
> > one of the Spark executors (16 cores)
> >
> > https://imgur.com/u2Gz0xY
> >
> > As you can see, almost all threads are blocked on that single Avro
> > reflection method.
> >
> > Best regards,
> > Augusto
> >
> >
> > On 2019/03/27 07:43:17, Augusto Ribeiro wrote:
> > > Hi Max,
> > >
> > > Thanks for the answer. I will give it another try after I have sorted
> > > out some other things. I will try to save more data next time
> > > (screenshots, thread dumps) so that if it happens again I can be more
> > > specific in my questions.
> > >
> > > Best regards,
> > > Augusto
> > >
> > > On 2019/03/26 12:31:54, Maximilian Michels wrote:
> > > > Hi Augusto,
> > > >
> > > > Generally speaking, Avro should provide very good performance. The
> > > > calls you are seeing should not be significant because Avro caches
> > > > the schema information for a type. It only creates a schema via
> > > > reflection the first time it sees a new type.
> > > >
> > > > You can optimize further by using your domain knowledge and creating
> > > > a custom coder. However, if you do not do anything fancy, I think
> > > > the odds are low that you will see a performance increase.
> > > >
> > > > Cheers,
> > > > Max
> > > >
> > > > On 26.03.19 09:35, Augusto Ribeiro wrote:
> > > > > Hi again,
> > > > >
> > > > > Sorry for bumping this thread, but nobody really came with insight.
> > > > >
> > > > > Should I be defining my own coders for my objects, or is it common
> > > > > practice to use the AvroCoder or maybe some other coder?
> > > > >
> > > > > Best regards,
> > > > > Augusto
> > > > >
> > > > > On 2019/03/21 07:35:07, au...@gmail.com wrote:
> > > > >> Hi,
> > > > >>
> > > > >> I am trying out Beam to do some data aggregations. Many of the
> > > > >> inputs/outputs of my transforms are complex objects (not super
> > > > >> complex, but sometimes containing Maps/Lists/Sets), so when I was
> > > > >> prompted to define a coder for these objects I added the
> > > > >> annotation @DefaultCoder(AvroCoder.class) and things worked in my
> > > > >> development environment.
> > > > >>
> > > > >> Now that I am trying to run it on "real" data, I notice that after
> > > > >> deploying it to a Spark runner and looking at some thread dumps,
> > > > >> many of the threads were blocked on the following method in the
> > > > >> Avro library (ReflectData.getAccessorsFor). So my question is, did
> > > > >> I do the wrong thing by using the AvroCoder, or is there some
> > > > >> other coder that can easily solve my problem?
> > > > >>
> > > > >> Best regards,
> > > > >> Augusto