Re: PCollectionViews$SimplePCollectionView.hashCode allocates memory

2018-10-31 Thread Vojtech Janota
Ok, will do both.

Thanks,
Vojta

On Wed, Oct 31, 2018 at 2:32 PM Ismaël Mejía wrote:

> Vojta, you are right; your implementation seems like a good improvement.
> Can you please create a JIRA and, if you are interested, open a PR to
> contribute a fix for it?
>
> Regards,
> Ismaël
> On Wed, Oct 31, 2018 at 2:18 PM Vojtech Janota wrote:
> >
> > Hi,
> >
> > I'm currently profiling memory consumption of our Beam pipeline and have
> > noticed that
> >
> >
> > org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.hashCode()
> >
> > makes noticeable heap allocations. The implementation is:
> >
> > return Objects.hash(tag);
> >
> > That itself translates to:
> >
> > return Arrays.hashCode(values);
> >
> > which performs implicit array creation in order to call:
> >
> > public static int hashCode(Object a[]);  // java.util.Arrays
> >
> > Am I right that changing the SimplePCollectionView implementation to a
> > simple:
> >
> > return tag.hashCode();
> >
> > is the right thing to do?
> >
> > Regards,
> > Vojta
>


PCollectionViews$SimplePCollectionView.hashCode allocates memory

2018-10-31 Thread Vojtech Janota
Hi,

I'm currently profiling memory consumption of our Beam pipeline and have
noticed that


org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.hashCode()

makes noticeable heap allocations. The implementation is:

return Objects.hash(tag);

That itself translates to:

return Arrays.hashCode(values);

which performs implicit array creation in order to call:

public static int hashCode(Object a[]);  // java.util.Arrays

Am I right that changing the SimplePCollectionView implementation to a
simple:

return tag.hashCode();

is the right thing to do?

Regards,
Vojta
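
A minimal sketch of the two hashCode() variants discussed above. The class
names and the String tag are hypothetical stand-ins, not Beam's actual types,
and the sketch assumes the tag is never null (Objects.hash tolerates a null
element, while tag.hashCode() would throw). Objects.hash is a varargs method,
so the call site builds a one-element Object[] on each call unless the JIT
happens to optimise it away. The two variants also return different values
(31 + tag.hashCode() versus tag.hashCode()), which should only matter if
something stores or compares raw hash values.

```java
import java.util.Objects;

final class HashCodeSketch {
    /** Hypothetical view keyed by a single non-null tag; hashes via Objects.hash. */
    static final class ViewUsingObjectsHash {
        private final String tag;

        ViewUsingObjectsHash(String tag) {
            this.tag = Objects.requireNonNull(tag);
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof ViewUsingObjectsHash
                && tag.equals(((ViewUsingObjectsHash) other).tag);
        }

        @Override
        public int hashCode() {
            // Varargs call: compiled as Objects.hash(new Object[] {tag}).
            return Objects.hash(tag);
        }
    }

    /** Same shape, but hashing the tag directly, with no temporary array. */
    static final class ViewUsingDirectHash {
        private final String tag;

        ViewUsingDirectHash(String tag) {
            this.tag = Objects.requireNonNull(tag);
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof ViewUsingDirectHash
                && tag.equals(((ViewUsingDirectHash) other).tag);
        }

        @Override
        public int hashCode() {
            return tag.hashCode();
        }
    }

    public static void main(String[] args) {
        ViewUsingObjectsHash viaObjects = new ViewUsingObjectsHash("side-input");
        ViewUsingDirectHash direct = new ViewUsingDirectHash("side-input");
        // Both are consistent with equals(); only the numeric value differs.
        System.out.println(viaObjects.hashCode() == 31 + "side-input".hashCode());
        System.out.println(direct.hashCode() == "side-input".hashCode());
    }
}
```

Both variants stay consistent with equals(), so hash-based collections behave
the same with either one.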


[BEAM-690] Backoff in the DirectRunner if no work is available

2018-08-30 Thread Vojtech Janota
Hi beamers,

I would like to contribute a fix for the following issue:

   - https://issues.apache.org/jira/browse/BEAM-690

The corresponding PR:

   - https://github.com/apache/beam/pull/6303

I tried to follow the approach suggested in the comments of the said ticket
and any feedback is appreciated. This is my first attempt to contribute to
Beam code, so please bear with me if I missed something important.

To give you some background on why I'm fixing this:

I understand that performance-related issues in the direct runner generally
receive low priority, as the whole direct runner concept is not meant for
production deployment. Yet I think that this issue should receive
reasonable attention because, even when the runner is used only in
testing/CI pipelines, the increased CPU consumption may materialise in the
form of a higher bill from your favourite cloud provider. This is definitely
the case for us, which makes this a high-priority issue for us.

Regards,
Vojta
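
For readers unfamiliar with the idea in the subject line, here is a generic
sketch of backing off when no work is available. It is NOT the code from the
PR above; the class, constants and method names are all hypothetical. Instead
of spinning, the loop blocks on the queue for a delay that doubles after each
empty poll (up to a cap) and resets as soon as work arrives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class IdleBackoffSketch {
    // Hypothetical tuning values, chosen only for illustration.
    static final long INITIAL_DELAY_MS = 1;
    static final long MAX_DELAY_MS = 128;

    /** Doubles the delay after an idle poll, capped at MAX_DELAY_MS. */
    static long nextDelayMs(long currentDelayMs) {
        return Math.min(currentDelayMs * 2, MAX_DELAY_MS);
    }

    /**
     * Runs queued tasks until idleLimit consecutive polls find no work.
     * Returns the number of tasks that were run.
     */
    static int runUntilIdle(BlockingQueue<Runnable> work, int idleLimit) {
        long delayMs = INITIAL_DELAY_MS;
        int idlePolls = 0;
        int processed = 0;
        while (idlePolls < idleLimit) {
            Runnable task;
            try {
                // Blocks for up to delayMs rather than busy-waiting.
                task = work.poll(delayMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
            if (task == null) {
                idlePolls++;
                delayMs = nextDelayMs(delayMs);   // no work: wait longer next time
            } else {
                task.run();
                processed++;
                idlePolls = 0;
                delayMs = INITIAL_DELAY_MS;       // work arrived: be responsive again
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> work = new LinkedBlockingQueue<>();
        for (int i = 0; i < 3; i++) {
            work.add(() -> { });
        }
        System.out.println("tasks run: " + runUntilIdle(work, 4));
    }
}
```

The cap bounds the extra latency an idle worker can add once new work shows
up, while the doubling keeps an idle loop from burning CPU.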


Re: Performance issue in Beam 2.4 onwards

2018-07-10 Thread Vojtech Janota
Hi guys,

Thank you for all of your feedback. I have created the relevant issue in JIRA:
https://issues.apache.org/jira/browse/BEAM-4750

@Lukasz: my mentioning the DirectRunner was somewhat unfortunate. The
bottleneck was introduced into the core library, so the Flink and Spark
runners would be impacted too.

Thanks,
Vojta

On Mon, Jul 9, 2018 at 5:48 PM, Lukasz Cwik wrote:

> Instead of reverting/working around specific checks/tests that the
> DirectRunner is doing, have you considered using one of the other runners
> like Flink or Spark with a local execution cluster? You won't hit the
> validation/verification bottlenecks that DirectRunner specifically imposes.
>
> On Mon, Jul 9, 2018 at 8:46 AM Jean-Baptiste Onofré wrote:
>
>> Thanks for the update Eugene.
>>
>> @Vojta: would you mind creating a JIRA? I will tackle a fix for that.
>>
>> Regards
>> JB
>>
>> On 09/07/2018 17:33, Eugene Kirpichov wrote:
>> > Hi -
>> >
>> > If I remember correctly, the reason for this change was to ensure that
>> > the state is encodable at all. Prior to the change, there had been
>> > situations where the coder specified on a state cell is buggy, absent or
>> > set incorrectly (due to some issue in coder inference), but direct
>> > runner did not detect this because it never tried to encode the state
>> > cells - this would have blown up in any distributed runner.
>> >
>> > I think it should be possible to relax this and clone only values being
>> > added to the state, rather than cloning the whole state on copy(). I
>> > don't have time to work on this change myself, but I can review a PR if
>> > someone else does.
>> >
>> > On Mon, Jul 9, 2018 at 8:28 AM Jean-Baptiste Onofré wrote:
>> >
>> > Hi Vojta,
>> >
>> > I fully agree, that's why it makes sense to wait for Eugene's feedback.
>> >
>> > I remember we had some performance regression on the direct runner
>> > identified thanks to Nexmark, but it has been addressed by
>> > reverting a change.
>> >
>> > Good catch anyway!
>> >
>> > Regards
>> > JB
>> >
>> > On 09/07/2018 17:20, Vojtech Janota wrote:
>> > > Hi Reuven,
>> > >
>> > > I'm not really complaining about DirectRunner. In fact, it seems to me
>> > > as if what was previously considered part of the "expensive extra
>> > > checks" done by the DirectRunner is now done within the
>> > > beam-runners-core-java library. Considering that all objects involved
>> > > are immutable (in our case at least) and simple assignment is
>> > > sufficient, the serialization-deserialization really seems like an
>> > > unwanted and hugely expensive correctness check. If there was a
>> > > problem with identity copy, wasn't DirectRunner supposed to reveal it?
>> > >
>> > > Regards,
>> > > Vojta
>> > >
>> > > On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax wrote:
>> > >
>> > > Hi Vojta,
>> > >
>> > > One problem is that the DirectRunner is designed for testing, not
>> > > for performance. The DirectRunner currently does many
>> > > purposely-inefficient things, the point of which is to better expose
>> > > potential bugs in tests. For example, the DirectRunner will randomly
>> > > shuffle the order of PCollections to ensure that your code does not
>> > > rely on ordering. All of this adds cost, because the current runner
>> > > is designed for testing. There have been requests in the past for an
>> > > "optimized" local runner, however we don't currently have such
>> > > a thing.
>> > >
>> > > In this case, using coders to clone values is more correct. In a
>> > > distributed environment using encode/decode is the only way to copy
>> > > values, and the DirectRunner is trying to ensure that your code is
>> > > correct in a distrib
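
Eugene's suggestion quoted above (clone only the values being added to the
state, rather than cloning the whole state on copy()) can be illustrated with
a small sketch. Everything here is hypothetical: Bag is a toy state cell, not
Beam's InMemoryStateInternals, and the cloner function merely stands in for a
coder encode/decode round trip. The helper counts clone calls for both
strategies so the difference in cost is visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class CloneOnAddSketch {
    /** Toy bag-like state cell; cloner stands in for a coder round trip. */
    static final class Bag<T> {
        private final List<T> contents = new ArrayList<>();
        private final UnaryOperator<T> cloner;
        private final boolean cloneOnAdd;

        Bag(UnaryOperator<T> cloner, boolean cloneOnAdd) {
            this.cloner = cloner;
            this.cloneOnAdd = cloneOnAdd;
        }

        void add(T value) {
            // Clone-on-add: each value passes through the cloner exactly once.
            contents.add(cloneOnAdd ? cloner.apply(value) : value);
        }

        Bag<T> copy() {
            Bag<T> that = new Bag<>(cloner, cloneOnAdd);
            for (T value : contents) {
                // Clone-on-copy: every element is cloned again on every copy.
                that.contents.add(cloneOnAdd ? value : cloner.apply(value));
            }
            return that;
        }
    }

    /** Counts cloner invocations for a bag of elements values copied copies times. */
    static int countClones(boolean cloneOnAdd, int elements, int copies) {
        int[] calls = {0};
        UnaryOperator<String> countingCloner = s -> {
            calls[0]++;
            return new String(s); // fresh object, like a decoded value
        };
        Bag<String> bag = new Bag<>(countingCloner, cloneOnAdd);
        for (int i = 0; i < elements; i++) {
            bag.add("v" + i);
        }
        for (int i = 0; i < copies; i++) {
            bag = bag.copy();
        }
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println("clone on copy: " + countClones(false, 100, 10));
        System.out.println("clone on add:  " + countClones(true, 100, 10));
    }
}
```

In this toy model the clone-on-copy strategy costs elements x copies cloner
calls, while clone-on-add costs one call per element. Every value still goes
through the cloner once, so a broken or missing coder would presumably still
be noticed at add time.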

Re: Performance issue in Beam 2.4 onwards

2018-07-09 Thread Vojtech Janota
Hi Reuven,

I'm not really complaining about DirectRunner. In fact, it seems to me as if
what was previously considered part of the "expensive extra checks" done
by the DirectRunner is now done within the beam-runners-core-java library.
Considering that all objects involved are immutable (in our case at least)
and simple assignment is sufficient, the serialization-deserialization
really seems like an unwanted and hugely expensive correctness check. If
there was a problem with identity copy, wasn't DirectRunner supposed to
reveal it?

Regards,
Vojta

On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax wrote:

> Hi Vojta,
>
> One problem is that the DirectRunner is designed for testing, not for
> performance. The DirectRunner currently does many purposely-inefficient
> things, the point of which is to better expose potential bugs in tests. For
> example, the DirectRunner will randomly shuffle the order of PCollections
> to ensure that your code does not rely on ordering.  All of this adds cost,
> because the current runner is designed for testing. There have been
> requests in the past for an "optimized" local runner, however we don't
> currently have such a thing.
>
> In this case, using coders to clone values is more correct. In a
> distributed environment using encode/decode is the only way to copy values,
> and the DirectRunner is trying to ensure that your code is correct in a
> distributed environment.
>
> Reuven
>
> On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota wrote:
>
>> Hi,
>>
>> We have been using Apache Beam in our project for some time now. Since our
>> datasets are of modest size, we have so far used DirectRunner as the
>> computation easily fits onto a single machine. Recently we upgraded Beam
>> from 2.2 to 2.4 and found that the performance of our pipelines drastically
>> deteriorated. Pipelines that took ~3 minutes with 2.2 do not finish within
>> hours now. We tried to isolate the change that causes the slowdown and
>> traced it to these commits to the "InMemoryStateInternals" class:
>>
>> * https://github.com/apache/beam/commit/32a427c
>> * https://github.com/apache/beam/commit/8151d82
>>
>> In a nutshell, where the copy() method previously simply assigned:
>>
>>   that.value = this.value
>>
>> there is now a coder encode/decode combo hidden behind:
>>
>>   that.value = uncheckedClone(coder, this.value)
>>
>> Can somebody explain the purpose of this change? Is it meant as an
>> additional "enforcement" point, similar to DirectRunner's
>> enforceImmutability and enforceEncodability? Or is it something that is
>> genuinely needed to provide correct behaviour of the pipeline?
>>
>> Any hints or thoughts are appreciated.
>>
>> Regards,
>> Vojta


Performance issue in Beam 2.4 onwards

2018-07-09 Thread Vojtech Janota
Hi,

We have been using Apache Beam in our project for some time now. Since our
datasets are of modest size, we have so far used DirectRunner as the
computation easily fits onto a single machine. Recently we upgraded Beam
from 2.2 to 2.4 and found that the performance of our pipelines drastically
deteriorated. Pipelines that took ~3 minutes with 2.2 do not finish within
hours now. We tried to isolate the change that causes the slowdown and
traced it to these commits to the "InMemoryStateInternals" class:

* https://github.com/apache/beam/commit/32a427c
* https://github.com/apache/beam/commit/8151d82

In a nutshell, where the copy() method previously simply assigned:

  that.value = this.value

there is now a coder encode/decode combo hidden behind:

  that.value = uncheckedClone(coder, this.value)

Can somebody explain the purpose of this change? Is it meant as an
additional "enforcement" point, similar to DirectRunner's
enforceImmutability and enforceEncodability? Or is it something that is
genuinely needed to provide correct behaviour of the pipeline?

Any hints or thoughts are appreciated.

Regards,
Vojta
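
To make the difference between the two copy() variants above concrete, here
is a small self-contained sketch. SimpleCoder, identityCopy and cloneViaCoder
are hypothetical names invented for this illustration; they are not Beam's
Coder interface or its uncheckedClone helper. The sketch uses a mutable
StringBuilder to show what the round trip buys (an independent copy) and what
it costs (an encode plus a decode per value). For immutable values, as in the
case described above, the identity copy would be observably equivalent.

```java
import java.nio.charset.StandardCharsets;

final class CopySketch {
    /** Hypothetical minimal coder; not Beam's Coder interface. */
    interface SimpleCoder<T> {
        byte[] encode(T value);
        T decode(byte[] bytes);
    }

    /** Encodes a StringBuilder's contents as UTF-8 bytes. */
    static final SimpleCoder<StringBuilder> UTF8_BUILDER = new SimpleCoder<StringBuilder>() {
        @Override
        public byte[] encode(StringBuilder value) {
            return value.toString().getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public StringBuilder decode(byte[] bytes) {
            return new StringBuilder(new String(bytes, StandardCharsets.UTF_8));
        }
    };

    /** The old behaviour: the copy aliases the same object. */
    static <T> T identityCopy(T value) {
        return value;
    }

    /** The new behaviour: the copy is rebuilt from the encoded bytes. */
    static <T> T cloneViaCoder(SimpleCoder<T> coder, T value) {
        return coder.decode(coder.encode(value));
    }

    public static void main(String[] args) {
        StringBuilder original = new StringBuilder("a");
        StringBuilder shared = identityCopy(original);
        StringBuilder cloned = cloneViaCoder(UTF8_BUILDER, original);

        original.append("b"); // mutate after copying

        System.out.println("shared sees mutation: " + shared); // aliases original
        System.out.println("clone is independent: " + cloned);
    }
}
```

The round trip also exercises the coder itself, which is the kind of
encodability check that a purely in-memory assignment never performs.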