Re: Status of IntelliJ with Gradle

2018-08-22 Thread Kai Jiang
I encountered the same error as Xinyu when launching unit tests in
IntelliJ. For now, I am only running unit tests with Gradle.

Thanks,
Kai

On 2018/08/22 21:11:06, Xinyu Liu  wrote: 
> We experienced the same issues too in intellij after switching to latest
> version. I did the trick Luke mentioned before to include the
> beam-model-fn-execution and beam-model-job-management jars in the dependent
> modules to get around compilation. But I cannot get the vendored protobuf
> working. Seems the RunnerApi is using the original protobuf package, and it
> causes confusion in intellij if I added the relocated jar. As a result, I
> have to run and debug only using gradle for now.
> 
> Thanks,
> Xinyu
> 
> On Wed, Aug 22, 2018 at 1:45 AM, Maximilian Michels  wrote:
> 
> > Thanks Lukasz. I also found that I can never fix all import errors by
> > manually adding jars to the IntelliJ library list. It is also not a good
> > solution because it breaks on reloading the Gradle project.
> >
> > New contributors might find the errors in IntelliJ distracting. Even
> > worse, they might assume the problem is on their side. If we can't fix them
> > soon, I'd suggest documenting the IntelliJ limitations in the contributor
> > guide.
> >
> > On 20.08.18 17:58, Lukasz Cwik wrote:
> >
> >> Yes, I have the same issues with vendoring. These are the things that I
> >> have tried without success to get Intellij to import the vendored modules
> >> correctly:
> >> * attempted to modify the idea.module.scopes to only include the vendored
> >> artifacts (for some reason this is ignored and Intellij is relying on the
> >> output of its own internal module, nothing I add to the scopes seems to
> >> impact anything)
> >> * modify the generated iml beforehand to add the vendored jar file as the
> >> top dependency (jar never appears in the modules dependencies)
> >>
> >> On Mon, Aug 20, 2018 at 8:36 AM Maximilian Michels  >> > wrote:
> >>
> >> Thank you Etienne for opening the issue.
> >>
> >> Anyone else having problems with the shaded Protobuf dependency?
> >>
> >> On 20.08.18 16:14, Etienne Chauchot wrote:
> >>  > Hi Max,
> >>  >
> >>  > I experienced the same, I had first opened a general ticket
> >>  > (https://issues.apache.org/jira/browse/BEAM-4418) about gradle
> >>  > improvements and I just split it in several tickets. Here is the
> >> one
> >>  > concerning the same issue:
> >> https://issues.apache.org/jira/browse/BEAM-5176
> >>  >
> >>  > Etienne
> >>  >
> >>  > Le lundi 20 août 2018 à 15:51 +0200, Maximilian Michels a écrit :
> >>  >> Hi Beamers,
> >>  >>
> >>  >> It's great to see the Beam build system overhauled. Thank you
> >> for all
> >>  >> the hard work.
> >>  >>
> >>  >> That said, I've just started contributing to Beam again and I feel
> >>  >> really stupid for not having a fully-functional IDE. I've closely
> >>  >> followed the IntelliJ/Gradle instructions [1]. In the terminal
> >>  >> everything works fine.
> >>  >>
> >>  >> First of all, I get warnings like the following and the build
> >> fails:
> >>  >>
> >>  >> 
> >>  >>
> >> .../beam/sdks/java/core/src/main/java/org/apache/beam/sdk/pa
> >> ckage-info.java:29:
> >>  >> warning: [deprecation] NonNull in
> >> edu.umd.cs.findbugs.annotations has
> >>  >> been deprecated
> >>  >> @DefaultAnnotation(NonNull.class)
> >>  >>^
> >>  >> error: warnings found and -Werror specified
> >>  >> 1 error
> >>  >> 89 warnings
> >>  >> =
> >>  >>
> >>  >> Somehow the "-Xlint:-deprecation" compiler flag does not get
> >> through but
> >>  >> "-Werror" does. I can get it to compile by removing the
> >> "-Werror" flag
> >>  >> from BeamModulePlugin but that's obviously not the solution.
> >>  >>
> >>  >> Further, once the build succeeds I still have to add the relocated
> >>  >> Protobuf library manually because the one in "vendor" does not get
> >>  >> picked up. I've tried clearing caches / re-adding the project /
> >>  >> upgrading IntelliJ / changing Gradle configs.
> >>  >>
> >>  >>
> >>  >> Is this just me or do you also have similar problems? If so, I
> >> would
> >>  >> like to compile a list of possible fixes to help other
> >> contributors.
> >>  >>
> >>  >>
> >>  >> Thanks,
> >>  >> Max
> >>  >>
> >>  >>
> >>  >> Tested with
> >>  >> - IntelliJ 2018.1.6 and 2018.2.1.
> >>  >> - MacOS
> >>  >> - java version "1.8.0_112"
> >>  >>
> >>  >> [1] https://beam.apache.org/contribute/intellij/
> >>  >>
> >>  >>
> >>
> >>
> > --
> > Max
> >
> 


Re: Status of IntelliJ with Gradle

2018-08-22 Thread Xinyu Liu
We experienced the same issues in IntelliJ after switching to the latest
version. I used the trick Luke mentioned earlier and included the
beam-model-fn-execution and beam-model-job-management jars in the dependent
modules to get around the compilation errors, but I cannot get the vendored
protobuf working. It seems the RunnerApi is using the original protobuf
package, and adding the relocated jar confuses IntelliJ. As a result, I
have to run and debug only with Gradle for now.

Thanks,
Xinyu

On Wed, Aug 22, 2018 at 1:45 AM, Maximilian Michels  wrote:

> Thanks Lukasz. I also found that I can never fix all import errors by
> manually adding jars to the IntelliJ library list. It is also not a good
> solution because it breaks on reloading the Gradle project.
>
> New contributors might find the errors in IntelliJ distracting. Even
> worse, they might assume the problem is on their side. If we can't fix them
> soon, I'd suggest documenting the IntelliJ limitations in the contributor
> guide.
>
> On 20.08.18 17:58, Lukasz Cwik wrote:
>
>> Yes, I have the same issues with vendoring. These are the things that I
>> have tried without success to get Intellij to import the vendored modules
>> correctly:
>> * attempted to modify the idea.module.scopes to only include the vendored
>> artifacts (for some reason this is ignored and Intellij is relying on the
>> output of its own internal module, nothing I add to the scopes seems to
>> impact anything)
>> * modify the generated iml beforehand to add the vendored jar file as the
>> top dependency (jar never appears in the modules dependencies)
>>
>> On Mon, Aug 20, 2018 at 8:36 AM Maximilian Michels > > wrote:
>>
>> Thank you Etienne for opening the issue.
>>
>> Anyone else having problems with the shaded Protobuf dependency?
>>
>> On 20.08.18 16:14, Etienne Chauchot wrote:
>>  > Hi Max,
>>  >
>>  > I experienced the same, I had first opened a general ticket
>>  > (https://issues.apache.org/jira/browse/BEAM-4418) about gradle
>>  > improvements and I just split it in several tickets. Here is the
>> one
>>  > concerning the same issue:
>> https://issues.apache.org/jira/browse/BEAM-5176
>>  >
>>  > Etienne
>>  >
>>  > Le lundi 20 août 2018 à 15:51 +0200, Maximilian Michels a écrit :
>>  >> Hi Beamers,
>>  >>
>>  >> It's great to see the Beam build system overhauled. Thank you
>> for all
>>  >> the hard work.
>>  >>
>>  >> That said, I've just started contributing to Beam again and I feel
>>  >> really stupid for not having a fully-functional IDE. I've closely
>>  >> followed the IntelliJ/Gradle instructions [1]. In the terminal
>>  >> everything works fine.
>>  >>
>>  >> First of all, I get warnings like the following and the build
>> fails:
>>  >>
>>  >> 
>>  >>
>> .../beam/sdks/java/core/src/main/java/org/apache/beam/sdk/pa
>> ckage-info.java:29:
>>  >> warning: [deprecation] NonNull in
>> edu.umd.cs.findbugs.annotations has
>>  >> been deprecated
>>  >> @DefaultAnnotation(NonNull.class)
>>  >>^
>>  >> error: warnings found and -Werror specified
>>  >> 1 error
>>  >> 89 warnings
>>  >> =
>>  >>
>>  >> Somehow the "-Xlint:-deprecation" compiler flag does not get
>> through but
>>  >> "-Werror" does. I can get it to compile by removing the
>> "-Werror" flag
>>  >> from BeamModulePlugin but that's obviously not the solution.
>>  >>
>>  >> Further, once the build succeeds I still have to add the relocated
>>  >> Protobuf library manually because the one in "vendor" does not get
>>  >> picked up. I've tried clearing caches / re-adding the project /
>>  >> upgrading IntelliJ / changing Gradle configs.
>>  >>
>>  >>
>>  >> Is this just me or do you also have similar problems? If so, I
>> would
>>  >> like to compile a list of possible fixes to help other
>> contributors.
>>  >>
>>  >>
>>  >> Thanks,
>>  >> Max
>>  >>
>>  >>
>>  >> Tested with
>>  >> - IntelliJ 2018.1.6 and 2018.2.1.
>>  >> - MacOS
>>  >> - java version "1.8.0_112"
>>  >>
>>  >> [1] https://beam.apache.org/contribute/intellij/
>>  >>
>>  >>
>>
>>
> --
> Max
>


Re: Process JobBundleFactory for portable runner

2018-08-22 Thread Xinyu Liu
We are also interested in this Process JobBundleFactory, as we are planning
to fork a process to run the Python SDK in the Samza runner instead of using
a Docker container, so this change will be helpful to us too. On the same
note, we are trying out portable_runner.py to submit a Python job. It seems
it will create a default Docker URL even if the harness_docker_image
pipeline option is set to None. Shall we add another option, or honor None
in this option, to support the process job? I made some local changes to
work around this for now.
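
As a rough illustration of the "service registry + pipeline option" idea
discussed further down in this thread, here is a minimal Java sketch. The
interface, method names, and URN are hypothetical stand-ins, not actual Beam
APIs; only java.util.ServiceLoader itself is standard.

    import java.util.ServiceLoader;

    // Hypothetical SPI for creating SDK harness environments.
    interface SdkHarnessEnvironmentFactory {
      /** URN identifying the environment type, e.g. "beam:env:process:v1" (illustrative). */
      String urn();

      /** Launches (or connects to) an SDK harness and returns a handle to it. */
      AutoCloseable createEnvironment(String payload) throws Exception;
    }

    final class EnvironmentFactories {
      /** Picks a factory by URN among implementations registered via META-INF/services. */
      static SdkHarnessEnvironmentFactory forUrn(String urn) {
        for (SdkHarnessEnvironmentFactory factory :
            ServiceLoader.load(SdkHarnessEnvironmentFactory.class)) {
          if (factory.urn().equals(urn)) {
            return factory;
          }
        }
        throw new IllegalArgumentException("No environment factory registered for " + urn);
      }
    }

A runner could then read the URN from a pipeline option and call
EnvironmentFactories.forUrn(...) to choose between a Docker-based and a
process-based harness on a per-job basis.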

Thanks,
Xinyu

On Tue, Aug 21, 2018 at 12:25 PM, Henning Rohde  wrote:

> By "enum" in quotes, I meant the usual open URN style pattern not an
> actual enum. Sorry if that wasn't clear.
>
> On Tue, Aug 21, 2018 at 11:51 AM Lukasz Cwik  wrote:
>
>> I would model the environment to be more free form then enums such that
>> we have forward looking extensibility and would suggest to follow the same
>> pattern we use on PTransforms (using an URN and a URN specific payload).
>> Note that in this case we may want to support a list of supported
>> environments (e.g. java, docker, python, ...).
>>
>> On Tue, Aug 21, 2018 at 10:37 AM Henning Rohde 
>> wrote:
>>
>>> One thing to consider that we've talked about in the past. It might make
>>> sense to extend the environment proto and have the SDK be explicit about
>>> which kinds of environment it supports:
>>>
>>> https://github.com/apache/beam/blob/
>>> 8c4f4babc0b0d55e7bddefa3f9f9ba65d21ef139/model/pipeline/src/
>>> main/proto/beam_runner_api.proto#L969
>>>
>>> This choice might impact what files are staged or what not. Some SDKs,
>>> such as Go, provide a compiled binary and _need_ to know what the target
>>> architecture is. Running on a mac with and without docker, say, requires a
>>> different worker in each case. If we add an "enum", we can also easily add
>>> the external idea where the SDK/user starts the SDK harnesses instead of
>>> the runner. Each runner may not support all types of environments.
>>>
>>> Henning
>>>
>>> On Tue, Aug 21, 2018 at 2:52 AM Maximilian Michels 
>>> wrote:
>>>
 For reference, here is corresponding JIRA issue for this thread:
 https://issues.apache.org/jira/browse/BEAM-5187

 On 16.08.18 11:15, Maximilian Michels wrote:
 > Makes sense to have an option to run the SDK harness in a
 non-dockerized
 > environment.
 >
 > I'm in the process of creating a Docker entry point for Flink's
 > JobServer[1]. I suppose you would also prefer to execute that one
 > standalone. We should make sure this is also an option.
 >
 > [1] https://issues.apache.org/jira/browse/BEAM-4130
 >
 > On 16.08.18 07:42, Thomas Weise wrote:
 >> Yes, that's the proposal. Everything that would otherwise be packaged
 >> into the Docker container would need to be pre-installed in the host
 >> environment. In the case of Python SDK, that could simply mean a
 >> (frozen) virtual environment that was setup when the host was
 >> provisioned - the SDK harness process(es) will then just utilize
 that.
 >> Of course this flavor of SDK harness execution could also be useful
 in
 >> the local development environment, where right now someone who
 already
 >> has the Python environment needs to also install Docker and update a
 >> container to launch a Python SDK pipeline on the Flink runner.
 >>
 >> On Wed, Aug 15, 2018 at 12:40 PM Daniel Oliveira <
 danolive...@google.com
 >> > wrote:
 >>
 >>  I just want to clarify that I understand this correctly since
 I'm
 >>  not that familiar with the details behind all these execution
 >>  environments yet. Is the proposal to create a new
 JobBundleFactory
 >>  that instead of using Docker to create the environment that the
 new
 >>  processes will execute in, this JobBundleFactory would execute
 the
 >>  new processes directly in the host environment? So in practice
 if I
 >>  ran a pipeline with this JobBundleFactory the SDK Harness and
 Runner
 >>  Harness would both be executing directly on my machine and would
 >>  depend on me having the dependencies already present on my
 machine?
 >>
 >>  On Mon, Aug 13, 2018 at 5:50 PM Ankur Goenka >>> >>  > wrote:
 >>
 >>  Thanks for starting the discussion. I will be happy to help.
 >>  I agree, we should have pluggable SDKHarness environment
 Factory.
 >>  We can register multiple Environment factory using service
 >>  registry and use the PipelineOption to pick the right one
 on per
 >>  job basis.
 >>
 >>  There are a couple of things which are require to setup
 before
 >>  launching the process.
 >>
 >>* Setting up the environment 

Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Tim Robertson
Reuven, I think you might be on to something

The Beam HadoopFileSystem copy() does indeed stream through the driver [1],
and the FileBasedSink.moveToOutputFiles() seemingly uses that method [2].
I'll cobble together a patched version to test using a rename() rather than
a copy() and report back findings before we consider the implications.

Thanks

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java#L124
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L288

On Wed, Aug 22, 2018 at 8:52 PM Tim Robertson 
wrote:

> > Does HDFS support a fast rename operation?
>
> Yes. From the shell it is “mv” and in the Java API it is “rename(Path src,
> Path dst)”.
> I am not aware of a fast copy though. I think an HDFS copy streams the
> bytes through the driver (unless a distcp is issued which is a MR job).
>
> (Thanks for engaging in this discussion folks)
>
>
> On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax  wrote:
>
>> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
>> temporary files to the final destination and then delete the temp files.
>> Does HDFS support a fast rename operation? If so, I bet Spark is using that
>> instead of paying the cost of copying the files.
>>
>> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax  wrote:
>>
>>> Ismael, that should already be true. If not using dynamic destinations
>>> there might be some edges in the graph that are never used (i.e. no records
>>> are ever published on them), but that should not affect performance. If
>>> this is not the case we should fix it.
>>>
>>> Reuven
>>>
>>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía  wrote:
>>>
 Spark runner uses the Spark broadcast mechanism to materialize the
 side input PCollections in the workers, not sure exactly if this is
 efficient assigned in an optimal way but seems logical at least.

 Just wondering if we shouldn't better first tackle the fact that if
 the pipeline does not have dynamic destinations (this case) WriteFiles
 should not be doing so much extra magic?

 On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
 >
 > Often only the metadata (i.e. temp file names) are shuffled, except
 in the "spilling" case (which should only happen when using dynamic
 destinations).
 >
 > WriteFiles depends heavily on side inputs. How are side inputs
 implemented in the Spark runner?
 >
 > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw 
 wrote:
 >>
 >> Yes, I stand corrected, dynamic writes is now much more than the
 >> primitive window-based naming we used to have.
 >>
 >> It would be interesting to visualize how much of this codepath is
 >> metatada vs. the actual data.
 >>
 >> In the case of file writing, it seems one could (maybe?) avoid
 >> requiring a stable input, as shards are accepted as a whole (unlike,
 >> say, sinks where a deterministic uid is needed for deduplication on
 >> retry).
 >>
 >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
 >> >
 >> > Robert - much of the complexity isn't due to streaming, but rather
 because WriteFiles supports "dynamic" output (where the user can choose a
 destination file based on the input record). In practice if a pipeline is
 not using dynamic destinations the full graph is still generated, but much
 of that graph is never used (empty PCollections).
 >> >
 >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
 rober...@google.com> wrote:
 >> >>
 >> >> I agree that this is concerning. Some of the complexity may have
 also
 >> >> been introduced to accommodate writing files in Streaming mode,
 but it
 >> >> seems we should be able to execute this as a single Map operation.
 >> >>
 >> >> Have you profiled to see which stages and/or operations are
 taking up the time?
 >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
 >> >>  wrote:
 >> >> >
 >> >> > Hi folks,
 >> >> >
 >> >> > I've recently been involved in projects rewriting Avro files
 and have discovered a concerning performance trait in Beam.
 >> >> >
 >> >> > I have observed Beam between 6-20x slower than native Spark or
 MapReduce code for a simple pipeline of read Avro, modify, write Avro.
 >> >> >
 >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
 Beam/Spark, 40 minutes with a map-only MR job
 >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
 Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
 >> >> >
 >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
 (Spark / YARN) on reference Dell / Cloudera hardware.
 >> >> >
 >> >> > I have only just started exploring but I believe the cause is
 rooted 

Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Tim Robertson
> Does HDFS support a fast rename operation?

Yes. From the shell it is “mv” and in the Java API it is “rename(Path src,
Path dst)”.
I am not aware of a fast copy though. I think an HDFS copy streams the
bytes through the driver (unless a distcp is issued, which is an MR job).

(Thanks for engaging in this discussion folks)
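
For illustration, a small sketch of the rename-versus-copy difference using
the plain Hadoop FileSystem API (the paths are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;

    public class RenameVsCopy {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path src = new Path("/tmp/beam-temp/part-00000.avro");
        Path dst = new Path("/output/part-00000.avro");

        // On HDFS this is a metadata-only operation: the NameNode re-links the
        // file and no bytes are moved.
        boolean renamed = fs.rename(src, dst);
        System.out.println("renamed: " + renamed);

        // By contrast, a copy streams every byte through the process issuing it:
        // FileUtil.copy(fs, src, fs, dst, /* deleteSource= */ true, conf);
      }
    }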


On Wed, Aug 22, 2018 at 6:29 PM Reuven Lax  wrote:

> I have another theory: in FileBasedSink.moveToOutputFiles we copy the
> temporary files to the final destination and then delete the temp files.
> Does HDFS support a fast rename operation? If so, I bet Spark is using that
> instead of paying the cost of copying the files.
>
> On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax  wrote:
>
>> Ismael, that should already be true. If not using dynamic destinations
>> there might be some edges in the graph that are never used (i.e. no records
>> are ever published on them), but that should not affect performance. If
>> this is not the case we should fix it.
>>
>> Reuven
>>
>> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía  wrote:
>>
>>> Spark runner uses the Spark broadcast mechanism to materialize the
>>> side input PCollections in the workers, not sure exactly if this is
>>> efficient assigned in an optimal way but seems logical at least.
>>>
>>> Just wondering if we shouldn't better first tackle the fact that if
>>> the pipeline does not have dynamic destinations (this case) WriteFiles
>>> should not be doing so much extra magic?
>>>
>>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
>>> >
>>> > Often only the metadata (i.e. temp file names) are shuffled, except in
>>> the "spilling" case (which should only happen when using dynamic
>>> destinations).
>>> >
>>> > WriteFiles depends heavily on side inputs. How are side inputs
>>> implemented in the Spark runner?
>>> >
>>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw 
>>> wrote:
>>> >>
>>> >> Yes, I stand corrected, dynamic writes is now much more than the
>>> >> primitive window-based naming we used to have.
>>> >>
>>> >> It would be interesting to visualize how much of this codepath is
>>> >> metatada vs. the actual data.
>>> >>
>>> >> In the case of file writing, it seems one could (maybe?) avoid
>>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>>> >> say, sinks where a deterministic uid is needed for deduplication on
>>> >> retry).
>>> >>
>>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
>>> >> >
>>> >> > Robert - much of the complexity isn't due to streaming, but rather
>>> because WriteFiles supports "dynamic" output (where the user can choose a
>>> destination file based on the input record). In practice if a pipeline is
>>> not using dynamic destinations the full graph is still generated, but much
>>> of that graph is never used (empty PCollections).
>>> >> >
>>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw <
>>> rober...@google.com> wrote:
>>> >> >>
>>> >> >> I agree that this is concerning. Some of the complexity may have
>>> also
>>> >> >> been introduced to accommodate writing files in Streaming mode,
>>> but it
>>> >> >> seems we should be able to execute this as a single Map operation.
>>> >> >>
>>> >> >> Have you profiled to see which stages and/or operations are taking
>>> up the time?
>>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>> >> >>  wrote:
>>> >> >> >
>>> >> >> > Hi folks,
>>> >> >> >
>>> >> >> > I've recently been involved in projects rewriting Avro files and
>>> have discovered a concerning performance trait in Beam.
>>> >> >> >
>>> >> >> > I have observed Beam between 6-20x slower than native Spark or
>>> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>>> >> >> >
>>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>>> Beam/Spark, 40 minutes with a map-only MR job
>>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>>> >> >> >
>>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
>>> (Spark / YARN) on reference Dell / Cloudera hardware.
>>> >> >> >
>>> >> >> > I have only just started exploring but I believe the cause is
>>> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
>>> is reasonably complex with reshuffles, spilling to temporary files
>>> (presumably to accommodate varying bundle sizes/avoid small files), a
>>> union, a GBK etc.
>>> >> >> >
>>> >> >> > Before I go too far with exploration I'd appreciate thoughts on
>>> whether we believe this is a concern (I do), if we should explore
>>> optimisations or any insight from previous work in this area.
>>> >> >> >
>>> >> >> > Thanks,
>>> >> >> > Tim
>>> >> >> >
>>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>>
>>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Reuven Lax
I have another theory: in FileBasedSink.moveToOutputFiles we copy the
temporary files to the final destination and then delete the temp files.
Does HDFS support a fast rename operation? If so, I bet Spark is using that
instead of paying the cost of copying the files.

On Wed, Aug 22, 2018 at 8:59 AM Reuven Lax  wrote:

> Ismael, that should already be true. If not using dynamic destinations
> there might be some edges in the graph that are never used (i.e. no records
> are ever published on them), but that should not affect performance. If
> this is not the case we should fix it.
>
> Reuven
>
> On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía  wrote:
>
>> Spark runner uses the Spark broadcast mechanism to materialize the
>> side input PCollections in the workers, not sure exactly if this is
>> efficient assigned in an optimal way but seems logical at least.
>>
>> Just wondering if we shouldn't better first tackle the fact that if
>> the pipeline does not have dynamic destinations (this case) WriteFiles
>> should not be doing so much extra magic?
>>
>> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
>> >
>> > Often only the metadata (i.e. temp file names) are shuffled, except in
>> the "spilling" case (which should only happen when using dynamic
>> destinations).
>> >
>> > WriteFiles depends heavily on side inputs. How are side inputs
>> implemented in the Spark runner?
>> >
>> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw 
>> wrote:
>> >>
>> >> Yes, I stand corrected, dynamic writes is now much more than the
>> >> primitive window-based naming we used to have.
>> >>
>> >> It would be interesting to visualize how much of this codepath is
>> >> metatada vs. the actual data.
>> >>
>> >> In the case of file writing, it seems one could (maybe?) avoid
>> >> requiring a stable input, as shards are accepted as a whole (unlike,
>> >> say, sinks where a deterministic uid is needed for deduplication on
>> >> retry).
>> >>
>> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
>> >> >
>> >> > Robert - much of the complexity isn't due to streaming, but rather
>> because WriteFiles supports "dynamic" output (where the user can choose a
>> destination file based on the input record). In practice if a pipeline is
>> not using dynamic destinations the full graph is still generated, but much
>> of that graph is never used (empty PCollections).
>> >> >
>> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw 
>> wrote:
>> >> >>
>> >> >> I agree that this is concerning. Some of the complexity may have
>> also
>> >> >> been introduced to accommodate writing files in Streaming mode, but
>> it
>> >> >> seems we should be able to execute this as a single Map operation.
>> >> >>
>> >> >> Have you profiled to see which stages and/or operations are taking
>> up the time?
>> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>> >> >>  wrote:
>> >> >> >
>> >> >> > Hi folks,
>> >> >> >
>> >> >> > I've recently been involved in projects rewriting Avro files and
>> have discovered a concerning performance trait in Beam.
>> >> >> >
>> >> >> > I have observed Beam between 6-20x slower than native Spark or
>> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>> >> >> >
>> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
>> Beam/Spark, 40 minutes with a map-only MR job
>> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
>> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
>> >> >> >
>> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
>> (Spark / YARN) on reference Dell / Cloudera hardware.
>> >> >> >
>> >> >> > I have only just started exploring but I believe the cause is
>> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
>> is reasonably complex with reshuffles, spilling to temporary files
>> (presumably to accommodate varying bundle sizes/avoid small files), a
>> union, a GBK etc.
>> >> >> >
>> >> >> > Before I go too far with exploration I'd appreciate thoughts on
>> whether we believe this is a concern (I do), if we should explore
>> optimisations or any insight from previous work in this area.
>> >> >> >
>> >> >> > Thanks,
>> >> >> > Tim
>> >> >> >
>> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>>
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Reuven Lax
Ismael, that should already be true. If not using dynamic destinations
there might be some edges in the graph that are never used (i.e. no records
are ever published on them), but that should not affect performance. If
this is not the case we should fix it.

Reuven

On Wed, Aug 22, 2018 at 8:50 AM Ismaël Mejía  wrote:

> Spark runner uses the Spark broadcast mechanism to materialize the
> side input PCollections in the workers, not sure exactly if this is
> efficient assigned in an optimal way but seems logical at least.
>
> Just wondering if we shouldn't better first tackle the fact that if
> the pipeline does not have dynamic destinations (this case) WriteFiles
> should not be doing so much extra magic?
>
> On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
> >
> > Often only the metadata (i.e. temp file names) are shuffled, except in
> the "spilling" case (which should only happen when using dynamic
> destinations).
> >
> > WriteFiles depends heavily on side inputs. How are side inputs
> implemented in the Spark runner?
> >
> > On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw 
> wrote:
> >>
> >> Yes, I stand corrected, dynamic writes is now much more than the
> >> primitive window-based naming we used to have.
> >>
> >> It would be interesting to visualize how much of this codepath is
> >> metatada vs. the actual data.
> >>
> >> In the case of file writing, it seems one could (maybe?) avoid
> >> requiring a stable input, as shards are accepted as a whole (unlike,
> >> say, sinks where a deterministic uid is needed for deduplication on
> >> retry).
> >>
> >> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
> >> >
> >> > Robert - much of the complexity isn't due to streaming, but rather
> because WriteFiles supports "dynamic" output (where the user can choose a
> destination file based on the input record). In practice if a pipeline is
> not using dynamic destinations the full graph is still generated, but much
> of that graph is never used (empty PCollections).
> >> >
> >> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw 
> wrote:
> >> >>
> >> >> I agree that this is concerning. Some of the complexity may have also
> >> >> been introduced to accommodate writing files in Streaming mode, but
> it
> >> >> seems we should be able to execute this as a single Map operation.
> >> >>
> >> >> Have you profiled to see which stages and/or operations are taking
> up the time?
> >> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> >> >>  wrote:
> >> >> >
> >> >> > Hi folks,
> >> >> >
> >> >> > I've recently been involved in projects rewriting Avro files and
> have discovered a concerning performance trait in Beam.
> >> >> >
> >> >> > I have observed Beam between 6-20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
> >> >> >
> >> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
> Beam/Spark, 40 minutes with a map-only MR job
> >> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using
> Beam/Spark, 18 minutes using vanilla Spark code. Test code available [1]
> >> >> >
> >> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
> (Spark / YARN) on reference Dell / Cloudera hardware.
> >> >> >
> >> >> > I have only just started exploring but I believe the cause is
> rooted in the WriteFiles which is used by all our file based IO. WriteFiles
> is reasonably complex with reshuffles, spilling to temporary files
> (presumably to accommodate varying bundle sizes/avoid small files), a
> union, a GBK etc.
> >> >> >
> >> >> > Before I go too far with exploration I'd appreciate thoughts on
> whether we believe this is a concern (I do), if we should explore
> optimisations or any insight from previous work in this area.
> >> >> >
> >> >> > Thanks,
> >> >> > Tim
> >> >> >
> >> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Ismaël Mejía
The Spark runner uses the Spark broadcast mechanism to materialize the
side input PCollections in the workers. I am not sure whether this is
assigned in an optimal way, but it seems logical at least.

Just wondering whether we shouldn't first tackle the fact that, if
the pipeline does not have dynamic destinations (as in this case), WriteFiles
should not be doing so much extra magic?

On Wed, Aug 22, 2018 at 5:26 PM Reuven Lax  wrote:
>
> Often only the metadata (i.e. temp file names) are shuffled, except in the 
> "spilling" case (which should only happen when using dynamic destinations).
>
> WriteFiles depends heavily on side inputs. How are side inputs implemented in 
> the Spark runner?
>
> On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw  wrote:
>>
>> Yes, I stand corrected, dynamic writes is now much more than the
>> primitive window-based naming we used to have.
>>
>> It would be interesting to visualize how much of this codepath is
>> metatada vs. the actual data.
>>
>> In the case of file writing, it seems one could (maybe?) avoid
>> requiring a stable input, as shards are accepted as a whole (unlike,
>> say, sinks where a deterministic uid is needed for deduplication on
>> retry).
>>
>> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
>> >
>> > Robert - much of the complexity isn't due to streaming, but rather because 
>> > WriteFiles supports "dynamic" output (where the user can choose a 
>> > destination file based on the input record). In practice if a pipeline is 
>> > not using dynamic destinations the full graph is still generated, but much 
>> > of that graph is never used (empty PCollections).
>> >
>> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw  
>> > wrote:
>> >>
>> >> I agree that this is concerning. Some of the complexity may have also
>> >> been introduced to accommodate writing files in Streaming mode, but it
>> >> seems we should be able to execute this as a single Map operation.
>> >>
>> >> Have you profiled to see which stages and/or operations are taking up the 
>> >> time?
>> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>> >>  wrote:
>> >> >
>> >> > Hi folks,
>> >> >
>> >> > I've recently been involved in projects rewriting Avro files and have 
>> >> > discovered a concerning performance trait in Beam.
>> >> >
>> >> > I have observed Beam between 6-20x slower than native Spark or 
>> >> > MapReduce code for a simple pipeline of read Avro, modify, write Avro.
>> >> >
>> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using 
>> >> > Beam/Spark, 40 minutes with a map-only MR job
>> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 
>> >> > 18 minutes using vanilla Spark code. Test code available [1]
>> >> >
>> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark 
>> >> > / YARN) on reference Dell / Cloudera hardware.
>> >> >
>> >> > I have only just started exploring but I believe the cause is rooted in 
>> >> > the WriteFiles which is used by all our file based IO. WriteFiles is 
>> >> > reasonably complex with reshuffles, spilling to temporary files 
>> >> > (presumably to accommodate varying bundle sizes/avoid small files), a 
>> >> > union, a GBK etc.
>> >> >
>> >> > Before I go too far with exploration I'd appreciate thoughts on whether 
>> >> > we believe this is a concern (I do), if we should explore optimisations 
>> >> > or any insight from previous work in this area.
>> >> >
>> >> > Thanks,
>> >> > Tim
>> >> >
>> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Reuven Lax
Often only the metadata (i.e. the temp file names) is shuffled, except in the
"spilling" case (which should only happen when using dynamic destinations).

WriteFiles depends heavily on side inputs. How are side inputs implemented
in the Spark runner?

On Wed, Aug 22, 2018 at 8:21 AM Robert Bradshaw  wrote:

> Yes, I stand corrected, dynamic writes is now much more than the
> primitive window-based naming we used to have.
>
> It would be interesting to visualize how much of this codepath is
> metatada vs. the actual data.
>
> In the case of file writing, it seems one could (maybe?) avoid
> requiring a stable input, as shards are accepted as a whole (unlike,
> say, sinks where a deterministic uid is needed for deduplication on
> retry).
>
> On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
> >
> > Robert - much of the complexity isn't due to streaming, but rather
> because WriteFiles supports "dynamic" output (where the user can choose a
> destination file based on the input record). In practice if a pipeline is
> not using dynamic destinations the full graph is still generated, but much
> of that graph is never used (empty PCollections).
> >
> > On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw 
> wrote:
> >>
> >> I agree that this is concerning. Some of the complexity may have also
> >> been introduced to accommodate writing files in Streaming mode, but it
> >> seems we should be able to execute this as a single Map operation.
> >>
> >> Have you profiled to see which stages and/or operations are taking up
> the time?
> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> >>  wrote:
> >> >
> >> > Hi folks,
> >> >
> >> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >> >
> >> > I have observed Beam between 6-20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
> >> >
> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
> Beam/Spark, 40 minutes with a map-only MR job
> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark,
> 18 minutes using vanilla Spark code. Test code available [1]
> >> >
> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
> (Spark / YARN) on reference Dell / Cloudera hardware.
> >> >
> >> > I have only just started exploring but I believe the cause is rooted
> in the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >> >
> >> > Before I go too far with exploration I'd appreciate thoughts on
> whether we believe this is a concern (I do), if we should explore
> optimisations or any insight from previous work in this area.
> >> >
> >> > Thanks,
> >> > Tim
> >> >
> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Robert Bradshaw
Yes, I stand corrected, dynamic writes are now much more than the
primitive window-based naming we used to have.

It would be interesting to visualize how much of this codepath is
metadata vs. the actual data.

In the case of file writing, it seems one could (maybe?) avoid
requiring a stable input, as shards are accepted as a whole (unlike,
say, sinks where a deterministic uid is needed for deduplication on
retry).

On Wed, Aug 22, 2018 at 4:55 PM Reuven Lax  wrote:
>
> Robert - much of the complexity isn't due to streaming, but rather because 
> WriteFiles supports "dynamic" output (where the user can choose a destination 
> file based on the input record). In practice if a pipeline is not using 
> dynamic destinations the full graph is still generated, but much of that 
> graph is never used (empty PCollections).
>
> On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw  wrote:
>>
>> I agree that this is concerning. Some of the complexity may have also
>> been introduced to accommodate writing files in Streaming mode, but it
>> seems we should be able to execute this as a single Map operation.
>>
>> Have you profiled to see which stages and/or operations are taking up the 
>> time?
>> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>  wrote:
>> >
>> > Hi folks,
>> >
>> > I've recently been involved in projects rewriting Avro files and have 
>> > discovered a concerning performance trait in Beam.
>> >
>> > I have observed Beam between 6-20x slower than native Spark or MapReduce 
>> > code for a simple pipeline of read Avro, modify, write Avro.
>> >
>> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 
>> > 40 minutes with a map-only MR job
>> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 
>> > minutes using vanilla Spark code. Test code available [1]
>> >
>> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / 
>> > YARN) on reference Dell / Cloudera hardware.
>> >
>> > I have only just started exploring but I believe the cause is rooted in 
>> > the WriteFiles which is used by all our file based IO. WriteFiles is 
>> > reasonably complex with reshuffles, spilling to temporary files 
>> > (presumably to accommodate varying bundle sizes/avoid small files), a 
>> > union, a GBK etc.
>> >
>> > Before I go too far with exploration I'd appreciate thoughts on whether we 
>> > believe this is a concern (I do), if we should explore optimisations or 
>> > any insight from previous work in this area.
>> >
>> > Thanks,
>> > Tim
>> >
>> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Reuven Lax
Robert - much of the complexity isn't due to streaming, but rather because
WriteFiles supports "dynamic" output (where the user can choose a
destination file based on the input record). In practice if a pipeline is
not using dynamic destinations the full graph is still generated, but much
of that graph is never used (empty PCollections).

On Wed, Aug 22, 2018 at 3:12 AM Robert Bradshaw  wrote:

> I agree that this is concerning. Some of the complexity may have also
> been introduced to accommodate writing files in Streaming mode, but it
> seems we should be able to execute this as a single Map operation.
>
> Have you profiled to see which stages and/or operations are taking up the
> time?
> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>  wrote:
> >
> > Hi folks,
> >
> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >
> > I have observed Beam between 6-20x slower than native Spark or MapReduce
> code for a simple pipeline of read Avro, modify, write Avro.
> >
> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark,
> 40 minutes with a map-only MR job
> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
> minutes using vanilla Spark code. Test code available [1]
> >
> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
> YARN) on reference Dell / Cloudera hardware.
> >
> > I have only just started exploring but I believe the cause is rooted in
> the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >
> > Before I go too far with exploration I'd appreciate thoughts on whether
> we believe this is a concern (I do), if we should explore optimisations or
> any insight from previous work in this area.
> >
> > Thanks,
> > Tim
> >
> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Reuven Lax
I think we need to dig in more to understand where the slowness is. Some
context (which might not be obvious from the code):

* Much of the complexity in WriteFiles is not always active. e.g. a lot of
it is there to support dynamic output (where the filename is dynamically
chosen based on the input record), and if you're not using dynamic output a
lot of those codepaths will not be used.

* There is some overhead because Beam does not assume that ParDos are
deterministic (by contrast, Spark often assumes that user code is
deterministic), and so inserts a shuffle to make sure that file writes are
deterministic. I believe that the current Spark runner might checkpoint the
entire RDD in such a case, which is quite inefficient. We should try on
other runners to make sure that this issue is not specific to the Spark
runner.

* Spilling to temporary files is done to avoid workers crashing with
out-of-memory errors. Beam attempts to write files straight out of the bundle
(to avoid shuffling all the data and shuffle just the filenames). However, empirically,
when there are too many files we get large bundles and all the file write
buffers cause the workers to start running out of memory; a solution is to
reshuffle the data to distribute it. This will only happen if you are using
windowed writes or dynamic destinations to write to dynamic locations,
otherwise the spilled code path is never executed.
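
A minimal sketch of the determinism point in the second bullet above:
stabilizing the input to a side-effecting write with a reshuffle, which is
roughly what WriteFiles does internally. The DoFn here is a hypothetical
stand-in, not Beam's actual write transform.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    public class StableWriteSketch {
      /** Hypothetical side-effecting writer, standing in for the file-writing DoFn. */
      static class WriteRecordFn extends DoFn<String, Void> {
        @ProcessElement
        public void processElement(ProcessContext c) {
          // Write c.element() to external storage; retries of upstream ParDos
          // must not change what this sees.
        }
      }

      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        PCollection<String> records = p.apply(Create.of("a", "b", "c"));
        records
            // Materializes the upstream results so the write sees a stable input,
            // at the cost of shuffling the data (or at least its metadata).
            .apply("StabilizeInput", Reshuffle.viaRandomKey())
            .apply("WriteSideEffect", ParDo.of(new WriteRecordFn()));
        p.run().waitUntilFinish();
      }
    }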

On Wed, Aug 22, 2018 at 2:29 AM Tim Robertson 
wrote:

> Hi folks,
>
> I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
>
> I have observed Beam between 6-20x slower than native Spark or MapReduce
> code for a simple pipeline of read Avro, modify, write Avro.
>
>  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark,
> 40 minutes with a map-only MR job
>  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
> minutes using vanilla Spark code. Test code available [1]
>
> These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
> YARN) on reference Dell / Cloudera hardware.
>
> I have only just started exploring but I believe the cause is rooted in
> the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
>
> Before I go too far with exploration I'd appreciate thoughts on whether we
> believe this is a concern (I do), if we should explore optimisations or any
> insight from previous work in this area.
>
> Thanks,
> Tim
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Tim Robertson
> Are we seeing similar discrepancies for Flink?

I am not sure, I'm afraid (no easy access to Flink right now). I tried
without success to get the Apex runner going on Cloudera YARN for this today -
I'll keep trying when time allows.

I've updated the DAGs to show more detail:
https://github.com/gbif/beam-perf/tree/master/avro-to-avro
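
For reference, a minimal sketch of the read-modify-write Avro shape under
discussion, using non-dynamic destinations. The schema and the paths are
placeholders; the actual test code lives in the repository linked above.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;

    public class AvroRewrite {
      // Placeholder schema; the real benchmark uses the schema of the source files.
      private static final String SCHEMA_JSON =
          "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":"
              + "[{\"name\":\"value\",\"type\":\"string\"}]}";

      /** Emits a modified copy of each record so the input element is never mutated. */
      static class ModifyFn extends DoFn<GenericRecord, GenericRecord> {
        private transient Schema schema;

        @Setup
        public void setup() {
          schema = new Schema.Parser().parse(SCHEMA_JSON);
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
          GenericRecord out = new GenericData.Record(schema);
          out.put("value", c.element().get("value") + "-modified");
          c.output(out);
        }
      }

      public static void main(String[] args) {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply("Read", AvroIO.readGenericRecords(schema).from("hdfs:///input/*.avro"))
            .apply("Modify", ParDo.of(new ModifyFn()))
            .setCoder(AvroCoder.of(schema))
            .apply("Write", AvroIO.writeGenericRecords(schema)
                .to("hdfs:///output/part")
                .withSuffix(".avro"));
        p.run().waitUntilFinish();
      }
    }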

On Wed, Aug 22, 2018 at 1:41 PM Robert Bradshaw  wrote:

> That is quite the DAG... Are we seeing similar discrepancies for
> Flink? (Trying to understand if this is Beam->Spark translation bloat,
> or inherent to the WriteFiles transform itself.)
> On Wed, Aug 22, 2018 at 1:35 PM Tim Robertson 
> wrote:
> >
> > Thanks Robert
> >
> > > Have you profiled to see which stages and/or operations are taking up
> the time?
> >
> > Not yet. I'm browsing through the spark DAG produced which I've
> committed [1] and reading the code.
> >
> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
> >
> > On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw 
> wrote:
> >>
> >> I agree that this is concerning. Some of the complexity may have also
> >> been introduced to accommodate writing files in Streaming mode, but it
> >> seems we should be able to execute this as a single Map operation.
> >>
> >> Have you profiled to see which stages and/or operations are taking up
> the time?
> >> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
> >>  wrote:
> >> >
> >> > Hi folks,
> >> >
> >> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >> >
> >> > I have observed Beam between 6-20x slower than native Spark or
> MapReduce code for a simple pipeline of read Avro, modify, write Avro.
> >> >
> >> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using
> Beam/Spark, 40 minutes with a map-only MR job
> >> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark,
> 18 minutes using vanilla Spark code. Test code available [1]
> >> >
> >> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters
> (Spark / YARN) on reference Dell / Cloudera hardware.
> >> >
> >> > I have only just started exploring but I believe the cause is rooted
> in the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >> >
> >> > Before I go too far with exploration I'd appreciate thoughts on
> whether we believe this is a concern (I do), if we should explore
> optimisations or any insight from previous work in this area.
> >> >
> >> > Thanks,
> >> > Tim
> >> >
> >> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Robert Bradshaw
That is quite the DAG... Are we seeing similar discrepancies for
Flink? (Trying to understand if this is Beam->Spark translation bloat,
or inherent to the WriteFiles transform itself.)
On Wed, Aug 22, 2018 at 1:35 PM Tim Robertson  wrote:
>
> Thanks Robert
>
> > Have you profiled to see which stages and/or operations are taking up the 
> > time?
>
> Not yet. I'm browsing through the spark DAG produced which I've committed [1] 
> and reading the code.
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>
> On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw  wrote:
>>
>> I agree that this is concerning. Some of the complexity may have also
>> been introduced to accommodate writing files in Streaming mode, but it
>> seems we should be able to execute this as a single Map operation.
>>
>> Have you profiled to see which stages and/or operations are taking up the 
>> time?
>> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>>  wrote:
>> >
>> > Hi folks,
>> >
>> > I've recently been involved in projects rewriting Avro files and have 
>> > discovered a concerning performance trait in Beam.
>> >
>> > I have observed Beam between 6-20x slower than native Spark or MapReduce 
>> > code for a simple pipeline of read Avro, modify, write Avro.
>> >
>> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 
>> > 40 minutes with a map-only MR job
>> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 
>> > minutes using vanilla Spark code. Test code available [1]
>> >
>> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / 
>> > YARN) on reference Dell / Cloudera hardware.
>> >
>> > I have only just started exploring but I believe the cause is rooted in 
>> > the WriteFiles which is used by all our file based IO. WriteFiles is 
>> > reasonably complex with reshuffles, spilling to temporary files 
>> > (presumably to accommodate varying bundle sizes/avoid small files), a 
>> > union, a GBK etc.
>> >
>> > Before I go too far with exploration I'd appreciate thoughts on whether we 
>> > believe this is a concern (I do), if we should explore optimisations or 
>> > any insight from previous work in this area.
>> >
>> > Thanks,
>> > Tim
>> >
>> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro


Re: Beam application upgrade on Flink crashes

2018-08-22 Thread Aljoscha Krettek
Hi,

Unfortunately, there are currently no compatibility guarantees between 
different Beam versions. Beam itself doesn't have the required interfaces or 
procedures in place to support backwards compatibility of state, and there 
have been quite a few changes in the internals between Flink 1.4 and Flink 1.5 
that made larger changes necessary in how the Beam-Flink runner handles 
operator state.

Best,
Aljoscha
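
As a general illustration of the Java-serialization pitfall Max mentions
further down (the coder's type serializer lacking a serialVersionUID), here
is a minimal sketch. The class and field are hypothetical stand-ins, not
Beam's actual CoderTypeSerializer.

    import java.io.Serializable;

    /** Hypothetical serializer-like class held in checkpointed state. */
    public class MyCoderSerializer implements Serializable {
      // Without an explicit serialVersionUID, Java derives one from the class
      // structure, so even small refactorings make previously written state
      // unreadable on restore.
      private static final long serialVersionUID = 1L;

      private final String coderUrn;

      public MyCoderSerializer(String coderUrn) {
        this.coderUrn = coderUrn;
      }

      public String getCoderUrn() {
        return coderUrn;
      }
    }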

> On 22. Aug 2018, at 12:14, Jozef Vilcek  wrote:
> 
> Hm, I am sorry to hear this. I must of missed it in docs, that beam version 
> upgrades can break flink state. It is important information for ones wanting 
> to use Beam on Flink in production.
> 
> So, I guess there is no guarantee for another bump of Flink version to not 
> break things until it reach 1.7. 
> Event then, thinks can break maybe? Is there a plan making Flink runner more 
> robust and catch compatibility issues early by tests?
> 
> Just trying to figure out my options with upgrades. Does other runners suffer 
> the same weak guarantees?
> 
> 
> On Tue, Aug 21, 2018 at 9:25 PM Stephan Ewen  > wrote:
> Flink 1.7 will change the way the "restore serializer" is handled, which 
> should make it much easier to handle such cases.
> Especially breaking java class version format will not be an issue anymore.
> 
> That should help to make it easier to give the Beam-on-Flink runner cross 
> version compatibility.
> 
> 
> On Mon, Aug 20, 2018 at 6:46 PM, Maximilian Michels  > wrote:
> AFAIK the serializer used here is the CoderTypeSerializer which may not
> be recoverable because of changes to the contained Coder
> (TaggedKvCoder). It doesn't currently have a serialVersionUID, so even
> small changes could break serialization backwards-compatibility.
> 
> As of now Beam doesn't offer the same upgrade guarantees as Flink [1].
> This should be improved for the next release.
> 
> Thanks,
> Max
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>  
> 
> 
> On 20.08.18 17:46, Stephan Ewen wrote:
> > Hi Jozef!
> > 
> > When restoring state, the serializer that created the state must still
> > be available, so the state can be read.
> > It looks like some serializer classes were removed between Beam versions
> > (or changed in an incompatible manner).
> > 
> > Backwards compatibility of an operator implementation needs cooperation
> > from the operator. Within Flink itself, when we change the way an
> > operator uses state, we keep the old codepath and classes in a
> > "backwards compatibility restore" that takes the old state and brings it
> > into the shape of the new state. 
> > 
> > I am not deeply into the details of how Beam and the Flink runner implement
> > their use of state, but it looks like this part is not present, which could
> > mean that savepoints taken from Beam applications are not backwards
> > compatible.
> > 
> > 
> > On Mon, Aug 20, 2018 at 4:03 PM, Jozef Vilcek wrote:
> > 
> > Hello,
> > 
> > I am attempting to upgrade a Beam app from 2.5.0 running on Flink
> > 1.4.0 to Beam 2.6.0 running on Flink 1.5.0. I am not aware of any
> > state migration changes needed for Flink 1.4.0 -> 1.5.0, so I am just
> > starting a new app with updated libs from a Flink savepoint captured
> > by the previous version of the app.
> > 
> > There is no change in topology. The job is accepted without error by
> > the new cluster, which suggests that all operators are matched with
> > state based on IDs. However, the app runs only a few seconds and then
> > crashes with:
> > 
> > java.lang.Exception: Exception while creating 
> > StreamOperatorStateContext.
> >   at 
> > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> >   at 
> > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
> >   at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
> >   at 
> > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> >   at java.lang.Thread.run(Thread.java:745)
> > Caused by: org.apache.flink.util.FlinkException: Could not restore 
> > operator state backend for 
> > DoFnOperator_43996aa2908fa46bb50160f751f8cc09_(1/100) from any of the 1 
> > provided restore options.
> >   at 
> > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> >   at 
> > 

Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Tim Robertson
Thanks Robert

> Have you profiled to see which stages and/or operations are taking up the
time?

Not yet. I'm browsing through the Spark DAG produced, which I've committed
[1], and reading the code.

[1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro

On Wed, Aug 22, 2018 at 12:12 PM Robert Bradshaw 
wrote:

> I agree that this is concerning. Some of the complexity may have also
> been introduced to accommodate writing files in Streaming mode, but it
> seems we should be able to execute this as a single Map operation.
>
> Have you profiled to see which stages and/or operations are taking up the
> time?
> On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
>  wrote:
> >
> > Hi folks,
> >
> > I've recently been involved in projects rewriting Avro files and have
> discovered a concerning performance trait in Beam.
> >
> > I have observed Beam between 6-20x slower than native Spark or MapReduce
> code for a simple pipeline of read Avro, modify, write Avro.
> >
> >  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark,
> 40 minutes with a map-only MR job
> >  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
> minutes using vanilla Spark code. Test code available [1]
> >
> > These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
> YARN) on reference Dell / Cloudera hardware.
> >
> > I have only just started exploring but I believe the cause is rooted in
> the WriteFiles which is used by all our file based IO. WriteFiles is
> reasonably complex with reshuffles, spilling to temporary files (presumably
> to accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
> >
> > Before I go too far with exploration I'd appreciate thoughts on whether
> we believe this is a concern (I do), if we should explore optimisations or
> any insight from previous work in this area.
> >
> > Thanks,
> > Tim
> >
> > [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
>


Re: Beam application upgrade on Flink crashes

2018-08-22 Thread Jozef Vilcek
Hm, I am sorry to hear this. I must have missed it in the docs that Beam version
upgrades can break Flink state. It is important information for anyone
wanting to use Beam on Flink in production.

So, I guess there is no guarantee that another bump of the Flink version will not
break things until it reaches 1.7.
Even then, things can still break, maybe? Is there a plan to make the Flink runner
more robust and catch compatibility issues early with tests?

Just trying to figure out my options with upgrades. Do other runners
suffer from the same weak guarantees?


On Tue, Aug 21, 2018 at 9:25 PM Stephan Ewen  wrote:

> Flink 1.7 will change the way the "restore serializer" is handled, which
> should make it much easier to handle such cases.
> Especially breaking java class version format will not be an issue anymore.
>
> That should help to make it easier to give the Beam-on-Flink runner cross
> version compatibility.
>
>
> On Mon, Aug 20, 2018 at 6:46 PM, Maximilian Michels 
> wrote:
>
>> AFAIK the serializer used here is the CoderTypeSerializer which may not
>> be recoverable because of changes to the contained Coder
>> (TaggedKvCoder). It doesn't currently have a serialVersionUID, so even
>> small changes could break serialization backwards-compatibility.
>>
>> As of now Beam doesn't offer the same upgrade guarantees as Flink [1].
>> This should be improved for the next release.
>>
>> Thanks,
>> Max
>>
>> [1]
>>
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table
>>
>> On 20.08.18 17:46, Stephan Ewen wrote:
>> > Hi Jozef!
>> >
>> > When restoring state, the serializer that created the state must still
>> > be available, so the state can be read.
>> > It looks like some serializer classes were removed between Beam versions
>> > (or changed in an incompatible manner).
>> >
>> > Backwards compatibility of an operator implementation needs cooperation
>> > from the operator. Within Flink itself, when we change the way an
>> > operator uses state, we keep the old codepath and classes in a
>> > "backwards compatibility restore" that takes the old state and brings it
>> > into the shape of the new state.
>> >
>> > I am not deeply into the details of how Beam and the Flink runner implement
>> > their use of state, but it looks like this part is not present, which could
>> > mean that savepoints taken from Beam applications are not backwards
>> > compatible.
>> >
>> >
>> > On Mon, Aug 20, 2018 at 4:03 PM, Jozef Vilcek wrote:
>> >
>> > Hello,
>> >
>> > I am attempting to upgrade a Beam app from 2.5.0 running on Flink
>> > 1.4.0 to Beam 2.6.0 running on Flink 1.5.0. I am not aware of any
>> > state migration changes needed for Flink 1.4.0 -> 1.5.0, so I am just
>> > starting a new app with updated libs from a Flink savepoint captured
>> > by the previous version of the app.
>> >
>> > There is no change in topology. The job is accepted without error by
>> > the new cluster, which suggests that all operators are matched with
>> > state based on IDs. However, the app runs only a few seconds and then
>> > crashes with:
>> >
>> > java.lang.Exception: Exception while creating
>> StreamOperatorStateContext.
>> >   at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>> >   at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>> >   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>> >   at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>> >   at java.lang.Thread.run(Thread.java:745)
>> > Caused by: org.apache.flink.util.FlinkException: Could not restore
>> operator state backend for
>> DoFnOperator_43996aa2908fa46bb50160f751f8cc09_(1/100) from any of the 1
>> provided restore options.
>> >   at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>> >   at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
>> >   at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
>> >   ... 5 more
>> > Caused by: java.io.IOException: Unable to restore operator state
>> [bundle-buffer-tag]. The previous serializer of the operator state must be
>> present; the serializer could have been removed from the classpath, or its
>> implementation have changed and could not be loaded. This is a temporary
>> restriction that will be fixed in future versions.
>> >   at
>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:514)
>> >   at
>> 
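
As context for the serialVersionUID point raised above: a minimal, purely illustrative Java sketch (not the actual Beam CoderTypeSerializer) of pinning a class's serial version so that otherwise-compatible changes do not break restoring Java-serialized state:

    import java.io.Serializable;

    // Illustrative only: a serializer-like class whose instances end up inside
    // Flink operator state. Without an explicit serialVersionUID the JVM derives
    // one from the class structure, so even small, otherwise-compatible changes
    // (adding a method or field) make deserialization of old savepoints fail
    // with an InvalidClassException.
    public class ExampleStateSerializer implements Serializable {

      // Pin the version; bump it only on genuinely incompatible changes.
      private static final long serialVersionUID = 1L;

      // ... serializer fields and logic ...
    }

Note this only covers plain Java serialization compatibility; evolving the state format itself still needs an explicit restore/migration path, as discussed in this thread.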

Re: [DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Robert Bradshaw
I agree that this is concerning. Some of the complexity may have also
been introduced to accommodate writing files in Streaming mode, but it
seems we should be able to execute this as a single Map operation.

Have you profiled to see which stages and/or operations are taking up the time?
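
To make the "single Map operation" idea concrete, here is a rough, purely illustrative sketch (not how Beam's WriteFiles actually works) of a DoFn that writes each bundle straight to an Avro file with no reshuffle, union or GBK. The schema string and output directory are assumptions, and it deliberately glosses over exactly what WriteFiles handles: temp files, retried bundles, windowing and final renames.

    import java.io.File;
    import java.io.IOException;
    import java.util.UUID;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.transforms.DoFn;

    // Conceptual "map-only" writer: one output file per bundle.
    class DirectAvroWriteFn extends DoFn<GenericRecord, Void> {
      private final String schemaJson;  // Schema is not Serializable, so ship it as JSON
      private final String outputDir;   // illustrative output location
      private transient DataFileWriter<GenericRecord> writer;

      DirectAvroWriteFn(String schemaJson, String outputDir) {
        this.schemaJson = schemaJson;
        this.outputDir = outputDir;
      }

      @StartBundle
      public void startBundle() throws IOException {
        Schema schema = new Schema.Parser().parse(schemaJson);
        writer = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
        // One unique file per bundle; a production version would write via Beam's
        // FileSystems to a temp location and rename on success.
        writer.create(schema, new File(outputDir, "part-" + UUID.randomUUID() + ".avro"));
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws IOException {
        writer.append(c.element());
      }

      @FinishBundle
      public void finishBundle() throws IOException {
        writer.close();
      }
    }

If a bundle is retried after a partial write, this sketch leaves duplicate or orphaned files behind, which is precisely the failure mode the temp-file/rename machinery in WriteFiles exists to prevent.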
On Wed, Aug 22, 2018 at 11:29 AM Tim Robertson
 wrote:
>
> Hi folks,
>
> I've recently been involved in projects rewriting Avro files and have 
> discovered a concerning performance trait in Beam.
>
> I have observed Beam between 6-20x slower than native Spark or MapReduce code 
> for a simple pipeline of read Avro, modify, write Avro.
>
>  - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 40 
> minutes with a map-only MR job
>  - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18 
> minutes using vanilla Spark code. Test code available [1]
>
> These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark / 
> YARN) on reference Dell / Cloudera hardware.
>
> I have only just started exploring but I believe the cause is rooted in the 
> WriteFiles which is used by all our file based IO. WriteFiles is reasonably 
> complex with reshuffles, spilling to temporary files (presumably to 
> accommodate varying bundle sizes/avoid small files), a union, a GBK etc.
>
> Before I go too far with exploration I'd appreciate thoughts on whether we 
> believe this is a concern (I do), if we should explore optimisations or any 
> insight from previous work in this area.
>
> Thanks,
> Tim
>
> [1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro


Re: Beam Summit London 2018

2018-08-22 Thread Matthias Baetens
Hi Pascal, Javier,

Thanks for your interest in submitting a talk!
@Pascal: I am happy to check for you if what you have in mind is already
being covered in another talk submitted, and happy to help you find a good
topic to talk about :)

@Javier: it will depend on the type and subject of the session. If you are
planning to do an advanced workshop, it makes sense to allocate more time
(>1h). If it is a business case also covering architecture and technical
details, I think longer sessions should be possible as well (~1h including
Q&A). If your talk turns out to have too much content, we can always look at
ways to split it up into two stand-alone sessions if that would make sense.

Best,
Matthias

On Tue, 21 Aug 2018 at 19:45 javier ramirez 
wrote:

> Hi,
>
> What'd be the duration of the talks? So I can scope the contents of my
> proposal.
>
> Looking forward to the summit!
>
> J
>
> On Tue, 21 Aug 2018, 14:47 Pascal Gula,  wrote:
>
>> Hi Matthias,
>> we (Peat / Plantix) might be interested in submitting a talk, and I would
>> like to know if we can get access to the list of already submitted titles
>> to avoid submitting on a similar topic!
>> Cheers,
>> Pascal
>>
>> On Tue, Aug 21, 2018 at 1:59 PM, Matthias Baetens <
>> baetensmatth...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> We are happy to invite you to the first Beam Summit in London.
>>>
>>> The summit will be held in London at Level39 on *October 1 and 2.*
>>> You can register to attend for free on the Eventbrite page.
>>>
>>> If you are interested in talking, please check our CfP form
>>>  and submit a talk!
>>>
>>> If you or your company is interested in helping out or sponsoring the
>>> summit (to keep it free), you can check out the sponsor booklet.
>>>
>>> We will soon launch a blogpost with more details and announce the agenda
>>> closer to date.
>>>
>>> Thanks to everyone who helped make this happen, looking forward to
>>> welcoming you all in London!
>>>
>>> The Events & Meetups Group
>>>
>>> --
>>>
>>>
>>
>>
>>
>> --
>>
>> Pascal Gula
>> Senior Data Engineer / Scientist
>> +49 (0)176 34232684
>> www.plantix.net
>> PEAT GmbH
>> Kastanienallee 4
>> 10435 Berlin // Germany
>> Download the App!
>> --


[DISCUSS] Performance of write() in file based IO

2018-08-22 Thread Tim Robertson
Hi folks,

I've recently been involved in projects rewriting Avro files and have
discovered a concerning performance trait in Beam.

I have observed Beam between 6-20x slower than native Spark or MapReduce
code for a simple pipeline of read Avro, modify, write Avro.

 - Rewriting 200TB of Avro files (big cluster): 14 hrs using Beam/Spark, 40
minutes with a map-only MR job
 - Rewriting 1.5TB Avro file (small cluster): 2 hrs using Beam/Spark, 18
minutes using vanilla Spark code. Test code available [1]

These tests were running Beam 2.6.0 on Cloudera 5.12.x clusters (Spark /
YARN) on reference Dell / Cloudera hardware.

I have only just started exploring but I believe the cause is rooted in the
WriteFiles which is used by all our file based IO. WriteFiles is reasonably
complex with reshuffles, spilling to temporary files (presumably to
accommodate varying bundle sizes/avoid small files), a union, a GBK etc.

Before I go too far with exploration I'd appreciate thoughts on whether we
believe this is a concern (I do), if we should explore optimisations or any
insight from previous work in this area.

Thanks,
Tim

[1] https://github.com/gbif/beam-perf/tree/master/avro-to-avro
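
For readers who want to reproduce the shape of the pipeline being discussed, a minimal, illustrative Beam (Java) sketch of the read Avro / modify / write Avro pattern follows. It is not the linked test code; the schema, paths and the pass-through "modify" step are assumptions for the example.

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;

    public class AvroRewrite {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        // Toy schema standing in for the real record schema.
        Schema schema = SchemaBuilder.record("Rec").fields().requiredString("id").endRecord();

        Pipeline p = Pipeline.create(options);
        p.apply("Read", AvroIO.readGenericRecords(schema).from("hdfs:///data/input/*.avro"))
         .apply("Modify", ParDo.of(new DoFn<GenericRecord, GenericRecord>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              // The "modify" step is a pass-through in this sketch.
              c.output(c.element());
            }
          }))
         .setCoder(AvroCoder.of(schema))
         .apply("Write", AvroIO.writeGenericRecords(schema)
             .to("hdfs:///data/output/part")
             .withSuffix(".avro"));
        p.run().waitUntilFinish();
      }
    }

The final write step expands into the WriteFiles machinery described above, which is where the extra reshuffle/GBK stages show up in the Spark DAG.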


Re: Status of IntelliJ with Gradle

2018-08-22 Thread Maximilian Michels
Thanks Lukasz. I also found that I can never fix all import errors by 
manually adding jars to the IntelliJ library list. It is also not a good 
solution because it breaks on reloading the Gradle project.


New contributors might find the errors in IntelliJ distracting. Even 
worse, they might assume the problem is on their side. If we can't fix 
them soon, I'd suggest documenting the IntelliJ limitations in the 
contributor guide.


On 20.08.18 17:58, Lukasz Cwik wrote:
Yes, I have the same issues with vendoring. These are the things that I 
have tried without success to get Intellij to import the vendored 
modules correctly:
* attempted to modify the idea.module.scopes to only include the 
vendored artifacts (for some reason this is ignored and Intellij is 
relying on the output of its own internal module, nothing I add to the 
scopes seems to impact anything)
* modify the generated iml beforehand to add the vendored jar file as 
the top dependency (jar never appears in the modules dependencies)


On Mon, Aug 20, 2018 at 8:36 AM Maximilian Michels wrote:


Thank you Etienne for opening the issue.

Anyone else having problems with the shaded Protobuf dependency?

On 20.08.18 16:14, Etienne Chauchot wrote:
 > Hi Max,
 >
 > I experienced the same, I had first opened a general ticket
 > (https://issues.apache.org/jira/browse/BEAM-4418) about gradle
 > improvements and I just split it in several tickets. Here is the one
 > concerning the same issue:
https://issues.apache.org/jira/browse/BEAM-5176
 >
 > Etienne
 >
 > Le lundi 20 août 2018 à 15:51 +0200, Maximilian Michels a écrit :
 >> Hi Beamers,
 >>
 >> It's great to see the Beam build system overhauled. Thank you
for all
 >> the hard work.
 >>
 >> That said, I've just started contributing to Beam again and I feel
 >> really stupid for not having a fully-functional IDE. I've closely
 >> followed the IntelliJ/Gradle instructions [1]. In the terminal
 >> everything works fine.
 >>
 >> First of all, I get warnings like the following and the build fails:
 >>
 >> 
 >>

.../beam/sdks/java/core/src/main/java/org/apache/beam/sdk/package-info.java:29:
 >> warning: [deprecation] NonNull in
edu.umd.cs.findbugs.annotations has
 >> been deprecated
 >> @DefaultAnnotation(NonNull.class)
 >>                    ^
 >> error: warnings found and -Werror specified
 >> 1 error
 >> 89 warnings
 >> =
 >>
 >> Somehow the "-Xlint:-deprecation" compiler flag does not get
through but
 >> "-Werror" does. I can get it to compile by removing the
"-Werror" flag
 >> from BeamModulePlugin but that's obviously not the solution.
 >>
 >> Further, once the build succeeds I still have to add the relocated
 >> Protobuf library manually because the one in "vendor" does not get
 >> picked up. I've tried clearing caches / re-adding the project /
 >> upgrading IntelliJ / changing Gradle configs.
 >>
 >>
 >> Is this just me or do you also have similar problems? If so, I would
 >> like to compile a list of possible fixes to help other contributors.
 >>
 >>
 >> Thanks,
 >> Max
 >>
 >>
 >> Tested with
 >> - IntelliJ 2018.1.6 and 2018.2.1.
 >> - MacOS
 >> - java version "1.8.0_112"
 >>
 >> [1] https://beam.apache.org/contribute/intellij/
 >>
 >>



--
Max


Re: Apache Beam Newsletter - August 2018

2018-08-22 Thread Etienne Chauchot
Hi Rose,
I know the newsletter has already been sent, but may I add some of my ongoing
subjects?

What's been done:
- CI improvement: for each new commit on master, the Nexmark suite is run in both
batch and streaming mode on Spark, Flink, and Dataflow (thanks to Andrew), and
dashboard graphs are produced to track functional and performance regressions.

For talks, I guess only talks that already took place are included, not the
ones scheduled for ApacheCon in September, right?

Etienne




Le vendredi 10 août 2018 à 12:37 -0700, Rose Nguyen a écrit :
> August 2018 | Newsletter
> 
> What’s been done
> 
> Apache Beam 2.6.0 Release
> The Apache Beam team is pleased to announce the release of the 2.6.0 version!
> This is the second release under the new build system, and the process has kept
> improving. You can download the release here and read the release notes for
> more details.
> 
> Beam Summit London 2018 (by: Matthias Baetens, Gris Cuevas, Viktor Kotai)
> Approval from the Apache Software Foundation is underway. We are currently
> finding a venue and sponsors. We’ll send the call for participation soon to
> curate the agenda. If you’re interested in participating in the organization of
> the event, reach out to the organizers. Dates TBD, but we are considering the
> first or last days of October.
> 
> Support for Bounded SDF in all runners (by: Eugene Kirpichov)
> Beam recently introduced a new type of DoFn called SplittableDoFn (SDF) to
> enable richer modularity in its IO connectors. Support for SDF in bounded
> (batch) connectors was added for all runners.
> 
> Apache Kudu IO (by: Tim Robertson)
> A new IO connector for the Apache Kudu data store was added recently. See
> BEAM-2661 for more details on it.
> 
> IO improvements (by: Ismaël Mejía)
> HBaseIO added a new transform based on SDF called readAll. See BEAM-4020 for
> more details on it.
> 
> What we’re working on...
> 
> Interactive Runner for Beam (by: Harsh Vardhan, Sindy Li, Chamikara Jayalath,
> Anand Iyer, Robert Bradshaw)
> Notebook-based interactive processing of Beam pipelines. This is now ready to
> try out in Jupyter Notebook for Beam Python pipelines over DirectRunner! See
> the design doc for more details and watch a demo here. Thoughts, comments and
> discussions welcome :)
> 
> Python 3 Support (by, in alphabetical order: Ahmet Altay, Robert Bradshaw,
> Charles Chen, Matthias Feys, Holden Karau, Sergei Lebedev, Robbe Sneyders,
> Valentyn Tymofieiev)
> Major progress has been made on making the Beam Python codebase
> Python 3-compatible through futurization. Read more details in the proposal.
> 
> New IO connectors (by: John Rudolf Lewis, Jacob Marble)
> Amazon Simple Queue Service (SQS) is in review. Amazon Redshift is in progress.
> 
> Portable Runners (by: Ankur Goenka, Eugene Kirpichov, Ben Sidhom, Axel
> Magnuson, Thomas Weise, Ryan Williams, Robert Bradshaw, Daniel Oliveira,
> Holden Karau)
> Good progress on the Portable Flink Runner, and many of the ValidatesRunner
> tests are passing now. The Portable Flink Runner can now execute batch
> WordCount in Java, Python and Go. Many enhancements and bug fixes in the
> Portable Reference Runner. See Jira
> https://issues.apache.org/jira/browse/BEAM-2889 for more details on progress.
> 
> Dependencies (by: Yifan Zou, Chamikara Jayalath)
> We added a dependencies guide for Beam and tooling to automatically create
> JIRAs for significantly outdated dependencies. We are working on upgrading
> existing dependencies. See the Beam dependencies guide for more details.
> 
> New Members
> 
> New Contributors
> Rose Nguyen, Seattle, WA, USA: Beam docs contributor, working to improve docs
> usability.
> Connell O'Callaghan, Seattle, WA, USA: Interested in growing the community;
> helping with community triages and managing issues.
> 
> Talks & Meetups
> 
> Stream Processing Meetup @LinkedIn 7/19/18
> Xinyu Liu gave a talk on building a Samza Runner for Beam, “Beam meet up,
> Samza!”; see it here.
> 
> Large Scale Landuse Classification of Satellite Images, Berlin Buzzwords
> @Berlin 6/11/18
> Suneel Marthi and Jose Luis Contreras gave a talk on using streaming pipelines
> built on Apache Flink for model training and inference. They leveraged
> convolutional neural networks (CNNs) built with Apache MXNet to train deep
> learning models for land use classification. Read about it and watch it here.
> 
> Big Data in Production Meetup @Cambridge, MA 6/28/18
> Robert Bradshaw and Eila Arich-Landkof gave a talk about Apache Beam and
> machine learning. Event details here and watch their talks here.
> 
> Resources
> 
> Awesome Beam (by: Pablo Estrada)
> Inspired by efforts in Awesome Flink and Awesome Hadoop, I’ve created the
> Awesome Beam repo to aggregate interesting Beam things.
> 
> Until Next Time!
> This edition was curated by our community of contributors, committers and
> PMCs. It contains work done in June and July of 2018 and ongoing efforts. We
> hope to provide visibility to what's going on in the community, 

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-22 Thread Henning Rohde
Sending bundles that cannot be executed, i.e., the situation described to
cause deadlock in Flink in the beginning of the thread with mapB. The
discussion of exposing (or assuming an infinitely large) concurrency level
-- while a useful concept in its own right -- came around as a way to
unblock mapB.

On Tue, Aug 21, 2018 at 2:16 PM Lukasz Cwik  wrote:

> Henning, can you clarify by what you mean with send non-executable bundles
> to the SDK harness and how it is useful for Flink?
>
> On Tue, Aug 21, 2018 at 2:01 PM Henning Rohde  wrote:
>
>> I think it will be useful to the runner to know upfront what the
>> fundamental threading capabilities are for the SDK harness (say, "fixed",
>> "linear", "dynamic", ..) so that the runner can upfront make a good static
>> decision on #harnesses and how many resources they should each have. It's
>> wasteful to give the Foo SDK a whole many-core machine with TBs of memory,
>> if it can only support a single bundle at a time. I think this is also in
>> line with what Thomas and Luke are suggesting.
>>
>> However, it still seems to me to be a semantically problematic idea to
>> send non-executable bundles to the SDK harness. I understand it's useful
>> for Flink, but is that really the best path forward?
>>
>>
>>
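
To make the idea of declaring threading capabilities upfront slightly more concrete, here is a purely illustrative Java sketch of the kind of static metadata an SDK harness could report; it is not an actual Beam proto or API, and all names here are assumptions:

    /** Illustrative only: static concurrency info an SDK harness could declare upfront. */
    public final class SdkHarnessCapabilities {

      /** How the harness's bundle concurrency scales. */
      public enum Threading { FIXED, LINEAR_IN_CORES, DYNAMIC }

      private final Threading threading;
      private final int maxConcurrentBundles; // upper bound; <= 0 could mean "unbounded"

      public SdkHarnessCapabilities(Threading threading, int maxConcurrentBundles) {
        this.threading = threading;
        this.maxConcurrentBundles = maxConcurrentBundles;
      }

      public Threading threading() { return threading; }

      public int maxConcurrentBundles() { return maxConcurrentBundles; }
    }

A runner could use such a declaration to size the number of harnesses and their resources upfront, instead of discovering the limit by sending bundles the harness cannot execute.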
>> On Mon, Aug 20, 2018 at 5:44 PM Ankur Goenka  wrote:
>>
>>> That's right.
>>> To add to it: we added multi-threading to Python streaming, as a single
>>> thread is suboptimal for the streaming use case.
>>> Shall we move towards a conclusion on the SDK bundle processing upper
>>> bound?
>>>
>>> On Mon, Aug 20, 2018 at 1:54 PM Lukasz Cwik  wrote:
>>>
 Ankur, I can see where you are going with your argument. I believe
 there is certain information which is static and won't change after pipeline
 creation (such as the Python SDK being most efficient doing one bundle at a
 time) and some which is best determined at runtime, like memory and CPU limits,
 or worker count.

 On Mon, Aug 20, 2018 at 1:47 PM Ankur Goenka  wrote:

> I would prefer to keep it dynamic, as it can be changed by the
> infrastructure or the pipeline author.
> For example, in the case of Python, the number of concurrent bundles can be
> changed by setting the pipeline option worker_count, and for Java it can be
> computed based on the CPUs on the machine.
>
> For the Flink runner, we can use the worker_count parameter for now to
> increase the parallelism. And we can have one container for each mapPartition
> task on Flink while reusing containers, as container creation is expensive,
> especially for Python where it installs a bunch of dependencies. There is one
> caveat though: I have seen machine crashes because of too many containers
> being created simultaneously. We can rate-limit container creation in the code
> to avoid this.
>
> Thanks,
> Ankur
>
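
As an aside on the CPU-based sizing mentioned above, the policy below is only an illustrative sketch; the worker-count override and its defaulting behaviour are assumptions, not what any SDK harness actually does:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public final class BundleExecutors {
      // Size the bundle-processing pool from an explicit worker-count option if
      // given, otherwise fall back to the machine's CPU count.
      public static ExecutorService create(Integer workerCountOverride) {
        int workers = (workerCountOverride != null && workerCountOverride > 0)
            ? workerCountOverride
            : Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(workers);
      }

      private BundleExecutors() {}
    }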
> On Mon, Aug 20, 2018 at 9:20 AM Lukasz Cwik  wrote:
>
>> +1 on making the resources part of a proto. Based upon what Henning
>> linked to, the provisioning API seems like an appropriate place to 
>> provide
>> this information.
>>
>> Thomas, I believe the environment proto is the best place to add
>> information that a runner may want to know about upfront during
>> pipeline creation. I wouldn't stick this into PipelineOptions for the long
>> long
>> term.
>> If you have time to capture these thoughts and update the environment
>> proto, I would suggest going down that path. Otherwise anything short 
>> term
>> like PipelineOptions will do.
>>
>> On Sun, Aug 19, 2018 at 5:41 PM Thomas Weise  wrote:
>>
>>> For SDKs where the upper limit is constant and known upfront, why
>>> not communicate this along with the other harness resource info as part 
>>> of
>>> the job submission?
>>>
>>> Regarding use of GRPC headers: Why not make this explicit in the
>>> proto instead?
>>>
>>> WRT runner dictating resource constraints: The runner actually may
>>> also not have that information. It would need to be supplied as part of 
>>> the
>>> pipeline options? The cluster resource manager needs to allocate 
>>> resources
>>> for both, the runner and the SDK harness(es).
>>>
>>> Finally, what can be done to unblock the Flink runner / Python until the
>>> solution discussed here is in place? An extra runner option to toggle the SDK
>>> singleton on/off?
>>>
>>>
>>> On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka 
>>> wrote:
>>>
 Sounds good to me.
 GRPC Header of the control channel seems to be a good place to add
 upper bound information.
 Added jiras:
 https://issues.apache.org/jira/browse/BEAM-5166
 https://issues.apache.org/jira/browse/BEAM-5167

 On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde 
 wrote:

> Regarding resources: the runner can